Member since: 09-18-2018
Posts: 92
Kudos Received: 5
Solutions: 0
07-24-2019
07:29 PM
I've verified that if the key is set to a field at the root level of my JSON, then I am able to update/upsert properly. For example, I tried this with the evt_type field, and it worked fine. So my only question is how to handle a field that is one level down in the JSON, as is the case with stdds_id: {"evt_data": {"stdds_id": "stdds_id_value", ...}}
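What I'm hoping is that PutMongo's Update Query Key accepts the same dot notation that Mongo itself uses for nested fields; that is an assumption on my part that I haven't verified against the processor docs. In the mongo shell (collection name is just a placeholder), the nested lookup would be:

// Mongo reaches nested fields with dot notation:
db.events.find({ "evt_data.stdds_id": "stdds_id_value" })

// So what I'd like to try in PutMongo (unverified):
//   Update Query Key: evt_data.stdds_id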
07-24-2019
07:16 PM
By the way, if I need to, I am able to use EvaluateJSONPath to extract the stdds_id and place it as an attribute on the flowfile (see STDDS_ID). I'd use STDDS_ID as the key into Mongo, but I'm not sure how to add it to the flowfile content so that I can update/upsert using this key.
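Concretely, the dynamic property I added to EvaluateJSONPath looks roughly like this (STDDS_ID is just my chosen attribute name):

EvaluateJSONPath:
  Destination = flowfile-attribute
  STDDS_ID    = $.evt_data.stdds_id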
07-24-2019
05:57 PM
I have a sequence of flowfiles that I need to put to Mongo. A sample is at the bottom of this question. The flowfile contains JSON with a field "evt_data": {"stdds_id": "stdds_id_value", ...}. I need that stdds_id_value to be the key for the update/upsert into Mongo. I'm looking for help with the flow configuration that will make this work. My current PutMongo configuration (a non-working attempt) is this: I'm trying to put a record that is keyed on the evt_data.stdds_id value, as seen in the following flowfile JSON: In other words, the key for the document in Mongo would be "TARGET-STDDS-KCLT-157481920517". This doesn't work, and I end up seeing multiple documents in Mongo for that same key; I should only see one per distinct key. What is the proper way to set the update key?
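For clarity, the mongo-shell equivalent of the behavior I'm after is something like the following; the collection name and the evt_type value are placeholders:

// One document per distinct evt_data.stdds_id: match on the nested id,
// merge in the new fields, and insert if no match exists (upsert).
db.events.updateOne(
  { "evt_data.stdds_id": "TARGET-STDDS-KCLT-157481920517" },
  { $set: { "evt_type": "evt_type_value", "evt_data.stdds_id": "TARGET-STDDS-KCLT-157481920517" } },
  { upsert: true }
)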
Labels: Apache NiFi
07-23-2019
06:51 PM
PutMongo does not like my $or
07-23-2019
05:38 PM
I need to do an update/upsert into Mongo. Essentially, the command I need to run is seen in the following (this works in the mongo command-line client): Notice that in the first update I search for the document that matches the specified STDDS_DST_ID. In the second update, I match any of several IDs, including the one that was already matched. In this simple example, I have a set of linked IDs: TFM_ID, FVIEW_ID, STDDS_DST_ID. The set of linked IDs is unique; in this example that per-set distinction guarantees that you won't find STDDS_DST_ID 100 associated with another FVIEW_ID or TFM_ID. You'll only find it with FVIEW_ID 3000 and TFM_ID 300000. So assuming that I have a flowfile that contains some number of fields (e.g. fld1, fld2, fld3) and one or more of the IDs TFM_ID, STDDS_DST_ID, FVIEW_ID, how can I configure PutMongo so that it will update/upsert the appropriate document (the one that matches one or more of these IDs)? Again, in my mind, PutMongo simply needs to be configured consistent with the update you see in the image above. I just don't have much experience with PutMongo. Looking at the documentation, I believe I must do the following:
- Set Mode to update
- Set Upsert to true
- Leave Update Query Key empty
- Set the Update Query to something such as the first argument in the sample command: { $or: [{"TFM_ID": "300000"}, {"FVIEW_ID": "3000"}, {"STDDS_DST_ID": "100"}]}
- Set the flowfile content to the data to place in the document (including the $set): {$set: {"fld2": "fld2_val", "fld3": "fld3_val", "TFM_ID": "300000", "FVIEW_ID": "3000", "STDDS_DST_ID": "100"}}
Are my assumptions on the mark?
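For reference, a rough mongo-shell sketch of the two updates I described (collection name and field values are illustrative):

// First update: match on the single known ID and upsert the linked set.
db.linked_ids.updateOne(
  { "STDDS_DST_ID": "100" },
  { $set: { "TFM_ID": "300000", "FVIEW_ID": "3000", "STDDS_DST_ID": "100", "fld1": "fld1_val" } },
  { upsert: true }
)

// Second update: match on any of the linked IDs and merge in new fields.
db.linked_ids.updateOne(
  { $or: [ { "TFM_ID": "300000" }, { "FVIEW_ID": "3000" }, { "STDDS_DST_ID": "100" } ] },
  { $set: { "fld2": "fld2_val", "fld3": "fld3_val" } },
  { upsert: true }
)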
Labels: Apache NiFi
07-16-2019
04:23 PM
Thanks @Nico Verwer. I'll give that a shot, and will be sure to accept your response as an answer, once I verify.
07-04-2019
10:29 AM
Hi @Shu. Could you please explain what sysdate, current_date, etc. would do for me with the Spark job? I don't fully understand how to use them or the benefits this technique would offer.
07-03-2019
01:00 PM
@Shu I like your idea of creating daily archives (Option 3 above). How do I ensure that the Spark jobs I create to process those daily files run on the DataNodes where they are stored? Does YARN do this by default? I've not yet used YARN, only HDFS. I am hoping to eventually use k8s (Kubernetes).
07-03-2019
12:56 PM
Hi @Shu. Thank you very much for your thoughts. This is the kind of feedback that I was hoping for, and I'll absolutely do my best to understand your recommendation. It sounds like I am not completely off-base in the way that I hope to use HDFS. It also sounds like you are confirming that I must figure out how to accumulate large files before driving them into HDFS. I will look at the tools and methods that you suggest. Thanks for your insights.
07-02-2019
06:29 PM
I've used the PutHDFS processor as I've started to understand how to deal with big-data environments. Up until now I've been putting very small files into HDFS, which seems to be architecturally bad practice. The HDFS block size defaults to about 128 MB, and the Hadoop community recommendation seems to be that applications writing to HDFS should write files that are gigabytes, or even terabytes, in size. I'm trying to understand how to do this with NiFi. Part of my concern is for the data analysts: what is the best way to logically structure files that are appropriate for HDFS? Currently the files that I am writing contain small JSON objects or lists. I use MergeRecord to intentionally make the files I write larger; however, my JSON objects accumulate quickly, potentially thousands of JSON records per second. For the big data/NiFi experts, I'd appreciate any thoughts on the best way to use NiFi to support streaming large data objects into HDFS.
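For context, the rough shape of the MergeRecord settings I've been experimenting with ahead of PutHDFS is below; the thresholds are illustrative guesses, not a recommendation, and the property names should be double-checked against the processor docs:

MergeRecord (feeding PutHDFS):
  Merge Strategy            = Bin-Packing Algorithm
  Minimum Number of Records = 100000     # keep bins from flushing while still tiny
  Maximum Number of Records = 1000000
  Minimum Bin Size          = 256 MB     # aim for at least a couple of HDFS blocks per file
  Maximum Bin Size          = 1 GB
  Max Bin Age               = 10 min     # eventually flush partially filled bins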
Labels: Apache NiFi
06-04-2019
03:45 PM
Perhaps I'm just looking for something too easy. Tutorials such as this one suggest that the process is not simple, or at least not without a lot of steps: https://www.youtube.com/watch?v=zdJHrbTC0eE
06-04-2019
03:41 PM
I'm trying to do a PutHDFS to a directory on a computer running Hadoop in single-node fashion. I've verified that Hadoop is properly set up, and I have also verified the connection from the NiFi PutHDFS processor to Hadoop. However, I see a permission-denied error. I'd like to solve this the "right way" using Kerberos. I'm looking for the cleanest tutorial focused on this; I've found a bunch of fragments on the web, but nothing that is clean and complete.
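For reference, my (unverified) understanding is that once the Hadoop side is kerberized, the PutHDFS side amounts to a few properties along these lines; the principal, keytab, and file paths below are placeholders:

PutHDFS:
  Hadoop Configuration Resources = /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
  Kerberos Principal             = nifi@EXAMPLE.COM
  Kerberos Keytab                = /etc/security/keytabs/nifi.keytab
  Directory                      = /data/landing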
Labels: Apache Hadoop, Apache NiFi
06-03-2019
01:41 PM
I've finally worked through this challenge. It was a bit of a struggle for me, since there is inadequate documentation on some of these lower-level developer details. However, I did take the time to capture what I learned along the way; for now I have placed those notes here. If I capture more detail on this process, I will add it to the same NiFi issue.
05-31-2019
03:48 PM
Hi @Matt Burgess. I've been able to attach a debugger, yet I can't seem to set breakpoints that get hit. I've attempted to set breakpoints in RunNiFi (in the nifi-bootstrap project), at the "status" and "stop" locations, and at many other locations. I've also tried suspending on start to allow for an attach. I've commented on this in detail under a NiFi issue I created, since I think it deserves better attention from the developers (documentation support for new developers). Anyway, if you can give me some guidance on actually getting a breakpoint placed in a location where it gets hit, I'd appreciate it.
05-30-2019
12:43 PM
@Matt Burgess I've got a couple of questions for you. Which NiFi Maven project contains the core application that listens on port 8000? In other words, what component implements support for the "Enable Remote Debugging" property that you describe above? Do you have a preferred IDE for NiFi development? I'd think the IDE with the best Maven integration would be the one, though perhaps they are all comparable. I do have the option to use something other than Eclipse; Eclipse (on CentOS 7) is what I'm trying for now.
05-30-2019
12:25 PM
Thank you @Matt Burgess. I'll hopefully try this today. We've built our first custom processor, and it seems like the best way to work with it is to run everything inside a single debug environment. As usual, I appreciate your prompt and thorough response.
05-29-2019
12:38 PM
I've successfully cloned and built the NiFi system. It built cleanly, though initially I experienced a few random unit test failures; that was solved (it was actually caused by a network configuration issue on my end). The instructions I followed are here: https://nifi.apache.org/quickstart.html. The built system is in nifi-assembly/target. I want to run the primary NiFi backend application in the debugger (not the web app, but rather the backend). I have the NiFi set of Maven projects loaded into Eclipse. Can someone provide information that would assist in getting the app up and running in the debugger? For example: Which project contains the main entry point? Aside from running the main application, how do I launch the web app? I'm assuming it is launched separately from the main application.
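For what it's worth, the route I'm leaning toward is remote debugging: if I recall correctly, conf/bootstrap.conf ships with a commented-out debug argument along these lines (quoting from memory, so treat the exact line as an assumption), which can be uncommented before starting NiFi and then attached to from Eclipse on port 8000:

# In conf/bootstrap.conf, under "Enable Remote Debugging":
java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000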
Labels: Apache NiFi
04-25-2019
03:01 PM
Please see the template I added in my comment above; it has the source generator that you need (Test_Convert_Record_2.xml).
04-25-2019
02:19 PM
Hi @Sumit Das. Thanks for your response. I came to the same conclusion. I have indeed used XSLT before, along with TransformXML. I was hoping that the new XMLReader capability would give me an alternative option, but given the current state of the inference capability, I think that you are right. Thanks again for taking the time to respond. I did file an issue on the NiFi issue board: https://issues.apache.org/jira/browse/NIFI-6241. Hopefully this will get some attention.
04-24-2019
06:02 PM
I've added a simple template that someone can use to replicate this. Test_Convert_Record_2.xml
04-24-2019
06:01 PM
My simple flow: The input XML: The output JSON: ConvertRecord configuration: XMLReader configuration: Several questions arise:
- Why are position and ncsmTrackData null?
- Why are all the attributes on fltdMessage being ignored?
- How can I see the inferred schema?
- What is a proper schema to correctly transform those two null fields (the rest of the transformation currently looks OK)?
Labels: Apache NiFi
04-11-2019
07:35 PM
Wow, I figured it out. The essence of the solution was to reduce my data to the form seen below, create an attribute on each flowfile corresponding to the facilityIdentifier, and then use MergeRecord with an appropriate binning strategy. The reduced flow-file record looks like this: {
"aircraftId" : "JBU975",
"facilityIdentifier" : "KZDC",
"departure_airport" : "KPHL",
"arrival_airport" : "KFLL",
"gufi" : "KN53659300",
"igtd" : "2019-04-09T18:00:00Z",
"speed" : 428,
"assigned_altitude" : 245,
"latitude" : "MapRecord[{seconds=41, minutes=47, degrees=38, direction=NORTH}]",
"longitude" : "MapRecord[{seconds=35, minutes=26, degrees=075, direction=WEST}]",
"timeAtPosition" : "2019-04-09T18:28:53Z"
} I have a queue of these that I now need to process. My goal is to effectively route on the facilityIdentifier value. Each flow-file has an attribute holding the facilityIdentifier; in the following you see that FACILITY_ID is KZDC. Now the MergeRecord configuration (notice the Correlation Attribute Name): The end result is that over 90 flow-files were merged into just 9, and MergeRecord preserved the FACILITY_ID attribute, so now I can use it to push data to the appropriate Kafka topic.
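In case it helps anyone else, the key pieces boil down to roughly this; I'm showing EvaluateJsonPath as one way to create the attribute, and the property names are from memory, so double-check against the processor docs:

EvaluateJsonPath (Destination = flowfile-attribute):
  FACILITY_ID = $.facilityIdentifier

MergeRecord:
  Merge Strategy             = Bin-Packing Algorithm
  Correlation Attribute Name = FACILITY_ID   # flow-files with the same facility land in the same bin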
04-11-2019
03:01 PM
Two other things I'm not sure how to do:
- Discard the flowfile if "trackInformation" is null
- Eliminate a leading 0 from an integer
04-11-2019
02:45 PM
The following is what I have so far. The piece that I am struggling with is the "MapRecord" within latitudeDMS and longitudeDMS (a guess at handling those follows the spec below): [
{
"operation": "shift",
"spec": {
"trackInformation": {
"qualifiedAircraftId": {
"aircraftId": "aircraftId",
"computerId" : {
"facilityIdentifier": "facilityIdentifier"
},
"departurePoint" : {
"airport": "departure_airport"
},
"arrivalPoint" : {
"airport": "arrival_airport"
}
},
"speed": "speed"
}
}
}
]
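If the DMS values actually came through as nested records rather than the flattened "MapRecord[...]" strings, my untested guess is that entries like the following, placed alongside "speed" inside the "trackInformation" block of the shift spec, would flatten them the way I want:

"position": {
  "latitude": {
    "latitudeDMS": { "*": "latitude.&" }
  },
  "longitude": {
    "longitudeDMS": { "*": "longitude.&" }
  }
},
"timeAtPosition": "timeAtPosition"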
04-11-2019
02:28 PM
My flow-file object looks like the following: {
"trackInformation" : {
"qualifiedAircraftId" : {
"aircraftId" : "UAL894",
"computerId" : {
"facilityIdentifier" : "KZDC",
"idNumber" : "35Y"
},
"gufi" : "KW59049300",
"igtd" : "2019-04-09T18:15:00Z",
"departurePoint" : {
"airport" : "KIAD"
},
"arrivalPoint" : {
"airport" : "KLAX"
}
},
"speed" : 372,
"reportedAltitude" : {
"assignedAltitude" : {
"simpleAltitude" : 254
}
},
"position" : {
"latitude" : {
"latitudeDMS" : "MapRecord[{seconds=13, minutes=46, degrees=38, direction=NORTH}]"
},
"longitude" : {
"longitudeDMS" : "MapRecord[{seconds=44, minutes=52, degrees=078, direction=WEST}]"
}
},
"timeAtPosition" : "2019-04-09T18:28:53Z",
"ncsmTrackData" : {
"eta" : "MapRecord[{etaType=ESTIMATED, timeValue=2019-04-09T23:14:03Z}]",
"rvsmData" : "MapRecord[{futureCompliance=true, equipped=true, currentCompliance=true}]",
"arrivalFixAndTime" : "MapRecord[{fixName=HLYWD, arrTime=2019-04-09T22:53:03Z}]",
"departureFixAndTime" : "MapRecord[{fixName=OTTTO, arrTime=2019-04-09T18:23:32Z}]",
"nextEvent" : "MapRecord[{longitudeDecimal=-78.88557158170266, latitudeDecimal=38.76905063195677}]"
}
},
"flightPlanAmendmentInformation" : null,
"boundaryCrossingUpdate" : null,
"ncsmFlightModify" : null
} I need to remove the fields flightPlanAmendmentInformation, boundaryCrossingUpdate, and ncsmFlightModify. What is the spec that will allow me to do that? (My first, untested guess is sketched after the desired output below.) I will then want to add a shift spec so that fields like "speed" and "position" are at the top of the output. The ultimate desired output is as follows: {
"aircraftId": "UAL894",
"facilityIdentifier": "KZDC",
"gufi": "KW59049300",
"igtd": "2019-04-09T18:15:00Z",
"departure_airport": "KIAD",
"arrival_airport": "KLAX",
"speed": 372,
"assignedAltitude": 254,
"latitude": {
"seconds": 13,
"minutes": 46,
"degrees": 38,
"direction": "NORTH"
},
"longitude": {
"seconds": 44,
"minutes": 52,
"degrees": 78,
"direction": "WEST"
},
"timeAtPosition": "2019-04-09T18:28:53Z"
}
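For the removal part, my untested guess at a spec (based on the Jolt docs for the remove operation) is:

[
  {
    "operation": "remove",
    "spec": {
      "flightPlanAmendmentInformation": "",
      "boundaryCrossingUpdate": "",
      "ncsmFlightModify": ""
    }
  }
]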
Labels: Apache NiFi
04-10-2019
05:39 PM
I think this may be a bug in the NiFi JoltTransformRecord processor. What is the best way to report it?
04-10-2019
02:26 PM
24:40,136 ERROR [Timer-Driven Process Thread-5] o.a.n.p.jolt.record.JoltTransformRecord JoltTransformRecord[id=06fd25cf-016a-1000-780f-11032c386125] Unable to write transformed records StandardFlowFileRecord[uuid=6626d078-5d3d-4156-988f-af57bd8efd7e,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1554901130806-97, container=default, section=97], offset=171543, length=86903],offset=0,name=6626d078-5d3d-4156-988f-af57bd8efd7e,size=86903] due to org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value [MapRecord[{timeAtPosition=2019-04-09T18:24:27Z, reportedAltitude=MapRecord[{assignedAltitude=MapRecord[{simpleAltitude=350}]}], qualifiedAircraftId=MapRecord[{computerId=MapRecord[{facilityIdentifier=CCZM}], gufi=KN44418300, aircraftId=ICL922, departurePoint=MapRecord[{airport=KJFK}], arrivalPoint=MapRecord[{airport=EBLG}], igtd=2019-04-09T17:00:00Z}], ncsmTrackData=MapRecord[{eta=MapRecord[{etaType=ESTIMATED, timeValue=2019-04-09T23:18:14Z}], departureFixAndTime=MapRecord[{fixName=MERIT, arrTime=2019-04-09T17:03:10Z}], nextEvent=MapRecord[{longitudeDecimal=-55.45632194020566, latitudeDecimal=46.91005273931668}], rvsmData=MapRecord[{futureCompliance=true, equipped=true, currentCompliance=true}]}], position=MapRecord[{latitude=MapRecord[{latitudeDMS=MapRecord[{minutes=38, degrees=46, direction=NORTH}]}], longitude=MapRecord[{longitudeDMS=MapRecord[{minutes=32, degrees=056, direction=WEST}]}]}], speed=550}]] of type CHOICE[RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD] to Map for field trackInformation because the type is not supported: org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value [MapRecord[{timeAtPosition=2019-04-09T18:24:27Z, reportedAltitude=MapRecord[{assignedAltitude=MapRecord[{simpleAltitude=350}]}], qualifiedAircraftId=MapRecord[{computerId=MapRecord[{facilityIdentifier=CCZM}], gufi=KN44418300, aircraftId=ICL922, departurePoint=MapRecord[{airport=KJFK}], arrivalPoint=MapRecord[{airport=EBLG}], igtd=2019-04-09T17:00:00Z}], ncsmTrackData=MapRecord[{eta=MapRecord[{etaType=ESTIMATED, timeValue=2019-04-09T23:18:14Z}], departureFixAndTime=MapRecord[{fixName=MERIT, arrTime=2019-04-09T17:03:10Z}], nextEvent=MapRecord[{longitudeDecimal=-55.45632194020566, latitudeDecimal=46.91005273931668}], rvsmData=MapRecord[{futureCompliance=true, equipped=true, currentCompliance=true}]}], position=MapRecord[{latitude=MapRecord[{latitudeDMS=MapRecord[{minutes=38, degrees=46, direction=NORTH}]}], longitude=MapRecord[{longitudeDMS=MapRecord[{minutes=32, degrees=056, direction=WEST}]}]}], speed=550}]] of type CHOICE[RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD] to Map for field trackInformation because the type is not supported
org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value [MapRecord[{timeAtPosition=2019-04-09T18:24:27Z, reportedAltitude=MapRecord[{assignedAltitude=MapRecord[{simpleAltitude=350}]}], qualifiedAircraftId=MapRecord[{computerId=MapRecord[{facilityIdentifier=CCZM}], gufi=KN44418300, aircraftId=ICL922, departurePoint=MapRecord[{airport=KJFK}], arrivalPoint=MapRecord[{airport=EBLG}], igtd=2019-04-09T17:00:00Z}], ncsmTrackData=MapRecord[{eta=MapRecord[{etaType=ESTIMATED, timeValue=2019-04-09T23:18:14Z}], departureFixAndTime=MapRecord[{fixName=MERIT, arrTime=2019-04-09T17:03:10Z}], nextEvent=MapRecord[{longitudeDecimal=-55.45632194020566, latitudeDecimal=46.91005273931668}], rvsmData=MapRecord[{futureCompliance=true, equipped=true, currentCompliance=true}]}], position=MapRecord[{latitude=MapRecord[{latitudeDMS=MapRecord[{minutes=38, degrees=46, direction=NORTH}]}], longitude=MapRecord[{longitudeDMS=MapRecord[{minutes=32, degrees=056, direction=WEST}]}]}], speed=550}]] of type CHOICE[RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD, RECORD] to Map for field trackInformation because the type is not supported
at org.apache.nifi.serialization.record.util.DataTypeUtils.convertRecordFieldtoObject(DataTypeUtils.java:693)
at org.apache.nifi.serialization.record.util.DataTypeUtils.convertRecordArrayToJavaArray(DataTypeUtils.java:732)
at org.apache.nifi.serialization.record.util.DataTypeUtils.convertRecordFieldtoObject(DataTypeUtils.java:691)
at org.apache.nifi.serialization.record.util.DataTypeUtils.convertRecordFieldtoObject(DataTypeUtils.java:686)
at org.apache.nifi.processors.jolt.record.JoltTransformRecord.transform(JoltTransformRecord.java:380)
at org.apache.nifi.processors.jolt.record.JoltTransformRecord.onTrigger(JoltTransformRecord.java:333)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1162)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:209)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
04-10-2019
02:17 PM
I have seen a similar problem reported here. Worth noting is that I think the problem is data-specific: there are XML records that seem to work fine in the test flow I've provided. However, this does not make sense to me, since, as I said, JoltTransformJSON works fine for the generated JSON (even for the problem data).
04-10-2019
01:44 PM
The specific error I see is as follows: Here is the associated template for the flow above. You can use it to verify the problem I am having: Test_Jolt_Transform_Record.xml
04-10-2019
01:05 PM
Hi @Matt Burgess. I've tried JoltTransformRecord, and it's not behaving as I'd expect. In the following you'll see that I generate a single XML record and convert it using JoltTransformRecord (which fails). I also convert it using the same XMLReader and JSONSetWriter via ConvertRecord, then pipe that converted JSON to a separate JoltTransformJSON that uses the same Jolt transform as the original JoltTransformRecord. The JoltTransformJSON succeeds. The configuration of the JoltTransformRecord is as follows: The overall flow is as follows: What am I missing with the use of JoltTransformRecord?