Member since
05-20-2022
66
Posts
6
Kudos Received
6
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
1975 | 07-17-2023 11:25 PM | |
2102 | 04-17-2023 02:29 PM | |
8443 | 02-15-2023 08:47 PM | |
1476 | 02-08-2023 06:02 PM | |
7422 | 10-17-2022 11:48 AM |
06-07-2025
04:42 PM
1 Kudo
OK, my solution is in NiFi 2.4 via Python extensions:
import threading
import queue
import time
import json
import logging
import requests
import sseclient
import select
from nifiapi.componentstate import Scope, StateManager, StateException
from nifiapi.flowfilesource import FlowFileSource, FlowFileSourceResult
from nifiapi.properties import PropertyDescriptor, StandardValidators
from nifiapi.relationship import Relationship
logger = logging.getLogger(__name__)
class SSEStreamClient(FlowFileSource):
class Java:
implements = ['org.apache.nifi.python.processor.FlowFileSource']
class ProcessorDetails:
version = '2.4.0-KPCS'
dependencies = ['sseclient-py', 'requests']
description = '''A Python FlowFileSource that generates FlowFiles by consuming events from an SSE stream.
It handles connection re-establishment and batches multiple SSE events into one FlowFile.
Reads SSE in a separate thread continuously to avoid losing messages.'''
tags = ['sse', 'stream', 'generator', 'source', 'json', 'KPCS']
#REL_FAILURE = Relationship(name="failure", description="FlowFiles are routed to failure when processing fails")
#def getRelationships(self):
# return [self.REL_FAILURE]
PROP_SSE_URL = PropertyDescriptor(
name="SSE URL",
description="The URL of the Server-Sent Events (SSE) stream.",
allowable_values=None,
default_value="https://",
validators=[StandardValidators.URL_VALIDATOR],
required=True
)
PROP_AUTH_TOKEN = PropertyDescriptor(
name="Authorization Token",
description="Bearer token for API authorization, for example: Bearer 111111",
allowable_values=None,
default_value="",
sensitive=True,
required=False
)
PROP_CONNECT_TIMEOUT = PropertyDescriptor(
name="Connection Timeout",
description="Maximum time in seconds to wait for connection to be established. Use 0 for no timeout.",
allowable_values=None,
default_value="10",
validators=[StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR],
required=True
)
PROP_DISABLE_LOGGING = PropertyDescriptor(
name="Disable Logging",
description="If set to true, disables logging to the log file.",
allowable_values=["true", "false"],
default_value="true",
required=False
)
def _should_log(self):
if not self.context:
return True
disable_logging = self.context.getProperty(self.PROP_DISABLE_LOGGING).getValue()
return disable_logging.lower() != "true"
def __init__(self, **kwargs):
if 'jvm' in kwargs:
del kwargs['jvm']
super().__init__(**kwargs)
self.sse_response = None
self.sse_client = None
self.event_iterator = None
self.property_descriptors = [
self.PROP_SSE_URL,
self.PROP_AUTH_TOKEN,
self.PROP_CONNECT_TIMEOUT,
self.PROP_DISABLE_LOGGING
]
self.queue = queue.Queue()
self.read_thread = None
self.stop_thread = threading.Event()
self.context = None
if self._should_log():
logger.info("SSEStreamClient: Initialized.")
def getPropertyDescriptors(self):
return self.property_descriptors
def _establish_sse_connection(self, context):
sse_url = context.getProperty(self.PROP_SSE_URL).evaluateAttributeExpressions().getValue()
auth_token = context.getProperty(self.PROP_AUTH_TOKEN).evaluateAttributeExpressions().getValue()
connect_timeout = int(context.getProperty(self.PROP_CONNECT_TIMEOUT).getValue())
headers = {'Accept': 'text/event-stream'}
if auth_token:
headers['Authorization'] = f'{auth_token}'
if self._should_log():
logger.info(f"SSEStreamClient: Connecting to SSE URL: {sse_url}")
self._close_sse_connection()
try:
self.sse_response = requests.get(sse_url, stream=True, headers=headers, timeout=(connect_timeout, None))
self.sse_response.raise_for_status()
self.sse_client = sseclient.SSEClient(self.sse_response)
self.event_iterator = iter(self.sse_client.events())
if self._should_log():
logger.info("SSEStreamClient: SSE connection established.")
return True
except requests.exceptions.RequestException as e:
if self._should_log():
logger.error(f"SSEStreamClient: Connection error: {e}", exc_info=True)
self._close_sse_connection()
return False
except Exception as e:
if self._should_log():
logger.error(f"SSEStreamClient: Unexpected error during connection: {e}", exc_info=True)
self._close_sse_connection()
return False
def _close_sse_connection(self):
if self.sse_response:
try:
self.sse_response.close()
if self._should_log():
logger.debug("SSEStreamClient: SSE response closed.")
except Exception as e:
if self._should_log():
logger.warning(f"SSEStreamClient: Error closing SSE response: {e}")
finally:
self.sse_response = None
self.sse_client = None
self.event_iterator = None
if self._should_log():
logger.info("SSEStreamClient: Connection closed and cleaned up.")
def _read_loop(self):
if self._should_log():
logger.info("SSEStreamClient: Read thread started.")
while not self.stop_thread.is_set():
try:
event = next(self.event_iterator)
if event and event.data:
try:
data = json.loads(event.data)
except json.JSONDecodeError:
data = {"raw": event.data}
self.queue.put(data)
except StopIteration:
if self._should_log():
logger.info("SSEStreamClient: SSE stream ended, reconnecting.")
self._close_sse_connection()
if not self._establish_sse_connection(self.context):
if self._should_log():
logger.error("SSEStreamClient: Failed to reconnect SSE stream.")
time.sleep(5)
except Exception as e:
if self._should_log():
logger.error(f"SSEStreamClient: Error reading SSE events: {e}", exc_info=True)
time.sleep(1)
if self._should_log():
logger.info("SSEStreamClient: Read thread stopped.")
def onScheduled(self, context):
self.context = context
if not self._establish_sse_connection(context):
if self._should_log():
logger.error("SSEStreamClient: Failed initial connection onScheduled.")
return
self.stop_thread.clear()
self.read_thread = threading.Thread(target=self._read_loop, daemon=True)
self.read_thread.start()
def create(self, context):
sse_url = context.getProperty(self.PROP_SSE_URL).evaluateAttributeExpressions().getValue()
messages = []
while not self.queue.empty():
try:
messages.append(self.queue.get_nowait())
except queue.Empty:
break
if messages:
content_str = json.dumps(messages, ensure_ascii=False)
content_bytes = content_str.encode('utf-8')
attributes = {
'mime.type': 'application/json',
'batch.size': str(len(messages)),
'sse.url': sse_url
}
if self._should_log():
logger.info(f"SSEStreamClient: Emitting FlowFile with {len(messages)} events.")
return FlowFileSourceResult(
relationship='success',
attributes=attributes,
contents=content_bytes
)
else:
return
def onStopped(self, context):
if self._should_log():
logger.info("SSEStreamClient: onStopped called. Stopping read thread and closing connection.")
self.stop_thread.set()
if self.read_thread:
self.read_thread.join(timeout=5)
self._close_sse_connection()
def onUnscheduled(self, context):
if self._should_log():
logger.info("SSEStreamClient: onUnscheduled called. Stopping read thread and closing connection.")
self.stop_thread.set()
if self.read_thread:
self.read_thread.join(timeout=5)
self._close_sse_connection() In the directory /opt/nifi/nifi-current/python_extensions, create a subfolder (e.g. SSEStreamClient) containing the following three files: SSEStreamClient.py — a file containing the code provided above. requirements.txt — a file containing a single line: sseclient-py. __init__.py — an empty file. Then, restart NiFi. (This setup was created and tested on Dockerized NiFi version 2.4.0 with pip installed.)
... View more
09-08-2024
08:38 PM
Can I keep the JSON file as constants in NiFi itself, so that I can avoid the InvokeHTTP call? Every time I receive flow data, I just need to check the id from the flow data, use that to find the corresponding entry in the constant JSON file, and append those entries to the flow data.
... View more
12-01-2023
01:13 PM
You are awesome @SAMSAL Thanks so much for the great information. This helps me a lot. Thank goodness for this community, because I think the documentation is barely adequate.
... View more
08-07-2023
11:37 AM
The only way to use OAuth2 that I'm aware of is with the InvokeHTTP processor, which in theory should work for connecting to SSE end points. However, I haven't been able to get it to work and I've tried adjusting many of the Request and Response parameters to no avail. If you figure it out then please share.
... View more
08-07-2023
08:30 AM
Are you referring to validation failures? If this is what you are referring to then the answer is, you can't. If you get failures then you'll need to queue the failures up and validate them manually to see why they failed. I use xmllint. For example: xmllint --noout --schema my_data_schema.xsd my_data.xml
... View more
07-17-2023
11:25 PM
1 Kudo
I managed to get it working by putting the parameter context reference in quotes and then using an evaluateString() function as shown below. ${#{'Entity Service'}:evaluateELString()}
... View more
06-01-2023
10:33 AM
https://issues.apache.org/jira/browse/NIFI-11627 Good Idea. Here is the reference.
... View more
05-30-2023
09:28 PM
Thanks for taking the time to reply Matt, but this doesn't address my question. Let me use an example and hopefully that will clarify what I'm trying to do. Within a process group named "transit", there is a "Process Group Parameter Context" named "transit_variables". This parameter context (transit_variables) includes all the high level variables used throughout the process group, but it also contains a large number of Avro schemas which are referenced in an AvroSchemaRegistry controller service, which is defined within the main process group. For the sake of organizational purposes I'd like to move these Avro schemas into their own parameter context, BUT (this is the catch), I want access to them from the AvroSchemaRegistry whose scope is associated with the process group, whose scope is set to the parent parameter context. I can "solve" this problem by creating a child process group (e.g. transit_data_parser) with a new child parameter context (e.g. transit_schemas), then add additional AvroSchemaRegistry, JSONTreeReader, JSONRecordsetWriter controller services, which are coupled with the child parameter context "transit_schemas". The process group would contain a single processor with an input port and an output port, and would be used solely for the purpose of reading a flowfile into a NiFi Recordset object with Avro validation. This so-called solution is very tightly coupled and as such not very flexible. It means that every time I want to use a schema I need to create one of these tightly coupled process groups, which defeats the whole purpose of the context parameter concept. What would be better is if I could reference these child schemas using a hierarchical object syntax, e.g. #{transit_variables.transit_schemas}. Using this syntax from within a single AvroSchemaRegistry controller service I could reference parameters at the parent level and all parameters at the child levels.
... View more
05-24-2023
07:31 AM
Hi @ChuckE, yes I had seen that note before and imagine that they fixed a similar issue but didn't catch this specific case. I am posting an issue in Jira to hopefully find a fix.
... View more
05-22-2023
09:07 AM
What you are doing is mostly correct. I see a couple of items that may be creating some issues, e.g. mis-assigned values. For example, you are assigning accel.x to all three attributes values ax, ay, az. But it seems like you probably meant to assign ax = $.accel.x , ay =$.accel.y , az =$.accel.z It seems you've mixed up "random" and "rand". The field name in your JSON is "random" so your attribute assignment should be rand = $.random Where is "btn" in your JSON?
... View more