Member since: 06-08-2017
Posts: 1049
Kudos Received: 518
Solutions: 312
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 11132 | 04-15-2020 05:01 PM |
| | 7028 | 10-15-2019 08:12 PM |
| | 3074 | 10-12-2019 08:29 PM |
| | 11272 | 09-21-2019 10:04 AM |
| | 4191 | 09-19-2019 07:11 AM |
11-07-2017
04:01 PM
1 Kudo
@Andy Gisbo, if you want to extract those values you can do it in two ways.
Method 1: don't change the flowfile content
After the ExtractText processor use an UpdateAttribute processor; in it we prepare the id and name attributes by adding new properties.
1. Using the replaceAll function:
id as ${last_line:replaceAll('.*"id":(.*)(?:,).*','$1')} //extract the value after "id": and replace the whole last_line attribute value with $1 (the first capture group)
name as ${last_line:replaceAll('.*"name":"(.*\d)(?:").*','$1')} //extract the value after "name": and replace the whole last_line attribute value with $1 (the first capture group)
2. Using the jsonPath function:
Since the last_line attribute holds a JSON message, you can use the jsonPath function of the NiFi expression language instead of replaceAll:
id as ${last_line:jsonPath('$.id')}
name as ${last_line:jsonPath('$.name')}
Both the replaceAll and jsonPath approaches give the same result; jsonPath is better and easier to use than replaceAll. After this processor the flowfile will have id and name attributes, so you can use them in your insert statement:
insert into tableA (id, name) values ('${id}', '${name}')
Flow: GetFile -> UpdateAttribute (adding avro schema name) -> ConvertRecord -> ExtractText -> UpdateAttribute -> SplitJson -> EvaluateJsonPath -> .... -> insert statement
Method 2: change the contents of the flowfile
Use a ReplaceText processor instead of the ExtractText processor //ReplaceText changes the flowfile content; here we replace the whole content of the flowfile (i.e. the whole array) with the last record.
Example:
[{"id":1,"name":"foo"},{"id":2,"name":"bar"},{"id":3,"name":"bar"}] //input flowfile content
{"id":3,"name":"bar"} //output flowfile content
Then use an EvaluateJsonPath processor to extract the contents of the flowfile. Since your goal is to prepare an insert statement, keep Destination as flowfile-attribute in this processor.
Flow: GetFile -> UpdateAttribute (adding avro schema name) -> ConvertRecord -> ReplaceText -> EvaluateJsonPath -> insert to Cassandra
Both methods add id and name attributes to the flowfile:
Method 1 does not change the contents of the flowfile.
Method 2 changes the contents of the flowfile //replacing the whole content with the last record
Decide which one fits your use case better.
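If you want to sanity-check both extraction approaches outside NiFi first, here is a minimal Python sketch (my own illustration, not part of the flow; the sample last_line value is the last record from the array above) that runs the same id regex and a plain JSON parse:

import json
import re

# sample value of the last_line attribute: the last record extracted from the JSON array
last_line = '{"id":3,"name":"bar"}'

# rough equivalent of ${last_line:replaceAll('.*"id":(.*)(?:,).*','$1')}
id_via_regex = re.sub(r'.*"id":(.*)(?:,).*', r'\1', last_line)
print(id_via_regex)  # prints: 3

# rough equivalent of ${last_line:jsonPath('$.id')} and ${last_line:jsonPath('$.name')}
record = json.loads(last_line)
print(record["id"], record["name"])  # prints: 3 bar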
11-07-2017
01:33 PM
2 Kudos
@Andy Gisbo Before the SplitJson processor use an ExtractText processor and add a new property to it:
last_line as ^.*\,(\{.*)]$
Example: since you are converting CSV to JSON and then using the SplitJson processor to split the JSON array into individual messages, let's say your JSON array is
[{"id":1,"name":"foo"},{"id":2,"name":"bar"},{"id":3,"name":"bar"}]
and you are interested only in {"id":3,"name":"bar"} //the last record in the array
Use the ExtractText processor before SplitJson and add the new last_line property as above.
In this processor the Max Capture Group Length property has a default value of 1024 (i.e. 1 KB) //it only captures 1 KB of the matched message. If your last record (or message) is larger than 1 KB, increase this value; in my configuration I set it to 4000 (i.e. almost 4 KB).
The Max Buffer Size default value is 1 MB //specifies the maximum amount of data to buffer (per file) in order to apply the regular expressions. Configure this property according to your flowfile size.
The output of this processor will add a last_line attribute to the flowfile, so you can reference it with ${last_line}.
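To double-check that the last_line regex grabs only the final record, here is a small Python sketch (my own test harness, outside NiFi) applying the same pattern to the sample array:

import re

# the flowfile content: the full JSON array produced by the CSV-to-JSON conversion
content = '[{"id":1,"name":"foo"},{"id":2,"name":"bar"},{"id":3,"name":"bar"}]'

# same pattern as the last_line property: ^.*\,(\{.*)]$
match = re.search(r'^.*\,(\{.*)]$', content)
if match:
    print(match.group(1))  # prints: {"id":3,"name":"bar"}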
11-06-2017
11:39 PM
@dhieru singh You can get the directory structure from the path attribute associated with the flowfile:
path //the fully qualified name of the directory on the SFTP server from which the file was pulled
The filename attribute gives you the actual name of the file that was pulled from that directory:
filename //the name of the file on the SFTP server
In the FetchSFTP processor use ${path}/${filename} to fetch the actual file; this expression gives both the directory and the file path.
If you give a top-level directory and set the Recurse Subdirectories property to true, the processor navigates through all the subdirectories under that top-level directory (provided NiFi has access to them) and lists all the files in those subdirectories. Each flowfile will have path and filename attributes: path tells you which directory the file was pulled from, and filename gives you the actual name of the file.
11-06-2017
01:35 PM
@Team Spark Your TEMP_TAB table has 3 columns but your insert query supplies 4 (* expands to the 3 TEMP_TAB columns and substr(mytime,0,10) adds one more). The query below will work for your case:
FROM TEMP_TAB INSERT OVERWRITE TABLE main_TAB
PARTITION (mytime)
SELECT id,age,substr(mytime,0,10) as mytime;
**Note that with the insert above you lose part of the mytime column value: because of the substring, the time portion is dropped when the data moves from TEMP_TAB to main_TAB. For example, TEMP_TAB has 2017-10-12 12:20:23 but main_TAB will only have 2017-10-12, so the 12:20:23 time is lost.
If you don't want to lose that data, create main_TAB with all 4 columns and dt as the partition column:
CREATE TABLE IF NOT EXISTS main_TAB(id int,mytime STRING,age int)
PARTITIONED BY (dt string)
STORED AS ORC
tblproperties ("orc.compress"="ZLIB");
then do the insert as below:
FROM TEMP_TAB INSERT OVERWRITE TABLE main_TAB PARTITION (dt) SELECT *,substr(mytime,0,10) as dt;
In this case the partition column is dt and you are not losing any TEMP_TAB data.
11-04-2017
01:16 PM
1 Kudo
@Andy Gisbo
1. If your column.2 attribute has the value "20170115" (surrounded by double quotes), you need to remove the double quotes before the toDate function. Try the expression below in your UpdateAttribute processor:
${column.2:replaceAll('"',''):toDate("yyyyMMdd","GMT")} //first replace the double quotes with an empty value, then parse the column.2 value as a date and convert it to GMT
In addition, if you want to format "20170115" as "2017-01-15":
${column.2:replaceAll('"',''):toDate("yyyyMMdd","GMT"):format("yyyy-MM-dd"):append('"'):prepend('"')} //convert column.2 to GMT, format it, then append and prepend a " (double quote)
If you want to format "20170115" as 2017-01-15 (without quotes):
${column.2:replaceAll('"',''):toDate("yyyyMMdd"):format("yyyy-MM-dd")}
2. If your column.2 attribute has the value 20170115 (without double quotes), change the expression in the UpdateAttribute processor to:
${column.2:toDate("yyyyMMdd", "GMT")} //returns the GMT value
In addition, if you want to format 20170115 as 2017-01-15:
${column.2:toDate("yyyyMMdd","GMT"):format("yyyy-MM-dd")}
What if it still doesn't work? Go to the top-right corner of your NiFi UI and click the Global Menu (the three horizontal lines), click Help, then Expression Language Guide, and check whether the toDate function supports converting to GMT, because conversion to GMT is not supported in NiFi 1.1.0.
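If it helps, here is a small Python sketch (just my illustration of what the expression does, ignoring the GMT time-zone part since the value is date-only) mirroring the quoted-value case:

from datetime import datetime

# sample value of the column.2 attribute, still surrounded by double quotes
raw = '"20170115"'

# mirrors ${column.2:replaceAll('"',''):toDate("yyyyMMdd"):format("yyyy-MM-dd")}
parsed = datetime.strptime(raw.replace('"', ''), '%Y%m%d')
print(parsed.strftime('%Y-%m-%d'))  # prints: 2017-01-15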
11-03-2017
11:18 PM
1 Kudo
@manisha jain Yes, you can manipulate the results coming out of your QueryDatabase processor itself. If you want to add static fields to each Avro record, then in your QueryDatabase processor change the Columns to Return property to
*,'in' type //return all columns from the table and add a type column with the value in for each row
(or) Another way of doing this, once you get the results from the QueryDatabase processor, is:
1. Use a ConvertAvroToJSON processor //convert the Avro data into JSON.
2. Use a ReplaceText processor //search for the literal value } and set the Replacement Value to your desired value in JSON format, e.g. ,"type":"in"}
In this processor we have a JSON array of records; } marks the end of each record, so replacing it with ,"type":"in"} adds the field to every record without changing the JSON structure (see the small sketch after this list).
3. Use a ConvertJSONToAvro processor //convert the JSON back into Avro; the data now has the new field we added to each JSON message.
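Here is a small Python sketch of step 2 only (my own illustration; the sample records are made up, and this simple literal replacement assumes flat records with no nested } inside the values):

# JSON array as produced by ConvertAvroToJSON (hypothetical sample records)
records = '[{"id":1,"name":"foo"},{"id":2,"name":"bar"}]'

# mirrors the ReplaceText step: search for the literal } and replace it with ,"type":"in"}
with_type = records.replace('}', ',"type":"in"}')
print(with_type)
# prints: [{"id":1,"name":"foo","type":"in"},{"id":2,"name":"bar","type":"in"}]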
11-03-2017
04:34 AM
@Viswa Converting a regular unix timestamp field to something human-readable without the T in it is a lot simpler; you can use the conversion below for that:
pyspark
>>> hiveContext.sql("select from_unixtime(cast(1509672916 as bigint),'yyyy-MM-dd HH:mm:ss.SSS')").show(truncate=False)
+-----------------------+
|_c0 |
+-----------------------+
|2017-11-02 21:35:16.000|
+-----------------------+
For your column:
pyspark
>>>hiveContext.sql("select from_unixtime(cast(<unix-timestamp-column-name> as bigint),'yyyy-MM-dd HH:mm:ss.SSS')") But you are expecting format as yyyy-MM-ddThh:mm:ss For this case you need to use concat date and time with T letter pyspark
>>>hiveContext.sql("""select concat(concat(substr(cast(from_unixtime(cast(1509672916 as bigint),'yyyy-MM-dd HH:mm:ss.SS') as string),1,10),'T'),substr(cast(from_unixtime(cast(1509672916 as bigint),'yyyy-MM-dd HH:mm:ss.SS') as string),12))""").show(truncate=False)
+-----------------------+
|_c0 |
+-----------------------+
|2017-11-02T21:35:16.00|
+-----------------------+
Your query:- pyspark
>>>hiveContext.sql("""select concat(concat(substr(cast(from_unixtime(cast(<unix-timestamp-column-name> as bigint),'yyyy-MM-dd HH:mm:ss.SS') as string),1,10),'T'),
substr(cast(from_unixtime(cast(<unix-timestamp-column-name> as bigint),'yyyy-MM-dd HH:mm:ss.SS') as string),12))""").show(truncate=False) //replace <unix-timestamp-column-name> with your column name
In case you want to test in hive, use the below query:
hive# select concat(concat(substr(cast(from_unixtime(cast(1509672916 as bigint),'yyyy-MM-dd HH:mm:ss.SSS') as string),1,10),'T'),
substr(cast(from_unixtime(cast(1509672916 as bigint),'yyyy-MM-dd HH:mm:ss.SSS') as string),12));
+--------------------------+--+
| _c0 |
+--------------------------+--+
| 2017-11-02T21:35:16.00 |
+--------------------------+--+
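A shorter alternative (my suggestion, not something from this thread): since from_unixtime takes a SimpleDateFormat pattern, you can usually put the literal T straight into the format string with single quotes, assuming your Hive version accepts quoted literals there:

pyspark
>>> hiveContext.sql("""select from_unixtime(cast(1509672916 as bigint), "yyyy-MM-dd'T'HH:mm:ss.SS")""").show(truncate=False)

In the same session time zone this should print 2017-11-02T21:35:16.00 without any concat/substr juggling.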
Hope this will help to resolve your issue...!!!
11-02-2017
12:23 PM
1 Kudo
@Gayathri Devi As per the logs you have shared, partitions were created for:
starttime=2017-08-01 18:40:03.0
starttime=2017-08-02 18:40:03.0
starttime=2017-08-03 18:40:03.0
starttime=2017-08-04 18:40:03.0
starttime=2017-08-05 18:40:03.0
starttime=2017-08-06 18:40:03.0
starttime=2017-08-09 18:40:03.0
To view them, run
hive# desc formatted <database-name>.<table-name>
to get the location where the table is stored, then check that HDFS location with
bash# hdfs dfs -ls -R <table-location>
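In the desc formatted output, look for the Location field; that HDFS path is what you pass to hdfs dfs -ls -R, and each partition should appear under it as its own subdirectory named after the partition column and value (e.g. starttime=...).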
11-02-2017
02:41 AM
3 Kudos
@Gayathri Devi First you need to create a Hive non-partitioned table on the raw data, then create a partitioned table in Hive and insert from the non-partitioned table into the partitioned table. For testing I tried the example below.
Right now my normal Hive table (i.e. not a partitioned table) has this list of records.
Normal table (without partition column):
hive# create table text_table(id int, dt string,name string) stored as textfile location '/user/yashu/text_table';
hive# select * from text_table;
+----------------+----------------------+------------------+--+
| text_table.id | text_table.dt | text_table.name |
+----------------+----------------------+------------------+--+
| 1 | 2017-10-31 10:12:09 | foo |
| 1 | 2017-10-31 12:12:09 | bar |
| 1 | 2017-10-30 12:12:09 | foobar |
| 1 | 2017-10-30 10:12:09 | barbar |
+----------------+----------------------+------------------+--+
Then I want a daily partitioned table, so I need to create a new table with dt as the partition column.
Partition table:
There are 2 kinds of partitions in hive:
1.Static partitions //you add the partition statically and load data into it; takes less time than dynamic partitions because Hive doesn't need to look at the data while creating the partitions.
2.Dynamic partitions //partitions are created dynamically based on the column value; takes more time than static partitions if the data is huge because Hive needs to look at the data while creating the partitions.
hive# create table partition_table(
id int,
name string)
partitioned by (dt string);
1.Dynamic partition:
Once you have created the partitioned table, insert into it with a select from the non-partitioned table:
hive# insert into partition_table partition(dt) select id,name, substring(dt,0,10) from text_table; //we need daily partitions, so I take the substring from 0-10 (i.e. 2017-10-31) and that value creates the date partitions
INFO : Time taken to load dynamic partitions: 0.066 seconds
INFO : Loading partition {dt=2017-10-30} //creating 2017-10-30 partition
INFO : Loading partition {dt=2017-10-31} //creating 2017-10-31 partition
INFO : Time taken for adding to write entity : 0
INFO : Partition default.partition_table{dt=2017-10-30} stats: [numFiles=1, numRows=2, totalSize=18, rawDataSize=16]
INFO : Partition default.partition_table{dt=2017-10-31} stats: [numFiles=1, numRows=2, totalSize=12, rawDataSize=10]
No rows affected (10.055 seconds)
The statement above uses dynamic partitions, i.e. we create the partitions based on our data. If you want to view the partitions, run
hive# show partitions partition_table; //shows all partitions that have been created in the table
+----------------+--+
| partition |
+----------------+--+
| dt=2017-10-30 |
| dt=2017-10-31 |
+----------------+--+
2 rows selected (0.064 seconds)
Drop partitions:
hive# alter table partition_table drop partition(dt>'0') purge; //drops all the partitions; (or) you can drop a specific partition by specifying dt='2017-10-30' (that drops only the 2017-10-30 partition)
INFO : Dropped the partition dt=2017-10-30
INFO : Dropped the partition dt=2017-10-31
No rows affected (0.132 seconds)
To view all partition directory information:
hadoop fs -ls -R /apps/hive/warehouse/partition_table/
drwxrwxrwx - hdfs 0 2017-11-01 21:45 /apps/hive/warehouse/partition_table/dt=2017-10-30 //partition directory
-rwxrwxrwx 3 hdfs 18 2017-11-01 21:45 /apps/hive/warehouse/partition_table/dt=2017-10-30/000000_0 //file in the partition
drwxrwxrwx - hdfs 0 2017-11-01 21:45 /apps/hive/warehouse/partition_table/dt=2017-10-31
-rwxrwxrwx 3 hdfs 12 2017-11-01 21:45 /apps/hive/warehouse/partition_table/dt=2017-10-31/000000_0
To view data from one partition:
select * from partition_table where dt='2017-10-30';
+---------------------+-----------------------+---------------------+--+
| partition_table.id | partition_table.name | partition_table.dt |
+---------------------+-----------------------+---------------------+--+
| 1 | foobar | 2017-10-30 |
| 1 | barbar | 2017-10-30 |
+---------------------+-----------------------+---------------------+--+
As you can see, the dt column in the non-partitioned table has 2017-10-30 12:12:09 but the partitioned table has 2017-10-30, because we took a substring of the dt column while loading the data into the partitioned table.
--> If you don't want to change the source data, i.e. keep the dt column as-is from the non-partitioned table to the partitioned table, then create the partitioned table with:
hive# create table partition_table(
id int,
name string,
dt string)
partitioned by (daily string); //new partition column
hive# insert into partition_table partition(daily) select id,name,dt, substring(dt,0,10) from text_table; //daily is the partition column; in the select statement we use the dt column twice, once to load the actual dt column data and once to derive the partition column value
show partitions partition_table;
+-------------------+--+
| partition |
+-------------------+--+
| daily=2017-10-30 |
| daily=2017-10-31 |
+-------------------+--+
2 rows selected (0.066 seconds)
0: jdbc:hive2://usor7dhc01w01.use.ucdp.net:21> select * from partition_table; //as you can see we haven't changed dt column data as we have new daily column as partition column
+---------------------+-----------------------+----------------------+------------------------+--+
| partition_table.id | partition_table.name | partition_table.dt | partition_table.daily |
+---------------------+-----------------------+----------------------+------------------------+--+
| 1 | foobar | 2017-10-30 12:12:09 | 2017-10-30 |
| 1 | barbar | 2017-10-30 10:12:09 | 2017-10-30 |
| 1 | foo | 2017-10-31 10:12:09 | 2017-10-31 |
| 1 | bar | 2017-10-31 12:12:09 | 2017-10-31 |
+---------------------+-----------------------+----------------------+------------------------+--+
**Keep in mind the partition column needs to be the last column in your select statement; if it isn't, hive creates the partitions based on whatever the last column in your select statement is.
2.Static partition:
We statically create the partition and load all the data into it:
hive# insert into partition_table partition(dt='2017-10-30') select id,name from text_table; //we name the partition here as dt='2017-10-30', so all data is loaded into the 2017-10-30 partition; with a static partition all the dt data should be 2017-10-30, and note that we haven't mentioned the dt column in the select statement
hive# show partitions partition_table;
+----------------+--+
| partition |
+----------------+--+
| dt=2017-10-30 |
+----------------+--+
hive# select * from partition_table; //all dt values will be 2017-10-30 because we loaded into a static partition
+---------------------+-----------------------+---------------------+--+
| partition_table.id | partition_table.name | partition_table.dt |
+---------------------+-----------------------+---------------------+--+
| 1 | foo | 2017-10-30 |
| 1 | bar | 2017-10-30 |
| 1 | foobar | 2017-10-30 |
| 1 | barbar | 2017-10-30 |
+---------------------+-----------------------+---------------------+--+
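If you want a quick sanity check after loading, a simple aggregate over the partition column (my example against the partition_table above) shows how many rows landed in each partition:

hive# select dt, count(*) from partition_table group by dt;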
You need to decide which kind of partitioning best fits your case. Hope this will help you to understand partitions..!!
11-02-2017
01:13 AM
@sri chaturvedi If your d_last_updt attribute has a value like 2017-03-12 06:07:12:01, then in the RouteOnAttribute processor add a new property:
${d_last_updt:toDate('yyyy-MM-dd'):format('yyyyMMdd'):gt('20170731')} //toDate extracts the year, month and day from your d_last_updt attribute, format renders it as yyyyMMdd, and then your threshold value goes in the gt function
This property checks every d_last_updt attribute value: if the value is greater than 20170731 the flowfile routes to the relationship named after the property, otherwise it routes to the unmatched relation.
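To see what that expression evaluates to, here is a small Python sketch (my own illustration, outside NiFi) of the same comparison:

from datetime import datetime

# sample value of the d_last_updt attribute
d_last_updt = '2017-03-12 06:07:12:01'

# mirrors ${d_last_updt:toDate('yyyy-MM-dd'):format('yyyyMMdd'):gt('20170731')}
# only the leading yyyy-MM-dd part of the attribute matters for the comparison
as_number = int(datetime.strptime(d_last_updt[:10], '%Y-%m-%d').strftime('%Y%m%d'))
print(as_number > 20170731)  # prints: False, so this flowfile would route to unmatched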