Member since: 06-08-2017
Posts: 1049
Kudos Received: 518
Solutions: 312
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 11255 | 04-15-2020 05:01 PM |
| | 7160 | 10-15-2019 08:12 PM |
| | 3139 | 10-12-2019 08:29 PM |
| | 11588 | 09-21-2019 10:04 AM |
| | 4372 | 09-19-2019 07:11 AM |
06-14-2018
12:42 PM
@Daniel Bates
Yes, I'm able to reproduce the same scenario in NiFi 1.5 but not in NiFi 1.6. In NiFi 1.6, attributes are not dropped after the ConvertExcelToCSV processor. You can upgrade NiFi to 1.6, (or) contact the support team to get a patch, (or) as a workaround in NiFi 1.5, refer to this link and implement the same logic: store all the attributes in DistributedMapCache before the ConvertExcelToCSV processor and fetch them from DistributedMapCache after it.
If the answer helped to resolve your issue, click the Accept button below to accept it.
06-14-2018
03:11 AM
@Satya Nittala
As the data is not in a consistent format, I use a two-step method to populate the field values correctly.
Step 1: Create a temporary table to store the intermediate data. Split the data on the = character and populate the columns:
hive> select split('hdfs://Test/lob/ebia/publish/gss_attribute_details_pub/load_dt=20170303',"=")[0] Location,split('hdfs://Test/lob/ebia/publish/gss_attribute_details_pub/load_dt=20170303',"=")[1] partitionfields;
+-----------------------------------------------------------------+------------------+--+
| location | partitionfields |
+-----------------------------------------------------------------+------------------+--+
| hdfs://Test/lob/ebia/publish/gss_attribute_details_pub/load_dt | 20170303 |
+-----------------------------------------------------------------+------------------+--+
The partitionfields column is still missing the load_dt prefix.
Step 2: Select from the temporary table and insert into the final table. (In the queries below, the string literal "partitionfields" stands in for the partitionfields column of the temporary table.)
hive> select regexp_extract("hdfs://Test/lob/ebia/publish/gss_attribute_details_pub/load_dt","(.*)\\/",1) location,concat_ws("=",reverse(split(reverse('hdfs://Test/lob/ebia/publish/gss_attribute_details_pub/load_dt'), '/')[0]),"partitionfields") partitionfields;
+---------------------------------------------------------+--------------------------+--+
| location | partitionfields |
+---------------------------------------------------------+--------------------------+--+
| hdfs://Test/lob/ebia/publish/gss_attribute_details_pub | load_dt=partitionfields |
+---------------------------------------------------------+--------------------------+--+
In step 2, I first extract only the data before the last / for the location column, because everything after the last / needs to go to the partitionfields column. Then I reverse the string, split it on /, take the [0] element and reverse it back to get load_dt, and use concat_ws to join it to the partitionfields value with =.
(or)
Use two regular expression extracts to prepare the final data for the columns:
hive> select regexp_extract("hdfs://Test/lob/ebia/publish/gss_attribute_details_pub/load_dt","(.*)\\/",1) location,concat_ws("=",regexp_extract('hdfs://Test/lob/ebia/publish/gss_attribute_details_pub/load_dt', '(.*)\/(.*)',2),"partitionfields") partitionfields;
+---------------------------------------------------------+--------------------------+--+
| location | partitionfields |
+---------------------------------------------------------+--------------------------+--+
| hdfs://Test/lob/ebia/publish/gss_attribute_details_pub | load_dt=partitionfields |
+---------------------------------------------------------+--------------------------+--+
I hope this works for your case.
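The split logic above can be checked outside Hive. The Python snippet below is only an illustrative sketch, assuming every path ends in a single key=value partition directory; `split_partition_path` is a hypothetical helper name, not a Hive function.

```python
def split_partition_path(path):
    """Split an HDFS partition path into (location, partitionfields).

    Assumes the last path component is the partition directory,
    for example load_dt=20170303.
    """
    # Everything before the last "/" is the table location,
    # everything after it is the partition spec.
    location, _, partitionfields = path.rpartition("/")
    return location, partitionfields


location, partitionfields = split_partition_path(
    "hdfs://Test/lob/ebia/publish/gss_attribute_details_pub/load_dt=20170303"
)
print(location)         # hdfs://Test/lob/ebia/publish/gss_attribute_details_pub
print(partitionfields)  # load_dt=20170303
```

Splitting on the last / instead of on = keeps load_dt together with its value, so no second pass is needed to re-attach the prefix.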
06-13-2018
10:16 PM
@rajat puchnanda
The merge you are expecting is a lookup in the departments table by department id. For your case, you can use the LookupRecord processor to look up deptid, get the salary and email, and add them to the record. The LookupRecord processor supports several lookup controller services. Load your inputfile2 into one of the lookup services, then use LookupRecord to look up the deptid value and add the result to the record.
Refer to this and this link for more details on configuring and working with the LookupRecord processor.
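Conceptually, the lookup described above is a key-based enrichment. The Python sketch below only illustrates that idea and is not NiFi code: the sample rows, the `departments` dictionary and the `enrich` helper are hypothetical, and only the field names deptid, salary and email come from the question.

```python
# Hypothetical contents of inputfile2, keyed by deptid (the lookup source).
departments = {
    "10": {"salary": 5000, "email": "dept10@example.com"},
    "20": {"salary": 6000, "email": "dept20@example.com"},
}


def enrich(record, lookup):
    """Return a copy of record with the looked-up fields added when deptid matches."""
    match = lookup.get(record["deptid"])
    if match is None:
        # In this sketch, records without a match are passed through unchanged.
        return dict(record)
    return {**record, **match}


# Hypothetical contents of the first input file.
records = [{"name": "a", "deptid": "10"}, {"name": "b", "deptid": "30"}]
enriched = [enrich(r, departments) for r in records]
print(enriched)
```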
06-13-2018
11:06 AM
2 Kudos
@Haifa Ben Aouicha
Yes, you need to run msck repair table daily, once you have loaded a new partition into the HDFS location.
Why do we need to run the msck repair table statement after each ingestion?
Hive stores a list of partitions for each table in its metastore. If, however, new partitions are added directly to HDFS, the metastore (and hence Hive) will not be aware of these partitions unless the user adds them in one of the following ways:
1. Add each partition to the table:
hive> alter table <db_name>.<table_name> add partition(`date`='<date_value>') location '<hdfs_location_of the specific partition>';
(or)
2. Run the metastore check with the repair table option:
hive> msck repair table <db_name>.<table_name>;
This adds metadata about partitions to the Hive metastore for partitions for which such metadata doesn't already exist. In other words, it adds any partitions that exist on HDFS but not in the metastore to the metastore.
In addition, if you are loading dynamic/static partitions into the final table from another temp table with a Hive statement (like insert into final table partition(..) select * from temp table), you don't need either of the above methods: because a Hive statement loads the partition, Hive updates the metadata of the final table itself.
The methods above are needed only if you add a new directory directly in HDFS, or by any other means that bypasses Hive.
Please refer to this link for more details on refreshing Hive metadata.
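The effect of the repair can be pictured as a set difference between the partition directories on HDFS and the partitions the metastore already knows. The Python sketch below illustrates that idea only; it is not how Hive implements msck, and the table name, base location and date values are hypothetical.

```python
def missing_partition_statements(table, hdfs_partitions, metastore_partitions, base_location):
    """Build one 'alter table ... add partition' statement per partition
    that exists on HDFS but is not registered in the metastore."""
    statements = []
    for value in sorted(set(hdfs_partitions) - set(metastore_partitions)):
        statements.append(
            "alter table {t} add partition(`date`='{v}') location '{loc}/date={v}';".format(
                t=table, v=value, loc=base_location
            )
        )
    return statements


# Hypothetical state: three partition directories on HDFS, one known to the metastore.
statements = missing_partition_statements(
    "db.tbl",
    ["2018-06-11", "2018-06-12", "2018-06-13"],
    ["2018-06-11"],
    "/data/tbl",
)
for statement in statements:
    print(statement)
```

Running msck repair table does this bookkeeping in one statement, which is why it is the more convenient choice when several partitions arrive per load.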
06-12-2018
11:35 AM
@Shailesh Bhaskar
If the answer addressed your question, take a moment to log in and click the Accept button below to accept it. That would be a great help to community users looking for a quick solution to this kind of issue, and it closes this thread.
06-12-2018
11:34 AM
@neeraj sharma
If the answer addressed your question, take a moment to log in and click the Accept button below to accept it. That would be a great help to community users looking for a quick solution to this kind of issue, and it closes this thread.
06-12-2018
11:25 AM
@RAUI
Did the answer help to resolve your issue? If so, take a moment to log in and click the Accept button below to accept it. That would be a great help to community users looking for a quick solution to this kind of issue, and it closes this thread.
06-10-2018
06:12 PM
1 Kudo
@yazeed salem
If your flowfile content is already in JSON format and each message/record is on one line, use the SplitText processor with a Line Split Count of <desired number>.
If your flowfile content is already in JSON and each message is not on one line, use the SplitRecord processor: configure the Record Reader/Writer controller services (define an Avro schema matching the incoming flowfile content) and set the Records Per Split property to your desired number. SplitRecord is the more efficient option because the processor works with chunks of data.
Refer to this and this link to configure the Record Reader/Writer controller services.
Flow:
1. Custom processor
2. SplitRecord/SplitText processors
3. DistributeLoad
4. ConvertJSONToSQL
DistributeLoad configs:
| Property | Default | Allowable values | Description |
|---|---|---|---|
| Number of Relationships | 1 | | Determines the number of Relationships to which the load should be distributed |
| Distribution Strategy | round robin | round robin, next available, load distribution service | Determines how the load will be distributed. If using Round Robin, will not distribute any FlowFiles unless all destinations can accept FlowFiles; when using Next Available, will distribute FlowFiles as long as at least 1 destination can accept FlowFiles. |
I configured Number of Relationships to 3 and then connected:
relationship 1 from the DistributeLoad processor to the first ConvertJSONToSQL processor
relationship 2 to the second ConvertJSONToSQL processor
relationship 3 to the third ConvertJSONToSQL processor
Based on the number of splits that you want, change the configs in the DistributeLoad processor and add more ConvertJSONToSQL processors.
In addition, please consider using the record-oriented PutDatabaseRecord processor, which works on chunks of data. Configure the Record Reader controller service to read the incoming flowfile; with this approach I think you don't have to split the records at all.
Flow:
1. Custom processor
2. PutDatabaseRecord
If the answer addressed your question, click the Accept button below to accept it. That would be a great help to community users looking for a quick solution to this kind of issue.
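To picture how the round robin strategy spreads splits over the three relationships, here is a small Python sketch. It is an illustration only, not NiFi code: `distribute_round_robin` and the split names are hypothetical, and the sketch ignores back-pressure, so it does not model the rule that nothing is distributed unless all destinations can accept flowfiles.

```python
from itertools import cycle


def distribute_round_robin(flowfiles, number_of_relationships):
    """Assign each flowfile to relationships "1".."N" in turn."""
    relationships = {str(i): [] for i in range(1, number_of_relationships + 1)}
    targets = cycle(relationships)  # cycles over the relationship names in order
    for flowfile in flowfiles:
        relationships[next(targets)].append(flowfile)
    return relationships


# Seven hypothetical splits distributed over three relationships.
splits = ["split-{}".format(i) for i in range(1, 8)]
result = distribute_round_robin(splits, 3)
for name, assigned in result.items():
    print(name, assigned)
```

Each relationship feeds its own ConvertJSONToSQL processor, so the splits are converted in parallel.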
06-10-2018
01:04 PM
1 Kudo
@gal itzhak
List S3 -> Fetch S3 objects -> MergeContent (output as Avro) -> ConvertAvroToORC -> Put S3 object
The above approach is correct. The MergeContent processor does not support ORC as a merge format, so we still need to merge all the Avro files into one and then feed it into the ConvertAvroToORC processor.
Alternatively, you can use the MergeRecord processor, which reads the incoming flowfiles and writes the merged flowfile using the configured Record Writer, merging the records according to its configuration.
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.6.0/org.apache.nifi.processors.standard.MergeRecord/index.html
How do we merge small ORC files then?
Small ORC files still need to be merged through Hive/Spark.
1. Compacting small files using Concatenate:
As you are storing the ORC files in S3, you can merge them if you have a Hive table on top of the S3 ORC files. Use alter table concatenate to merge small ORC files together by issuing a CONCATENATE command on the table or partition. The files will be merged at the stripe level without reserialization.
ALTER TABLE istari [PARTITION partition_spec] CONCATENATE;
Refer to this link for more details regarding concatenate.
(or)
2. Compacting small files without using Concatenate:
Step 1: Let's assume your final ORC table has thousands of small ORC files. Create a temporary table by selecting from the final table:
hive> create table <db_name>.<temp_table_name> stored as orc as select * from <db_name>.<final_table>;
Step 2: Now that the temp table holds all the data from the final table, overwrite the final table by selecting the temp table data. You can use order by/sort by/distribute by clauses to create new files in the final table with an even distribution.
hive> insert overwrite table <db_name>.<final_table> select * from <db_name>.<temp_table_name> order by/sort by <some column>;
In addition, you can set all the Hive session properties before overwriting the final table.
With this approach, make sure no other application writes data into the final table until the overwrite job completes: because we overwrite from the temp table, any data that other applications write to the final table in the meantime will be lost.
Refer to this, this and this link for more details on how to compact small files.
If the answer helped to resolve your issue, click the Accept button below to accept it. That would be a great help to community users looking for a quick solution to this kind of issue.
06-09-2018
10:44 PM
@Alex Witte
To add more detail on why cluster by/sort by are not working: there is no order by clause specified in your window function. If we don't specify one, Hive initializes more than one reducer and distributes/sorts the data based on the baic_id,point values.
Example:
1. I have a table in which the order of id is not consistent:
hive> select * from test_table;
+-----+-------+-------+--+
| id | dt | name |
+-----+-------+-------+--+
| 2 | fajf | sda |
| 4 | fajf | sda |
| 1 | fajf | sda |
| 3 | asd | $h |
| 3 | asd | $h |
+-----+-------+-------+--+
2. Then I ran row_number without specifying any order by clause, and clustered by id:
hive> select row_number() over () as rowid,* from test_table cluster by id;
Output:
+--------+-----+-------+-------+--+
| rowid | id | dt | name |
+--------+-----+-------+-------+--+
| 3 | 1 | fajf | sda |
| 1 | 2 | fajf | sda |
| 4 | 3 | asd | $h |
| 5 | 3 | asd | $h |
| 2 | 4 | fajf | sda |
+--------+-----+-------+-------+--+
The id field is ordered, but rowid is not in the correct order: rowid 1 is assigned to id=2 because it is the first value in the table, rowid 2 is assigned to id=4, and so on.
3. row_number with an order by clause:
hive> select row_number() over (order by id) as rowid,* from test_table cluster by id;
+--------+-----+-------+-------+--+
| rowid | id | dt | name |
+--------+-----+-------+-------+--+
| 1 | 1 | fajf | sda |
| 2 | 2 | fajf | sda |
| 3 | 3 | asd | $h |
| 4 | 3 | asd | $h |
| 5 | 4 | fajf | sda |
+--------+-----+-------+-------+--+
(or) without cluster by:
hive> select row_number() over (order by id) as rowid,* from test_table;
+--------+-----+-------+-------+--+
| rowid | id | dt | name |
+--------+-----+-------+-------+--+
| 1 | 1 | fajf | sda |
| 2 | 2 | fajf | sda |
| 3 | 3 | asd | $h |
| 4 | 3 | asd | $h |
| 5 | 4 | fajf | sda |
+--------+-----+-------+-------+--+
Both queries return the same output, and the ordering is now based on the order by clause. If you need to assign row numbers based on some column value, use that column in the order by clause of the window function; the results will then always be consistent.
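The difference between the two cases can be reproduced with a small Python sketch. It only mimics the numbering logic and is not Hive code; it assumes the rows reach the window function in table order, which matches the output above but is not something Hive guarantees.

```python
# Rows of test_table in the order shown above: (id, dt, name).
rows = [
    (2, "fajf", "sda"),
    (4, "fajf", "sda"),
    (1, "fajf", "sda"),
    (3, "asd", "$h"),
    (3, "asd", "$h"),
]

# row_number() over (): rows are numbered in arrival order,
# then cluster by id sorts the already numbered rows.
numbered_first = [(rowid, *row) for rowid, row in enumerate(rows, start=1)]
without_order_by = sorted(numbered_first, key=lambda r: r[1])

# row_number() over (order by id): rows are sorted by id before numbering.
with_order_by = [
    (rowid, *row)
    for rowid, row in enumerate(sorted(rows, key=lambda r: r[0]), start=1)
]

print([r[0] for r in without_order_by])  # rowid column without order by
print([r[0] for r in with_order_by])     # rowid column with order by id
```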