Member since: 06-26-2015
Posts: 515
Kudos Received: 139
Solutions: 114
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 2346 | 09-20-2022 03:33 PM |
| | 6283 | 09-19-2022 04:47 PM |
| | 3366 | 09-11-2022 05:01 PM |
| | 3885 | 09-06-2022 02:23 PM |
| | 6003 | 09-06-2022 04:30 AM |
07-05-2022
05:17 AM
@dida The following connector configuration worked for me. My schema was stored in Schema Registry and the connector fetched it from there:

```json
{
  "connector.class": "com.cloudera.dim.kafka.connect.hdfs.HdfsSinkConnector",
  "hdfs.output": "/tmp/topics_output/",
  "hdfs.uri": "hdfs://nn1:8020",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "name": "asd",
  "output.avro.passthrough.enabled": "true",
  "output.storage": "com.cloudera.dim.kafka.connect.hdfs.HdfsPartitionStorage",
  "output.writer": "com.cloudera.dim.kafka.connect.hdfs.parquet.ParquetPartitionWriter",
  "tasks.max": "1",
  "topics": "avro-topic",
  "value.converter": "com.cloudera.dim.kafka.connect.converts.AvroConverter",
  "value.converter.passthrough.enabled": "false",
  "value.converter.schema.registry.url": "http://sr-1:7788/api/v1"
}
```
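If you want to submit this configuration programmatically, the standard Kafka Connect REST API accepts it with a POST to /connectors. A minimal sketch, assuming a Connect worker is reachable at connect-1:28083 (the hostname and port are placeholders, and a secured cluster would need TLS and authentication on top of this):

```python
# Sketch: registering the connector above via the Kafka Connect REST API.
# The worker address is a placeholder; adjust for your cluster.
import json
import urllib.request

connector = {
    "name": "asd",
    "config": {
        "connector.class": "com.cloudera.dim.kafka.connect.hdfs.HdfsSinkConnector",
        "hdfs.output": "/tmp/topics_output/",
        "hdfs.uri": "hdfs://nn1:8020",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "output.avro.passthrough.enabled": "true",
        "output.storage": "com.cloudera.dim.kafka.connect.hdfs.HdfsPartitionStorage",
        "output.writer": "com.cloudera.dim.kafka.connect.hdfs.parquet.ParquetPartitionWriter",
        "tasks.max": "1",
        "topics": "avro-topic",
        "value.converter": "com.cloudera.dim.kafka.connect.converts.AvroConverter",
        "value.converter.passthrough.enabled": "false",
        "value.converter.schema.registry.url": "http://sr-1:7788/api/v1",
    },
}

req = urllib.request.Request(
    "http://connect-1:28083/connectors",  # placeholder worker address
    data=json.dumps(connector).encode("utf-8"),
    headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req) as resp:
    print(resp.status, resp.read().decode("utf-8"))
```

Cheers,
André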
07-01-2022
10:02 AM
@roshanbi Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. Thanks
06-30-2022
03:58 PM
@snm1523, Sorry, I don't remember either, and unfortunately I don't have a cluster handy right now to confirm this. Cheers, André
06-29-2022
01:35 AM
1 Kudo
@harvey, You are probably running into the problem described in this Technical Service Bulletin. Please check the bulletin for the solution/workaround. Cheers, André
06-28-2022
05:14 PM
1 Kudo
@samaan_filho, You can have two Docker containers running NiFi, each listening on the same internal port 8443. You cannot expose both containers to your local machine on the same port number, though; you'd have to map them to different host ports when exposing them, as in the sketch below. Can you share more details? Alternatively, please take a look at this article: https://community.cloudera.com/t5/Community-Articles/NiFi-cluster-sandbox-on-Docker/ta-p/346271
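A minimal sketch (the apache/nifi image, the container names, and the 9443 host port are illustrative; a real secured NiFi setup needs additional configuration):

```bash
# Both containers listen on 8443 internally; each is published on a
# different port of the host machine.
docker run -d --name nifi-1 -p 8443:8443 apache/nifi
docker run -d --name nifi-2 -p 9443:8443 apache/nifi
```

Cheers, André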
06-27-2022
06:35 AM
Thanks, I have filed a JIRA bug: https://issues.apache.org/jira/browse/NIFI-10171
06-26-2022
02:17 PM
1 Kudo
@Luwi, please note that NiFi currently only supports Java 8 and 11. Cheers, André
06-24-2022
10:44 PM
@HTalha, Another way to do this is to use the ExecuteScript processor with the following Python (Jython) script:

```python
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import json
import re

class SplitAndConvertToJson(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # Read the flowfile content and strip the surrounding braces
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        text = re.sub(r'(^\s*\{\s*|\s*\}\s*)', '', text)
        # Split on commas and build a val_<index> -> value map,
        # trimming whitespace around each value
        fields = text.split(',')
        obj = dict([('val_%s' % (i,), v.strip()) for i, v in enumerate(fields)])
        outputStream.write(bytearray(json.dumps(obj).encode('utf-8')))

flowfile = session.get()
if flowfile is not None:
    flowfile = session.write(flowfile, SplitAndConvertToJson())
    session.transfer(flowfile, REL_SUCCESS)
```
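For example, a flowfile whose content is `{a,b,c}` would leave the processor with the content below (key order may vary):

```json
{"val_0": "a", "val_1": "b", "val_2": "c"}
```

Cheers, André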
06-24-2022
10:27 PM
@rookie-xxd, Could you provide the following information:
- Are you using TLS?
- A screenshot of the "Hosts" page in Cloudera Manager showing the registered hosts
- The content of the /etc/hosts file on the machine that is failing to heartbeat

Cheers, André
06-24-2022
10:15 PM
@Brenigan, The issue you are seeing happens because you are instantiating PyStreamCallback twice. You should instantiate it once and reference that object in the subsequent calls to the session functions. The code below works as you'd expect:

```python
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import json

# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    def __init__(self):
        self.length = 0

    def process(self, inputStream, outputStream):
        jsn = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        array = json.loads(jsn)  # type: dict
        # Descend two levels into the JSON structure
        i = 0
        while i <= 1:
            root_key = list(array.keys())[0]
            array = array[root_key]
            i += 1
        self.length = str(len(array))
        # Write the original content back out so the flowfile is not emptied
        outputStream.write(bytearray(jsn.encode('utf-8')))

    def get_length_of_array(self):
        return self.length
# end class

flowfile = session.get()
if flowfile is not None:
    reader = PyStreamCallback()
    flowfile = session.write(flowfile, reader)
    flowfile = session.putAttribute(flowfile, "length", reader.get_length_of_array())
    session.transfer(flowfile, REL_SUCCESS)
```

There is a simpler way to do what you're trying to do, though. For example, say you have the following JSON object in the incoming flowfile:

```json
{
  "root": {
    "items": [1, 2]
  }
}
```

If you want to set the flowfile "length" attribute to the length of the "items" array, you can simply use the EvaluateJsonPath processor with the following configuration:
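In essence, you add a dynamic property named `length` whose value is a JsonPath expression evaluating the array's length, and set Destination to flowfile-attribute. A sketch of the configuration (the `length()` function is supported by the Jayway JsonPath library that EvaluateJsonPath uses):

```
Destination:  flowfile-attribute
Return Type:  auto-detect
length:       $.root.items.length()
```

Cheers, André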