Enrich flowfile with in memory look-up dataset

avatar
Expert Contributor

I have a flow file that I want to enrich with values from a list of JSON records, matched on the "id". In effect, I want an inner join between the flow file and this look-up dataset. The look-up dataset needs to be held in memory: using the "id", I want to query it and append the matching fields to the current flow file. Here is an example:

 

Incoming flow file:

 

{
   "id": "abc123",
   "fname": "The",
   "lname": "Rock"
}

 

 

Contents of the look-up data set:

 

[
   {
     "id": "abc123",
     "dob": "03/09/1977",
     "phone": "987-654-0001"
   },
   {
     "id": "def765",
     "dob": "04/08/1976",
     "phone": "789-654-0001"
   },
   {
     "id": "hij765",
     "dob": "05/06/1975",
     "phone": "456-654-0001"
   }
]

 

 

Enriched flow file:

{
   "id": "abc123",
   "fname": "The",
   "lname": "Rock",
   "dob": "03/09/1977",
   "phone": "987-654-0001"
}

 

I need to be able to look up the correct record in the look-up dataset based on the "id" then append the values to the current flow file.

 

The key here is that the look-up dataset needs to reside in memory (it can't be a file or a database).

 

Thanks for reading and look forward to hearing back with ideas.

1 ACCEPTED SOLUTION

avatar

If you are using NiFi 1.16 or higher, I would also refer you to the ForkEnrichment and JoinEnrichment processors, which can help with what you are trying to do. I think you can use those processors regardless of whether you read the lookup dataset directly from HTTP or after loading it into the DistributedMapCache:

https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.12.1/org.apach...

Hope that helps.


5 REPLIES 5

avatar

Hi,

Before trying to answer your question, I'd like to understand how you plan to populate the lookup dataset in "memory". Are you thinking of entering it manually, if you have a limited number of lookups, as you would with "SimpleKeyValueLookupService"? Or do you want to read it once from a file or database and load it into a lookup service such as "DistributedMapCacheLookupService"?

 

avatar
Expert Contributor

Great questions @SAMSAL 

I would like to do something like storing the JSON in a parameter context variable and then using a lookup service to retrieve the corresponding record.

I think of it as an in-memory table that I can use to perform inner joins with flow files.
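To make the "in-memory table" idea concrete, here is a minimal sketch in plain Python (not NiFi code). The names LOOKUP_ROWS, LOOKUP_BY_ID and inner_join are made up for illustration. It indexes the look-up records by "id" once and then joins incoming records against that index, dropping any record without a match, which is what an inner join implies.

```python
import json

# Look-up records from the question, held in memory.
LOOKUP_ROWS = [
    {"id": "abc123", "dob": "03/09/1977", "phone": "987-654-0001"},
    {"id": "def765", "dob": "04/08/1976", "phone": "789-654-0001"},
    {"id": "hij765", "dob": "05/06/1975", "phone": "456-654-0001"},
]

# Index the table by "id" once so each lookup is a single dictionary access.
LOOKUP_BY_ID = {row["id"]: row for row in LOOKUP_ROWS}


def inner_join(records, lookup_by_id):
    """Merge each record with its look-up row; records without a match are dropped."""
    joined = []
    for record in records:
        match = lookup_by_id.get(record.get("id"))
        if match is not None:
            joined.append({**record, **match})
    return joined


# The second record has no look-up row, so it does not appear in the result.
incoming = [
    {"id": "abc123", "fname": "The", "lname": "Rock"},
    {"id": "zzz999", "fname": "No", "lname": "Match"},
]
result = inner_join(incoming, LOOKUP_BY_ID)
print(json.dumps(result, indent=2))
```

If unmatched records should pass through unchanged instead (a left join), the `if match is not None` branch would need an `else` that appends the original record.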

 

Alternatively, I could read the table from a remote source (using InvokeHTTP) and then load it into a DistributedMapCacheLookupService, but I'm not familiar with this approach so I'd have to do some research. 
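As a rough illustration of the "read once, then load into a cache" step, the sketch below turns a JSON array into key/value pairs, one serialized record per "id". That is the general shape a key/value cache holds. It is plain Python, not NiFi configuration: to_cache_entries is a hypothetical helper, and the payload string stands in for an HTTP response body (no network call is made).

```python
import json


def to_cache_entries(payload, key_field="id"):
    """Turn a JSON array (as text) into {key: JSON string} pairs.

    Hypothetical helper: each record is re-serialized on its own so it can be
    stored as the value for its key.
    """
    records = json.loads(payload)
    entries = {}
    for record in records:
        key = record[key_field]
        entries[key] = json.dumps(record, sort_keys=True)
    return entries


# Stand-in for the body returned by the remote source.
payload = """
[
  {"id": "abc123", "dob": "03/09/1977", "phone": "987-654-0001"},
  {"id": "def765", "dob": "04/08/1976", "phone": "789-654-0001"},
  {"id": "hij765", "dob": "05/06/1975", "phone": "456-654-0001"}
]
"""

entries = to_cache_entries(payload)
print(sorted(entries))
print(entries["abc123"])
```

A lookup by "id" then returns the serialized record, which can be parsed and merged into the flow file content.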

 

I appreciate your time.  Thank you.


avatar
Expert Contributor

Thank you @SAMSAL, I will investigate these processors; I had no idea they existed. Thank you!

avatar
New Contributor

Below is a Python script that demonstrates this process using your example:

import json

# Define the lookup dataset in memory as a list of dictionaries
lookup_data = [
    {
        "id": "abc123",
        "dob": "03/09/1977",
        "phone": "987-654-0001"
    },
    {
        "id": "def765",
        "dob": "04/08/1976",
        "phone": "789-654-0001"
    },
    {
        "id": "hij765",
        "dob": "05/06/1975",
        "phone": "456-654-0001"
    }
]

# Function to perform the enrichment
def enrich_flow_file(flow_file):
    """Return the enriched flow file as a JSON string, or None if no "id" matches."""
    # Invalid JSON raises json.JSONDecodeError instead of being returned as text,
    # so an error message is never mistaken for flow file content
    flow_data = json.loads(flow_file)

    # Perform a lookup based on the "id" field
    for item in lookup_data:
        if item["id"] == flow_data.get("id"):
            # Merge the matched fields into the flow file data
            flow_data.update(item)
            return json.dumps(flow_data)

    # No match: an inner join produces no output for this flow file
    return None

# Example usage:
incoming_flow_file = '''
{
    "id": "abc123",
    "fname": "The",
    "lname": "Rock"
}
'''

enriched_flow_file = enrich_flow_file(incoming_flow_file)
print(enriched_flow_file)

 

This script defines the lookup dataset in memory as a list of dictionaries and defines a function enrich_flow_file that takes an incoming flow file as input, performs the lookup based on the "id" field, and appends the matched data to the flow file. You can adapt this script to your specific use case and integrate it into your data processing pipeline.
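One possible way to integrate this into a pipeline is to have the script read the flow file content from standard input and write the enriched content to standard output, which is how a processor such as ExecuteStreamCommand passes content to an external command. The sketch below assumes that setup. The names LOOKUP_BY_ID and enrich_stream are hypothetical, and the index holds only one record to keep the example short.

```python
import io
import json

# Hypothetical in-memory index; in practice it would hold the full look-up dataset.
LOOKUP_BY_ID = {
    "abc123": {"dob": "03/09/1977", "phone": "987-654-0001"},
}


def enrich_stream(in_stream, out_stream, lookup_by_id):
    """Read one JSON object, merge the matching look-up fields, write the result.

    Returns True when a match was found and False otherwise, in which case the
    input record is written back unchanged.
    """
    record = json.loads(in_stream.read())
    match = lookup_by_id.get(record.get("id"))
    if match is not None:
        record.update(match)
    json.dump(record, out_stream)
    return match is not None


# Demonstration with in-memory streams; a real script would pass
# sys.stdin and sys.stdout instead.
source = io.StringIO('{"id": "abc123", "fname": "The", "lname": "Rock"}')
sink = io.StringIO()
matched = enrich_stream(source, sink, LOOKUP_BY_ID)
print(sink.getvalue())
```

Returning the match flag lets the caller decide what to do with unmatched records, for example exiting with a non-zero status so they can be routed separately.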