Member since: 06-08-2017
Posts: 1049
Kudos Received: 518
Solutions: 312
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 11291 | 04-15-2020 05:01 PM |
| | 7189 | 10-15-2019 08:12 PM |
| | 3168 | 10-12-2019 08:29 PM |
| | 11643 | 09-21-2019 10:04 AM |
| | 4400 | 09-19-2019 07:11 AM |
03-27-2018
08:43 PM
@Matt Burgess Thanks a ton, Matt, for the valuable info. Could you please convert your comment to an answer, as my answer does not point to the root cause?
03-27-2018
02:40 PM
@swathi thukkaraju
I think your file has a header as its first line, so we need to skip that header and then apply your case class to the file. We also need to escape the split character: split treats its argument as a regex, and | is a regex metacharacter, so the pattern that matches a literal pipe is \\|.

My input file:
name,age,state
swathi|23|us
srivani|24|UK
ram|25|London

Way 1: read the header and filter it out.
case class schema(name:String,age:Int,brand_code:String)
val rdd = sc.textFile("file://<local-file-path>/test.csv") //for local file
(or)
val rdd = sc.textFile("/test.csv") //for hadoop file
val header = rdd.first()
val data = rdd.filter(row => row != header)
val df1 = data.map(_.split("\\|")).map(x=> schema(x(0).toString,x(1).toInt,x(2).toString)).toDF()

Way 2: drop the first line of the first partition.
case class schema(name:String,age:Int,brand_code:String)
val rdd = sc.textFile("file://<local-file>/test.csv") //for local file
(or)
val rdd = sc.textFile("/test.csv") //for hadoop file
val rdd1= rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }
val df1 = rdd1.map(_.split("\\|")).map(x=> schema(x(0).toString,x(1).toInt,x(2).toString)).toDF()

In both ways we skip the header line and then apply the case class schema to the file. Once the case class is applied and we call toDF(), we have a dataframe.

If you don't have a header, just load the file, apply the split and the case class, and convert to a dataframe:
case class schema(name:String,age:Int,brand_code:String)
val rdd = sc.textFile("file://<local-file-path>/test.csv") //for local file
(or)
val rdd = sc.textFile("/test.csv") //for hadoop file
val df1 = rdd.map(_.split("\\|")).map(x=> schema(x(0).toString,x(1).toInt,x(2).toString)).toDF()

Output from both ways:
scala> df1.show()
+-------+---+----------+
|   name|age|brand_code|
+-------+---+----------+
| swathi| 23|        us|
|srivani| 24|        UK|
|    ram| 25|    London|
+-------+---+----------+

However, I have also used the csv package in Spark 1.6.2 and it works fine. Using that package is simpler than assigning a case class, but you can choose either method as per your requirements.
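To see why the pipe has to be escaped, here is a minimal sketch in plain Scala (no Spark needed). The object name PipeSplitDemo and the Person case class are only illustrative and are not part of the original answer; the sample lines are the ones from the input file above.

```scala
object PipeSplitDemo {
  // Illustrative record type; field names are assumptions for this sketch.
  case class Person(name: String, age: Int, brandCode: String)

  val lines: Seq[String] =
    Seq("name,age,state", "swathi|23|us", "srivani|24|UK", "ram|25|London")

  // String.split takes a regex. An unescaped "|" is an alternation of two
  // empty patterns, so it matches between characters instead of on the pipe.
  val wrong: Array[String] = "swathi|23|us".split("|")

  // Escaping the pipe matches the literal character and yields three fields.
  val right: Array[String] = "swathi|23|us".split("\\|")

  // Skip the header line, split on the escaped pipe, build typed records.
  val people: Seq[Person] = lines.tail
    .map(_.split("\\|"))
    .map(f => Person(f(0), f(1).toInt, f(2)))

  def main(args: Array[String]): Unit = {
    println(s"unescaped split gives ${wrong.length} pieces")
    println(s"escaped split gives ${right.mkString(", ")}")
    people.foreach(println)
  }
}
```

The same escaped pattern is what the RDD code passes to split before applying the case class.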
03-27-2018
11:45 AM
1 Kudo
@swathi thukkaraju By using the csv package we can handle this use case easily. Here is what I tried.

I had a csv file called test.csv in an HDFS directory:
name,age,state
swathi,23,us
srivani,24,UK
ram,25,London
sravan,30,UK

Initialize the spark shell with the csv package:
spark-shell --master local --packages com.databricks:spark-csv_2.10:1.3.0

Load the HDFS file into a Spark dataframe using the csv format. As the file has a header, I have included the header option while loading:
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/user/test/test.csv")

If your file is on the local file system, then use:
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("file:///<local-path>/test.csv")

Once the loading completes, view the schema:
scala> df.printSchema()
root
|-- name: string (nullable = true)
|-- age: string (nullable = true)
|-- state: string (nullable = true)

Now we have the df dataframe with a schema, so we can apply filter operations on it.

Filtering and storing state is us, UK, London:
val df2=df.filter($"state"==="us")
(or)
val df2=df.filter(col("state")==="us") //needs import org.apache.spark.sql.functions.col
scala> df2.show()
+------+---+-----+
|  name|age|state|
+------+---+-----+
|swathi| 23|   us|
+------+---+-----+

As we can see above, df2 contains only the rows where state is us. In the same way we need to filter and create new dataframes for state UK and London:
val df3=df.filter(col("state")==="UK")
val df4=df.filter(col("state")==="London")

Once the filtering is done, we need to write the df2, df3 and df4 dataframes into HDFS with headers included. We cannot choose specific file names while writing the data back to HDFS, so the command below creates a us directory in HDFS and writes the df2 data into it. The header option is what writes the header line into the output:
df2.write.format("com.databricks.spark.csv").option("header", "true").save("/user/test/us")

In the same way we store df3 and df4 into different directories in HDFS:
df3.write.format("com.databricks.spark.csv").option("header", "true").save("/user/test/UK")
df4.write.format("com.databricks.spark.csv").option("header", "true").save("/user/test/London")

Now when you run
hadoop fs -ls /user/test/
you are going to have 3 directories (us, UK, London) with the corresponding part-00000 files in them.

In addition, we can register a temp table once the data is loaded into the df dataframe, and then run SQL queries on top of the temp table using sqlContext:
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/user/test/test.csv")
df.registerTempTable("temp")
val df2=sqlContext.sql("select * from temp where state ='us'")
val df3=sqlContext.sql("select * from temp where state ='UK'")
val df4=sqlContext.sql("select * from temp where state ='London'")
df2.write.format("com.databricks.spark.csv").option("header", "true").save("/user/test/us")
df3.write.format("com.databricks.spark.csv").option("header", "true").save("/user/test/UK")
df4.write.format("com.databricks.spark.csv").option("header", "true").save("/user/test/London")

In both ways (using filter and using a registered temp table) the results will be the same.
03-27-2018
10:51 AM
1 Kudo
@Chen Yimu There is nothing wrong with your operation. The table used by the PutHiveStreaming processor needs to have all of its column data types as strings.

If you want the id column to be of type bigint, then after the QueryDatabaseTable processor store the Avro file in an HDFS location using the PutHDFS processor, and create an Avro table on top of that HDFS directory.
(or)
Use the ConvertAvroToORC processor after the QueryDatabaseTable processor, store the ORC file in an HDFS location using the PutHDFS processor, and then create a table on top of that HDFS directory.
03-27-2018
10:09 AM
1 Kudo
@TAMILMARAN c HDFS follows a write-once, read-many model. This means only one client can write to a file at a time; multiple clients cannot write to the same HDFS file at the same time.

When the NameNode gives one client permission to write data to a block on a DataNode, the block stays locked until the write operation is completed. If another client requests to write to the same block of that file, it is not permitted to do so and has to wait until the write lock is released. All the requests are queued and only one client is allowed to write at a time.

These are very good links on HDFS read and write operations; could you please refer to them:
https://data-flair.training/blogs/hadoop-hdfs-data-read-and-write-operations/
http://hadoop.apache.org/docs/r1.2.1/hdfs_design.html#Replication+Pipelining
https://data-flair.training/blogs/hdfs-data-write-operation/
https://data-flair.training/blogs/hdfs-data-read-operation/
03-27-2018
12:12 AM
@Shantanu kumar Strange. I have tried with your configs, i.e. running on the primary node, Monitoring Scope set to cluster, Reporting Node set to primary, and Cron Driven as the scheduling strategy, and I got an inactivity flowfile once the flow had been inactive for 1 minute.

However, I would suggest running the MonitorActivity processor on all nodes. Here is the scenario: you have a NiFi cluster of 2 nodes and QueryDatabaseTable (QDT) runs on the primary node (node 1). Once QDT has fetched all the delta records, the primary node changes to node 2, while your MonitorActivity processor runs on the primary node only (i.e. node 2 now). At this point the QDT output flowfile is on node 1, so the MonitorActivity processor does not process it, because it is now running on node 2. The flowfile will wait in the queue before the MonitorActivity processor until node 1 becomes the primary node again.

If Continually Send Messages is set to true:
Even with this property set to true we don't get messages continuously, because we are running the processor at a specific time, so we will get only 1 message at 10:01 am. If you schedule the processor to run at second 0 of every minute and set the threshold duration to 1 min, then the processor sends a message every minute until the activity is restored.

If Copy Attributes is set to true:
With this property set to true, the processor copies all flowfile attributes from the flowfile that resumed activity to the newly created indicator flowfile. As we are auto-terminating the restored and success relationships, we don't have to set this property to true.
03-25-2018
06:02 PM
@Shantanu kumar You can use the QueryRecord processor for this use case, as it lets us run SQL queries on the content of the flowfile and route the flowfile contents based on STORE_ID. Here is what I tried.

My flowfile content is as follows:
STORE_ID|Name
1|online
2|online
3|online
4|online
5|online
6|online
7|online
8|online
9|online
10|online
11|online
12|online
13|online
14|online
15|online
16|online
17|online
18|online
19|online
20|online

QueryRecord processor configs:
I have added two new properties. Each property name becomes a relationship, and its query determines which records are routed to that relationship.

Store_id 11 to 20
SELECT * FROM FLOWFILE WHERE STORE_ID >'10' and STORE_ID <'21'
(or)
SELECT * FROM FLOWFILE WHERE STORE_ID >=11 and STORE_ID <=20

Store_id 1 to 10
SELECT * FROM FLOWFILE WHERE STORE_ID >'0' and STORE_ID <'11'
(or)
SELECT * FROM FLOWFILE WHERE STORE_ID >= 1 and STORE_ID <= 10

CsvReader configs:
As the content is pipe delimited, we need to specify | as the separator in the CsvReader controller service. As we are using Use 'Schema Name' Property as the Schema Access Strategy, we also need to add the schema.name attribute to the flowfile by using an UpdateAttribute processor.

UpdateAttribute configs:
Add the schema.name attribute to the flowfile.

AvroSchemaRegistry configs:
Validate Field Names: true
sch: { "type": "record", "name": "nifiRecord", "fields" : [ {"name": "STORE_ID", "type": ["null", "int"]}, {"name": "Name", "type": ["null", "string"]} ] }

This Avro schema registry is used by both the CsvReader and the CsvSetWriter controller services.

CsvSetWriter configs:
In the CsvSetWriter controller service we also use the same Avro schema registry, set the separator to |, and change the Schema Access Strategy to Use 'Schema Name' Property.

Once the flowfile content is fed to the QueryRecord processor, the processor runs the SQL queries that we have written and routes the records to the respective relationships.

The Store_id 1 to 10 relationship will get the records below:
STORE_ID|Name
1|online
2|online
3|online
4|online
5|online
6|online
7|online
8|online
9|online
10|online

The Store_id 11 to 20 relationship will get the records below:
STORE_ID|Name
11|online
12|online
13|online
14|online
15|online
16|online
17|online
18|online
19|online
20|online

My sample flow:
1.GenerateFlowFile //to produce input data
2.UpdateAttribute //to set up the schema name
3.QueryRecord //to route the content based on the queries

For your reference I have attached my xml file (178446-split-file.xml); upload it and change it as per your requirements.

Let us know if you are facing any issues or have questions.
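The effect of the two queries can be sketched in plain Scala, outside NiFi. This is only an illustration of the routing logic, not how QueryRecord is implemented; the object name StoreRoutingDemo and the Store case class are made up for the sketch. It also shows why the numeric form of the queries is the safer one when STORE_ID is declared as int in the schema: if the ids were ever compared purely as text, "9" would sort after "10". How the processor's SQL engine coerces the quoted literals is not covered here.

```scala
object StoreRoutingDemo {
  // Illustrative record type mirroring the STORE_ID|Name rows.
  case class Store(storeId: Int, name: String)

  // Same shape as the sample flowfile content: a header plus ids 1 to 20.
  val content: String =
    ("STORE_ID|Name" +: (1 to 20).map(i => s"$i|online")).mkString("\n")

  // Skip the header, split on the escaped pipe, parse the id as a number.
  val records: List[Store] = content.split("\n").toList.tail.map { line =>
    val fields = line.split("\\|")
    Store(fields(0).toInt, fields(1))
  }

  // Counterpart of: WHERE STORE_ID >= 1 and STORE_ID <= 10
  val oneToTen: List[Store] =
    records.filter(r => r.storeId >= 1 && r.storeId <= 10)

  // Counterpart of: WHERE STORE_ID >= 11 and STORE_ID <= 20
  val elevenToTwenty: List[Store] =
    records.filter(r => r.storeId >= 11 && r.storeId <= 20)

  // Text comparison is lexicographic, so this is true.
  val textComparisonPitfall: Boolean = "9" > "10"

  def main(args: Array[String]): Unit = {
    println(oneToTen.map(_.storeId).mkString(","))
    println(elevenToTwenty.map(_.storeId).mkString(","))
    println(textComparisonPitfall)
  }
}
```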
03-25-2018
06:23 AM
4 Kudos
@John Smith Extract the b value before using it in the Jolt transform: use an EvaluateJsonPath processor with Destination set to flowfile-attribute, and add b as a new property in the EvaluateJsonPath processor. Once the b attribute has been added to the flowfile, your Jolt transform can apply the substring function to the b attribute.

EvaluateJsonPath configs:

Example: for testing, I have added the b attribute value as a default in my Jolt transform.

Jolt spec testing:
As you can see in the bottom-right corner, the output JSON content has the c key with e as its value.

Let us know if you are having any issues or questions.
03-25-2018
05:28 AM
@Pramod Kalvala NiFi has a CountText processor, which adds the number of lines, non-empty lines, words and characters in the text file as flowfile attributes.

CountText processor Writes Attributes:

| Name | Description |
|---|---|
| text.line.count | The number of lines of text present in the FlowFile content |
| text.line.nonempty.count | The number of lines of text (with at least one non-whitespace character) present in the original FlowFile |
| text.word.count | The number of words present in the original FlowFile |
| text.character.count | The number of characters (given the specified character encoding) present in the original FlowFile |

Example: suppose the flowfile content has an empty line as its second line, and we feed this content to a CountText processor with the configs below:
Count Lines: true
Count Non-Empty Lines: true
Count Words: true
Count Characters: true
Split Words on Symbols: true

Output flowfile attributes:
The CountText processor adds the line count, non-empty line count, word count and character count attributes to the flowfile.
(or)
By using the ExecuteStreamCommand processor we can run the wc -l command to get the number of lines in the text document.
(or)
By using the QueryRecord processor we can get the number of lines in the flowfile content. Useful links for the QueryRecord approach:
https://community.hortonworks.com/articles/140183/counting-lines-in-text-files-with-nifi.html
https://community.hortonworks.com/articles/146096/counting-lines-in-text-files-with-nifi-part-2.html

If you are using the QueryDatabaseTable or ExecuteSQL processors, the output flowfile has a row.count attribute, which gives the number of rows fetched from the source.

To convert content to a flowfile attribute:
For this use case we can use the ExtractText processor to extract the content and store it as a flowfile attribute.

ExtractText configs:
Add a new property named data with the regex (.*), i.e. capture all the content and keep it in the flowfile attribute named data. Change Enable DOTALL Mode to true if your flowfile content has new lines in it.

The most important properties are:
Maximum Buffer Size (default 1 MB): Specifies the maximum amount of data to buffer (per file) in order to apply the regular expressions. Files larger than the specified maximum will not be fully evaluated.
Maximum Capture Group Length (default 1024): Specifies the maximum number of characters a given capture group value can have. Any characters beyond the max will be truncated.

You have to increase these property values according to your flowfile size to get all the content of the flowfile into the attribute. It is not recommended to extract all the contents and keep them as attributes, as attributes are kept in memory. Please refer to the links below for NiFi best practices and a deeper view:
https://nifi.apache.org/docs/nifi-docs/html/nifi-in-depth.html#DeeperView
https://nifi.apache.org/docs/nifi-docs/html/nifi-in-depth.html#best-practice
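To make the four counts concrete, here is a rough plain-Scala sketch of what they mean for a small text whose second line is empty. This is not NiFi's implementation and the processor may treat edge cases (trailing newlines, symbols, encodings) differently; the names CountTextDemo, TextCounts and count are hypothetical.

```scala
object CountTextDemo {
  // Hypothetical holder for the four counts; not a NiFi class.
  case class TextCounts(lines: Int, nonEmptyLines: Int, words: Int, characters: Int)

  def count(text: String): TextCounts = {
    // Limit -1 keeps empty lines, so the blank second line is counted.
    val allLines = text.split("\n", -1).toList
    TextCounts(
      allLines.size,
      // A line is non-empty if it has at least one non-whitespace character.
      allLines.count(_.trim.nonEmpty),
      // Words are taken as whitespace-separated tokens in this sketch.
      text.split("\\s+").count(_.nonEmpty),
      // Character count includes the newline characters.
      text.length
    )
  }

  // Sample content with an empty second line.
  val sample: String = "first line\n\nthird line"
  val counts: TextCounts = count(sample)

  def main(args: Array[String]): Unit = println(counts)
}
```

For this sample the sketch reports 3 lines, 2 non-empty lines, 4 words and 22 characters.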
03-25-2018
04:51 AM
@Pramod Kalvala Could you please add more details about your question? For example, are you expecting to count the number of lines in the flowfile?