Member since
10-02-2024
1
Post
1
Kudos Received
0
Solutions
06-07-2025
04:42 PM
1 Kudo
OK, here is my solution for NiFi 2.4, implemented as a Python extension:
import threading
import queue
import time
import json
import logging
import requests
import sseclient
import select
from nifiapi.componentstate import Scope, StateManager, StateException
from nifiapi.flowfilesource import FlowFileSource, FlowFileSourceResult
from nifiapi.properties import PropertyDescriptor, StandardValidators
from nifiapi.relationship import Relationship
# Module-level logger used by SSEStreamClient; the class only writes to it when
# its "Disable Logging" property is not set to "true".
logger = logging.getLogger(__name__)
class SSEStreamClient(FlowFileSource):
    """NiFi FlowFileSource that batches Server-Sent Events into FlowFiles.

    A background thread reads events from the SSE stream and pushes the decoded
    payloads onto an in-memory queue. Each call to ``create`` drains the queue
    and emits everything collected so far as one JSON-array FlowFile.
    The reader thread re-establishes the connection whenever it is missing or
    the stream ends, until ``onStopped``/``onUnscheduled`` asks it to stop.
    """

    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileSource']

    class ProcessorDetails:
        version = '2.4.0-KPCS'
        dependencies = ['sseclient-py', 'requests']
        description = (
            "A Python FlowFileSource that generates FlowFiles by consuming events from an SSE stream.\n"
            "It handles connection re-establishment and batches multiple SSE events into one FlowFile.\n"
            "Reads SSE in a separate thread continuously to avoid losing messages."
        )
        tags = ['sse', 'stream', 'generator', 'source', 'json', 'KPCS']

    #REL_FAILURE = Relationship(name="failure", description="FlowFiles are routed to failure when processing fails")
    #def getRelationships(self):
    # return [self.REL_FAILURE]

    # Delays (seconds) used by the reader thread and by shutdown.
    _RECONNECT_DELAY_SECONDS = 5
    _READ_ERROR_DELAY_SECONDS = 1
    _JOIN_TIMEOUT_SECONDS = 5

    PROP_SSE_URL = PropertyDescriptor(
        name="SSE URL",
        description="The URL of the Server-Sent Events (SSE) stream.",
        allowable_values=None,
        default_value="https://",
        validators=[StandardValidators.URL_VALIDATOR],
        required=True
    )
    PROP_AUTH_TOKEN = PropertyDescriptor(
        name="Authorization Token",
        description="Bearer token for API authorization, for example: Bearer 111111",
        allowable_values=None,
        default_value="",
        sensitive=True,
        required=False
    )
    PROP_CONNECT_TIMEOUT = PropertyDescriptor(
        name="Connection Timeout",
        description="Maximum time in seconds to wait for connection to be established. Use 0 for no timeout.",
        allowable_values=None,
        default_value="10",
        validators=[StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR],
        required=True
    )
    PROP_DISABLE_LOGGING = PropertyDescriptor(
        name="Disable Logging",
        description="If set to true, disables logging to the log file.",
        allowable_values=["true", "false"],
        default_value="true",
        required=False
    )

    def __init__(self, **kwargs):
        """Initialise connection state, the event queue and the stop flag."""
        # 'jvm' is dropped before delegating to the base class constructor.
        if 'jvm' in kwargs:
            del kwargs['jvm']
        super().__init__(**kwargs)
        self.sse_response = None
        self.sse_client = None
        self.event_iterator = None
        self.property_descriptors = [
            self.PROP_SSE_URL,
            self.PROP_AUTH_TOKEN,
            self.PROP_CONNECT_TIMEOUT,
            self.PROP_DISABLE_LOGGING
        ]
        self.queue = queue.Queue()
        self.read_thread = None
        self.stop_thread = threading.Event()
        self.context = None
        self._log(logging.INFO, "SSEStreamClient: Initialized.")

    def _should_log(self):
        """Return True unless the "Disable Logging" property is "true".

        Before a context is available (i.e. before ``onScheduled``) logging is
        always enabled.
        """
        context = getattr(self, 'context', None)
        if context is None:
            return True
        disable_logging = context.getProperty(self.PROP_DISABLE_LOGGING).getValue()
        if disable_logging is None:
            # Unset property: fall back to the descriptor's declared default.
            disable_logging = "true"
        return str(disable_logging).lower() != "true"

    def _log(self, level, message, **kwargs):
        """Emit ``message`` at ``level`` if logging is enabled for this processor."""
        if self._should_log():
            logger.log(level, message, **kwargs)

    def getPropertyDescriptors(self):
        """Return the property descriptors supported by this processor."""
        return self.property_descriptors

    def _establish_sse_connection(self, context):
        """Open the SSE stream configured in ``context``.

        Any previously open connection is closed first. On success the
        response, client and event iterator attributes are populated.

        Returns:
            True when the stream was opened, False on any failure (the
            failure is logged and the connection state is reset).
        """
        self._close_sse_connection()
        try:
            sse_url = context.getProperty(self.PROP_SSE_URL).evaluateAttributeExpressions().getValue()
            auth_token = context.getProperty(self.PROP_AUTH_TOKEN).evaluateAttributeExpressions().getValue()
            connect_timeout = int(context.getProperty(self.PROP_CONNECT_TIMEOUT).getValue())
            headers = {'Accept': 'text/event-stream'}
            if auth_token:
                # The token is sent verbatim, so it must include its scheme (e.g. "Bearer ...").
                headers['Authorization'] = f'{auth_token}'
            self._log(logging.INFO, f"SSEStreamClient: Connecting to SSE URL: {sse_url}")
            # The property documents 0 as "no timeout"; requests expresses that as
            # None and rejects a literal 0. The read timeout stays unlimited
            # because an SSE stream may legitimately be quiet for a long time.
            timeout = (connect_timeout or None, None)
            self.sse_response = requests.get(sse_url, stream=True, headers=headers, timeout=timeout)
            self.sse_response.raise_for_status()
            self.sse_client = sseclient.SSEClient(self.sse_response)
            self.event_iterator = iter(self.sse_client.events())
            self._log(logging.INFO, "SSEStreamClient: SSE connection established.")
            return True
        except requests.exceptions.RequestException as e:
            self._log(logging.ERROR, f"SSEStreamClient: Connection error: {e}", exc_info=True)
        except Exception as e:
            self._log(logging.ERROR, f"SSEStreamClient: Unexpected error during connection: {e}", exc_info=True)
        self._close_sse_connection()
        return False

    def _close_sse_connection(self):
        """Close the current SSE response, if any, and reset connection state."""
        # Detach the state first so other code never sees a half-closed connection.
        response = self.sse_response
        self.sse_response = None
        self.sse_client = None
        self.event_iterator = None
        # Compare against None explicitly: a requests.Response is falsy for
        # 4xx/5xx statuses, and those responses still have to be closed.
        if response is None:
            return
        try:
            response.close()
            self._log(logging.DEBUG, "SSEStreamClient: SSE response closed.")
        except Exception as e:
            self._log(logging.WARNING, f"SSEStreamClient: Error closing SSE response: {e}")
        finally:
            self._log(logging.INFO, "SSEStreamClient: Connection closed and cleaned up.")

    def _read_loop(self):
        """Reader-thread body: queue incoming events until asked to stop.

        JSON payloads are decoded; anything else is wrapped as ``{"raw": data}``.
        Whenever there is no usable connection the loop tries to reconnect,
        waiting between failed attempts.
        """
        self._log(logging.INFO, "SSEStreamClient: Read thread started.")
        while not self.stop_thread.is_set():
            events = self.event_iterator
            if events is None:
                # Not connected (initial failure, stream ended, or failed retry).
                if not self._establish_sse_connection(self.context):
                    self._log(logging.ERROR, "SSEStreamClient: Failed to reconnect SSE stream.")
                    # Event.wait returns early on stop, unlike time.sleep.
                    self.stop_thread.wait(self._RECONNECT_DELAY_SECONDS)
                continue
            try:
                event = next(events)
            except StopIteration:
                if self.stop_thread.is_set():
                    # Shutting down: do not open a new connection.
                    break
                self._log(logging.INFO, "SSEStreamClient: SSE stream ended, reconnecting.")
                self._close_sse_connection()
                continue
            except Exception as e:
                self._log(logging.ERROR, f"SSEStreamClient: Error reading SSE events: {e}", exc_info=True)
                self.stop_thread.wait(self._READ_ERROR_DELAY_SECONDS)
                continue
            if event and event.data:
                try:
                    data = json.loads(event.data)
                except json.JSONDecodeError:
                    data = {"raw": event.data}
                self.queue.put(data)
        self._log(logging.INFO, "SSEStreamClient: Read thread stopped.")

    def onScheduled(self, context):
        """Store the context, connect, and start the background reader thread."""
        self.context = context
        if not self._establish_sse_connection(context):
            # Keep going: the reader thread retries the connection itself.
            self._log(logging.ERROR, "SSEStreamClient: Failed initial connection onScheduled.")
        self.stop_thread.clear()
        if self.read_thread is not None and self.read_thread.is_alive():
            # A previous reader is still running; it will pick up the new state.
            return
        self.read_thread = threading.Thread(target=self._read_loop, daemon=True)
        self.read_thread.start()

    def create(self, context):
        """Emit all queued events as one JSON-array FlowFile.

        Returns:
            A FlowFileSourceResult routed to 'success', or None when no events
            are queued.
        """
        sse_url = context.getProperty(self.PROP_SSE_URL).evaluateAttributeExpressions().getValue()
        messages = []
        while True:
            try:
                messages.append(self.queue.get_nowait())
            except queue.Empty:
                break
        if not messages:
            return None
        content_bytes = json.dumps(messages, ensure_ascii=False).encode('utf-8')
        attributes = {
            'mime.type': 'application/json',
            'batch.size': str(len(messages)),
            'sse.url': sse_url
        }
        self._log(logging.INFO, f"SSEStreamClient: Emitting FlowFile with {len(messages)} events.")
        return FlowFileSourceResult(
            relationship='success',
            attributes=attributes,
            contents=content_bytes
        )

    def _stop_reader(self):
        """Signal the reader thread to stop, wait briefly, then close the stream."""
        self.stop_thread.set()
        if self.read_thread:
            self.read_thread.join(timeout=self._JOIN_TIMEOUT_SECONDS)
        self._close_sse_connection()

    def onStopped(self, context):
        """Stop the reader thread and close the connection."""
        self._log(logging.INFO, "SSEStreamClient: onStopped called. Stopping read thread and closing connection.")
        self._stop_reader()

    def onUnscheduled(self, context):
        """Stop the reader thread and close the connection."""
        self._log(logging.INFO, "SSEStreamClient: onUnscheduled called. Stopping read thread and closing connection.")
        self._stop_reader()


# Installation: in the directory /opt/nifi/nifi-current/python_extensions, create a
# subfolder (e.g. SSEStreamClient) containing the following three files:
#   SSEStreamClient.py - a file containing the code provided above.
#   requirements.txt   - a file containing a single line: sseclient-py.
#   __init__.py        - an empty file.
# Then, restart NiFi. (This setup was created and tested on Dockerized NiFi
# version 2.4.0 with pip installed.)
... View more