Member since
06-26-2015
515
Posts
140
Kudos Received
114
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 2581 | 09-20-2022 03:33 PM |
| | 6974 | 09-19-2022 04:47 PM |
| | 3680 | 09-11-2022 05:01 PM |
| | 4291 | 09-06-2022 02:23 PM |
| | 6800 | 09-06-2022 04:30 AM |
07-03-2022
10:26 PM
@pk87 , The HandleHttpRequest processor only produces a flowfile when an HTTP request is received on its port. If you want the flow to run every 60 minutes, you can add an InvokeHTTP processor, scheduled to run every 60 minutes, that calls your API endpoint the same way Postman does. You don't actually need HandleHttpRequest to run this on a regular basis, though. You could replace it with a GenerateFlowFile processor that executes every 60 minutes and triggers the flow for you. Cheers, André
06-29-2022
06:14 PM
@MattWho , Not sure this is what's happening here, but if the disk is filled with other data that's outside of NiFi's control and the overall disk usage still hits the configured NiFi limits, the same thing would happen, right? André
06-29-2022
01:41 AM
@roshanbi , You must configure your Kafka consumer to use a consumer group and enable offset commits. This way the client periodically saves the last read offset in Kafka itself, so that it can pick up where it left off after a restart. Please check the Kafka documentation for the meaning of the following properties: group.id, enable.auto.commit, and auto.offset.reset. Cheers, André
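As a rough illustration of the three properties named above, the sketch below collects them in a plain Python dict and renders them in properties-file form. The group name and the chosen values are assumptions for the example, not settings taken from the original question, and the `to_properties` helper is hypothetical.

```python
# Consumer settings as a plain dict. All values here are illustrative assumptions.
consumer_config = {
    # Consumers sharing a group.id share committed offsets (hypothetical name).
    "group.id": "my-consumer-group",
    # Commit the consumed offsets periodically in the background.
    "enable.auto.commit": "true",
    # Only used when the group has no committed offset yet.
    "auto.offset.reset": "earliest",
}


def to_properties(config):
    """Render a dict as key=value lines, one property per line."""
    return "\n".join("%s=%s" % (key, value) for key, value in config.items())


print(to_properties(consumer_config))
```

With a stable group.id and commits enabled, a restarted consumer resumes from the committed offset; auto.offset.reset only matters when no committed offset exists for the group.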
06-29-2022
01:35 AM
1 Kudo
@harvey , You are probably running into the problem described in this Technical Service Bulletin. Please check the bulletin for the solution/workaround. Cheers, André
06-28-2022
05:14 PM
1 Kudo
@samaan_filho , You can have two Docker containers running NiFi, each one listening on the same container port 8443. You cannot expose both of those ports to your local machine under the same port number, though. You'd have to map them to different host port numbers when exposing them. Can you share more details? Alternatively, please take a look at this article: https://community.cloudera.com/t5/Community-Articles/NiFi-cluster-sandbox-on-Docker/ta-p/346271 Cheers, André
06-27-2022
04:40 AM
@jacektrocinski , It would be better to report this by creating a bug Jira in the Apache NiFi project: https://issues.apache.org/jira/projects/NIFI/summary Cheers, André
06-26-2022
02:17 PM
1 Kudo
@Luwi , Please note that NiFi currently only supports Java 8 and 11. Cheers, André
06-24-2022
10:47 PM
@Eric_B , This behaviour is inherited from Java, which caches resolved DNS names internally. You can adjust the cache TTL value by specifying the corresponding Java property during NiFi startup. Please see a description of the property here: https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/java-dg-jvm-ttl.html Cheers, André
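To make the effect of a cache TTL concrete, here is a small Python sketch of a resolver cache with a time-to-live. It is only a model of the behaviour described above, not Java's actual implementation; the class name, the host name, and the addresses are all hypothetical.

```python
import time


class TtlResolverCache(object):
    """Caches lookups and re-resolves only once the TTL has expired."""

    def __init__(self, resolve, ttl_seconds, clock=time.time):
        self._resolve = resolve      # function: hostname -> address
        self._ttl = ttl_seconds
        self._clock = clock          # injectable so the demo can control time
        self._entries = {}           # hostname -> (address, expiry time)

    def lookup(self, hostname):
        now = self._clock()
        entry = self._entries.get(hostname)
        if entry is not None and now < entry[1]:
            return entry[0]          # still fresh: serve the cached address
        address = self._resolve(hostname)
        self._entries[hostname] = (address, now + self._ttl)
        return address


# Demo with a fake DNS table and a fake clock (hypothetical values).
records = {"example.internal": "10.0.0.1"}
fake_now = [0.0]
cache = TtlResolverCache(records.get, ttl_seconds=60, clock=lambda: fake_now[0])

first = cache.lookup("example.internal")    # resolved and cached
records["example.internal"] = "10.0.0.2"    # the record changes
fake_now[0] = 30.0
stale = cache.lookup("example.internal")    # within the TTL: old address
fake_now[0] = 61.0
fresh = cache.lookup("example.internal")    # TTL expired: new address
```

A shorter TTL narrows the window in which a changed record goes unnoticed, at the cost of more frequent lookups.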
06-24-2022
10:44 PM
@HTalha , Another way to do this is to use the ExecuteScript processor with the following Python (Jython) script:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import json
import re

class SplitAndConvertToJson(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        # Strip the leading "{" and any "}" together with surrounding whitespace
        text = re.sub(r'(^\s*\{\s*|\s*\}\s*)', '', text)
        fields = text.split(',')
        # Name the fields val_0, val_1, ... in order of appearance
        obj = dict([('val_%s' % (i,), v.rstrip()) for i, v in enumerate(fields)])
        outputStream.write(bytearray(json.dumps(obj).encode('utf-8')))

flowfile = session.get()
if flowfile is not None:
    flowfile = session.write(flowfile, SplitAndConvertToJson())
    session.transfer(flowfile, REL_SUCCESS)

Cheers, André
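The transformation inside the callback can be tried outside NiFi. The sketch below repeats the same regex, split, and naming steps in plain Python; the function name and the sample input are hypothetical, since the original input format is not shown here.

```python
import json
import re


def braces_to_json(text):
    """Turn '{a, b ,c}' style text into a JSON object with val_N keys."""
    # Same regex as the script: drop the leading "{" and any "}".
    body = re.sub(r'(^\s*\{\s*|\s*\}\s*)', '', text)
    fields = body.split(',')
    # rstrip() removes trailing whitespace only; leading whitespace is kept.
    obj = dict(('val_%s' % i, v.rstrip()) for i, v in enumerate(fields))
    return json.dumps(obj)


sample = '{a, b ,c}'  # hypothetical input
result = json.loads(braces_to_json(sample))
print(result)
```

Because only `rstrip()` is applied, a field such as " b " keeps its leading space; using `strip()` instead would trim both sides if that is what the flow needs.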
06-24-2022
10:15 PM
@Brenigan , The issue that you are seeing occurs because you are instantiating PyStreamCallback twice. You should instantiate it once and reference that same object in the subsequent calls to the session functions. The code below works as you'd expect:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import json

# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    def __init__(self):
        self.length = 0

    def process(self, inputStream, outputStream):
        jsn = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        array = json.loads(jsn)  # type: dict
        # Descend two levels, following the first key at each level
        i = 0
        while i <= 1:
            root_key = list(array.keys())[0]
            array = array[root_key]
            i += 1
        self.length = str(len(array))

    def get_length_of_array(self):
        return self.length
# end class

flowfile = session.get()
if flowfile is not None:
    # Create the callback once so the length it computes can be read back
    reader = PyStreamCallback()
    flowfile = session.write(flowfile, reader)
    flowfile = session.putAttribute(flowfile, "length", reader.get_length_of_array())
    session.transfer(flowfile, REL_SUCCESS)

There is a simpler way to do what you're trying to do, though. For example, say you have the following JSON object in the incoming flowfile:

{
  "root": {
    "items": [1, 2]
  }
}

If you want to set the flowfile "length" attribute to the length of the "items" array, you can simply use the EvaluateJsonPath processor with the following configuration:

[Image: EvaluateJsonPath processor configuration]

Cheers, André
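The EvaluateJsonPath settings themselves are not reproduced in the text. One configuration that would likely achieve this, stated here as an assumption rather than as the original one, is Destination set to flowfile-attribute plus a dynamic property named length with the JsonPath expression $.root.items.length(). The plain Python sketch below only mirrors the computation: the first function repeats the script's two-level descent, the second reads the path directly. Both function names are hypothetical.

```python
import json


def length_two_levels_down(text):
    """Follow the first key twice, as the script's while loop does."""
    node = json.loads(text)
    for _ in range(2):
        first_key = list(node.keys())[0]
        node = node[first_key]
    return str(len(node))


def length_by_path(text):
    """Read root.items directly, as a path expression would."""
    return str(len(json.loads(text)["root"]["items"]))


sample = '{"root": {"items": [1, 2]}}'
print(length_two_levels_down(sample))
print(length_by_path(sample))
```

The path-based version does not depend on key order, whereas the first-key descent only gives the same answer when "root" and "items" are the first keys at their levels.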