NiFi ExecuteScript alternative to InvokeHTTP processor: Login only once

Contributor

We have an SFDC REST endpoint (let's call this UpdateSFDC) to which we stream and post database changes, in JSON format, so that they are reflected in the SalesForce object store. Before we can call that endpoint, we first need to log in to SalesForce at this endpoint:

https://test.salesforce.com/services/oauth2/token

So we use InvokeHTTP (let's call this Login2SFDC) and append the parameters to the Remote URL field in the processor:

https://test.salesforce.com/services/oauth2/token?grant_type=password&client_id=<client_id>&client_s... + key>

This responds with a token, which we need to extract and add as an attribute to every JSON flowfile (let's call this CreateSignedRequest). Here's a NiFi schematic:

16066-sfdc-flow.png

Login2SFDC is configured this way:

16067-login2sfdc-1.png

16068-login2sfdc-2.png

However, this is quite inefficient: we log in to SalesForce again for every JSON flowfile, which adds overhead, not to mention the risk that the Login2SFDC REST endpoint will eventually reject requests because some maximum request limit has been reached. Instead, we want to log in once and, as long as the token is still valid (not yet expired), just add the token as an attribute to the JSON flowfile and go straight to the UpdateSFDC REST endpoint (via CreateSignedRequest). So I want to convert the Login2SFDC processor from InvokeHTTP to ExecuteScript.

I have 3 questions for this problem:

1. How do I do this in ECMAScript and/or Python? When I use their respective native HTTP client functionality, it does not seem to work. Should I be importing and using Java libraries (like HttpClient) instead, in which case the scripting language is simply a wrapper around Java libraries?

2. I can successfully get a response from the Login2SFDC REST endpoint from within NiFi. I tried writing the equivalent Java code (using the exact same endpoint and credentials) with the HttpClient library:

CloseableHttpClient client = HttpClients.createDefault(); 
String urlStr = "https://test.salesforce.com/services/oauth2/token?grant_type=password&client_id=<client_id>&client_secret=<client_secret>&username=<username>&password=<password + key>"; 
HttpPost httpPost = new HttpPost(urlStr); 
String jsonPayload = "{\"qty\":100,\"name\":\"iPad 4\"}"; 
StringEntity entity = new StringEntity(jsonPayload);
httpPost.setEntity(entity);
httpPost.setHeader("Accept", "application/json"); 
httpPost.setHeader("Content-type", "application/json"); 
CloseableHttpResponse response = client.execute(httpPost); 
System.out.println(response.getStatusLine().getStatusCode()); 
InputStream is = response.getEntity().getContent(); 
String responseStr = IOUtils.toString(is, "UTF-8"); 
System.out.println(responseStr);

client.close();

but I get this response:

Status Code 400 {"error":"invalid_grant","error_description":"authentication failure"}

Am I missing something in the Java code?

3. Is there a better NiFi flow design for this kind of problem?
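
On question 2, one thing that may be worth checking (an assumption, not something confirmed in this thread): the OAuth 2.0 password grant described in RFC 6749 sends its parameters form-encoded in the request body, while the code above sends a JSON body with a JSON content type and leaves the query-string values unencoded, so special characters in the password + key could be misread. The sketch below only shows how such a form-encoded body could be built with the JDK. `FormBody` and `encode` are hypothetical names, not part of HttpClient or NiFi.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;

/** Hypothetical helper (not from the original post): builds a form-encoded request body. */
final class FormBody {
    private FormBody() {}

    /** Joins key=value pairs with '&', URL-encoding every key and value as UTF-8. */
    static String encode(Map<String, String> params) {
        StringBuilder body = new StringBuilder();
        for (Map.Entry<String, String> entry : params.entrySet()) {
            if (body.length() > 0) {
                body.append('&');
            }
            body.append(urlEncode(entry.getKey()))
                .append('=')
                .append(urlEncode(entry.getValue()));
        }
        return body.toString();
    }

    private static String urlEncode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException ex) {
            // UTF-8 is a charset every JVM must provide, so this should not happen.
            throw new IllegalStateException("UTF-8 not available", ex);
        }
    }
}
```

With the HttpClient code above, the result could be passed to `new StringEntity(...)` in place of the JSON payload, with the `Content-type` header set to `application/x-www-form-urlencoded` and the URL reduced to the bare token endpoint. Whether this resolves the `invalid_grant` response is untested here.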

Hi @Matt Burgess, can you help?

Re: NiFi ExecuteScript alternative to InvokeHTTP processor: Login only once

Master Guru
@J. D. Bacolod

Have you considered using the PutDistributedMapCache and GetDistributedMapCache processors?

Have two separate dataflows. The first runs on a cron schedule and is responsible for obtaining the token and writing it to the DistributedMapCache using the PutDistributedMapCache processor.

The second flow does all your other operations using that token. Just before the InvokeHTTP processor, add a GetDistributedMapCache processor that reads the token from the distributed map cache into a FlowFile attribute. You then use that attribute to pass the token in your connections.

One thing to keep in mind is that a new token may be retrieved after a FlowFile has already read the old token from the DistributedMapCache. This would result in an auth failure. So you will want your flow to loop back to the GetDistributedMapCache processor to get the latest token when your InvokeHTTP processor reports an auth failure.
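
The loop-back on auth failure can be pictured outside NiFi as a small retry, shown here as a minimal plain-Java sketch. It assumes the endpoint signals a stale token with HTTP 401, which this thread does not state. `RetryOnAuthFailure`, `tokenLookup`, and `request` are hypothetical stand-ins for the cache read and the InvokeHTTP call, not NiFi APIs.

```java
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical sketch of "read the token again and retry once on auth failure". */
final class RetryOnAuthFailure {
    // Assumption: a stale token is reported as HTTP 401.
    static final int UNAUTHORIZED = 401;

    private RetryOnAuthFailure() {}

    /**
     * tokenLookup stands in for the cache read, request for the HTTP call.
     * The request function takes a token and returns the HTTP status code.
     */
    static int send(Supplier<String> tokenLookup, Function<String, Integer> request) {
        int status = request.apply(tokenLookup.get());
        if (status == UNAUTHORIZED) {
            // The token may have been replaced after the first read:
            // read it again and retry exactly once.
            status = request.apply(tokenLookup.get());
        }
        return status;
    }
}
```

Retrying only once keeps a genuinely bad credential from looping forever. In a real flow the failure relationship would need a similar bound.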

This flow does not keep track of when a token expires, but if you know how long a token is good for, you can set your cron schedule accordingly.
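
If the token lifetime is known, the "log in once, reuse until expiry" idea can also be sketched in plain Java. This is only an illustration under that assumption. `ExpiringToken` is a hypothetical class, the `login` supplier stands in for the actual login request, and the lifetime value is made up rather than taken from SalesForce.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical sketch: caches a token and logs in again only after it expires. */
final class ExpiringToken {
    private final Supplier<String> login;   // stand-in for the real login call
    private final long lifetimeMillis;      // assumed, known token lifetime
    private final LongSupplier clock;       // injectable so the logic can be tested
    private String token;
    private long expiresAtMillis;

    ExpiringToken(Supplier<String> login, long lifetimeMillis, LongSupplier clock) {
        this.login = login;
        this.lifetimeMillis = lifetimeMillis;
        this.clock = clock;
    }

    /** Returns the cached token, logging in first if there is none or it has expired. */
    synchronized String get() {
        long now = clock.getAsLong();
        if (token == null || now >= expiresAtMillis) {
            token = login.get();
            expiresAtMillis = now + lifetimeMillis;
        }
        return token;
    }
}
```

Outside a test, `System::currentTimeMillis` could serve as the clock. Choosing a lifetime slightly shorter than the real one would leave a margin for requests already in flight.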

Thanks,

Matt
