Member since: 06-26-2015
Posts: 515
Kudos received: 140
Solutions: 114
My Accepted Solutions
| Views | Posted |
|---|---|
| 2581 | 09-20-2022 03:33 PM |
| 6975 | 09-19-2022 04:47 PM |
| 3680 | 09-11-2022 05:01 PM |
| 4293 | 09-06-2022 02:23 PM |
| 6801 | 09-06-2022 04:30 AM |
06-26-2022
02:17 PM
1 Kudo
@Luwi, please note that NiFi currently only supports Java 8 and 11.
Cheers, André
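When checking which JVM a NiFi node will run on, note that Java 8 reports its version as `1.8.x` while Java 11 reports `11.x`, so a naive comparison of the leading number goes wrong. A small sketch, assuming you already have the value of the `java.version` system property; the function names are hypothetical and the supported set simply mirrors this reply:

```python
def java_major_version(version):
    """Return the major Java version from a java.version-style string.

    Java 8 and earlier report "1.x" (e.g. "1.8.0_292"); Java 9 and later
    report the major version first (e.g. "11.0.15", "17").
    """
    parts = version.split(".")
    if parts[0] == "1" and len(parts) > 1:
        return int(parts[1])
    # Drop suffixes such as "-ea" or "+10" from the leading component
    return int(parts[0].split("-")[0].split("+")[0])


# Versions stated as supported in the reply above (June 2022)
SUPPORTED_BY_NIFI = {8, 11}


def is_supported(version):
    return java_major_version(version) in SUPPORTED_BY_NIFI
```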
06-24-2022
10:44 PM
@HTalha, another way to do this is to use the ExecuteScript processor with the following Python (Jython) script:

```python
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import json
import re

# Strips the braces from the content, splits it on commas and writes
# a JSON object with the keys val_0, val_1, ...
class SplitAndConvertToJson(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        input = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        # Remove the leading "{" and any "}" together with surrounding whitespace
        input = re.sub(r'(^\s*\{\s*|\s*\}\s*)', '', input)
        fields = input.split(',')
        obj = dict([('val_%s' % (i,), v.rstrip()) for i, v in enumerate(fields)])
        outputStream.write(bytearray(json.dumps(obj).encode('utf-8')))

flowfile = session.get()
if flowfile is not None:
    flowfile = session.write(flowfile, SplitAndConvertToJson())
    session.transfer(flowfile, REL_SUCCESS)
```

Cheers, André
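The script only runs inside NiFi's ExecuteScript (it relies on `session`, `REL_SUCCESS` and the Java classes), so here is a plain-Python sketch of the same transformation that can be tried locally. `split_and_convert_to_json` is a hypothetical helper name, and the sample input `{abc,123,xyz}` is an assumption about what the incoming content looks like:

```python
import json
import re


def split_and_convert_to_json(text):
    """Plain-Python version of SplitAndConvertToJson.process():
    strip the braces, split on commas, build val_N keys."""
    # Same regex as the script: removes the leading "{" and any "}"
    # together with the surrounding whitespace
    text = re.sub(r'(^\s*\{\s*|\s*\}\s*)', '', text)
    fields = text.split(',')
    # Like the script, only trailing whitespace is stripped from each field
    obj = dict(('val_%s' % (i,), v.rstrip()) for i, v in enumerate(fields))
    return json.dumps(obj)


print(split_and_convert_to_json('{abc,123,xyz}'))
```

Note that each field keeps any leading whitespace, because the script only calls `rstrip()`: for input such as `{ a , b }` the second value comes out as `" b"`. Using `strip()` instead would trim both sides if that matters for your data.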
06-24-2022
10:15 PM
@Brenigan, the issue you are seeing is caused by instantiating PyStreamCallback twice. You should instantiate it once and reference that object in the subsequent calls to the session functions. The code below works as you'd expect:

```python
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import json

# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    def __init__(self):
        self.length = "0"

    def process(self, inputStream, outputStream):
        jsn = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        array = json.loads(jsn)  # type: dict
        # Descend two levels, always following the first key
        i = 0
        while i <= 1:
            root_key = list(array.keys())[0]
            array = array[root_key]
            i += 1
        self.length = str(len(array))
        # session.write() replaces the content with whatever is written here,
        # so write the original JSON back to keep the flowfile content intact
        outputStream.write(bytearray(jsn.encode('utf-8')))

    def get_length_of_array(self):
        return self.length
# end class

flowfile = session.get()
if flowfile is not None:
    reader = PyStreamCallback()
    flowfile = session.write(flowfile, reader)
    flowfile = session.putAttribute(flowfile, "length", reader.get_length_of_array())
    session.transfer(flowfile, REL_SUCCESS)
```

There is a simpler way to do what you're trying to do, though. For example, say you have the following JSON object in the incoming flowfile:

```json
{
  "root": {
    "items": [1, 2]
  }
}
```

If you want to set the flowfile "length" attribute to the length of the "items" array, you can simply use the EvaluateJsonPath processor with the following configuration: [configuration screenshot not available]

Cheers, André
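The traversal inside `PyStreamCallback.process()` can be checked outside NiFi with a plain-Python sketch of the same steps. `nested_array_length` and its `depth` parameter are hypothetical names; like the original loop, it assumes every level is an object whose first key leads toward the array:

```python
import json


def nested_array_length(jsn, depth=2):
    """Follow the first key `depth` times and return len() of what is found,
    as a string (NiFi attribute values are strings)."""
    node = json.loads(jsn)
    for _ in range(depth):
        # Assumes each level is an object with a single key, as in the sample
        first_key = list(node.keys())[0]
        node = node[first_key]
    return str(len(node))


sample = '{"root": {"items": [1, 2]}}'
print(nested_array_length(sample))  # prints 2
```

Returning a string mirrors the original `str(len(array))`, since `session.putAttribute` expects a string value.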
06-24-2022
11:08 AM
Hi @araujo, I'm using Nginx Proxy Manager running in the same Docker setup. Maybe it was working before because the cert was not... I'm trying to connect via localhost.
06-23-2022
12:30 AM
Hello @araujo, by the time I logged in to the node to check the entropy_avail value, it had already recovered. The issue seems to resolve quickly: in the Cloudera alert email I can see a good status within a minute after the issue occurred. Also, as you can see in the attached screenshot, the value was 1.
06-22-2022
07:31 PM
1 Kudo
@baymax277, try this: [screenshot not available]
This is the regex: `(?s)(^\[|\]$)`
You may have to increase the buffer size to accommodate your file.
Cheers, André
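A quick way to see what that regex does, assuming it is applied with an empty replacement value: Python's `re` module treats `^`, `$` and `(?s)` the same way Java does here, so it only removes the `[` at the very start and the `]` at the very end of the content. `strip_outer_brackets` is a hypothetical helper name:

```python
import re

# ^\[ matches a "[" only at the start of the text; \]$ matches a "]" only at
# the end (or just before a final newline). (?s) turns on DOTALL, which has no
# effect here because the pattern contains no "."
OUTER_BRACKETS = re.compile(r'(?s)(^\[|\]$)')


def strip_outer_brackets(text):
    # Replace every match with "", removing just the outer pair of brackets
    return OUTER_BRACKETS.sub('', text)


print(strip_outer_brackets('[{"id": 1}, {"id": 2}]'))  # prints {"id": 1}, {"id": 2}
```

Inner brackets, such as those in `[[1],[2]]`, are left alone, which is why the pattern is anchored rather than matching every bracket.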
06-22-2022
04:13 PM
1 Kudo
@nada, please check this solution: https://community.cloudera.com/t5/Community-Articles/Decompressing-nested-ZIP-files-in-NiFi/ta-p/346169
Cheers, André
06-13-2022
08:52 AM
@migu Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. If you are still experiencing the issue, can you provide the information @araujo has requested? Thanks
06-11-2022
01:40 AM
@MattWho Thanks, Matt, I understand what you're saying. In my case it's the other way around: I have clients (MiNiFi) initiating the connections when they want to send or receive data. This helps me deal with remote firewalls that don't allow incoming connections, and with sporadic availability, since the clients are the ones starting the connection. At the moment, with S2S, the MiNiFi instances connect periodically (every 10 s) to see if there are any flowfiles waiting for them. I would need to figure out how to generate that action periodically to perform the same flow.
Example scenarios:
1. The client (MiNiFi side) is running a tail on a file. A new line arrives in the file and needs to be sent over to NiFi for processing.
2. NiFi wants to send a file to a remote MiNiFi running a PutFile processor.
I have both scenarios working right now with S2S, with no issues, across a bunch of MiNiFi instances. It's just that this requires hacky workarounds (scripts to modify the MiNiFi YAML config) to make it work, and I would love to find a cleaner solution.