Member since: 07-29-2020
Posts: 574
Kudos Received: 323
Solutions: 176

My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 3580 | 12-20-2024 05:49 AM |
| | 3826 | 12-19-2024 08:33 PM |
| | 3626 | 12-19-2024 06:48 AM |
| | 2356 | 12-17-2024 12:56 PM |
| | 3115 | 12-16-2024 04:38 AM |
02-21-2024
06:51 AM
Hi @vijay_loyalty ,

I have never tried what you are doing, so I might be wrong here, but my understanding is that when you override the onTrigger method in your custom InvokeHTTP processor and use the statement:

FlowFile requestFlowFile = session.get();

it's as if you are emptying the queue of the flowfile: the flowfile no longer exists in the session, so when you pass the call to the parent class and the parent class tries to do the same thing using the same statement, there won't be a flowfile, hence the error you are getting.

I actually tried the following script in an ExecuteGroovyScript processor, where I called session.get() twice, and it failed:

```groovy
flowFile = session.get()
if (!flowFile) return
flowFile = session.get()
// Processing occurs here
session.transfer(flowFile, REL_SUCCESS)
```

The error message was different, but I think both errors are related to the same issue. I'm not sure what you can do about it, but I see two options:

1. Don't inherit. Instead, copy the whole code from the original InvokeHTTP and create your custom processor using the original code plus whatever you need to add.
2. I did not test this, so I'm not sure it will work, but before calling super.onTrigger(...), try writing the flowfile back to the session after adding the attributes, using session.write(...).

If you find this helpful, please accept the solution.
02-18-2024
09:36 PM
1 Kudo
Hi,

The processor sure changed in 2.0 from before, so a few things I would check:

- Make sure Request Body Enabled is set to true.
- Make sure the Request Content-Type is set to: multipart/form-data
- Make sure to set the Request Multipart Form-Data Name to: file
- Once you set the property above, the property Request Multipart Form-Data Filename Enabled becomes visible; make sure it is set to true, and it will use the filename attribute in the Content-Disposition header.
- Make sure to set the Response Generation Required property to: true (default is false).
- Finally, I don't think you need the Content-Disposition dynamic property, since it is already set through the properties above.

I honestly don't like how in 2.0 there is so much visibility dependency among the properties, where one property's visibility depends on the value of another. It makes things confusing, especially when you read the documentation and struggle to find all the mentioned properties until you set the right values. I would rather have everything showing, or at least less visibility dependency for some of them.

Let me know if that works for you or not. If it does, please accept the solution. Thanks
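To see roughly what those properties produce on the wire, here is a stdlib-only sketch of a multipart/form-data body with the form-data name set to "file" and the filename taken from an attribute. The boundary value, filename, and payload are made up for illustration; the real processor generates its own boundary:

```python
boundary = "----NiFiBoundary1234"  # arbitrary; InvokeHTTP generates its own

def build_multipart(field_name: str, filename: str, payload: bytes) -> bytes:
    # One form part carrying the flowfile content, closed by the final boundary.
    head = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="{field_name}"; filename="{filename}"\r\n'
        f"Content-Type: application/octet-stream\r\n\r\n"
    ).encode()
    return head + payload + f"\r\n--{boundary}--\r\n".encode()

body = build_multipart("file", "report.pdf", b"%PDF-1.4 ...")
```

The filename attribute ends up in the part's Content-Disposition header, which is why the dynamic Content-Disposition property is redundant.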
02-18-2024
06:53 AM
1 Kudo
Hi @enam ,

First thing I would check is whether you are using the Response relationship. I can't figure that out from the first screenshot; all I can see is: Failure, No Retry, Original... Are you capturing the Response relationship?

Also, have you looked at this old post of yours about the multipart form HTTP invocation:
https://community.cloudera.com/t5/Support-Questions/Apache-NiFi-processor-InvokeHTTP-POST-with-MultipartFile/m-p/381112/highlight/true

Hope that helps.
02-14-2024
05:24 AM
3 Kudos
Hi @iriszhuhao ,

The Wait processor is supposed to be used with the Notify processor to work correctly, and it is better suited to cases where data is split downstream: each split flowfile carries a common identifier and a split count that the Wait-Notify processors can go by, as explained in this article:
https://pierrevillard.com/2018/06/27/nifi-workflow-monitoring-wait-notify-pattern-with-split-and-merge/

When you get the data from SQL, do you get it all in one flowfile, or is it already split with each record in its own flowfile? If you are not getting a lot of data, I would recommend getting it all in one flowfile and then splitting it, so that you can use this pattern and run the final stored procedure once.

In this case the Wait-Notify Signal Identifier will be the value of the fragment.identifier attribute, and the Wait Target Signal Count will be the value of the fragment.count attribute, both of which are written by the Split processor.

If you find this helpful please accept the solution. Thanks
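The Wait-Notify bookkeeping can be illustrated with a small counter keyed by fragment.identifier: each flowfile reaching Notify increments the count, and Wait releases once fragment.count signals have arrived. This is a deliberate simplification of what the processors store in the distributed map cache; the identifier and count values are made up:

```python
class SignalCache:
    """Toy model of the map cache behind the Wait/Notify pattern."""
    def __init__(self):
        self.counts = {}

    def notify(self, identifier):
        # Each split flowfile reaching Notify bumps the counter for its batch.
        self.counts[identifier] = self.counts.get(identifier, 0) + 1

    def wait_released(self, identifier, target_count):
        # Wait releases the held flowfile once the target count is reached.
        return self.counts.get(identifier, 0) >= target_count

cache = SignalCache()
fragment_identifier, fragment_count = "batch-42", 3
for _ in range(fragment_count):
    cache.notify(fragment_identifier)

released = cache.wait_released(fragment_identifier, fragment_count)
```

Splitting everything from one flowfile guarantees all splits share the same fragment.identifier, which is what makes this counting reliable.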
02-13-2024
09:05 AM
1 Kudo
Hi @kekotron ,

There is no simple out-of-the-box solution for this that I can think of. The easiest way is to use an ExecuteScript processor that parses the JSON as a map, then loops through each key and checks whether the value of that key is itself a map (which means a nested JSON object); if so, it converts that map to a JSON string and reassigns it back to the same key. The ExecuteScript below is written in Groovy, but you can do the same in other languages as well.

```groovy
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

flowFile = session.get()
if (!flowFile) return

// Cast a closure with an inputStream and outputStream parameter to StreamCallback
flowFile = session.write(flowFile, { inputStream, outputStream ->
    jsonText = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    jsonMap = new JsonSlurper().parseText(jsonText)
    jsonMap.each { k, v ->
        // Nested JSON object: serialize it back to a string value
        if (jsonMap[k] instanceof Map)
            jsonMap[k] = JsonOutput.toJson(jsonMap[k])
    }
    outputStream.write(JsonOutput.toJson(jsonMap).getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)
```

For more info on how to write scripts inside ExecuteScript:
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-2/ta-p/249018

If you find this helpful please accept the solution. Thanks
02-05-2024
01:33 PM
Hi,

Using the Python extension, I'm trying to write my custom processor. I noticed that when I return the failure relationship as follows:

```python
return FlowFileTransformResult("failure", contents=None, attributes={"Feeling.Good.About.PY.Ext": "No"})
```

I can't find any attribute "Feeling.Good.About.PY.Ext" with value "No" on the flowfile routed to the failure relationship. I can guarantee that it's not because of the attribute name or the value 🙂 . Not sure if this is by design or not. Any thoughts on this?

I have also posted another question, related to splitting byte input into multiple flowfiles, here, but no answer yet:
https://community.cloudera.com/t5/Support-Questions/python-extension-generate-multiple-flowfiles-from-bytes/m-p/383095

@MattWho, @steven-matison , @cotopaul , @joseomjr

Thanks
Labels: Apache NiFi
02-05-2024
01:18 PM
Hi,

I'm having a hard time writing my custom processor using the Python extension. It seems this feature is not as mature as I thought, or maybe I'm misunderstanding something. To summarize, I have encountered two issues, both related to the same error:

1. I was experimenting with developing a custom processor using the RecordTransform class to partition a JSON array. My processor is simple: it takes a JSON array of records where each record is a JSON object with "name" and "age" fields. The processor then tries to return a RecordTransformResult from the transform method as follows:

```python
def transform(self, context, record, schema, attributemap):
    return RecordTransformResult(schema=None, record=record, relationship="success", partition={'name': record['name']})
```

I'm trying to use partition to split the records, passing the partition key just as described in the developer guide:
https://nifi.apache.org/documentation/nifi-2.0.0-M1/html/python-developer-guide.html#record-transform

When I place the processor in the extensions folder and run NiFi, I get the following error in nifi-python.log:

AttributeError: 'dict' object has no attribute '_get_object_id'

After a lot of time researching the issue, I found that py4j expects a Java-typed dictionary and that a conversion should happen, based on this post:
https://stackoverflow.com/questions/57502975/py4j-serialization-attributeerror-dict-object-has-no-attribute-get-object

I was not sure how to access the Java gateway object from the processor, but after looking at the code in the framework folder, I found in controller.py how to instantiate it, so I copied the same code and my transform method became:

```python
def transform(self, context, record, schema, attributemap):
    java_port = int(os.getenv('JAVA_PORT'))
    auth_token = os.getenv('AUTH_TOKEN')
    gateway = JavaGateway(
        callback_server_parameters=CallbackServerParameters(port=0, auth_token=auth_token),
        gateway_parameters=GatewayParameters(port=java_port, read_timeout=None, enable_memory_management=True, auth_token=auth_token),
        auto_convert=True)
    input_dict = {'name': record['name']}
    mc_run_map_dict = MapConverter().convert(input_dict, gateway._gateway_client)
    return RecordTransformResult(schema=None, record=record, relationship="success", partition=mc_run_map_dict)
```

The code above worked and the records were partitioned accordingly. My question: is this the correct way, or am I missing something? I thought I had followed the steps in the developer guide.

2. The other scenario is when I tried to override getRelationships to define my own custom relationships, again as described in the developer guide:

```python
def getRelationships(self):
    failedrel = Relationship(name="MyFailedRel", description="custom failed rel")
    successrel = Relationship(name="success", description="custom success rel")
    return [failedrel, successrel]
```

First I got an error that the list doesn't have the attribute "_get_object_id", so I tried to convert the list using the method above, but now I'm getting the following error:

AttributeError: 'Relationship' object has no attribute '_get_object_id'

I'm not sure what to do about this. It could be related to the first scenario. Any suggestion would be really appreciated, as I have been struggling with this for a couple of days.

@MattWho , @cotopaul , @steven-matison , @joseomjr

Thanks
Labels: Apache NiFi
02-03-2024
12:23 PM
2 Kudos
Hi,

I'm trying to write my own Python extension that takes Excel bytes and produces an output flowfile for each sheet, represented in HTML. I'm using a pandas DataFrame for the conversion. I have been looking through the Python extension developer guide, but I can't find anything that points me in the right direction:
https://nifi.apache.org/documentation/nifi-2.0.0-M2/html/python-developer-guide.html#record-transform

The RecordTransform section talks about partitioning flowfile input, but it seems the input has to be in a NiFi-readable format (JSON, CSV, Avro, etc.). In the ExecuteScript processor you could easily generate multiple flowfiles by passing an array of flowfiles to the session.transfer method:
https://community.cloudera.com/t5/Support-Questions/Split-one-Nifi-flow-file-into-Multiple-flow-file-based-on/td-p/203387

However, neither RecordTransformResult nor FlowFileTransformResult can do that. Can someone provide me with the code, if applicable?

Thanks
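For reference, the per-sheet conversion itself is straightforward in pandas; what is missing is a NiFi 2.0 API to emit one flowfile per sheet. A minimal sketch of the pandas side only, where a dict of DataFrames with made-up sheet names and data stands in for the result of pd.read_excel(..., sheet_name=None):

```python
import pandas as pd

# pd.read_excel(excel_bytes, sheet_name=None) returns {sheet_name: DataFrame};
# this dict stands in for that result so the sketch is self-contained.
sheets = {
    "People": pd.DataFrame({"name": ["sam", "kim"], "age": [34, 29]}),
    "Cities": pd.DataFrame({"city": ["Austin"], "pop": [960000]}),
}

# One HTML document per sheet; each of these would ideally become its own flowfile.
outputs = {name: df.to_html(index=False) for name, df in sheets.items()}
```

Each value in outputs is a complete HTML table string, so the open question is purely how to hand N results back to the framework from a single transform call.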
Labels: Apache NiFi
01-30-2024
06:12 AM
1 Kudo
My pleasure. I'm glad you were able to get it with the help of Jolt. I agree Jolt is a little intimidating initially, but with practice, trial, and error you grow to love it 🙂 . A simpler way to represent your spec:

```json
[
  {
    "operation": "shift",
    "spec": {
      "store": {
        "book": {
          "*": "[]"
        }
      }
    }
  }
]
```

Feel free to post any Jolt questions or challenges in the future.
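For anyone who wants to sanity-check what the shift does, the same reshaping in plain Python is just hoisting every entry of store.book into a top-level array (sample book data below is made up for illustration):

```python
input_doc = {
    "store": {
        "book": [
            {"category": "reference", "title": "Sayings of the Century"},
            {"category": "fiction", "title": "Sword of Honour"},
        ],
        "bicycle": {"color": "red"},  # not matched by the spec, so it is dropped
    }
}

# Equivalent of the shift spec "store" -> "book" -> "*": "[]":
# each matched book lands in the next slot of the output array.
output_doc = [book for book in input_doc["store"]["book"]]
```

The "*" wildcard matches each array index under book, and "[]" appends the matched value to the output array in order.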
01-30-2024
02:26 AM
2 Kudos
Hi @noncitizen ,

I think JsonPathReader is not the right choice for this requirement. According to the documentation, this service will always evaluate against the root element:

"...If the root of the FlowFile's JSON is a JSON Array, each JSON Object found in that array will be treated as a separate Record, not as a single record made up of an array. If the root of the FlowFile's JSON is a JSON Object, it will be evaluated as a single Record..."

ref: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-record-serialization-services-nar/1.23.2/org.apache.nifi.json.JsonPathReader/additionalDetails.html

So since your root element "store" is an object, it will always return a single record, and if one of the fields happens to be an array, it will be returned within that single record as an array representation:

["references", "fiction", "fiction"]

It seems JsonPathReader is better suited to cases where your root element is an array. If you want to make it return multiple records, you would probably need a Jolt transformation to dump all the book array items into a root array. Keep in mind that if you want just the category, or any other specific field, you have to define an Avro schema with the expected fields; otherwise all unspecified fields will be returned with blank values.

What you need instead is the JsonTreeReader service, configured with Nested Field as the Starting Field Strategy and "book" as the Starting Field Name. The QueryRecord can then simply select the category field, which will give you the desired output in CSV format:

category
reference
fiction
fiction

If that helps please accept the solution. Thanks
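What the JsonTreeReader does with a nested starting field can be sketched in plain Python: start at store.book, treat each array element as a record, and project the category field into CSV. The input document below is a made-up sample shaped like the classic store/book JSON:

```python
import json
import csv
import io

doc = json.loads("""
{"store": {"book": [
    {"category": "reference", "title": "Sayings of the Century"},
    {"category": "fiction", "title": "Sword of Honour"},
    {"category": "fiction", "title": "Moby Dick"}
]}}
""")

# Starting Field Strategy = Nested Field, Starting Field Name = "book":
# each element of the nested array becomes its own record.
records = doc["store"]["book"]

# The QueryRecord projection (SELECT category FROM FLOWFILE) written out as CSV.
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow(["category"])
for rec in records:
    writer.writerow([rec["category"]])
```

Starting at the nested array is what turns the single-object document into three records instead of one.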