11-02-2017
04:57 PM
2 Kudos
Pre-requisites
Basic knowledge of NiFi, processors, custom properties, and configuring a NiFi flow
Salesforce developer account with Bulk API (SOAP API) access
Working HDF cluster
General Workflow
You will be performing the following steps for each API interaction after authenticating into Salesforce and getting your SessionID. The following is for an enterprise account. If you have a partner account, these instructions are still mostly valid but you may need to tweak your endpoints slightly when communicating with SFDC.
Retrieve an XML request body from disk or another accessible location
Modify the XML if necessary based on requirements
Send your request, receive a response from the SFDC API
Parse the XML result and pull out any data that is necessary to continue the interaction
Authenticate into Salesforce
You will first need to authenticate into Salesforce using your username and password. Create a file named login.txt that has the following information in it:
<?xml version="1.0" encoding="utf-8"?>
<env:Envelope xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:env="http://schemas.xmlsoap.org/soap/envelope/">
<env:Body>
<n1:login xmlns:n1="urn:partner.soap.sforce.com">
<n1:username>YOUR_USERNAME@DOMAIN.COM.TEST</n1:username>
<n1:password>YOURPASSWORD_YOURTOKEN</n1:password>
</n1:login>
</env:Body>
</env:Envelope>
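To sanity-check the login exchange outside NiFi, here is a minimal Python sketch. It fills in the envelope above and pulls the session ID out of a login response by walking the same Envelope/Body/loginResponse/result/sessionId path, ignoring namespaces. The function names are my own and hypothetical, and sending the request is left out on purpose.

```python
import xml.etree.ElementTree as ET
from xml.sax.saxutils import escape

# Same envelope as login.txt above, with placeholders for the credentials.
LOGIN_TEMPLATE = """<?xml version="1.0" encoding="utf-8"?>
<env:Envelope xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:env="http://schemas.xmlsoap.org/soap/envelope/">
<env:Body>
<n1:login xmlns:n1="urn:partner.soap.sforce.com">
<n1:username>{username}</n1:username>
<n1:password>{password}</n1:password>
</n1:login>
</env:Body>
</env:Envelope>"""


def build_login_body(username, password_and_token):
    """Return the login envelope with XML-escaped credentials."""
    return LOGIN_TEMPLATE.format(
        username=escape(username),
        password=escape(password_and_token),
    )


def local_name(tag):
    """Strip the '{namespace}' prefix that ElementTree puts on tag names."""
    return tag.rsplit("}", 1)[-1]


def find_path(root, names):
    """Walk child elements by local name, ignoring namespaces."""
    if local_name(root.tag) != names[0]:
        return None
    node = root
    for name in names[1:]:
        node = next((c for c in node if local_name(c.tag) == name), None)
        if node is None:
            return None
    return node


def extract_session_id(response_xml):
    """Return the sessionId text from a login response, or None if absent."""
    root = ET.fromstring(response_xml)
    node = find_path(
        root, ["Envelope", "Body", "loginResponse", "result", "sessionId"]
    )
    return node.text if node is not None else None
```

Matching on local names is only a stand-in for the `*:` namespace wildcard in the XQuery expression used later in the flow. It is not how NiFi evaluates the query.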
If you have an HDP cluster, place this file in HDFS. Otherwise, place it in a location that all of your NiFi nodes can access. During my testing, my cluster contained only one NiFi node, so I put this file on that node's local file system.
Start your flow with the FetchFile or FetchHDFS processor, specifying the location of the file you created above. Note that if you are using HDFS, you will need to reference your core-site.xml and hdfs-site.xml files. Make sure they are somewhere all your nodes can reach.
Then use the InvokeHTTP processor with the following URL, passing in the file that was just fetched: https://test.salesforce.com/services/Soap/u/41.0. This is the URL for the auth server; after authentication you will use the URL for your own instance. Configure it as a POST request. This will authenticate with the SFDC API and return XML response data.
Out of this data, parse the session ID using the EvaluateXQuery processor. Set the destination to 'flowfile-attribute' so the session ID is stored as an attribute. Add a custom property called 'sessionid' and assign it the value '/*:Envelope/*:Body/*:loginResponse/*:result/*:sessionId/text()'.
At this point, if you run your NiFi flow, you should be authenticated into Salesforce and have the session ID stored in the 'sessionid' attribute of the flowfile. We will reference this attribute in all future communications with SFDC.
Communicating after authentication
After authenticating, I executed the following steps to access the data I was looking for:
Create a job
Create a batch inside of your job
Check batch status
When the batch is complete, retrieve the result ID
Using the result ID, retrieve the result data. I asked SFDC to format this data as a CSV but you can choose one of several formats (specified when you create the job)
We will go through one API request/response so that you have the template of how to communicate with the API.
1. Retrieve the XML request body
You'll need to create a file for each XML request you plan on sending over to the server. The first one you'll need is the request that creates a job.
<?xml version="1.0" encoding="UTF-8"?>
<jobInfo xmlns="http://www.force.com/2009/06/asyncapi/dataload">
<operation>query</operation>
<object>OBJECT_NAME_HERE</object>
<concurrencyMode>Parallel</concurrencyMode>
<contentType>CSV</contentType>
</jobInfo>
Once you create the file and place it in a location all your NiFi nodes can access, you can use the FetchFile/FetchHDFS processors to retrieve it.
2. Modify the XML
If you'd like, you can dynamically replace the OBJECT_NAME_HERE text after retrieving the file using the ReplaceText processor. I skipped this step to keep the example quick.
3. Communicate with the API
Using the InvokeHTTP processor, communicate with the API. In this processor you'll need to set the following:
-X-SFDC-Session header variable as a custom property (shown in a screenshot in the original post)
-Content-Type to 'application/xml; charset=UTF-8'
-Remote URL to 'https://YOUR_INSTANCE.salesforce.com/services/async/41.0/job'
-HTTP method to POST
4. Parse the result and incorporate it into your next request
Now you will need to store the results of your communication in a flowfile attribute so you can use it in subsequent communication. For example, when creating a job, you'll need to store the jobId that comes back in the response in order to create a batch under that job. Use EvaluateXQuery for this with the following custom property:
key: jobId
value: /*:jobInfo/*:id/text()
Stringing multiple requests together
Now that you have the basic flow, you'll need to chain this 3/4-processor template together with itself by duplicating it and modifying the parameters. The next step is to create a batch, done with the following endpoint and request body.
Endpoint: https://cs78.salesforce.com/services/async/41.0/job/${jobId}/batch
Request: select id from YOUR_OBJECT_NAME limit 100
From that response, you'll need to store both the batchId and the state in flowfile attributes using EvaluateXQuery. After that, you'll need to query the server using a scheduled InvokeHTTP processor (mine runs every 5 seconds) until it returns a 'Completed' status. Then, you can retrieve the resultId and ultimately the CSV result.
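The parse-then-poll pattern described above can be sketched in Python as follows. This is only an illustration of the logic, not part of the NiFi flow: `read_field` and `wait_for_batch` are hypothetical names, and the HTTP call is passed in as a function so the sketch makes no assumptions about how you reach your instance.

```python
import time
import xml.etree.ElementTree as ET


def local_name(tag):
    """Strip the '{namespace}' prefix that ElementTree puts on tag names."""
    return tag.rsplit("}", 1)[-1]


def read_field(response_xml, field):
    """Return the text of a direct child of the root element, e.g. 'id' or 'state'.

    Roughly mirrors an expression such as /*:jobInfo/*:id/text().
    """
    root = ET.fromstring(response_xml)
    for child in root:
        if local_name(child.tag) == field:
            return child.text
    return None


def wait_for_batch(fetch_status, sleep=time.sleep, interval_seconds=5, max_attempts=60):
    """Call fetch_status() until the returned XML reports state 'Completed'.

    fetch_status is any zero-argument callable returning the batch status XML.
    max_attempts is an arbitrary safety limit chosen for this sketch.
    """
    for _ in range(max_attempts):
        if read_field(fetch_status(), "state") == "Completed":
            return "Completed"
        sleep(interval_seconds)
    raise TimeoutError("batch did not complete within the allowed attempts")
```

In the NiFi flow the same roles are played by EvaluateXQuery (reading the field) and the scheduled InvokeHTTP processor (polling every 5 seconds).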
When you're done, here's what it could look like: screen-shot-2017-11-02-at-125859.png
Full NiFi flow here: sfdc.xml
10-27-2017
03:45 PM
I ran into this exact issue and didn't see a resolution here but wanted to update the thread for anyone that comes looking in the future: I am setting up HDF on an Azure IaaS cluster and had the same issue of Zookeeper unable to bind to the port. In my case I believe it was cloud network configuration that was blocking communication. Switching to using internal IPs for my VMs inside of /etc/hosts for all my nodes (rather than the public IPs I was using before) solved the issue.
10-23-2017
12:16 PM
You can configure a machine to be a Knox edge node by installing Knox on it and blocking access to the rest of the nodes in your cluster via firewall rules. The only part of your cluster that will be accessible externally (by end users) is the Knox port on the edge node(s) you set up. Knox will first authenticate the user and, after successful authentication, forward the user's request to the appropriate node in the cluster for processing.
10-20-2017
09:09 PM
There isn't an exclusion filter per se (there isn't one for the native UNIX mv command either) but if the files you want to move are named with a pattern you could use that pattern to select only those files to move. The wildcard operator (*) will come in handy in this situation as well.
10-20-2017
08:53 PM
2 Kudos
@Saurabh Knox communicates with the LDAP server to verify that the credentials you have provided (username/password) match the credentials that the LDAP server has stored. Once that process is complete, Knox knows it can trust you because it has authenticated you. However, the HDP cluster now needs to authenticate the Knox service to make sure it can be trusted to send commands to the various services inside the cluster. After all, any machine could pose as a Knox edge node. Therefore, Knox then goes through the authentication process with Kerberos using a shared secret called a keytab. This keytab file can only be found on the Knox node that has been configured to connect to the cluster, which prevents impersonation. After Knox authenticates into the cluster successfully, all communications between Knox and the cluster are encrypted, providing security for data in-transit/on-the-wire.
09-29-2017
08:29 PM
If your goal is to send data into Spark, you'll need to use one of the mechanisms that Spark accepts to do so. You could write to Kafka, persistent disk such as HDFS, use a port, etc. There are many ways to get data to Spark. What you are using, the output port, is for process groups. With an output port inside a process group, you can link the process group to something downstream, and it will feed data from the output port to the next processor.
09-22-2017
09:03 PM
3 Kudos
Ingesting HL7 Data in NiFi
Pre-requisites
We will assume you have the following:
Basic understanding of Linux CLI
HDF cluster or sandbox with NiFi, Kafka, and HDFS installed and running
Basic knowledge of connecting components in NiFi and configuring settings
Overview
Here's the finished product:
This NiFi flow accomplishes the following:
Read the data from a Kafka stream
Format the data to comply with HL7 specifications
Extract the HL7 attributes from the formatted content block
Transform the attribute list into a JSON block
Impose the data's natural hierarchy onto the JSON rather than leaving it as a flat-mapped structure
Write the data to HDFS
Here's an example of our input data:
MSH|^~\&|XXXXXX||HealthOrg01||||ORU^R01|Q1111111111111111111|P|2.3|<cr>PID|||000000001||SMITH^JOHN||19700101|M||||||||||999999999999|123456789|<cr>PD1||||1234567890^LAST^FIRST^M^^^^^NPI|<cr>OBR|1|341856649^HNAM_ORDERID|000000000000000000|648088^Basic Metabolic Panel|||20150101000100|||||||||1620^Johnson^John^R||||||20150101000100|||M|||||||||||20150101000100|<cr>OBX|1|NM|GLU^Glucose Lvl|159|mg/dL|65-99^65^99|H|||F|||20150101000100|
And the output data after being written to HDFS:
{
"OBX_1": {
"UserDefinedAccessChecks": "20150101000100",
"ObservationIdentifier": {
"Text": "Glucose Lvl",
"Identifier": "GLU"
},
"ReferencesRange": "H",
"Units": {
"NameOfCodingSystem": "99",
"Identifier": "65-99",
"Text": "65"
},
"ObservationSubID": "159",
"NatureOfAbnormalTest": "F",
"SetIDOBX": "1",
"ValueType": "NM",
"ObservationValue": "mg\/dL"
},
"OBR_1": {
"OrderingProvider": {
"FamilyName": "Johnson",
"IDNumber": "1620",
"GivenName": "John",
"MiddleInitialOrName": "R"
},
"UniversalServiceIdentifier": {
"Text": "Basic Metabolic Panel",
"Identifier": "648088"
},
"FillerOrderNumber": {
"EntityIdentifier": "000000000000000000"
},
"PlacerOrderNumber": {
"NamespaceID": "HNAM_ORDERID",
"EntityIdentifier": "341856649"
},
"ResultStatus": "M",
"ObservationDateTime": "20150101000100",
"ScheduledDateTime": "20150101000100",
"SetIDObservationRequest": "1",
"ResultsRptStatusChngDateTime": "20150101000100"
},
"MSH": {
"MessageControlID": "Q1111111111111111111",
"SendingApplication": {
"NamespaceID": "XXXXXX"
},
"ReceivingApplication": {
"NamespaceID": "HealthOrg01"
},
"ProcessingID": {
"ProcessingID": "P"
},
"MessageType": {
"MessageType": "ORU",
"TriggerEvent": "R01"
},
"EncodingCharacters": "^~\&",
"VersionID": "2.3",
"FieldSeparator": "|"
},
"uuid": "de394ca2-cbaf-4703-9e25-4ea280a8c691",
"PID": {
"SSNNumberPatient": "123456789",
"PatientAccountNumber": {
"ID": "999999999999"
},
"DateOfBirth": "19700101",
"Sex": "M",
"PatientName": {
"GivenName": "JOHN",
"FamilyName": "SMITH"
},
"PatientIDInternalID": {
"ID": "000000001"
}
},
"path": ".\/",
"PD1": {
"PatientPrimaryCareProviderNameIDNo": {
"IDNumber": "1234567890",
"FamilyName": "LAST",
"GivenName": "FIRST",
"AssigningAuthority": "NPI",
"MiddleInitialOrName": "M"
}
},
"filename": "279877223850444",
"kafka": {
"partition": "0",
"offset": "221",
"topic": "test"
}
}
Reading from Kafka
To read from Kafka, we will need to first create a topic. Execute the following to create a topic:
./kafka-topics.sh --create --zookeeper <ZOOKEEPER_HOSTNAME>:2181 --replication-factor 1 --partitions 1 --topic test
You should have a topic named test available to you.
We can now create the ConsumeKafka processor with this configuration:
You can test that this topic is functioning correctly by using the command-line kafka-console-producer and kafka-console-consumer tools in two different command prompts.
Formatting to Spec
The text data we are using had to be compressed into one line since we are using Kafka to read it (Kafka needs each entry as a single line without any carriage returns), so we now need to de-compress our FlowFile into the appropriate number of lines. We will be replacing all '<cr>' strings with a carriage return ('\r', 0x0D). Use the following configuration on the ReplaceText processor to do so:
Extracting HL7 attributes
We will use the built-in ExtractHL7Attributes processor to transform our formatted text into "Attributes" that are native to NiFi and understood by it.
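As a rough illustration of the two steps above (restoring the carriage returns, then breaking the message into segments and fields), here is a small Python sketch. It is only an approximation for intuition: the function names are hypothetical, and ExtractHL7Attributes uses a real HL7 parser that also assigns the descriptive field names seen in the output, which this sketch does not attempt.

```python
def restore_segments(single_line_message):
    """Replace each literal '<cr>' marker with a carriage return (0x0D)."""
    return single_line_message.replace("<cr>", "\r")


def split_segments(hl7_message):
    """Split an HL7 v2 message into {segment name: list of '|'-separated fields}.

    Simplifications in this sketch: a repeated segment name overwrites the
    earlier one, and MSH is split like any other segment, so its indexes are
    off by one relative to HL7 numbering (the field separator itself is MSH-1).
    """
    segments = {}
    for line in hl7_message.split("\r"):
        if line:
            fields = line.split("|")
            segments[fields[0]] = fields
    return segments


def split_components(field):
    """Split one field into its '^'-separated components."""
    return field.split("^")
```

With the sample input from the Overview, the PID segment's patient name field splits into the family and given name components that later show up as PatientName.FamilyName and PatientName.GivenName.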
Transforming into JSON
We can now do a simple conversion of those attributes into JSON. Note that our data is destined for the flowfile-content, so we will now overwrite the formatted data that we used to extract our attributes from. Up until now, we had both the content and attributes which are duplicates of the content.
Formatting the JSON
To format the JSON, we will use a Jolt Shift specification. This specification will fully depend on the data you are using and expecting. The sample spec below is based on my sample input data.
{
"OBX_1.UserDefinedAccessChecks": "OBX_1.UserDefinedAccessChecks",
"OBR_1.OrderingProvider.FamilyName": "OBR_1.OrderingProvider.FamilyName",
"MSH.MessageControlID": "MSH.MessageControlID",
"OBX_1.ObservationIdentifier.Text": "OBX_1.ObservationIdentifier.Text",
"MSH.SendingApplication.NamespaceID": "MSH.SendingApplication.NamespaceID",
"OBR_1.UniversalServiceIdentifier.Text": "OBR_1.UniversalServiceIdentifier.Text",
"MSH.ReceivingApplication.NamespaceID": "MSH.ReceivingApplication.NamespaceID",
"MSH.ProcessingID.ProcessingID": "MSH.ProcessingID.ProcessingID",
"uuid": "uuid",
"PID.SSNNumberPatient": "PID.SSNNumberPatient",
"OBR_1.FillerOrderNumber.EntityIdentifier": "OBR_1.FillerOrderNumber.EntityIdentifier",
"path": "path",
"PID.PatientAccountNumber.ID": "PID.PatientAccountNumber.ID",
"PID.DateOfBirth": "PID.DateOfBirth",
"PD1.PatientPrimaryCareProviderNameIDNo.IDNumber": "PD1.PatientPrimaryCareProviderNameIDNo.IDNumber",
"PID.Sex": "PID.Sex",
"MSH.MessageType.MessageType": "MSH.MessageType.MessageType",
"OBX_1.ReferencesRange": "OBX_1.ReferencesRange",
"OBR_1.OrderingProvider.IDNumber": "OBR_1.OrderingProvider.IDNumber",
"PD1.PatientPrimaryCareProviderNameIDNo.FamilyName": "PD1.PatientPrimaryCareProviderNameIDNo.FamilyName",
"OBX_1.Units.NameOfCodingSystem": "OBX_1.Units.NameOfCodingSystem",
"OBX_1.Units.Identifier": "OBX_1.Units.Identifier",
"filename": "filename",
"PID.PatientName.GivenName": "PID.PatientName.GivenName",
"OBX_1.ObservationSubID": "OBX_1.ObservationSubID",
"PD1.PatientPrimaryCareProviderNameIDNo.GivenName": "PD1.PatientPrimaryCareProviderNameIDNo.GivenName",
"OBR_1.PlacerOrderNumber.NamespaceID": "OBR_1.PlacerOrderNumber.NamespaceID",
"MSH.MessageType.TriggerEvent": "MSH.MessageType.TriggerEvent",
"PD1.PatientPrimaryCareProviderNameIDNo.AssigningAuthority": "PD1.PatientPrimaryCareProviderNameIDNo.AssigningAuthority",
"OBR_1.ResultStatus": "OBR_1.ResultStatus",
"PID.PatientName.FamilyName": "PID.PatientName.FamilyName",
"MSH.EncodingCharacters": "MSH.EncodingCharacters",
"MSH.VersionID": "MSH.VersionID",
"kafka.partition": "kafka.partition",
"OBR_1.UniversalServiceIdentifier.Identifier": "OBR_1.UniversalServiceIdentifier.Identifier",
"OBR_1.ObservationDateTime": "OBR_1.ObservationDateTime",
"OBR_1.ScheduledDateTime": "OBR_1.ScheduledDateTime",
"OBX_1.ObservationIdentifier.Identifier": "OBX_1.ObservationIdentifier.Identifier",
"OBR_1.OrderingProvider.GivenName": "OBR_1.OrderingProvider.GivenName",
"OBR_1.SetIDObservationRequest": "OBR_1.SetIDObservationRequest",
"OBR_1.ResultsRptStatusChngDateTime": "OBR_1.ResultsRptStatusChngDateTime",
"OBR_1.PlacerOrderNumber.EntityIdentifier": "OBR_1.PlacerOrderNumber.EntityIdentifier",
"OBX_1.NatureOfAbnormalTest": "OBX_1.NatureOfAbnormalTest",
"OBX_1.SetIDOBX": "OBX_1.SetIDOBX",
"MSH.FieldSeparator": "MSH.FieldSeparator",
"PD1.PatientPrimaryCareProviderNameIDNo.MiddleInitialOrName": "PD1.PatientPrimaryCareProviderNameIDNo.MiddleInitialOrName",
"OBX_1.Units.Text": "OBX_1.Units.Text",
"OBX_1.ValueType": "OBX_1.ValueType",
"kafka.offset": "kafka.offset",
"PID.PatientIDInternalID.ID": "PID.PatientIDInternalID.ID",
"kafka.topic": "kafka.topic",
"OBX_1.ObservationValue": "OBX_1.ObservationValue",
"OBR_1.OrderingProvider.MiddleInitialOrName": "OBR_1.OrderingProvider.MiddleInitialOrName"
}
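Every entry in the spec above maps a flat, dot-separated key to the nested path with the same name. The Python sketch below shows that idea outside of Jolt: one helper produces such an identity mapping from a list of keys, and another applies the equivalent nesting to a flat dictionary. Both function names are hypothetical, and this is not Jolt itself, just the same transformation written out by hand.

```python
def build_shift_spec(flat_keys):
    """Map every flat key to itself, as in the shift spec above."""
    return {key: key for key in flat_keys}


def nest(flat):
    """Turn {'PID.Sex': 'M'} into {'PID': {'Sex': 'M'}} by splitting keys on '.'."""
    nested = {}
    for dotted_key, value in flat.items():
        node = nested
        *parents, leaf = dotted_key.split(".")
        for part in parents:
            # Create the intermediate object the first time a parent is seen.
            node = node.setdefault(part, {})
        node[leaf] = value
    return nested
```

Generating the spec from the attribute names can save some typing when the set of HL7 fields changes, though you would still need to review the result against the data you expect.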
Here's the associated NiFi configuration:
Writing to HDFS
Finally, we will persist this JSON-transformed data to HDFS for further analysis and storage. Configure your PutHDFS processor as follows:
Note that the 'Hadoop Configuration Resources' is a required field for your connection from NiFi to HDFS to work properly, so be sure to fill that in with wherever your core-site.xml and hdfs-site.xml HDFS configuration files are on disk.
Be sure to create the directory /tmp/tst/helloworld in HDFS and change ownership to the nifi user:
hdfs dfs -mkdir -p /tmp/tst/helloworld
hdfs dfs -chown -R nifi /tmp/tst
Putting it all together
Now you should be able to connect all of your processors, start them, and see your data move through.
To send the sample data through our new flow, do the following:
1. SSH into a Kafka broker in your cluster and cd to the binaries folder - in my environment, that is located at /usr/hdp/current/kafka-broker/bin.
2. Create a file in your home directory and paste the contents of your sample input data (from the Overview section above). This file should be exactly 1 line, no more.
3. Feed your sample data into the Kafka producer with the following command:
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list <KAFKA_BROKER_HOSTNAME>:6667 --topic test < hl7data.txt
If all goes well, when your NiFi flow refreshes you should see the data go all the way through. At this point you can check HDFS with the following command:
hdfs dfs -ls /tmp/tst/helloworld
You should see a file there. You can read it with the following:
hdfs dfs -cat /tmp/tst/helloworld/<FILENAME>
Thanks for reading!
Full NiFi template: hl7flow-template.xml
09-19-2017
06:44 PM
Yes, you can add users to policies either through a CREATE or UPDATE as the second article above outlines. There is a parameter called 'userList' and you will be able to specify any number of users you'd like. To add one user to many policies, you will have to go one by one and add the user to each of the target policies. Please refer to this link from above: https://cwiki.apache.org/confluence/display/RANGER/REST+APIs+for+Policy+Management
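Since the user has to be added policy by policy, a small loop helps. The Python sketch below only prepares the updated policy bodies; fetching the existing policies and sending each update back through the REST API is left out. It assumes each policy is a dictionary with a 'permMapList' of entries holding 'userList' and 'permList', as in the example on the linked page. The function name and the choice to append a new entry (rather than edit an existing one) are my own assumptions.

```python
import copy


def grant_user(policies, user, permissions):
    """Return copies of the policies with one extra permMapList entry for the user.

    The input policies are left untouched so the originals can still be
    compared against the updated versions before anything is sent.
    """
    updated = []
    for policy in policies:
        changed = copy.deepcopy(policy)
        changed.setdefault("permMapList", []).append(
            {"userList": [user], "permList": list(permissions)}
        )
        updated.append(changed)
    return updated
```

Each returned dictionary would then be sent as the body of an update request for the corresponding policy.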
09-19-2017
01:31 PM
1 Kudo
The Ranger REST API is the best way to programmatically create users and policies.
Here's how to create users, from an article on this site: https://community.hortonworks.com/content/supportkb/49439/how-to-use-api-curl-commands-to-create-internal-ra.html
Here's how to use the Ranger REST API to create policies: https://cwiki.apache.org/confluence/display/RANGER/REST+APIs+for+Policy+Management
Please note the sections on creating, updating, and deleting policies. To create a policy, for example, send a POST request to the following endpoint with the following body. This example is pulled from the link above.
POST /service/public/api/policy
{
"policyName": "HomePolicy",
"resourceName": "\/home,\/apps",
"description": "Home",
"repositoryName": "hadoopdev",
"repositoryType": "hdfs",
"isEnabled": "true",
"isRecursive": false,
"isAuditEnabled": true,
"permMapList": [
{
"userList": [
"john",
"andrew"
],
"permList": [
"SELECT",
"UPDATE"
]
},
{
"userList": [
"hr"
],
"groupList": [
"admin"
],
"permList": [
"DROP",
"ALTER",
"ADMIN"
]
}
]
}
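For scripting this, the request can be assembled with the Python standard library. The sketch below only builds the POST request for the endpoint shown above and does not send it. The base URL you pass in, the function name, and the JSON content type header are assumptions on my part, and authentication is omitted entirely, so you would need to add whatever credentials your Ranger instance requires.

```python
import json
import urllib.request


def build_create_policy_request(base_url, policy):
    """Build (but do not send) a POST request that creates a Ranger policy.

    base_url is the address of your Ranger admin server (supplied by you);
    policy is a dictionary shaped like the JSON body above.
    """
    body = json.dumps(policy).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/service/public/api/policy",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Passing the returned object to `urllib.request.urlopen` would actually send it, once credentials have been added.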
09-18-2017
03:56 PM
1 Kudo
For files from Google Drive, see the following article. This is an example of how you can use the Google Sheets API to retrieve data from Sheets inside Drive. https://community.hortonworks.com/articles/61180/streaming-ingest-of-google-sheets-into-a-connected.html If by Google Drive you mean Google Cloud Storage (GCS), please see the following question/answer: https://community.hortonworks.com/questions/35325/nifi-processor-for-google-cloud-storage.html