Member since
06-08-2017
1049
Posts
518
Kudos Received
312
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 11280 | 04-15-2020 05:01 PM |
| | 7174 | 10-15-2019 08:12 PM |
| | 3160 | 10-12-2019 08:29 PM |
| | 11618 | 09-21-2019 10:04 AM |
| | 4395 | 09-19-2019 07:11 AM |
04-22-2018
04:41 AM
@Sami Ahmad
Could you please create a new HBase table (PUR_ACCT_PHX1) with cf1 as the column family, keeping 40 versions:
create 'PUR_ACCT_PHX1', {NAME => 'cf1', VERSIONS => 40}
Then run your Sqoop import into this table (PUR_ACCT_PHX1) and column family (cf1):
sqoop import --connect "jdbc:oracle:thin:@(description=(address=(protocol=tcp)(host=patronQA)(port=1526))(connect_data=(service_name=patron)))" --username PATRON --table PATRON.TAB4 --hbase-table PUR_ACCT_PHX1 --column-family cf1 --hbase-row-key "ACCT_NUM" -m 4
Then scan the HBase table, asking for up to 40 versions:
scan 'PUR_ACCT_PHX1', {VERSIONS => 40}
(or) once you have altered the existing HBase table to keep 40 versions, run the Sqoop import again into the same table and then scan it.
Let us know whether you are able to see all the versions or not.
04-21-2018
05:54 PM
@Abhinav Joshi
As you are fetching the results of a join in this case, we need to store the state in DistributedMapCache/HDFS/Hive/HBase, then pull the state value back from there, use it as the lower bound, and run the join incrementally.
Example:-
I have emp and dept tables, with joindate (timestamp datatype) as the incremental column in the emp table.
Flow Explanation:-
1.GenerateFlowFile processor:-
This processor is the trigger for the flow. Add a new property
increment.value
cache_key
2.FetchDistributedMapCache:-
Configure and enable DistributedMapCacheClientService and change the below property values
Cache Entry Identifier
${increment.value}
Distributed Cache Service
DistributedMapCacheClientService
Put Cache Value In Attribute
stored.state //if found, the value is added to the flowfile as an attribute named stored.state
3.UpdateAttribute:-
Feed both the success and not-found relationships to the UpdateAttribute processor and add two properties
current.state
${now():format("yyyy-MM-dd HH:mm:ss")} //current timestamp as the current.state attribute
stored.state
${stored.state:isNull():ifElse('1900-01-01 12:00:00','${stored.state}')} //if the value is null (first run) we set it to 1900-01-01 12:00:00; if a value is present we keep it as is.
You can change the default value and the stored.state/current.state values as per your requirements.
4.ExecuteSQL:-
SQL select query
select e.id,d.name from
emp e join
dept d
on e.deptid=d.deptid
where
e.joindate > '${stored.state}'
and
e.joindate <= '${current.state}'
As my emp table has joindate as the incremental column, I have used the stored.state and current.state attribute values in the where clause of the join so that it runs incrementally: each run picks up only the rows after the last stored state and up to the current timestamp.
Fork the success relationship from the ExecuteSQL processor
Fork1: to store current.state in the distributed cache
Fork2: for other processing
Fork1: To store the state:-
5.ReplaceText:-
Search Value
(?s)(^.*$)
Replacement Value
${current.state}
Replacement Strategy
Always Replace
Evaluation Mode
Entire text
Now the flowfile content is replaced with the current.state value.
6.PutDistributedMapCache:-
Cache Entry Identifier
${increment.value}
Fork2:- Further processing (or) storing the results to HDFS, etc.
I have attached a sample template below; save/upload the template to your NiFi instance and change the configs as per your requirements.
executesql-storestate-187764.xml
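The state bookkeeping in this flow can also be sketched outside NiFi. The following is a minimal Python sketch and not part of the attached template: the dict `cache` stands in for the distributed map cache, the function names `next_window`, `build_query` and `run_once` are hypothetical, and the upper bound is assumed to be inclusive (`<=`).

```python
from datetime import datetime

DEFAULT_STATE = "1900-01-01 12:00:00"  # same first-run default as in step 3
CACHE_KEY = "cache_key"                # value of the increment.value property


def next_window(cache, now):
    """Return (stored_state, current_state) for this run."""
    stored = cache.get(CACHE_KEY) or DEFAULT_STATE
    current = now.strftime("%Y-%m-%d %H:%M:%S")
    return stored, current


def build_query(stored, current):
    """Fill the join query with the lower and upper bounds."""
    return (
        "select e.id,d.name from emp e join dept d on e.deptid=d.deptid "
        f"where e.joindate > '{stored}' and e.joindate <= '{current}'"
    )


def run_once(cache, now):
    """One pass of the flow: compute the window, build the query, save state."""
    stored, current = next_window(cache, now)
    query = build_query(stored, current)
    # A real flow would execute the query here before saving the new state.
    cache[CACHE_KEY] = current
    return query
```

Calling `run_once` twice with the same `cache` shows the second run starting where the first one ended, which is the behaviour the Fetch/Put cache pair provides in the flow.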
04-21-2018
03:26 PM
1 Kudo
@Siddarth Wardhan
It is probably a bug in the Hive ODBC driver. As a workaround you can use the below query, which runs in Beeline and through the ODBC driver as well.
INSERT INTO Target_table(col_1, col_2, col_3) SELECT col_1, col_2, int(null) col_3 FROM Source_table;
(or)
INSERT INTO Target_table SELECT col_1, col_2, int(null) col_3 FROM Source_table; //if the order of columns matches between the select statement and target_table
Both of these queries insert the same results as your query does (i.e. INSERT INTO Target_table(col_1, col_2) SELECT col_1, col_2 FROM Source_table;)
04-21-2018
04:09 AM
2 Kudos
@Suresh Dendukuri
One way of doing this is to use the ReplaceText processor to search for "NULL" in the json record key/value pairs and replace them with an empty string (blank); this way we can remove all NULL key/value pairs from the flowfile content.
Method1:-
Input:-
[ { "color": "black", "category": "hue", "type": "NULL", "code": { "rgba": [255,255,255,1], "hex": "#000" } }, { "color": "NULL", "category": "value", "code": { "rgba": [0,0,0,1], "hex": "#FFF" } }, { "color": "red", "category": "hue", "type": "primary", "code": { "rgba": NULL, "hex": "#FF0" } }]
ReplaceText Configs:-
Search Value
("type": "NULL",|"color": "NULL",|"rgba": NULL,) //search for all possible NULL key/value pairs (| is used as OR)
Replacement Value
//leave empty, so each match is replaced with an empty string
Character Set
UTF-8
Maximum Buffer Size
1 MB //increase this if your flowfile is larger than 1 MB
Replacement Strategy
Regex Replace
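To see what this Search Value does before wiring it into a flow, here is a minimal Python sketch that applies the same alternation to the sample input. Python's `re` module is used only as a stand-in for the processor's regex engine, and the helper name `strip_null_pairs` is hypothetical.

```python
import json
import re

# Same alternation as the Search Value above; each match is replaced with "".
NULL_PAIRS = re.compile(r'("type": "NULL",|"color": "NULL",|"rgba": NULL,)')


def strip_null_pairs(text):
    """Remove every key/value pair matched by the pattern."""
    return NULL_PAIRS.sub("", text)


sample = (
    '[ { "color": "black", "category": "hue", "type": "NULL", '
    '"code": { "rgba": [255,255,255,1], "hex": "#000" } }, '
    '{ "color": "NULL", "category": "value", '
    '"code": { "rgba": [0,0,0,1], "hex": "#FFF" } }, '
    '{ "color": "red", "category": "hue", "type": "primary", '
    '"code": { "rgba": NULL, "hex": "#FF0" } }]'
)

cleaned = strip_null_pairs(sample)
records = json.loads(cleaned)  # parses only because every NULL pair is gone
```

Note that the pattern depends on the exact spacing and on the trailing comma, so a NULL pair that is the last entry of its object would not be matched by it.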
Output From ReplaceText Processor:-
[ { "color": "black", "category": "hue", "code": { "rgba": [255,255,255,1], "hex": "#000" } }, { "category": "value", "code": { "rgba": [0,0,0,1], "hex": "#FFF" } }, { "color": "red", "category": "hue", "type": "primary", "code": { "hex": "#FF0" } }]
All NULL key/value pairs are removed from the flowfile content.
(OR)
Method2:-
In this method we use the ConvertRecord processor with JsonTreeReader as the Record Reader and JsonRecordSetWriter as the Record Writer, with the below property set on the writer
Suppress Null Values
Always Suppress //if a record field has a null value, it is not written to the output flowfile.
Flow:-
GenerateFlowFile:-
This processor is used to generate sample records.
Custom Text property as
[ { "color": "black", "category": "hue", "type": "NULL", "code": { "rgba": [255,255,255,1], "hex": "#000" } }, { "color": "NULL", "category": "value", "code": { "rgba": [0,0,0,1], "hex": "#FFF" } }, { "color": "red", "category": "hue", "type": "primary", "code": { "rgba": NULL, "hex": "#FF0" } }]
Add a new property
schema.name
sch
ReplaceText:-
As we are going to use the ConvertRecord processor and your input file has "NULL" and NULL as values, which are not treated as null by the Avro schema, we first prepare json records that the ConvertRecord processor can understand.
Search Value
(NULL|"NULL") //search for NULL (or) "NULL" in the flowfile content
Replacement Value
null //replace with null
Maximum Buffer Size
1 MB
Replacement Strategy
Regex Replace
Evaluation Mode
Entire text
Output:-
[ { "color": "black", "category": "hue", "type": null, "code": { "rgba": [255,255,255,1], "hex": "#000" } }, { "color": null, "category": "value", "code": { "rgba": [0,0,0,1], "hex": "#FFF" } }, { "color": "red", "category": "hue", "type": "primary", "code": { "rgba": null, "hex": "#FF0" } }]
All the "NULL" and NULL values are replaced with null in the flowfile content.
ConvertRecord Processor:-
Configure and enable the Record Reader/Writer controller services and add an AvroSchemaRegistry, as we need to define an avro schema that matches your input json record.
JsonTreeReader Configs:-
AvroSchemaRegistry Configs:-
sch
{
"type" : "record",
"name" : "schema",
"namespace" : "avroschema",
"fields" : [ {
"name" : "color",
"type" : ["null","string"]
}, {
"name" : "category",
"type" : "string"
}, {
"name" : "type",
"type" : ["null","string"]
}, {
"name" : "code",
"type" : {
"type" : "record",
"name" : "code",
"fields" : [ {
"name" : "rgba",
"type" : {
"type" : "array",
"items" : ["null","int"]
}
}, {
"name" : "hex",
"type" : ["null","string"]
}]
}
} ]
}
JsonRecordSetWriter Configs:-
Change Suppress Null Values to Always Suppress (null values for the json keys in the record are then not written).
Output:-
[{"color":"black","category":"hue","code":{"rgba":[255,255,255,1],"hex":"#000"}},{"category":"value","code":{"rgba":[0,0,0,1],"hex":"#FFF"}},{"color":"red","category":"hue","type":"primary","code":{"rgba":[],"hex":"#FF0"}}]
None of the null values in the json records are written to the output flowfile. The only issue is with the rgba array: even though its value is null, the writer still writes an empty array for rgba.
ReplaceText:-
Now we remove the empty rgba array in this processor
Search Value
"rgba":[], //search for this value
Replacement Value
//leave empty, same as the Method1 ReplaceText configs
Maximum Buffer Size
1 MB
Replacement Strategy
Literal Replace
Evaluation Mode
Entire text
Output:-
[{"color":"black","category":"hue","code":{"rgba":[255,255,255,1],"hex":"#000"}},{"category":"value","code":{"rgba":[0,0,0,1],"hex":"#FFF"}},{"color":"red","category":"hue","type":"primary","code":{"hex":"#FF0"}}]
I have attached my template below; save/upload the template and change it as per your needs.
remove-nulls-187756.xml
If the answer addressed your question, click on the Accept button below to accept the answer; that would be a great help to community users looking for a quick solution to these kinds of issues.
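For comparison, the idea behind Method2 (normalise the NULL markers to json null, then leave null fields out when writing) can be sketched in a few lines of Python. This is only an illustration of the logic, not how ConvertRecord is implemented; the helper names `normalise_nulls` and `drop_nulls` are hypothetical.

```python
import json
import re


def normalise_nulls(text):
    """Mirror the (NULL|"NULL") -> null ReplaceText step."""
    return re.sub(r'("NULL"|NULL)', "null", text)


def drop_nulls(value):
    """Recursively remove dict entries whose value is None."""
    if isinstance(value, dict):
        return {k: drop_nulls(v) for k, v in value.items() if v is not None}
    if isinstance(value, list):
        return [drop_nulls(v) for v in value]
    return value


raw = '[{"color": "NULL", "category": "value", "code": {"rgba": NULL, "hex": "#FF0"}}]'
result = drop_nulls(json.loads(normalise_nulls(raw)))
```

Unlike the record writer described above, this sketch drops the rgba key outright instead of emitting an empty array, so no final cleanup step is needed. The blind regex would also rewrite the letters NULL inside a longer string value, so it is only safe when that cannot occur in the data.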
04-19-2018
05:32 PM
@Santanu Ghosh
That's correct. Just create the table with \t as the delimiter; Hive can then read the file with the delimiter specified.
Example:-
Creating a tab-delimited table in Hive:-
I have a tab-delimited file in HDFS
bash$ hadoop fs -cat /apps/hive/warehouse/t1/t.txt
1\tfoo
I have created a tab-delimited table in Hive as
hive> create table default.t1 (id string,name string) row format delimited fields terminated by '\t';
Run desc formatted default.t1; to see the delimiter for the table; the SerDe, InputFormat and OutputFormat are all set to the defaults (as mentioned in the above answer).
# Storage Information | NULL | NULL
SerDe Library: | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL
InputFormat: | org.apache.hadoop.mapred.TextInputFormat | NULL
OutputFormat: | org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat | NULL
Compressed: | No | NULL
Storage Desc Params: | NULL | NULL
| field.delim | \t
Now check the data in the t1 table
hive> select * from default.t1;
+--------+----------+--+
| t1.id | t1.name |
+--------+----------+--+
| 1 | foo |
+--------+----------+--+
Hive read the tab-delimited input file t.txt and showed the results.
Let me know if you have any additional questions.
If the answer addressed your question, click on the Accept button below to accept the answer; that would be a great help to community users looking for a quick solution to these kinds of issues.
04-19-2018
03:12 PM
@Santanu Ghosh
Q1: What is the correct way to define default Hive table ?
It depends on how your data file is delimited. If your data file is delimited with ^A (ctrl-a), you don't need to specify any delimiter, because Hive's default field delimiter is ^A (ctrl-a) in ASCII. In this case desc formatted <table-name>; doesn't show any delimiter, because the table is created with the default delimiter, i.e. ^A.
If your data file is comma delimited, then we need to create the table with row format delimited fields terminated by ','.
When we create a table with
create table mydb.user (uid int,name string);
i.e. a DDL statement without any format or delimiters, Hive creates the user table with the default SerDe (serializer/deserializer). The SerDe instructs Hive on how to process a record (row), and the SerDe library is built in.
Default SerDe --> LazySimpleSerDe
Default InputFormat --> TextInputFormat
Default OutputFormat --> HiveIgnoreKeyTextOutputFormat
Default Delimiter --> ^A (ctrl-a) //although the delimiter is not shown in desc formatted <table-name>;
Q2: Also, what is the default delimiter for Hive table ?
By default, tables are assumed to be of text input format and the delimiters are assumed to be ^A (ctrl-a).
Please refer to this link for more details regarding Hive DDL operations.
Q3: create table mydb.user (uid int,name string) row format delimited fields terminated by '\t' ;
Here we define the table with a specified delimiter, which is why you are able to see the delimiter as \t in the output of the desc formatted command.
04-18-2018
12:02 PM
@Shantanu kumar If the answer helped to resolve your issue, click on the Accept button below to accept the answer; that would be a great help to community users looking for a quick solution to these kinds of issues.
04-18-2018
12:02 PM
1 Kudo
@Hemu Singh
For this use case you need to use the QueryRecord processor. Based on the configured Record Reader controller service, this processor executes SQL queries on the flowfile content, and the result of the SQL query becomes the content of the output flowfile, in the format specified by the Record Writer controller service.
Flow Explanation:-
1.GenerateFlowFile //added some test data
2.UpdateAttribute //added schema to the flowfile
3.Filter Column QueryRecord
3.1.Configure/enable the Record Reader/Writer controller services.
3.2.Add a new property that runs a SQL where query on the flowfile.
3.3.Use the original relationship to store the file as is, i.e. with all 100 records in it.
4.QueryRecord
4.1.Add two new properties that run the row_number window function (I have an id column in the flowfile), so that the first 75 records go to one relationship and records 76 to 100 go to another.
first 75 records
select * from (select *,Row_Number() over(order by id asc) as rn from FLOWFILE) r where r.rn <= 75
76 to 100 record
select * from
(select *,Row_Number() over(order by id asc) as rn from FLOWFILE) r where r.rn > 75 and r.rn <= 100
Use the above two relationships (first75records and 76to100 record) for further processing.
In addition, QueryRecord also supports limit, offset, etc., so you can use either row_number or limit/offset to get only the desired 75 records from the flowfile.
Please refer to this and this for QueryRecord processor configs and usage.
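The two row_number queries amount to sorting by id, numbering the rows from 1, and cutting the list at row 75. A minimal Python sketch of that logic, with the hypothetical helper name `split_by_row_number` and records assumed to be dicts with an `id` key:

```python
def split_by_row_number(records, first=75, last=100):
    """Mimic Row_Number() over (order by id asc) and the two where clauses."""
    ordered = sorted(records, key=lambda r: r["id"])
    numbered = [dict(r, rn=i) for i, r in enumerate(ordered, start=1)]
    head = [r for r in numbered if r["rn"] <= first]
    tail = [r for r in numbered if first < r["rn"] <= last]
    return head, tail


records = [{"id": i} for i in range(100, 0, -1)]  # 100 records, not yet sorted
first75, rest = split_by_row_number(records)
```

As in the queries, each output row carries the extra rn column, and any rows beyond `last` end up in neither result.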
04-18-2018
02:38 AM
1 Kudo
@Chris Grren
We cannot add stored state values to flowfile attributes. However, you can create a custom processor that implements the logic of adding the current batch timestamp as an attribute to each flowfile.
You can also make a REST API call to the AttributeRollingWindow processor to find out the current batch timestamp value. A GET call to the below path returns the current state of the processor
GET /processors/{id}/state //Gets the state for a processor
For more details about REST API calls refer to this link.
Example:-
I have an AttributeRollingWindow processor with processor id d372cfe5-0162-1000-398e-62f2d3652166, and from the command line we make the REST API call as below.
Making the REST API call from the command line:-
$ curl -i -X GET -H 'Content-Type:application/json' http://localhost:8080/nifi-api/processors/d372cfe5-0162-1000-398e-62f2d3652166/state
Response:-
As you can see below, these are the states saved in the processor, and the first key is our current batch timestamp, i.e. 1524017336077
{
"componentState" : {
"componentId" : "d372cfe5-0162-1000-398e-62f2d3652166",
"stateDescription" : "Store the values backing the rolling window. This includes storing the individual values and their time-stamps or the batches of values and their counts.",
"localState" : {
"scope" : "LOCAL",
"totalEntryCount" : 7,
"state" : [ {
"key" : "1524017336077", //current batch timestamp
"value" : "1.0"
}, {
"key" : "1524017336079",
"value" : "1.0"
}, {
"key" : "1524017336082",
"value" : "1.0"
}, {
"key" : "1524017336083",
"value" : "1.0"
}, {
"key" : "1524017336085",
"value" : "1.0"
}, {
"key" : "1524017336086",
"value" : "1.0"
}, {
"key" : "count",
"value" : "7"
} ]
}
}
}
Making the same REST API call from a NiFi processor:-
Use the InvokeHTTP processor with Remote URL set to
http://localhost:8080/nifi-api/processors/d372cfe5-0162-1000-398e-62f2d3652166/state
Output:-
From the Response relationship you will get the same response as above
{
"componentState" : {
"componentId" : "d372cfe5-0162-1000-398e-62f2d3652166",
"stateDescription" : "Store the values backing the rolling window. This includes storing the individual values and their time-stamps or the batches of values and their counts.",
"localState" : {
"scope" : "LOCAL",
"totalEntryCount" : 7,
"state" : [ {
"key" : "1524017336077", //current batch timestamp
"value" : "1.0"
}, {
"key" : "1524017336079",
"value" : "1.0"
}, {
"key" : "1524017336082",
"value" : "1.0"
}, {
"key" : "1524017336083",
"value" : "1.0"
}, {
"key" : "1524017336085",
"value" : "1.0"
}, {
"key" : "1524017336086",
"value" : "1.0"
}, {
"key" : "count",
"value" : "7"
} ]
}
}
}
The first entry of the state array is your current batch timestamp.
If the answer addressed your question, click on the Accept button below to accept the answer; that would be a great help to community users looking for a quick solution to these kinds of issues.
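If the response is consumed by a script rather than read by eye, pulling out the timestamp could look like the following minimal Python sketch. It assumes an unsecured NiFi instance reachable as in the curl example, and it relies on the statement above that the first entry is the current batch timestamp; the function names `fetch_state` and `current_batch_timestamp` are hypothetical.

```python
import json
from urllib.request import urlopen


def fetch_state(base_url, processor_id):
    """GET {base_url}/processors/{id}/state and return the decoded JSON body."""
    with urlopen(f"{base_url}/processors/{processor_id}/state") as resp:
        return json.load(resp)


def current_batch_timestamp(state_response):
    """Return the first timestamp key from localState, skipping 'count'."""
    entries = state_response["componentState"]["localState"]["state"]
    for entry in entries:
        if entry["key"] != "count":
            return int(entry["key"])
    return None


# Shortened copy of the response shown above, used instead of a live call.
sample = {
    "componentState": {
        "componentId": "d372cfe5-0162-1000-398e-62f2d3652166",
        "localState": {
            "scope": "LOCAL",
            "totalEntryCount": 3,
            "state": [
                {"key": "1524017336077", "value": "1.0"},
                {"key": "1524017336079", "value": "1.0"},
                {"key": "count", "value": "7"},
            ],
        },
    }
}

timestamp = current_batch_timestamp(sample)
```

Against a running instance, `fetch_state("http://localhost:8080/nifi-api", "d372cfe5-0162-1000-398e-62f2d3652166")` would supply the dict that `sample` stands in for here.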
04-17-2018
11:49 AM
@Saikrishna Tarapareddy
We can use a series of ReplaceText processors to make it valid json. We also need to consider the size of the file you are getting: if it is big, you need to raise the Maximum Buffer Size property in the ReplaceText processor. This approach is not optimal, though, because each ReplaceText processor has to hold the file in its buffer and then apply all the replacements configured in it.
Instead, you can use the SplitText processor followed by the MergeContent processor with Defragment as the merge strategy (the Defragment algorithm combines fragments that are associated by attributes back into a single cohesive flowfile; the fragment id attribute is added by SplitText). With this strategy we end up with the same json messages in a valid json array, so that you can use all the record-based processors to parse the file. This way is more optimal.
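The split-then-merge idea can be illustrated with a short Python sketch. It rests on assumptions the thread does not confirm: that the input holds one json message per line, and that the merge adds `[` as a header, `,` as a demarcator and `]` as a footer. The function name `lines_to_json_array` is hypothetical.

```python
import json


def lines_to_json_array(text):
    """Split on newlines and re-join the fragments as one JSON array."""
    fragments = [line for line in text.splitlines() if line.strip()]
    merged = "[" + ",".join(fragments) + "]"  # header, demarcator, footer
    json.loads(merged)  # raises ValueError if the result is not valid JSON
    return merged


text = '{"id": 1}\n{"id": 2}\n{"id": 3}\n'
merged = lines_to_json_array(text)
```

Each fragment is handled on its own and only joined at the end, which is the reason the split/merge route avoids running regex replacements over the whole file in one buffer.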