Member since: 07-28-2017
Posts: 47
Kudos Received: 6
Solutions: 2

My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 15103 | 03-13-2018 12:04 PM |
| | 8535 | 10-12-2017 08:52 AM |
03-13-2018
12:04 PM
Solved, much simpler than I thought. You just need to pass the flowfile to the initialization function of the callback class.
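For reference, a minimal sketch of that idea, assuming the same ExecuteScript setup as in the question below (class and variable names are illustrative, not the exact script used):

```python
# Minimal sketch: hand both the flowfile and the payload to the callback's
# constructor so process() can read attributes and write the new content.
import json
from org.apache.nifi.processor.io import OutputStreamCallback

class OutputWrite(OutputStreamCallback):
    def __init__(self, flowfile, obj):
        self.flowfile = flowfile  # incoming flowfile, kept for attribute access
        self.obj = obj            # JSON-serializable content to write out

    def process(self, outputStream):
        # attributes are reachable here through the stored flowfile reference
        collection = self.flowfile.getAttribute('collection')
        outputStream.write(bytearray(json.dumps(self.obj).encode('utf-8')))

flowfile = session.get()
if flowfile is not None:
    output_json = {'collection': flowfile.getAttribute('collection')}  # placeholder payload
    flowfile = session.write(flowfile, OutputWrite(flowfile, output_json))
    session.transfer(flowfile, REL_SUCCESS)
```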
03-13-2018
11:09 AM
@Matt Burgess It seems I could do all API operations within the process function of the callback class, but in that case I need access to the flowfile attributes, besides the content, within the class scope. I tried to pass a flowfile reference to the class definition, but failed. Any ideas?
03-12-2018
12:21 PM
I am trying to create a Python script in NiFi that:
- reads some attributes from an incoming flowfile,
- reads the JSON content of the flowfile and extracts specific fields,
- writes attributes to the outgoing flowfile, and
- overwrites the incoming flowfile with new content created in the script (e.g. an API call that returns new JSON) and sends it to the SUCCESS relationship, OR removes the old flowfile and creates a new one with the desired content.

What I've done so far:

```python
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback, InputStreamCallback, OutputStreamCallback

class OutputWrite(OutputStreamCallback, obj):
    def __init__(self):
        self.obj = obj

    def process(self, outputStream):
        outputStream.write(bytearray(json.dumps(self.obj).encode('utf')))
### end class ###

flowfile = session.get()
if flowfile != None:
    # 1) Get flowfile attributes
    headers = {
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept': 'application/json, text/plain, */*',
        'Cache-Control': 'no-cache',
        'Ocp-Apim-Trace': 'true',
        'Authorization': flowfile.getAttribute('Authorization')
    }
    collection = flowfile.getAttribute('collection')
    dataset = flowfile.getAttribute('dataset')

    # 2) Get flowfile content
    stream_content = session.read(flowfile)
    text_content = IOUtils.toString(stream_content, StandardCharsets.UTF_8)
    json_content = json.loads(text_content)
    records = json_content['result']['count']
    pages = records / 10000

    # 3) Write flowfile attributes
    flowfile = session.putAttribute(flowfile, 'collection', collection)
    flowfile = session.putAttribute(flowfile, 'dataset', dataset)

    # API operations: output_json with desired data
    output_json = {some data}

    # 4) Write final JSON data to output flowfile
    flowfile = session.write(flowfile, OutputWrite(output_json))
    session.transfer(flowfile, REL_SUCCESS)
    session.commit()
```

My problem is that I can't find a way to pass a reference to the desired output_json object as an argument to the OutputStreamCallback class. Any ideas on how to resolve this, or maybe a better approach?

Is it maybe easier to perform all API operations within the process function of the class? But then how do I get access to the incoming flowfile attributes within the process function (it requires a session or a flowfile object)?

Any help much appreciated!
Labels:
- Apache NiFi
02-19-2018
02:14 PM
@Timothy Spann Great article, but I am a bit confused between the Spark Streaming and Spark Structured Streaming integrations with Kafka. Which one is advised for similar use cases, and is Spark Streaming planned to be deprecated soon?
01-29-2018
03:59 PM
Hello, I am trying to set up the NiFi dataflow for the HDF tutorial. I am interested in the Kafka part, but as far as I understand I must first complete the NiFi part. However, the ExecuteProcess processor gives a 'Permission denied' error for the generate.sh script, even though I've given all permissions to the whole repo. Any ideas?
Labels:
- Apache NiFi
- Cloudera DataFlow (CDF)
01-10-2018
02:45 PM
@Greg Keys Your blog does a great job of clarifying the approach one can use. However, I run into problems with diverse log formats that span multiple lines: with this approach one log entry can be split across multiple flowfiles, which is not desired. Is there a way to use SplitText to split the files on specific patterns?
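To illustrate the kind of pattern-based splitting I mean, a rough plain-Python sketch (not a SplitText configuration; the timestamp regex is just an assumed example of a line that starts a new log entry):

```python
import re

# Assumed pattern for the first line of a log entry, e.g. "2018-01-29 15:59:12 ..."
NEW_ENTRY = re.compile(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}')

def group_log_entries(lines):
    """Yield complete entries; continuation lines (stack traces etc.) stay attached."""
    entry = []
    for line in lines:
        if NEW_ENTRY.match(line) and entry:
            yield ''.join(entry)
            entry = []
        entry.append(line)
    if entry:
        yield ''.join(entry)
```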
01-04-2018
03:14 PM
@Greg Keys I would like to apply the same approach, but I prefer to extract logs per process group/flow instead of at the processor level. However, I see no process group/flow related information in the NiFi logs. Furthermore, I see no processor name either, only the processor type; since I have around 20 ExtractText processors with different (customizable) names, it is quite some effort to extract information based on processor IDs only.
12-14-2017
03:49 PM
1 Kudo
Hi @Shu, yes, that was exactly the problem; now the individual CSVs are created just fine, but in the meantime another problem occurred. When the individual CSVs are merged with the MergeContent processor, the merged CSV is all on one line instead of separate lines. Is there a way to avoid this?
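As a plain-Python illustration of what I see (not the MergeContent configuration itself): the individual rows apparently get concatenated without any separator between them, whereas I would want a newline between rows:

```python
# One CSV row per incoming flowfile, with no trailing newline on each fragment.
fragments = ['a,b,c', 'd,e,f', 'g,h,i']

print(''.join(fragments))    # a,b,cd,e,fg,h,i  -> everything ends up on one line
print('\n'.join(fragments))  # one row per line, which is what I am after
```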
12-14-2017
10:19 AM
Hi @Matt Burgess, here is an example of the incoming JSON files; they all have the same attributes:

```json
{
  "features": [
    {
      "feature": {
        "paths": [
          [
            [214985.27600000054, 427573.33100000024],
            [215011.98900000006, 427568.84200000018],
            [215035.35300000012, 427565.00499999896],
            [215128.48900000006, 427549.4290000014],
            [215134.43699999899, 427548.65599999949],
            [215150.86800000072, 427546.87900000066],
            [215179.33199999854, 427544.19799999893]
          ]
        ]
      },
      "attributes": {
        "attribute1": "value",
        "attribute2": "value",
        "attribute3": "value",
        "attribute4": "value"
      }
    }
  ]
}
```
EvaluateJsonPath: here I add a property for each attribute I want to parse, e.g. attribute1: $.features[0].attributes.attribute1, and so on. ReplaceText: I think something goes wrong in my configuration here, because even before MergeContent the single CSVs created per JSON file contain hundreds of duplicate rows, whereas there should be just one row per CSV, which will later be merged into one big CSV file.
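For clarity, a small plain-Python sketch of the extraction the EvaluateJsonPath properties are meant to do against the sample above, and the single CSV row I expect per file (the attribute names are the placeholders from the sample, not real field names):

```python
import json

with open('sample.json') as f:          # one incoming JSON file, as in the sample above
    doc = json.load(f)

attrs = doc['features'][0]['attributes']

# Equivalent of $.features[0].attributes.attribute1, ...attribute2, and so on.
row = ','.join([attrs['attribute1'], attrs['attribute2'],
                attrs['attribute3'], attrs['attribute4']])
print(row)  # expected result: exactly one CSV row per JSON file
```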
12-13-2017
12:42 PM
I've no idea why my screenshots are double-posted; whatever I try to fix it fails 🙂