Member since: 07-28-2017
Posts: 47
Kudos Received: 6
Solutions: 2
My Accepted Solutions
Title | Views | Posted
---|---|---
| 10162 | 03-13-2018 12:04 PM
| 5433 | 10-12-2017 08:52 AM
03-13-2018
12:04 PM
Solved, much simpler than I thought. You just need to pass the flowfile to the initialization function (__init__) of the callback class.
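A minimal sketch of that idea for ExecuteScript with Jython might look like the following; the attribute names and the output payload are placeholders, not values from the original flow:

```python
import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class TransformCallback(StreamCallback):
    # Pass whatever the callback needs (here the flowfile and extra data) to __init__
    def __init__(self, flowfile, extra):
        self.flowfile = flowfile
        self.extra = extra

    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        incoming = json.loads(text)
        # Attributes are reachable here because the flowfile was handed to the constructor
        result = {
            'collection': self.flowfile.getAttribute('collection'),
            'count': incoming.get('result', {}).get('count'),
            'extra': self.extra
        }
        outputStream.write(bytearray(json.dumps(result).encode('utf-8')))

flowfile = session.get()
if flowfile is not None:
    flowfile = session.write(flowfile, TransformCallback(flowfile, {'note': 'placeholder'}))
    session.transfer(flowfile, REL_SUCCESS)
```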
03-13-2018
11:09 AM
@Matt Burgess It seems I could do all the API operations within the process function of the callback class, but in that case I need access to the flowfile attributes, besides the content, within the class scope. I tried to pass a flowfile reference to the class definition, but failed. Any ideas?
03-12-2018
12:21 PM
I am trying to create a Python script in NiFi that:
- reads some attributes from an incoming flowfile,
- reads the JSON content of the flowfile and extracts specific fields,
- writes attributes to the outgoing flowfile,
- overwrites the incoming flowfile with new content created in the script (e.g. an API call that returns new JSON) and sends it to the SUCCESS relationship, OR removes the old flowfile and creates a new one with the desired content.

What I've done so far:

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback, InputStreamCallback, OutputStreamCallback

class OutputWrite(OutputStreamCallback, obj):
    def __init__(self):
        self.obj = obj
    def process(self, outputStream):
        outputStream.write(bytearray(json.dumps(self.obj).encode('utf')))
### end class ###

flowfile = session.get()
if flowfile != None:
    # 1) Get flowfile attributes
    headers = {
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept': 'application/json, text/plain, */*',
        'Cache-Control': 'no-cache',
        'Ocp-Apim-Trace': 'true',
        'Authorization': flowfile.getAttribute('Authorization')
    }
    collection = flowfile.getAttribute('collection')
    dataset = flowfile.getAttribute('dataset')

    # 2) Get flowfile content
    stream_content = session.read(flowfile)
    text_content = IOUtils.toString(stream_content, StandardCharsets.UTF_8)
    json_content = json.loads(text_content)
    records = json_content['result']['count']
    pages = records/10000

    # 3) Write flowfile attributes
    flowfile = session.putAttribute(flowfile, 'collection', collection)
    flowfile = session.putAttribute(flowfile, 'dataset', dataset)

    # API operations: output_json with desired data
    output_json = {some data}

    # 4) Write final JSON data to output flowfile
    flowfile = session.write(flowfile, OutputWrite(output_json))
    session.transfer(flowfile, REL_SUCCESS)
    session.commit()

My problem is that I can't find a way to pass a reference to the desired output_json object as an argument to the OutputStreamCallback class. Any ideas on how to resolve this, or maybe a better approach?
Is it maybe easier to perform all the API operations within the process function of the class? But then how do I get access to the incoming flowfile attributes within the process function (which requires a session or a flowfile object)?
Any help much appreciated!
Labels:
- Apache NiFi
02-27-2018
11:09 AM
Our latest use case includes real-time ingestion of messages:
- Sensor data from a hub/gateway that is a central point of aggregation for the individual sensor devices
- Event processing from the organization's Enterprise Service Bus (ESB)

NiFi (standalone, 1 node) and Kafka (4 brokers) are available in our cluster; however, by design and due to security limitations, Kafka is restricted from Internet access. I would like some ideas/tips about which approach is best. Since we cannot send the data directly to Kafka, NiFi could act as an intermediary and publish the data to Kafka topics with the built-in processors. This could be done with the ListenHTTP processor (or any other listening processor that supports the protocols supported by the ESB adapter), but I read in the docs that only POST operations are allowed there, and I was wondering if there are other limitations that restrict the listening functionality to the very basics and thus make it unsuitable for our use case. Have you seen use cases where NiFi is used in such a manner, or would you suggest another approach? For the sensor data, I guess we could use ConsumeMQTT (or again ListenHTTP/TCP), depending on what exactly is supported at the gateway. Thanks in advance! @Timothy Spann @Pierre Villard
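To make the ListenHTTP idea concrete, a client on the ESB or gateway side would essentially just POST its payload to the listener's endpoint; a rough Python sketch follows (host, port and base path are assumptions, 'contentListener' being ListenHTTP's default base path as far as I recall):

```python
import json
import urllib2  # Python 2 / Jython; on Python 3 use urllib.request instead

# Placeholder sensor reading to publish
event = {'sensorId': 'hub-01', 'temperature': 21.5}

# ListenHTTP exposes http://<nifi-host>:<port>/<base path>; each POST body
# becomes a flowfile, which a PublishKafka processor can then send to a topic.
req = urllib2.Request(
    'http://nifi-host:9999/contentListener',
    data=json.dumps(event),
    headers={'Content-Type': 'application/json'}
)
urllib2.urlopen(req)
```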
Labels:
- Apache Kafka
- Apache NiFi
02-19-2018
02:14 PM
@Timothy Spann Great article, but I am a bit confused about the difference between Spark Streaming and Spark Structured Streaming integration with Kafka. Which one is advised for similar use cases? Is Spark Streaming planned to be deprecated soon?
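As a point of reference, reading from Kafka with Structured Streaming looks roughly like the sketch below (a minimal sketch assuming the spark-sql-kafka connector package is on the classpath; broker addresses and topic name are placeholders):

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("kafka-structured-streaming-sketch")
         .getOrCreate())

# Read a Kafka topic as a streaming DataFrame
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
      .option("subscribe", "sensor-events")
      .load())

# Kafka records arrive as binary key/value columns; cast them to strings
events = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Write the stream to the console for inspection
query = (events.writeStream
         .format("console")
         .outputMode("append")
         .start())

query.awaitTermination()
```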
01-29-2018
03:59 PM
Hello, I am trying to set up the NiFi dataflow for the HDF tutorial; I am interested in the Kafka part, but as far as I understand I must first complete the NiFi part. However, the ExecuteProcess processor gives a 'Permission denied' error for the generate.sh script, even though I have given full permissions to the whole repo. Any ideas?
Labels:
- Apache NiFi
- Cloudera DataFlow (CDF)
01-11-2018
09:02 AM
@Matt Clarke That is exactly why I want to split the logs based on a regex, so that trace log lines are properly handled. I think I'll go with a fixed sequence for now, looking for `2018-`, and I hope by next year something has changed! I could also use ExtractText to group logs like ( ( (*date*)(*log*) )(*date*) ) and then extract text to the flowfile content keeping only the ( (*date*)(*log*) ) part, but I don't know if ExtractText can produce multiple flowfiles this way, one per match found in the initial logfile. Or save the matches as attributes and then construct new flowfiles with those values.
01-10-2018
04:14 PM
2 Kudos
My use case is to build custom logs based on the NiFi logging output. I would like to:
1) Split the logfiles into multiple flowfiles, one per log record
2) Reconstruct the logfiles based on some common pattern in the log record (e.g. ERROR/INFO, processor name, etc.)
For #1 I run into a problem, as the log entries have this form:
2018-01-08 13:43:55,215 ERROR bla bla bla
bla bla
bla bla
******************SPLIT HERE**************
2018-01-08 13:43:55,215 ERROR bla bla bla
bla bla
bla bla
bla bla bla
something else
bla bla bla
******************SPLIT HERE**************
2018-01-08 13:43:55,215 ERROR bla bla bla
bla
******************SPLIT HERE**************
Each new log record starts with date/time/level information, so I could use that to split the big logfile into separate flowfiles per individual log record, but the `SplitText` processor accepts only a number of lines and not a regex pattern. `ExtractText` could be another option, but then it is a bit unclear how all the different regex groups should be configured to get only the necessary text. Any ideas for this or another approach?
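For reference, the kind of regex that isolates such multi-line records on the leading timestamp looks roughly like this standalone Python sketch (the sample text is made up):

```python
import re

# Made-up sample with multi-line records like the ones above
log_text = """2018-01-08 13:43:55,215 ERROR bla bla bla
bla bla
bla bla
2018-01-08 13:44:02,871 INFO something else
more detail
"""

# A record starts with "YYYY-MM-DD HH:MM:SS,mmm LEVEL" at the beginning of a line
# and runs until the next such line (or the end of the file).
record_pattern = re.compile(
    r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3} .*?'
    r'(?=^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3} |\Z)',
    re.MULTILINE | re.DOTALL)

for record in record_pattern.findall(log_text):
    print('--- record ---')
    print(record.strip())
```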
Labels:
- Apache NiFi
01-10-2018
02:45 PM
@Greg Keys Your blog is great for clarifying the approach one can use; however, I run into problems with the diverse format of logs that span multiple lines, and with this approach one log entry can be split into multiple flowfiles, which is not desired. Is there a way to use SplitText to split the files on specific patterns?
01-04-2018
03:14 PM
@Greg Keys I would like to apply the same approach, but I prefer to extract logs per process group/flow instead of at the processor level. However, I see no process group/flow related information in the NiFi logs. Further, I see no processor name either, only the processor type; since I have around 20 ExtractText processors with different (customizable) names, it is quite some effort to extract information based on processor IDs only.
12-14-2017
03:49 PM
1 Kudo
Hi @Shu, yes that was exactly the problem; the individual CSVs are now created just fine, but in the meantime another problem occurred. When the individual CSVs are merged with the MergeContent processor, the merged CSV is all on one line instead of separate lines. Is there a way to bypass this? MergeContent configuration:
12-14-2017
10:19 AM
Hi @Matt Burgess, here is an example of the incoming JSON files; all have the same attributes:

{
  "features": [
    {
      "feature": {
        "paths": [
          [
            [214985.27600000054, 427573.33100000024],
            [215011.98900000006, 427568.84200000018],
            [215035.35300000012, 427565.00499999896],
            [215128.48900000006, 427549.4290000014],
            [215134.43699999899, 427548.65599999949],
            [215150.86800000072, 427546.87900000066],
            [215179.33199999854, 427544.19799999893]
          ]
        ]
      },
      "attributes": {
        "attribute1": "value",
        "attribute2": "value",
        "attribute3": "value",
        "attribute4": "value"
      }
    }
  ]
}
EvaluateJsonPath: this is where I add properties for each attribute I want to parse, e.g. attribute1: $.features[0].attributes.attribute1, and so on. ReplaceText: I think something goes wrong in my configuration here, because even before MergeContent the single CSVs created per JSON file contain hundreds of duplicate rows, whereas there should be just one row per CSV, to be merged later into one big CSV file.
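For what it's worth, the intended per-file result corresponds roughly to this standalone Python sketch (the file names are placeholders; the attribute path mirrors the $.features[0].attributes.* expressions above):

```python
import csv
import json

# Placeholder input path for illustration
with open('incoming.json') as f:
    doc = json.load(f)

# Same path as the EvaluateJsonPath expression $.features[0].attributes.attributeN
attrs = doc['features'][0]['attributes']
row = [attrs['attribute1'], attrs['attribute2'], attrs['attribute3'], attrs['attribute4']]

# Each input JSON should contribute exactly one CSV row
with open('single_row.csv', 'w') as out:
    csv.writer(out).writerow(row)
```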
12-13-2017
12:42 PM
I've no idea why my screenshots are double-posted; whatever I try to fix it fails 🙂
12-13-2017
12:39 PM
I have a use case where JSON files are read from an API, transformed to CSV and imported into Hive tables; however, my flow fails at the ReplaceText processor. Can you give some advice on the configuration of the processor, or on where my approach fails?

InvokeHTTP --> EvaluateJsonPath --> ReplaceText --> MergeContent --> UpdateAttribute --> PutHDFS

My flow makes several HTTP calls with InvokeHTTP (each call with a different ID), extracts attributes from each JSON that is returned (each JSON is unique) and then creates the CSVs in the ReplaceText processor as follows:

${attribute1},${attribute2},${attribute3},${attribute4},${attribute5},${attribute6},${attribute7}

However, after the MergeContent processor the merged CSV contains a lot of duplicate data, while all incoming JSONs contain unique data.
Labels:
- Apache Hive
- Apache NiFi
12-13-2017
10:36 AM
@Timothy Spann I have a similar use case; however, my flow fails at the ReplaceText processor. Can you give some advice on the configuration of the processor?

InvokeHTTP --> EvaluateJsonPath --> ReplaceText --> MergeContent --> UpdateAttribute --> PutHDFS

My flow makes several HTTP calls with InvokeHTTP (each call with a different ID), extracts attributes from each JSON that is returned (each JSON is unique) and then creates the CSVs as in your example. However, after the MergeContent processor the merged CSV contains a lot of duplicate data, while all incoming JSONs contain unique data.
ReplaceText conf:
MergeContent conf:
12-06-2017
03:43 PM
How do we connect a ConvertJsonToSQL processor to a Hive DB? You cannot select a HiveConnectionPool as the value of the JDBC Connection Pool property, and if I try to configure a DBCP connection pool with Hive, I get a driver error: Cannot load JDBC driver class for the class I have configured. My flow uses an incoming JSON file to create the SQL statement with the ConvertJsonToSQL processor, and then I want to send the flowfile with the SQL statement to PutHiveQL to insert the data into Hive. Is there something wrong with my approach? Thanks in advance!
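In case it helps, a DBCPConnectionPool pointed at HiveServer2 is usually configured along these lines; the host, port, database and jar path below are placeholders, and property names may differ slightly between NiFi versions:

```
Database Connection URL     : jdbc:hive2://<hiveserver2-host>:10000/default
Database Driver Class Name  : org.apache.hive.jdbc.HiveDriver
Database Driver Location(s) : /path/to/hive-jdbc-standalone.jar
Database User               : <user>
Password                    : <password>
```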
Labels:
- Apache Hive
- Apache NiFi
12-06-2017
03:34 PM
Hi @Matt Clarke, in this case how do we connect a ConvertJsonToSQL processor to a Hive DB? You cannot select a HiveConnectionPool as the value of the JDBC Connection Pool property, and if I try to configure a DBCP connection pool with Hive, I get a driver error: Cannot load JDBC driver class for the class I have configured. My flow uses an incoming JSON file to create the SQL statement with the ConvertJsonToSQL processor, and then I want to send the flowfile with the SQL statement to PutHiveQL to insert the data into Hive. Is there something wrong with my approach? Thanks in advance!
11-29-2017
04:19 PM
I need to read an API where my first call returns a JSON with object IDs in this form:

{
  "objectIdFieldName": "ID",
  "objectIds": [
    64916,
    67266,
    67237,
    64511,
    ....
  ]
}

I need to use the objectIds above to send a request for each of these IDs to the API, which will return the data that I need. I was thinking of a flow: GetHTTP (get response JSON) --> EvaluateJsonPath (parse only the objectIds field: $.objectIds) --> ? --> InvokeHTTP (new query per ID). My problem comes after that, as what I get is a sort of objectIds array in the form of: [64916, 67266, 67237, 64511, ...]. How do I manage to split/parse each ID from this array into a flowfile attribute, so that I can send it along with other data/headers to the InvokeHTTP processor? I thought of using the SplitJson processor, but I am having difficulty understanding its usage in this case. Any help much appreciated!
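Outside NiFi, the equivalent logic looks roughly like the sketch below (the URLs, query parameters and headers are placeholders, not the real API); it may help to clarify what SplitJson on $.objectIds followed by one InvokeHTTP call per ID is meant to do:

```python
import json
import urllib2  # Python 2 / Jython; on Python 3 use urllib.request instead

# First call: fetch the JSON that contains the ID list (placeholder URL)
response = urllib2.urlopen('https://example.com/api/query?returnIdsOnly=true')
listing = json.loads(response.read())

# Equivalent of SplitJson with a JsonPath of $.objectIds: one element per flowfile
for object_id in listing['objectIds']:
    # Equivalent of InvokeHTTP issuing one request per ID (placeholder URL/params)
    url = 'https://example.com/api/query?objectId={0}'.format(object_id)
    request = urllib2.Request(url, headers={'Accept': 'application/json'})
    record = json.loads(urllib2.urlopen(request).read())
    print(url)
```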
Labels:
- Apache NiFi
11-02-2017
09:55 AM
1 Kudo
My dataflow takes some files from HDFS, and after some processing I want to send an email to specific accounts as a notification, with the flowfile itself included as an attachment. This is the configuration of the PutEmail processor. Let's say I want to use my Gmail account to send the emails; then the SMTP authorization should be my Gmail credentials and the sending address should be my Gmail address, right? This is the error message I receive; any ideas on what is going wrong here?
Labels:
- Apache NiFi
11-01-2017
03:14 PM
Hi @Abdelkrim Hadjidj, for now I will implement it with GetFTP; NiFi is in the service provider's network and I cannot upgrade at will 😞 Do you maybe know a way to tell GetFTP not to download files that have already been downloaded in the past, to avoid unnecessary buffering?
11-01-2017
01:48 PM
Hi @Abdelkrim Hadjidj, thanks for the reply. My NiFi version is 1.1.0, so what you say makes sense. My flows are not so time-sensitive, meaning I can delay the ingestion for a couple of hours, but I want to understand the operations a bit better. This is the timestamp on the FTP server of the last file transferred by this NiFi flow (via Data Provenance). Now, if I schedule the ListFTP processor to fire e.g. today at 15:00, I would expect the file to be picked up with no problem. Does this bug mean that the file would never be picked up as long as it is the last modified file in this location? In other words, does ListFTP/ListHDFS/etc. perform a listing only if it sees files with a more recent timestamp than the last one transferred from the directory? Also, you mention scheduling cron for 3 and a bit later; is there an option to have two scheduling plans for one processor? As far as I know, with cron you can only say something like: run this every 5 minutes of that hour, or similar. Thanks in advance!
11-01-2017
09:44 AM
I am trying to get files from an FTP server with ListFTP/FetchFTP, but these specific processors are confusing to me. I have scheduled ListFTP to fire both with cron and timer-driven every 10 seconds or so, but even though it shows a task is executed in the ListFTP processor, no flowfiles come out of it! GetFTP works fine, but I want to implement it with List/Fetch to get only the new files in the directory. Initially I scheduled the flow with cron at 03:00 last night, when I knew a new file would become available on the FTP server around 1. However, this morning I saw that nothing was transferred to HDFS. So I changed the scheduling to every 60 seconds to test it right away and, guess what, I got the new file! So I thought cron was the problem, deleted the test file from HDFS and scheduled the flow to run at 10:00 this morning; as expected a task was created, but no flowfiles were passed to FetchFTP. I then switched back to timer-driven scheduling to get the file I had deleted back into HDFS from the FTP server, but this time no flowfiles are created even with this scheduling option. What is going on here, guys?
Labels:
- Apache NiFi
10-12-2017
03:10 PM
SplitText in between did the trick, amazing tip, thanks a lot! By the way, the ls -l output also contains other things like permissions etc., so an ExtractText rule to parse only the zip filenames is also needed, but thanks anyway!
10-12-2017
02:27 PM
Hi @Shu, I've posted a new question here; I hope what I am trying to do in this use case is clear!
10-12-2017
02:25 PM
Hi @Alexandru Anghel, I've uploaded a new question with my whole use case and logic here. Any help really appreciated!
10-12-2017
02:18 PM
In the flow below, the goal is to take some zip archives from an FTP server, unzip them and send them to HDFS. The complication is that, due to rerouting issues in this specific FTP location, I cannot use the built-in Get/List/FetchFTP processors, as they fail to follow the proper rerouting. What I can do is use a command line utility from the NiFi server that can handle the rerouting, and indeed ncftp does the trick in this case. So my plan is to use ExecuteProcess to run the bash script:

ncftpget -f login.txt /home/user/test /path/to/remote/*.zip > /dev/null 2>&1
ls /home/user/test | grep ".zip"

The first line gets the wanted zip archives from the FTP server while redirecting all output/error streams, as we only want to parse the output of the second line, which lists the contents of the specified directory and keeps the entries with a 'zip' extension. What I am trying to do is recreate the proper filenames between ExtractText --> UpdateAttribute and pass them to FetchFile --> UnpackContent --> PutHDFS. So the output of ExecuteProcess is something like:

file1_timestamp.zip
file2_timestamp.zip
file3_timestamp.zip
.....
file100_timestamp.zip

Next, an ExtractText processor with an added property 'filename': '\w+.zip' looks for this regex in the flowfile content and outputs a flowfile with new attributes filename.1, filename.2, ..., filename.100, one per match. Subsequently, UpdateAttribute specifies the local path where the zip archives have been placed by our bash script ('/home/user/test' in this case), as well as the proper filename, so that ${path}/${filename} is passed to the rest of the flow for fetching, unpacking and finally putting to HDFS. The problem I have is that only the first match is passed to the rest of the flow, as only this match corresponds to the 'filename' attribute; the other filenames are parsed by the ExtractText processor into the attributes 'filename.2', 'filename.3', ..., 'filename.100'. I would like to find a way to update the attributes passed to FetchFile with some kind of incremental counter. I tried to configure the FetchFile processor with the File to Fetch property as ${path}/${filename:nextInt()}, but this just looks for 'file_timestamp.zip#' filenames in the specified path, which of course are not there.
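For illustration only, the per-line logic that eventually solved this (splitting the listing into one line per flowfile before ExtractText, as noted in the later reply) corresponds roughly to this standalone Python sketch; the listing content is made up:

```python
import re

# Hypothetical ExecuteProcess output: one zip filename per line
listing = """file1_20171012.zip
file2_20171012.zip
file3_20171012.zip
"""

# Equivalent of SplitText (one line per flowfile) followed by ExtractText with '\w+\.zip':
# each line yields exactly one 'filename' attribute, so FetchFile can use
# ${path}/${filename} without needing filename.2, filename.3, ...
path = '/home/user/test'
for line in listing.splitlines():
    match = re.search(r'\w+\.zip', line)
    if match:
        print('{0}/{1}'.format(path, match.group(0)))
```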
Labels:
- Apache NiFi
10-12-2017
09:05 AM
CompressContent works fine for gzip archives. Thanks a lot, I am still exploring the possibilities of the NiFi processors.
10-12-2017
08:52 AM
I found a way to create empty flowfiles, update the attributes with proper authorization headers and send the request with InvokeHTTP.
10-10-2017
03:13 PM
Hi @Shu, thanks for all the help; I am not sure I follow your logic here though. The flow starts with the ListHDFS processor, where the HDFS directory is specified, e.g. /user/foivos. The FetchHDFS processor follows with the HDFS Filename specification; let's say we want to take only CSV files, so the HDFS Filename property is ${path}/${filename:endsWith('csv')}. The final processor is ExecuteStreamCommand, where:
Command Path: /path/to/script.sh
Command Arguments: ${filename} — but where does the processor take this value from? Is it the ${filename} attribute parsed from the FetchHDFS processor, where the filtering for CSV files is also done?
10-10-2017
10:19 AM
I have a directory with zip archives in the local filesystem of the NiFi server, and I would like to create a flow that unzips these archives with a bash script and then puts them in HDFS. The problem is that I cannot direct the output of the bash script to the PutHDFS processor in a way that picks up the unzipped files.
1) With the ExecuteStreamCommand processor I have two options for the outgoing flow: the original relationship, which contains the initial zipped archive, and the output stream relationship, which should be what I am looking for, but it transfers only an empty file with the same name as the original. How should this processor be configured, when it runs a bash script/command, to correctly carry the files produced by that script/command?
2) With the ExecuteProcess processor there is only a success/failure relationship, and this also does not help to pass the outgoing flow as input to the PutHDFS processor to move the unzipped files to HDFS.
Any help would be greatly appreciated!
Labels:
- Apache NiFi