Member since: 06-08-2017
Posts: 1049
Kudos Received: 518
Solutions: 312
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 11258 | 04-15-2020 05:01 PM |
| | 7161 | 10-15-2019 08:12 PM |
| | 3140 | 10-12-2019 08:29 PM |
| | 11592 | 09-21-2019 10:04 AM |
| | 4372 | 09-19-2019 07:11 AM |
06-04-2018
10:12 PM
@Winnie Philip If you are persisting small datasets (tens of MBs), the defaults should be fine. If you are trying to cache a bigger dataset (a couple of hundred MBs), then you need to increase the Max cache entry size property in the PutDistributedMapCache processor. In the DistributedMapCacheServer service, configure the Persistence Directory property: if a value is specified, the cache is persisted to the given directory; if not specified, the cache is in-memory only. By specifying a directory we avoid using memory to cache the dataset. Also increase the Maximum Cache Entries property in DistributedMapCacheServer according to the number of cache entries you intend to keep in the cache.
06-04-2018
01:04 PM
@JAy PaTel One way of doing this is to use the --exclude-tables argument and list all the tables to skip as comma-separated values. Alternatively, write a shell script that holds the two required tables and iterates the sqoop job over them. Please refer to this and this for more details.
06-04-2018
10:12 AM
1 Kudo
@Vivek Singh The PutHiveQL processor is used to execute Hive DDL/DML commands, and it expects the incoming flowfile content to be a HiveQL command. You can put your CREATE TABLE statement into the flow using a GenerateFlowFile processor (or a ReplaceText processor, etc.) and feed the success relationship to PutHiveQL; the processor then executes the content of the flowfile and creates the table. Flow: GenerateFlowFile configs: PutHiveQL configs: Configure/enable the Hive connection pool, and if you have more than one Hive DDL/DML command in the flowfile content, use ; as the delimiter so the processor executes the commands separated by that delimiter. In NiFi, the ConvertAvroToORC processor adds a hive.ddl attribute based on the flowfile content; we can make use of that attribute and then use a ReplaceText processor to create new flowfile content and execute the Hive DDL statement with PutHiveQL. Please refer to this link for more details on generating/executing hive.ddl statements using NiFi.
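For illustration, a minimal sketch of what the GenerateFlowFile custom text could contain (the database, table, and column names here are hypothetical examples, not from the original thread); with ; as the delimiter, PutHiveQL would execute both statements:

```sql
-- Hypothetical flowfile content for PutHiveQL; two statements separated by ;
CREATE DATABASE IF NOT EXISTS demo_db;

CREATE TABLE IF NOT EXISTS demo_db.sample_table (
  id INT,
  name STRING,
  created_ts TIMESTAMP
)
STORED AS ORC;
```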
06-04-2018
02:29 AM
@Raja M The ListFile processor with default configurations already does what you are expecting (lists/sends the oldest file first), unless you have configured the success queue with some other prioritizer. ListFile is a stateful processor that creates flowfile(s) representing the file(s) based on last modified time, and the default queue prioritizer is OldestFlowFileFirstPrioritizer, so with default queue configs the oldest flowfile in the dataflow will be processed first. The available NiFi queue prioritizers are as follows:
FirstInFirstOutPrioritizer: Given two FlowFiles, the one that reached the connection first will be processed first.
NewestFlowFileFirstPrioritizer: Given two FlowFiles, the one that is newest in the dataflow will be processed first.
OldestFlowFileFirstPrioritizer: Given two FlowFiles, the one that is oldest in the dataflow will be processed first. This is the default scheme that is used if no prioritizers are selected.
PriorityAttributePrioritizer: Given two FlowFiles that both have a "priority" attribute, the one that has the highest priority value will be processed first. Note that an UpdateAttribute processor should be used to add the "priority" attribute to the FlowFiles before they reach a connection that has this prioritizer set. Values for the "priority" attribute may be alphanumeric, where "a" is a higher priority than "z", and "1" is a higher priority than "9", for example.
To see which files are transferred from the ListFile processor, use a ControlRate processor with:
Rate Control Criteria: flowfile count
Maximum Rate: 1
Now you can look at the success queue of the ControlRate processor to see which flowfiles have been released from the ListFile processor. - If the answer helped to resolve your issue, click the Accept button below to accept it; that would be a great help to community users looking for a quick solution to this kind of issue.
06-02-2018
09:51 PM
@Shantanu kumar I think the issue is with the scheduling of the MonitorActivity processor, as the processor is scheduled to run at 10:00 AM every day and the Threshold Duration is 1 min. Change the schedule so the processor runs at 10:00 AM and at 10:01 AM as well: at the 10:00 AM run the processor only checks for an incoming flowfile, and if none has been fed to it, the 10:01 AM run is what triggers the inactivity flowfile. If the processor runs only at 10:00 AM, no inactivity flowfile will be triggered. Change the scheduling cron expression to the one below:
0 0-1 10 * * ?
06-02-2018
12:09 PM
@Mustafa Ali Qizilbash
I think in NiFi 1.2 the pagination used by GenerateTableFetch is based on LIMIT and OFFSET, but Hive won't work with LIMIT/OFFSET. A sample HQL statement would be something like this:
SELECT * FROM test_table WHERE ts > '2018-06-02 14:24:17.242' AND ts <= '2018-06-03 14:24:17.242' ORDER BY ts LIMIT 10000
I don't have a NiFi 1.2 setup to recreate your scenario, but if the statements generated by the GenerateTableFetch processor carry the max value column's last state and new state, like the statement above, then use a ReplaceText processor to rewrite the SQL statements so they work with Hive syntax. If the statements don't carry the max value column conditions and instead look like
SELECT * FROM test_table ORDER BY ts LIMIT 10000 OFFSET 20000
then (I have used NiFi 1.5 to generate the table fetch statements and it works fine with Hive) you either have to upgrade your NiFi version, or implement method 2 to run incrementally, or get the last state and new state from the processor using the REST API and then use a ReplaceText processor to prepare statements that Hive can run, as in the sketch below. The second issue, the error saying your data doesn't have the partition column, means you need to make sure the dataset you feed to PutHiveStreaming includes the partition column.
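As an illustration only (the table, column, and timestamp values are taken from the sample statements above), a LIMIT/OFFSET statement could be rewritten into a max-value-column range that Hive accepts:

```sql
-- Original paginated statement (not accepted by Hive):
--   SELECT * FROM test_table ORDER BY ts LIMIT 10000 OFFSET 20000
-- Hive-friendly rewrite using the max value column's last/new state range:
SELECT *
FROM test_table
WHERE ts > '2018-06-02 14:24:17.242'
  AND ts <= '2018-06-03 14:24:17.242'
LIMIT 10000;
```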
06-02-2018
04:00 AM
@Winnie Philip After the QueryDatabaseTable processor, use a ConvertRecord processor to read the incoming Avro data with Schema Access Strategy set to Use Embedded Avro Schema, and a Record Writer of CSVRecordSetWriter configured with the field/s that you want to keep in PutDistributedMapCache and the Record Separator property set to , (so the output ends up on one line). Now the flowfile content will be in CSV format with just the field/s that you want to cache. Feed the success relationship to the PutDistributedMapCache processor. Flow:
1. QueryDatabaseTable
2. ConvertRecord //read incoming Avro data and write it in CSV format (output needs to be on one line)
3. PutDistributedMapCache
06-02-2018
03:30 AM
@Winnie Philip
Yep, the issue is with the Avro-format data that is going to the PutDistributedMapCache processor. You have connected QueryDatabaseTable's success to PutDistributedMapCache, but the QDT processor outputs flowfile content in Avro format. In the FetchDistributedMapCache processor you are fetching the data into an attribute (Avro-format data) and using it in the QueryRecord processor. You have to change the format that goes to PutDistributedMapCache to CSV instead of Avro, and the flowfile content needs to be 7000,7001,7003. Then PutDistributedMapCache will keep the cached value as 7000,7001,7003 in CSV format, and when you use it in the QueryRecord processor you won't see any issues..!!
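For context, a minimal sketch of how the cached CSV value could then be used in a QueryRecord query (the attribute name cached.ids and the column id are hypothetical; NiFi Expression Language would substitute the attribute before the SQL runs):

```sql
-- Hypothetical QueryRecord query; ${cached.ids} would expand to 7000,7001,7003
SELECT *
FROM FLOWFILE
WHERE id IN (${cached.ids})
```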
06-02-2018
02:12 AM
@Winnie Philip Could you please share a sample of the CSV data and the data that you have loaded using PutDistributedMapCache?
06-01-2018
10:44 PM
@Mustafa Ali Qizilbash In the above flow the GenerateFlowFile processor acts as the trigger for the flow: it runs based on the scheduling strategy and generates your user-defined custom text as the content of the flowfile, which in our case is a valid HQL statement that the PutHiveQL processor then executes; you can use this processor in a production environment as well. The PutHiveQL processor is designed to run Hive DDL/DML statements, so it doesn't offer any target database (or) table name properties; it expects the incoming flowfile content to be a valid HiveQL statement and executes that statement. If you want to store the data into the final table incrementally, then look into the methods below. Method 1: You can use the GenerateTableFetch (GTF) processor to fetch only the newly added rows from the table based on the Maximum-value Columns property, but here is the issue: the SQL statements generated by the GTF processor look something like
SELECT * FROM test_table WHERE ts > '2018-06-02 14:24:17.242' AND ts <= '2018-06-03 14:24:17.242' ORDER BY ts LIMIT 10000
As my max value column is ts, the statement generated by the GTF processor has an ORDER BY clause; if your table is big, preparing and executing the query to pull the incremental rows will run long. Once the SQL statements are generated, use the SelectHiveQL processor to execute them, with the result set in Avro format. Then use the PutHiveStreaming processor configured with your target DB, table name, and partition column info. Flow: By using this method you incrementally pull the data from the external table and store it into the final managed table. (or) Method 2: Use HiveQL queries, i.e.
insert into managed_table select * from external_table where ts > '${last_state_value}' and ts < '${new_state_value}'
then store the new state value in HDFS/Hive/HBase/DistributedMapCache; in the next run, fetch the stored state value, keep it as the last state value, define your new state value, and run the insert statement to do a dynamic-partition load into the managed table (see the sketch below). In this step we don't use an ORDER BY clause, and if the ts column is the partition column then you also skip the full table scan. Refer to this link regarding storing the state, fetching the state again, and storing data into a Hive managed table.
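A minimal sketch of the Method 2 insert with dynamic partitioning, for illustration only (the table, column, and partition names are hypothetical, and the ${last_state_value}/${new_state_value} placeholders would be filled in by the flow before execution):

```sql
-- Hypothetical incremental load into a managed, partitioned table.
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;

INSERT INTO TABLE managed_table PARTITION (dt)
SELECT id, name, ts,
       to_date(ts) AS dt              -- partition value derived from ts
FROM external_table
WHERE ts > '${last_state_value}'
  AND ts <= '${new_state_value}';
```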