Member since
06-08-2017
1049
Posts
518
Kudos Received
312
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 11278 | 04-15-2020 05:01 PM |
| | 7167 | 10-15-2019 08:12 PM |
| | 3155 | 10-12-2019 08:29 PM |
| | 11614 | 09-21-2019 10:04 AM |
| | 4393 | 09-19-2019 07:11 AM |
05-07-2018
01:32 PM
@srini Please use the below DDL:

CREATE EXTERNAL TABLE SRGMSBI1417.json14_07_05_01(
 purchaseid struct<
  ticid:string, ticlocation:string, custnum:string,
  Travleinfo:struct<
   Trav:struct<
    fname:string, mname:string,
    freq:struct<fre:struct<frequencynumber:string, frequnecystatus:string>>,
    food:struct<foodpref:array<struct<foodcode:string, foodcodeSegment:string>>>,
    Seats:struct<Seat:array<struct<seatberth:string, loc:string>>>,
    stations:struct<station:array<struct<sationname:string, trainnum:string>>>>>,
  Comments:struct<Comment:array<struct<commentno:string, desc:string, passengerseat:struct<intele:string>, passengerloc:struct<intele:string>>>>>)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION '/user/srgmsbi1417/json14_07_05_01';

I have tested some of the columns:

hive> select purchaseid.ticid from default.json limit 10;
+--------+--+
| ticid |
+--------+--+
| 1496 |
+--------+--+

hive> select purchaseid.Comments.comment.commentno from default.json limit 10;
+-----------------+--+
| commentno |
+-----------------+--+
| ["1","5","12"] |
+-----------------+--+

hive> select purchaseid.Comments.comment[0].commentno from default.json limit 10;
+------------+--+
| commentno |
+------------+--+
| 1 |
+------------+--+

Tip: use a JSON formatter while creating the DDL so that it is easy to see where each struct or array ends. - If the answer helped to resolve your issue, click on the Accept button below to accept the answer; that would be a great help to community users looking for a quick solution to these kinds of issues.
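The same dotted-path and array-index syntax reaches the deeper nested fields as well; for example, a sketch against the table above (field names taken from the DDL, not separately tested):

hive> select purchaseid.Travleinfo.Trav.food.foodpref[0].foodcode from default.json limit 10;
hive> select purchaseid.Travleinfo.Trav.Seats.Seat[0].seatberth from default.json limit 10;

The first query returns the first food preference code, the second the first seat/berth value for each purchase.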
... View more
05-07-2018
01:19 PM
@Liana Napalkova Use write.mode to specify overwrite or append so that Spark will write the files to the test directory:

df.coalesce(1).write.mode("overwrite").format("parquet").save("/user/hdfs/test")

If we don't mention any mode, Spark fails with a "directory already exists" error because you have already created the test directory.
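For reference, a minimal sketch of the other standard save modes, assuming the same DataFrame and path as above:

df.coalesce(1).write.mode("append").format("parquet").save("/user/hdfs/test") // add new files next to the existing ones
df.coalesce(1).write.mode("ignore").format("parquet").save("/user/hdfs/test") // silently skip the write if the directory already exists
df.coalesce(1).write.mode("error").format("parquet").save("/user/hdfs/test")  // default behaviour, fails with the "already exists" error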
... View more
05-07-2018
12:51 PM
1 Kudo
@Liana Napalkova The .save action in Spark writes the data to HDFS, but the permissions were changed on the local file system. Please change the permissions of the /home/centos directory in HDFS instead: log in as the HDFS user and run

hdfs dfs -chown -R centos /home/centos/*
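If you prefer to grant write access rather than change the owner, a chmod on the same HDFS path works too (a sketch; adjust the path and permission bits to your setup):

hdfs dfs -chmod -R 775 /home/centos
hdfs dfs -ls /home/centos   # verify the owner and permissions afterwards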
... View more
05-07-2018
11:09 AM
@B X
For reducing the number of fields and renaming fields we don't need a ConvertRecord processor; we can achieve both with a single UpdateRecord processor. The UpdateRecord processor expects at least one user-defined property to be added (for example, swapping a field name); once we add that one property we can reduce or rename the fields. Please see this article, where I reduce and rename fields in the first UpdateRecord processor. If you just want to reduce the number of fields without changing any contents, then use a ConvertRecord processor instead; a minimal sketch of the UpdateRecord approach follows below.
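A minimal sketch of such a user-defined property in UpdateRecord, with Replacement Value Strategy set to Record Path Value (the field names here are only examples, not from your flow):

/customer_id
/custId

i.e. the property name is the record path of the target field and the value is the record path of the source field, so customer_id is written with the incoming custId value; any fields you leave out of the Record Writer schema are dropped from the output.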
... View more
05-07-2018
10:57 AM
1 Kudo
@vinayak krishnan
Yes, it's possible, but we need to prepare unique values for the Cache Entry Identifier (for example, the UUID, which is unique for each flowfile). The Cache update strategy property determines how the cache is updated when an already-cached entry identifier appears again: 'Replace if present' replaces the value already stored for that cache entry identifier with the new value, while 'Keep original' adds the specified entry to the cache only if the key doesn't exist. For reference, see this link on how to set up unique values for Cache Entry Identifiers; in that link I used an UpdateAttribute processor to change the filename to a UUID and then used ${filename} as the Cache Entry Identifier. You can prepare your attribute like that, or use ${UUID()} as the identifier. Then in FetchDistributedMapCache we need to use the same key to fetch the cached value for that identifier.
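A minimal sketch of the properties involved, assuming the standard NiFi distributed map cache processors described above:

UpdateAttribute:            filename = ${UUID()}
PutDistributedMapCache:     Cache Entry Identifier = ${filename}
FetchDistributedMapCache:   Cache Entry Identifier = ${filename}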
... View more
05-07-2018
03:08 AM
12 Kudos
Short Description: This tutorial describes how to add fields, remove fields that are not required, and change field values in a flowfile. Introduction: Using the UpdateRecord processor we can update the contents of a flowfile. Overview of the article: the sections below describe the changes between the input flowfile content and the output flowfile content. Input: the input JSON record is as follows and has 6 fields/elements in it. [ {
"id" : 1,
"name" : "foo",
"age" : 20,
"state" : "FLORIDA",
"ts_milli" : 1525526792098,
"ts" : "2018-05-03 10:10:10.123"
}]
Expected Output: I'm writing the output in Avro format, but for viewing purposes I have converted the output flowfile to JSON. [ {
"id" : "1",
"rename_id" : "1", //newly added field based on id
"state" : "florida", //changed the value of field
"rename_state" : "FLORIDA", //newly added field based on state
"unique_id" : "1-416425265990923-FLORIDA-aab46988-6a27-4008-9a4d-65655abe9c3c",//id-filename-state-UUID
"ts_milli" : 1525526792098,
"date" : "2018-05-05", //newly added field formatted as date based on ts_milli
"ts" : "2018-05-03 10:10:10.123",
"ts_tz" : "2018-05-03T10:10:10Z",//newly added field formatted as date based on ts
"current_ts" : "2018-05-05 14:23:15", //current timestamp value
"updated_by" : "NiFi", //newly added field based on user attribute value
"gmt_time" : "2018-05-05 18:23:18.611" //newly added field gmt time.
}]
We are missing the name and age fields because those fields are not required in the output contents.

UpdateRecord Processor Params:
Record Reader: Specifies the Controller Service to use for reading incoming data.
Record Writer: Specifies the Controller Service to use for writing out the records.
Replacement Value Strategy: Specifies how to interpret the configured replacement values.
1. Literal Value: The value entered for a property (after Expression Language has been evaluated) is the desired value to update the record fields with. Expression Language may reference the variables 'field.name', 'field.type', and 'field.value' to access information about the field and the value of the field being evaluated.
2. Record Path Value: The value entered for a property (after Expression Language has been evaluated) is not the literal value to use but rather a RecordPath that is evaluated against the record, and the result of the RecordPath is used to update the record. If this option is selected and the RecordPath results in multiple values for a given record, the input FlowFile is routed to the 'failure' relationship.
**Note** This processor requires that at least one user-defined property be added. Please refer to this link for more details regarding the UpdateRecord processor.

Flow:

GenerateFlowFile Configs: Configure the processor as shown below and add new properties:
schema.name = sch
user = NiFi

UpdateRecord (Replacement Value Strategy as Record Path Value) Configs:
Record Reader: JsonTreeReader. Configure the controller service as shown below.

AvroSchemaRegistry Configs: Add a new property sch:
{
"namespace": "nifi",
"name": "person",
"type": "record",
"fields": [
{ "name": "id", "type": ["null","int"] },
{ "name": "name", "type": ["null","string"] },
{ "name": "age", "type": ["null","int"] },
{ "name": "state", "type": ["null","string"] },
{ "name": "ts_milli", "type" : ["null","long"] },
{ "name": "ts", "type": ["null","string"] }
]
}
Using the above schema we read the incoming JSON message; you can also specify only the required fields in this schema, in which case the processor reads only those fields. All names are case sensitive. Record Writer: I'm using AvroRecordSetWriter, i.e. we read the input in JSON format and write it out in Avro format. Configure the controller service as shown below. Schema Text: {
"namespace": "nifi",
"name": "person",
"type": "record",
"fields": [
{ "name": "id", "type": ["null","string"] },
{ "name": "rename_id", "type": ["null","string"] },
{ "name": "state", "type": ["null","string"] },
{ "name": "rename_state", "type": ["null","string"] },
{ "name": "unique_id", "type": ["null","string"] },
{ "name": "ts_milli", "type" : {
"type" : "long",
"logicalType" : "timestamp-millis"
}
},
{ "name": "date", "type": ["null","string"] },
{ "name": "ts", "type": ["null","string"] },
{ "name": "ts_tz", "type": ["null","string"] },
{ "name": "current_ts", "type": ["null","string"] },
{ "name": "gmt_time", "type": ["null","string"] },
{ "name": "updated_by", "type": ["null","string"] }
]
}
As you can see, a bunch of new fields have been added in the above Avro schema. Now we are going to add user-defined properties in the UpdateRecord processor:
1. /current_ts
replaceRegex( /id, '(^.*$)', '${now():format("yyyy-MM-dd HH:mm:ss")}')
To get the current_ts field value we use the replaceRegex function of the RecordPath domain-specific language, replacing the id value with the current timestamp in the specified format.
2. /date
format(/ts_milli,"yyyy-MM-dd")
Based on the ts_milli record path value, we change the format to get only the date from the epoch time in milliseconds.
3. /rename_id
/id
Assigning the id value to the new rename_id field; both fields will have the same id value.
4. /rename_state
/state
Assigning the state value to the new rename_state field; both fields will have the same state value because our output Avro schema includes both fields.
5. /ts_tz
format(toDate( /ts, "yyyy-MM-dd HH:mm:ss.SSS"),"yyyy-MM-dd'T'HH:mm:ss'Z'")
Changing the format of the ts field value, adding T and Z, and assigning the new value to the ts_tz field.
6. /unique_id
concat(/id,'-','${filename}','-',/state,'-','${UUID()}')
In this field value we concatenate the id, filename, state, and UUID() values with '-' to make the value unique, and assign it to the unique_id field.
Our output flowfile will be in Avro format; to view the flowfile content we use a ConvertAvroToJSON processor after each processor. Output: [ {
"id" : "1",
"rename_id" : "1",
"state" : "FLORIDA",
"rename_state" : "FLORIDA",
"unique_id" : "1-423955681497409-FLORIDA-3d71d47d-da7a-44bb-b506-9f7a027933e2",
"ts_milli" : 1525526792098,
"date" : "2018-05-05",
"ts" : "2018-05-03 10:10:10.123",
"ts_tz" : "2018-05-03T10:10:10Z",
"current_ts" : "2018-05-05 16:28:39",
"gmt_time" : null, //this field is going to populated in next UpdateRecord processor
"updated_by" : null //this field is going to populated in next UpdateRecord processor
}] UpdateRecord (Replacement Value Strategy as Literal Value) Configs: The Record Reader is an AvroReader; since we are feeding an Avro file with an embedded schema, we keep the Schema Access Strategy as Embedded Avro Schema. AvroRecordSetWriter Configs: Schema Text: {
"namespace": "nifi",
"name": "person",
"type": "record",
"fields": [
{ "name": "id", "type": ["null","string"] },
{ "name": "rename_id", "type": ["null","string"] },
{ "name": "state", "type": ["null","string"] },
{ "name": "rename_state", "type": ["null","string"] },
{ "name": "unique_id", "type": ["null","string"] },
{ "name": "ts_milli", "type" : {
"type" : "long",
"logicalType" : "timestamp-millis"
}
},
{ "name": "date", "type": ["null","string"] },
{ "name": "ts", "type": ["null","string"] },
{ "name": "ts_tz", "type": ["null","string"] },
{ "name": "current_ts", "type": ["null","string"] },
{ "name": "gmt_time", "type": ["null","string"] },
{ "name": "updated_by", "type": ["null","string"] }
]
}
Add new user-defined properties as follows:
1. /gmt_time
${now():format("yyyy-MM-dd HH:mm:ss.SSS","GMT")}
We are adding the GMT time by using the NiFi Expression Language.
2. /state
${field.value:isEmpty():not():ifElse('${field.value:toLower()}','${literal("empty")}')}
In this value we check the state field value: if it's not empty we change the value to lower case; if the value is null, an empty string, or blank, we keep the literal value "empty".
3. /updated_by
${user}
We added the user attribute in the GenerateFlowFile processor, and now we assign that attribute value to the updated_by field. Final Output: [ {
"id" : "1",
"rename_id" : "1",
"state" : "florida",
"rename_state" : "FLORIDA",
"unique_id" : "1-425389131155321-FLORIDA-c81faa7a-4f2e-414e-bff5-d8722485921c",
"ts_milli" : 1525526792098,
"date" : "2018-05-05",
"ts" : "2018-05-03 10:10:10.123",
"ts_tz" : "2018-05-03T10:10:10Z",
"current_ts" : "2018-05-05 16:52:32",
"gmt_time" : "2018-05-05 20:52:36.088",
"updated_by" : "NiFi"
}] I have used Avro as the output format, but we can use the same Avro schema for other formats (JSON, CSV, ...). Now that we have prepared each record by adding new fields and removing unnecessary fields, use the success relationship for further processing. Please refer to this link for more details regarding the RecordPath domain-specific language, and this link for the NiFi Expression Language. Reference flow.xml: update-contents-using-updaterecord.xml
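If you want to spot-check the Avro output outside NiFi, Avro's command-line tools can dump it as JSON (a sketch; the jar name/version and the file name depend on your environment):

java -jar avro-tools-1.8.2.jar tojson output-flowfile.avro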
... View more
05-06-2018
05:54 PM
1 Kudo
@adam chui The documentation of the SelectHiveQL processor states that if it is triggered by an incoming FlowFile, the attributes of that FlowFile will be available when evaluating the select query, and that the FlowFile attribute 'selecthiveql.row.count' indicates how many rows were selected. You can file a JIRA addressing the issue you found. As a workaround, you can use the PutDistributedMapCache processor to keep all your attributes in the cache server and fetch them back from the cache server using the FetchDistributedMapCache processor.

Sample flow example: I'm using a GenerateFlowFile processor and adding 3 attributes to the flowfile:
attr1 = 56
attr2 = 67
attr3 = 89

ReplaceText processor:
Search Value
(?s)(^.*$)
Replacement Value
${allAttributes("attr1","attr2","attr3"):join("|")} //using allAttributes and joining all the attributes with the "|" pipe delimiter (the output flowfile content will be 56|67|89)
Maximum Buffer Size
1 MB //increase the size if the content is more than 1 MB
Replacement Strategy
Always Replace
Evaluation Mode
Entire text
Use this link to evaluate multiple attributes.

UpdateAttribute: We use this processor to change the filename of the flowfile to a UUID because we cannot use the flowfile UUID as the cache identifier: the output of the SelectHiveQL processor keeps the same filename but gets a different UUID (i.e. until the SelectHiveQL processor the flowfile has one UUID, after the SelectHiveQL processor it has a different one). Add a new property:
filename = ${UUID()}

PutDistributedMapCache processor: Configure DistributedMapCacheServer and DistributedMapCacheClientService and enable them (you may need to change the maximum cache entries as per your needs; if a persistence directory is not mentioned, all the entries are stored in memory).
Cache Entry Identifier
${filename}
Now we have changed the flowfile content and cached that content under the filename.

SelectHiveQL processor: Feed the success relation to the SelectHiveQL processor; once the processor outputs the flowfile with the query result as content, feed the success relationship to FetchDistributedMapCache.

FetchDistributedMapCache Configs:
Cache Entry Identifier
${filename:substringBefore('.')} //because based on the output flowfile format we are going to have .avro/.csv extensions
Distributed Cache Service
DistributedMapCacheClientService
Put Cache Value In Attribute
cache_value //the cached content will be put into this attribute instead of into the flowfile content
Max Length To Put In Attribute
256 //increase the value if the cached content is longer than this

Output: the flowfile will have an attribute called cache_value, and you can rebuild all your attributes by using the getDelimitedField function:
${cache_value:getDelimitedField(1, '|')} //will give the attr1 value
Even without rebuilding all the attributes, you can use the above Expression Language to pull the required attribute value directly and use it in your flow. I have attached my sample flow.xml below; save and upload the xml to your instance and change it as per your needs. selecthiveql-attributes-188338.xml - If the answer helped to resolve your issue, click on the Accept button below to accept the answer; that would be a great help to community users looking for a quick solution to these kinds of issues.
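For example, to rebuild all three attributes after FetchDistributedMapCache you can use one UpdateAttribute processor with properties like these (a sketch, attribute names as in the example above):

attr1 = ${cache_value:getDelimitedField(1, '|')}
attr2 = ${cache_value:getDelimitedField(2, '|')}
attr3 = ${cache_value:getDelimitedField(3, '|')}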
... View more
05-06-2018
03:23 PM
@Mahmoud Shash I think the processor is stuck running one thread, as you can see at the top right corner. Can you make sure you are able to access and get a response from the URL mentioned in the InvokeHTTP processor? As the InvokeHTTP processor is running on the Primary Node, also make sure the queued flowfiles are on that same Primary Node. If the flowfiles are on another node (not the primary node), then stop the InvokeHTTP processor and change the scheduling to All Nodes; if the connection/request timeout values are set to high numbers, please decrease them to a few seconds, then start the processor again. You can run the InvokeHTTP processor on all nodes (because you are triggering InvokeHTTP based on the GetFacebook pages process group). I don't see any logs regarding the InvokeHTTP processor (there are some 420 status codes for the GetTwitter[id=11b02d87-0163-1000-0000-00007f5a45a6] processor).
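To verify connectivity from the NiFi node itself, a quick manual check against the endpoint can help (a sketch; replace the placeholder with the URL configured in InvokeHTTP):

curl -v --max-time 10 "https://example.com/your/endpoint"

If this also hangs, the problem is on the network/endpoint side rather than in the processor configuration.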
... View more
05-06-2018
02:18 PM
@Mahmoud Shash
Please add more details like your flow screenshot, processor scheduling information, JVM memory, and logs if you found any issues, so that we can better understand the problem.
... View more
05-04-2018
12:21 PM
5 Kudos
Short Description: This tutorial describes how to use the DeleteHBaseRow processor to delete one or more row keys in an HBase table. Thanks a lot @Ed Berezitsky for helping out to write this article.

Introduction: NiFi 1.6 introduced the DeleteHBaseRow processor; based on the row key values provided to it, the processor deletes those row key(s) in the HBase table. The processor performs a deleteall operation on each row key that we intend to delete from the HBase table. It works with row key values presented as FlowFile content (or) FlowFile attributes. We can set the Row ID Location property in one of the ways below; by default this property is configured to take the row keys from the FlowFile content.
1. FlowFile content - Get the row key(s) from the flowfile content.
2. FlowFile attributes - Get the row key from an Expression Language statement.
2.1. Having one row key value associated with the flowfile.
2.2. Having more than one row key value associated with the flowfile.

DeleteHBaseRow Processor Params:
Client Controller: Configure and enable the HBase_1_1_2_ClientService controller service.
Table Name: HBase table name, supports Expression Language.
Row Identifier: Row key to be deleted when Row ID Location is set to 'FlowFile attributes'. The value will be ignored if the location is set to 'FlowFile content'. Note, it doesn't support a list of values.
Row ID Location: Source of the Row ID(s) to be deleted, either content or attributes.
Flow File Fetch Count: Number of flowfiles to be consumed from the incoming connection(s) and combined in a single run.
Batch Size: Max number of deletes to send per batch. The actual batch size won't exceed the number of row keys in the content of each flowfile.
Delete Row Key Separator: Specifies the delimiter; supports regex and Expression Language.
Character Set: Character set used to encode the row key for HBase.

1. Delete HBase row(s) based on FlowFile content: The Delete Row Key Separator specifies the delimiter for a list of row keys. It could be any value, including the newline character. Example:

Flow: Explanation: Generate the row key(s) to delete using a GenerateFlowFile processor: I have used the GenerateFlowFile processor with custom text containing all the row keys comma separated: 1,2,3,4,5,6. Then we need to feed the flowfile to the DeleteHBaseRow processor.

DeleteHBaseRow processor: As we have a comma-separated list of row key values as the flowfile content, configure and enable the HBase client service and configure the DeleteHBaseRow processor as follows. Once the deletion is done the processor routes the flowfile(s) to the success relation and adds two new attributes to each flowfile (these write attributes are added only when the Row ID Location is set to flowfile-content).
Write Attributes:
rowkey.start - The first rowkey in the flowfile. Only written when using the flowfile's content for the row IDs.
rowkey.end - The last rowkey in the flowfile. Only written when using the flowfile's content for the row IDs.
The rowkey.start and rowkey.end attributes are added to the flowfile with the first and last values of the flowfile content, i.e. our flowfile contents are 1,2,3,4,5,6, so the rowkey.start value is 1 and the rowkey.end value is 6.
**Note** If we try to delete a row key that doesn't exist in the HBase table, this processor won't throw any error message, i.e. if we specify the value 99 in our flowfile contents while 99 is not a row key in the table, the processor still doesn't show any error message.
Reference flow.xml for DeleteHBaseRow from flowfile content: 1delete-hbase-row-s-based-on-flow-file-content.xml

How to configure the DeleteHBaseRow processor for other separators/delimiters?
With a multi-character separator/delimiter: In this file we have a multi-character separator of colon and comma (:,), e.g. 1:,2:,3. DeleteHBaseRow configs: keep the Delete Row Key Separator value as :,
With a newline separator: Configure the Delete Row Key Separator as shift+enter (or) \n

2. Row ID Location FlowFile attributes:
2.1: If we have one row key value as an attribute of the flowfile: If the row key to delete from the HBase table is a flowfile attribute, then we can use Expression Language.
Explanation: GenerateFlowFile configs: Add new properties tab_name and row_key as attributes with the values delete_hbase_demo and 1 on the flowfile.
DeleteHBaseRow configs: Now we can configure the DeleteHBaseRow processor with Expression Language so that the processor gets the tab_name and row_key values from the flowfile attributes and performs the deletions dynamically.
Reference flow.xml for DeleteHBaseRow from a flowfile attribute: 2delete-hbase-row-from-flow-file-attribute.xml

2.2. If we have single/multiple row key values as an attribute of the flowfile: The DeleteHBaseRow processor doesn't support a comma-separated list of values presented as flowfile attributes. Here is a workaround example on how to delete the row keys without changing the flowfile content - using Expression Language with the indexOf and ifElse functions to loop through the whole list of row_keys values.
Flow: Explanation: GenerateFlowFile configs: Add new properties tab_name and row_keys as attributes with the values delete_hbase_demo and 1,2,3,4,5 on the flowfile.
RouteOnAttribute configs: Add a new property to check whether the row_keys attribute value is null or empty and auto-terminate this empty relationship. Feed the unmatched relationship from the RouteOnAttribute processor to the DeleteHBaseRow processor.
DeleteHBaseRow configs: Configure and enable the controller service, then configure the DeleteHBaseRow processor as follows. Row Identifier property value:
${row_keys:indexOf(','):equals(-1):ifElse('${row_keys}','${row_keys:substringBefore(",")}')} //check the index of the delimiter; if it equals -1 then use row_keys (only one value left in the row_keys attribute), else use the value before the "," and delete that row key in the HBase table.
Fork the success relationship from the DeleteHBaseRow processor.
Fork 1 of the success relationship: UpdateAttribute configs: Configure the processor with a row_keys property whose value is
${row_keys:indexOf(','):equals(-1):ifElse('','${row_keys:substringAfter(",")}')} //if the index of "," equals -1 then set an empty value, else update the row_keys attribute value with the substring after the ","
This loop continues until all the row_keys values have been deleted from the HBase table.
Fork 2 of the success relationship: Use this relationship for further processing.
Reference flow.xml for DeleteHBaseRow having a list of row_keys as a flowfile attribute: 22delete-list-of-row-keys-as-attribute-values.xml

Create and put data into the HBase table:
bash$ hbase shell
hbase> create 'delete_hbase_demo','cf'
hbase> put 'delete_hbase_demo','1','cf:name','foo'
hbase> put 'delete_hbase_demo','2','cf:name','bar'
hbase> put 'delete_hbase_demo','3','cf:name','foo'
hbase> put 'delete_hbase_demo','4','cf:name','bar'
hbase> put 'delete_hbase_demo','5','cf:name','foo'
hbase> put 'delete_hbase_demo','6','cf:name','bar'
hbase> scan 'delete_hbase_demo'
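The processor issues the equivalent of a manual deleteall for each row key, so you can verify the result from the HBase shell (a sketch against the demo table above):

hbase> deleteall 'delete_hbase_demo','1'
hbase> get 'delete_hbase_demo','1'
hbase> scan 'delete_hbase_demo'

The get/scan should no longer return the deleted row key(s).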
... View more