Member since 06-08-2017
1049 Posts
518 Kudos Received
312 Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 11298 | 04-15-2020 05:01 PM |
| | 7195 | 10-15-2019 08:12 PM |
| | 3171 | 10-12-2019 08:29 PM |
| | 11670 | 09-21-2019 10:04 AM |
| | 4405 | 09-19-2019 07:11 AM |
03-22-2018
09:06 AM
@umang s Could you please describe how you expect the above record to appear in HBase, i.e. should both json documents share the same row key?
03-21-2018
06:38 PM
@Saikrishna Tarapareddy
The FlattenJson processor doesn't work if the json content contains arrays or nested arrays; in that case the flowfile is routed to failure. We still need to use the SplitJson (or) JoltTransformJSON processors to split the array first. The processor combines the keys at each level with the user-defined separator that we have specified in the processor configs.
Input json:-
{
"id": 17,
"name": "John",
"child": {
"id": "1"
}
}
Output json:-
{"id":17,"name":"John","child.id":"1"}
As you can see, the nested keys have been joined with . (period) in the output json content.
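The key-joining behaviour described above can be sketched in plain Python. This is only an illustration of the idea, not the processor's implementation; the function name `flatten` and the choice to raise an error on arrays (mirroring the failure routing described in this answer) are assumptions.

```python
import json


def flatten(obj, separator=".", prefix=""):
    """Join nested keys with a separator; reject arrays, as described above."""
    flat = {}
    for key, value in obj.items():
        full_key = f"{prefix}{separator}{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested objects, carrying the joined key as prefix.
            flat.update(flatten(value, separator, full_key))
        elif isinstance(value, list):
            # Assumption: treat arrays as an error, like the failure relationship.
            raise ValueError(f"array found at key: {full_key}")
        else:
            flat[full_key] = value
    return flat


sample = {"id": 17, "name": "John", "child": {"id": "1"}}
flattened = flatten(sample)
print(json.dumps(flattened, separators=(",", ":")))
```

Passing a different `separator` (for example `"_"`) changes only how the joined keys are written.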
03-21-2018
04:42 PM
@Saikrishna Tarapareddy
I tried a load test with 500k records.
Input:-
{
"Customer_Id": 11111,
"dsc": [{
"Brand_Nm": "test",
"qa": [{
"Assignment_Id": 22,
"addr": [{
"AddressLine_1": null,
"City": "Amelia"
}]
}]
}]
}
I repeated the above message for 500k records and ran the ConvertRecord processor; it completed in 5 sec. In your case the record size will be larger, but as long as you have enough heap space, ConvertRecord will work without any issues..!! Could you please share 10 records that have multiple addresses for a customer, along with the expected output, so that I can try it on my end? For reference I created a new template; please try to save this template.
178185-1.xml
03-20-2018
11:09 PM
@Sami Ahmad You have to change the data types in your create table statement, because the table used by the PutHiveStreaming processor expects all the columns to be strings (except for timestamp etc. types). Use the below create table statement:
CREATE TABLE default.purchase_acct_orc (
acct_num STRING,
pur_id STRING,
pur_det_id STRING,
product_pur_product_code STRING,
prod_amt STRING,
accttype_acct_type_code STRING,
accttype_acct_status_code STRING,
emp_emp_code STRING,
plaza_plaza_id STRING,
purstat_pur_status_code STRING)
PARTITIONED BY (pur_trans_date TIMESTAMP)
CLUSTERED BY(acct_num) INTO 5 BUCKETS
STORED AS ORC
TBLPROPERTIES ("transactional"="true");
Then start the PutHiveStreaming processor. I tried the same create table statement as above with the data that you provided in the comment, and I'm able to load the data into the table.
Flow:-
SelectHiveQL configs:-
HiveQL Select Query
select string(16680934) ACCT_NUM,string(747032990) PUR_ID,string(656564856) PUR_DET_ID,string(31) PRODUCT_PUR_PRODUCT_CODE,string(50) PROD_AMT,string('2015-01-01 08:12:03.0') PUR_TRANS_DATE,string(1) ACCTTYPE_ACCT_TYPE_CODE,string(01) ACCTTYPE_ACCT_STATUS_CODE,string(9999) EMP_EMP_CODE,string(009500) PLAZA_PLAZA_ID,string(10) PURSTAT_PUR_STATUS_CODE
Output Format
Avro
PutHiveStreaming configs:-
Output:-
select * from default.purchase_acct_orc;
+-----------+------------+-------------+---------------------------+-----------+--------------------------+----------------------------+---------------+-----------------+--------------------------+------------------------+--+
| acct_num | pur_id | pur_det_id | product_pur_product_code | prod_amt | accttype_acct_type_code | accttype_acct_status_code | emp_emp_code | plaza_plaza_id | purstat_pur_status_code | pur_trans_date |
+-----------+------------+-------------+---------------------------+-----------+--------------------------+----------------------------+---------------+-----------------+--------------------------+------------------------+--+
| 16680934 | 747032990 | 656564856 | 31 | 50 | 1 | 1 | 9999 | 9500 | 10 | 2015-01-01 08:12:03.0 |
+-----------+------------+-------------+---------------------------+-----------+--------------------------+----------------------------+---------------+-----------------+--------------------------+------------------------+--+
Let us know if you have any issues..!!
03-20-2018
10:24 PM
@Saikrishna Tarapareddy I think it's better to go with 3 ConvertRecord processors in parallel, with the same reader (as the json content is the same for all) and different JsonRecordSetWriters (we need different elements in each file), to get 3 different files, i.e.
Customer_Id,Brand_Nm,BarCode_No,Offer_Ds,Offer_Expire_Dt (convertrecord1)
Customer_Id,Assignment_id,Assign_Dt,Offer_Nm (convertrecord2)
Customer_Id,Address_Line_1,Address_Line_2,City,State_Cd,Postal_Cd,Vendor_Desc,Offer,Source (convertrecord3)
Flow:-
As the input json object/message has nested arrays, I have used the JsonPathReader controller service, added the avro controller service, and added all the user-defined properties with matching json path expressions. This reader will be the same for all three ConvertRecord processors. In the JsonRecordSetWriter controller service I have used the Schema Text property to define our required schema. I tried with one ConvertRecord processor and attached the .xml for reference; you can save it and add the elements you need as per your requirements. Let us know if you have any issues..!!
178185-hcc.xml
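To make the target of the three parallel branches concrete, here is a small plain-Python sketch of the three projections, each keeping Customer_Id. It is not NiFi code and says nothing about how ConvertRecord works internally; the function name `project` is hypothetical, and the sample uses only the fields from the nested test message posted in this thread, so most of the columns listed above are absent.

```python
def project(customer):
    """Split one nested customer record into three flat record lists."""
    cid = customer["Customer_Id"]
    brands, assignments, addresses = [], [], []
    for dsc in customer.get("dsc", []):
        # First output: brand-level fields.
        brands.append({"Customer_Id": cid, "Brand_Nm": dsc.get("Brand_Nm")})
        for qa in dsc.get("qa", []):
            # Second output: assignment-level fields.
            assignments.append(
                {"Customer_Id": cid, "Assignment_Id": qa.get("Assignment_Id")}
            )
            for addr in qa.get("addr", []):
                # Third output: address-level fields.
                addresses.append(
                    {
                        "Customer_Id": cid,
                        "AddressLine_1": addr.get("AddressLine_1"),
                        "City": addr.get("City"),
                    }
                )
    return brands, assignments, addresses


sample = {
    "Customer_Id": 11111,
    "dsc": [{
        "Brand_Nm": "test",
        "qa": [{
            "Assignment_Id": 22,
            "addr": [{"AddressLine_1": None, "City": "Amelia"}],
        }],
    }],
}

brands, assignments, addresses = project(sample)
```

Each returned list corresponds to one of the three output files, which is why every row repeats Customer_Id.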
03-20-2018
07:46 PM
1 Kudo
@Saikrishna Tarapareddy The ConvertRecord processor is easy to use here because it also works with an array of json objects, so you don't need the SplitJson processor. I think the issue you are facing is that no default values are specified for the json fields. If default values are specified for all the fields, then the conversion to avro doesn't fail when a value is not present for a field/element; the record gets the default value that was defined for the field.
Example:-
Case1:- I have the below json array with 3 json objects in it
[{"year" : "2018","mon":"01","mday":"02"},
{"year" : "2018","mon":"01","mday":"02","hour":"01"},
{"year" : "2018","mon":"01","mday":"02","hour":"01"}]
I'm trying to convert json to avro using the ConvertRecord processor, and my avro schema looks like
{
"type": "record",
"name": "nifi_logs",
"fields": [
{ "name": "year", "type": ["null","string"] },
{ "name": "mon", "type": ["null","string"] },
{ "name": "mday", "type": ["null","string"] },
{ "name": "hour", "type": ["null","string"] }
]
}
Now every field/element allows null as its default value. If you look at the input json array, the hour field is missing in the first json object; even so, with the above avro schema we are going to have null for the hour field.
Case2:-
{
"type": "record",
"name": "nifi_logs",
"fields": [
{ "name": "year", "type": ["null","string"] },
{ "name": "mon", "type": ["null","string"] },
{ "name": "mday", "type": ["null","string"] },
{ "name": "hour", "type": "string" }
]
}
If our avro schema is like the above, then there is no default value for the hour field, so if the incoming data doesn't have the hour field, that record goes to failure. Define default values for all the fields/elements that you have, then run the processor again. Let us know if you are having any issues..!!
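The difference between the two cases can be modelled with a few lines of Python. This is a simplified model of the behaviour described in this answer, not NiFi or Avro library code; the helper names `is_nullable` and `convert` are hypothetical.

```python
def is_nullable(field_type):
    """True when the type is a union that includes "null"."""
    return isinstance(field_type, list) and "null" in field_type


def convert(record, schema):
    """Fill missing fields from the schema, or fail if no fallback exists."""
    out = {}
    for field in schema["fields"]:
        name = field["name"]
        if name in record:
            out[name] = record[name]
        elif "default" in field:
            out[name] = field["default"]
        elif is_nullable(field["type"]):
            out[name] = None  # Case1: missing field becomes null
        else:
            # Case2: nothing to fall back on, so the record fails
            raise ValueError(f"missing required field: {name}")
    return out


def make_schema(hour_type):
    nullable = ["null", "string"]
    return {
        "type": "record",
        "name": "nifi_logs",
        "fields": [
            {"name": "year", "type": nullable},
            {"name": "mon", "type": nullable},
            {"name": "mday", "type": nullable},
            {"name": "hour", "type": hour_type},
        ],
    }


case1_schema = make_schema(["null", "string"])
case2_schema = make_schema("string")
first_record = {"year": "2018", "mon": "01", "mday": "02"}

case1_result = convert(first_record, case1_schema)
```

Calling `convert(first_record, case2_schema)` raises `ValueError`, which corresponds to the record being routed to failure.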
03-19-2018
10:36 PM
@Mark McGowan You need to change the below properties:
Delimiter Strategy Text
Demarcator New line (shift + enter)
Then the merged file contents will be on new lines.
Output:- The merged output flowfile will look like below with the above properties
2018,1,12,3,0,0
2018,1,12,2,0,0
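In effect, the demarcator is placed between the contents of the merged flowfiles. A minimal Python sketch of that idea (the function name `merge_contents` is hypothetical, and this is not how the processor is implemented):

```python
def merge_contents(contents, demarcator="\n"):
    """Join flowfile contents, placing the demarcator between them."""
    return demarcator.join(contents)


merged = merge_contents(["2018,1,12,3,0,0", "2018,1,12,2,0,0"])
print(merged)
```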
03-19-2018
02:20 PM
@Xander L
Method1:- You can add a new ExtractText processor before the ReplaceText processor, keep the whole content of the flowfile as an attribute, and then use the extracted attribute in the ReplaceText processor to apply the escapeJson function.
ExtractText processor configs:- Add a new property by clicking on the + sign at the top right corner. In my case I have added the extract property with the below regex
extract
(.*)
Now we extract all the content of the flowfile and keep it as the extract attribute on the flowfile. You need to increase the below property sizes if you have big content to extract
Maximum Buffer Size 1 MB
Maximum Capture Group Length 1024
ReplaceText configs:- Change the configs as below,
Search Value (?s)(^.*$)
Replacement Value ${extract:escapeJson()}
Character Set UTF-8
Maximum Buffer Size 1 MB //needs to change if the size of flowfile is more than 1 MB
Replacement Strategy Always Replace
Evaluation Mode Entire text
Now we are applying the escapeJson function to the extract attribute as the replacement value.
Output:-
https 2018-03-18T23:55:36.990541Z app\/abc-pxx-p001\/abc123 12.34.56.78:12345 87.65.43.21:80 0.000 0.092 0.000 200 200 1124 364 \"GET https:\/\/mysite.nl:443\/test HTTP\/1.1\" \"abc\/1 CFN\/987 Darwin\/12.3.4\" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 arn:aws:elasticloadbalancing:eu-west-1:012345679:targetgroup\/abc-external-abc-de\/abc123def456 \"Root=1-5a-d2a18df071c\" \"mysite.nl\" \"session-reused\" 0
(or)
Method2:- By using two ReplaceText processors in series we can get the same expected result without the ExtractText processor.
Replace / with \/:-
Search Value /
Replacement Value \/
Character Set UTF-8
Maximum Buffer Size 1 MB
Replacement Strategy Literal Replace
Evaluation Mode Entire text
In this processor we search for the / literal and replace it with \/. Feed the success relation from this ReplaceText processor to the next ReplaceText processor.
Replace " with \":-
Search Value "
Replacement Value \"
Character Set UTF-8
Maximum Buffer Size 1 MB
Replacement Strategy Literal Replace
Evaluation Mode Entire text
In the next ReplaceText processor we search for " and replace it with \", using Literal Replace as the Replacement Strategy. The output flowfile content from the second ReplaceText processor will be the same as in method 1.
https 2018-03-18T23:55:36.990541Z app\/abc-pxx-p001\/abc123 12.34.56.78:12345 87.65.43.21:80 0.000 0.092 0.000 200 200 1124 364 \"GET https:\/\/mysite.nl:443\/test HTTP\/1.1\" \"abc\/1 CFN\/987 Darwin\/12.3.4\" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 arn:aws:elasticloadbalancing:eu-west-1:012345679:targetgroup\/abc-external-abc-de\/abc123def456 \"Root=1-5a-d2a18df071c\" \"mysite.nl\" \"session-reused\" 0
Both ways give the same expected result; choose the one that fits your case better. Let us know if you are having any issues..!!
If the answer helped to resolve your issue, click on the Accept button below to accept the answer. That would be a great help to community users to find solutions quickly for these kinds of issues.
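The two literal replacements of Method 2 can be reproduced in plain Python to check the result on a short sample. This is only a sketch; the function name `escape_like_method2` and the shortened sample line are assumptions. Note that, unlike a full JSON escaper, these two replacements leave backslashes, tabs and newlines untouched, so the two methods should only be expected to agree on content like the log line above.

```python
def escape_like_method2(text):
    """Apply the two literal replacements in the same order as the flow."""
    step1 = text.replace("/", "\\/")   # first ReplaceText: / becomes \/
    step2 = step1.replace('"', '\\"')  # second ReplaceText: " becomes \"
    return step2


sample = '"GET https://mysite.nl:443/test HTTP/1.1" "mysite.nl"'
escaped = escape_like_method2(sample)
print(escaped)
```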
03-19-2018
11:42 AM
@Ashish Wadnerkar To get the full query running for the application id, go to the Tez UI from Ambari (there you can see the query history).
Steps to go to Tez view via Ambari:-
1) From the Ambari home page, hover over the top right corner and select "Tez View".
2) Next, you can search either by application ID or by the hive query itself to find your application.
3) Select your application - the entire hive query should be displayed here, and you can also see the status of the query.
(or) from hiveserver2.log
03-19-2018
07:35 AM
1 Kudo
@mel mendoza The ExecuteStreamCommand processor is still working on its flowfile and lets the last thread complete before releasing it, i.e. one thread is running on the flowfile to execute the script/command and another thread is stopping the processor. In this state NiFi will no longer schedule the processor to run, but it does not interrupt any running threads; as you can see, the small number 2 in the upper right corner indicates the number of active threads. This situation happens when the processor is not configured properly, (or) the script that runs in the ExecuteStreamCommand processor is causing it. To find out exactly where the processor is hung, run nifi.sh dump several times:
bin/nifi.sh dump thread-dump.txt
If the processor doesn't release the thread within a few minutes, you will have to restart NiFi to force it to release the thread; as of now there is no force-kill kind of operation in NiFi that can kill hanging threads. After the NiFi restart, all the threads running on the processor will be released, and then you can clear the queue.
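Taking several dumps a short time apart can be scripted. The sketch below is an assumption-laden example: the install path `/opt/nifi`, the file-name prefix, and the function names are hypothetical; only the `bin/nifi.sh dump <file>` command comes from the answer above.

```python
import subprocess
import time


def dump_commands(nifi_home, count, prefix="thread-dump"):
    """Build one `nifi.sh dump` command per dump, each with its own file."""
    script = f"{nifi_home}/bin/nifi.sh"
    return [[script, "dump", f"{prefix}-{i}.txt"] for i in range(1, count + 1)]


def take_dumps(nifi_home, count=3, interval_seconds=30):
    """Run the dump commands one after another, pausing between them."""
    for command in dump_commands(nifi_home, count):
        subprocess.run(command, check=True)
        time.sleep(interval_seconds)


# Example call (not executed here, since it needs a NiFi install):
# take_dumps("/opt/nifi", count=3, interval_seconds=30)
```

A thread that shows the same stack in every dump file is a likely candidate for the hung one.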