Member since: 07-30-2019
Posts: 944
Kudos Received: 197
Solutions: 91

My Accepted Solutions
| Title | Views | Posted |
| --- | --- | --- |
|  | 1441 | 10-05-2021 01:53 PM |
|  | 16091 | 09-23-2019 06:03 AM |
|  | 6649 | 05-04-2019 08:42 PM |
|  | 1453 | 06-11-2018 12:45 PM |
|  | 12334 | 06-04-2018 01:11 PM |
12-21-2017
08:20 PM
1 Kudo
@sally sally Check the back pressure object threshold value in the connection feeding the MergeContent processor to make sure it has been changed to a value that will allow enough files to queue up.
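If you'd rather adjust that threshold through NiFi's REST API than in the UI, a minimal sketch could look like the following; the base URL and connection id are placeholders, and the partial-update shape of the connection entity (revision plus a component carrying backPressureObjectThreshold) is an assumption here, not something verified against your NiFi version.

    # Hedged sketch: raise the back pressure object threshold on one connection
    # via NiFi's REST API. The base URL and connection id are placeholders.
    import requests

    base = 'http://localhost:8080/nifi-api'           # placeholder NiFi API URL
    conn_id = 'REPLACE-WITH-CONNECTION-ID'            # connection feeding MergeContent

    entity = requests.get('%s/connections/%s' % (base, conn_id)).json()
    update = {
        'revision': entity['revision'],
        'component': {
            'id': conn_id,
            # allow enough FlowFiles to queue up for the merge (value is an example)
            'backPressureObjectThreshold': '50000',
        },
    }
    requests.put('%s/connections/%s' % (base, conn_id), json=update)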
04-24-2018
08:57 PM
1 Kudo
@Jose Gonzalez You can specify more than one host, but it is not required. Once the RPG establishes a connection to the target host, it retrieves the S2S details of the target cluster and stores them locally. If the host you provided becomes unavailable at any time after that initial connection, it will try any of the other nodes it previously learned about to get the S2S details. Having multiple nodes configured helps by giving the source NiFi more than one target node to establish the initial connection with.

Your load-balancing issue is completely unrelated to how many node URLs you configured in your RPG. Here is an article that covers how load balancing works with an RPG: https://community.hortonworks.com/content/kbentry/109629/how-to-achieve-better-load-balancing-using-nifis-s.html

Thanks, Matt
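As a rough illustration of that initial-connection behavior (this is not the RPG's actual implementation), the sketch below tries each configured node URL in turn until one answers the site-to-site details request; the node hostnames are placeholders and the use of NiFi's /nifi-api/site-to-site endpoint here is an assumption for the example.

    # Illustrative fallback over the configured node URLs when fetching S2S details.
    import requests

    candidate_api_urls = [
        'https://node1.example.com:8443/nifi-api',   # placeholder cluster nodes
        'https://node2.example.com:8443/nifi-api',
        'https://node3.example.com:8443/nifi-api',
    ]

    def fetch_s2s_details(api_urls):
        for base in api_urls:
            try:
                resp = requests.get(base + '/site-to-site', timeout=10)
                resp.raise_for_status()
                return resp.json()        # S2S details learned from this node
            except requests.RequestException:
                continue                   # node unreachable; fall back to the next URL
        raise RuntimeError('none of the configured nodes were reachable')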
10-25-2017
07:52 AM
Thanks Adrian Oprea
10-24-2017
05:41 PM
@xav webmaster Straight answer:

    flowFile = session.get()
    if flowFile is not None:
        flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
        # implicit return at the end

More info on the ExecuteScript processor: https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html

In your particular case, in the callback function where you read from the input stream, you can scan the content:

    from org.apache.commons.io import IOUtils
    from java.nio.charset import StandardCharsets
    from org.apache.nifi.processor.io import StreamCallback

    class PyStreamCallback(StreamCallback):
        def __init__(self):
            self.topic_name = ''

        def get_topic_name(self):
            return self.topic_name

        def process(self, inputStream, outputStream):
            # read the flowfile content and split the CSV record into fields
            Log = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            Log2 = str(Log).split(',')
            Brand = Log2[0]
            Color = Log2[5]
            Model = Log2[1]
            if Brand == 'ford' and Color == 'gray':
                NewLog = str(Log2)
                self.topic_name = 'ford'
                outputStream.write(bytearray(NewLog.encode('utf-8')))
            if Brand == 'audi' and Color == 'black':
                NewLog = str(Log2)
                self.topic_name = 'audi'
                outputStream.write(bytearray(NewLog.encode('utf-8')))
            if Brand == 'bmw' and Color == 'white':
                NewLog = str(Log2)
                self.topic_name = 'bmw'
                outputStream.write(bytearray(NewLog.encode('utf-8')))
            # add exception handling if needed for empty flowfile content, etc.

    flowFile = session.get()
    if flowFile is not None:
        caller = PyStreamCallback()
        flowFile = session.write(flowFile, caller)
        topic_name = caller.get_topic_name()
        flowFile = session.putAttribute(flowFile, 'kafka_topic', topic_name)
        # transfer the updated flowfile so the session can commit
        session.transfer(flowFile, REL_SUCCESS)

Hope that will help.
10-06-2017
02:02 AM
I did. I used an ExecuteScript processor to which I pass the directory name as a dynamic parameter that I get from ListenHTTP; after the ExecuteScript I used FetchFile to read a single file and then PutHDFS to load it into HDFS.
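For reference, a minimal ExecuteScript (Jython) sketch of that middle step might look like the following; the attribute names 'directoryname' and 'target.file' and the file name are assumptions for the example, with FetchFile's File to Fetch property then set to ${target.file}.

    # Hedged sketch: copy the directory name passed in from ListenHTTP into an
    # attribute that FetchFile can use. Attribute names and file name are assumed.
    flowFile = session.get()
    if flowFile is not None:
        directory = flowFile.getAttribute('directoryname')   # set from the incoming HTTP request
        if directory:
            # point FetchFile at a single file inside that directory (file name assumed)
            flowFile = session.putAttribute(flowFile, 'target.file', directory + '/data.csv')
            session.transfer(flowFile, REL_SUCCESS)
        else:
            session.transfer(flowFile, REL_FAILURE)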
08-03-2018
09:44 PM
@Benjamin Hopp @Chad Woodhead I had the exact same issue and tried bringing down both of my two NiFi nodes, waiting a few minutes, and bringing them back online. Then I tried turning the PutHDFS processor on again and it worked properly. Has anyone figured out why this solves the issue, or what is causing the problem?
09-22-2017
03:08 PM
Thanks Ryan. Can you please verify that the following would work?

The value of the flowfile attribute grok.expression is:

    (?<severity>.{1}) (?<time>.{8}) (?<sequence>.{8}) (?<source>.{12}) (?<destination>.{12}) (?<action>.{30}) %{GREEDYDATA:data}

Within Configure Processor of the ExtractGrok processor, the value of Grok Expression is ${grok.expression}.

The expected behavior is that ExtractGrok would continue to work as though the Grok Expression were hardcoded with the pattern above.
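As a quick sanity check of what that fixed-width pattern extracts (outside NiFi), the grok named groups can be rewritten as a Python regex; the sample line below is invented to match the declared field widths.

    # Standalone check of the fixed-width pattern above, rewritten as a Python regex
    # (grok's (?<name>...) groups become (?P<name>...); %{GREEDYDATA:data} becomes
    # (?P<data>.*)). The sample line is made up for illustration.
    import re

    pattern = re.compile(
        r'(?P<severity>.{1}) (?P<time>.{8}) (?P<sequence>.{8}) '
        r'(?P<source>.{12}) (?P<destination>.{12}) (?P<action>.{30}) (?P<data>.*)'
    )

    sample = ' '.join([
        'E',                                  # severity, 1 char
        '12:34:56',                           # time, 8 chars
        '00000042',                           # sequence, 8 chars
        'SRC000000001',                       # source, 12 chars
        'DST000000001',                       # destination, 12 chars
        'LOGIN ATTEMPT REJECTED'.ljust(30),   # action, padded to 30 chars
        'free-form remainder of the line',    # GREEDYDATA
    ])

    match = pattern.match(sample)
    if match:
        print(match.groupdict())   # each named group becomes a grok field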
10-02-2017
07:12 PM
@John Carter Would you accept the answer so that others know it resolved your issue?
09-15-2017
08:46 PM
2 Kudos
Hi @sally sally,

1. The ListHDFS processor is designed to store its last state. When you configure ListHDFS you specify a directory name in its properties. Once the processor has listed all the files that exist in that directory, it stores as state the maximum timestamp at which a listed file was stored into HDFS. You can view the state info by clicking the View State button; if you want to clear the state, open View State and click Clear State.

2. Once ListHDFS has saved its state, when you run the processor on a cron- or timer-driven schedule it only checks for new files created after the state timestamp.

Note: ListHDFS runs on the primary node only, but the state value is stored across all the nodes of the NiFi cluster, so if the primary node changes there won't be any issues with duplicates.

Example:

    hadoop fs -ls /user/yashu/test/
    Found 1 items
    -rw-r--r--   3 yash hdfs          3 2017-09-15 16:16 /user/yashu/test/part1.txt

When I configure ListHDFS to list all the files in the above directory, the state of the processor should match the time when part1.txt was stored into HDFS, in our case 2017-09-15 16:16. The state is kept as Unix time in milliseconds; converted to a date-time it is:

Unix time in milliseconds: 1505506613479
Timestamp: 2017-09-15 16:16:53

So the processor has stored the state; when it runs again it will list only the new files stored into the directory after the state timestamp and update the state with the new state time (i.e. the maximum file creation time in the Hadoop directory).
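For reference, converting that stored state value from epoch milliseconds to a readable timestamp can be done like this (the wall-clock result depends on the local timezone; in the post's environment it corresponds to 2017-09-15 16:16:53):

    # Convert the ListHDFS state value (epoch milliseconds) to a local date-time.
    from datetime import datetime

    state_millis = 1505506613479
    print(datetime.fromtimestamp(state_millis / 1000.0))
    # in the post's timezone this prints 2017-09-15 16:16:53.479000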