How to ingest data from invokeHttp in chunks from a fire hose

I am trying to use InvokeHTTP to ingest data from an OAuth2-authenticated streaming endpoint, authorized with the "client_credentials" grant.

The flow works fine for grabbing a new token. I suspect this is because that response terminates.

But when I call the endpoint that streams (its response never terminates and has no Content-Length), InvokeHTTP sits there with an open connection and produces no output.

I can test the endpoint and get the streaming data with no problem using curl (or httpie with the --stream flag).

Any suggestions?

1 ACCEPTED SOLUTION

@Alvaro Muir Unfortunately, InvokeHTTP isn't currently built to emit the individual chunks of a chunked transfer-encoded response as separate FlowFiles.

In lieu of that, I think you have a few options:

  1. You can use ExecuteProcess with Batch Duration set, have it run curl or httpie, and just capture the output of the shell command.
  2. You could create a scripted or custom processor. The trick is that a separate thread has to read the chunks off the long-running request and feed them to a process-local queue. That way, the processor's only job is to check that queue and transfer whatever chunks it finds there at that moment. To be specific, your @OnScheduled method could connect to the HTTP endpoint and start a thread that reads individual chunks and pushes them onto a LinkedBlockingQueue. Your onTrigger() method could then call poll() or take() to see whether any chunks are available, create a new FlowFile from each one, and call session.transfer() on it. The GetTwitter processor is the prototypical example of this pattern: the Hosebird client is set up in @OnScheduled to feed in-memory queues, and onTrigger() then polls them for the Tweets.
  3. You could just use curl or httpie to create files and have NiFi pick those up with GetFile. This is pretty silly, but `http --stream <URL> | split -l 1` will actually create an individual file for each line of the response, which works when the endpoint sends one record per line.
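
The reader-thread-plus-queue pattern from option 2 can be sketched in plain Java, without the NiFi API, roughly as follows. This is only a minimal sketch under some assumptions: the class name `ChunkQueueReader` and its methods are hypothetical, the endpoint is assumed to send one record per line, and blank lines are assumed to be keep-alives that can be dropped. In a real processor, `start()` would be called from the @OnScheduled method with the open response body, `drain()` from onTrigger(), and `stop()` from an @OnStopped method.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical helper: a background thread feeds a bounded queue that the processor drains. */
final class ChunkQueueReader {
    private final BlockingQueue<String> queue;
    private volatile Thread reader;
    private volatile InputStream body;

    ChunkQueueReader(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    /** Starts the reader thread on an already-open, never-ending response body. */
    void start(InputStream responseBody) {
        this.body = responseBody;
        Thread t = new Thread(() -> {
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(responseBody, StandardCharsets.UTF_8))) {
                String line;
                while (!Thread.currentThread().isInterrupted()
                        && (line = in.readLine()) != null) {
                    if (!line.isEmpty()) {   // assumption: blank lines are keep-alives
                        queue.put(line);     // blocks while the queue is full
                    }
                }
            } catch (IOException e) {
                // Stream closed or failed; a real processor would log and reconnect.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "chunk-reader");
        t.setDaemon(true);
        t.start();
        this.reader = t;
    }

    /** Non-blocking: returns at most max chunks that are queued right now. */
    List<String> drain(int max) {
        List<String> chunks = new ArrayList<>();
        queue.drainTo(chunks, max);
        return chunks;
    }

    /** Interrupts the reader and closes the stream so a blocked read returns. */
    void stop() {
        Thread t = reader;
        if (t != null) {
            t.interrupt();
        }
        InputStream s = body;
        if (s != null) {
            try {
                s.close();
            } catch (IOException ignored) {
                // Nothing useful to do while shutting down.
            }
        }
    }
}
```

Inside onTrigger(), each string returned by `drain()` would be written into its own new FlowFile and passed to session.transfer(). The queue is bounded so that a slow flow pushes back on the reader thread instead of filling the heap.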

2 REPLIES

Thanks @jfrazee! I came to this conclusion hours after I posted this question. It seems like a custom processor is the way to go. I started developing it today.