Member since
07-30-2019
898
Posts
195
Kudos Received
91
Solutions
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 700 | 10-05-2021 01:53 PM
 | 12916 | 09-23-2019 06:03 AM
 | 4824 | 05-04-2019 08:42 PM
 | 910 | 06-11-2018 12:45 PM
 | 8929 | 06-04-2018 01:11 PM
10-25-2017
07:52 AM
Thanks Adrian Oprea
10-24-2017
05:41 PM
@xav webmaster Straight answer:

```python
flowFile = session.get()
if flowFile is not None:
    flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
# implicit return at the end
```

More info on the ExecuteScript processor: https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html

In your particular case, in the callback function where you read from the input stream, you can scan the content:

```python
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
    def __init__(self):
        self.topic_name = ''

    def get_topic_name(self):
        return self.topic_name

    def process(self, inputStream, outputStream):
        # Read the whole flowfile content as a UTF-8 string.
        log = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        fields = str(log).split(',')
        brand = fields[0]
        model = fields[1]
        color = fields[5]
        if brand == 'ford' and color == 'gray':
            self.topic_name = 'ford'
            outputStream.write(bytearray(str(fields).encode('utf-8')))
        elif brand == 'audi' and color == 'black':
            self.topic_name = 'audi'
            outputStream.write(bytearray(str(fields).encode('utf-8')))
        elif brand == 'bmw' and color == 'white':
            self.topic_name = 'bmw'
            outputStream.write(bytearray(str(fields).encode('utf-8')))
        # add exception handling if needed for empty flowfile content, etc.

flowFile = session.get()
if flowFile is not None:
    caller = PyStreamCallback()
    flowFile = session.write(flowFile, caller)
    topic_name = caller.get_topic_name()
    flowFile = session.putAttribute(flowFile, 'kafka_topic', topic_name)
```
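The brand/color routing inside process() can also be exercised outside NiFi as a plain function. This is a sketch with a hypothetical helper name (`route_record`); the field positions (0=brand, 1=model, 5=color) are taken from the script above:

```python
def route_record(line):
    """Pick a Kafka topic for one comma-separated car record.

    Returns (topic, payload) on a match, (None, None) otherwise.
    Field positions follow the NiFi script: 0=brand, 5=color.
    """
    fields = line.split(',')
    brand, color = fields[0], fields[5]
    # Same brand/color -> topic rules as the StreamCallback above.
    rules = {('ford', 'gray'): 'ford',
             ('audi', 'black'): 'audi',
             ('bmw', 'white'): 'bmw'}
    topic = rules.get((brand, color))
    if topic is None:
        return None, None
    return topic, str(fields)
```

Keeping the routing table in a dict makes it easy to add new brand/color pairs without another copy-pasted if branch.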
Hope that will help.
10-06-2017
02:02 AM
I did. I used an ExecuteScript processor to which I pass the directory name as a dynamic parameter obtained from ListenHTTP; after ExecuteScript I used FetchFile to read a single file, and then PutHDFS to load it into HDFS.
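A sketch of how that directory-name parameter could be turned into a target HDFS path inside the ExecuteScript step. The base path and function name here are hypothetical, and it is written as plain Python so it can be tested outside NiFi:

```python
import posixpath

def build_target_path(directory_name, base='/data/landing'):
    """Join a user-supplied directory name (e.g. passed from a
    ListenHTTP request) onto a base HDFS path, rejecting any
    '..' traversal segments in the untrusted input."""
    clean = directory_name.strip('/')
    if '..' in clean.split('/'):
        raise ValueError('invalid directory name: %r' % directory_name)
    return posixpath.join(base, clean)
```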
08-03-2018
09:44 PM
@Benjamin Hopp @Chad Woodhead I had the exact same issue: I brought down both of my two NiFi nodes, waited a few minutes, and brought them back online. Then I turned the PutHDFS processor back on and it worked properly. Has anyone figured out why this solves the issue, or what is causing the problem?
09-22-2017
03:08 PM
Thanks Ryan. Can you please verify the following would work?

The value of the flowfile attribute grok.expression is:

(?<severity>.{1}) (?<time>.{8}) (?<sequence>.{8}) (?<source>.{12}) (?<destination>.{12}) (?<action>.{30}) %{GREEDYDATA:data}

In the ExtractGrok processor's configuration, the value of Grok Expression is ${grok.expression}. The expected behavior is that ExtractGrok would continue to work as though the Grok Expression were hardcoded with the pattern above.
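As a side note, grok named captures of the form (?&lt;name&gt;...) behave like regex named groups, so the fixed-width part of that expression can be checked with a rough Python analogue (the sample line below is invented to match the stated field widths):

```python
import re

# Rough Python analogue of the grok expression: fixed-width named
# captures separated by single spaces, remainder captured as 'data'.
pattern = re.compile(
    r'(?P<severity>.{1}) (?P<time>.{8}) (?P<sequence>.{8}) '
    r'(?P<source>.{12}) (?P<destination>.{12}) (?P<action>.{30}) '
    r'(?P<data>.*)')

# Invented sample line padded to the widths the pattern expects.
line = ('E 12:34:56 00000042 '
        + 'SRC1'.ljust(12) + ' ' + 'DST1'.ljust(12) + ' '
        + 'DENY'.ljust(30) + ' extra free-form details')
m = pattern.match(line)
```

If the real log fields are not padded to exactly those widths, the fixed-length captures will slice the line in the wrong places, which is worth checking before relying on the attribute-driven expression.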
10-02-2017
07:12 PM
@John Carter Would you accept the answer so that others know it resolved your issue?
09-15-2017
08:46 PM
2 Kudos
Hi @sally sally,

1. The ListHDFS processor is designed to store its last state. When you configure ListHDFS, you specify the directory name in its properties. Once the processor lists all the files in that directory, it stores as state the maximum timestamp of the files stored in HDFS. You can view the state info by clicking the View State button; if you want to clear the state, open View State and click Clear State.

2. Once ListHDFS has saved its state, subsequent scheduled runs (cron- or timer-driven) only check for new files after the state timestamp.

Note: although ListHDFS runs on the primary node only, the state value is stored across all nodes of the NiFi cluster, so if the primary node changes there won't be any issues with duplicates.

Example:

hadoop fs -ls /user/yashu/test/
Found 1 items
-rw-r--r--   3 yash hdfs          3 2017-09-15 16:16 /user/yashu/test/part1.txt

When ListHDFS is configured to list the files in this directory, its state should match when part1.txt was stored in HDFS, in our case 2017-09-15 16:16. The state is kept as Unix time in milliseconds; converting it to date-time format gives:

Unix time in milliseconds: 1505506613479
Timestamp: 2017-09-15 16:16:53

So once the processor has stored the state, on the next run it lists only the new files stored in the directory after the state timestamp, and updates the state with the new maximum file time.
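The millisecond state value converts as described. A quick check, shown in UTC (the 16:16:53 in the post is the same instant rendered in the server's local timezone, apparently UTC-4):

```python
from datetime import datetime, timezone

state_millis = 1505506613479  # ListHDFS state value from the example
ts = datetime.fromtimestamp(state_millis / 1000.0, tz=timezone.utc)
print(ts.strftime('%Y-%m-%d %H:%M:%S'))  # 2017-09-15 20:16:53 (UTC)
```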
09-14-2017
02:18 PM
1 Kudo
@Mitthu Wagh There is no way to change the 5-minute running average on the processors. What are you looking for? You can still see the stats for a processor by right-clicking it; in the menu that pops up, choose Status History. A new window opens with a choice of stats to view.
09-18-2017
06:39 AM
3 Kudos
@Simon Jespersen This looks like an authorization issue. For the given topic, can you add ACLs for the anonymous user, since the protocol is PLAINTEXT?

```shell
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer \
  --authorizer-properties zookeeper.connect=<zookeeper:host> \
  --add --allow-principal User:ANONYMOUS \
  --operation Read --operation Write --operation Describe \
  --topic <topic>
```
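After adding them, the ACLs on the topic can be verified with the same tool's --list option (same placeholder values as in the command above):

```shell
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer \
  --authorizer-properties zookeeper.connect=<zookeeper:host> \
  --list --topic <topic>
```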