Member since
06-08-2017
1049
Posts
518
Kudos Received
312
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
|  | 11227 | 04-15-2020 05:01 PM |
|  | 7131 | 10-15-2019 08:12 PM |
|  | 3114 | 10-12-2019 08:29 PM |
|  | 11495 | 09-21-2019 10:04 AM |
|  | 4343 | 09-19-2019 07:11 AM |
01-18-2019
12:32 AM
@Manish Parab Your GetMongo processor is scheduled to run on All Nodes, which means the same data is pulled on every node. If you schedule the EvaluateJsonPath processor to run on the Primary Node only, the flowfiles pulled on all the other nodes will be left in the queue in front of EvaluateJsonPath, because flowfiles pulled on any node other than the primary node are never processed. Instead, run the GetMongo processor on the Primary Node only and keep the EvaluateJsonPath processor scheduled on All Nodes. The reason to keep EvaluateJsonPath on All Nodes is that, if the NiFi primary node changes, an EvaluateJsonPath restricted to the primary node would never process the flowfiles still queued on the old primary node.
01-17-2019
01:54 PM
@Anish Gupta Could you create the Hive table using a JsonSerDe and then apply explode on the array, as described in this SO thread?
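A rough HiveQL sketch of that approach (the table name, columns, and location below are made-up placeholders, not from the original thread; the HCatalog JsonSerDe jar must be on the Hive classpath):

-- create a table over the raw JSON using the HCatalog JsonSerDe
CREATE EXTERNAL TABLE raw_json (
  id STRING,
  items ARRAY<STRING>
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION '/data/raw_json';

-- explode the array so each element becomes its own row
SELECT id, item
FROM raw_json
LATERAL VIEW explode(items) t AS item;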
01-17-2019
01:50 PM
@Haider Naveed You can use the QueryRecord processor and select all the rows that don't contain the backslash character using Apache Calcite SQL syntax, (or) use the RouteText processor and add a matching regex (or one of the other Matching Strategies) to identify the backslash character in the file.
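A rough sketch of the QueryRecord option, assuming a record field named message (the field name and the property name no_backslash are placeholders): add a dynamic property to QueryRecord whose value is the Calcite SQL below; matching records are routed to a relationship named after that property.

no_backslash:
SELECT *
FROM FLOWFILE
WHERE POSITION('\' IN message) = 0

Using POSITION avoids any LIKE escaping ambiguity around the backslash; rows where no backslash is found return 0 and are kept.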
01-16-2019
10:49 PM
@Venkat Use the regexp_replace function in Hive to replace $ with '' and then cast to int. Example: select int(regexp_replace(string("124$"),'\\$',''));
+------+--+
| _c0 |
+------+--+
| 124 |
+------+--+
(or) Starting from Hive-1.3 you can use the replace function: select int(replace(string("124$"),'$',''));
+------+--+
| _c0 |
+------+--+
| 124 |
+------+--+
01-16-2019
11:46 AM
@Mitthu Wagh Try uploading this template: json-to-csv.xml
01-16-2019
01:20 AM
@Mitthu Wagh Make sure the ${inferred.avro.schema} attribute is associated with the flowfile. I am not able to reproduce the error; I recreated the same scenario on my end with NiFi-1.8.0, and here is the template of my flow. Upload the template to your instance and run it, and let us know if you still have the issue: json-to-csv-232797.xml
01-16-2019
12:18 AM
1 Kudo
@Deppu If you are able to identify inserts/updates, then set the statement.type attribute on the flowfile; based on that attribute value the PutDatabaseRecord processor will run Insert/Update/Delete statements. - This is a somewhat hacky way to do it: if you are not able to identify inserts/updates, then split the records into individual records using the SplitRecord processor and run two or three successive PutDatabaseRecord processors.
You need to decide how to identify deletes/updates, because if the insert fails you can retry with either the update statement type (or) the delete statement type. Flow:
1. First PutDatabaseRecord // insert statement type; use the failure connection to feed the next PutDatabaseRecord
2. Second PutDatabaseRecord // update statement type; use the failure connection to feed the next PutDatabaseRecord
3. Third PutDatabaseRecord // delete statement type
Note: I'm guessing at this flow; rearrange these processors as per your logic.
01-12-2019
06:53 PM
@Yasir Khokhar
If you are using the DBCPConnectionPoolLookup controller service with the ExecuteSQL processor, then each flowfile feeding the ExecuteSQL processor needs to have a database.name attribute whose value is the name given to the desired DBCPConnectionPool. Based on the database.name attribute value, the ExecuteSQL processor selects the connection pool dynamically, so a single ExecuteSQL processor can use multiple DBCPConnectionPools. Use an UpdateAttribute processor to set the database.name attribute on the flowfile, then configure/enable the DBCPConnectionPoolLookup service and select this service in the ExecuteSQL processor.
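For illustration only (the pool names and attribute value below are made-up, not from this thread), the wiring looks roughly like this:

UpdateAttribute: add a property database.name = mysql_sales (the attribute is set on each flowfile)
DBCPConnectionPoolLookup: user-defined properties map lookup keys to pools, e.g. mysql_sales -> the MySQL DBCPConnectionPool service, postgres_hr -> the Postgres DBCPConnectionPool service
ExecuteSQL: Database Connection Pooling Service = the DBCPConnectionPoolLookup service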
01-12-2019
02:58 AM
1 Kudo
@Yasir Khokhar
Starting from NiFi-1.7 we can dynamically assign the connection pooling service by using the DBCPConnectionPoolLookup controller service. From the documentation:
1. Provides a DBCPService that can be used to dynamically select another DBCPService. This service requires an attribute named 'database.name' to be passed in when asking for a connection, and will throw an exception if the attribute is missing.
2. The value of 'database.name' will be used to select the DBCPService that has been registered with that name.
3. This will allow multiple DBCPServices to be defined and registered, and then selected dynamically at runtime by tagging flow files with the appropriate 'database.name' attribute.
If you are using a NiFi version < 1.7.0, then you need to define a separate DBCPConnectionPool controller service for each connection used by the ExecuteSQL processor.
01-12-2019
02:40 AM
1 Kudo
@Satya G Read the CSV file with a header as described here: https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader Once you are able to read the csv file with the header, use the .select method to select the columns in the order you want: #pyspark: df = spark.read.option("header", "true").csv("<file>")  # read the csv with header (set the option before .csv())
df1 = df.select("A","B","C","D")  # select the columns in order
df1.write.mode("<overwrite/append>").saveAsTable("<db_name>.<tab_name>")