Created 01-30-2023 09:59 PM
I have a flow file that I want to enrich with values from a list of JSON records, matched on the "id" field. Basically, I want to perform an inner join of the flow file with this look-up dataset. However, I need the look-up dataset to be in memory, so that I can query it by "id" and append the results to the current flow file. Here is an example:
Incoming flow file:
{
  "id": "abc123",
  "fname": "The",
  "lname": "Rock"
}
Contents of the look-up data set:
[
  {
    "id": "abc123",
    "dob": "03/09/1977",
    "phone": "987-654-0001"
  },
  {
    "id": "def765",
    "dob": "04/08/1976",
    "phone": "789-654-0001"
  },
  {
    "id": "hij765",
    "dob": "05/06/1975",
    "phone": "456-654-0001"
  }
]
Enriched flow file:
{
  "id": "abc123",
  "fname": "The",
  "lname": "Rock",
  "dob": "03/09/1977",
  "phone": "987-654-0001"
}
I need to be able to look up the correct record in the look-up dataset based on the "id" then append the values to the current flow file.
The key here is that I need the look-up dataset to reside in memory (it can't be a file or a database).
Thanks for reading and look forward to hearing back with ideas.
Created 01-31-2023 08:04 AM
Hi,
Before trying to answer your question, I'm trying to understand how you are planning to populate the lookup dataset in "memory". Are you thinking of entering it manually, as you would with "SimpleKeyValueLookupService" when you have a limited number of lookups, or do you just want to read it once from a file/db and populate a lookup service like "DistributedMapCacheLookupService"?
Created 01-31-2023 09:56 AM
Great questions @SAMSAL
I would like to do something like storing the JSON in a parameter context and then using a lookup service to retrieve the corresponding record.
I think of it as an in-memory table that I can use to perform inner joins with flow files.
Alternatively, I could read the table from a remote source (using InvokeHTTP) and then load it into a DistributedMapCacheLookupService, but I'm not familiar with this approach so I'd have to do some research.
I appreciate your time. Thank you.
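To make the key/value idea concrete, here is a minimal Python sketch (plain Python, not NiFi code) of reshaping the look-up array into one entry per "id", which is the general shape a key/value store holds. The helper names `to_key_value_entries` and `enrich` are hypothetical, and keeping each value as a JSON string is an assumption made for illustration.

```python
import json
from typing import Dict, Optional

def to_key_value_entries(lookup_json: str) -> Dict[str, str]:
    """Turn a JSON array of records into {id: JSON string of the other fields}."""
    entries = {}
    for record in json.loads(lookup_json):
        # Everything except the key itself becomes the stored value
        fields = {k: v for k, v in record.items() if k != "id"}
        entries[record["id"]] = json.dumps(fields)
    return entries

def enrich(flow_json: str, entries: Dict[str, str]) -> Optional[str]:
    """Merge the stored fields into the flow record, or return None if no id matches."""
    flow = json.loads(flow_json)
    value = entries.get(flow.get("id"))
    if value is None:
        return None  # inner-join behaviour: unmatched records are dropped
    flow.update(json.loads(value))
    return json.dumps(flow)

# Look-up data from the example in the question
lookup_json = """
[
  {"id": "abc123", "dob": "03/09/1977", "phone": "987-654-0001"},
  {"id": "def765", "dob": "04/08/1976", "phone": "789-654-0001"},
  {"id": "hij765", "dob": "05/06/1975", "phone": "456-654-0001"}
]
"""

entries = to_key_value_entries(lookup_json)
enriched = enrich('{"id": "abc123", "fname": "The", "lname": "Rock"}', entries)
print(enriched)
```

The reshaping happens once, when the look-up data is loaded; each flow file then needs only a single keyed read.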
Created 01-31-2023 10:16 AM
If you are using NiFi 1.16 or higher, I would also refer you to the ForkEnrichment and JoinEnrichment processors, which can help with what you are trying to do. I think you can use those processors regardless of whether you read the lookup dataset directly over HTTP or after loading it into the DistributedMapCache.
Hope that helps.
Created 01-31-2023 11:01 AM
Thank you @SAMSAL I will investigate these processors; I had no idea they existed. Thank you!
Created on 09-01-2023 01:37 AM - last edited on 09-08-2023 11:30 AM by cjervis
Below is a Python script that demonstrates this process using your example:
import json

# Define the lookup dataset in memory as a list of dictionaries
lookup_data = [
    {
        "id": "abc123",
        "dob": "03/09/1977",
        "phone": "987-654-0001"
    },
    {
        "id": "def765",
        "dob": "04/08/1976",
        "phone": "789-654-0001"
    },
    {
        "id": "hij765",
        "dob": "05/06/1975",
        "phone": "456-654-0001"
    }
]

# Function to perform the enrichment
def enrich_flow_file(flow_file):
    try:
        flow_data = json.loads(flow_file)
        # Perform a lookup based on the "id" field
        for item in lookup_data:
            if item["id"] == flow_data["id"]:
                # Merge the data into the flow_file
                flow_data.update(item)
                enriched_flow_file = json.dumps(flow_data)
                return enriched_flow_file
        # No matching "id": return None (an inner join drops unmatched records)
        return None
    except (json.JSONDecodeError, KeyError) as e:
        # Raise instead of returning the error text, so callers cannot
        # mistake an error message for an enriched flow file
        raise ValueError(f"Could not enrich flow file: {e}") from e

# Example usage:
incoming_flow_file = '''
{
    "id": "abc123",
    "fname": "The",
    "lname": "Rock"
}
'''
enriched_flow_file = enrich_flow_file(incoming_flow_file)
print(enriched_flow_file)
This script defines the lookup dataset in memory as a list of dictionaries and defines a function enrich_flow_file that takes an incoming flow file as input, performs the lookup based on the "id" field, and appends the matched data to the flow file. You can adapt this script to your specific use case and integrate it into your data processing pipeline.
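The script scans the whole list for every flow file, which is fine for three entries. For a larger look-up table, one possible variation is to index the records by "id" once and reuse that index for every incoming record. The sketch below assumes the ids are unique; `lookup_by_id` and `inner_join` are hypothetical names chosen for illustration.

```python
import json

lookup_data = [
    {"id": "abc123", "dob": "03/09/1977", "phone": "987-654-0001"},
    {"id": "def765", "dob": "04/08/1976", "phone": "789-654-0001"},
    {"id": "hij765", "dob": "05/06/1975", "phone": "456-654-0001"},
]

# Build the index once: id -> look-up record (assumes ids are unique)
lookup_by_id = {item["id"]: item for item in lookup_data}

def inner_join(flow_files):
    """Yield enriched JSON strings; records without a matching id are skipped."""
    for raw in flow_files:
        record = json.loads(raw)
        match = lookup_by_id.get(record.get("id"))
        if match is not None:
            # Look-up fields are added to (and would override) the record's own fields
            yield json.dumps({**record, **match})

incoming = [
    '{"id": "abc123", "fname": "The", "lname": "Rock"}',
    '{"id": "zzz999", "fname": "No", "lname": "Match"}',
]

results = list(inner_join(incoming))
for result in results:
    print(result)
```

Only the first record is emitted here, because the second "id" has no entry in the look-up table.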
Created 09-08-2024 08:38 PM
Can I keep the JSON file as a constant in NiFi itself, so that I can avoid the InvokeHTTP call? Every time I receive flow data, I just need to take the id from it, find the corresponding entry in the constant JSON file, and append that entry's values to the flow data.