Member since
07-29-2020
574
Posts
323
Kudos Received
176
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 2115 | 12-20-2024 05:49 AM |
| | 2413 | 12-19-2024 08:33 PM |
| | 2161 | 12-19-2024 06:48 AM |
| | 1447 | 12-17-2024 12:56 PM |
| | 2062 | 12-16-2024 04:38 AM |
02-13-2024
09:05 AM
1 Kudo
Hi @kekotron, I can't think of a simple out-of-the-box solution for this. The easiest way is to use an ExecuteScript processor that parses the JSON as a map, loops through each key, and checks whether the value of that key is also a map (which means nested JSON). If it is, the script converts that map to a JSON string and assigns it back to the same key. The script below is written in Groovy, but you can do the same in other languages as well.

    import org.apache.commons.io.IOUtils
    import java.nio.charset.StandardCharsets
    import groovy.json.JsonOutput
    import groovy.json.JsonSlurper

    flowFile = session.get()
    if(!flowFile) return
    // Cast a closure with an inputStream and outputStream parameter to StreamCallback
    flowFile = session.write(flowFile, {inputStream, outputStream ->
        jsonText = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        jsonMap = new JsonSlurper().parseText(jsonText)
        // Replace every nested object with its JSON string representation
        jsonMap.each{k,v->
            if(jsonMap[k] instanceof Map)
                jsonMap[k] = JsonOutput.toJson(jsonMap[k])
        }
        outputStream.write(JsonOutput.toJson(jsonMap).getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)
    session.transfer(flowFile, REL_SUCCESS)

For more info on how to write a script inside ExecuteScript: https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-2/ta-p/249018

If you find this helpful please accept solution. Thanks
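For anyone who prefers Python, the same idea can be sketched outside NiFi as a plain function. This is only an illustration of the transformation itself (top-level keys whose value is an object get replaced by that object's JSON string); the function name is made up for this example and it does not use any NiFi API.

```python
import json


def stringify_nested_objects(json_text):
    """Return JSON text where every top-level value that is an object
    has been replaced by its JSON string representation."""
    data = json.loads(json_text)
    converted = {
        # Nested objects become strings; all other values are kept as-is.
        key: json.dumps(value) if isinstance(value, dict) else value
        for key, value in data.items()
    }
    return json.dumps(converted)


if __name__ == "__main__":
    print(stringify_nested_objects('{"id": 1, "details": {"city": "Oslo"}}'))
```

Like the Groovy script, this only looks at the top level of the document and leaves arrays untouched.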
02-05-2024
01:33 PM
Hi, I'm trying to write a custom processor using the Python extension. I noticed that when I return the failure relationship as follows:

    return FlowFileTransformResult("failure", contents=None, attributes={"Feeling.Good.About.PY.Ext": "No"})

I can't find any attribute "Feeling.Good.About.PY.Ext" with the value "No" on the flowfile routed to failure. I can guarantee that it's not because of the attribute name or the value 🙂. I'm not sure whether this is by design. Any thoughts on this? I have also posted another question, about splitting byte input into multiple flowfiles, which has no answer yet: https://community.cloudera.com/t5/Support-Questions/python-extension-generate-multiple-flowfiles-from-bytes/m-p/383095 @MattWho, @steven-matison, @cotopaul, @joseomjr Thanks
Labels: Apache NiFi
02-05-2024
01:18 PM
Hi, I'm having a hard time writing my custom processor using the Python extension. It seems this feature is not as mature as I thought, or maybe I'm misunderstanding something. To summarize, I have encountered two issues, both related to the same error:

1- I was experimenting with a custom processor that uses the RecordTransform class to partition a JSON array. The processor is simple: it takes a JSON array of records where each record is a JSON object with "name" and "age" fields. The processor then tries to return a RecordTransformResult from the transform method as follows:

    def transform(self, context, record, schema, attributemap):
        return RecordTransformResult(schema=None, record=record, relationship="success", partition={'name': record['name']})

I'm trying to use partition to split the records, passing the partition key just as described in the developer guide: https://nifi.apache.org/documentation/nifi-2.0.0-M1/html/python-developer-guide.html#record-transform

When I place the processor in the extensions folder and run NiFi, I get the following error in nifi-python.log:

    AttributeError: 'dict' object has no attribute '_get_object_id'

After a lot of time researching the issue, I found that py4j expects a Java-type dictionary and that a conversion has to happen, based on this post: https://stackoverflow.com/questions/57502975/py4j-serialization-attributeerror-dict-object-has-no-attribute-get-object

I was not sure how to access the Java gateway object from the processor, but after looking at the code in the framework folder I found how controller.py instantiates it, so I copied the same code and my transform method became:

    # Imports this snippet relies on
    import os
    from py4j.java_gateway import JavaGateway, CallbackServerParameters, GatewayParameters
    from py4j.java_collections import MapConverter

    def transform(self, context, record, schema, attributemap):
        # Connect to the Java side the same way the framework's controller.py does
        java_port = int(os.getenv('JAVA_PORT'))
        auth_token = os.getenv('AUTH_TOKEN')
        gateway = JavaGateway(
            callback_server_parameters=CallbackServerParameters(port=0, auth_token=auth_token),
            gateway_parameters=GatewayParameters(port=java_port, read_timeout=None, enable_memory_management=True, auth_token=auth_token),
            auto_convert=True)
        # Convert the Python dict into a Java map before handing it to the result
        input_dict = {'name': record['name']}
        mc_run_map_dict = MapConverter().convert(input_dict, gateway._gateway_client)
        return RecordTransformResult(schema=None, record=record, relationship="success", partition=mc_run_map_dict)

The code above worked and the record was split accordingly. My question: is this the correct way, or am I missing something? I thought I had followed the steps in the developer guide.

2- The other scenario is when I tried to override getRelationships to define my own custom relationships, again as described in the developer guide:

    def getRelationships(self):
        failedrel = Relationship(name="MyFailedRel", description="custom failed rel")
        successrel = Relationship(name="success", description="custom success rel")
        return [failedrel, successrel]

First I got the error that the list doesn't have the attribute "_get_object_id", so I tried to convert the list using the method above, but now I'm getting the following error:

    AttributeError: 'Relationship' object has no attribute '_get_object_id'

I'm not sure what to do about this. It could be related to the first scenario. Any suggestions would be really appreciated, as I have been struggling with this for a couple of days. @MattWho, @cotopaul, @steven-matison, @joseomjr Thanks
Labels: Apache NiFi
02-03-2024
12:23 PM
2 Kudos
Hi, I'm trying to write my own Python extension that takes the bytes of an Excel file and produces one output flowfile per sheet, with each sheet represented as HTML. I'm using a pandas DataFrame for the conversion. I have been looking at the Python extension developer guide but I can't find anything that points me in the right direction: https://nifi.apache.org/documentation/nifi-2.0.0-M2/html/python-developer-guide.html#record-transform

The RecordTransform section talks about partitioning the flowfile input, but it seems the input has to be in a format NiFi can read as records (JSON, CSV, Avro, etc.). In the ExecuteScript processor you could easily generate multiple flowfiles by passing an array of flowfiles to the session.transfer method: https://community.cloudera.com/t5/Support-Questions/Split-one-Nifi-flow-file-into-Multiple-flow-file-based-on/td-p/203387

However, neither RecordTransformResult nor FlowFileTransformResult can do that. Can someone provide me with the code, if applicable? Thanks
Labels: Apache NiFi
01-30-2024
06:12 AM
1 Kudo
My pleasure. I'm glad you were able to get it working with the help of Jolt. I agree Jolt is a little intimidating initially, but with practice and some trial and error you grow to love it 🙂. A simpler way to represent your spec:

    [
      {
        "operation": "shift",
        "spec": {
          "store": {
            "book": {
              "*": "[]"
            }
          }
        }
      }
    ]

Feel free to post any Jolt questions or challenges in the future.
01-30-2024
02:26 AM
2 Kudos
Hi @noncitizen, I think JsonPathReader is not the right choice for this requirement. According to the documentation, this service always evaluates against the root element:

"...If the root of the FlowFile's JSON is a JSON Array, each JSON Object found in that array will be treated as a separate Record, not as a single record made up of an array. If the root of the FlowFile's JSON is a JSON Object, it will be evaluated as a single Record..."

ref: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-record-serialization-services-nar/1.23.2/org.apache.nifi.json.JsonPathReader/additionalDetails.html

So since your root element "store" is an object, it will always return a single record, and if one of the fields happens to be an array, it will be returned inside that single record as an array representation: ["references", "fiction", "fiction"]

It seems JsonPathReader is better suited to cases where the root element is an array. If you want it to return multiple records, you probably need a Jolt transformation to dump all the book array items into a root array. Keep in mind that if you want just the category or some other specific field, you have to define an Avro schema with the expected fields; otherwise all non-specified fields will be returned with blank values.

What you need instead is the JsonTreeReader service, configured to use Nested Field as the Starting Field Strategy with "book" as the Starting Field Name, as follows:

The QueryRecord can then simply be configured as follows:

which will give you the desired output in CSV format:

    category
    reference
    fiction
    fiction

If that helps please accept solution. Thanks
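To make the idea of a nested starting field more concrete, here is a small plain-Python sketch of the same outcome: find the "book" array inside the document, treat each element as a record, and write only the category column as CSV. The sample document and the function names are assumptions made up for this illustration, and this is not how NiFi implements the reader.

```python
import csv
import io
import json

# Hypothetical sample input shaped like the "store"/"book" document discussed above.
SAMPLE = """
{"store": {"book": [
  {"category": "reference", "title": "A"},
  {"category": "fiction", "title": "B"},
  {"category": "fiction", "title": "C"}
]}}
"""


def find_nested_field(node, field_name):
    """Depth-first search for the first field called field_name; returns its value or None."""
    if isinstance(node, dict):
        if field_name in node:
            return node[field_name]
        children = node.values()
    elif isinstance(node, list):
        children = node
    else:
        return None
    for child in children:
        found = find_nested_field(child, field_name)
        if found is not None:
            return found
    return None


def categories_as_csv(json_text, starting_field="book"):
    """Treat each element of the nested array as a record and emit its category as CSV."""
    records = find_nested_field(json.loads(json_text), starting_field) or []
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(["category"])
    for record in records:
        writer.writerow([record.get("category", "")])
    return out.getvalue()


if __name__ == "__main__":
    print(categories_as_csv(SAMPLE), end="")
```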
01-29-2024
08:11 AM
1 Kudo
@jarviszzzz, @BigJames, I understand your concern, especially when you have many columns to check against and a large number of records, where indexing can make a big difference in response time. It seems that LookupRecord is intended for simpler cases where you check against a single key that is not nullable. For your case I would suggest two options:

1- Use ExecuteSQL/ExecuteSQLRecord instead of LookupRecord. With this option you need to extract all the data used to filter, update, or insert into flowfile attributes, since the content will change after ExecuteSQL. With this processor you can specify the select statement (in the SQL Select Query property) with a where clause that checks against all fields. That property supports Expression Language, which means you can use flowfile attributes to populate values dynamically. Using SQL and EL you can also handle null values accordingly. To check whether there is a match, you can use a simple RouteOnAttribute processor with one condition (dynamic property):

    IsRecordFound: ${fileSize:gt(0)}

"fileSize" is a system attribute that gives you the size of the flowfile content. If it is 0, no content was returned, which means no match; otherwise there is a match.

2- Defer the check for match (update) versus no match (insert) to SQL. I like this approach because you make one trip to SQL instead of two. This approach also requires you to extract all the data into flowfile attributes. Basically, you create a stored proc to which you pass the record data as parameters (or as JSON, if your SQL can parse JSON). You can run this stored proc using the PutSQL processor. The SQL Statement property supports Expression Language, which means you can pass the stored proc parameter values dynamically. Inside the stored proc you can check whether the record exists and take the proper action.

One thing to be careful about with this approach: asynchronous calls to the stored proc with multi-threading or on a cluster might end up in a SQL deadlock exception, which you can retry within NiFi or in the stored proc itself.

I'm looping in other experts like @MattWho, @cotopaul to see if they can provide other input as well. Thanks
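As a rough sketch of what the stored proc in option 2 would do, here is the same "update if it matches, otherwise insert" logic written against an in-memory SQLite database. Everything here is an assumption for illustration: the table and column names are hypothetical, SQLite stands in for your actual database, and `IS` is SQLite's null-safe comparison (other databases spell this differently).

```python
import sqlite3


def create_demo_table(conn):
    """Create the hypothetical target table used by this sketch."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS company_totals "
        "(company_id INTEGER, fiscal_year INTEGER, amount REAL)"
    )


def upsert_company_total(conn, company_id, fiscal_year, amount):
    """Update the matching row if one exists, otherwise insert a new row.

    The match is done on all key columns; 'IS' treats NULL as equal to NULL,
    so nullable key columns still match.
    """
    cursor = conn.execute(
        "UPDATE company_totals SET amount = ? "
        "WHERE company_id IS ? AND fiscal_year IS ?",
        (amount, company_id, fiscal_year),
    )
    if cursor.rowcount > 0:
        return "update"
    conn.execute(
        "INSERT INTO company_totals (company_id, fiscal_year, amount) VALUES (?, ?, ?)",
        (company_id, fiscal_year, amount),
    )
    return "insert"


if __name__ == "__main__":
    connection = sqlite3.connect(":memory:")
    create_demo_table(connection)
    print(upsert_company_total(connection, 1, None, 10.0))
    print(upsert_company_total(connection, 1, None, 25.0))
```

Doing the check and the write in one call is what saves the second round trip; in a real database you would also wrap both statements in a transaction to reduce the chance of the concurrency problems mentioned above.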
01-26-2024
09:34 AM
1 Kudo
Hi @jarviszzzz, In your DB Lookup Service, I noticed you are aliasing ID as fs_ID in the Lookup Value Columns property. I don't think you can alias there; this property is used to list columns that exist in the target table: "A comma-delimited list of columns in the table that will be returned when the lookup key matches. Note that this may be case-sensitive depending on the database." Can you try setting it to just ID and see if it works?
01-26-2024
06:43 AM
1 Kudo
Hi @jarviszzzz, The LookupRecord processor is not supposed to work in a chain where the next lookup filters based on the result of the previous one. Each LookupRecord is an independent call to the database, and it returns a result from the whole dataset based on the specified key. If you want to look up a record by multiple keys, you probably need to create a new column that joins all those key column values together and use that in your LookupRecord. For example, I would create a column called CompanyID_FiscalYear that joins both values and use this column as my lookup key. In the dynamic property key of the LookupRecord you can use the concat function to concatenate values from different paths: https://nifi.apache.org/docs/nifi-docs/html/record-path-guide.html#concat If that helps please accept solution. Thanks
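The composite-key idea can be sketched in plain Python: both the lookup table and the incoming record are reduced to one joined key, so a single lookup replaces the chained ones. The field names, separator, and sample rows below are assumptions for illustration only and are unrelated to the NiFi API.

```python
def composite_key(record, key_fields, separator="_"):
    """Join the values of several key fields into one lookup key."""
    return separator.join(str(record[field]) for field in key_fields)


def build_lookup(rows, key_fields, value_field):
    """Index rows by their composite key, e.g. CompanyID_FiscalYear."""
    return {composite_key(row, key_fields): row[value_field] for row in rows}


if __name__ == "__main__":
    # Hypothetical lookup table rows.
    table = [
        {"CompanyID": 10, "FiscalYear": 2023, "ID": "A"},
        {"CompanyID": 10, "FiscalYear": 2024, "ID": "B"},
    ]
    lookup = build_lookup(table, ["CompanyID", "FiscalYear"], "ID")
    incoming = {"CompanyID": 10, "FiscalYear": 2024}
    print(lookup.get(composite_key(incoming, ["CompanyID", "FiscalYear"])))
```

Pick a separator that cannot appear in the key values themselves, otherwise two different key combinations could produce the same joined string.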
01-26-2024
05:50 AM
Hi @Anderosn, From previous posts, this seems to be common behavior for the CLOB column data type, and it doesn't seem like you can avoid it. To extract the JSON value, please refer to the following post: https://community.cloudera.com/t5/Support-Questions/Avro-to-Json-adding-extra-delemeters/m-p/380646#M244113