SSE Client in Apache NiFi

Frequent Visitor

I was looking for a Server-Sent Events (SSE) client in Apache NiFi, but I couldn't find a ready-made processor for it.

I started implementing an SSE client as a Python script and used the ExecuteCommand processor to run it. However, the script has to terminate before the processor sends its output to the next step through STDOUT (i.e. I can't use an infinite "while True:" loop to listen to the SSE server and emit the consumed events as a stream).

Are there any ideas for implementing an SSE client in NiFi so that consumed events are passed one by one to the next processors in real time?


Cloudera Employee

You can technically run an infinite loop in Python and just print each event as it arrives; this will send out data.
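A minimal sketch of that idea, assuming the SSE response is already available as an iterable of decoded text lines (the line source and the name forward_events are assumptions for illustration, not something from this thread). The detail that matters is flushing after every print: stdout is usually block-buffered when it is a pipe, so events can sit in the buffer instead of reaching the process that reads them.

```python
import sys


def forward_events(lines, out=sys.stdout):
    """Write the payload of every SSE `data:` line to `out`, one per line.

    `lines` is any iterable of already-decoded text lines (hypothetical
    source: an open HTTP response). Returns the number of payloads written.
    """
    count = 0
    for line in lines:
        if not line.startswith("data:"):
            continue  # skip comments, blank lines and other SSE fields
        payload = line[len("data:"):].strip()
        # flush=True pushes each event through the pipe immediately
        print(payload, file=out, flush=True)
        count += 1
    return count


if __name__ == "__main__":
    # Stand-in for a live stream; a real client would keep reading here.
    sample = ['data: {"id": 1}', "", ": keep-alive", 'data:{"id": 2}']
    forward_events(sample)
```

Whether NiFi turns each printed line into its own flowfile still depends on which processor runs the script and how it is configured, so treat this as the script side only.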


@mmaher22 You may want to run the Python job inside ExecuteScript. That way, you can send output to a flowfile during each iteration of your loop with:

session.commit()

This call is implied at the end of the script's execution in ExecuteScript, which is what sends the output to the next processor (1 flow file). So if you put it inside your loop, the script keeps running and sends out a flowfile on every iteration.

 

For a full rundown of how to use ExecuteScript be sure to see these great articles:

 

https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html

https://community.hortonworks.com/articles/75545/executescript-cookbook-part-2.html

https://community.hortonworks.com/articles/77739/executescript-cookbook-part-3.html

Expert Contributor

Hi @mmaher22 

I spun my wheels on this for quite a while with no success; I can get the authorization token, but that's it.  Do you have an example of a script for ExecuteScript (or ExecuteGroovyScript) that makes an HTTP request for a token and then uses that token to start an SSE stream?  I'd really appreciate whatever you are willing to share.  Many thanks!

 

Here is what I've come up with so far, but I can't get the SSE responses to output to flowfiles. 

@Grab(group='org.apache.httpcomponents', module='httpclient', version='4.5.13')
import org.apache.http.impl.client.CloseableHttpClient
import org.apache.http.impl.client.HttpClients
import org.apache.http.client.methods.HttpGet
import org.apache.http.HttpEntity
import org.apache.http.util.EntityUtils
import java.util.Base64

// Function to retrieve the access token
def retrieveAccessToken() {
	def tokenUrl = new URL("http://kc.example.com/realms/aqua-services/protocol/openid-connect/token")
	def clientId = "aqua-forma"
	def clientSecret = "ls4kdjfOWIE5TRU6s2lkjfL3ASK9"
	def grantType = "client_credentials"
    
	def credentials = "${clientId}:${clientSecret}"
	def credentialsBase64 = Base64.getEncoder().encodeToString(credentials.getBytes("utf-8"))
	def authHeader = "Basic ${credentialsBase64}"
    
	def data = "grant_type=${grantType}"
    
	def connection = tokenUrl.openConnection() as HttpURLConnection
	connection.setRequestMethod("POST")
	connection.setRequestProperty("Authorization", authHeader)
	connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded")
	connection.doOutput = true
    
	def writer = new OutputStreamWriter(connection.getOutputStream())
	writer.write(data)
	writer.flush()
    
	def responseCode = connection.getResponseCode()
    
	if (responseCode == 200) {
		def inputStream = connection.getInputStream()
		def reader = new BufferedReader(new InputStreamReader(inputStream))
		def response = new StringBuilder()
		String line
    
		while ((line = reader.readLine()) != null) {
			response.append(line)
		}
    
		reader.close()
		def tokenData = new groovy.json.JsonSlurper().parseText(response.toString())
		return tokenData.access_token
	} else {
		return null
	}
}


// SSE Code
def accessToken = retrieveAccessToken()

def sseUrl = "http://example.com/api/v1/read/search/sse?query=SELECT%20%2A%20FROM%20Game_Species"

// Create an HTTP client
CloseableHttpClient httpClient = HttpClients.createDefault()

try {
    // Create an HTTP GET request
    HttpGet httpGet = new HttpGet(sseUrl)
    httpGet.setHeader("Authorization", "Bearer " + accessToken)

    def response = httpClient.execute(httpGet)
    def entity = response.getEntity()

    if (entity != null) {
        entity.content.eachLine { line ->
            if (line.startsWith("data:")) {
                // "data:" is 5 characters; trim() drops the optional space after the colon
                def payload = line.substring(5).trim()
                
                def flowFile = session.create()
                flowFile = session.write(flowFile, { outputStream ->
                    outputStream.write(payload.getBytes("UTF-8"))
                } as OutputStreamCallback)
                
                session.transfer(flowFile, REL_SUCCESS)
            }
        }
    }
    
} finally {
    httpClient.close()
}
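For reference, the SSE wire format groups lines into events: each data: line contributes one line of payload, a single space after the colon is dropped, lines starting with a colon are comments, and a blank line ends the event. A minimal Python sketch of that parsing, independent of NiFi (parse_sse is a hypothetical helper name, and only the data and event fields are handled here):

```python
def parse_sse(lines):
    """Yield (event_type, data) tuples from an iterable of SSE text lines."""
    event_type, data_lines = "message", []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # Blank line: dispatch the event collected so far, if any.
            if data_lines:
                yield event_type, "\n".join(data_lines)
            event_type, data_lines = "message", []
            continue
        if line.startswith(":"):
            continue  # comment / keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # drop exactly one leading space
        if field == "data":
            data_lines.append(value)
        elif field == "event":
            event_type = value
        # other fields (id, retry) are ignored in this sketch


stream = [
    "event: update",
    'data: {"species": "trout"}',
    "",
    ": keep-alive",
    "data:line one",
    "data: line two",
    "",
]
events = list(parse_sse(stream))
```

Note that an event is only emitted once its terminating blank line arrives, so a reader that waits for the whole response body to finish will never see anything from an endless stream.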

Expert Contributor

Just to add to this, I created a Java version of this code which I verified works from the command line; I get the SSE feed printing to the console.  However, when I attempt to use this same code in an ExecuteStreamCommand processor then I get the exact same behavior, which is that the processor is running but there isn't any data coming out of it.  I'm missing a detail that I hope someone can shed some light on.

Reader

OK, my solution uses Python extensions in NiFi 2.4:

import threading
import queue
import time
import json
import logging
import requests
import sseclient

from nifiapi.flowfilesource import FlowFileSource, FlowFileSourceResult
from nifiapi.properties import PropertyDescriptor, StandardValidators

from nifiapi.relationship import Relationship

logger = logging.getLogger(__name__)

class SSEStreamClient(FlowFileSource):
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileSource']

    class ProcessorDetails:
        version = '2.4.0-KPCS'
        dependencies = ['sseclient-py', 'requests']
        description = '''A Python FlowFileSource that generates FlowFiles by consuming events from an SSE stream.
        It handles connection re-establishment and batches multiple SSE events into one FlowFile.
        Reads SSE in a separate thread continuously to avoid losing messages.'''
        tags = ['sse', 'stream', 'generator', 'source', 'json', 'KPCS']
        
    #REL_FAILURE = Relationship(name="failure", description="FlowFiles are routed to failure when processing fails")

    #def getRelationships(self):
    #    return [self.REL_FAILURE]

    PROP_SSE_URL = PropertyDescriptor(
        name="SSE URL",
        description="The URL of the Server-Sent Events (SSE) stream.",
        allowable_values=None,
        default_value="https://",
        validators=[StandardValidators.URL_VALIDATOR],
        required=True
    )
    PROP_AUTH_TOKEN = PropertyDescriptor(
        name="Authorization Token",
        description="Bearer token for API authorization, for example: Bearer 111111",
        allowable_values=None,
        default_value="",
        sensitive=True,
        required=False
    )
    PROP_CONNECT_TIMEOUT = PropertyDescriptor(
        name="Connection Timeout",
        description="Maximum time in seconds to wait for connection to be established. Use 0 for no timeout.",
        allowable_values=None,
        default_value="10",
        validators=[StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR],
        required=True
    )
    PROP_DISABLE_LOGGING = PropertyDescriptor(
        name="Disable Logging",
        description="If set to true, disables logging to the log file.",
        allowable_values=["true", "false"],
        default_value="true",
        required=False
    )

    def _should_log(self):
        if not self.context:
            return True
        disable_logging = self.context.getProperty(self.PROP_DISABLE_LOGGING).getValue()
        return disable_logging.lower() != "true"        


    def __init__(self, **kwargs):
        if 'jvm' in kwargs:
            del kwargs['jvm']

        super().__init__(**kwargs)

        self.sse_response = None
        self.sse_client = None
        self.event_iterator = None
        self.property_descriptors = [
            self.PROP_SSE_URL,
            self.PROP_AUTH_TOKEN,
            self.PROP_CONNECT_TIMEOUT,
            self.PROP_DISABLE_LOGGING
        ]

        self.queue = queue.Queue()
        self.read_thread = None
        self.stop_thread = threading.Event()
        self.context = None
        if self._should_log():
            logger.info("SSEStreamClient: Initialized.")

    def getPropertyDescriptors(self):
        return self.property_descriptors
    

    def _establish_sse_connection(self, context):
        sse_url = context.getProperty(self.PROP_SSE_URL).evaluateAttributeExpressions().getValue()
        auth_token = context.getProperty(self.PROP_AUTH_TOKEN).evaluateAttributeExpressions().getValue()
        connect_timeout = int(context.getProperty(self.PROP_CONNECT_TIMEOUT).getValue())

        headers = {'Accept': 'text/event-stream'}
        if auth_token:
            headers['Authorization'] = f'{auth_token}'
        if self._should_log():
            logger.info(f"SSEStreamClient: Connecting to SSE URL: {sse_url}")

        self._close_sse_connection()

        try:
            self.sse_response = requests.get(sse_url, stream=True, headers=headers, timeout=(connect_timeout, None))
            self.sse_response.raise_for_status()

            self.sse_client = sseclient.SSEClient(self.sse_response)
            self.event_iterator = iter(self.sse_client.events())
            if self._should_log():
                logger.info("SSEStreamClient: SSE connection established.")
            return True
        except requests.exceptions.RequestException as e:
            if self._should_log():
                logger.error(f"SSEStreamClient: Connection error: {e}", exc_info=True)
            self._close_sse_connection()
            return False
        except Exception as e:
            if self._should_log():
                logger.error(f"SSEStreamClient: Unexpected error during connection: {e}", exc_info=True)
            self._close_sse_connection()
            return False

    def _close_sse_connection(self):
        if self.sse_response:
            try:
                self.sse_response.close()
                if self._should_log():
                    logger.debug("SSEStreamClient: SSE response closed.")
            except Exception as e:
                if self._should_log():
                    logger.warning(f"SSEStreamClient: Error closing SSE response: {e}")
            finally:
                self.sse_response = None

        self.sse_client = None
        self.event_iterator = None
        if self._should_log():        
            logger.info("SSEStreamClient: Connection closed and cleaned up.")

    def _read_loop(self):
        if self._should_log():
            logger.info("SSEStreamClient: Read thread started.")
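        while not self.stop_thread.is_set():
            try:
                if self.event_iterator is None:
                    # A previous (re)connect failed; retry before reading.
                    if not self._establish_sse_connection(self.context):
                        time.sleep(5)
                        continue
                event = next(self.event_iterator)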
                if event and event.data:
                    try:
                        data = json.loads(event.data)
                    except json.JSONDecodeError:
                        data = {"raw": event.data}
                    self.queue.put(data)
            except StopIteration:
                if self._should_log():
                    logger.info("SSEStreamClient: SSE stream ended, reconnecting.")
                self._close_sse_connection()
                if not self._establish_sse_connection(self.context):
                    if self._should_log():
                        logger.error("SSEStreamClient: Failed to reconnect SSE stream.")
                    time.sleep(5)
            except Exception as e:
                if self._should_log():
                    logger.error(f"SSEStreamClient: Error reading SSE events: {e}", exc_info=True)
                time.sleep(1)
        if self._should_log():
            logger.info("SSEStreamClient: Read thread stopped.")

    def onScheduled(self, context):
        self.context = context
        if not self._establish_sse_connection(context):
            if self._should_log():
                logger.error("SSEStreamClient: Failed initial connection onScheduled.")
            return
        self.stop_thread.clear()
        self.read_thread = threading.Thread(target=self._read_loop, daemon=True)
        self.read_thread.start()

    def create(self, context):
        sse_url = context.getProperty(self.PROP_SSE_URL).evaluateAttributeExpressions().getValue()
        messages = []
        while not self.queue.empty():
            try:
                messages.append(self.queue.get_nowait())
            except queue.Empty:
                break

        if messages:
            content_str = json.dumps(messages, ensure_ascii=False)
            content_bytes = content_str.encode('utf-8')
            attributes = {
                'mime.type': 'application/json',
                'batch.size': str(len(messages)),
                'sse.url': sse_url
            }
            if self._should_log():
                logger.info(f"SSEStreamClient: Emitting FlowFile with {len(messages)} events.")
            return FlowFileSourceResult(
                relationship='success',
                attributes=attributes,
                contents=content_bytes
            )
        else:
            return

    def onStopped(self, context):
        if self._should_log():
            logger.info("SSEStreamClient: onStopped called. Stopping read thread and closing connection.")
        self.stop_thread.set()
        if self.read_thread:
            self.read_thread.join(timeout=5)
        self._close_sse_connection()

    def onUnscheduled(self, context):
        if self._should_log():
            logger.info("SSEStreamClient: onUnscheduled called. Stopping read thread and closing connection.")
        self.stop_thread.set()
        if self.read_thread:
            self.read_thread.join(timeout=5)
        self._close_sse_connection()

In the directory /opt/nifi/nifi-current/python_extensions, create a subfolder (e.g. SSEStreamClient) containing the following three files:

  • SSEStreamClient.py — a file containing the code provided above.
  • requirements.txt — a file containing a single line: sseclient-py.
  • __init__.py — an empty file.
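The layout above can also be created with a short script. This is only a sketch: scaffold_extension is a hypothetical helper, and the extensions directory is passed in rather than assumed, since the python_extensions path differs between installations.

```python
from pathlib import Path


def scaffold_extension(extensions_dir, name, processor_source,
                       requirements=("sseclient-py",)):
    """Create <extensions_dir>/<name>/ with the three files listed above."""
    target = Path(extensions_dir) / name
    target.mkdir(parents=True, exist_ok=True)
    # <name>.py holds the processor code
    (target / f"{name}.py").write_text(processor_source, encoding="utf-8")
    # requirements.txt lists one package per line
    (target / "requirements.txt").write_text(
        "\n".join(requirements) + "\n", encoding="utf-8")
    # __init__.py stays empty
    (target / "__init__.py").touch()
    return target
```

For the setup described here, the call would look like scaffold_extension("/opt/nifi/nifi-current/python_extensions", "SSEStreamClient", source), where source is the processor code as a string.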

Then, restart NiFi.

(This setup was created and tested on Dockerized NiFi version 2.4.0 with pip installed.)
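The core pattern in the processor above, a background thread that fills a queue while each create() call drains whatever has accumulated into one batch, can be tried outside NiFi. A minimal sketch with a fake, finite event source standing in for the SSE connection (BatchingReader and its method names are hypothetical and not part of the NiFi API):

```python
import json
import queue
import threading


class BatchingReader:
    """Reads events on a background thread and hands them out in batches."""

    def __init__(self, source):
        self._source = source          # any iterable of events
        self._queue = queue.Queue()
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._read_loop, daemon=True)

    def start(self):
        self._thread.start()

    def join(self, timeout=None):
        """Wait for the reader thread to finish (only useful for finite sources)."""
        self._thread.join(timeout=timeout)

    def _read_loop(self):
        for event in self._source:
            if self._stop.is_set():
                break
            self._queue.put(event)

    def drain(self):
        """Return everything queued so far as JSON bytes, or None if empty."""
        batch = []
        while True:
            try:
                batch.append(self._queue.get_nowait())
            except queue.Empty:
                break
        if not batch:
            return None
        return json.dumps(batch, ensure_ascii=False).encode("utf-8")

    def stop(self, timeout=5):
        self._stop.set()
        self._thread.join(timeout=timeout)


reader = BatchingReader([{"n": 1}, {"n": 2}, {"n": 3}])
reader.start()
reader.join()            # the fake source is finite, so the thread ends
first = reader.drain()   # all three events in one batch
second = reader.drain()  # nothing left, so None
reader.stop()
```

Draining with get_nowait() until queue.Empty is raised avoids relying on queue.empty(), which only reflects a momentary snapshot while the reader thread is still running.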

New Contributor

Hi @mabucz 

Any idea why I am getting a java.lang.ClassNotFoundException: SSEStreamClient even though the SSEStreamClient module is successfully loaded and I even see "SSEStreamClient: Initialized." logged by the module? I am running this in the NiFi 2.4.0 Docker image.

2025-10-02 20:28:26,695 INFO [Monitor Processor Lifecycle Thread-2] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled StandardStatelessGroupNode[group=StandardProcessGroup[identifier=269832be-6b12-38c0-26df-cf4c47ff2b84,name=event-forwarder]] to run with 1 threads
2025-10-02 20:28:29,148 INFO [Initialize SSEStreamClient] org.apache.nifi.py4j.PythonProcess Successfully created Python Virtual Environment ./work/python/extensions/SSEStreamClient/2.4.0-KPCS
2025-10-02 20:28:29,150 INFO [Initialize SSEStreamClient] org.apache.nifi.py4j.PythonProcess Launching Python Process /opt/nifi/nifi-current/./work/python/extensions/SSEStreamClient/2.4.0-KPCS/bin/python3 /opt/nifi/nifi-current/./python/framework/Controller.py with working directory ./work/python/extensions/SSEStreamClient/2.4.0-KPCS to communicate with Java on Port 35515
2025-10-02 20:28:29,314 INFO [python-log-514] org.apache.nifi.py4j.Controller Listening for requests from Java side using Python Port 37553, communicating with Java on port 35515
2025-10-02 20:28:29,395 INFO [Initialize SSEStreamClient] org.apache.nifi.py4j.PythonProcess Successfully started and pinged Python Server. Python Process = Process[pid=514, exitValue="not exited"]
2025-10-02 20:28:29,541 INFO [python-log-514] python.ExtensionManager Installing dependencies ['-r', '/opt/nifi/nifi-current/python_extensions/SSEStreamProcessor/requirements.txt', 'sseclient-py', 'requests'] for SSEStreamClient to /opt/nifi/nifi-current/./work/python/extensions/SSEStreamClient/2.4.0-KPCS using command ['/opt/nifi/nifi-current/./work/python/extensions/SSEStreamClient/2.4.0-KPCS/bin/python3', '-m', 'pip', 'install', '--no-cache-dir', '--target', '/opt/nifi/nifi-current/./work/python/extensions/SSEStreamClient/2.4.0-KPCS', '-r', '/opt/nifi/nifi-current/python_extensions/SSEStreamProcessor/requirements.txt', 'sseclient-py', 'requests']
2025-10-02 20:28:29,934 INFO [Checkpoint FlowFile Repository] o.a.n.c.r.WriteAheadFlowFileRepository Initiating checkpoint of FlowFile Repository
2025-10-02 20:28:29,935 INFO [Checkpoint FlowFile Repository] o.a.n.c.r.WriteAheadFlowFileRepository Successfully checkpointed FlowFile Repository with 0 records in 0 milliseconds
2025-10-02 20:28:30,483 INFO [python-log-514] org.apache.nifi.py4j.ProcessLog Collecting sseclient-py
2025-10-02 20:28:30,728 INFO [python-log-514] org.apache.nifi.py4j.ProcessLog Downloading sseclient_py-1.8.0-py2.py3-none-any.whl (8.8 kB)
2025-10-02 20:28:30,843 INFO [python-log-514] org.apache.nifi.py4j.ProcessLog Collecting requests
2025-10-02 20:28:30,869 INFO [python-log-514] org.apache.nifi.py4j.ProcessLog Downloading requests-2.32.5-py3-none-any.whl (64 kB)
2025-10-02 20:28:30,915 INFO [python-log-514] org.apache.nifi.py4j.ProcessLog 64.7/64.7 kB 1.7 MB/s eta 0:00:00
2025-10-02 20:28:31,208 INFO [python-log-514] org.apache.nifi.py4j.ProcessLog Collecting charset_normalizer<4,>=2
2025-10-02 20:28:31,234 INFO [python-log-514] org.apache.nifi.py4j.ProcessLog Downloading charset_normalizer-3.4.3-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl (150 kB)
2025-10-02 20:28:31,270 INFO [python-log-514] org.apache.nifi.py4j.ProcessLog 150.3/150.3 kB 4.4 MB/s eta 0:00:00
2025-10-02 20:28:31,312 INFO [python-log-514] org.apache.nifi.py4j.ProcessLog Collecting idna<4,>=2.5
2025-10-02 20:28:31,336 INFO [python-log-514] org.apache.nifi.py4j.ProcessLog Downloading idna-3.10-py3-none-any.whl (70 kB)
2025-10-02 20:28:31,341 INFO [python-log-514] org.apache.nifi.py4j.ProcessLog 70.4/70.4 kB 18.7 MB/s eta 0:00:00
2025-10-02 20:28:31,415 INFO [python-log-514] org.apache.nifi.py4j.ProcessLog Collecting urllib3<3,>=1.21.1
2025-10-02 20:28:31,456 INFO [python-log-514] org.apache.nifi.py4j.ProcessLog Downloading urllib3-2.5.0-py3-none-any.whl (129 kB)
2025-10-02 20:28:31,460 INFO [python-log-514] org.apache.nifi.py4j.ProcessLog 129.8/129.8 kB 250.4 MB/s eta 0:00:00
2025-10-02 20:28:31,537 INFO [python-log-514] org.apache.nifi.py4j.ProcessLog Collecting certifi>=2017.4.17
2025-10-02 20:28:31,571 INFO [python-log-514] org.apache.nifi.py4j.ProcessLog Downloading certifi-2025.8.3-py3-none-any.whl (161 kB)
2025-10-02 20:28:31,579 INFO [python-log-514] org.apache.nifi.py4j.ProcessLog 161.2/161.2 kB 93.2 MB/s eta 0:00:00
2025-10-02 20:28:31,684 INFO [python-log-514] org.apache.nifi.py4j.ProcessLog Installing collected packages: sseclient-py, urllib3, idna, charset_normalizer, certifi, requests
2025-10-02 20:28:32,219 INFO [python-log-514] org.apache.nifi.py4j.ProcessLog Successfully installed certifi-2025.8.3 charset_normalizer-3.4.3 idna-3.10 requests-2.32.5 sseclient-py-1.8.0 urllib3-2.5.0
2025-10-02 20:28:32,222 INFO [python-log-514] org.apache.nifi.py4j.ProcessLog WARNING: Target directory /opt/nifi/nifi-current/work/python/extensions/SSEStreamClient/2.4.0-KPCS/bin already exists. Specify --upgrade to force replacement.
2025-10-02 20:28:32,339 INFO [python-log-514] python.ExtensionManager Successfully installed requirements for SSEStreamClient to /opt/nifi/nifi-current/./work/python/extensions/SSEStreamClient/2.4.0-KPCS
2025-10-02 20:28:32,339 INFO [Initialize Python Processor c7d879e8-97fa-3ad6-e00b-0582b78adde7 (SSEStreamClient)] o.a.n.py4j.StandardPythonProcessorBridge Successfully downloaded dependencies for Python Processor c7d879e8-97fa-3ad6-e00b-0582b78adde7 (SSEStreamClient)
2025-10-02 20:28:32,433 INFO [python-log-514] python.ExtensionManager Loaded module SSEStreamClient
2025-10-02 20:28:32,436 INFO [python-log-514] SSEStreamClient SSEStreamClient: Initialized.
2025-10-02 20:28:32,457 INFO [Initialize Python Processor c7d879e8-97fa-3ad6-e00b-0582b78adde7 (SSEStreamClient)] o.a.n.py4j.StandardPythonProcessorBridge Successfully loaded Python Processor c7d879e8-97fa-3ad6-e00b-0582b78adde7 (SSEStreamClient)

------------------------------

2025-10-02 20:28:36,428 INFO [Monitor Processor Lifecycle Thread-1] o.a.n.stateless.engine.ComponentBuilder Created Processor of type org.apache.nifi.processors.standard.LogAttribute with identifier fe0cfbf3-c27f-36ae-a4ac-3a9c1b016fab
2025-10-02 20:28:36,429 INFO [Monitor Processor Lifecycle Thread-1] o.a.nifi.groups.StandardProcessGroup LogAttribute[id=fe0cfbf3-c27f-36ae-a4ac-3a9c1b016fab] added to StandardProcessGroup[identifier=af74d24b-16e6-3044-b8a1-41f31c182d6c,name=app]
2025-10-02 20:28:36,430 INFO [Monitor Processor Lifecycle Thread-1] o.a.n.f.s.StandardVersionedComponentSynchronizer Added LogAttribute[id=fe0cfbf3-c27f-36ae-a4ac-3a9c1b016fab] to StandardProcessGroup[identifier=af74d24b-16e6-3044-b8a1-41f31c182d6c,name=app]
2025-10-02 20:28:36,430 INFO [Monitor Processor Lifecycle Thread-1] o.a.n.stateless.engine.ComponentBuilder Created Processor of type org.apache.nifi.processors.standard.EvaluateJsonPath with identifier 1c3a7a3c-70d9-3165-c78a-4d4ea1353920
2025-10-02 20:28:36,431 INFO [Monitor Processor Lifecycle Thread-1] o.a.nifi.groups.StandardProcessGroup EvaluateJsonPath[id=1c3a7a3c-70d9-3165-c78a-4d4ea1353920] added to StandardProcessGroup[identifier=af74d24b-16e6-3044-b8a1-41f31c182d6c,name=app]
2025-10-02 20:28:36,431 INFO [Monitor Processor Lifecycle Thread-1] o.a.n.f.s.StandardVersionedComponentSynchronizer Added EvaluateJsonPath[id=1c3a7a3c-70d9-3165-c78a-4d4ea1353920] to StandardProcessGroup[identifier=af74d24b-16e6-3044-b8a1-41f31c182d6c,name=app]
2025-10-02 20:28:36,432 ERROR [Monitor Processor Lifecycle Thread-1] o.a.n.groups.StandardStatelessGroupNode StandardStatelessGroupNode[group=StandardProcessGroup[identifier=af74d24b-16e6-3044-b8a1-41f31c182d6c,name=app]] Failed to start StandardStatelessGroupNode[group=StandardProcessGroup[identifier=af74d24b-16e6-3044-b8a1-41f31c182d6c,name=app]]; will try again in 10 seconds
java.lang.IllegalStateException: Could not create Processor of type SSEStreamClient
at org.apache.nifi.stateless.engine.StatelessFlowManager.createProcessor(StatelessFlowManager.java:197)
at org.apache.nifi.controller.flow.AbstractFlowManager.createProcessor(AbstractFlowManager.java:428)
at org.apache.nifi.flow.synchronization.StandardVersionedComponentSynchronizer.addProcessor(StandardVersionedComponentSynchronizer.java:2654)
at org.apache.nifi.flow.synchronization.StandardVersionedComponentSynchronizer.synchronizeProcessors(StandardVersionedComponentSynchronizer.java:1197)
at org.apache.nifi.flow.synchronization.StandardVersionedComponentSynchronizer.synchronize(StandardVersionedComponentSynchronizer.java:601)
at org.apache.nifi.flow.synchronization.StandardVersionedComponentSynchronizer.lambda$synchronize$10(StandardVersionedComponentSynchronizer.java:393)
at org.apache.nifi.controller.flow.AbstractFlowManager.withParameterContextResolution(AbstractFlowManager.java:668)
at org.apache.nifi.flow.synchronization.StandardVersionedComponentSynchronizer.synchronize(StandardVersionedComponentSynchronizer.java:388)
at org.apache.nifi.groups.StandardProcessGroup.synchronizeFlow(StandardProcessGroup.java:3881)
at org.apache.nifi.controller.flow.StandardStatelessGroupNodeFactory.createStatelessProcessGroup(StandardStatelessGroupNodeFactory.java:296)
at org.apache.nifi.controller.flow.StandardStatelessGroupNodeFactory$1.createStatelessProcessGroup(StandardStatelessGroupNodeFactory.java:153)
at org.apache.nifi.groups.StandardStatelessGroupNode.createStatelessFlow(StandardStatelessGroupNode.java:345)
at org.apache.nifi.groups.StandardStatelessGroupNode.initialize(StandardStatelessGroupNode.java:218)
at org.apache.nifi.groups.StandardStatelessGroupNode.lambda$initialize$1(StandardStatelessGroupNode.java:246)
at org.apache.nifi.engine.FlowEngine.lambda$wrap$1(FlowEngine.java:105)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.nifi.controller.exception.ProcessorInstantiationException: SSEStreamClient
at org.apache.nifi.stateless.engine.ComponentBuilder.createLoggableProcessor(ComponentBuilder.java:319)
at org.apache.nifi.stateless.engine.ComponentBuilder.buildProcessor(ComponentBuilder.java:131)
at org.apache.nifi.stateless.engine.StatelessFlowManager.createProcessor(StatelessFlowManager.java:173)
... 20 common frames omitted
Caused by: java.lang.ClassNotFoundException: SSEStreamClient

at org.apache.nifi.controller.flow.AbstractFlowManager.createProcessor(AbstractFlowManager.java:428)
at org.apache.nifi.flow.synchronization.StandardVersionedComponentSynchronizer.addProcessor(StandardVersionedComponentSynchronizer.java:2654)
at org.apache.nifi.flow.synchronization.StandardVersionedComponentSynchronizer.synchronizeProcessors(StandardVersionedComponentSynchronizer.java:1197)
at org.apache.nifi.flow.synchronization.StandardVersionedComponentSynchronizer.synchronize(StandardVersionedComponentSynchronizer.java:601)
at org.apache.nifi.flow.synchronization.StandardVersionedComponentSynchronizer.lambda$synchronize$10(StandardVersionedComponentSynchronizer.java:393)
at org.apache.nifi.controller.flow.AbstractFlowManager.withParameterContextResolution(AbstractFlowManager.java:668)
at org.apache.nifi.flow.synchronization.StandardVersionedComponentSynchronizer.synchronize(StandardVersionedComponentSynchronizer.java:388)
at org.apache.nifi.groups.StandardProcessGroup.synchronizeFlow(StandardProcessGroup.java:3881)
at org.apache.nifi.controller.flow.StandardStatelessGroupNodeFactory.createStatelessProcessGroup(StandardStatelessGroupNodeFactory.java:296)
at org.apache.nifi.controller.flow.StandardStatelessGroupNodeFactory$1.createStatelessProcessGroup(StandardStatelessGroupNodeFactory.java:153)
at org.apache.nifi.groups.StandardStatelessGroupNode.createStatelessFlow(StandardStatelessGroupNode.java:345)
at org.apache.nifi.groups.StandardStatelessGroupNode.initialize(StandardStatelessGroupNode.java:218)
at org.apache.nifi.groups.StandardStatelessGroupNode.lambda$initialize$1(StandardStatelessGroupNode.java:246)
at org.apache.nifi.engine.FlowEngine.lambda$wrap$1(FlowEngine.java:105)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.nifi.controller.exception.ProcessorInstantiationException: SSEStreamClient
at org.apache.nifi.stateless.engine.ComponentBuilder.createLoggableProcessor(ComponentBuilder.java:319)
at org.apache.nifi.stateless.engine.ComponentBuilder.buildProcessor(ComponentBuilder.java:131)
at org.apache.nifi.stateless.engine.StatelessFlowManager.createProcessor(StatelessFlowManager.java:173)
... 20 common frames omitted
Caused by: java.lang.ClassNotFoundException: SSEStreamClient