Created on 11-02-2017 04:57 PM - edited 08-17-2019 10:24 AM
You will be performing the following steps for each API interaction after authenticating into Salesforce and getting your SessionID. The following is for an enterprise account. If you have a partner account, these instructions are still mostly valid but you may need to tweak your endpoints slightly when communicating with SFDC.
You will first need to authenticate into Salesforce using your username and password. Create a file named login.txt that has the following information in it:
<?xml version="1.0" encoding="utf-8"?> <env:Envelope xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:env="http://schemas.xmlsoap.org/soap/envelope/"> <env:Body> <n1:login xmlns:n1="urn:partner.soap.sforce.com"> <n1:username>YOUR_USERNAME@DOMAIN.COM.TEST</n1:username> <n1:password>YOURPASSWORD_YOURTOKEN</n1:password> </n1:login> </env:Body> </env:Envelope>
If you have an HDP cluster, place this file in HDFS. Otherwise you will need to place it in a location where all of your NiFi nodes can access it. During my testing, my cluster only contained one NiFi node so I put this file on that node's local file system.
You will need to start your flow with the FetchFile or FetchHDFS processor, specifying the location of the file you created above. Note that if you are using HDFS, you will need to reference your core-site.xml and hdfs-site.xml files. Make sure they are somewhere all your nodes can reach.
Then you will need to use the InvokeHTTP processor with the following URL, passing in the file that was just fetched: https://test.salesforce.com/services/Soap/u/41.0. This is the URL for the auth server, but after authentication you will use the URL for your own instance. Configure it as a POST request. This will authenticate with the SFDC API and return XML response data.
Out of this data, you will need to parse out the SessionID using the EvaluateXQuery processor. Set the destination to 'flowfile-attribute' so we can store the sessionid as an attribute. Add a custom property called 'sessionid' and assign it the value of '/*:Envelope/*:Body/*:loginResponse/*:result/*:sessionId/text()'.
At this point if you run your NiFi flow, you should be authenticated into Salesforce and have sessionID stored as an attribute of the flowfile. We will use this attribute in all future communications with SFDC so it is useful to store it as an attribute and reference it when necessary.
I executed the following steps after authenticating to access the data I was looking for:
We will go through one API request/response so that you have the template of how to communicate with the API.
You'll need to create a file for each XML request you plan on sending over to the server. The first one you'll need to have is creating a job.
<?xml version="1.0" encoding="UTF-8"?> <jobInfo xmlns="http://www.force.com/2009/06/asyncapi/dataload"> <operation>query</operation> <object>OBJECT_NAME_HERE</object> <concurrencyMode>Parallel</concurrencyMode> <contentType>CSV</contentType> </jobInfo>
Once you create the file and place in a location all your NiFi nodes can access, you can use the FetchFile/FetchHDFS processors to retrieve it.
If you'd like, you can dynamically replace the OBJECT_NAME_HERE text after retrieving the file using the ReplaceText processor. I did not do this as I wanted a quick example but you can.
Using the InvokeHTTP processor, communicate with the API. In this processor you'l need to set the following:
-X-SFDC-Session header variable as a custom property as I have done in the following screenshot
-Content-Type to 'application/xml; charset=UTF-8'.
-Remote URL to 'https://YOUR_INSTANCE.salesforce.com/services/async/41.0/job'
-HTTP method to POST
Now you will need to store the results of your communication in a flowfile-attribute so you can use it in subsequent communication. For example when creating a job, you'll need to store the JobID that comes back in the response to create a batch under that job. You'll need to use EvaluateXQuery for this with the following custom property:
key: jobId
value: /*:jobInfo/*:id/text()
Now that you have the basic flow, you'll need to chain this 3/4-processor template together with itself by duplicating it and modifying the parameters.
Next step is to create a batch, done with the following endpoint and request body.
Endpoint: https://cs78.salesforce.com/services/async/41.0/job/${jobId}/batch
request: select id from YOUR_OBJECT_NAME limit 100
From that response, you'll need to store both the batchId and the state in flowfile attributes using EvaluateXQuery.
After that, you'll need to query the server using a scheduled InvokeHTTP processor (mine is every 5 seconds) until it returns a 'Completed' status. Then, you can retrieve the resultId and ultimately the CSV result.
When you're done, here's what it could look like:
screen-shot-2017-11-02-at-125859.png
Full NiFi flow here: sfdc.xml
Created on 01-03-2019 02:21 PM
Hi @anarasimham
Nice Blog with multiple stuff to integrating into Sales force but I have one query in this flow
After creating a Job we have to add batch for the job to post CSV file into Sales force ,I couldn't able to create batch in XML body for csv post into Sales force .Could you please share sample xml format to post csv data into Sales force ?