Created 07-04-2018 10:18 PM
I am open to any other NiFi processor that can achieve this, as long as it can perform queries against PostgreSQL and route the result to a flowfile attribute, or even to modifying the source code of ExecuteSQL to achieve this. Thanks!
Created on 07-04-2018 10:51 PM - edited 08-18-2019 02:30 AM
You can achieve this by using the following processors.
Flow:
1. ExecuteSql // output flowfile is in Avro format
2. SplitRecord // reads the incoming Avro data and writes it out as JSON/CSV ...etc, with Records Per Split set to 1 (if you are getting OOM issues, use a series of SplitRecord processors to split the flowfile down to 1 record each)
3. ExtractText // add a new property with the regex (.*) to extract the full contents of the flowfile into flowfile attributes (change the Maximum Buffer Size according to your flowfile size)
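For reference, a minimal configuration sketch of those three processors might look like this (the query, table, and controller service names are just placeholders for illustration):

ExecuteSql
    Database Connection Pooling Service : your DBCPConnectionPool service
    SQL select query                    : select id, name from some_table   (placeholder query)
SplitRecord
    Record Reader                       : AvroReader
    Record Writer                       : JsonRecordSetWriter
    Records Per Split                   : 1
ExtractText
    content (dynamic property)          : (.*)
    Maximum Buffer Size                 : 1 MB   (increase to match your flowfile size)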
By using this flow, you now have each record's values as flowfile attributes.
Writing the records as JSON in the SplitRecord processor makes it easier to access the values of the keys: once we extract the contents of the flowfile with ExtractText, you can use the jsonPath expression language function to pull out the value for each key.
Example:
I have a JSON flowfile with {"id":1}, extracted into a content attribute on the flowfile, and then use an UpdateAttribute processor to create an id attribute with the expression language below:
${content:jsonPath('$.id')}
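The same pattern works for any other key; for example, assuming the record also had a name field, a second dynamic property in the same UpdateAttribute processor could be:

${content:jsonPath('$.name')}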
Keep in mind that all the attributes associated with a flowfile are stored in memory, so adding more attributes to the flowfile uses more memory. It's better to delete all the unused attributes associated with the flowfile to keep memory usage down.
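For example, UpdateAttribute has a Delete Attributes Expression property that takes a regex. Assuming the intermediate attribute from ExtractText is named content (ExtractText also creates numbered capture-group attributes like content.0 and content.1), something like this would clean them up once the id attribute has been created:

Delete Attributes Expression : content|content\.\d+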
Please refer to this link for how to use a series of split processors.
Created 07-05-2018 06:44 PM
Sorry I wasn't clear. I need to retain the contents of the incoming flowfile in ExecuteSQL; that's why I want the results of the query to be output to a flowfile attribute. Otherwise all the data that I need within the original flowfile will be lost. Thanks.
Created 07-08-2018 02:40 AM
How large are the incoming flowfile contents? If they are fairly small, you could use ExtractText before ExecuteSQL to get the contents into an attribute (i.e. saving them for later), then Shu's approach (with or without the SplitRecord, depending on your use case), including ExtractText to get the ExecuteSQL results into an attribute, followed by ReplaceText to restore the original content. Note that even if your use case supports this much use of memory, you'll likely want an UpdateAttribute to delete any attributes you're not using after these steps.
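As a rough sketch (the attribute names original.content and sql.result here are just illustrative), that sequence would be:

ExtractText     // dynamic property original.content = (.*), Enable DOTALL Mode = true, to stash the incoming content
ExecuteSQL      // query results replace the flowfile content
ExtractText     // dynamic property sql.result = (.*) to capture the query output as an attribute
ReplaceText     // Replacement Strategy = Always Replace, Evaluation Mode = Entire text, Replacement Value = ${original.content}
UpdateAttribute // Delete Attributes Expression to drop original.content, sql.result, etc. once done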
PutSQL (as of NiFi 1.5.0 via NIFI-4522) has the kind of capability you're describing (i.e. retain the original content but execute SQL statements), but your use case seems a bit more esoteric with respect to what ExecuteSQL does. Is it possible that LookupAttribute would work for what you're trying to do?
Created 07-09-2018 03:12 AM
The contents of the flowfile are 200 MB in size. What I am looking to do is retrieve some relevant information for this data from a PostgreSQL database so that I can parse the data correctly further down the flow, so it's necessary that I keep the contents. I have tried to save the contents in an attribute, but it slows the flow down and even causes nodes to disconnect. I haven't looked into LookupAttribute yet, but I tried to modify the ExecuteSQL processor so that the query result gets stored in an attribute. I am having issues with that, though, as discussed here: https://community.hortonworks.com/questions/201941/created-a-custom-nifi-processor-after-placing-nar....
Created on 07-09-2018 01:22 PM - edited 08-18-2019 02:30 AM
-
Short answer is no. The ExecuteSQL processor is written to write the output to the FlowFile's content.
-
There is an alternative solution.
You have some processor currently feeding FlowFiles to your ExecuteSQL processor via a connection. My suggestion would be to feed that same connection to two different paths. The first path feeds a MergeContent processor via a funnel, and the second feeds your ExecuteSQL processor. The ExecuteSQL processor performs the query and writes the data you are looking for to the content of the FlowFile. You then use a processor like ExtractText to extract that FlowFile's new content to FlowFile attributes, followed by a processor like ModifyBytes to remove all content from the FlowFile. Finally, you feed this processor to the same funnel as the other path. The MergeContent processor can then merge these two FlowFiles using the "Correlation Attribute Name" property (assuming "filename" is unique, that could be used), min/max entries set to 2, and "Attribute Strategy" set to "Keep All Unique Attributes". The result should be what you are looking for.
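For clarity, the key MergeContent settings for this approach would be something like the following (the Merge Strategy and Merge Format values are what I would expect to use here, so verify them against your version):

Merge Strategy             : Bin-Packing Algorithm
Merge Format               : Binary Concatenation
Correlation Attribute Name : filename
Minimum Number of Entries  : 2
Maximum Number of Entries  : 2
Attribute Strategy         : Keep All Unique Attributes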
-
The flow would look something like the following:
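A rough text sketch of that layout:

UpstreamProcessor ------------------------------------------------> Funnel --> MergeContent --> downstream
        \--> ExecuteSQL --> ExtractText --> ModifyBytes --> (same Funnel)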
Having multiple identical connections does not trigger NiFi to write the 200 MB of content twice to the content repository. A new FlowFile is created, but it points to the same content claim. New content is only generated when the ExecuteSQL is run against one of the FlowFiles. So this flow does not produce any additional write load on the content repo other than when the ExecuteSQL writes its output, which I am assuming is relatively small?
-
Thank you,
Matt