Member since: 02-01-2022
Posts: 274
Kudos Received: 97
Solutions: 60
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 401 | 05-15-2025 05:45 AM |
| | 3385 | 06-12-2024 06:43 AM |
| | 5912 | 04-12-2024 06:05 AM |
| | 4053 | 12-07-2023 04:50 AM |
| | 2176 | 12-05-2023 06:22 AM |
08-19-2025
01:05 AM
Several keys needed to be added. This is an example of the properties we used in KConnect in DH:

```
producer.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<your-workload-name>" password="<password>";
producer.override.security.protocol=SASL_SSL
producer.override.sasl.mechanism=PLAIN
```
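To illustrate where these override keys end up, here is a minimal sketch of a connector config carrying them, as it might be submitted to the Kafka Connect REST API. The connector name and `connector.class` are hypothetical placeholders, not from the post; only the three `producer.override.*` keys come from above.

```python
import json

# Sketch only: the three producer.override.* keys from the post, embedded in a
# connector config. "my-connector" and connector.class are made-up placeholders.
connector_config = {
    "name": "my-connector",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "producer.override.security.protocol": "SASL_SSL",
        "producer.override.sasl.mechanism": "PLAIN",
        "producer.override.sasl.jaas.config": (
            'org.apache.kafka.common.security.plain.PlainLoginModule required '
            'username="<your-workload-name>" password="<password>";'
        ),
    },
}
# This JSON body would be POSTed to the Connect REST endpoint (/connectors).
print(json.dumps(connector_config, indent=2))
```

Note that `producer.override.*` settings only take effect if the Connect worker allows client overrides (`connector.client.config.override.policy`).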
06-07-2025
04:42 PM
1 Kudo
OK, my solution is for NiFi 2.4 using python_extensions:

```python
import threading
import queue
import time
import json
import logging
import requests
import sseclient
from nifiapi.componentstate import Scope, StateManager, StateException
from nifiapi.flowfilesource import FlowFileSource, FlowFileSourceResult
from nifiapi.properties import PropertyDescriptor, StandardValidators
from nifiapi.relationship import Relationship

logger = logging.getLogger(__name__)


class SSEStreamClient(FlowFileSource):
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileSource']

    class ProcessorDetails:
        version = '2.4.0-KPCS'
        dependencies = ['sseclient-py', 'requests']
        description = '''A Python FlowFileSource that generates FlowFiles by consuming events from an SSE stream.
            It handles connection re-establishment and batches multiple SSE events into one FlowFile.
            Reads SSE in a separate thread continuously to avoid losing messages.'''
        tags = ['sse', 'stream', 'generator', 'source', 'json', 'KPCS']

    # REL_FAILURE = Relationship(name="failure", description="FlowFiles are routed to failure when processing fails")
    # def getRelationships(self):
    #     return [self.REL_FAILURE]

    PROP_SSE_URL = PropertyDescriptor(
        name="SSE URL",
        description="The URL of the Server-Sent Events (SSE) stream.",
        allowable_values=None,
        default_value="https://",
        validators=[StandardValidators.URL_VALIDATOR],
        required=True
    )
    PROP_AUTH_TOKEN = PropertyDescriptor(
        name="Authorization Token",
        description="Bearer token for API authorization, for example: Bearer 111111",
        allowable_values=None,
        default_value="",
        sensitive=True,
        required=False
    )
    PROP_CONNECT_TIMEOUT = PropertyDescriptor(
        name="Connection Timeout",
        description="Maximum time in seconds to wait for connection to be established. Use 0 for no timeout.",
        allowable_values=None,
        default_value="10",
        validators=[StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR],
        required=True
    )
    PROP_DISABLE_LOGGING = PropertyDescriptor(
        name="Disable Logging",
        description="If set to true, disables logging to the log file.",
        allowable_values=["true", "false"],
        default_value="true",
        required=False
    )

    def _should_log(self):
        # Before a context is available (e.g. during __init__), default to logging.
        if not self.context:
            return True
        disable_logging = self.context.getProperty(self.PROP_DISABLE_LOGGING).getValue()
        return (disable_logging or "true").lower() != "true"

    def __init__(self, **kwargs):
        if 'jvm' in kwargs:
            del kwargs['jvm']
        super().__init__(**kwargs)
        self.sse_response = None
        self.sse_client = None
        self.event_iterator = None
        self.property_descriptors = [
            self.PROP_SSE_URL,
            self.PROP_AUTH_TOKEN,
            self.PROP_CONNECT_TIMEOUT,
            self.PROP_DISABLE_LOGGING
        ]
        self.queue = queue.Queue()
        self.read_thread = None
        self.stop_thread = threading.Event()
        self.context = None
        if self._should_log():
            logger.info("SSEStreamClient: Initialized.")

    def getPropertyDescriptors(self):
        return self.property_descriptors

    def _establish_sse_connection(self, context):
        sse_url = context.getProperty(self.PROP_SSE_URL).evaluateAttributeExpressions().getValue()
        auth_token = context.getProperty(self.PROP_AUTH_TOKEN).evaluateAttributeExpressions().getValue()
        connect_timeout = int(context.getProperty(self.PROP_CONNECT_TIMEOUT).getValue())
        headers = {'Accept': 'text/event-stream'}
        if auth_token:
            headers['Authorization'] = auth_token
        if self._should_log():
            logger.info(f"SSEStreamClient: Connecting to SSE URL: {sse_url}")
        self._close_sse_connection()
        try:
            # No read timeout: the stream is expected to stay open indefinitely.
            self.sse_response = requests.get(sse_url, stream=True, headers=headers, timeout=(connect_timeout, None))
            self.sse_response.raise_for_status()
            self.sse_client = sseclient.SSEClient(self.sse_response)
            self.event_iterator = iter(self.sse_client.events())
            if self._should_log():
                logger.info("SSEStreamClient: SSE connection established.")
            return True
        except requests.exceptions.RequestException as e:
            if self._should_log():
                logger.error(f"SSEStreamClient: Connection error: {e}", exc_info=True)
            self._close_sse_connection()
            return False
        except Exception as e:
            if self._should_log():
                logger.error(f"SSEStreamClient: Unexpected error during connection: {e}", exc_info=True)
            self._close_sse_connection()
            return False

    def _close_sse_connection(self):
        if self.sse_response:
            try:
                self.sse_response.close()
                if self._should_log():
                    logger.debug("SSEStreamClient: SSE response closed.")
            except Exception as e:
                if self._should_log():
                    logger.warning(f"SSEStreamClient: Error closing SSE response: {e}")
            finally:
                self.sse_response = None
                self.sse_client = None
                self.event_iterator = None
                if self._should_log():
                    logger.info("SSEStreamClient: Connection closed and cleaned up.")

    def _read_loop(self):
        # Runs in a background thread so events are not lost between triggers.
        if self._should_log():
            logger.info("SSEStreamClient: Read thread started.")
        while not self.stop_thread.is_set():
            try:
                if self.event_iterator is None:
                    # Iterator lost after a failed reconnect; retry before reading.
                    if not self._establish_sse_connection(self.context):
                        time.sleep(5)
                        continue
                event = next(self.event_iterator)
                if event and event.data:
                    try:
                        data = json.loads(event.data)
                    except json.JSONDecodeError:
                        data = {"raw": event.data}
                    self.queue.put(data)
            except StopIteration:
                if self._should_log():
                    logger.info("SSEStreamClient: SSE stream ended, reconnecting.")
                self._close_sse_connection()
                if not self._establish_sse_connection(self.context):
                    if self._should_log():
                        logger.error("SSEStreamClient: Failed to reconnect SSE stream.")
                    time.sleep(5)
            except Exception as e:
                if self._should_log():
                    logger.error(f"SSEStreamClient: Error reading SSE events: {e}", exc_info=True)
                time.sleep(1)
        if self._should_log():
            logger.info("SSEStreamClient: Read thread stopped.")

    def onScheduled(self, context):
        self.context = context
        if not self._establish_sse_connection(context):
            if self._should_log():
                logger.error("SSEStreamClient: Failed initial connection onScheduled.")
            return
        self.stop_thread.clear()
        self.read_thread = threading.Thread(target=self._read_loop, daemon=True)
        self.read_thread.start()

    def create(self, context):
        sse_url = context.getProperty(self.PROP_SSE_URL).evaluateAttributeExpressions().getValue()
        # Drain everything queued so far and batch it into a single FlowFile.
        messages = []
        while not self.queue.empty():
            try:
                messages.append(self.queue.get_nowait())
            except queue.Empty:
                break
        if messages:
            content_str = json.dumps(messages, ensure_ascii=False)
            content_bytes = content_str.encode('utf-8')
            attributes = {
                'mime.type': 'application/json',
                'batch.size': str(len(messages)),
                'sse.url': sse_url
            }
            if self._should_log():
                logger.info(f"SSEStreamClient: Emitting FlowFile with {len(messages)} events.")
            return FlowFileSourceResult(
                relationship='success',
                attributes=attributes,
                contents=content_bytes
            )
        # Nothing queued: emit no FlowFile on this trigger.
        return None

    def onStopped(self, context):
        if self._should_log():
            logger.info("SSEStreamClient: onStopped called. Stopping read thread and closing connection.")
        self.stop_thread.set()
        if self.read_thread:
            self.read_thread.join(timeout=5)
        self._close_sse_connection()

    def onUnscheduled(self, context):
        if self._should_log():
            logger.info("SSEStreamClient: onUnscheduled called. Stopping read thread and closing connection.")
        self.stop_thread.set()
        if self.read_thread:
            self.read_thread.join(timeout=5)
        self._close_sse_connection()
```

In the directory /opt/nifi/nifi-current/python_extensions, create a subfolder (e.g. SSEStreamClient) containing the following three files:

- SSEStreamClient.py — a file containing the code provided above.
- requirements.txt — a file containing a single line: sseclient-py
- __init__.py — an empty file.

Then restart NiFi. (This setup was created and tested on Dockerized NiFi version 2.4.0 with pip installed.)
06-04-2025
06:27 AM
Excellent article @zzeng 👍 !!
05-15-2025
05:45 AM
1 Kudo
@brajs Yes, it is possible to create custom flow analysis rules. This is still a tech preview in NiFi 2.0, so documentation is limited. I would recommend taking a look at some existing rules, tearing them down to source code, modifying them to suit, and then building and packaging your custom rules. Once you have a new rules NAR file, just deliver it to the NiFi /lib location and it should be available to use.
02-20-2025
06:31 AM
I just know from many years of setting up Ranger within Ambari with usersync. I doubt the current CDP docs will strictly call it out.
02-20-2025
03:03 AM
1 Kudo
I have solved this problem.
12-03-2024
05:24 AM
1 Kudo
@mikecolux Can you include a screenshot of how the List/FetchSFTP processors are configured?
09-25-2024
07:46 AM
@jasonjojo Check out trycloudera.com... These are trials for CDP Public Cloud, which will give you a great chance to see what the new Cloudera looks like. TL;DR: it's not Hadoop anymore.
09-16-2024
05:05 AM
@DataEngineer07 Have you been able to resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future.
09-11-2024
06:17 AM
@moshell281 Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. Thanks.