Member since 02-10-2023
Posts: 5 | Kudos Received: 1 | Solutions: 0
04-29-2024
11:43 AM
1 Kudo
To retrieve records that were added to an Impala table since your last poll (for example, rows written after CRM enrichment), filter on a Date/Timestamp column with a WHERE clause that selects only timestamps greater than the last one you retrieved. Here's a basic example, assuming your timestamp column is named timestamp_column:

SELECT * FROM your_table WHERE timestamp_column > 'last_poll_timestamp';

Replace 'last_poll_timestamp' with the actual timestamp value you stored from your last poll. Make sure its format matches the format stored in your table.

Here's a step-by-step guide:

1. Store the timestamp of the last poll in your source system.
2. Use this timestamp to construct your SQL query, so that it retrieves only records with timestamps greater than the last poll timestamp.
3. Execute the SQL query against your Impala table to retrieve the new records.
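The steps above can be sketched in Python as follows. This is only an illustration under stated assumptions: it uses the standard-library sqlite3 module as a stand-in for an Impala connection so that it runs anywhere, the helper name poll_new_records and the id column are hypothetical, and the `?` placeholder style is specific to sqlite3 (other DB-API drivers may expect a different parameter style).

```python
import sqlite3


def poll_new_records(conn, last_poll_timestamp):
    """Return rows newer than last_poll_timestamp plus the new high-water mark.

    poll_new_records is a hypothetical helper; conn is any DB-API style
    connection (sqlite3 here, standing in for an Impala connection).
    """
    cur = conn.cursor()
    # Parameter binding avoids pasting the timestamp into the SQL string.
    cur.execute(
        "SELECT id, timestamp_column FROM your_table "
        "WHERE timestamp_column > ? ORDER BY timestamp_column",
        (last_poll_timestamp,),
    )
    rows = cur.fetchall()
    # Advance the mark to the newest timestamp actually seen;
    # keep the old mark if nothing new arrived.
    new_mark = rows[-1][1] if rows else last_poll_timestamp
    return rows, new_mark


# Demo data in an in-memory database (assumed table layout).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE your_table (id TEXT, timestamp_column TEXT)")
conn.executemany(
    "INSERT INTO your_table VALUES (?, ?)",
    [
        ("a", "2024-04-29 10:00:00"),
        ("b", "2024-04-29 11:00:00"),
        ("c", "2024-04-29 12:00:00"),
    ],
)

rows, mark = poll_new_records(conn, "2024-04-29 10:30:00")
print(rows)  # rows "b" and "c"
print(mark)  # store this value for the next poll
```

Storing the largest timestamp actually returned, rather than the wall-clock time of the poll, is one way to avoid skipping rows whose timestamps fall between the query and the moment the poll time is recorded.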
09-01-2023
01:37 AM
Below is a Python script that demonstrates this process using your example:

import json

# Define the lookup dataset in memory as a list of dictionaries
lookup_data = [
    {"id": "abc123", "dob": "03/09/1977", "phone": "987-654-0001"},
    {"id": "def765", "dob": "04/08/1976", "phone": "789-654-0001"},
    {"id": "hij765", "dob": "05/06/1975", "phone": "456-654-0001"},
]

# Function to perform the enrichment
def enrich_flow_file(flow_file):
    try:
        flow_data = json.loads(flow_file)

        # Perform a lookup based on the "id" field
        for item in lookup_data:
            if item["id"] == flow_data["id"]:
                # Merge the data into the flow_file
                flow_data.update(item)
                break

        # Return the flow file, enriched if a match was found
        return json.dumps(flow_data)

    except Exception as e:
        # Handle any exceptions that may occur during processing
        return str(e)

# Example usage:
incoming_flow_file = '''
{
    "id": "abc123",
    "fname": "The",
    "lname": "Rock"
}
'''

enriched_flow_file = enrich_flow_file(incoming_flow_file)
print(enriched_flow_file)
This script defines the lookup dataset in memory as a list of dictionaries and a function, enrich_flow_file, that takes an incoming flow file as a JSON string, looks up its "id" field in the dataset, and merges the fields of the matching record into the flow file. You can adapt this script to your specific use case and integrate it into your data processing pipeline.
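As one possible adaptation, here is a minimal sketch that indexes the lookup dataset by "id" once, so each flow file needs a single dictionary lookup instead of a scan of the list. It assumes the ids are unique and that the flow file is a JSON object; the name lookup_by_id is introduced here for illustration only.

```python
import json

# Same lookup records as in the script above.
lookup_data = [
    {"id": "abc123", "dob": "03/09/1977", "phone": "987-654-0001"},
    {"id": "def765", "dob": "04/08/1976", "phone": "789-654-0001"},
    {"id": "hij765", "dob": "05/06/1975", "phone": "456-654-0001"},
]

# Build the index once (assumes each id appears only once).
lookup_by_id = {item["id"]: item for item in lookup_data}


def enrich_flow_file(flow_file):
    """Merge the matching lookup record into the flow file's JSON object."""
    flow_data = json.loads(flow_file)
    match = lookup_by_id.get(flow_data.get("id"))
    if match is not None:
        flow_data.update(match)
    # Unmatched flow files are returned unchanged.
    return json.dumps(flow_data)


enriched = enrich_flow_file('{"id": "abc123", "fname": "The", "lname": "Rock"}')
unmatched = enrich_flow_file('{"id": "zzz999", "fname": "No", "lname": "Match"}')
print(enriched)
print(unmatched)
```

With only three records the difference is negligible; the index matters mainly when the lookup dataset is large or the function runs once per flow file at high volume.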