Member since 06-08-2017
1049 Posts
518 Kudos Received
312 Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 11196 | 04-15-2020 05:01 PM |
| | 7096 | 10-15-2019 08:12 PM |
| | 3088 | 10-12-2019 08:29 PM |
| | 11416 | 09-21-2019 10:04 AM |
| | 4310 | 09-19-2019 07:11 AM |
02-18-2018
02:50 PM
2 Kudos
@Mark Use the CSV SerDe so that Hive honors the quote characters in the CSV file:

ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
"separatorChar" = ",",
"quoteChar" = "\"")

Example:-

Input data:-

a, quick,"brown,fox jumps",over,"the, lazy"

Create table statement:-

create table hcc(field1 string,
field2 string,
field3 string,
field4 string,
field5 string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
"separatorChar" = ",",
"quoteChar" = "\"");

Select the data in the table:

hive> select * from hcc;
+---------+---------+------------------+---------+------------+--+
| field1 | field2 | field3 | field4 | field5 |
+---------+---------+------------------+---------+------------+--+
| a | quick | brown,fox jumps | over | the, lazy |
+---------+---------+------------------+---------+------------+--+
1 row selected (0.055 seconds)

So in our create table statement we have set the quote character to " and the separator to ,. When we query the table, Hive treats all the data enclosed in quotes as one field.
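To see the same quoting rule outside Hive, here is a small sketch using Python's standard csv module on the input line from the example. It is only an analogy for what the SerDe does, not the SerDe itself, and the helper name split_csv_line is made up for this illustration.

```python
import csv

# The input line from the example above.
line = 'a, quick,"brown,fox jumps",over,"the, lazy"'


def split_csv_line(text, separator=",", quote='"'):
    """Split one CSV line, keeping separators that sit inside quotes."""
    return next(csv.reader([text], delimiter=separator, quotechar=quote))


fields = split_csv_line(line)
for position, value in enumerate(fields, start=1):
    print(f"field{position}: {value!r}")
```

The commas inside the quoted values stay part of field3 and field5, so the line still splits into five fields.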
02-18-2018
11:42 AM
@Chris R To match the whitespace, use the regex below.

ID
"ID":\s(.*?),

\s matches any whitespace character (equal to [\r\n\t\f\v ]). You can validate your regex in an online regex tester.

We need to match the whitespace only if we are using the ExtractText processor. You don't need to match any whitespace characters if you are using the EvaluateJsonPath processor to extract the content and keep it as an attribute of the flowfile.
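A quick way to check the regex outside NiFi is a few lines of Python. This is just a sketch: the sample content is made up, and Python's re module stands in for the Java regex engine that ExtractText uses (both treat \s as a whitespace character class).

```python
import re

# Same pattern as the ID property above: one whitespace character after the colon.
ID_PATTERN = re.compile(r'"ID":\s(.*?),')

# Made-up flowfile content, with and without a space after the colon.
with_space = '{"ID": 1234, "name": "test"}'
without_space = '{"ID":1234, "name": "test"}'


def extract_id(content):
    """Return capture group 1 of the pattern, or None when it does not match."""
    match = ID_PATTERN.search(content)
    return match.group(1) if match else None


print(extract_id(with_space))     # captured value
print(extract_id(without_space))  # no match, because \s requires a whitespace character
```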
02-17-2018
07:50 PM
@Chris R In the ExtractText processor, add a new property:

ID
"ID":(.*?),

(or)

As you are having a JSON message, use the EvaluateJsonPath processor instead of ExtractText and change the properties below:

Destination
flowfile-attribute

Add a new property:

ID
$.ID

This adds a new attribute called ID to the flowfile.
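For comparison, this is roughly what the JSON path $.ID selects, sketched with Python's json module instead of NiFi. The sample messages and the helper name top_level_id are assumptions for illustration only.

```python
import json


def top_level_id(flowfile_content):
    """Return the top-level ID field, i.e. the value that $.ID points at."""
    return json.loads(flowfile_content).get("ID")


# Made-up messages: whitespace around the value does not matter to a JSON parser.
compact = '{"ID":1234,"name":"test"}'
spaced = '{ "ID" :   1234 , "name": "test" }'

print(top_level_id(compact))
print(top_level_id(spaced))
```

Because the message is parsed as JSON, there is no need to account for whitespace the way the regex approach has to.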
02-14-2018
01:38 AM
1 Kudo
@dhieru singh
1. You can extract the queue configuration, i.e. backPressureDataSizeThreshold (or) backPressureObjectThreshold, from the JSON response and keep it as attributes by using the EvaluateJsonPath processor.

QUEUE_Configured_Size
$.component.backPressureObjectThreshold

Compare the configured size with how many flowfiles are queued:

QUEUE_SIZE
$.status.aggregateSnapshot.flowFilesQueued

RouteOnAttribute configs:-

Queue_size_ge80%
${QUEUE_SIZE:ge(${QUEUE_Configured_Size:multiply(0.8)})}

Here we check whether queue_size (how many flowfiles are queued) is greater than or equal to queue_configured_size multiplied by 0.8 (i.e. 80%). If it is, the flowfile is routed to the Queue_size_ge80% relationship.

Example:- if my queue is configured to hold 10 objects and 8 or more flowfiles are queued, then the flowfile routes to the Queue_size_ge80% relationship.

2. To get all connection UUIDs, use the API below with recursive=true; it shows all the connection ids:

curl -i -X GET http://localhost:9090/nifi-api/flow/process-groups/root/status?recursive=true

Then create a script (or) use a processor that extracts all the connection ids (connectionStatusSnapshot.id) from the JSON response, and use those connection ids when monitoring the queue size.
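If you go the script route, the two steps could look something like the sketch below. The field names come from the JSON paths mentioned above; the sample dictionaries are made up and much smaller than a real NiFi response, and the function names are only for illustration. The id collector simply walks the whole parsed response, so it does not depend on the exact nesting.

```python
def queue_at_least(connection, fraction=0.8):
    """True when the queued flowfile count reaches `fraction` of the object threshold."""
    threshold = connection["component"]["backPressureObjectThreshold"]
    queued = connection["status"]["aggregateSnapshot"]["flowFilesQueued"]
    return queued >= threshold * fraction


def connection_ids(node):
    """Collect the id of every connectionStatusSnapshot found anywhere in parsed JSON."""
    found = []
    if isinstance(node, dict):
        snapshot = node.get("connectionStatusSnapshot")
        if isinstance(snapshot, dict) and "id" in snapshot:
            found.append(snapshot["id"])
        for value in node.values():
            found.extend(connection_ids(value))
    elif isinstance(node, list):
        for item in node:
            found.extend(connection_ids(item))
    return found


# Made-up sample data, shaped after the field names used in the answer.
sample_connection = {
    "component": {"backPressureObjectThreshold": 10},
    "status": {"aggregateSnapshot": {"flowFilesQueued": 8}},
}
sample_status = {
    "outer": [
        {"connectionStatusSnapshot": {"id": "c1"}},
        {"nested": {"items": [{"connectionStatusSnapshot": {"id": "c2"}}]}},
    ]
}

print(queue_at_least(sample_connection))
print(connection_ids(sample_status))
```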
02-11-2018
05:17 PM
@Surya RS
Data for the comm column will still be available in the table's HDFS location, and removing the column won't cause any error, because Hive works as "schema on read": when we query the table, it shows data only for the columns currently defined in the table.

If you reload (insert overwrite) data for the emp table with the new set of columns (id,name,sal), then the data for the comm column will be lost, because we overwrite all the existing data (which had data for the comm column) with the new columns. If you add the comm column to the table after reloading with new data, it will show null values for the comm column because no data exists for the comm column in the HDFS location.

Example:- I have an emp external table with the same schema as you mentioned in the question.

hive# select * from emp;
+-----+-------+-------+------------+--+
| id | name | sal | comm |
+-----+-------+-------+------------+--+
| 1 | hcc | 1000 | empsalary |
| 1 | hdp | 2000 | hdpsalary |
| 3 | hdf | 3000 | hdfsalary |
+-----+-------+-------+------------+--+

Now I'm going to remove the comm column by using replace columns:

hive# ALTER TABLE emp REPLACE COLUMNS(id int,name string,sal int);

Selecting the data from the emp table again:

hive# select * from emp;
+-----+-------+-------+--+
| id | name | sal |
+-----+-------+-------+--+
| 1 | hcc | 1000 |
| 1 | hdp | 2000 |
| 3 | hdf | 3000 |
+-----+-------+-------+--+

The comm column has been removed from the table, and there are no issues when you query the table. If you specify the comm column in the select statement, then Hive returns an error.

hive# select comm from emp;
Error: Error while compiling statement: FAILED: SemanticException [Error 10004]: Line 1:7 Invalid table alias or column reference 'comm': (possible column names are: id, name, sal) (state=42000,code=10004)

Hive returns an error because the specified column is not in the table. If you check the HDFS directory, the data for the comm column still exists; when we add the comm column to the table again, Hive returns the data for the comm column again.

Adding the comm column to the table again:-

hive# ALTER TABLE emp add COLUMNS(comm string);
hive# select * from emp;
+-----+-------+-------+------------+--+
| id | name | sal | comm |
+-----+-------+-------+------------+--+
| 1 | hcc | 1000 | empsalary |
| 1 | hdp | 2000 | hdpsalary |
| 3 | hdf | 3000 | hdfsalary |
+-----+-------+-------+------------+--+

After adding the comm column to the table again, we are able to see the data again. Hive works as schema on read: it displays the data as it matches the schema and datatypes (if a datatype doesn't match, the value shows as null).

If you access this table using Spark, then Spark gets the table info from the Hive metastore (HCatalog) and loads the data as per your schema.

Loading the data into Spark after the comm column is removed:

>>> hc.table("default.emp").show(10,False)
+---+----+----+
|id |name|sal |
+---+----+----+
|1 |hcc |1000|
|1 |hdp |2000|
|3 |hdf |3000|
+---+----+----+

Adding the comm column again and loading into Spark:

>>> hc.table("default.emp").show(10,False)
+---+----+----+---------+
|id |name|sal |comm |
+---+----+----+---------+
|1 |hcc |1000|empsalary|
|1 |hdp |2000|hdpsalary|
|3 |hdf |3000|hdfsalary|
+---+----+----+---------+

If the answer helped to resolve your issue, click the Accept button below to accept the answer. That would be a great help to community users looking for a quick solution to these kinds of issues.
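The schema-on-read behaviour from this answer can be mimicked in a few lines of plain Python. This is only a toy model, not how Hive is implemented: the comma delimiter, the two supported types, and the function name read_with_schema are all assumptions made for the sketch. The stored rows never change; only the schema applied at read time does.

```python
# Raw rows as they might sit in the table's storage location.
# The comma delimiter is an assumption for this sketch.
RAW_ROWS = [
    "1,hcc,1000,empsalary",
    "1,hdp,2000,hdpsalary",
    "3,hdf,3000,hdfsalary",
]

CASTS = {"int": int, "string": str}


def read_with_schema(rows, schema, delimiter=","):
    """Apply a list of (column name, type name) pairs to raw rows at read time."""
    result = []
    for row in rows:
        fields = row.split(delimiter)
        record = {}
        for position, (name, type_name) in enumerate(schema):
            if position >= len(fields):
                record[name] = None  # column declared, but no data stored for it
                continue
            try:
                record[name] = CASTS[type_name](fields[position])
            except ValueError:
                record[name] = None  # stored value does not fit the declared type
        result.append(record)
    return result


schema_without_comm = [("id", "int"), ("name", "string"), ("sal", "int")]
schema_with_comm = schema_without_comm + [("comm", "string")]

# Dropping comm from the schema hides the column; adding it back shows the data again.
without_comm = read_with_schema(RAW_ROWS, schema_without_comm)
with_comm = read_with_schema(RAW_ROWS, schema_with_comm)
print(without_comm[0])
print(with_comm[0])
```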
02-08-2018
01:49 PM
1 Kudo
@Kashif Amir Cast the column p as null (or) some value; the data type of column p needs to match on both tables.

Example:-

hive# select cast("ta" as string)p
union
select cast("tb" as string)p;
+--------+--+
| p |
+--------+--+
| ta |
| tb |
+--------+--+
In this example I have 2 tables with the same column name (p) and datatype (string); after the union we get the merged results.

(or)

The column names don't have to match; the result takes the name from the first select:

select "i" op
union
select "ji" p;
+---------+--+
| op |
+---------+--+
| i |
| ji |
+---------+--+
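To make the "cast the column as null" idea concrete, here is a small runnable sketch. It uses SQLite through Python's sqlite3 module instead of Hive, purely so it can run anywhere, and the tables t1 and t2 are hypothetical: t2 has no p column, so the second select supplies a typed null for it.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    create table t1 (id integer, p text);
    create table t2 (id integer);          -- hypothetical table without column p
    insert into t1 values (1, 'ta');
    insert into t2 values (2);
    """
)

# Both sides of the union expose the same number of columns with matching types.
cursor = conn.execute(
    """
    select id, p from t1
    union
    select id, cast(null as text) as p from t2
    order by id
    """
)
column_names = [description[0] for description in cursor.description]
rows = cursor.fetchall()

print(column_names)
print(rows)
```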
02-06-2018
01:29 PM
2 Kudos
@Varun R
After the SplitText processor, use the ExtractText processor and add a new property with a matching regex; the extracted value will then be added as a flowfile attribute.

Example:- After SplitText you have each address in its own flowfile, like this:

http://aaa.com/q="bigdata"&api_key=""

Now you want to know which query param value (ex: bigdata) has been used. In ExtractText, add a new property:

query_values
q="(.*?)"

Once the flowfile is processed by the ExtractText processor, it matches the regex and adds the attribute query_values to the flowfile. This way you know which query param values were used to get the response.
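The regex can be tried out on a few addresses before putting it into the processor. A minimal sketch in Python, with made-up addresses that follow the pattern from the example; the helper name query_value is just for illustration.

```python
import re

# Same pattern as the query_values property.
QUERY_VALUE = re.compile(r'q="(.*?)"')

# Made-up addresses in the style of the example.
addresses = [
    'http://aaa.com/q="bigdata"',
    'http://aaa.com/q="hadoop"',
    "http://aaa.com/no-query-here",
]


def query_value(address):
    """Return the value between the quotes after q=, or None if absent."""
    match = QUERY_VALUE.search(address)
    return match.group(1) if match else None


extracted = [query_value(address) for address in addresses]
print(extracted)
```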
02-05-2018
06:18 PM
5 Kudos
@Daminee Sao, By using the RestApi search URL you can get the id of the process group.

RestApi URL:-

curl -i -X GET http://localhost:9090/nifi-api/flow/search-results?q=<name(or)id>

Example:- I have flowfiledrop as the name of a process group in my NiFi. To get the id of the flowfiledrop process group, the RestApi URL would be:

curl -i -X GET http://localhost:9090/nifi-api/flow/search-results?q=flowfiledrop

As you can see, I've passed the flowfiledrop process group name in the q parameter.

$ curl -i -X GET http://localhost:9090/nifi-api/flow/search-results?q=flowfiledrop
HTTP/1.1 200 OK
Date: Mon, 05 Feb 2018 18:10:03 GMT
X-Frame-Options: SAMEORIGIN
Cache-Control: private, no-cache, no-store, no-transform
Content-Type: application/json
Vary: Accept-Encoding
Vary: User-Agent
Content-Length: 336
Server: Jetty(9.4.3.v20170317)

{"searchResultsDTO":{"processorResults":[],"connectionResults":[],"processGroupResults":[{"id":"619a2801-0161-1000-a4c2-95e4430e977f","groupId":"3b738dba-0161-1000-c808-f7d38f21fcab","name":"flowfiledrop","matches":["Name: flowfiledrop"]}],"inputPortResults":[],"outputPortResults":[],"remoteProcessGroupResults":[],"funnelResults":[]}}

The id of the flowfiledrop process group is 619a2801-0161-1000-a4c2-95e4430e977f.
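If you want to pick the id out of the response in a script rather than by eye, something like the sketch below should do. It parses the response body shown above; the function name process_group_ids is made up, and matching on the exact name is my assumption about what you want (the search itself may return more than one result).

```python
import json

# Response body copied from the curl output above.
RESPONSE_BODY = '''{"searchResultsDTO":{"processorResults":[],"connectionResults":[],"processGroupResults":[{"id":"619a2801-0161-1000-a4c2-95e4430e977f","groupId":"3b738dba-0161-1000-c808-f7d38f21fcab","name":"flowfiledrop","matches":["Name: flowfiledrop"]}],"inputPortResults":[],"outputPortResults":[],"remoteProcessGroupResults":[],"funnelResults":[]}}'''


def process_group_ids(body, name):
    """Return the ids of all process group search results with exactly this name."""
    results = json.loads(body)["searchResultsDTO"]["processGroupResults"]
    return [result["id"] for result in results if result["name"] == name]


ids = process_group_ids(RESPONSE_BODY, "flowfiledrop")
print(ids)
```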
02-04-2018
05:20 PM
3 Kudos
@Daminee Sao
There is no RestApi call in NiFi that clears all the queues at the process group level; every drop-requests RestApi call expects a connection id (queue id) to be included in the call.

But by making use of the Process Group Connections and Drop Requests RestApi calls, we can clear all the queues inside the process group.

curl -i -X GET {nifi-ip-address}/nifi-api/process-groups/{id}/connections //to get all the connections in the process group
curl -i -X POST {nifi-ip-address}/nifi-api/flowfile-queues/{connection-id}/drop-requests //drop all flowfiles for the specific connection

Refer to the link below for more API calls supported at the process group level.

https://nifi.apache.org/docs/nifi-docs/rest-api/index.html

Example:- I have my NiFi instance running on localhost port 9090 and my process group id is 619a2801-0161-1000-a4c2-95e4430e977f. The rest api call below returns all the connections that I have in that process group:

curl -i -X GET http://localhost:9090/nifi-api/process-groups/619a2801-0161-1000-a4c2-95e4430e977f/connections

I have 2 connection ids (58629c69-0161-1000-b2e4-254f8afcde9c, 619d132e-0161-1000-f689-198ce61282da) inside my process group. To drop all the flowfiles in these connections, call the drop requests api:

curl -i -X POST http://localhost:9090/nifi-api/flowfile-queues/58629c69-0161-1000-b2e4-254f8afcde9c/drop-requests
curl -i -X POST http://localhost:9090/nifi-api/flowfile-queues/619d132e-0161-1000-f689-198ce61282da/drop-requests

Write a script that extracts all the connection ids from the connections api call and loops over them in the drop requests api call.

(or)

By using a NiFi flow:-

GenerateFlowFile //trigger flow
InvokeHTTP //using the GET method, we get a json flowfile with all the connections as content
SplitJson //split on $.connections to get each connection as an individual flowfile
EvaluateJsonPath //extract $.id from the json message and keep it as a flowfile attribute
InvokeHTTP //using the POST method, use the extracted attribute to call drop requests: http://localhost:9090/nifi-api/flowfile-queues/${connection_id}/drop-requests

I have attached the xml file of the above flow; save it, use the flow, and make changes as per your needs.

drop-requests-167593.xml
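For the script option, a rough sketch in Python could look like this. It uses only the two endpoints shown in this answer and assumes an unsecured NiFi instance like the localhost:9090 one in the example; the function names http_json and drop_all_queues are made up. The response is assumed to carry the connections under a "connections" key with an "id" on each entry, matching the $.connections and $.id paths used in the flow.

```python
import json
import urllib.request


def http_json(method, url):
    """Send a request with no body and return the parsed JSON response."""
    request = urllib.request.Request(url, method=method)
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


def drop_all_queues(base_url, group_id, call=http_json):
    """Issue a drop request for every connection listed for the process group.

    `call` is injectable so the loop can be tried without a running NiFi.
    Returns the connection ids that a drop request was sent for.
    """
    listing = call("GET", f"{base_url}/nifi-api/process-groups/{group_id}/connections")
    ids = [connection["id"] for connection in listing.get("connections", [])]
    for connection_id in ids:
        call("POST", f"{base_url}/nifi-api/flowfile-queues/{connection_id}/drop-requests")
    return ids


# Usage against the instance from the example (needs a running NiFi, so not run here):
# drop_all_queues("http://localhost:9090", "619a2801-0161-1000-a4c2-95e4430e977f")
```

Passing the HTTP function in as a parameter is only a convenience for dry runs; with the default it talks to NiFi directly.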
02-04-2018
03:13 PM
@elango vaithiyanathan If you have integer, float, etc. values represented as the string datatype, then you can use the string columns directly in aggregations.

Example:- the values 10 and 10.5 in the age column are represented as string data type; we can use aggregate functions on this age column directly.

from pyspark.sql.functions import col, sum # col and sum come from pyspark.sql.functions
gr_df2=gr_df.agg(sum(col('age')))

(or)

You can also cast string data types to int, double, etc. in aggregations.

gr_df2=gr_df.agg(sum(col('age').cast("int")))

This casts the age column to integer and then applies the aggregate function on it.

Create a temp table on the dataframe and run your sql queries on the temp table:

gr_df2.registerTempTable("people")
hc.sql("select * from people").show(10,False)