Member since
06-08-2017
1049
Posts
518
Kudos Received
312
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| 11166 | 04-15-2020 05:01 PM | |
| 7070 | 10-15-2019 08:12 PM | |
| 3081 | 10-12-2019 08:29 PM | |
| 11340 | 09-21-2019 10:04 AM | |
| 4244 | 09-19-2019 07:11 AM |
12-11-2017
04:52 PM
@swati tiwari Just want to make sure you are extracting the contents of CSV by using Extract Text processor and keeping device_id,device_value attributes associated with the flowfile and using Detect Duplicate processor but you are having age off duration to 1 sec. Question1.I want to delete duplicate in 1 second window frame if both attributes values (ie, device_no and device_value) already exists in past 1 sec.
i don't know about 1 second window frame, if that is the case then your configurations are correct. what is Age off Duration means Time interval to age off cached FlowFiles. We are caching the flow files and detecting the duplicates so when you set the property to 1 sec then Let's consider you are having 2 flowfiles with same attributes i.e device_no=1 and device_value=10,if these 2 flowfiles processed through detect duplicate processor less than a sec, then only this processor can detect the second processed flowfile is a duplicate flowfile. if first flowfile processed 2017-12-11 04:00:29 and second flowfile processed at 2017-12-11 04:00:31 then processor won't detect the second flowfile is duplicate although second flowfile having same attribute values device_no=1 and device_value=10 as first flowfile, Because the time duration between first and second flowfile processing is 2 sec(2017-12-11 04:00:29 - 2017-12-11 04:00:31) and we configured age off to 1 sec it will caches the flowfile 1 sec, if it is more than a sec the cached flowfiles are no longer compared with the new flowfiles. 2.what if I will delete age off Duration (1 sec)?
When we take off the age off duration then the all the cached flowfile attributes will be compared with the new flowfiles that are processed from Detect Duplicate processor until distribute map cache server reaches Maximum Cache Entries then according to Eviction Strategy we are evict the cached flowfiles to make room for new entries and there is no window frame.
... View more
12-11-2017
03:00 PM
5 Kudos
@Mahendra Hegde, According to jira https://issues.apache.org/jira/browse/NIFI-3010 if you are using NiFi 1.2+ then we can use Attributes from the flowfile and add them to json. I'm having Company attributes associated with the flowfile and value is also Company. Input:- {
"exampledata": {
"name": "Test",
"age": null,
"ver": null,
"payloadType": "Text",
"payloadData": "adsadsdsdsdsdsdsdsdsds",
"sequenceNum": null,
"timeStamp": "2017-09-22T12:07:29.968Z"
}
} Jolt-spec:- [{
"operation": "shift",
"spec": {
"*": "&"
}
}, {
"operation": "default",
"spec": {
"Company": "${Company}"
}
}]
Output:- {
"exampledata": {
"name": "Test",
"age": null,
"ver": null,
"payloadType": "Text",
"payloadData": "adsadsdsdsdsdsdsdsdsds",
"sequenceNum": null,
"timeStamp": "2017-09-22T12:07:29.968Z"
},
"Company": "Company"
} but this spec won't give the expected output as per your question, as we cannot add the attribute into main exampledata list. Example1:- by flattening out exampledata then we can add company attribute to the json message as follow. Jolt-spec:- [{
"operation": "shift",
"spec": {
"exampledata": {
"*": "&"
}
}
}, {
"operation": "default",
"spec": {
"Company": "${Company}"
}
}] Output:- {
"name": "Test",
"age": null,
"ver": null,
"payloadType": "Text",
"payloadData": "adsadsdsdsdsdsdsdsdsds",
"sequenceNum": null,
"timeStamp": "2017-09-22T12:07:29.968Z",
"Company": "Company"
} Example2:- If you are using prior version than NiFi 1.2, Then you need to use Replace text Processor with Below Properties. Search Value }\s+} Replacement Value ,"company" : "${CompanyName}" } }
Replacement Strategy Regex Replace Evaluation Mode Entire text
By using this method also we can add the attribute dynamically based on the flowfile attributes to the json message.
... View more
12-11-2017
03:27 AM
@swati tiwari In your Detect Duplicate Processor Change the property Age Off Duration to No value //right now you have set the value to 1 sec. Then the processor should work as Expected. Age Off Duration means Time interval to age off cached FlowFiles. We are caching the flow files and detecting the duplicates so when you set the property to 1 sec. Let's consider you are having 2 flowfiles with same attributes,if these 2 flowfiles processed through detect duplicate processor less than a sec, then only this processor can detect the duplicate flowfile. if one of the flowfile processed 29 sec and another flowfile processed at 31 sec then processor won't detect the 2 flowfile is duplicate,because we configured age off to 1 sec. Configs:- Once you change the property then the duplicates flowfiles will be directed to Duplicate relationship instead of non-duplicate relationship.
... View more
12-09-2017
04:39 PM
@Rohan Naidu You have connected Failure and Original relationships of split json looped back to same processor, What does Original relationship means The original FlowFile that was split into segments. If the FlowFile fails processing, nothing will be sent to this relationship i.e your original flowfile will be transferred to this relationship. Example of Original Flowfile:- This message will be original flowfile [{"id":1,"fname":"Michael","lname":"Jackson"},{"id":2,"fname":"Jim","lname":"Morrisson"},{"id":3,"fname":"John","lname":"Lennon"},{"id":4,"fname":"Freddie","lname":"Mercury"},{"id":5,"fname":"Elton","lname":"John"}] in your case original relation loop back to split json and generating duplicates. To resolve this issue auto terminate original relation by
Right Click on the Split json processor Goto Settings tab Click on Check box before Original relation Then click on Apply button Right Below of the screen. Auto terminate original Relationship:- Splitjson Configs:- As you can see above screenshot only failure relationship is loop back to the processor and we have auto terminated original relationship.
... View more
12-08-2017
03:17 PM
1 Kudo
@Mark
Directory needs to be in local not in hadoop directory to work with zip command. Make sure zip is installed in your node. Command to check zip is installed #zip after executing zip if it shows output as above that means zip is installed on the node. if not installed then do #yum install zip If you want to do zip the hdfs files then follow below steps:- Use Get HDFS processor to pick your files from HDFS,Use Configs for gethdfs same as my first answer then use MergeContent processor with As every flowfile from GetHDFS processor will have path attribute associated with it, we are using path attribute as our Correlation Attribute Name in merge content processor. Processor waits for 1 min and merges all the flow files that having same path attribute. Change Keep Path property as per your requirements. Keep Path false true false If using the Zip or Tar Merge Format, specifies whether or not the FlowFiles' paths should be included in their entry names; if using other merge strategy, this value is ignored But you can change the configs as per your requirements by following below reference to configure merge content processor. https://community.hortonworks.com/questions/149047/nifi-how-to-handle-with-mergecontent-processor.html?childToView=148309#answer-148309 Then in Put HDFS processor Use configs as my first answer and change property to Compression codec
NONE Because we are doing zipping in merge content processor it self no need to do compression again in PutHDFS processor.
... View more
12-08-2017
02:05 PM
@Umer Umer
As per your logs Caused by: java.sql.SQLException: [Teradata Database] [TeraJDBC 16.00.00.23] [Error 3807] [SQLState 42S02] Object 'DIM_PROD' does not exist. dim_prod does not exist make sure you are connecting to right database. In your JDBC connection string in connection pool, if you have mentioned database name then check the table dim_prod exists in that database. Sample Teradata JDBC connection string:- Make sure you are having the below jars in your nifi lib path.
tdgssconfig.jar teradata-connector-1.5.1-hadoop2.jar terajdbc4.jar hortonworks-teradata-connector-1.5.1.2.5.0.0-1245.jar Teradata Connection Pool Configs:- If above jars not in nifi lib path then you need to give path to all those jars with comma separated list. Database Driver Location(s) Comma-separated list of files/folders and/or URLs containing the driver JAR and its dependencies (if any). For example '/var/tmp/mariadb-java-client-1.1.7.jar' Supports Expression Language: true For both cases Database Driver Class Name property should be com.teradata.jdbc.TeraDriver
... View more
12-08-2017
02:58 AM
@Mahendiran Palani Samy Method 1:- Move the .avro hdfs file to local by using below command [cloudera@quickstart~]hdfs dfs -get /user/Mahe/custom_retail_db/orders_AVRO/part-m-00000.avro hdfs dfs -get command will copy file from hadoop to local file system. then [cloudera@quickstart~]ls-ltr //list all the files by sorting time modified
[cloudera@quickstart~]avro-tools getschema part-m-00000.avro
Avro-tools utility will expects file to be in local file system not in hadoop. That's the reason why you are getting part-m-00000.avro does not exist. (or) Method 2:- If you want to get schema from HDFS file instead of copy the file to Local file system then You need to download avro tools dependencies from below link https://jar-download.com/explore-java-source-code.php?a=avro-tools&g=org.apache.avro&v=1.8.1&downloadable=1 Click on Download button, once you download is completed then move avro-tools-1.8.1.jar to local file system.. then run [cloudera@quickstart~]hadoop jar <path-to>/avro-tools-1.8.1.jar getschema /user/Mahe/custom_retail_db/orders_AVRO/part-m-00000.avro |hdfs dfs -put -f - /user/Mahe/avro_schema.avsc In this command we are using hadoop file path and extracting the schema and storing the schema to avro_schema.avsc file in /user/Mahe directory. [cloudera@quickstart~]hdfs dfs -cat /user/Mahe/avro_schema.avsc //to cat the contents of avro_schema.avsc file In this way you can get schema from the .avro file, you can choose the best way for your case.
... View more
12-07-2017
09:56 PM
1 Kudo
@Mark, I think you are using Windows and windows won't have zip utility by default, Zip utility will be presented in linux env as i tried in linux. To resolve this you need to download https://www.microsoft.com/en-us/download/details.aspx?id=17657 and run the .exe file. In Execute Process Processor use Command
C:\Program Files (x86)\Windows Resource Kits\Tools\compress.exe //path where compress.exe got installed
Command Arguments C:\<input directory> C:\<output-directory.zip> Configs:- So we are creating zip directory in Execute Process processor. Your case Input directory like C:\day\${now():format('yyyyMMdd')} Output Directory C:\day\${now():format('yyyyMMdd')}.zip Then use Execute Stream Command Processor to delete the input Directory(Source directory). We need to create .bat file that would delete the input directory in this processor. cmd>remove_dir.bat @RD /s/q %1 So the above script would get argument and delete the directory we are passing that argument as our input directory. What is /s and /Q? RD [/S] [/Q] [drive:]path
/S Removes all directories and files in the specified directory
in addition to the directory itself. Used to remove a directory
tree.
/Q Quiet mode, do not ask if ok to remove a directory tree with /S Configs:- Command Arguments
"C:\day\${now():format('yyyyMMdd')}"
Command Path
C:<delete-directory.bat file path> For testing i tried with below configs:- In this processor we are deleting the input directory.
... View more
12-06-2017
09:53 PM
1 Kudo
<br> @Mark Method1:- Use Execute Process processor with below configs:- Properties:- Command zip Command Arguments -rm /day/${now():format('yyyyMMdd')}.zip /day/${now():format('yyyyMMdd')}
i have configured above argument with Expression language but you can change above arguments as per your requirements. we are zipping the source folder and Deletes the original files after zipping. If a directory becomes empty after removal of the files, the directory is also removed. No deletions are done until zip has created the archive without error. This is useful for conserving disk space, but is potentially dangerous removing all input files..! (or) Method2:- we can zip the folder by using execute process processor then use execute stream command processor to delete the source directory. Use Execute Process Processor and Configure the processor as below. Command zip Command Arguments -r /day/${now():format('yyyyMMdd')}.zip /day/${now():format('yyyyMMdd')} So in this processor we are using Expression language and Zip command and passing our desired zip folder name and source folder path. Then use Execute Process(success relation) to Execute Stream command processor to delete the source directory. Configs:- For removing directory we need to use a simple shell script bash# cat del.sh
#!/bin/bash
rm -rf $1 the above shell script will expects an argument and we are passing that from command Arguments property as /day/${now():format('yyyyMMdd')} so in this processor we are removing the directory. Make sure nifi user having access to delete these directories. You can choose the best method that fit for your case.
... View more
12-05-2017
02:45 PM
1 Kudo
@Mark You can do that by using GetHDFS,GetFTP,GetSFTP processors by using Keep Source File
false //by default it is set to false. So once you configure GET processors then all the files in that directory will be deleted. GetHDFS Configs:- Then use PutHDFS,PutFTP,PutSFTP processors and change the property Compression codec
BZIP Directory <same-directory-path-as-gethdfs-directory-info> PutHDFS Configs:- Right now in Put hdfs processor has been configured the same directory as GetHDFS processo,r we have configured puthdfs processor with Compression codec as BZIP. When we are storing the data into HDFS directory we are compressing the files and storing them in HDFS directory. FLOW:- GetHDFS(Success Relation) //get the files from hdfs directory and delete them in the source directory-->
PutHDFS //Compress the files and store them in the same directory source directory. If you are thinking to merge the files then use merge content processor before PutHDFS processor. Use the below reference to configure merge content processor. https://community.hortonworks.com/questions/149047/nifi-how-to-handle-with-mergecontent-processor.html?childToView=148309#answer-148309
... View more