Member since: 05-02-2016
Posts: 154
Kudos Received: 54
Solutions: 14
My Accepted Solutions
| Title | Views | Posted |
| --- | --- | --- |
| | 4184 | 07-24-2018 06:34 PM |
| | 5784 | 09-28-2017 01:53 PM |
| | 1437 | 02-22-2017 05:18 PM |
| | 14287 | 01-13-2017 10:07 PM |
| | 3972 | 12-15-2016 06:00 AM |
02-07-2018
04:22 AM
1 Kudo
The solution in this article was tested with DB2, and it could work for other databases that are not available in the GenerateTableFetch Database Type dropdown. I was developing a flow that extracted data from a few tables in a DB2 database, and the goal was to land that data in HDFS. Our flow was a basic ListDatabaseTables followed by GenerateTableFetch followed by ExecuteSQL. The idea was to generate multiple queries that could extract data from the DB2 nodes in parallel, utilizing all 3 NiFi nodes in the cluster. The problem we ran into was that there is no option for DB2 in the GenerateTableFetch Database Type property, which meant we had to select the Generic option. Upon running the flow, the select query failed to execute on DB2. On investigating, we found that the query generated by GenerateTableFetch looked like this:
select id,name,update_dt,state,city,address from customers where update_dt<='01-01-2018 12:00:00' order by update_dt offset 10000 limit 10000
The limit and offset syntax does not work with DB2, so we could not use the query generated by GenerateTableFetch as-is. Our first thought was to use ReplaceText to rewrite "offset 10000 limit 10000" into a format DB2 likes, such as "offset 10000 rows fetch first 10000 rows only", but I quickly realised that would not be easy. That is when NiFi Expression Language came to the rescue. I noticed that, apart from generating the query in the flowfile content, GenerateTableFetch also puts the different pieces that form the query into flowfile attributes: the columns to output, the column to order on, the where clause, and so on. Using these attributes and EL, I could recreate the query. Update the ExecuteSQL "SQL select query" property to something like this:
select ${generatetablefetch.columnNames} from ${generatetablefetch.tableName} where ${generatetablefetch.whereClause} order by ${generatetablefetch.maxColumnNames} offset ${generatetablefetch.offset} rows fetch first ${generatetablefetch.limit} rows only
and voila, it works. To see the different attributes GenerateTableFetch writes to the output flowfile, refer to the documentation here.
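For illustration, with the sample customers query from above, the expression would resolve on each flowfile to something like:
select id,name,update_dt,state,city,address from customers where update_dt<='01-01-2018 12:00:00' order by update_dt offset 10000 rows fetch first 10000 rows only
which is pagination syntax that DB2 accepts.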
12-20-2017
06:47 AM
I was going through some OCR-based processing of PDFs. It looks like it works better if each page is split into a monochrome image. Examples online show Ghostscript as an option. I was able to leverage this processor to extract the images and, with a property change, switch to grayscale if needed. I could now send these to the OCR processor for extraction. @Jeremy Dyer
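For reference, a Ghostscript invocation along those lines (assuming the PDF has been written out to input.pdf and 300 dpi output is acceptable) could be:
gs -dNOPAUSE -dBATCH -sDEVICE=pngmono -r300 -sOutputFile=page-%03d.png input.pdf
The pngmono device produces one monochrome PNG per page; swapping in pnggray gives grayscale output instead.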
09-26-2017
04:22 AM
With NiFi 1.3 you have the record-based processors. You can forward the output of ExecuteSQL or your custom processor to QueryRecord and set it up to read Avro. Now add a query like: select col_a, col_b, ..., '${query_id}', '${query_time}', '${query_end_time}' from FLOWFILE. This will add the data you need to the query resultset.
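A minimal sketch of the QueryRecord query, assuming hypothetical columns col_a and col_b and that the attributes query_id, query_time and query_end_time are set on the flowfile:
select col_a, col_b, '${query_id}' as query_id, '${query_time}' as query_time, '${query_end_time}' as query_end_time from FLOWFILE
The aliases just give the extra columns stable names in the output records.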
03-15-2019
03:04 PM
Use the ExecuteStreamCommand processor and run a sed command over the flowfile content. This worked for me.
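As a hypothetical sketch (the pattern s/foo/bar/g and the path are placeholders, not from the original post), the processor could be configured like this, with the flowfile content streamed to sed on stdin and the edited output routed to the output stream relationship:
Command Path: /bin/sed
Command Arguments: s/foo/bar/g
Ignore STDIN: false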
02-03-2017
04:56 PM
3 Kudos
I was looking for a way to easily forward and analyze provenance data that is available in nifi. There were a couple of options available.
You could use the NiFi REST API to search for provenance data and then use the results for analysis or for storing in a database. The other alternative was to set up a SiteToSiteProvenanceReportingTask in NiFi, which forwards provenance events to a flow in NiFi. Option 1 is a very techy option: you could point your UI directly to the REST API and present a nice provenance visual with bulk replay capabilities. But then it makes the developer responsible for keeping up with changes in the NiFi REST API, and it would be nice if we did not have that direct dependency. Also, you might want to lock down the REST API in production. Option 2 is very easy, but it is limited in where I can send those provenance events. The Apache NiFi engineering team resolved this situation with the ScriptedReportingTask. It gives you an easy way of setting up provenance reporting in NiFi and forwarding it to an endpoint of your choice. You also do not have a direct dependency between your application and the NiFi REST API, and you can use ScriptedReportingTask to massage the events into a format that works with your application/endpoint. I chose Groovy as the language for my script, but there are options for Python, Javascript and a few others.
Once you are logged in to NiFi, click the menu in the top right corner and select the Controller Settings option. On the Controller Settings dialog, choose the Reporting Tasks tab and click the + in the top right corner to create a new reporting task. On the Add Reporting Task dialog, search for ScriptedReportingTask. Double click the ScriptedReportingTask option in the results, or select the row and click Add. You will see a new ScriptedReportingTask in the reporting tasks list. Click the pencil icon to edit the reporting task. In the reporting task window, select Groovy as the Script Engine and paste the script below into Script Body. Make sure to change the location of the file your events will be written to.
import groovy.json.*;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
final StateManager stateManager = context.getStateManager();
final EventAccess access = context.getEventAccess();
final ProvenanceEventRepository provenance = access.getProvenanceRepository();
log.info("starting event id: " + Long.toString(1));
final List<ProvenanceEventRecord> events = provenance.getEvents(1, 100);
log.info("ending event id: " + events.size());
def outFile = new File("/tmp/provenance.txt");
outFile.withWriter('UTF-8') { writer -> events.each{event -> writer.writeLine(new JsonBuilder(event).toPrettyString()) }}
Click OK and Apply, then click the Play button to activate the reporting task. I set the scheduling frequency for the task to 10 secs so I could see the results right away; you can set it to a higher value as needed. You should see the events appear in /tmp/provenance.txt in JSON format. You could use other formats if needed, and maybe skip the pretty-printing for better performance. The ScriptedReportingTask is responsible for the ReportingContext, which is available to your script as the context object. You can log information to the nifi log using the ComponentLog object, which is also passed to you by the reporting task as log. If you need any other variables to be set from the NiFi side, you can define them as dynamic properties. My script is very simple: it looks at 100 provenance events starting from the first provenance event. You can use the StateManager to keep track of the last provenance event that you received, as in the sketch below. Have a look at the implementation by @jfrazee to see how to incrementally collect provenance events: https://github.com/jfrazee/nifi-provenance-reporting-bundle Thank you to @Matt Burgess for putting together this very useful reporting task component. Hope this is useful.
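As an illustrative sketch, building on the stateManager and provenance variables from the script above and assuming a state key named lastEventId (error handling omitted), incremental collection could look roughly like this:
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
// read the id of the last event processed on a previous run (defaults to 0 the first time)
final StateMap stateMap = stateManager.getState(Scope.LOCAL);
final long lastEventId = Long.parseLong(stateMap.toMap().getOrDefault("lastEventId", "0"));
// fetch the next batch of events after that id
final List<ProvenanceEventRecord> newEvents = provenance.getEvents(lastEventId + 1, 100);
if (!newEvents.isEmpty()) {
    // ... write or forward newEvents here, as in the script above ...
    // remember the highest event id seen so the next run continues from there
    final Map<String, String> newState = new HashMap<>(stateMap.toMap());
    newState.put("lastEventId", Long.toString(newEvents.last().getEventId()));
    stateManager.setState(newState, Scope.LOCAL);
}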
01-26-2017
05:36 PM
1 Kudo
@srini
The default "FlowFIle Policy" is "use clone" within the advanced UI. For an "and" operation, you will want change that to "use original". Just keep in mind (does not apply here since both rules are unique) that if more then one rule tries to perform the exact same "Actions", only the last rule in the list that will be applied to the original FlowFile on output. Both "Rules" are run against every FlowFile that passes through this UpdateAttribute processor, so it will work in cases where only one or the other is set and cases where both are not set. Rules are operated on in the order they are listed. You can drag rules up and down in the list to order them how you like. If this answer addresses your question, please click "accept". Thank you, Matt
12-15-2016
03:11 PM
5 Kudos
A usual question that comes up is about using Snappy and other compression codecs when loading data into HDFS using the NiFi PutHDFS processor. A common error users come across is "java.lang.UnsatisfiedLinkError". It occurs because Snappy and the other compression codecs are implemented in native Linux binaries, which the Java libraries for these codecs use to do the actual compression, and those binaries are not on the JVM's library path. Since the JVM cannot find them, the binding fails and you get a java.lang.UnsatisfiedLinkError. Follow these steps to resolve the issue.
1. Copy the native folder containing the compression libraries from one of your Hadoop nodes:
cd /usr/hdp/x.x.x.x-xx/hadoop/lib/
tar -cf ~/native.tar native/
2. scp the native.tar from your Hadoop node to your NiFi node and untar it to a location of your choice. In my case I use /home/myuser/hadoop/:
cd ~
mkdir hadoop
cd hadoop
tar -xf /path/to/native.tar
3. Go to your NiFi folder, open conf/bootstrap.conf and add a JVM argument for java.library.path pointing to the folder containing the native Hadoop binaries (/home/myuser/hadoop/native in my case):
java.arg.15=-Djava.library.path=/home/myuser/hadoop/native/
I used 15 because that was the number of the last JVM argument in my bootstrap.conf. Alternatively, you can edit bin/nifi-env.sh and add:
export LD_LIBRARY_PATH=/home/myuser/hadoop/native/
4. Restart NiFi.
12-13-2016
04:44 PM
After running ambari-server setup --jdbc-db=mysql --jdbc-driver=/usr/share/java/mysql-connector-java.jar everything works: restart & service check! Thank you so much!
12-15-2016
06:16 PM
2 Kudos
Thanks @Karthik Narayanan. I was able to resolve the issue. Before diving into the solutions, I should make the statement below: with NiFi 1.0 and 1.1,
LZO compression cannot be achieved using the PutHDFS processor. The only supported compressions are the ones listed in the Compression codec drop down. With the LZO related classes present in core-site.xml, the NiFi processor fails to run. The suggestion from the previous HCC post was to remove those classes, but they needed to be retained so that NiFi's copy and HDP's copy of core-site.xml stay in sync.
NiFi 1.0
I created the hadoop-lzo jar by building it from source, added it to the NiFi lib directory and restarted NiFi.
This resolved the issue and I am able to proceed using PutHDFS without it erroring out.
NiFi 1.1
Configure the processor's additional classpath property to point to the jar file. No restart is required.
Note: this does not provide LZO compression; it just lets the processor run without ERROR even when you have the LZO classes in core-site.xml.
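For reference, a rough sketch of the NiFi 1.0 approach, building the jar from the twitter/hadoop-lzo sources (package names and the /opt/nifi path are illustrative, and the build machine is assumed to have Maven, a C compiler and the lzo development headers):
# native lzo headers are needed to build hadoop-lzo (RHEL/CentOS style package names)
yum install -y lzo lzo-devel gcc
git clone https://github.com/twitter/hadoop-lzo.git
cd hadoop-lzo
mvn clean package -DskipTests
# NiFi 1.0: drop the resulting jar into NiFi's lib directory and restart NiFi
cp target/hadoop-lzo-*.jar /opt/nifi/lib/
# NiFi 1.1: instead, point the processor's additional classpath property at the jar (no restart needed)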
UNSATISFIED LINK ERROR WITH SNAPPY
I also had an issue with the Snappy compression codec in NiFi and was able to resolve it by setting the path to the .so files. This did not work on the ambari-vagrant boxes, but I was able to get it working on an OpenStack cloud instance. The issue on the VirtualBox VM could be systemic.
To resolve the link error, I copied the .so files from the HDP cluster and recreated the links, and, as @Karthik Narayanan suggested, added the java library path pointing to the directory containing the .so files. Below is the list of .so files and links, followed by the bootstrap configuration change.
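As an illustration only (library names and versions are placeholders; the directory matches the earlier answer), the copied files, recreated links and bootstrap change would look roughly like:
cd /home/myuser/hadoop/native/
# recreate the symlink chain for the copied Snappy library
ln -sf libsnappy.so.1.1.4 libsnappy.so.1
ln -sf libsnappy.so.1 libsnappy.so
and in conf/bootstrap.conf, the same style of java.library.path argument as in the earlier post:
java.arg.15=-Djava.library.path=/home/myuser/hadoop/native/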