Member since
05-02-2016
154
Posts
54
Kudos Received
14
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
3840 | 07-24-2018 06:34 PM | |
5412 | 09-28-2017 01:53 PM | |
1328 | 02-22-2017 05:18 PM | |
13100 | 01-13-2017 10:07 PM | |
3645 | 12-15-2016 06:00 AM |
07-09-2020
02:23 PM
@sajidiqubal CAn you share more info. The solution is simple, the spart streaming jobs needs to find the kafka-jaas and the corresponding keytab. Make sure both paths are accessible on all machines. So kafka-jaas and the keytab need to be in the local folder and not hdfs. If you need in hdfs, then it needs to be sent it as a part of the spark --files and --keytab arguments ( iirc). In newer versions of kafka, you can add the jaas info as a kafka parameter using sasl.jaas.config. see ex. below. In that case you just need the keytab to be available on all machines. sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKey=true \
keyTab="/etc/security/keytabs/kafka_client.keytab" \
principal="kafkaclient1@EXAMPLE.COM";
you will also need a parameter sasl.kerberos.service.name=kafka From your error it looks like the code is not able to find one of the files, the jaas.conf or the keytab. Please check and make sure the file is in the right path and on all yarn nodes.
... View more
10-19-2018
05:05 AM
This is a quick how-to on integrating spark on zeppelin with kerberised kafka. To do this first we will enable impersonation support in zeppelin for spark. By Impersonation, i mean jobs will be executed / submitted to spark by zeppelin as the user logged in to zeppelin and not the default zeppelin user. In my case i will be using my user "Karthik" Now to enable impersonation while submitting to spark, i would recommend using the livy server. The livy server and the interpreter natively support impersonation. Spark interpreter impersonation is possible, but you may have to configure SSh keys or sudo capability for zeppelin which may not be recommended. You can livy to hdp by going to a host and installing "Spark client" , followed by the livy server. Restart Zeppelin , to enable the livy interpreter. Ensure that livy impersonation and livy proxy permissions are setup correctly in ambari. This is done when you install livy server by default. Log in to zeppelin using your credentials.click the drop-down with your username on the top right and select interpreter option. on the interpreter menu, scroll down to the livy interpreter and change interpreter instantiation to be "per user". you can change process option to scoped / isolated based on your needs, either option works. At this point you write a test spark code and execute. check the yarn resourcemanger ui for the task, it should run under the logged in user, "karthik" in my case. I attempted this using spark streaming, to enable spark streaming to connect with kafka, you need to include the spark-streaming-kafka jar as a dependency for your spark job. Please add the below property and value to your livy interpreter. ensure the correct jar version based on kafka and spark versions. Now to enable the spark job to authenticate itself to a kerberised kafka. The trick is to specify the jaas config along with correct keytab to both the spark driver and executor. To do this i created a kafka-jass config with reference to my users keytab and put in /tmp/ in HDFS. I also put in the /tmp/ folder of all my data nodes. I also copied my keytab to all data nodes under /tmp When passing the jaas conf or keytab to spark, i have two options 1. I can push them both to HDFS and refer to them in my livy spark options. The options are exactly the same as what you would pass to spark-submit, except you add "livy." in front of them. In this case since i copied the keytabs already to the datanode /tmp folders , i only had to pass the jaas config as a file to livy. I then passed the name of the file (the part after # ) to my spark driver and executer as extrajava options... This what my jaas-config looks like . Since, i had already copied my keytab to all compute nodes and set it with right permissions for me, i did not have to pass them in the config. I think it may be also better to do that with jaas-config so users can pass them from inside their code and not have to depend on a interpreter config. I will show how to do this later. KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=false
useKeyTab=true
keyTab="/tmp/karthik.keytab"
storeKey=true
serviceName="kafka"
principal="karthik@HORTONWORKS.COM";
};
2. The second option is to simply pass the jaas config file location and envrionment variable using the System.setProperty() right inside the spark code. This will ensure that the user can use his own jaas conf and keytab without having to change interpreter config. System.setProperty("java.security.auth.login.config","/tmp/kafka_jaas.conf") Run, the spark job and you should be able to connect to the kerberised kafka broker and access topics. if it fails for some reason, check you have uploaded the jaas-config to right location in hdfs, your keytabs have proper permission and your user has permissions to the topic in ranger or acls.
... View more
Labels:
10-17-2018
05:01 AM
sorry just saw this message. could you provide more details
... View more
10-11-2018
06:23 PM
@Rajesh Ghosh yeah the problem is that putmongo cannot recognize that there is more than one json in the file. So what ends up happening is that it reads the first line which is valid json, but when it hits the next line, it thinks this is part of the json from the first line and probably fails with a parsing error. To resolve this i would add a splitline or splittext processor so you can split the input file using the newline character. This will give you the 20 json in individual flowfile. Then you can feed this to mongo and it should work fine. From the split , you can take the "original" relation and use that for doing the archive using putfile.
... View more
08-16-2018
05:48 AM
3 Kudos
Was trying to dig up some TSP benchmark info for nifi listenhttp, which allows for providing a rest proxy, with no luck, so i tried to create one myself. This is a very rough effort, i can improve it by capturing how many client instances i have running , so we can see the TPS drop and rise as the clients drop and rise. At some point i ran out of servers and resources to run more clients, you will the chart drops off at the end , before going back up. This is where i noticed some of my clients had crashed and i restarted them. Anyway, getting to the matter. Purpose The benchmark only meassure how much load can a ListenHTTP processor handle , when subjected to real world traffic. Setup The nifi cluster is setup on an m4.4xlarge ( 16 cores CPU, 32 GB RAM), The node is also hosting the kafka broker and zookeeper. HDF version is 3.1.1 The NiFi is a simple Listenhttp processor forwarding to updateattribute. updateattribute burns the flowfile. The idea was to only measure Listenhttp performance for receiving a message, create flowfile, respond to client and forward the message to next processor. The benchmark tries to measure what kind of peak TPS could be achieved. The NiFi instance is running a S2S provenance task, which forwards provenance event to another nifi instance, which further forwards it to a kafka topic. The data is then ingested into Druid using kafka ingestion. timestampmillis column of the provenancce event will be used by druid for indexing. For the client piece i have a simple python script that constantly calls the rest service exposed by listenhttp, passing the below json. The timestamp in the json is just to ensure the messages are different. {“key”:”clien1”,”timestamp”:<current_unix_time>}. The python is a simple infinite loop in the below format. import requests
import time
import random
from multiprocessing import Process
import os
import json
import threading
from time import sleep
def call_rest():
values=["client1"]
value = random.choice(values)
start = time.time()
timestamp = round(time.time()*1000)
r = requests.post('http://nifi1.field.hortonworks.com:19192/test',data = json.dumps({"key":value,"timestamp":timestamp}))
while True:
threads = []
for i in range(5):
t = threading.Thread(target=call_rest)
threads.append(t)
t.start()
I ran 5 instances of the script across 8 servers to help me generate the kind of volume i needed for this test. Dashboard Once the data is in druid, i can utilize superset to chart and aggregate the provenance events at an interval of one second. Since the provenance events can take a few minutes to arrive, i used a one minute window from 5 minutes ago, meaning from t-5 to t-4 timestamps. This what i saw on the chart, I also filterd by query to only look for componentType=Listenhttp and eventType=RECEIVE. From the above chart we can see that the rate fluctuates from a max of 3000 TPS max to around 600 TPS minimum. To get a better aggregation or a even aggregation, i aggregated this over 5 minute interval over an hour to see what we are doing on average...The chart was pretty promising. So on an average we are looking at 300k messages per 5 minutes, which is around 1000 TPS. Conclusion The 1000 TPS we se see from NiFi from this above load test, is not probably what the max load it can handle, i can try and run my tasks on more severs and see if we see higher numbers. But, at 1000 TPS , NiFi should be able to handle most web based traffic. Additionaly this is on a clusert with one node of NiFi, we can linearly scale by adding more nodes to the cluster .
... View more
Labels:
08-07-2018
05:23 PM
3 Kudos
Nifi build on HDF 3.1.2 and HDF 3.1.0 fail with a depency issue when trying to push data to ADLS. This is because the new version of ADLS has some dependency on hadoop 2.8 feature, which is not available in 2.7.3 which is referenced by nifi. to fix this you can build nifi again. You could eith build it against hadoop 2.8 or againt hdp 2.6.x which should have the classes that ADLS depends on. to do that git clone nifi repository
cd <nif-repo-home>/nifi-nar-bundles/nifi-hadoop-libraries-bundle/nifi-hadoop-libraries-nar/
vi pom.xml add a hadoop.version property to the pom.xml as shown below. if already set, no change is needed. change nifi version to match the nifi version you are running for the parent <parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hadoop-libraries-bundle</artifactId>
<version>1.5.0.3.1.2.0-7</version>
</parent>
<artifactId>nifi-hadoop-libraries-nar</artifactId>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
<curator.version>2.11.0</curator.version>
<hadoop.version>2.7.3</hadoop.version>
</properties>
cd ..
vi pom.xml
change the nifi-nar-bunlde version to match your nifi version as shown below <parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>1.5.0.3.1.2.0-7</version>
</parent>
run the maven build using the command below. change hadoop.version to match your version of hadoop. the nar will avaliable under nifi-hadoop-libraries-nar/target folder. Take that nar and replace the existing nar under nifi/lib. mvn clean package -Dhadoop.version=2.7.3.2.6.5.0-292
Ensure you have the right jars for adls downloaded into a folder accesible to nifi. Add the folder path to Additional classpath option in PUTHDFS. for hdp 2.6.x you can find the needed jars under /usr/hdp/2.6.x..../hadoop/ you will need the following jars. azure-data-lake-store-sdk-2.2.5.jar
hadoop-azure-2.7.3.2.6.5.0-292.jar
hadoop-azure-datalake-2.7.3.2.6.5.0-292.jar
start nifi and push files , hope it works.
... View more
Labels:
07-25-2018
04:14 PM
@Michael Nacey Which minifi agent are you using? I dont think there is anything against load balancers, but in the past i have had some users complain about performance of the load balancer itself. What kind of messages are the minfi agent forwarding to nifi? are these infrequent large files or datasets , or very frequent small messages.
... View more
07-24-2018
06:34 PM
1 Kudo
Site to site already load balancing internally. When you connect to a remote nifi over s2s and provide url of a node in that cluster, the first call by s2s is to the service discovery api. it receives the cluster topology and addresses of all nodes in the cluster , s2s client then uses that information to pick one node which under the least amount of load and then pushes data to that node. If one of your node is down, s2s should pick a different node in the cluster to push data. For upgrades, you can do a rolling upgrade , one node at a time and mostly you should be fine.
... View more
05-29-2018
03:33 PM
4 Kudos
This article goes on how Nifi, with the of the OPC UA api along with Druid for dashboarding can help with industrial iot use cases. We have seen a trend where several clients have shown interest in HDP as a platform for storing all of the sensor data and third party data. This allows these companies store their sensor data over a longer period of time, easily corelate them with other data sets and enrich them in a very cost effective manner. Additionally, they can run ML algorithms on this data set to better predict outages, malfunctions or maintenance. Using NiFi,Kafka and Druid, can provide means for a very easy to maintain and superfast ingestion mechanism from the several edge devices be it windmills, substations, meters etc. Druid can then provide a robust platform for real-time querying dashboarding on this sensor data so operations can keep an eagle eye on whats going on at a plant or wind farm. For the pupose of this article, i will be consuming data from a demo OPC server which is running some simulation , that generates sample data signals, Ramp,Sinusoid, Square etc. The simulation generates a data point for each signal every second. To ingest this data into Druid, i will be using the kafka streaming ingestion option available in druid. But, Kafka by itself can be very complicated for someone in the industrial background to work with, especially since many may not be exposed to this technology. Hence, instead of writing java applications that produce data into Kafka, we need a mechanism that hides these complexiites away and helps the engineers to foucs not on the platform, but more on the use case. NiFi, is an ideal choice for this with its GUI interface and seemless connectivity to kafka. To allow nifi to connect and consume data from the OPC server, we will be using the eclipse Milo library, which communicates with the server over OPC UA. To make it easy and reproducable, i created a custom processor . The repository is located here. Simply clone the repo. build using mvn clean package -Dskiptests. Once the processor is built just copy the .nar file to nifi/lib folder. Restart Nifi. Developed a nifi flow using the opc processor to extract data from the opc server and publishkafka to push data to a kafka topic. The consumeopc events processor was configured as below. Once flow is configured and started, you should see data being to the kafka topic. Now we will configure the druid kafka ingestion spec. This let druid run real time jobs that will pull data from kafka topic into druid real time nodes for segmenting. {
"type": "kafka",
"dataSchema": {
"dataSource": "iot_ds",
"parser": {
"type": "string",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp",
"format": "yyyy-MM-dd'T'HH:mm:ss.SSS zzz"
},
"dimensionsSpec": {
"dimensions": ["Identifier"]
}
}
},
"metricsSpec": [
{
"name": "count",
"type": "count"
},
{
"name": "value_sum",
"fieldName": "value",
"type": "doubleSum"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "NONE",
"queryGranularity": "NONE"
}
},
"tuningConfig": {
"type": "kafka",
"maxRowsPerSegment": 400
},
"ioConfig": {
"topic": "iot_metrics",
"consumerProperties": {
"bootstrap.servers": "seregion3.field.hortonworks.com:6667"
},
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT1H"
}
}
Create the above spec , make sure to modify the kafka bootstrap.server to your environment. Then post this spec to druid curl -X POST -H 'Content-Type: application/json' -d @supervisor-spec.json http://<overlord-host>:<port>/druid/indexer/v1/supervisor This create the indexing service in druid. open overlord console to make sure the ingest is working fine. We will superset for the visualization. It may take upto a a few minutes to atleast an 1 hour for the druid datasource to be available based on your configration. Once it is available, click on Source > Refresh druid metadata to see the new iot_ds data source. Upon creating a slice we can clearly see the different signals plotted on superset. I am using a 1 second granularity for these slices. Hopefully you find the article helpful. You could improve accuracy by also copying the data into HDFS / HIve , so you could do some BI kind of reporting and do advance analytics on the data using spark. YOu could also load this data in batches to druid so it can correct any holes or errors from the real time ingest.
... View more
Labels:
02-07-2018
04:22 AM
1 Kudo
The solution in this article was tested for DB2 and could work for other databases as well , that are not available in the GenerateTableFetch Database Type dropdown. I was trying to develop a flow that extracted data from a few tables in DB2 database and the goal was to land that data into HDFS. Our flow was a Basic ListDatabaseTables followed by a GenerateTableFetch followed by a ExecuteSSQL. The idea was to generate multiple queries that could parallelly extract data from the db2 nodes, utilizing all the 3 NiFi nodes in the cluster. The problem we ran into was that there is no option for DB2 in the GenerateTableFetch DBType property. Which means we had to select the generic option. Upon running the flow, the select query failed to execute on DB2. On investigating we found that the query generated by GenerateTableFetch looked like this select id,name,update_dt,state,city,address from customers where update_dt<='01-01-2018 12:00:00' order by update_dt offset 10000 limit 10000
The limit and offset syntax does not work with DB2, so we basically could not use the query generated by GenerateTableFetch. our first thought was to see if we could replcatetext to some hack offset 10000 limit 1000 , to a format that DB2 like "offset 10000 rows fetch first 10000 rows" . I quickly realised that would not be easy. Thats when NiFi EL came to the rescue. Noticed that apart from generating the query in the flowfile content GenerateTableFetch also puts the different pieces that form the query into the flowfiles attribute. pieces like the columns to output, the column to order on , the where clause. Using these attributes and EL , i could recreate the query. Update the ExecuteSQL "select query" property to something like this select ${generatetablefetch.columnnames} from ${generatetablefetch.tablename} where ${generatefetchtable.whereClause} order by ${generatetablefetch.maxColumnNames} offset ${generatetablefetch.offset} rows fetch first ${generatetablefetch.limit} rows only and vola , it works. to see the different attributes GenerateTableFetch writes to the output flowfile, refer to the documentation here.
... View more
Labels: