Member since 05-02-2016 | 154 Posts | 54 Kudos Received | 14 Solutions
07-09-2020
02:23 PM
@sajidiqubal Can you share more info? The solution is simple: the Spark Streaming job needs to find the kafka-jaas file and the corresponding keytab. Make sure both paths are accessible on all machines, so the kafka-jaas file and the keytab need to be in a local folder, not in HDFS. If you need them in HDFS, they have to be shipped as part of the spark-submit --files and --keytab arguments (IIRC). In newer versions of Kafka, you can instead add the JAAS info as a Kafka parameter using sasl.jaas.config (see the example below). In that case you just need the keytab to be available on all machines.
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKey=true \
keyTab="/etc/security/keytabs/kafka_client.keytab" \
principal="kafkaclient1@EXAMPLE.COM";
You will also need the parameter sasl.kerberos.service.name=kafka. From your error, it looks like the code is not able to find one of the files, either the jaas.conf or the keytab. Please check that the file is in the right path on all YARN nodes.
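As a rough sketch of the inline-JAAS approach, assuming Spark Structured Streaming's Kafka source (which passes options prefixed with `kafka.` through to the Kafka consumer), the settings above can be collected into one options dictionary. The broker address, the `SASL_PLAINTEXT` protocol and the helper name `kafka_kerberos_options` are assumptions for illustration; use `SASL_SSL` if your brokers require TLS.

```python
def kafka_kerberos_options(bootstrap_servers, keytab, principal,
                           service_name="kafka", protocol="SASL_PLAINTEXT"):
    """Build Kafka source options for Spark with an inline JAAS config.

    The keytab must exist at the same local path on every YARN node,
    because each executor reads it from its own local filesystem.
    """
    jaas = (
        "com.sun.security.auth.module.Krb5LoginModule required "
        "useKeyTab=true "
        "storeKey=true "
        f'keyTab="{keytab}" '
        f'principal="{principal}";'
    )
    # The "kafka." prefix tells Spark to hand these settings to the Kafka client.
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "kafka.security.protocol": protocol,
        "kafka.sasl.kerberos.service.name": service_name,
        "kafka.sasl.jaas.config": jaas,
    }


opts = kafka_kerberos_options(
    "broker1.example.com:6667",  # placeholder broker address
    "/etc/security/keytabs/kafka_client.keytab",
    "kafkaclient1@EXAMPLE.COM",
)
print(opts["kafka.sasl.jaas.config"])
```

With PySpark this could be used as `spark.readStream.format("kafka").options(**opts).option("subscribe", "my_topic").load()`, where `my_topic` is a hypothetical topic name.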
08-16-2018
05:48 AM
3 Kudos
I was trying to dig up some TPS (transactions per second) benchmark info for NiFi ListenHTTP, which lets you provide a REST proxy, but had no luck, so I tried to create one myself. This is a very rough effort; I could improve it by capturing how many client instances are running, so we can see the TPS drop and rise as the clients drop and rise. At some point I ran out of servers and resources to run more clients. You will see the chart drop off near the end before going back up; this is where I noticed some of my clients had crashed, and I restarted them. Anyway, on to the matter.
Purpose
The benchmark only measures how much load a ListenHTTP processor can handle when subjected to real-world traffic.
Setup
The NiFi cluster is set up on an m4.4xlarge (16 CPU cores, 32 GB RAM). The node also hosts the Kafka broker and ZooKeeper. The HDF version is 3.1.1. The NiFi flow is a simple ListenHTTP processor forwarding to UpdateAttribute, which discards the flowfile. The idea was to measure only ListenHTTP performance for receiving a message, creating a flowfile, responding to the client and forwarding the message to the next processor. The benchmark tries to measure what peak TPS could be achieved.
The NiFi instance runs a Site-to-Site (S2S) provenance reporting task, which forwards provenance events to another NiFi instance, which in turn forwards them to a Kafka topic. The data is then ingested into Druid using Kafka ingestion. The timestampMillis field of the provenance event is used by Druid for indexing.
For the client piece, I have a simple Python script that constantly calls the REST service exposed by ListenHTTP, passing the JSON below. The timestamp in the JSON is just to ensure the messages are different.
{"key":"client1","timestamp":<current_unix_time_in_ms>}
The Python script is a simple infinite loop in the format below.
import requests
import time
import random
import json
import threading

# ListenHTTP endpoint exposed by NiFi in this test setup
URL = 'http://nifi1.field.hortonworks.com:19192/test'

def call_rest():
    # Pick a client key; the millisecond timestamp keeps each message distinct
    values = ["client1"]
    value = random.choice(values)
    timestamp = round(time.time() * 1000)
    requests.post(URL, data=json.dumps({"key": value, "timestamp": timestamp}))

while True:
    # Fire 5 concurrent POSTs per batch
    threads = []
    for i in range(5):
        t = threading.Thread(target=call_rest)
        threads.append(t)
        t.start()
    # Wait for the batch to finish so threads do not pile up without bound
    for t in threads:
        t.join()
I ran 5 instances of the script on each of 8 servers to generate the volume I needed for this test.
Dashboard
Once the data is in Druid, I can use Superset to chart and aggregate the provenance events at an interval of one second. Since the provenance events can take a few minutes to arrive, I used a one-minute window starting 5 minutes ago, i.e. from t-5 to t-4 minutes. I also filtered the query to only look at componentType=ListenHTTP and eventType=RECEIVE. This is what I saw on the chart. The rate fluctuates between a maximum of about 3000 TPS and a minimum of around 600 TPS. To get a smoother view, I aggregated over 5-minute intervals across an hour to see what we are doing on average, and the chart was pretty promising. On average we are looking at 300k messages per 5 minutes, which is around 1000 TPS (300,000 messages / 300 seconds).
Conclusion
The 1000 TPS we see from NiFi in this load test is probably not the maximum load it can handle; I can try running my clients on more servers to see if we get higher numbers. But at 1000 TPS, NiFi should be able to handle most web-based traffic. Additionally, this is a cluster with a single NiFi node; we can scale roughly linearly by adding more nodes to the cluster.
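The per-second bucketing and the 5-minute average described above can be sketched offline like this. It is not the Druid/Superset query itself, only an illustration assuming provenance events shaped like the S2S provenance JSON (with `componentType`, `eventType` and `timestampMillis` fields); the sample events are made up.

```python
from collections import Counter


def tps_per_second(events, component_type="ListenHTTP", event_type="RECEIVE"):
    """Count matching provenance events per epoch second."""
    counts = Counter()
    for e in events:
        if e.get("componentType") == component_type and e.get("eventType") == event_type:
            # Integer-divide milliseconds to get the one-second bucket
            counts[e["timestampMillis"] // 1000] += 1
    return dict(counts)


def average_tps(message_count, window_seconds):
    """Average throughput over a window, e.g. 300k messages in 5 minutes."""
    return message_count / window_seconds


# Made-up sample events for illustration only
events = [
    {"componentType": "ListenHTTP", "eventType": "RECEIVE", "timestampMillis": 1534405680100},
    {"componentType": "ListenHTTP", "eventType": "RECEIVE", "timestampMillis": 1534405680900},
    {"componentType": "ListenHTTP", "eventType": "RECEIVE", "timestampMillis": 1534405681200},
    {"componentType": "UpdateAttribute", "eventType": "DROP", "timestampMillis": 1534405681300},
]
print(tps_per_second(events))        # {1534405680: 2, 1534405681: 1}
print(average_tps(300_000, 5 * 60))  # 1000.0
```

The filter mirrors the componentType=ListenHTTP and eventType=RECEIVE query filter, so DROP events from UpdateAttribute are not counted.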
08-07-2018
05:23 PM
3 Kudos
NiFi builds in HDF 3.1.2 and HDF 3.1.0 fail with a dependency issue when trying to push data to ADLS. This is because the new version of the ADLS library depends on a Hadoop 2.8 feature that is not available in Hadoop 2.7.3, which NiFi references. To fix this, you can rebuild the NiFi Hadoop libraries NAR, either against Hadoop 2.8 or against HDP 2.6.x, which should have the classes that ADLS depends on. To do that, git clone the NiFi repository, then:
cd <nifi-repo-home>/nifi-nar-bundles/nifi-hadoop-libraries-bundle/nifi-hadoop-libraries-nar/
vi pom.xml
Add a hadoop.version property to the pom.xml as shown below; if it is already set, no change is needed. Also change the parent version to match the NiFi version you are running.
<parent>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-hadoop-libraries-bundle</artifactId>
    <version>1.5.0.3.1.2.0-7</version>
</parent>
<artifactId>nifi-hadoop-libraries-nar</artifactId>
<packaging>nar</packaging>
<properties>
    <maven.javadoc.skip>true</maven.javadoc.skip>
    <source.skip>true</source.skip>
    <curator.version>2.11.0</curator.version>
    <hadoop.version>2.7.3</hadoop.version>
</properties>
cd ..
vi pom.xml
Change the nifi-nar-bundles parent version to match your NiFi version, as shown below.
<parent>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-nar-bundles</artifactId>
    <version>1.5.0.3.1.2.0-7</version>
</parent>
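If you repeat this rebuild for several NiFi versions, the two pom.xml edits above could be scripted instead of done in vi. This is a minimal sketch using Python's standard ElementTree; the function name `set_pom_versions` is hypothetical. Note that ElementTree drops XML comments and the XML declaration and may reflow the file, so review the diff before building.

```python
import xml.etree.ElementTree as ET
from pathlib import Path

POM_NS = "http://maven.apache.org/POM/4.0.0"
NS = {"m": POM_NS}
# Register the POM namespace as the default so output has no "ns0:" prefixes
ET.register_namespace("", POM_NS)


def set_pom_versions(pom_xml, parent_version, hadoop_version=None):
    """Set <parent><version> and optionally <properties><hadoop.version>."""
    root = ET.fromstring(pom_xml)
    parent_ver = root.find("m:parent/m:version", NS)
    if parent_ver is None:
        raise ValueError("pom.xml has no <parent><version> element")
    parent_ver.text = parent_version
    if hadoop_version is not None:
        props = root.find("m:properties", NS)
        if props is None:
            props = ET.SubElement(root, f"{{{POM_NS}}}properties")
        hv = props.find("m:hadoop.version", NS)
        if hv is None:
            hv = ET.SubElement(props, f"{{{POM_NS}}}hadoop.version")
        hv.text = hadoop_version
    return ET.tostring(root, encoding="unicode")


def update_pom_file(path, parent_version, hadoop_version=None):
    """Rewrite a pom.xml in place (keep a backup first)."""
    p = Path(path)
    p.write_text(set_pom_versions(p.read_text(), parent_version, hadoop_version))
```

For example, `update_pom_file("pom.xml", "1.5.0.3.1.2.0-7", "2.7.3")` for the NAR module, and `update_pom_file("../pom.xml", "1.5.0.3.1.2.0-7")` for the bundle.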
Run the Maven build using the command below, changing hadoop.version to match your version of Hadoop. The NAR will be available under the nifi-hadoop-libraries-nar/target folder. Take that NAR and replace the existing NAR under nifi/lib.
mvn clean package -Dhadoop.version=2.7.3.2.6.5.0-292
Ensure you have the right jars for ADLS downloaded into a folder accessible to NiFi, and add the folder path to the Additional Classpath Resources property of PutHDFS. For HDP 2.6.x you can find the needed jars under /usr/hdp/2.6.x..../hadoop/ and you will need the following jars:
azure-data-lake-store-sdk-2.2.5.jar
hadoop-azure-2.7.3.2.6.5.0-292.jar
hadoop-azure-datalake-2.7.3.2.6.5.0-292.jar
Start NiFi and push files; hopefully it works.
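Before restarting NiFi, a quick check that the classpath folder really contains the three jars listed above can save a round trip. This is a small sketch; the helper names are hypothetical, and the jar versions match the HDP 2.6.5 build used in this post, so adjust them for your install.

```python
from pathlib import Path

# Jars listed above for HDP 2.6.5; change the versions to match your install
REQUIRED_JARS = [
    "azure-data-lake-store-sdk-2.2.5.jar",
    "hadoop-azure-2.7.3.2.6.5.0-292.jar",
    "hadoop-azure-datalake-2.7.3.2.6.5.0-292.jar",
]


def missing_jars(present_names, required=REQUIRED_JARS):
    """Return the required jar names that are not among present_names."""
    present = set(present_names)
    return [jar for jar in required if jar not in present]


def missing_jars_in_folder(folder, required=REQUIRED_JARS):
    """Check the folder configured in PutHDFS Additional Classpath Resources."""
    return missing_jars((p.name for p in Path(folder).glob("*.jar")), required)
```

For example, `missing_jars_in_folder("/opt/nifi-adls-jars")` (a hypothetical folder) returns an empty list when everything is in place.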
10-17-2018
05:01 AM
Sorry, I just saw this message. Could you provide more details?
02-07-2018
04:22 AM
1 Kudo
The solution in this article was tested with DB2 and could work for other databases as well that are not available in the GenerateTableFetch Database Type dropdown. I was trying to develop a flow that extracted data from a few tables in a DB2 database, with the goal of landing that data in HDFS. Our flow was a basic ListDatabaseTables followed by GenerateTableFetch followed by ExecuteSQL. The idea was to generate multiple queries that could extract data from the DB2 nodes in parallel, utilizing all 3 NiFi nodes in the cluster. The problem we ran into was that there is no DB2 option in the GenerateTableFetch Database Type property, which meant we had to select the Generic option. Upon running the flow, the select query failed to execute on DB2. On investigating, we found that the query generated by GenerateTableFetch looked like this:
select id,name,update_dt,state,city,address from customers where update_dt<='01-01-2018 12:00:00' order by update_dt offset 10000 limit 10000
The LIMIT and OFFSET syntax does not work with DB2, so we basically could not use the query generated by GenerateTableFetch. Our first thought was to use ReplaceText to hack "offset 10000 limit 10000" into a format DB2 likes, such as "offset 10000 rows fetch first 10000 rows only". I quickly realized that would not be easy. That's when NiFi Expression Language (EL) came to the rescue. I noticed that, apart from generating the query in the flowfile content, GenerateTableFetch also puts the different pieces that form the query into flowfile attributes: pieces like the columns to output, the column to order on and the where clause. Using these attributes and EL, I could recreate the query. Update the ExecuteSQL "SQL select query" property to something like this:
select ${generatetablefetch.columnNames} from ${generatetablefetch.tableName} where ${generatetablefetch.whereClause} order by ${generatetablefetch.maxColumnNames} offset ${generatetablefetch.offset} rows fetch first ${generatetablefetch.limit} rows only
and voila, it works. To see the different attributes GenerateTableFetch writes to the output flowfile, refer to the GenerateTableFetch processor documentation.
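To make the substitution concrete, here is a sketch of what the EL expression above does, written as a Python function over a dictionary that stands in for the flowfile attributes. The attribute names follow the GenerateTableFetch documentation as I understand it (check the docs for your NiFi version), and the function name is hypothetical. Unlike the EL expression, this version skips the where and order by clauses when their attributes are empty.

```python
def db2_paged_query(attrs):
    """Rebuild a GenerateTableFetch page query using DB2 OFFSET/FETCH syntax.

    attrs stands in for the flowfile attributes written by GenerateTableFetch.
    """
    sql = (f"select {attrs['generatetablefetch.columnNames']}"
           f" from {attrs['generatetablefetch.tableName']}")
    where = attrs.get("generatetablefetch.whereClause")
    if where:
        sql += f" where {where}"
    order_by = attrs.get("generatetablefetch.maxColumnNames")
    if order_by:
        sql += f" order by {order_by}"
    # DB2 paging syntax instead of the generic LIMIT/OFFSET
    sql += (f" offset {attrs['generatetablefetch.offset']} rows"
            f" fetch first {attrs['generatetablefetch.limit']} rows only")
    return sql


attrs = {
    "generatetablefetch.columnNames": "id,name,update_dt,state,city,address",
    "generatetablefetch.tableName": "customers",
    "generatetablefetch.whereClause": "update_dt<='01-01-2018 12:00:00'",
    "generatetablefetch.maxColumnNames": "update_dt",
    "generatetablefetch.offset": "10000",
    "generatetablefetch.limit": "10000",
}
print(db2_paged_query(attrs))
```

With the example attributes, this prints the failing query from above rewritten with "offset 10000 rows fetch first 10000 rows only" in place of "offset 10000 limit 10000".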
12-20-2017
06:47 AM
I was going through some OCR-based processing of PDFs. It looks like OCR works better if each page is split into a monochrome image. Examples online show Ghostscript as an option. I was able to leverage this processor to extract the page images and, with a property change, convert them to grayscale if needed. I could then send them to an OCR processor for extraction. @Jeremy Dyer
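For reference, the Ghostscript page-splitting step can also be run from the command line. This sketch only builds and runs a `gs` command; it is not the NiFi processor mentioned above. It assumes the `gs` binary is on the PATH, and the output file pattern and 300 dpi resolution are illustrative choices. The `pnggray` device produces 8-bit grayscale pages, while `pngmono` produces 1-bit monochrome pages.

```python
import subprocess


def gs_split_command(pdf_path, output_pattern="page-%03d.png", device="pnggray", dpi=300):
    """Build a Ghostscript command that renders each PDF page to its own PNG.

    Ghostscript replaces %03d in the output pattern with the page number.
    """
    return [
        "gs", "-dNOPAUSE", "-dBATCH", "-dSAFER",
        f"-sDEVICE={device}",
        f"-r{dpi}",
        f"-sOutputFile={output_pattern}",
        pdf_path,
    ]


def split_pdf(pdf_path, **kwargs):
    # Requires gs on PATH; raises CalledProcessError if rendering fails
    subprocess.run(gs_split_command(pdf_path, **kwargs), check=True)


print(" ".join(gs_split_command("scan.pdf", device="pngmono")))
```

Here `scan.pdf` is a placeholder input. The resulting page images would then go to the OCR step.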
02-03-2017
04:56 PM
3 Kudos
I was looking for a way to easily forward and analyze the provenance data that is available in NiFi. There were a couple of options available.
Option 1: Use the NiFi REST API to search for provenance data and then use the results for analysis or store them in a database.
Option 2: Set up a SiteToSiteProvenanceReportingTask in NiFi. This forwards provenance events to a flow in NiFi.
Option 1 is a very techy option: you could point your UI directly at the REST API and present a nice provenance visual with bulk replay capabilities. But then the developer becomes responsible for keeping up with changes in the NiFi REST API, and it would be nice not to have that direct dependency. Also, you might want to lock down the REST API in production. Option 2 is very easy, but it is limited in where I can send those provenance events.
The Apache NiFi engineering team resolved this situation with the ScriptedReportingTask reporting task. It gives you an easy way to set up provenance reporting in NiFi and forward the events to an endpoint of your choice, without a direct dependency between your application and the NiFi REST API. You can use ScriptedReportingTask to massage the events into a format that works with your application or endpoint. I chose Groovy as the language for my script, but there are options for Python, JavaScript and a few others.
Once you are logged in to NiFi, click the menu in the top right corner and select the Controller Settings option. On the Controller Settings dialog, choose the Reporting Tasks tab. Click the + in the top right corner to create a new reporting task. On the Add Reporting Task dialog, search for ScriptedReportingTask. Double-click the ScriptedReportingTask option in the results, or select the row and click Add. You will see a new ScriptedReportingTask in the reporting tasks list. Click the pencil icon to edit the reporting task. In the reporting task window, select groovy as the Script Engine and paste the script below into Script Body. Make sure to change the location of the file your events will be written to.
import groovy.json.*;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventRecord;
// 'context' (ReportingContext) and 'log' (ComponentLog) are provided by ScriptedReportingTask
final StateManager stateManager = context.getStateManager();
final EventAccess access = context.getEventAccess();
final ProvenanceEventRepository provenance = access.getProvenanceRepository();
// Read up to 100 events, starting at event id 1
final long firstEventId = 1;
final List<ProvenanceEventRecord> events = provenance.getEvents(firstEventId, 100);
log.info("starting event id: " + firstEventId + ", events read: " + events.size());
// Change this path to where the events should be written
def outFile = new File("/tmp/provenance.txt");
// Write one pretty-printed JSON document per event (the file is overwritten on each run)
outFile.withWriter('UTF-8') { writer ->
    events.each { event -> writer.writeLine(new JsonBuilder(event).toPrettyString()) }
}
Click OK and Apply, then click the Play button to activate the reporting task. I set the scheduling frequency of my task to 10 seconds so I could see the results right away; you can set it to a higher value as needed. You should see the events appear in /tmp/provenance.txt in JSON format. You could use other formats if needed, and skip pretty-printing for better performance. The ScriptedReportingTask provides the ReportingContext, which is available to your script as the context object. You can log information to the NiFi log using the ComponentLog log object, which is also passed to you by the reporting task. If you need any other variables to be set from the NiFi task, you can define them as dynamic properties. My script is very simple: it looks at 100 provenance events starting from the first provenance event. You can use the StateManager to keep track of the last provenance event that you received. Look at the implementation by @jfrazee to see how to collect provenance events incrementally: https://github.com/jfrazee/nifi-provenance-reporting-bundle Thank you to @Matt Burgess for putting together this very useful reporting task component. Hope this is useful.
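The incremental pattern mentioned above (remember the last event id, start the next run just after it) can be illustrated language-neutrally. In this Python sketch, `get_events(first_id, max_records)` stands in for `ProvenanceEventRepository.getEvents` and a plain dict stands in for the StateManager; both stand-ins and the fake repository are assumptions for illustration. In the real Groovy task, StateManager stores string values, so the id would need converting with `Long.parseLong`.

```python
def collect_new_events(get_events, state, batch_size=100):
    """Fetch provenance events newer than the last id recorded in state."""
    # Start right after the last event seen; -1 means "nothing seen yet"
    first_id = state.get("last_event_id", -1) + 1
    events = get_events(first_id, batch_size)
    if events:
        state["last_event_id"] = events[-1]["eventId"]
    return events


# Fake repository with event ids 0..249, for illustration only
REPO = [{"eventId": i} for i in range(250)]


def fake_get_events(first_id, max_records):
    return REPO[first_id:first_id + max_records]


state = {}
batch1 = collect_new_events(fake_get_events, state)
batch2 = collect_new_events(fake_get_events, state)
print(len(batch1), len(batch2), state)  # 100 100 {'last_event_id': 199}
```

Each scheduled run therefore picks up only events it has not reported yet, instead of re-reading the same first 100 events.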
12-15-2016
03:11 PM
5 Kudos
A common question is how to use Snappy and other compression codecs when loading data into HDFS with the NiFi PutHDFS processor. A frequent error users run into is "java.lang.UnsatisfiedLinkError". It occurs because the native Linux libraries for Snappy and the other codecs, which the Java codec classes call to do the actual compression, are not on the JVM's library path. Since the JVM cannot find them, the native binding fails and you get a java.lang.UnsatisfiedLinkError. Follow these steps to resolve the issue.
1. Copy the native folder containing the compression libraries from one of your Hadoop nodes.
cd /usr/hdp/x.x.x.x-xx/hadoop/lib/
tar -cf ~/native.tar native/
2. scp native.tar from your Hadoop node to your NiFi node and untar it to a location of your choice. In my case I use /home/myuser/hadoop/.
cd ~
mkdir hadoop
cd hadoop
tar -xf /path/to/native.tar
3. Go to your NiFi folder, open conf/bootstrap.conf and add the following JVM argument, pointing java.library.path at the folder containing the native Hadoop binaries (/home/myuser/hadoop/native in my case).
java.arg.15=-Djava.library.path=/home/myuser/hadoop/native/
I used 15 because it was the next number after the last JVM argument in my bootstrap.conf; any index not already in use works. Alternatively, you can edit bin/nifi-env.sh and add:
export LD_LIBRARY_PATH=/home/myuser/hadoop/native/
4. Restart NiFi.
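Step 3 can be scripted by picking the next unused java.arg index automatically, which avoids accidentally overwriting an existing argument. This is a hypothetical helper, not part of NiFi; it only considers numeric `java.arg.N=` keys at the start of a line (commented-out and named keys are ignored), and you should run it on a copy of conf/bootstrap.conf and restart NiFi afterwards.

```python
import re


def add_library_path(bootstrap_text, native_dir):
    """Append java.arg.N=-Djava.library.path=<native_dir> using the next unused N."""
    used = [int(n) for n in re.findall(r"^java\.arg\.(\d+)=", bootstrap_text, flags=re.MULTILINE)]
    next_index = max(used, default=0) + 1
    line = f"java.arg.{next_index}=-Djava.library.path={native_dir}"
    # Make sure the new argument starts on its own line
    if bootstrap_text and not bootstrap_text.endswith("\n"):
        bootstrap_text += "\n"
    return bootstrap_text + line + "\n", next_index


sample = (
    "java.arg.2=-Xms512m\n"
    "java.arg.3=-Xmx512m\n"
    "java.arg.14=-Djava.awt.headless=true\n"
)
new_text, index = add_library_path(sample, "/home/myuser/hadoop/native/")
print(new_text.splitlines()[-1])  # java.arg.15=-Djava.library.path=/home/myuser/hadoop/native/
```

With a bootstrap.conf whose highest numbered argument is 14, this reproduces the java.arg.15 line from step 3.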