Member since: 06-08-2017
Posts: 1049
Kudos Received: 510
Solutions: 312
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 5979 | 04-15-2020 05:01 PM
 | 3220 | 10-15-2019 08:12 PM
 | 1216 | 10-12-2019 08:29 PM
 | 6301 | 09-21-2019 10:04 AM
 | 2064 | 09-19-2019 07:11 AM
04-15-2020
05:01 PM
Hi @ChineduLB , You can use `.groupBy` with `concat_ws(",",collect_list(...))` to build the department list, and the `row_number` window function to generate `ID`:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df = Seq(("1","User1","Admin"),("2","User1","Accounts"),("3","User2","Finance"),("4","User3","Sales"),("5","User3","Finance")).toDF("ID","USER","DEPT")
val w = Window.orderBy("USER")

df.groupBy("USER").
  agg(concat_ws(",",collect_list("DEPT")).alias("DEPARTMENT")).
  withColumn("ID",row_number().over(w)).
  select("ID","USER","DEPARTMENT").
  show()
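If you prefer Spark SQL, the same logic can be written as below; this is a sketch that assumes the DataFrame has been registered as a temp view named users (e.g. df.createOrReplaceTempView("users")):
-- same aggregation in Spark SQL; "users" is an assumed temp view name
SELECT row_number() OVER (ORDER BY `USER`) AS ID,
       `USER`,
       concat_ws(',', collect_list(DEPT)) AS DEPARTMENT
FROM users
GROUP BY `USER`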
... View more
10-18-2019
07:24 PM
We can store the state in DistributedMapCache/HDFS/Hive/HBase, pull the stored state back, and use it in an ExecuteSQL processor to incrementally pull the data; try the approach mentioned in this link, and see the sketch below. - Alternatively, in the QueryDatabaseTable processor we need to configure the Maximum-value Columns property so that NiFi stores the state in the QueryDatabaseTable processor. On the next run, QueryDatabaseTable pulls only the latest changes from the table.
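A minimal sketch of the incremental query for ExecuteSQL, assuming the stored state has already been pulled into a flowfile attribute named stored.max.ts and the table has a last_updated_ts column (both names are hypothetical):
-- stored.max.ts is a hypothetical attribute holding the last processed timestamp;
-- ExecuteSQL evaluates NiFi expression language in the query property
SELECT *
FROM source_table
WHERE last_updated_ts > '${stored.max.ts}'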
... View more
10-15-2019
08:25 PM
Change the transaction manager to org.apache.hadoop.hive.ql.lockmgr.DbTxnManager, then try to run your update query. Run these commands in the Hive shell:
--create a hive table with transactions enabled
CREATE TABLE table_name (
id int,
name string
)
CLUSTERED BY (id) INTO 2 BUCKETS STORED AS ORC
TBLPROPERTIES ("transactional"="true");
--changing transaction manager
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
--insert data into table
insert into table_name values(1,"a"),(2,"b");
--update data in the table
update table_name set name="c" where id =1;
--delete specific id from table
delete from table_name where id=1;

Refer to this link for more details about transactional Hive tables.
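Depending on your cluster defaults, concurrency support may also need to be enabled in the same session; this is a sketch assuming a default configuration, so skip it if the setting is already applied cluster-wide:
--assumed session setting for ACID operations; may already be set at the cluster level
set hive.support.concurrency=true;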
... View more
10-15-2019
08:12 PM
1 Kudo
In the EvaluateJsonPath processor, add a new property to extract the event_id value from the flowfile. If the flowfile does not have event_id, NiFi adds an empty value to the attribute. EvaluateJsonPath configs: Then by using the RouteOnAttribute processor we can check the attribute value and route the flowfile accordingly. RouteOnAttribute configs:
not null value    ${event_id:isEmpty():not()}
null value    ${event_id:isEmpty()}
Then use the "null value" and "not null value" relationships for further processing!
... View more
10-12-2019
08:29 PM
1 Kudo
@Gerva
select count(*) from <table>;
The query launches a map-reduce job and the output is displayed on the console. - If you want to store the output in a file, use:
insert overwrite directory '<directory_name>' select count(*) from scenariox;
Now the output of the map-reduce job is stored in the given HDFS directory and you will find a 000000_0 file in that directory.
... View more
10-12-2019
07:17 PM
@sureshpathipati One way would be to use the RouteText processor to get the first field value up to the first space, and then apply the NiFi expression-language substring function on the extracted attribute value to check/route the line based on that value. Regex to capture the first field value up to the first space: ([^ ]+)(.*) Another method: since you have a fixed-width file, you can use the ReplaceText processor to create a delimited file and then, in a QueryRecord processor, add a new SQL query that checks substring(firstfield_name,start,endposition)='00'. Add two new queries in the QueryRecord processor to check the substring value and route the records dynamically based on the matched criteria; see the sketch below. - If the answer helped to resolve your issue, click the Accept button below to accept the answer. That would be a great help to community users looking for a quick solution to this kind of issue.
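A minimal sketch of the two QueryRecord queries, assuming the first column is named firstfield_name in the record schema (a hypothetical name, adjust to your schema):
-- route records whose first two characters are "00" (firstfield_name is a hypothetical field name)
SELECT * FROM FLOWFILE WHERE SUBSTRING(firstfield_name, 1, 2) = '00'
-- route everything else
SELECT * FROM FLOWFILE WHERE SUBSTRING(firstfield_name, 1, 2) <> '00'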
... View more
09-21-2019
10:04 AM
@budati For this case define your Avro schema (with one field) to read the incoming flowfile with some delimiter that doesn't exist in the flowfile, so that the whole row is read as a single string. Then we can filter out the records by using NOT LIKE (or a regex operator) in Apache Calcite: Select * from FLOWFILE where col1 not like '%SKIP%' Now the output flowfile will not have any records that contain SKIP, and this solution works dynamically for any number of columns.
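If you prefer the regex-style operator mentioned above, Calcite also supports SIMILAR TO; a sketch with the same assumed single-field schema named col1:
-- SIMILAR TO uses SQL regular-expression syntax; % and _ behave as in LIKE
SELECT * FROM FLOWFILE WHERE col1 NOT SIMILAR TO '%SKIP%'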
... View more
09-20-2019
09:42 AM
@budati Define the Avro schema for the record reader as col1, col2, ...etc., set the Treat First Line as Header property value to false, and add a new query in the QueryRecord processor such as select * from FLOWFILE where col1 != 'SKIP' (or) select * from FLOWFILE where col1 <> 'SKIP' **NOTE** assuming col1 has "SKIP" in it (Calcite treats single quotes as string literals and double quotes as identifiers). For the record writer define an Avro schema with your actual field names. Now QueryRecord will exclude all the records that have "SKIP" in them and write the flowfile with the actual field names in the mentioned format.
... View more
09-19-2019
07:11 AM
1 Kudo
@budati I don't think there is a way to combine all 3 processors into one. We still need to use ExecuteSQL -> ConvertAvroToJson -> EvaluateJsonPath to extract the values from the flowfile. If the answer was helpful to resolve your issue, accept the answer to close the thread 🙂
... View more
09-17-2019
09:40 PM
@budati Have you tried using the NiFi DBCPConnectionLookup service? With it we can make a dynamic lookup against the RDBMS. - Please refer to this link for more details regarding the lookup service.
... View more
09-17-2019
09:31 PM
@budati You can use the QueryRecord processor and add a new SQL query to select only the records that don't have the value "SKIP" for the field, using the Apache Calcite SQL parser; see the sketch below. - For more reference regarding the QueryRecord processor refer to this link.
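A minimal sketch of such a query, assuming the field is named col1 in your record schema (a hypothetical name):
-- keep only records whose col1 value is not SKIP (col1 is a hypothetical field name)
SELECT * FROM FLOWFILE WHERE col1 <> 'SKIP'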
... View more
09-11-2019
09:44 AM
@VijayM Try running msck repair table: hive> msck repair table <db_name>.<table_name>; then run your select and filter queries on the table. For more details regarding msck repair table please refer to this link.
... View more
09-09-2019
09:43 PM
@budati In the MergeContent processor we can set the Correlation Attribute Name property and use Merge Format = ZIP to zip the files, then use PutS3Object to store the files in S3.
... View more
09-09-2019
09:37 PM
@Rohit1981 Try using the ValidateRecord processor as described in this article with Strict Type Checking set to "true"; to parse a fixed-width file, refer to this link for a detailed explanation of the parsing.
... View more
09-09-2019
09:14 PM
@ANMAR Try this regex in the ExtractText processor: (?:"x":.\w+?)(\d+) This regex extracts only the digits in the "x" key; that value is then added as the "y" key in the ReplaceText processor.
... View more
09-09-2019
09:07 PM
1 Kudo
@ANMAR Try this regex: (?:"b"\s*:\s*)"(.*?)", This will extract only the 24 - ny value from the given event.
... View more
09-09-2019
08:01 PM
1 Kudo
@ANMAR Try with this regex (?:\"key2\"\\s*:\\s*)(.*?), This will extract only the value of "key2" key i.e "value2" - if you don't need quotes to be extracted then use this regex (?:\"key2\"\\s*:\\s*)"(.*?)", This will extract only the value of "key2" key i.e value2
... View more
09-09-2019
07:54 PM
@ANMAR Try this regex in the ExtractText processor: (?:"x":.\w+?)(\d+) This regex extracts only the digits in the "x" key; that value is then added as the "y" key in the ReplaceText processor.
... View more
09-08-2019
08:14 PM
1 Kudo
@ANMAR You need to use the ExtractText processor with a matching regex to extract only the integer value. Add a new property in the ExtractText processor:
val    (\d+)
Then use a ReplaceText processor with the below configs:
Search Value: }
Replacement Value: ,"y":"${val}"}
Character Set: UTF-8
Maximum Buffer Size: 1 MB
Replacement Strategy: Literal Replace
Evaluation Mode: Entire text
With the ReplaceText processor we take the extracted value and add a "y" key with that value.
Input data: {"x":"avc123.abc.com"}
Output: {"x":"avc123.abc.com","y":"123"}
... View more
09-06-2019
11:16 PM
@RandomT You can check compression on .avro files using avro-tools:
bash$ avro-tools getmeta <file_path>
For more details refer to this link. - sqlContext.setConf sets a global config, so every write will be Snappy-compressed; if you are writing all your data Snappy-compressed then you should use this method. - If you are compressing only selected data, then use exampleDF.write.option("compression", "snappy").avro("output path") for better control over the compression.
... View more
08-12-2019
01:26 AM
@Raymond Cui Try adding a new attribute in the UpdateAttribute processor: epochtime    ${file.creationTime:toDate("yyyy-MM-dd'T'HH:mm:ss+0000"):toNumber()}
Then NiFi will match the format and convert it to epoch time.
... View more
08-06-2019
02:11 AM
@Imad Anis I don't think this is possible, as the GetSNMP processor doesn't accept any incoming connections, so we cannot use one processor for multiple hosts. The only way I can see right now is to use multiple GetSNMP processors and keep the hostnames hard-coded in them.
... View more
08-06-2019
02:07 AM
@Satish Karuturi This is expected behaviour from the ExecuteStreamCommand processor, and the best practice is to place the shell script file on all nodes of the NiFi cluster. Since you have a 2-node NiFi cluster, we cannot control which node becomes the primary node, and the ExecuteStreamCommand processor runs only on the primary node. If the primary node changes, NiFi will pick up the shell script from the new active primary node and continue to execute the script without any issues. In addition you can also use the ExecuteProcess processor to execute a shell script in NiFi.
... View more
08-02-2019
12:59 AM
@Joseph Patrick Try the below regex:
Search Value: (\\{\"categoryid\")(.*?)(\"\\},)
... View more
07-31-2019
01:40 PM
@Ash C You can use the NiFi REST API to get the state of the processor. Sample REST API call:
curl -i -X GET http://<host>:<port>/nifi-api/processors/<processor-id>/state
... View more
07-31-2019
01:33 PM
@Rohini Mathur Please check this and this link to get the location of the Hive table.
... View more
07-31-2019
02:53 AM
@Rohini Mathur Using a shell script: one way of doing this would be to use a shell script; get all tables from the database with show tables from <db_name>; then store all the tables in a variable, loop through each one, and execute the show create table <table_name>; command. Using Spark: another way would be to use spark.catalog.listTables("<db_name>") to list all the tables in the database, then filter out only the managed tables and execute show create table on that list of managed tables. Using the Hive metastore DB: Hive stores all the table information in a metastore database (MySQL, etc.), so you can also get information about the tables from the metastore directly; see the sketch below.
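A minimal sketch of a metastore query for listing the managed tables, assuming the default Hive metastore schema (table and column names can differ between metastore versions):
-- run against the metastore RDBMS (e.g. MySQL); default metastore schema assumed
SELECT d.NAME AS db_name, t.TBL_NAME
FROM TBLS t
JOIN DBS d ON t.DB_ID = d.DB_ID
WHERE t.TBL_TYPE = 'MANAGED_TABLE'
  AND d.NAME = '<db_name>';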
... View more
07-31-2019
02:39 AM
@Erkan ŞİRİN, Try specifying the defaultFS and resourcemanager address:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("yarn")
  .config("spark.hadoop.fs.defaultFS","<name_node_address>")
  .config("spark.hadoop.yarn.resourcemanager.address","<resourcemanager_address>")
  .appName("<job_name>")
  .enableHiveSupport()
  .getOrCreate()
Then add the spark-yarn_x.x jar to your Maven dependencies and try to run again.
... View more
07-29-2019
11:34 PM
@Rohit Sinha I don't think we can specify a location or bucketing in Hive-HBase integration. Try this create table statement:
CREATE external TABLE user_db.test_hbase( col_1 String, col_2 String )
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,test_hbase:col_2")
TBLPROPERTIES("hbase.table.name" = "user_db:test_hbase");
... View more
07-29-2019
08:46 PM
@Noah Brace Use the ConvertExcelToCSVProcessor to convert the file to CSV, then use the SplitRecord processor to write only the required column with Records Per Split set to 1. Use the ExtractText processor to extract the content into a flowfile attribute, then pass that attribute name to the FetchFile processor. Flow: 1.ConvertExcelToCSV
2.SplitRecord //configure to read the csv and write only the path column & with records per split as 1
3.ExtractText //add new property as fn with value as (.*) & now we will have attribute named fn to flowfile.
4.FetchFile/FetchSFTP/FetchFTP //keep filename property value as ${fn}
5.other processors.
... View more