SSE Client in Apache NiFi

New Contributor

I was looking for a Server-Sent Events (SSE) client in Apache NiFi, but I couldn't find a ready-made processor that does that.

 

I started implementing an SSE client as a Python script and used the ExecuteCommand processor to run it. However, the script has to terminate before the processor passes its STDOUT output to the next step (i.e., I can't use an infinite "while True:" loop to listen to the SSE server and emit the consumed events as a stream).

 

Are there any ideas for implementing the SSE client in NiFi so that consumed events are passed one by one to the next processors in real time?

4 REPLIES

Cloudera Employee

Technically, you can run an infinite loop in Python and just print each event; the print statement will send the data out.
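A minimal sketch of such a loop, assuming Python 3 and only the standard library. The helper name `iter_sse_data` is hypothetical, and the URL is whatever you pass on the command line. One detail matters when the script runs under a processor instead of a terminal: Python buffers stdout when it is a pipe, so each print should be flushed explicitly (or the interpreter started with `python -u`); otherwise events may sit in the buffer instead of reaching NiFi.

```python
import sys
import urllib.request


def iter_sse_data(lines):
    """Yield the data payload of each SSE event from an iterable of text lines."""
    data = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if data:
                yield "\n".join(data)
                data = []
        elif line.startswith("data:"):
            value = line[5:]
            # One optional space is allowed after the colon.
            if value.startswith(" "):
                value = value[1:]
            data.append(value)
        # Comment lines (starting with ":") and other fields are ignored here.


def main(url):
    request = urllib.request.Request(url, headers={"Accept": "text/event-stream"})
    with urllib.request.urlopen(request) as response:
        text_lines = (raw.decode("utf-8") for raw in response)
        for payload in iter_sse_data(text_lines):
            # flush=True pushes each event to stdout immediately.
            print(payload, flush=True)


if __name__ == "__main__" and len(sys.argv) > 1:
    main(sys.argv[1])
```

Whether the processor forwards that output before the script exits depends on which processor runs it and how it is configured, so this only covers the script side.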


@mmaher22 You may want to run the Python job inside ExecuteScript. That way, you can send output to a flowfile during each iteration of your loop with:

 

session.commit()

ExecuteScript calls this implicitly when the script finishes, which is what sends the output to the next processor (one flowfile). If you call it inside your loop instead, the script keeps running and sends a flowfile on every iteration.
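The pattern described above can be sketched as follows. This is an illustration under assumptions, not a tested NiFi script: `emit_flowfiles` and `make_callback` are hypothetical names, and inside ExecuteScript the `session` and `REL_SUCCESS` objects would be the ones NiFi binds into the script. In a Jython script body, `make_callback(payload)` would return an instance of a class extending `org.apache.nifi.processor.io.OutputStreamCallback` that writes the payload in its `process(outputStream)` method.

```python
def emit_flowfiles(session, rel_success, payloads, make_callback):
    """Create, write, transfer, and commit one flowfile per payload.

    session and rel_success stand in for the objects ExecuteScript provides;
    make_callback is a hypothetical factory for the write callback.
    """
    sent = 0
    for payload in payloads:
        flow_file = session.create()
        flow_file = session.write(flow_file, make_callback(payload))
        session.transfer(flow_file, rel_success)
        # Committing here releases this flowfile downstream right away,
        # instead of waiting for the implicit commit when the script returns.
        session.commit()
        sent += 1
    return sent
```

In the script itself, the call would look like `emit_flowfiles(session, REL_SUCCESS, events, make_callback)`, where `events` is the (possibly endless) iterator of SSE payloads.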

 

For a full rundown of how to use ExecuteScript be sure to see these great articles:

 

https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html

https://community.hortonworks.com/articles/75545/executescript-cookbook-part-2.html

https://community.hortonworks.com/articles/77739/executescript-cookbook-part-3.html

Expert Contributor

Hi @mmaher22 

I spun my wheels on this for quite a while with no success; I can get the authorization token, but that's it. Do you have an example of a script for ExecuteScript (or ExecuteGroovyScript) that makes an HTTP request for a token and then uses that token to start an SSE stream? I'd really appreciate whatever you are willing to share. Many thanks!

 

Here is what I've come up with so far, but I can't get the SSE responses to output to flowfiles. 

@Grab(group='org.apache.httpcomponents', module='httpclient', version='4.5.13')
import org.apache.http.impl.client.CloseableHttpClient
import org.apache.http.impl.client.HttpClients
import org.apache.http.client.methods.HttpGet
import org.apache.http.HttpEntity
import org.apache.http.util.EntityUtils
import java.util.Base64

// Function to retrieve the access token
def retrieveAccessToken() {
	def tokenUrl = new URL("http://kc.example.com/realms/aqua-services/protocol/openid-connect/token")
	def clientId = "aqua-forma"
	def clientSecret = "ls4kdjfOWIE5TRU6s2lkjfL3ASK9"
	def grantType = "client_credentials"
    
	def credentials = "${clientId}:${clientSecret}"
	def credentialsBase64 = Base64.getEncoder().encodeToString(credentials.getBytes("utf-8"))
	def authHeader = "Basic ${credentialsBase64}"
    
	def data = "grant_type=${grantType}"
    
	def connection = tokenUrl.openConnection() as HttpURLConnection
	connection.setRequestMethod("POST")
	connection.setRequestProperty("Authorization", authHeader)
	connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded")
	connection.doOutput = true
    
	def writer = new OutputStreamWriter(connection.getOutputStream())
	writer.write(data)
	writer.flush()
    
	def responseCode = connection.getResponseCode()
    
	if (responseCode == 200) {
		def inputStream = connection.getInputStream()
		def reader = new BufferedReader(new InputStreamReader(inputStream))
		def response = new StringBuilder()
		String line
    
		while ((line = reader.readLine()) != null) {
			response.append(line)
		}
    
		reader.close()
		def tokenData = new groovy.json.JsonSlurper().parseText(response.toString())
		return tokenData.access_token
	} else {
		return null
	}
}


// SSE Code
def accessToken = retrieveAccessToken()

def sseUrl = "http://example.com/api/v1/read/search/sse?query=SELECT%20%2A%20FROM%20Game_Species"

// Create an HTTP client
CloseableHttpClient httpClient = HttpClients.createDefault()

try {
    // Create an HTTP GET request
    HttpGet httpGet = new HttpGet(sseUrl)
    httpGet.setHeader("Authorization", "Bearer " + accessToken)

    def response = httpClient.execute(httpGet)
    def entity = response.getEntity()

    if (entity != null) {
        entity.content.eachLine { line ->
            if (line.startsWith("data:")) {
                // "data:" is 5 characters; the space after the colon is optional, and trim() removes it
                def payload = line.substring(5).trim()
                
                def flowFile = session.create()
                flowFile = session.write(flowFile, { outputStream ->
                    outputStream.write(payload.getBytes("UTF-8"))
                } as OutputStreamCallback)
                
                session.transfer(flowFile, REL_SUCCESS)
            }
        }
    }
    
} finally {
    httpClient.close()
}
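
For comparison, the two requests the Groovy script makes (a client-credentials token request, then a streaming GET carrying the bearer token) can be sketched in Python with only the standard library. The function names `build_token_request` and `build_sse_request` are hypothetical, and the sketch only builds the requests; it does not send them or touch the NiFi session.

```python
import base64
import urllib.parse
import urllib.request


def build_token_request(token_url, client_id, client_secret):
    """Build (but do not send) an OAuth2 client-credentials token request."""
    credentials = "{}:{}".format(client_id, client_secret).encode("utf-8")
    body = urllib.parse.urlencode({"grant_type": "client_credentials"}).encode("ascii")
    return urllib.request.Request(
        token_url,
        data=body,
        method="POST",
        headers={
            "Authorization": "Basic " + base64.b64encode(credentials).decode("ascii"),
            "Content-Type": "application/x-www-form-urlencoded",
        },
    )


def build_sse_request(sse_url, access_token):
    """Build the streaming GET request that carries the bearer token."""
    return urllib.request.Request(
        sse_url,
        headers={
            "Authorization": "Bearer " + access_token,
            "Accept": "text/event-stream",
        },
    )
```

Each request would be sent with `urllib.request.urlopen(...)`; the token response is JSON, and its `access_token` field is what gets passed to `build_sse_request`.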

Expert Contributor

Just to add to this: I created a Java version of this code and verified that it works from the command line, where the SSE feed prints to the console. However, when I run the same code in an ExecuteStreamCommand processor, I get exactly the same behavior: the processor is running, but no data comes out of it. I'm missing a detail that I hope someone can shed some light on.