Member since
05-02-2016
154
Posts
54
Kudos Received
14
Solutions
07-09-2020
02:23 PM
@sajidiqubal CAn you share more info. The solution is simple, the spart streaming jobs needs to find the kafka-jaas and the corresponding keytab. Make sure both paths are accessible on all machines. So kafka-jaas and the keytab need to be in the local folder and not hdfs. If you need in hdfs, then it needs to be sent it as a part of the spark --files and --keytab arguments ( iirc). In newer versions of kafka, you can add the jaas info as a kafka parameter using sasl.jaas.config. see ex. below. In that case you just need the keytab to be available on all machines. sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKey=true \
keyTab="/etc/security/keytabs/kafka_client.keytab" \
principal="kafkaclient1@EXAMPLE.COM";
you will also need a parameter sasl.kerberos.service.name=kafka From your error it looks like the code is not able to find one of the files, the jaas.conf or the keytab. Please check and make sure the file is in the right path and on all yarn nodes.
... View more
10-19-2018
05:05 AM
This is a quick how-to on integrating spark on zeppelin with kerberised kafka. To do this first we will enable impersonation support in zeppelin for spark. By Impersonation, i mean jobs will be executed / submitted to spark by zeppelin as the user logged in to zeppelin and not the default zeppelin user. In my case i will be using my user "Karthik" Now to enable impersonation while submitting to spark, i would recommend using the livy server. The livy server and the interpreter natively support impersonation. Spark interpreter impersonation is possible, but you may have to configure SSh keys or sudo capability for zeppelin which may not be recommended. You can livy to hdp by going to a host and installing "Spark client" , followed by the livy server. Restart Zeppelin , to enable the livy interpreter. Ensure that livy impersonation and livy proxy permissions are setup correctly in ambari. This is done when you install livy server by default. Log in to zeppelin using your credentials.click the drop-down with your username on the top right and select interpreter option. on the interpreter menu, scroll down to the livy interpreter and change interpreter instantiation to be "per user". you can change process option to scoped / isolated based on your needs, either option works. At this point you write a test spark code and execute. check the yarn resourcemanger ui for the task, it should run under the logged in user, "karthik" in my case. I attempted this using spark streaming, to enable spark streaming to connect with kafka, you need to include the spark-streaming-kafka jar as a dependency for your spark job. Please add the below property and value to your livy interpreter. ensure the correct jar version based on kafka and spark versions. Now to enable the spark job to authenticate itself to a kerberised kafka. The trick is to specify the jaas config along with correct keytab to both the spark driver and executor. To do this i created a kafka-jass config with reference to my users keytab and put in /tmp/ in HDFS. I also put in the /tmp/ folder of all my data nodes. I also copied my keytab to all data nodes under /tmp When passing the jaas conf or keytab to spark, i have two options 1. I can push them both to HDFS and refer to them in my livy spark options. The options are exactly the same as what you would pass to spark-submit, except you add "livy." in front of them. In this case since i copied the keytabs already to the datanode /tmp folders , i only had to pass the jaas config as a file to livy. I then passed the name of the file (the part after # ) to my spark driver and executer as extrajava options... This what my jaas-config looks like . Since, i had already copied my keytab to all compute nodes and set it with right permissions for me, i did not have to pass them in the config. I think it may be also better to do that with jaas-config so users can pass them from inside their code and not have to depend on a interpreter config. I will show how to do this later. KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=false
useKeyTab=true
keyTab="/tmp/karthik.keytab"
storeKey=true
serviceName="kafka"
principal="karthik@HORTONWORKS.COM";
};
2. The second option is to simply pass the jaas config file location and envrionment variable using the System.setProperty() right inside the spark code. This will ensure that the user can use his own jaas conf and keytab without having to change interpreter config. System.setProperty("java.security.auth.login.config","/tmp/kafka_jaas.conf") Run, the spark job and you should be able to connect to the kerberised kafka broker and access topics. if it fails for some reason, check you have uploaded the jaas-config to right location in hdfs, your keytabs have proper permission and your user has permissions to the topic in ranger or acls.
... View more
Labels:
10-17-2018
05:01 AM
sorry just saw this message. could you provide more details
... View more
08-16-2018
05:48 AM
3 Kudos
Was trying to dig up some TSP benchmark info for nifi listenhttp, which allows for providing a rest proxy, with no luck, so i tried to create one myself. This is a very rough effort, i can improve it by capturing how many client instances i have running , so we can see the TPS drop and rise as the clients drop and rise. At some point i ran out of servers and resources to run more clients, you will the chart drops off at the end , before going back up. This is where i noticed some of my clients had crashed and i restarted them. Anyway, getting to the matter. Purpose The benchmark only meassure how much load can a ListenHTTP processor handle , when subjected to real world traffic. Setup The nifi cluster is setup on an m4.4xlarge ( 16 cores CPU, 32 GB RAM), The node is also hosting the kafka broker and zookeeper. HDF version is 3.1.1 The NiFi is a simple Listenhttp processor forwarding to updateattribute. updateattribute burns the flowfile. The idea was to only measure Listenhttp performance for receiving a message, create flowfile, respond to client and forward the message to next processor. The benchmark tries to measure what kind of peak TPS could be achieved. The NiFi instance is running a S2S provenance task, which forwards provenance event to another nifi instance, which further forwards it to a kafka topic. The data is then ingested into Druid using kafka ingestion. timestampmillis column of the provenancce event will be used by druid for indexing. For the client piece i have a simple python script that constantly calls the rest service exposed by listenhttp, passing the below json. The timestamp in the json is just to ensure the messages are different. {“key”:”clien1”,”timestamp”:<current_unix_time>}. The python is a simple infinite loop in the below format. import requests
import time
import random
from multiprocessing import Process
import os
import json
import threading
from time import sleep
def call_rest():
values=["client1"]
value = random.choice(values)
start = time.time()
timestamp = round(time.time()*1000)
r = requests.post('http://nifi1.field.hortonworks.com:19192/test',data = json.dumps({"key":value,"timestamp":timestamp}))
while True:
threads = []
for i in range(5):
t = threading.Thread(target=call_rest)
threads.append(t)
t.start()
I ran 5 instances of the script across 8 servers to help me generate the kind of volume i needed for this test. Dashboard Once the data is in druid, i can utilize superset to chart and aggregate the provenance events at an interval of one second. Since the provenance events can take a few minutes to arrive, i used a one minute window from 5 minutes ago, meaning from t-5 to t-4 timestamps. This what i saw on the chart, I also filterd by query to only look for componentType=Listenhttp and eventType=RECEIVE. From the above chart we can see that the rate fluctuates from a max of 3000 TPS max to around 600 TPS minimum. To get a better aggregation or a even aggregation, i aggregated this over 5 minute interval over an hour to see what we are doing on average...The chart was pretty promising. So on an average we are looking at 300k messages per 5 minutes, which is around 1000 TPS. Conclusion The 1000 TPS we se see from NiFi from this above load test, is not probably what the max load it can handle, i can try and run my tasks on more severs and see if we see higher numbers. But, at 1000 TPS , NiFi should be able to handle most web based traffic. Additionaly this is on a clusert with one node of NiFi, we can linearly scale by adding more nodes to the cluster .
... View more
Labels:
08-07-2018
05:23 PM
3 Kudos
Nifi build on HDF 3.1.2 and HDF 3.1.0 fail with a depency issue when trying to push data to ADLS. This is because the new version of ADLS has some dependency on hadoop 2.8 feature, which is not available in 2.7.3 which is referenced by nifi. to fix this you can build nifi again. You could eith build it against hadoop 2.8 or againt hdp 2.6.x which should have the classes that ADLS depends on. to do that git clone nifi repository
cd <nif-repo-home>/nifi-nar-bundles/nifi-hadoop-libraries-bundle/nifi-hadoop-libraries-nar/
vi pom.xml add a hadoop.version property to the pom.xml as shown below. if already set, no change is needed. change nifi version to match the nifi version you are running for the parent <parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hadoop-libraries-bundle</artifactId>
<version>1.5.0.3.1.2.0-7</version>
</parent>
<artifactId>nifi-hadoop-libraries-nar</artifactId>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
<curator.version>2.11.0</curator.version>
<hadoop.version>2.7.3</hadoop.version>
</properties>
cd ..
vi pom.xml
change the nifi-nar-bunlde version to match your nifi version as shown below <parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>1.5.0.3.1.2.0-7</version>
</parent>
run the maven build using the command below. change hadoop.version to match your version of hadoop. the nar will avaliable under nifi-hadoop-libraries-nar/target folder. Take that nar and replace the existing nar under nifi/lib. mvn clean package -Dhadoop.version=2.7.3.2.6.5.0-292
Ensure you have the right jars for adls downloaded into a folder accesible to nifi. Add the folder path to Additional classpath option in PUTHDFS. for hdp 2.6.x you can find the needed jars under /usr/hdp/2.6.x..../hadoop/ you will need the following jars. azure-data-lake-store-sdk-2.2.5.jar
hadoop-azure-2.7.3.2.6.5.0-292.jar
hadoop-azure-datalake-2.7.3.2.6.5.0-292.jar
start nifi and push files , hope it works.
... View more
Labels:
05-29-2018
03:33 PM
4 Kudos
This article goes on how Nifi, with the of the OPC UA api along with Druid for dashboarding can help with industrial iot use cases. We have seen a trend where several clients have shown interest in HDP as a platform for storing all of the sensor data and third party data. This allows these companies store their sensor data over a longer period of time, easily corelate them with other data sets and enrich them in a very cost effective manner. Additionally, they can run ML algorithms on this data set to better predict outages, malfunctions or maintenance. Using NiFi,Kafka and Druid, can provide means for a very easy to maintain and superfast ingestion mechanism from the several edge devices be it windmills, substations, meters etc. Druid can then provide a robust platform for real-time querying dashboarding on this sensor data so operations can keep an eagle eye on whats going on at a plant or wind farm. For the pupose of this article, i will be consuming data from a demo OPC server which is running some simulation , that generates sample data signals, Ramp,Sinusoid, Square etc. The simulation generates a data point for each signal every second. To ingest this data into Druid, i will be using the kafka streaming ingestion option available in druid. But, Kafka by itself can be very complicated for someone in the industrial background to work with, especially since many may not be exposed to this technology. Hence, instead of writing java applications that produce data into Kafka, we need a mechanism that hides these complexiites away and helps the engineers to foucs not on the platform, but more on the use case. NiFi, is an ideal choice for this with its GUI interface and seemless connectivity to kafka. To allow nifi to connect and consume data from the OPC server, we will be using the eclipse Milo library, which communicates with the server over OPC UA. To make it easy and reproducable, i created a custom processor . The repository is located here. Simply clone the repo. build using mvn clean package -Dskiptests. Once the processor is built just copy the .nar file to nifi/lib folder. Restart Nifi. Developed a nifi flow using the opc processor to extract data from the opc server and publishkafka to push data to a kafka topic. The consumeopc events processor was configured as below. Once flow is configured and started, you should see data being to the kafka topic. Now we will configure the druid kafka ingestion spec. This let druid run real time jobs that will pull data from kafka topic into druid real time nodes for segmenting. {
"type": "kafka",
"dataSchema": {
"dataSource": "iot_ds",
"parser": {
"type": "string",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp",
"format": "yyyy-MM-dd'T'HH:mm:ss.SSS zzz"
},
"dimensionsSpec": {
"dimensions": ["Identifier"]
}
}
},
"metricsSpec": [
{
"name": "count",
"type": "count"
},
{
"name": "value_sum",
"fieldName": "value",
"type": "doubleSum"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "NONE",
"queryGranularity": "NONE"
}
},
"tuningConfig": {
"type": "kafka",
"maxRowsPerSegment": 400
},
"ioConfig": {
"topic": "iot_metrics",
"consumerProperties": {
"bootstrap.servers": "seregion3.field.hortonworks.com:6667"
},
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT1H"
}
}
Create the above spec , make sure to modify the kafka bootstrap.server to your environment. Then post this spec to druid curl -X POST -H 'Content-Type: application/json' -d @supervisor-spec.json http://<overlord-host>:<port>/druid/indexer/v1/supervisor This create the indexing service in druid. open overlord console to make sure the ingest is working fine. We will superset for the visualization. It may take upto a a few minutes to atleast an 1 hour for the druid datasource to be available based on your configration. Once it is available, click on Source > Refresh druid metadata to see the new iot_ds data source. Upon creating a slice we can clearly see the different signals plotted on superset. I am using a 1 second granularity for these slices. Hopefully you find the article helpful. You could improve accuracy by also copying the data into HDFS / HIve , so you could do some BI kind of reporting and do advance analytics on the data using spark. YOu could also load this data in batches to druid so it can correct any holes or errors from the real time ingest.
... View more
Labels:
02-07-2018
04:22 AM
1 Kudo
The solution in this article was tested for DB2 and could work for other databases as well , that are not available in the GenerateTableFetch Database Type dropdown. I was trying to develop a flow that extracted data from a few tables in DB2 database and the goal was to land that data into HDFS. Our flow was a Basic ListDatabaseTables followed by a GenerateTableFetch followed by a ExecuteSSQL. The idea was to generate multiple queries that could parallelly extract data from the db2 nodes, utilizing all the 3 NiFi nodes in the cluster. The problem we ran into was that there is no option for DB2 in the GenerateTableFetch DBType property. Which means we had to select the generic option. Upon running the flow, the select query failed to execute on DB2. On investigating we found that the query generated by GenerateTableFetch looked like this select id,name,update_dt,state,city,address from customers where update_dt<='01-01-2018 12:00:00' order by update_dt offset 10000 limit 10000
The limit and offset syntax does not work with DB2, so we basically could not use the query generated by GenerateTableFetch. our first thought was to see if we could replcatetext to some hack offset 10000 limit 1000 , to a format that DB2 like "offset 10000 rows fetch first 10000 rows" . I quickly realised that would not be easy. Thats when NiFi EL came to the rescue. Noticed that apart from generating the query in the flowfile content GenerateTableFetch also puts the different pieces that form the query into the flowfiles attribute. pieces like the columns to output, the column to order on , the where clause. Using these attributes and EL , i could recreate the query. Update the ExecuteSQL "select query" property to something like this select ${generatetablefetch.columnnames} from ${generatetablefetch.tablename} where ${generatefetchtable.whereClause} order by ${generatetablefetch.maxColumnNames} offset ${generatetablefetch.offset} rows fetch first ${generatetablefetch.limit} rows only and vola , it works. to see the different attributes GenerateTableFetch writes to the output flowfile, refer to the documentation here.
... View more
Labels:
12-20-2017
06:47 AM
was going through some ORC based processing from pdf. looks it works better if each page is split into a monochrome image. Examples online show Ghostscript as an option. I was able to leverage this processor to extract images and with a property change it to grayscale if needed. I could now send this to OCR processor for extraction. @Jeremy Dyer
... View more
12-20-2017
06:44 AM
1 Kudo
https://github.com/rkarthik29/pdfprocessors
... View more
09-26-2017
04:55 AM
5 Kudos
In this article we will go over how we can use nifi to ingest PDFs and while we ingest we will use a custom groovy script with executescript processor to extract images from this PDF. The images will be tagged with PDF filename, page num and imagenum , so that it can be indexed in hbase/solr for quick searching and doing some machine learning or other analytics.
The Groovy code below depends on the following jars. Download and copy them to a folder on your computer. I my case they were under /var/pdimagelib/.
pdfbox-2.0.7.jar fontbox-2.0.7.jar jai_imageio.jar commons-logging-1.1.1.jar
Copy the code below to a file on your computer, in my case the file was under /var/scripts/pdimage.groovy import java.nio.charset.*;
import org.apache.commons.io.IOUtils;
import java.awt.image.BufferedImage;
import java.awt.image.RenderedImage;
import java.io.File;
import java.io.FileOutputStream;
import java.util.Iterator;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import javax.imageio.ImageIO;
import org.apache.pdfbox.cos.COSName;
import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.pdmodel.PDPage;
import org.apache.pdfbox.pdmodel.PDPageTree;
import org.apache.pdfbox.pdmodel.PDResources;
import org.apache.pdfbox.pdmodel.graphics.PDXObject;
import org.apache.pdfbox.pdmodel.graphics.image.PDImageXObject;
def flowFile = session.get();
if (flowFile == null) {
return;
}
def ffList = new ArrayList<FlowFile>()
try {
session.read(flowFile,
{inputStream ->
PDDocument document = PDDocument.load(inputStream);
int imageNum = 1;
int pageNum = 1;
document.getDocumentCatalog().getPages().each(){page ->
PDResources pdResources = page.getResources();
pdResources.getXObjectNames().each(){ cosName->
if (cosName != null) {
PDXObject image =pdResources.getXObject(cosName);
if(image instanceof PDImageXObject){
PDImageXObject pdImage = (PDImageXObject)image;
BufferedImage imageStream = pdImage.getImage();
imgFF = session.create(flowFile);
outputStream = session.write(imgFF,
{ outputStream->
ImageIO.write((RenderedImage)imageStream, "png", outputStream);
} as OutputStreamCallback)
imgFF = session.putAttribute(imgFF, "imageNum",String.valueOf(imageNum));
imgFF = session.putAttribute(imgFF, "pageNum",String.valueOf(pageNum));
ffList.add(imgFF);
imageNum++;
}
}
}
pageNum++;
}
} as InputStreamCallback)
session.transfer(ffList, ExecuteScript.REL_SUCCESS)
session.remove(flowFile);
} catch (Exception e) {
log.warn(e.getMessage());
e.printStackTrace();
session.remove(ffList);
session.transfer(flowFile, ExecuteScript.REL_FAILURE);
}
Below is a screenshot of executescript after it has been setup correctly. To ingest the PDF, i used a simple GetFile, though this approach should work for pdfs ingested with any other nifi processor. Below is a snapshot of the nifi flow. When a PDF is ingested, executescript will leverage groovy and pdfbox to extract images. The images will be tagged with the pdf filename,pagenum and imagenum. This can now be sent to hbase or any indexing solution for search or analytics. Hope you find the article useful. Please comment withy your thoughts/questions.
... View more
Labels: