Member since: 06-08-2017
Posts: 1049
Kudos Received: 510
Solutions: 312
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 5948 | 04-15-2020 05:01 PM |
 | 3199 | 10-15-2019 08:12 PM |
 | 1214 | 10-12-2019 08:29 PM |
 | 6274 | 09-21-2019 10:04 AM |
 | 2062 | 09-19-2019 07:11 AM |
07-29-2019
08:37 PM
@Ram S You need to use the QueryRecord processor for this case and add a new query to the processor using Apache Calcite SQL syntax. Use a window function with lag() to get the previous row's value for each row in a partition.
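A minimal sketch of such a query (assuming hypothetical record fields device_id, event_time, and reading; substitute the field names from your own record schema, and note that window-function support depends on the Calcite version bundled with your NiFi release):
SELECT device_id, event_time, reading,
       LAG(reading) OVER (PARTITION BY device_id ORDER BY event_time) AS prev_reading
FROM FLOWFILE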
07-28-2019
10:45 PM
@Erkan ŞİRİN Did you try using yarn-client (or) yarn-cluster instead of yarn in .master()? If the error still exists, add the spark-yarn jar to the build path and try to submit the job again. Refer to this link for more details about a similar issue.
07-26-2019
01:30 AM
@Yogesh Kumar This is expected behaviour from the ConvertJSONToSQL processor; if you want to view the values for the columns, check the flowfile attributes. Refer to this link for more details on a similar question: https://community.hortonworks.com/questions/155492/can-someone-tell-me-how-to-update-a-record-in-orac.html Using ConvertJSONToSQL is an older approach; instead, use the PutDatabaseRecord processor and define a RecordReader controller service to read the incoming flowfile.
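For illustration, with a hypothetical table and columns, the flowfile content produced by ConvertJSONToSQL is a parameterized statement along these lines, while the column values travel as flowfile attributes (sql.args.1.value, sql.args.1.type, and so on):
-- generated flowfile content (the actual values are in the sql.args.N.* attributes, not here)
INSERT INTO my_table (id, name) VALUES (?, ?)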
07-25-2019
03:32 AM
@Shailuk Could you set the password in the ListSFTP processor and then try to run the processor again?
07-25-2019
03:29 AM
@Farooq Mustafa If the answer helped resolve the issue, log in and click the Accept button below to close this thread. This will help other community users find answers quickly 🙂
07-25-2019
03:28 AM
@Thuy Le If the answer helped resolve the issue, log in and click the Accept button below to close this thread. This will help other community users find answers quickly 🙂
07-25-2019
03:26 AM
@Chandan Singh You can use the QueryRecord processor for this case; it uses the Apache Calcite SQL parser. - Add a new property to the processor containing a select query against FLOWFILE (for example, a substring() on the Phone column). - Define Record Reader (CSVReader) and Record Writer (JsonRecordSetWriter) controller services; the resulting flowfile from the processor will be in JSON format.
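A minimal sketch of such a property value, assuming the incoming CSV has a column named Phone and you want its first 9 characters (adjust the column name and offsets to your data):
SELECT SUBSTRING(Phone FROM 1 FOR 9) AS Phone
FROM FLOWFILE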
07-23-2019
03:42 AM
1 Kudo
@Shailuk Schedule the GetSFTP processor to run on the Primary node with Run Schedule set to 0 sec; the processor will then run as often as possible and pull files from the configured directory. **NOTE** if we don't delete the file from the path, the GetSFTP processor will pull the same file again and again, because GetSFTP doesn't store state. Correct approach: use ListSFTP + FetchSFTP processors. Configure the ListSFTP processor to run on the primary node with Run Schedule set to 0 sec; this processor stores state and runs incrementally, listing only the newly added files in the directory. The FetchSFTP processor fetches the files from the directory, and you can then use a PutFile processor to store the files on the local machine.
07-23-2019
03:23 AM
1 Kudo
@Farooq Mustafa That is expected behaviour from NiFi: if you `Right Click` on specific/selected processors and choose Create Template, NiFi creates a template for those processors only. If you `Right Click` on the canvas with no processor selected, NiFi creates a template including all processors. You can also use NiFi Registry instead of templates to keep versions, import/export workflows into other environments, and keep them in sync with each other.
07-23-2019
01:30 AM
@Thuy Le If you have an array, configure the EvaluateJsonPath processor and add a new property such as $.[2].length() //meaning if you have 2 JSON objects in the array the attribute value will be 1, otherwise it will be empty. Then, using the RouteOnAttribute processor, check whether the value is empty or 1 and route the flowfile accordingly. 2. What if there is empty JSON data? The processor still adds the length attribute with the value 'Empty String Set', and in the RouteOnAttribute processor you can check whether the value is empty or not. Another way of checking is to use the RouteText processor and check whether there is any data in the flowfile content by adding a regular-expression dynamic property to the RouteText processor.
07-22-2019
04:17 AM
1 Kudo
@Thuy Le Use the EvaluateJsonPath processor and add a new property to determine the length of the array. Then use the RouteOnAttribute processor to check the array_length attribute value and route to the SplitJson processor or some other processor. Flow:
--other processors
EvaluateJsonPath //add array_length property to the processor
RouteOnAttribute //add dynamic properties to check the array_length attribute value
        |split
----------------------------------------------
|(array_length more than 2)    |(array_length less than 2)
SplitJson                      Some other processors..
07-19-2019
01:39 PM
@Micro Change PutHDFS to run on All Nodes; right now the processor is running only on the Primary Node. If all the queued flowfiles (1,328) are on nodes other than the Primary node, the PutHDFS processor won't process those files until the primary node changes. If you have more nodes in the NiFi cluster, running on all nodes will give you the best result (but you need to distribute the load across the cluster by using load balancing (or) a Remote Process Group). Increasing concurrent tasks is one of the optimizations we can perform on the PutHDFS processor if you are not distributing the load across the cluster and one node is doing all the work.
07-18-2019
04:16 PM
@shahid rehman If the answer helped resolve the issue, log in and click the Accept button below to close this thread. This will help other community users find answers quickly 🙂
07-18-2019
04:15 PM
@Duraisankar S If the answer helped resolve the issue, log in and click the Accept button below to close this thread. This will help other community users find answers quickly 🙂
07-18-2019
03:25 AM
@Duraisankar S You can run a major compaction on the partition in Hive; after the major compaction is done, a base_**** directory will be created. Spark is then able to read the specific partitions that have base_*** directories in them, but Spark is not able to read delta directories, as there is an open Jira [SPARK-15348] about Spark not being able to read ACID tables. I think starting from HDP 3.x the HiveWarehouseConnector is able to support reading Hive ACID tables.
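A minimal sketch of triggering the compaction from Hive, assuming a hypothetical table sales partitioned by dt (replace the table name and partition spec with your own):
ALTER TABLE sales PARTITION (dt = '2019-07-18') COMPACT 'major';
-- monitor the compaction until it completes
SHOW COMPACTIONS;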
07-17-2019
03:51 AM
@Amit Khandelwal In DistributedMapCacheServer we can define the Persistence Directory value: if specified, the cache will be persisted in the given directory; if not specified, the cache will be in-memory only. If we define a persistence directory, NiFi will not wipe out the data when the server is restarted.
07-12-2019
02:29 PM
@Abderrahim BOUDI Good articles regarding Hive performance tuning: Hive_performance_tune Tez_Performance_Tune ExplainPlan. This is too broad a question to answer fully; here are my thoughts:
1. Check whether your Hive job actually starts running in the Resource Manager (not sitting in the queue waiting for resources, i.e. in the Accepted state, etc.).
2. Check in HDFS how many files are in the directory the table points to; too many small files result in poor performance, so consolidate the small files into bigger ones and then run the query again.
3. Try running the Hive console in debug mode to see where the job is spending its time.
4. Check whether there are any skews in the data, and create the table stating these skewed columns in the table properties (see the sketch below).
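For point 4, a minimal sketch of declaring skewed columns at table-creation time, with a hypothetical table and skew values (adjust the columns and values to your data):
CREATE TABLE web_logs (user_id STRING, url STRING, hits INT)
SKEWED BY (user_id) ON ('user_0', 'user_1')
STORED AS ORC;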
07-12-2019
02:16 PM
1 Kudo
@shahid rehman A possible workaround to resolve this issue: if you have the filename as an attribute on the flowfile (after the EvaluateJsonPath processor), then by using an UpdateAttribute processor after the FetchS3Object processor we can change the filename back to the original filename. Add a new property named filename to the UpdateAttribute processor with the value ${<filename_attribute_name>}.
07-12-2019
02:07 PM
@Sampath Kumar Since you have enabled Ranger authorization, DFS commands are restricted in Hive while authorization is enabled.
07-12-2019
01:06 PM
@shahid rehman Could you add a screenshot of your NiFi flow, more specifically of the part before the FetchS3Object processor?
07-10-2019
02:53 AM
@Jayashree S Please follow this link to configure the QueryDatabaseTable processor. To incrementally pull data from the MySQL table, set the Maximum-value Columns property to one of the columns in your table. The processor then keeps track of the maximum value seen for that column and only fetches the changes from the table. - If you are still facing the issue, provide us more details about it and the processor configuration.
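Conceptually (a sketch of the idea, not the processor's exact generated SQL), each run fetches something like the following, with a hypothetical table my_table and max-value column last_updated:
SELECT * FROM my_table
WHERE last_updated > '<maximum value recorded in processor state on the previous run>'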
07-09-2019
03:35 AM
@Shawn Park You can use the AttributesToJSON processor, then pass the flowfile to the downstream processor and read it using an ExecuteStreamCommand processor. (or) Use an UpdateAttribute processor with NiFi Expression Language to add attributes to the flowfile, then pass the flowfile to the downstream processor and use the flowfile's attributes.
07-09-2019
03:10 AM
@Tom De Backer Did you try using the EnforceOrder processor in NiFi? Use increment 1 for the first queue and increment 2 for the second queue, so that flowfiles going to the funnel will always wait until the flowfile from the first queue has arrived.
07-04-2019
03:19 AM
@David Sargrad You can have a sysdate/current_date variable in your Spark job and then use that variable to read the directory from HDFS dynamically.
07-04-2019
03:09 AM
@Matt Field From the NiFi ExtractText docs: "The first capture group, if any found, will be placed into that attribute name. But all capture groups, including the matching string sequence itself, will also be provided at that attribute name with an index value provided." This is expected behaviour from NiFi: because you have a capture group in your regular expression, the ExtractText processor adds an index value to the attribute name. For consistency, use ${myattribute} without the index value as the reference for the attribute value. - If the answer helped resolve the issue, log in and click the Accept button below to close this thread. This will help other community users find answers quickly 🙂
07-03-2019
06:18 AM
1 Kudo
@David Sargrad
Option 1, NiFi: You can also think of using the MergeContent processor to create bigger files (using the min/max group size properties, e.g. 1 GB) and then store these files into the HDFS directory.
Option 2, Hive: If you have structured JSON files, create a Hive table on top of these files and run insert overwrite <same_hive_table> select * from <same_hive_table>; With this method Hive holds an exclusive lock on the directory until the overwrite completes (see the sketch after this list).
Option 3, Hadoop streaming jar: Store all files into one daily directory and then run the merge as a daily job (at midnight, etc.) using hadoop-streaming.jar as described in this link.
Option 4, Hive using ORC files: If you are thinking of storing the files as ORC, convert the JSON data to ORC format and use ORC's concatenate feature to create a big file by merging the small ORC files (see the sketch after this list).
Option 5, Hive transactional tables: Using Hive transactional tables we can insert data with the PutHiveStreaming processor (convert the JSON data to Avro and feed it to PutHiveStreaming); based on the buckets created on the transactional table, Hive will store all your data into those buckets (that many files in HDFS). If you are reading this data from Spark, make sure your Spark is able to read Hive transactional tables.
If you found any other efficient way to do this task, please mention the method so that we can learn from your experience 🙂
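A minimal sketch of options 2 and 4 in Hive SQL, assuming a hypothetical table named daily_events (add a PARTITION clause if the table is partitioned, and adjust the name to your schema):
-- Option 2: rewrite the table onto itself to consolidate small files
INSERT OVERWRITE TABLE daily_events SELECT * FROM daily_events;
-- Option 4: merge small ORC files in place
ALTER TABLE daily_events CONCATENATE;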
07-03-2019
05:23 AM
@Amod Acharya Create a shell script that truncates your Hive ORC table and then runs the Sqoop import into the Hive ORC table. Sample shell scripts:
1. For managed (internal) tables:
bash$ cat sq.sh
#truncate the hive table first
hive -e "truncate table default.my_table_orc"
#sqoop import into Hive table
sqoop import --connect jdbc:mysql://localhost:3306/<db_name> --username root --password "<password>" --table <table_name> --hcatalog-database <hive_database> --hcatalog-table <hive_table_name> --hcatalog-storage-stanza "stored as orcfile"
2. For external tables:
bash$ cat sq.sh
#drop the hive table first
hive -e "drop table default.my_table_orc"
#sqoop import into Hive table
sqoop import --connect jdbc:mysql://localhost:3306/<db_name> --username root --password "<password>" --table order_items --hcatalog-database <hive_database> --hcatalog-table <hive_table_name> --create-hcatalog-table --hcatalog-storage-stanza "stored as orcfile"
07-01-2019
02:19 PM
@DADA206 If you want to count all duplicated rows, you can use one of these methods.
1. Using the dropDuplicates function:
scala> val df1=Seq((1,"q"),(2,"c"),(3,"d"),(1,"q"),(2,"c"),(3,"e")).toDF("id","n")
scala> println("duplicated counts:" + (df1.count - df1.dropDuplicates.count))
duplicated counts:2
There are 2 distinct rows that are duplicated, which means 4 rows in total are involved in the duplication.
2. Using groupBy on all columns:
scala> import org.apache.spark.sql.functions._
scala> val cols=df1.columns
scala> df1.groupBy(cols.head,cols.tail:_*).agg(count("*").alias("cnt")).filter('cnt > 1).select(sum("cnt")).show()
+--------+
|sum(cnt)|
+--------+
| 4|
+--------+
3. Using window functions:
scala> import org.apache.spark.sql.expressions.Window
scala> val wdw=Window.partitionBy(cols.head,cols.tail:_*)
wdw: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@1f6df7ac
scala> df1.withColumn("cnt",count("*").over(wdw)).filter('cnt > 1).count()
res80: Long = 4
07-01-2019
03:50 AM
@hem lohani --> Permission denied: please check the permissions on the HDFS directory! --> Could you share the error logs for the error you are getting?
07-01-2019
03:38 AM
1 Kudo
@DADA206 --> You can try using a case statement and assign the value 1 for all rows where col2 matches col4, otherwise assign 0. --> Then aggregate on the new column and sum it. Example:
scala> val df=Seq((1,3,999,4,999),(2,2,888,5,888),(3,1,777,6,777)).toDF("id","col1","col2","col3","col4")
scala> df.withColumn("cw",when('col2 === 'col4,1).otherwise(0)).agg(sum('cw) as "su").show()
+---+
| su|
+---+
| 3|
+---+
As you have 3 rows with matching values, the count is 3 in this case.
If the answer helped resolve the issue, log in and click the Accept button below to close this thread. This will help other community users find answers quickly :-).