Member since: 06-08-2017
Posts: 1049
Kudos Received: 518
Solutions: 312
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 11242 | 04-15-2020 05:01 PM |
| | 7151 | 10-15-2019 08:12 PM |
| | 3129 | 10-12-2019 08:29 PM |
| | 11555 | 09-21-2019 10:04 AM |
| | 4360 | 09-19-2019 07:11 AM |
09-05-2018 01:10 AM
@Satya H
Use the QueryRecord processor to read the incoming CSV file, configuring the reader with a delimiter that doesn't exist in your data so the processor reads each whole line as one field. Then, using the SUBSTRING function, we can carve each field value out of the line. Add a new query like select substring(<field> from <start_position> for <length>) col1, ..., substring(<field> from <start_position> for <length>) coln from FLOWFILE. We can also add CASE statements to derive a record-type value, e.g. if the code is 100 -> FileHeader, etc. Configure the Record Writer controller service as JsonRecordSetWriter and the processor will write the output flowfile in JSON format. Refer to this and this for more details on QueryRecord processor usage.
(or)
We can extract only the first line of the CSV file using the ExtractText processor and add it as an attribute to the flowfile; from the attribute value you can identify the record type. To parse the fixed-width file, add a regex that captures the characters for each field and replace them with some delimiter, then convert to JSON using the ConvertRecord processor. Refer to this for more details on ReplaceText configs.
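As a minimal sketch of such a query (the single-field name line, the positions, and the record-type codes below are placeholders, not values from the original thread):

```sql
-- Hypothetical QueryRecord query against a fixed-width file.
-- Assumes the CSV reader exposes each whole line as a single field named "line";
-- adjust the start positions and lengths to your record layout.
SELECT
  SUBSTRING(line FROM 1 FOR 3)   AS record_code,
  SUBSTRING(line FROM 4 FOR 10)  AS account_id,
  SUBSTRING(line FROM 14 FOR 8)  AS tran_date,
  CASE SUBSTRING(line FROM 1 FOR 3)
    WHEN '100' THEN 'FileHeader'
    WHEN '900' THEN 'FileTrailer'
    ELSE 'Detail'
  END AS record_type
FROM FLOWFILE
```

Each SELECT alias becomes a field in the JSON document written by the Record Writer.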
08-31-2018 03:43 PM
@Thuy Le
I don't think it's possible in one call. To debug this further, use the Chrome/Firefox developer tools and inspect the calls the UI makes while performing this action. If we select two process groups and hit the Start button, two PUT requests run in the background, one to start each process group.
08-31-2018 01:15 PM
@Raj ji Sorry to hear that 😞
Q1: I mean that you can create the temp table with some partition column, have the final table partitioned by the same column, and then use a T.<partition_column> = S.<partition_column> predicate in your merge statement:
MERGE INTO merge_data.transactions AS T USING merge_data.merge_source AS S ON T.ID = S.ID AND T.tran_date = S.tran_date AND T.<partition_column> = S.<partition_column> WHEN MATCHED...
This way we are not updating the partition field's value; we only use the partition column in the ON clause, since updating a partition column's value is not possible once the partition has been created. Refer to this link for some more details on this question.
Q2: For the raw-data table, use INPUT__FILE__NAME, a Hive virtual column, when selecting from the table; this way you are not performing the join on the full data in the HDFS directory but merging against a single file from that directory. Refer to this link for more details on INPUT__FILE__NAME usage in Hive. And yes, a JSON table can be used for the merge as well.
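As a hedged sketch of the complete statement (the partition column part_col and the amount column are placeholders; only ID, tran_date, and the table names come from the thread):

```sql
-- Sketch of the full MERGE: the partition column appears in the ON clause
-- and is never updated; "part_col" and "amount" are placeholder names.
MERGE INTO merge_data.transactions AS T
USING merge_data.merge_source AS S
ON T.ID = S.ID
   AND T.tran_date = S.tran_date
   AND T.part_col = S.part_col
WHEN MATCHED THEN
  UPDATE SET amount = S.amount   -- update only non-key, non-partition columns
WHEN NOT MATCHED THEN
  INSERT VALUES (S.ID, S.amount, S.tran_date, S.part_col);
```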
08-31-2018 12:57 PM
4 Kudos
@Thuy Le
Yes, by using the NiFi REST API we can start and stop process groups from the command line.
REST API commands:
Start the process group:
curl -i -X PUT -H 'Content-Type: application/json' -d '{"id":"<processor_group_id>","state":"RUNNING"}' http://<nifi_url>/nifi-api/flow/process-groups/<processor_group_id>
Stop the process group:
curl -i -X PUT -H 'Content-Type: application/json' -d '{"id":"<processor_group_id>","state":"STOPPED"}' http://<nifi_url>/nifi-api/flow/process-groups/<processor_group_id>
Refer to this and this for more details on starting/stopping process groups.
08-30-2018 11:13 PM
1 Kudo
@Saravanan Subramanian
You can achieve this use case with the ExecuteSQL processor.
Flow:
UpdateAttribute configs:
last_iter_val
${getStateValue("next_iter_val")} //the last iteration's value
next_iter_val
${getStateValue("next_iter_val"):toDate("yyyy-MM-dd HH:mm:ss"):toNumber():plus(900000):format("yyyy-MM-dd HH:mm:ss")} //the last iteration's value plus 15 minutes (900000 ms)
ExecuteSQL configs:
Using these attribute values, we fetch the records incrementally from the table. I have attached the template (hcc-215124.xml); upload it and adapt it to your requirements. In addition, using the same kind of logic, you can store your state in DistributedMapCache/HBase/Hive/HDFS, fetch the state, add 15 minutes to it, and then pull the data from the table. Refer to this for more details on other ways of storing state in NiFi.
08-30-2018 12:33 PM
@Thuy Le
Use the UpdateRecord processor and add a new property /timestamp with the value ${now():format("yyyy-MM-dd HH:mm:ss.SSS")}+0000, with the Replacement Value Strategy set to Literal Value. Refer to this link for configuring/using the UpdateRecord processor. In the UpdateRecord processor, configure the Record Reader as JsonTreeReader and the Record Writer as JsonRecordSetWriter, and include your new timestamp field as a string in the Avro schema; the processor will then add the timestamp field to the output flowfile. With UpdateRecord you can use either a Record Path value or a Literal Value to generate the timestamp field's value.
08-30-2018 12:27 PM
1 Kudo
@Mitthu Wagh
You can use the UpdateAttribute processor, storing state and incrementing the last state value, then pass the attribute value into the HTTP request.
Flow:
Since the GetHTTP processor doesn't accept any incoming connections, use the InvokeHTTP processor instead of GetHTTP.
UpdateAttribute configs:
In this processor I'm using the getStateValue function to get the variable's value and add one to it, e.g. a property seq with the value ${getStateValue("seq"):plus(1)}, so the output flowfile from UpdateAttribute will have a seq attribute with value 1 on your first run. Then use that attribute in your InvokeHTTP processor to increment the page number. Refer to this link if you want to reset the state once the number reaches the limit.
08-27-2018 12:42 PM
@Adnan Chowdhury
Instead of the PutMongo processor you can use the PutMongoRecord processor, and then you don't need to split the JSON objects. With PutMongoRecord your flow looks something like below. Configure the PutMongoRecord processor's RecordReader controller service (CsvReader in this case) and the processor will read the records and put the documents into the Mongo database. Then you can run the RunMongoAggregation processor to run the aggregation.
(or)
With your existing flow: use a MergeContent processor after the PutMongo processor and configure its Merge Strategy as Defragment; the processor then merges all the split JSON objects back into one file. Use the merged relationship from MergeContent to trigger RunMongoAggregation. This way we wait until all the fragments have been merged into one file before triggering the RunMongoAggregation processor.
Flow:
Refer to this link for MergeContent configurations.
08-27-2018 11:26 AM
@Surendra Shringi
You don't have to copy the .xml files into the nifi/conf directory; you can keep them in some other directory and point to the file paths in your processor configs. The two files, hdfs-site.xml and core-site.xml, do need to be copied over to your local NiFi node.
08-27-2018 11:08 AM
1 Kudo
The second flow, i.e. using the PutDatabaseRecord processor, will be faster, as these record-oriented processors are designed to work with a chunk of records at a time instead of one record at a time. PutDatabaseRecord reads the incoming flowfile data with your Record Reader controller service, then prepares and executes the SQL statements as a single batch. Refer to this link for configuring/using the PutDatabaseRecord processor; it also shows how the same flow is built in old NiFi versions vs. new NiFi versions.
1. The ReplaceText processor is used to change/add the contents of a flowfile; with this processor we cannot change the flowfile's attribute values.
2. The UpdateAttribute processor: as the name describes, we use it when we want to change/add the value of an attribute; with this processor we cannot change the content of the flowfile.
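As a rough illustration of that batching (the table and column names are hypothetical), PutDatabaseRecord effectively prepares one parameterized statement and binds every record in the flowfile to it as a single JDBC batch:

```sql
-- One prepared statement, executed as a batch with one parameter set
-- per record in the incoming flowfile (hypothetical table and columns).
INSERT INTO target_table (id, name, updated_at)
VALUES (?, ?, ?);
```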