Member since: 09-25-2015
Posts: 37
Kudos Received: 45
Solutions: 4
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 5529 | 07-06-2017 08:46 PM |
| | 1460 | 10-03-2016 08:51 PM |
| | 1950 | 06-23-2016 02:24 PM |
| | 2196 | 05-25-2016 10:04 PM |
05-08-2018
02:38 PM
2 Kudos
The following example provides a guide to connecting to HBase from Spark and then performing a few operations with Spark on the data extracted from HBase. Add the following repository to your package manager: http://repo.hortonworks.com/content/repositories/releases/ and import the Spark-HBase connector package com.hortonworks:shc-core:1.1.0-2.1-s_2.11. Keep in mind that newer releases may be available. Import the following classes:
import org.apache.spark.sql.{SQLContext, _}
import org.apache.spark.sql.execution.datasources.hbase._
import org.apache.spark.{SparkConf, SparkContext}
import spark.sqlContext.implicits._
import org.apache.spark.sql.types._
The first step in connecting to HBase is defining how Spark should interpret each row of data found in HBase. This is done by defining a catalog to use with the connector:
def catalog = s"""{
|"table":{"namespace":"default", "name":"timeseries"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf", "col":"t", "type":"string"},
|"col2":{"cf":"cf", "col":"v", "type":"string"},
|"col3":{"cf":"cf", "col":"id", "type":"string"}
|}
|}""".stripMargin
Next we read from HBase using this catalog:
val df = sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog->catalog.toString))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
Because the data types available to the catalog are limited (I believe to Avro data types), you will almost always need to cast columns into more meaningful types. You can also see here that I am using "split" to break the composite row key into its two parts:
val df2 = df.withColumn("value", df("col2").cast(FloatType))
.withColumn("tag", split(df("col0"), "-").getItem(0))
.withColumn("t", unix_timestamp(split(df("col0"), "-").getItem(1), "yyyy/MM/dd HH:mm:ss").cast(TimestampType))
Now that we have a DataFrame containing the data we want to work with, we can run SQL against it simply by registering the DataFrame as a temporary table:
df2.registerTempTable("Sine1")
Now we can use SQL to explore the data and apply other manipulations:
select * from Sine1
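For example, the query can be run from Scala against the registered temporary table. This is a minimal sketch; the table name "Sine1" and the columns "tag" and "value" come from the catalog and casts defined above.
// Query the temporary table registered above and aggregate the cast value column by tag
val results = sqlContext.sql("SELECT tag, avg(value) AS avg_value FROM Sine1 GROUP BY tag")
results.show()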
11-07-2017
05:13 PM
2 Kudos
I am looking to confirm methods for connecting to MS SQL Server without using SQL authentication. The two options I am aware of are Windows authentication and Kerberos authentication. It is also desired that the credentials of the logged-in user, or of the service that NiFi's JVM is running under, be supplied without a password having to be provided. I am looking for someone who might have run into this before, as it is difficult to test: the difficulty lies in the number of components that must be configured (NiFi, AD, MS SQL Server, Windows, Kerberos, etc.).
Per the Microsoft documentation at https://docs.microsoft.com/en-us/sql/connect/jdbc/building-the-connection-url#Connectingintegrated, Windows authentication is made possible by adding the sqljdbc_auth.dll file to the host running NiFi. If this file is placed in a directory outside NiFi's normal classpath, its location can be specified by updating the JVM property -Djava.library.path. The JDBC URL string for Windows authentication is jdbc:sqlserver://localhost:1433;databaseName=AdventureWorks;integratedSecurity=true; where integratedSecurity=true; is the flag for using Windows authentication. In this case the credentials of the logged-in user or service would automatically be provided without a prompt for a password, which is highlighted under the details of the integratedSecurity flag at https://docs.microsoft.com/en-us/sql/connect/jdbc/setting-the-connection-properties Can anyone confirm this works with NiFi? Are there any limitations other than the requirement for a Windows host?
Using Kerberos with the MS SQL JDBC driver is documented here: https://docs.microsoft.com/en-us/sql/connect/jdbc/using-kerberos-integrated-authentication-to-connect-to-sql-server What is not clear is how the password for the Kerberos principal would be obtained, as it is included in all the examples I have seen, e.g. jdbc:sqlserver://servername=server_name;integratedSecurity=true;authenticationScheme=JavaKerberos;userName=user@REALM;password=**** Can anyone confirm this works with NiFi? Is this functionality limited to a Windows host? It seems that it could also work from a Unix host if I properly configure the Kerberos client on that machine. Any input here is appreciated, thanks!
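For reference, here is a minimal sketch (outside of NiFi) of what a connection pool would do with the Windows-authentication URL above. It assumes the Microsoft JDBC driver is on the classpath and sqljdbc_auth.dll is on java.library.path; the host and database names are the example values from the question.
import java.sql.DriverManager

// Sketch of the integrated-security connection described above.
// No user name or password is supplied; the driver uses the credentials
// of the account the JVM runs under (requires sqljdbc_auth.dll).
val url = "jdbc:sqlserver://localhost:1433;databaseName=AdventureWorks;integratedSecurity=true;"
val conn = DriverManager.getConnection(url)
println(conn.getMetaData.getUserName)
conn.close()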
Labels: Apache NiFi
08-04-2017
02:52 PM
One additional strategy that I recommend is using the OSIsoft PI SDK and the OPC UA stack from the OPC Foundation to build an OPC UA server for PI. This should be straightforward to implement and can be scoped down to only the features necessary. With growing demand for OPC UA, I hope this is something OSIsoft will make available in the same way OPC DA is today. You can find more information regarding the PI SDK at https://techsupport.osisoft.com/Products/Other-Products/PI-SDK/Overview and the OPC UA stack from the OPC Foundation at https://github.com/OPCFoundation
07-06-2017
10:14 PM
For Matrikon you need to enable the OPC UA server and use the OPC UA processor described in https://community.hortonworks.com/articles/90355/collect-data-from-opc-ua-protocol.html
This article, https://community.hortonworks.com/articles/88649/control-system-data-from-kepware.html, is a closer guide to connecting to Matrikon. Comment back here if you need more help getting this working.
07-06-2017
08:46 PM
1 Kudo
One option is to use a proxy server like KepServerEX (www.kepware.com) to access OPC DA data via OPC UA and use https://github.com/wadesalazar/NIFI-OPCUA If you need more direct access to OPC DA you have two choices: 1) build a native Java client and wrap it with the NiFi service and/or processor APIs, or 2) build or obtain a Windows executable program that can be called or interfaced with Java and NiFi.
The reason why 1) is so difficult and 2) is necessary is COM. COM is a binary-interface standard introduced by Microsoft in 1993. It is used to enable inter-process communication and dynamic object creation; ironically, I believe it was an attempt at an "open" standard for RPC. There is no public Java implementation of COM, and developing for OPC DA / HDA / A&E in Java is a real pain. The only tried and true path forward for a native Java OPC DA client is J-Interop, http://www.j-interop.org/ . See http://openscada.org/projects/utgard/ for a reference implementation. J-Interop is old and sparsely supported. There may be a compiler that can create the necessary Java class files (similar to what J-Interop has provided), https://msdn.microsoft.com/en-us/library/ms691398(v=vs.85).aspx, but it is for Visual J++ 6.0 (if you happen to have this, please email me).
Going with option 2) is better supported but still not easy. You may use a commercial OPC DA stack/client/library, such as https://www.prosysopc.com/products/, to build an executable that you can call with NiFi's ExecuteProcess processor or integrate into a processor using JNI, as sketched below. This does require your NiFi hosts to be Windows machines. If you are looking to stay on Linux but use this approach, a relatively recent development is Red Hat's .NET framework for RHEL; it isn't clear to me whether it supports COM, but I believe it may. This might allow one to use a commercial OPC stack to build an executable that could be used. I would love to help you or anyone else build an OPC DA connector for Apache NiFi. Let me know how I can help.
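As an illustration of option 2), here is a minimal Scala sketch of shelling out to a Windows OPC DA client executable, which is roughly what NiFi's ExecuteProcess processor or a JNI-wrapping custom processor would do. The executable name opcda-read.exe, its arguments, and the tag name are hypothetical.
import scala.sys.process._

// Hypothetical sketch: invoke an OPC DA client executable and capture
// its output, as ExecuteProcess or a custom processor would.
// "opcda-read.exe", its flags, and the tag name are made-up examples.
val output = Seq(
  "opcda-read.exe",
  "--server", "Matrikon.OPC.Simulation.1",
  "--tag", "Random.Real8"
).!!
println(output)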
04-26-2017
12:26 PM
7 Kudos
Accessing data from industrial control devices can be made easier and safer by utilizing an OPC server. OPC servers provide protocol proxy services by translating requests for data from a standardized protocol like OPC to the native protocol of the industrial control device. Since the typical OPC server can proxy many requests into many different protocols, each for a different family of data sources, they also make ideal aggregation points for control system data. Centralized data access also has security advantages that should not be overlooked: the OPC server and NiFi can work together to provide certificate-based authentication and role-based data access to devices in process control networks, and NiFi's unique pedigree makes it capable of complying with stringent network ingress and egress rules.
Get started fast with Kepware and Apache NiFi. Out-of-the-box tools available in KepServerEX can make data available via HTTP and MQTT:
----|Kepware|---HTTP REST API-----|NIFI|
or
----|Kepware|-----------MQTT-----------|NIFI|
Data can also be acquired via OPC UA, but this requires you to build the NiFi processor available at https://github.com/wadesalazar/NIFI-OPCUA
First, get Kepware. Download and install a copy of Kepware's KepServerEX on a Windows host or VM. You can download the application at https://my.kepware.com/download/demo/ex/ The installation is straightforward, but be sure to select the optional IoT Gateway plug-in. Start the KepServerEX Configuration tool from the Windows Start menu. Once started, an unlicensed demo will run for 2 hours before timing out; restarting the server will restore it.
Open the KepServerEX Configuration tool and select IoT Gateway. Right-click the default agent and select New IoT Item. Select Sine1 from the simulation functions and apply this tag; selecting a scan rate and OK applies this tag configuration to the agent. From here you can export a CSV file that can be used for batch imports of IoT Item configurations. Right-click the agent again, but this time select Properties. On the Endpoint tab of the dialog you will find the port configuration and the option to enable HTTPS. Here you can also find a test link for the server, which will bring you to a docs page explaining the available queries and the JSON response format.
Now that the data has been exposed over HTTP, you may make REST requests to KepServer for data. Configure a GetHTTP processor; the URL is of the form http://YOUR.KEPSERVER.ADDRESS:39320/iotgateway/read?ids=Simulation%20Examples.Functions.Sine1 Review the acquired data in the data provenance section of NiFi. Note that the default JSON from Kepware puts [] around the inner document, like this:
{"readResults":[{"id":"Simulation Examples.Functions.Sine1","s":true,"r":"","v":33.705616,"t":1462251936055}]}
Using MQTT or OPC UA is a very similar process, requiring only changes in the NiFi processors used. Enjoy!
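As a quick sanity check outside of NiFi, the same read can be issued from a few lines of Scala. This is a minimal sketch using the example host, port, and tag ID from above; replace the placeholder address with your own server.
import scala.io.Source

// Read the Sine1 simulation tag from the KepServerEX IoT Gateway REST
// endpoint; YOUR.KEPSERVER.ADDRESS is a placeholder.
val url = "http://YOUR.KEPSERVER.ADDRESS:39320/iotgateway/read?ids=Simulation%20Examples.Functions.Sine1"
val json = Source.fromURL(url).mkString
println(json) // e.g. {"readResults":[{"id":"Simulation Examples.Functions.Sine1","s":true,...}]}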
04-25-2017
05:13 PM
6 Kudos
A - Key components needed to make use of time series data provided by the OSIsoft PI System
For real-time processing:
- NiFi for reading the data from PI and pushing it to the HDF cluster
- Kafka for queuing data samples for stream processing
- Storm / Spark Streaming
For batch processing:
- NiFi for reading the data from PI and pushing it to the HDP cluster
- Hive for data storage, pre-processing, and analytical processing
- HBase for real-time use cases and iterative processing of windows of data
B - Ways HDF can be used to access data from the PI System and move it into the data lake
- File-based ingestion - This is the easiest setup and does not require any additional license from OSIsoft. PI System admins typically know how to export the data files (by tags and/or by a window of time, raw or interpolated). The file export process involves setting up, then automating, an extract script (typically a .bat script that runs the command line utility to extract data from the PI Archive). These text files are then placed in a location that Apache NiFi can access. There are two patterns for NiFi at this point: an edge MiNiFi that takes the extracted file and delivers the content to a central NiFi for further processing, or an additional step in the extract script that leverages FTP / SCP / NDM to push the file to a landing directory on a central NiFi server or cluster.
- ADO / OLEDB / ODBC (and PI SDK) - PI Servers are built on Microsoft technologies and hence support these data access methods (almost) out of the box. These options do require the connection to originate from a Windows host, either local or remote to the PI Server, and a bridge to connect the Microsoft technology to NiFi. There are design patterns for this available.
- JDBC (may require additional components / license from OSIsoft) - NiFi or MiNiFi connects to PI using JDBC and pulls data by tag and window of time using SQL queries; see the sketch after this list.
- API (may require additional components / license from OSIsoft) - OPC DA (again Windows dependent). There is a processor built by Hortonworks Professional Services for direct connection to OPC DA using pure Java, but it is not freely available yet. As an alternative, OPC DA/UA bridges are an option, as we do have an open source OPC UA processor available for NiFi; see https://github.com/wadesalazar/NIFI-OPCUA OSIsoft also offers an HTTP REST server that can provide data directly to NiFi's GetHTTP processors, and native methods are under development from OSIsoft that allow direct pushes into Hive or onto Kafka topics; see https://techsupport.osisoft.com/Products/Product-Roadmap
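To make the JDBC option concrete, here is a hedged sketch of pulling a window of data for one tag. The connection URL, the piarchive..picomp2 table, the column names, and the SINUSOID tag are illustrative assumptions and should be checked against the PI SQL / PI JDBC documentation for your version.
import java.sql.DriverManager

// Hedged sketch: pull one tag for a time window over PI JDBC.
// URL, table, and column names below are assumptions, not confirmed API.
val url  = "jdbc:pioledb://PI_SERVER_HOST/Data Source=PI_SERVER_HOST"
val conn = DriverManager.getConnection(url)
val stmt = conn.createStatement()
val rs = stmt.executeQuery(
  "SELECT tag, time, value FROM piarchive..picomp2 " +
  "WHERE tag = 'SINUSOID' AND time BETWEEN '2017-04-01 00:00:00' AND '2017-04-02 00:00:00'")
while (rs.next()) println(s"${rs.getString(1)}, ${rs.getTimestamp(2)}, ${rs.getDouble(3)}")
conn.close()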
Bottom line - If one is just trying to prove HDF or HDP, the path of least resistance is the file option, which gets the customer started very quickly.
Additional notes
If you find very old instances of PI Server, there is a real performance concern for the "bulk load" situation regardless of the access method. If you plan to load any large quantity of historical data from a PI Server, then the PI Server will 1) retrieve the binary storage files that cover the selected range, 2) decompose those files, and 3) serve your client the data. To accomplish these steps, the server must load the necessary binaries into memory and then have the spare processing capacity to rip the binary. If you have years and years of data to bring over, start small and scale up. If the number of data points in the server has not changed frequently, look at the PI Archive files to get an idea of the average amount of time a binary covers, and try selecting a range that covers one binary, then two, then three, and so on.
Another thing to look for in this bulk loading step: often there are two PI Servers, the first close to the asset(s) and a second where data from the first is replicated and where most of the enterprise applications connect. You will always want to work on the second; if you bring it down, you will only interrupt the customer's reporting tasks and not their data collection.
After the bulk loading step, if you have a requirement to continuously acquire data from the PI Server, you should ask the PI admin to treat Apache NiFi like any other client and plan for performance accordingly. Often the challenge here is the impact of a frequent "select *" from the real-time database, which amounts to a significant performance hit. Secondly, keep in mind you are often NOT GETTING RAW DATA; you will be getting interpolated data. Increasing NiFi's polling frequency does not necessarily increase the resolution of the acquired data: if updates are requested faster than the PI Server itself is acquiring data, then all that is returned is interpolations between the samples. This can impact data science calculations, and analysts using this data must be aware it has happened.
The last piece to cover is error handling. As above, the most straightforward method is the file-based approach: when a connection to the enterprise historian is broken, files will accumulate on the local server and NiFi will start moving them again when the back pressure falls. For all of the programmatic data access methods, error logic will have to be built for loss of upstream connectivity, loss of downstream connectivity, and late-arriving data.
03-22-2017
09:25 PM
9 Kudos
OPC UA is a fully open protocol found in many industrial networks. Much like the Apache Software Foundation stewards the Hadoop project, the OPC Foundation (https://opcfoundation.org/) is the steward of the OPC UA standard. You can access the standard at https://opcfoundation.org/developer-tools/specifications-unified-architecture In addition to the standard itself, the OPC Foundation has released implementations of clients, data servers, and discovery servers for OPC UA in Java, .NET, and C. The Java repo can be found at https://github.com/OPCFoundation/UA-Java
The following NiFi service and processor bundle can be used to retrieve data from OPC UA servers: https://github.com/wadesalazar/NIFI-OPCUA Information on building the bundle will be provided in a separate article.
To use the service, start by dropping the GetNodeIds processor onto the palette. Most OPC servers provide a properties dialog with the information necessary to configure the GetNodeIds processor and its companion OPC UA service. The following example properties are from an instance of KepServerEX's OPC UA server. In this dialog there are three endpoints associated with the example server: one for each NIC, 10.0.2.6 and 192.168.189.10, and one for the localhost, 127.0.0.1. Each is using port 49320 for incoming communications. The security column shows the security handshaking mechanisms supported by the endpoint.
Once you have this information for your OPC UA server, return to NiFi, right-click the GetNodeIds processor, and select Configure. On the Properties tab, select the value box for the OPC UA Service property field, then select Create new service in the following dialog to create a new instance of the StandardOPCUAService. Access the Process Group Configuration by selecting the black arrow to the right of the value box of the OPC UA Service property. Edit the newly created StandardOPCUAService and enter the information corresponding to the endpoint of your OPC UA server. Take care to ensure the security setting here matches one of the available security modes on the server. If you have been given a certificate for authenticating your client with the OPC UA server, you must place the certificate in a folder that NiFi can access and provide the complete path and filename to your certificate in the Certificate field. If you do not have a certificate, leave this property field blank and NiFi will create a certificate for you, as one is required for OPC UA authentication. The application name is arbitrary but is used to identify your client in various dialogs on the server, so make it something you will recognize in a list of other clients. Once the configuration of the controller service is complete, save and start the controller.
Returning to the GetNodeIds configuration dialog, the StandardOPCUAService you just configured should be selected in the OPC UA Service property field. Recursive Depth defines how many branches "deep" the processor will browse into the OPC server's namespace. WARNING: the namespace can be very large and may have circular references; start this processor off with the default setting of 0 and increase it only after successful reads. The Starting Node property defines the starting node when browsing the server's namespace. If you do not know where to start browsing, leave the field blank and the browse will start from the designated root node. On the Scheduling tab, set the Run Schedule to something appropriate for your use case, greater than 250 ms.
On the Settings tab, select both the Failure and Success checkboxes under Automatically Terminate Relationships; we will change this in the following steps. Select Apply and transition the processor to Run. You should see the metrics shown on the face of the processor change, indicating that a flowfile greater than 1 B has been generated and written out. This is a successful read of the namespace. Right-clicking the processor and selecting Data provenance will allow you to inspect the contents of the newly generated flowfile. Most of the nodes starting with NSU are internal nodes that do not contain data we are interested in. Somewhere in your namespace you will find the tags that contain the data you would like to retrieve; for this example we are looking for the nodes under "Simulation Examples Functions". Now we need to refine the GetNodeIds processor so that only these items are returned each time the processor is triggered. To do this we change the Starting Node to ns=2;s=Simulation Examples.Functions and the Recursive Depth to 0.
Now we have the information needed to query the OPC server for time-stamped values for each NodeId in our list. The list can be saved to disk or kept only in NiFi's memory. Either way, it is a best practice to periodically check the OPC server for changes in the namespace, as in some applications this happens frequently. Next, add a SplitText processor and configure it to split each line of this flowfile into separate flowfiles. Then add the GetValue OPC processor to the palette and connect it to the "splits" output of the SplitText processor. There is only one configuration parameter available on the Properties tab of the GetValue processor: select the OPC UA service created earlier. On the Settings tab select both Failure and Success for initial testing. Select Apply and transition the whole data flow to Run. The output data from the GetValue processor is formatted as lines of CSV that can be merged into a single document and treated as a CSV. Future iterations of the processor will support various serialization techniques to avoid processing text in later flows.
12-05-2016
11:49 PM
2 Kudos
To post random data to Azure Event Hubs using HTTP, use a GenerateFlowFile processor to first generate the random data. Connect this to an UpdateAttribute processor to add your SAS (Shared Access Signature) token as a new attribute called "Authorization" on the flowfile. Finally, use PostHTTP to submit the data to Event Hubs.
Place the following processors on the palette and set the run schedule of the GenerateFlowFile processor to something reasonable, like 10 seconds. Configure UpdateAttribute, adding the "Authorization" property with a SAS token. For more information on creating a SAS token, see https://community.hortonworks.com/articles/69823/creating-shared-access-signature-sas-for-posting-d.html
Configure PostHTTP to post to the URL https://eventhub-nifi.servicebus.windows.net/hub-nifi/messages Note: I set the content type to text/plain; you should set this to suit your use case. Only the truststore needs to be configured for this example, as we only need to trust the server certificates presented to us by the Azure servers. For simplicity I use Java's default keystore; see this post for finding and configuring the SSL service to use this keystore. Otherwise you will need to import trust certificates into your keystore manually.
Start all processors and verify successful submissions to the Event Hubs service in the Azure Portal.
Background on the security mechanisms of Event Hubs: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-shared-access-signature-authentication
Background on Shared Access Signatures and how to generate them: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-sas-overview
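For reference, this minimal sketch performs the same POST that the PostHTTP processor issues. The URL is the example endpoint above; the SAS token placeholder must be replaced with a real token (see the companion SAS article).
import java.net.{HttpURLConnection, URL}

// Minimal sketch of the POST that PostHTTP performs against Event Hubs.
// Replace the placeholder with a real SAS token before running.
val sasToken = "SharedAccessSignature sr=...&sig=...&se=...&skn=..."
val url  = new URL("https://eventhub-nifi.servicebus.windows.net/hub-nifi/messages")
val conn = url.openConnection().asInstanceOf[HttpURLConnection]
conn.setRequestMethod("POST")
conn.setDoOutput(true)
conn.setRequestProperty("Authorization", sasToken)
conn.setRequestProperty("Content-Type", "text/plain")
conn.getOutputStream.write("hello world!".getBytes("UTF-8"))
println(s"Event Hubs responded with HTTP ${conn.getResponseCode}")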
12-04-2016
03:31 AM
1 Kudo
First, if you need background on Azure Event Hubs, go here: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-overview Second, it's handy to know why you need a SAS token and what you can do once you have one; see https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-authentication-and-security-model-overview
We will use an example configuration for an Event Hub service. The corresponding data for your service can be found on the details page of the shared access policy you want to use. More information on this topic can be found at https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-shared-access-signature-authentication From the shared access policy "hub-nifi"'s detail page we can get all the information we need to create our token from the connection string-primary key field:
Endpoint=sb://nifi-eventhub.servicebus.windows.net/;
SharedAccessKeyName=hub-user;
SharedAccessKey=2hmLYbJk2q5uZ2Yfyl0XSezXbxD+afO9ysh0Vsv4Xq8=;EntityPath=hub1
A SAS token is simply a keyed hash (HMAC-SHA256) of a string consisting of two substrings: the resource URI and the date the token should expire, in Unix epoch format. The format of the string is <resourceURI> + \n + <expiry> For our example the resource URI is http://nifi-eventhub.servicebus.windows.net/hub1 and for <expiry> we arbitrarily used Dec 16 2016 00:00:00 UTC-0600, which is 1481868000 in Unix epoch time. The string to hash is then http://nifi-eventhub.servicebus.windows.net/hub1\n1481868000 Before hashing this string we must URL-encode the resource URI, which results in http%3A%2F%2Fnifi-eventhub.servicebus.windows.net%2Fhub1\n1481868000 Hash the URL-encoded string using the shared access key and openssl. The format of the openssl command is:
echo -n -e 'value' | openssl dgst -sha256 -binary -hmac 'key' | openssl base64
Using our example values:
echo -e -n 'http%3A%2F%2Fnifi-eventhub.servicebus.windows.net%2Fhub1\n1481868000' | openssl dgst -sha256 -binary -hmac '2hmLYbJk2q5uZ2Yfyl0XSezXbxD+afO9ysh0Vsv4Xq8=' | openssl base64
The output should be similar to ZYxl4SEwnNMa/gir+aYgkb5rZv/6vUCqh1+NZgIGI4s= IMPORTANT: URL-encode the hash before using it in the token. To make an HTTP request to an Event Hubs endpoint, an "Authorization" header must be added to the request. The value of the Authorization header is formatted as
Authorization: SharedAccessSignature sr={URI}&sig={HMAC_SHA256_SIGNATURE}&se={EXPIRATION_TIME}&skn={KEY_NAME}
Using our example values the header is
Authorization: SharedAccessSignature sig=ZYxl4SEwnNMa%2Fgir%2BaYgkb5rZv%2F6vUCqh1%2BNZgIGI4s%3D&se=1481868000&skn=hub-user&sr=http%3A%2F%2Fnifi-eventhub.servicebus.windows.net%2Fhub1
Use curl to confirm the token we have generated works:
curl -v -H 'Authorization: SharedAccessSignature sig=ZYxl4SEwnNMa%2Fgir%2BaYgkb5rZv%2F6vUCqh1%2BNZgIGI4s%3D&se=1481868000&skn=hub-user&sr=http%3A%2F%2Fnifi-eventhub.servicebus.windows.net%2Fhub1' --data 'hello world!' https://nifi-eventhub.servicebus.windows.net/hub1/messages?timeout=60\&api-version=2014-01
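The same computation can be done on the JVM. This is a hedged sketch using only standard Java APIs and the example resource URI, expiry, and key from above; substitute your own values.
import java.net.URLEncoder
import java.util.Base64
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec

// Sketch of the SAS computation done above with openssl: HMAC-SHA256 over
// the URL-encoded resource URI plus "\n" plus the expiry, keyed with the
// shared access key, then base64- and URL-encoded into the token.
val resourceUri = "http://nifi-eventhub.servicebus.windows.net/hub1"
val expiry      = "1481868000"
val key         = "2hmLYbJk2q5uZ2Yfyl0XSezXbxD+afO9ysh0Vsv4Xq8="

val encodedUri   = URLEncoder.encode(resourceUri, "UTF-8")
val stringToSign = encodedUri + "\n" + expiry
val mac = Mac.getInstance("HmacSHA256")
mac.init(new SecretKeySpec(key.getBytes("UTF-8"), "HmacSHA256"))
val signature = URLEncoder.encode(
  Base64.getEncoder.encodeToString(mac.doFinal(stringToSign.getBytes("UTF-8"))), "UTF-8")

val token = s"SharedAccessSignature sr=$encodedUri&sig=$signature&se=$expiry&skn=hub-user"
println(token)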