Member since: 04-02-2019 | Posts: 5 | Kudos Received: 0 | Solutions: 0
04-18-2019 07:50 PM
@Matt Burgess I certainly need some practice using the Jolt spec, but this does exactly what I needed. Thank you for your help!
04-18-2019 06:17 PM
I have the following example JSON in a flowfile (the data has been made up for this example):

{
  "TrackingRequestFirstIp": "192.10.01.01",
  "TrackingRequestLastIp": "100.120.123.456",
  "TrackingRequestCreationTime": "05:49:55",
  "TrackingRequestCreationDate": "20171013",
  "TrackingRequestTrackingNumber": "12349876"
}

Here is my Avro schema:

{
"name": "IpReports",
"type": "record",
"namespace": "com.acme.avro",
"fields": [
{"name": "DataSourceInformation",
"type":{
"type":"map",
"values": {
"name":"DataSourceInformation",
"type":"record",
"fields": [
{"name":"DataSourceUuid", "type":["null","string"], "default":null},
{"name":"DataOwnerUuid", "type":["null","string"], "default":null},
{"name":"RecordUuid", "type":["null","string"], "default":null]
}}
},
{"name": "TrackingRequestInformation",
"type": {
"type":"map",
"values": {
"name":"TrackingRequestInformation",
"type":"record",
"fields": [
{"name": "TrackingRequestCreationTime", "type":["null", "string"], "default":null, "aliases":["time_cst"]},
{"name": "TrackingRequestCreationDate", "type":["null", "string"], "default":null, "aliases":["date_cst"]},
{"name": "TrackingRequestTrackingNumber", "type":["null", "string"], "default":null, "aliases":["fedex_airbill"]},
{"name": "TrackingRequestFirstIp", "type":["null", "string"], "default":null, "aliases":["source_ip"]},
{"name": "TrackingRequestLastIp", "type":["null", "string"], "default":null, "aliases":["dest_ip"]},
{"name": "TrackingRequestFirstCityName", "type":["null", "string"], "default":null, "aliases":["source_ip_city"]},
{"name": "TrackingRequestFirstIpLat", "type":["null", "double"], "default":null, "aliases":["source_ip_lat"]},
{"name": "TrackingRequestFirstIpLong", "type":["null", "double"], "default":null, "aliases":["source_ip_long"]},
{"name": "TrackingRequestFirstIpCountryName", "type":["null", "string"], "default":null, "aliases":["source_ip_country"]},
{"name": "TrackingRequestFirstIpCountryCode", "type":["null", "string"], "default":null, "aliases":["source_ip_country_iso"]},
{"name": "TrackingRequestFirstIpPostalCode", "type":["null", "string"], "default":null, "aliases":["source_ip_country_postal"]},
{"name": "TrackingRequestFirstIpGeoLocation", "type":["null", "string"], "default":null, "aliases":["source_location"]},
{"name": "TrackingRequestLastCityName", "type":["null", "string"], "default":null, "aliases":["dest_ip_city"]},
{"name": "TrackingRequestLastIpLat", "type":["null", "double"], "default":null, "aliases":["dest_ip_lat"]},
{"name": "TrackingRequestLastIpLong", "type":["null", "double"], "default":null, "aliases":["dest_ip_long"]},
{"name": "TrackingRequestLastIpCountryName", "type":["null", "string"], "default":null, "aliases":["dest_ip_country"]},
{"name": "TrackingRequestLastIpCountryCode", "type":["null", "string"], "default":null, "aliases":["dest_ip_country_iso"]},
{"name": "TrackingRequestLastIpPostalCode", "type":["null", "string"], "default":null, "aliases":["dest_ip_country_postal"]},
{"name": "TrackingRequestLastIpGeoLocation", "type":["null", "string"], "default":null, "aliases":["dest_location"]}]
}}
}]
}

Further on in my NiFi process group, I will be enriching the original JSON and adding the missing fields. Using the Avro schema above, I would like to convert the original JSON into the JSON below:

{
"DataSourceInformation": {
"DataSourceUuid":null,
"DataOwnerUuid":null,
"RecordUuid":null
},
"TrackingRequestInformation": {
"TrackingRequestFirstIp":"192.10.01.01",
"TrackingRequestLastIp":"100.120.123.456",
"TrackingRequestCreationTime":"05:49:55",
"TrackingRequestCreationDate":"20171013",
"TrackingRequestTrackingNumber":"12349876",
"TrackingRequestFirstIpLat":null,
"TrackingRequestFirstIpLong":null,
... ... ... ... ...
"TrackingRequestLastIpGeoLocation":null
}
}

However, when I use a ConvertRecord processor, I get the following output:

[ {
  "DataSourceInformation" : null,
  "TrackingRequestInformation" : null
} ]

Any thoughts on what I am doing wrong, or suggestions on how to correct it?
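For context, one alternative I have been sketching is a JoltTransformJSON spec along these lines. This is only an untested sketch: the "shift" wildcard and the null placeholders are my own guesses, and I have not verified that the "default" operation will emit literal nulls (the later enrichment step could also fill them in):

```json
[
  {
    "operation": "shift",
    "spec": {
      "*": "TrackingRequestInformation.&"
    }
  },
  {
    "operation": "default",
    "spec": {
      "DataSourceInformation": {
        "DataSourceUuid": null,
        "DataOwnerUuid": null,
        "RecordUuid": null
      },
      "TrackingRequestInformation": {
        "TrackingRequestFirstIpLat": null,
        "TrackingRequestFirstIpLong": null,
        "TrackingRequestLastIpGeoLocation": null
      }
    }
  }
]
```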
Labels:
Apache NiFi
04-09-2019 11:32 PM
Hi Matt. Works like a charm, thanks for your help!
04-03-2019 04:42 PM
My goal is to take in a large PDF (34 MB), split it into smaller two-page PDFs, and send each new two-page PDF out as a separate flowfile for further processing. I've provided the code I have so far below, most of which came from the ExecuteScript Cookbook series. It looks as though the upstream flowfile is not even entering the processor (photo attached). Any help would be greatly appreciated!

import org.apache.pdfbox.pdmodel.PDDocument
import org.apache.pdfbox.multipdf.Splitter
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback

def flowFile = session.get()
if (!flowFile) return

// Load the PDF straight from the flowfile's content stream;
// reading the binary content into a UTF-8 String would corrupt it
def document = null
session.read(flowFile, { inputStream ->
    document = PDDocument.load(inputStream)
} as InputStreamCallback)

def splitter = new Splitter()
splitter.setSplitAtPage(2)  // each split document holds at most two pages
def forms = splitter.split(document)

forms.each { form ->
    def newFlowFile = session.create(flowFile)
    newFlowFile = session.write(newFlowFile, { outputStream ->
        form.save(outputStream)  // serialize this split PDF into the new flowfile
    } as OutputStreamCallback)
    form.close()  // close only after the split document has been written
    session.transfer(newFlowFile, REL_SUCCESS)
}

document.close()
session.remove(flowFile)  // the original flowfile must be transferred or removed
Labels:
Apache NiFi