How can I extract provenance data using the REST API with a Python script?

Hi all,

I need to unit test each NiFi processor by sending HL7 messages from a Python script. The requirements are:
1. After the HL7 test message is sent, the script should receive the ACK/NACK. (Done)
2. After the validation check, the flow file moves on to the next processors. Here I need the status of the flow file at each of those processors, i.e. whether it succeeded or failed.
To get this, I am trying to fetch the provenance data through the NiFi REST API. I am using NiFi version 2.0.0, and my Python script sends this payload:

payload = {
    "provenance": {
        "request": {
            "componentId": "f6ca227e-0196-1000-13c2-d66cf56e69f2",
            "sort": "eventTime",
            "maxResults": 1
        }
    }
}

with the URL https://localhost:port/nifi-api/provenance
I am getting this output:
:clipboard::clipboard: Showing 1 recent provenance events:
:three_o_clock: 05/14/2025 15:48:24.041 IST
:wrench: Component: ExecuteGroovyScript (ExecuteGroovyScript)
:page_facing_up: Type: CLONE
🧾 UUID: 1f7c9140-f09d-4da3-bd6c-075ac8e1444b
:package: Size: 126 bytes bytes
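
For comparison, here is a minimal sketch of a query body that filters by component. It assumes the provenance request takes its filters under `searchTerms`, with `ProcessorID` as the term for a component, and that top-level keys such as `componentId` and `sort` are not applied by the server. That is an assumption to check against the REST API documentation for your NiFi version. `build_provenance_query` is a hypothetical helper name.

```python
import json


def build_provenance_query(component_id, max_results=100):
    """Build a provenance query body filtered to one component.

    Assumption: filters go under "searchTerms" and the component filter
    is named "ProcessorID"; verify this for your NiFi version.
    """
    return {
        "provenance": {
            "request": {
                "maxResults": max_results,
                "summarize": True,
                "incrementalResults": False,
                "searchTerms": {
                    "ProcessorID": {"value": component_id, "inverse": False}
                },
            }
        }
    }


if __name__ == "__main__":
    # Same component ID as in the payload above, but asking for 10 events.
    body = build_provenance_query(
        "f6ca227e-0196-1000-13c2-d66cf56e69f2", max_results=10
    )
    print(json.dumps(body, indent=2))
```

Requesting more than one event matters here: with `maxResults` set to 1 there is nothing left to sort on the client side.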


The issue is that it returns the event at the end of the provenance data, not the latest one. Below is the method from my Python script:

import time

import requests

# token and NIFI_BASE_URL are defined elsewhere in the script.


def fetch_provenance_events(limit=10):
    hdr = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    print(f"token : {hdr}")
    try:
        payload = {
            "provenance": {
                "request": {
                    "componentId": "f6ca227e-0196-1000-13c2-d66cf56e69f2",
                    "sort": "eventTime",
                    "maxResults": 1
                }
            }
        }

        url = f"{NIFI_BASE_URL}/nifi-api/provenance"
        response = requests.post(url, headers=hdr, json=payload, verify=False)
        # Debug output: HTTP status and raw body of the submission response
        print(":magnifying_glass_tilted_left: Raw response:", response.status_code)
        print(":magnifying_glass_tilted_left: Text response:", response.text)
        response.raise_for_status()
        submission = response.json()["provenance"]
        print(f"submission: {submission}")
        query_id = submission["id"]
        print(f" Qry ID : {query_id}")

        # Poll until complete
        while not submission.get("finished", False):
            time.sleep(1)
            resp = requests.get(f"{NIFI_BASE_URL}/provenance-events/latest/{query_id}", headers=hdr, verify=False)
            print(f"prv_event_resp : {resp.text}")
            submission = resp.json()["provenance"]

        # Now retrieve the results
        events = submission["results"]["provenanceEvents"]
        return sorted(events, key=lambda e: e["eventTime"], reverse=True)
        #events = response.json().get("provenanceEvents", [])
        #return events
    except requests.exceptions.HTTPError as e:
        print(f":cross_mark: HTTP Error: {e}")
    except Exception as e:
        print(f":cross_mark: Other Error: {e}")
    return []
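
The submit, poll, and fetch steps in the method above can also be sketched as one helper. This is a sketch under assumptions, not a confirmed fix: it assumes a submitted query is polled at `/nifi-api/provenance/{id}` and should be deleted afterwards, which should be verified against the REST API documentation for NiFi 2.0.0. `run_provenance_query` and its parameters are hypothetical names, and `session` stands for any object with `requests.Session`-style `post`, `get`, and `delete` methods.

```python
import time


def run_provenance_query(session, base_url, body, poll_seconds=1.0, max_polls=30):
    """Submit a provenance query, poll until it finishes, then delete it.

    Assumption: the query is polled and deleted at
    {base_url}/nifi-api/provenance/{id}; verify for your NiFi version.
    """
    url = f"{base_url}/nifi-api/provenance"
    resp = session.post(url, json=body)
    resp.raise_for_status()
    query = resp.json()["provenance"]
    query_url = f"{url}/{query['id']}"
    try:
        polls = 0
        while not query.get("finished", False):
            if polls >= max_polls:
                raise TimeoutError("provenance query did not finish in time")
            time.sleep(poll_seconds)
            resp = session.get(query_url)
            resp.raise_for_status()
            query = resp.json()["provenance"]
            polls += 1
        return query["results"]["provenanceEvents"]
    finally:
        # Remove the query whether or not polling succeeded.
        session.delete(query_url)
```

With the `requests` library, the session could be a `requests.Session()` whose `headers` carry the same `Authorization: Bearer` token used in the method above. Bounding the loop with `max_polls` avoids waiting forever if the query never reports `finished`.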
 
Here is my View Data Provenance (UI); the latest records are shown at the top.
AllIsWell_0-1747981128675.png

But the script fetches seemingly random events, as shown in the attached UI screenshot of the provenance data:

AllIsWell_1-1747981363857.png
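
One possible contributor to the ordering difference is that the script sorts `eventTime` as a string. In the `MM/dd/yyyy HH:mm:ss.SSS zone` format shown in the output above, string order does not match chronological order across a year boundary. A minimal sketch of a chronological sort follows. It assumes every event carries `eventTime` in that format and that a numeric `eventId` is available as a tie-breaker. `event_sort_key` and `newest_first` are hypothetical helper names.

```python
from datetime import datetime


def event_sort_key(event):
    """Return a chronological sort key for one provenance event."""
    # Drop the trailing zone abbreviation (e.g. "IST"); all events from
    # one NiFi instance are assumed to share the same zone.
    date_part, time_part = event["eventTime"].split()[:2]
    stamp = datetime.strptime(f"{date_part} {time_part}", "%m/%d/%Y %H:%M:%S.%f")
    # Assumption: eventId is numeric and usable as a tie-breaker.
    return (stamp, int(event.get("eventId", 0)))


def newest_first(events):
    """Sort events so the most recent one comes first."""
    return sorted(events, key=event_sort_key, reverse=True)


if __name__ == "__main__":
    sample = [
        {"eventId": 1, "eventTime": "12/31/2024 23:59:59.000 IST"},
        {"eventId": 2, "eventTime": "01/01/2025 00:00:01.000 IST"},
    ]
    print([e["eventId"] for e in newest_first(sample)])
```

Sorting on the client only helps when the query returns more than one event, so this would be paired with a `maxResults` larger than 1.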

 

I tried every combination, including reverse=False, but with no luck. If anyone has already done this successfully, please let me know what I got wrong or missed.
Thanks in advance.
VAX

1 ACCEPTED SOLUTION

Hi All,

I found the answer in this thread, which resolved my question: https://community.cloudera.com/t5/Support-Questions/Nifi-how-to-get-provenance-event-id-in-nifi/m-p/...  Thank you, @alopresto.