Member since 04-02-2019 | 5 Posts | 0 Kudos Received | 0 Solutions
08-25-2020 02:11 PM
I am using the Wait/Notify pattern to trigger a notification indicating that a group of FlowFiles has completed processing. I am using multiple Signal Counter Names (e.g. failed, succeeded), and FlowFiles are released by the corresponding Wait processor once the sum of all the Signal Counters for a given Release Signal Identifier equals the Target Signal Count (in this case, the Target Signal Count equals the number of FlowFiles that must be processed). My Notify processor is configured to use the attribute "keyId" (shared across all related FlowFiles) as the Release Signal Identifier and to update the Signal Counter named "succeeded" under that Release Signal Identifier. I would like to query the value of a specific Signal Counter (e.g. succeeded) for a given Release Signal Identifier from the DistributedMapCache configured in the Wait/Notify processors. Is this possible, and if so, how can I do this (preferably using built-in NiFi components, or alternatively in Groovy)?
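A minimal Groovy sketch of the parsing half of this, under stated assumptions: it assumes the Wait/Notify processors keep each Release Signal Identifier as a cache key whose value is a JSON string containing a `counts` object that maps each Signal Counter Name to its current count. Check a real cache entry to confirm that shape for your NiFi version. Fetching the raw string from the DistributedMapCacheClient inside ExecuteScript is not shown because it depends on the NiFi runtime, and `signalCounterValue` and `totalSignalCount` are hypothetical helper names.

```groovy
import groovy.json.JsonSlurper

// ASSUMPTION: the cached value for a Release Signal Identifier (e.g. the keyId value)
// is a JSON string shaped roughly like
//   {"counts":{"succeeded":3,"failed":1},"attributes":{},"releasableCount":0}

// Hypothetical helper: value of one named Signal Counter, or 0 if it is absent.
long signalCounterValue(String cachedJson, String counterName) {
    if (!cachedJson) return 0L
    Map counts = (new JsonSlurper().parseText(cachedJson).counts ?: [:]) as Map
    return (counts[counterName] ?: 0) as long
}

// Hypothetical helper: sum of all Signal Counters stored under the identifier.
long totalSignalCount(String cachedJson) {
    if (!cachedJson) return 0L
    Map counts = (new JsonSlurper().parseText(cachedJson).counts ?: [:]) as Map
    return counts.values().sum(0L) as long
}

def sample = '{"counts":{"succeeded":3,"failed":1},"attributes":{},"releasableCount":0}'
println signalCounterValue(sample, 'succeeded')  // 3
println totalSignalCount(sample)                 // 4
```

Inside ExecuteScript, the string fetched from the cache for a given keyId would be passed to `signalCounterValue(..., 'succeeded')`; both helpers return 0 when the entry or counter does not exist.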
Labels: Apache NiFi
04-18-2019 07:50 PM
@Matt Burgess I certainly need some practice writing Jolt specs, but this does exactly what I needed. Thank you for your help!
04-18-2019 06:17 PM
I have the following example JSON in a FlowFile (the data has been randomly generated for this example):
{
"TrackingRequestFirstIp":"192.10.01.01",
"TrackingRequestLastIp":"100.120.123.456",
"TrackingRequestCreationTime":"05:49:55",
"TrackingRequestCreationDate":"20171013",
"TrackingRequestTrackingNumber":"12349876"
}
Here is my Avro schema:
{
"name": "IpReports",
"type": "record",
"namespace": "com.acme.avro",
"fields": [
{"name": "DataSourceInformation",
"type":{
"type":"map",
"values": {
"name":"DataSourceInformation",
"type":"record",
"fields": [
{"name":"DataSourceUuid", "type":["null","string"], "default":null},
{"name":"DataOwnerUuid", "type":["null","string"], "default":null},
{"name":"RecordUuid", "type":["null","string"], "default":null}]
}}
},
{"name": "TrackingRequestInformation",
"type": {
"type":"map",
"values": {
"name":"TrackingRequestInformation",
"type":"record",
"fields": [
{"name": "TrackingRequestCreationTime", "type":["null", "string"], "default":null, "aliases":["time_cst"]},
{"name": "TrackingRequestCreationDate", "type":["null", "string"], "default":null, "aliases":["date_cst"]},
{"name": "TrackingRequestTrackingNumber", "type":["null", "string"], "default":null, "aliases":["fedex_airbill"]},
{"name": "TrackingRequestFirstIp", "type":["null", "string"], "default":null, "aliases":["source_ip"]},
{"name": "TrackingRequestLastIp", "type":["null", "string"], "default":null, "aliases":["dest_ip"]},
{"name": "TrackingRequestFirstCityName", "type":["null", "string"], "default":null, "aliases":["source_ip_city"]},
{"name": "TrackingRequestFirstIpLat", "type":["null", "double"], "default":null, "aliases":["source_ip_lat"]},
{"name": "TrackingRequestFirstIpLong", "type":["null", "double"], "default":null, "aliases":["source_ip_long"]},
{"name": "TrackingRequestFirstIpCountryName", "type":["null", "string"], "default":null, "aliases":["source_ip_country"]},
{"name": "TrackingRequestFirstIpCountryCode", "type":["null", "string"], "default":null, "aliases":["source_ip_country_iso"]},
{"name": "TrackingRequestFirstIpPostalCode", "type":["null", "string"], "default":null, "aliases":["source_ip_country_postal"]},
{"name": "TrackingRequestFirstIpGeoLocation", "type":["null", "string"], "default":null, "aliases":["source_location"]},
{"name": "TrackingRequestLastCityName", "type":["null", "string"], "default":null, "aliases":["dest_ip_city"]},
{"name": "TrackingRequestLastIpLat", "type":["null", "double"], "default":null, "aliases":["dest_ip_lat"]},
{"name": "TrackingRequestLastIpLong", "type":["null", "double"], "default":null, "aliases":["dest_ip_long"]},
{"name": "TrackingRequestLastIpCountryName", "type":["null", "string"], "default":null, "aliases":["dest_ip_country"]},
{"name": "TrackingRequestLastIpCountryCode", "type":["null", "string"], "default":null, "aliases":["dest_ip_country_iso"]},
{"name": "TrackingRequestLastIpPostalCode", "type":["null", "string"], "default":null, "aliases":["dest_ip_country_postal"]},
{"name": "TrackingRequestLastIpGeoLocation", "type":["null", "string"], "default":null, "aliases":["dest_location"]}]
}}
}]
}
Further on in my NiFi process group, I will enrich the original JSON and add the missing fields. I would like to convert the original JSON above into the JSON below using my Avro schema:
{
"DataSourceInformation": {
"DataSourceUuid":null,
"DataOwnerUuid":null,
"RecordUuid":null
},
"TrackingRequestInformation": {
"TrackingRequestFirstIp":"192.10.01.01",
"TrackingRequestLastIp":"100.120.123.456",
"TrackingRequestCreationTime":"05:49:55",
"TrackingRequestCreationDate":"20171013",
"TrackingRequestTrackingNumber":"12349876",
"TrackingRequestFirstIpLat":null,
"TrackingRequestFirstIpLong":null,
... ... ... ... ...
"TrackingRequestLastIpGeoLocation":null
}
}
However, when I use a ConvertRecord processor, I get the following output:
[ {
"DataSourceInformation" : null,
"TrackingRequestInformation" : null
} ]
Any thoughts on what I am doing wrong, or suggestions on how to correct it?
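ConvertRecord maps fields by name and does not reshape records, so a likely cause of the two nulls is that the input has no top-level `DataSourceInformation` or `TrackingRequestInformation` keys. A second likely issue is that both fields are declared as `map` types whose values are records, which expects an extra level of arbitrary keys; declaring each field's type as the record itself would match the desired output. A later reply in this feed says a Jolt spec solved the problem (that spec isn't reproduced here). As an alternative, the nesting step can be sketched in Groovy; `nestTrackingRequest` is a hypothetical helper, and the `DataSourceInformation` field names are copied from the posted schema.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Hypothetical helper: wraps the flat tracking fields in a nested object and adds
// an all-null DataSourceInformation object, mirroring the desired output layout.
String nestTrackingRequest(String flatJson) {
    // Field names copied from the DataSourceInformation record in the posted schema
    List<String> dataSourceFields = ['DataSourceUuid', 'DataOwnerUuid', 'RecordUuid']
    Map flat = new JsonSlurper().parseText(flatJson) as Map
    Map dataSource = dataSourceFields.collectEntries { [(it): null] }
    return JsonOutput.toJson([
        DataSourceInformation     : dataSource,
        TrackingRequestInformation: flat
    ])
}

def input = '''{
  "TrackingRequestFirstIp":"192.10.01.01",
  "TrackingRequestLastIp":"100.120.123.456",
  "TrackingRequestCreationTime":"05:49:55",
  "TrackingRequestCreationDate":"20171013",
  "TrackingRequestTrackingNumber":"12349876"
}'''
println JsonOutput.prettyPrint(nestTrackingRequest(input))
```

Fields that are absent from the input, such as `TrackingRequestFirstIpLat`, are not added by this sketch; with nullable fields declared in the schema, the record writer should emit them as null.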
Labels: Apache NiFi
04-09-2019 11:32 PM
Hi Matt. Works like a charm, thanks for your help!
04-03-2019 04:42 PM
My goal is to take in a large PDF (34 MB), split it into smaller 2-page PDFs, and send each of the new 2-page PDFs out as a new FlowFile for further processing. I've provided the code I have so far below, most of which came from the ExecuteScript Cookbook series. It looks as though the upstream FlowFile is not even entering the processor (photo attached). Any help would be greatly appreciated!
import org.apache.pdfbox.pdmodel.PDDocument
import org.apache.pdfbox.multipdf.Splitter
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback

def flowFile = session.get()
if (!flowFile) return

// Load the PDF directly from the FlowFile's binary content (PDFBox 2.x API);
// decoding it into a UTF-8 String first would corrupt the binary data
PDDocument document = null
session.read(flowFile, { inputStream ->
    document = PDDocument.load(inputStream)
} as InputStreamCallback)

// setSplitAtPage(2) makes each resulting PDDocument hold at most 2 pages
def splitter = new Splitter()
splitter.setSplitAtPage(2)
List<PDDocument> forms = splitter.split(document)

for (form in forms) {
    // create(flowFile) makes each split a child of the original, inheriting its attributes
    def newFlowFile = session.create(flowFile)
    newFlowFile = session.write(newFlowFile, { outputStream ->
        form.save(outputStream)   // serialize the 2-page PDF into the new FlowFile's content
    } as OutputStreamCallback)
    form.close()                  // close only after the split has been saved
    session.transfer(newFlowFile, REL_SUCCESS)
}
document.close()

// Every FlowFile obtained from session.get() must be transferred or removed
session.remove(flowFile)
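Reading a PDF with `IOUtils.toString(inputStream, StandardCharsets.UTF_8)` and handing the text to PDFBox cannot work reliably, because PDFs are binary and UTF-8 decoding is lossy for arbitrary bytes. The Groovy sketch below builds an illustrative PDF-style header followed by a binary comment line (the bytes of `%âãÏÓ`, the kind of high-byte marker the PDF specification recommends after the header; these are sample bytes, not taken from the poster's file) and shows that a UTF-8 round trip does not return the original bytes.

```groovy
import java.nio.charset.StandardCharsets

// Illustrative bytes: "%PDF-1.4\n%" followed by 0xE2 0xE3 0xCF 0xD3,
// which do not form valid UTF-8 sequences
def buffer = new ByteArrayOutputStream()
buffer.write("%PDF-1.4\n%".getBytes(StandardCharsets.US_ASCII))
[0xE2, 0xE3, 0xCF, 0xD3].each { buffer.write(it as int) }
byte[] original = buffer.toByteArray()

// Decode the binary content as UTF-8 text, as IOUtils.toString(..., UTF_8) would;
// invalid sequences are replaced with U+FFFD
String asText = new String(original, StandardCharsets.UTF_8)

// Turn the text back into bytes: the high bytes are not recovered
byte[] roundTrip = asText.getBytes(StandardCharsets.UTF_8)

println "original bytes:   ${original.length}"
println "round-trip bytes: ${roundTrip.length}"
println "identical: ${Arrays.equals(original, roundTrip)}"
```

Passing the InputStream (or a `byte[]`) straight to PDFBox avoids this conversion entirely.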
Labels: Apache NiFi