Member since: 08-03-2019
Posts: 186
Kudos Received: 34
Solutions: 26
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 1909 | 04-25-2018 08:37 PM |
 | 5833 | 04-01-2018 09:37 PM |
 | 1546 | 03-29-2018 05:15 PM |
 | 6657 | 03-27-2018 07:22 PM |
 | 1949 | 03-27-2018 06:14 PM |
03-22-2018
07:12 PM
1 Kudo
@archana v You can read the data from HDFS directly into a DataFrame, provided it's a data format with an embedded schema. For example:
Reading Avro in Spark - refer to this link.
Reading Parquet in Spark:
val parquetFileDF = spark.read.parquet("people.parquet")
Reading ORC in Spark - refer to this link.
Reading JSON in Spark:
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
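(Side note, not from the original answer: a minimal ORC read looks just like the Parquet case; the path "people.orc" below is only a placeholder.)
val orcFileDF = spark.read.orc("people.orc") // reads an ORC file into a DataFrame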
But if your data does not belong to any of these "enriched" formats, you can always read those files as an RDD and convert them to a DataFrame. Here are some examples.
Import the necessary classes:
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import org.apache.spark.rdd.RDD // needed for the RDD[Row] annotation used in Method 3
Create the SparkSession object; here it's spark:
val spark: SparkSession = SparkSession.builder.master("local").getOrCreate()
val sc = spark.sparkContext // just used to create test RDDs
Let's create an RDD to turn into a DataFrame:
val rdd = sc.parallelize(Seq(("first", Array(2.0, 1.0, 2.1, 5.4)), ("test", Array(1.5, 0.5, 0.9, 3.7)), ("choose", Array(8.0, 2.9, 9.1, 2.5))))
Method 1: Using SparkSession.createDataFrame(RDD obj).
val dfWithoutSchema = spark.createDataFrame(rdd)
dfWithoutSchema.show()
+------+--------------------+
| _1| _2|
+------+--------------------+
| first|[2.0,1.0,2.1,5.4] |
| test|[1.5,0.5,0.9,3.7] |
|choose|[8.0,2.9,9.1,2.5] |
+------+--------------------+
Method 2: Using SparkSession.createDataFrame(RDD obj) and specifying column names.
val dfWithSchema = spark.createDataFrame(rdd).toDF("id", "vals")
dfWithSchema.show()
+------+--------------------+
| id| vals|
+------+--------------------+
| first|[2.0,1.0,2.1,5.4] |
| test|[1.5,0.5,0.9,3.7] |
|choose|[8.0,2.9,9.1,2.5] |
+------+--------------------+
Method 3: This way requires the input RDD to be of type RDD[Row].
val rowsRdd: RDD[Row] = sc.parallelize(Seq(Row("first", 2.0, 7.0), Row("second", 3.5, 2.5), Row("third", 7.0, 5.9)))
Create the schema:
val schema = new StructType().add(StructField("id", StringType, true)).add(StructField("val1", DoubleType, true)).add(StructField("val2", DoubleType, true))
Now apply both rowsRdd and schema to createDataFrame():
val df = spark.createDataFrame(rowsRdd, schema)
df.show()
+------+----+----+
| id|val1|val2|
+------+----+----+
| first|2.0|7.0 |
|second|3.5|2.5 |
| third|7.0|5.9 |
+------+----+----+
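As an aside (not part of the original answer, just a sketch that assumes the same spark and sc values created above): if each row is modelled as a case class, importing spark.implicits._ lets you call toDF() on the RDD directly, and the column names are taken from the case-class fields.
case class Score(id: String, val1: Double, val2: Double)
import spark.implicits._
// the encoder for the case class is derived implicitly, so toDF() needs no explicit schema
val caseClassDF = sc.parallelize(Seq(Score("first", 2.0, 7.0), Score("second", 3.5, 2.5), Score("third", 7.0, 5.9))).toDF()
caseClassDF.printSchema()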
03-22-2018
06:54 PM
@priyal patel Your statement has a syntax issue and that is why you are getting the current error. If you retype your Pig script as follows, the syntax error goes away.
extract = FOREACH a GENERATE FLATTEN(REGEX_EXTRACT_ALL(rec,'^(Mapped)\\"(\\{+(\\[+([^/].*)+\\]),methods=(\\[+([A-Z].*)+\\]),produces=(\\[+([^ ].*)+\\])+\\}\\)"\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(throws)\\s+(.*)')) AS (t1:chararray,url:chararray,type:chararray,produces:chararray,t2:chararray,t3:chararray,classes:chararray,throw:chararray,exception:chararray);
The regex itself is exactly as you mentioned in the question. I need some more info about what you want to extract from this error message, and then we can work on the desired regex.
03-22-2018
02:47 PM
@rajdip chaudhuri As mentioned by @Abdelkrim Hadjidj, NiFi is a great candidate to solve these kinds of issues! He talked about processors like List & Fetch files. As the names suggest, they list and fetch the data without you having to write any code, and they also give you the properties attached to those files, for example the directory structure, a very important need you mentioned in your use case. There are a bunch of advantages to using NiFi for this use case:
- No need to write any code.
- Advanced functionality that helps you "maintain the state", i.e. only files that are new get listed. This also lets you run your "file server" functionality in real time if needed.
- Very high throughput with low latency.
- Rapid data acquisition pipeline development without writing a lot of code.
- Provides a highly concurrent model without a developer having to worry about the typical complexities of concurrency.
- Is inherently asynchronous, which allows for very high throughput and natural buffering even as processing and flow rates fluctuate.
- The resource-constrained connections make critical functions such as back-pressure and pressure release very natural and intuitive.
- The points at which data enters and exits the system, as well as how it flows through, are well understood and easily tracked.
- And biggest of all, OPEN SOURCE.
Let me know if you need any other help!
03-22-2018
05:12 AM
@Utkal Sinha
1. Try deleting localhost from your NiFi properties. Keep the web host property empty, as follows:
nifi.web.http.host=
2. Running run-nifi.bat runs the process in the foreground. DO NOT close the terminal. I assume you are not doing that.
Make the aforementioned change, restart NiFi and wait for a few minutes. Sometimes the process takes a while before it is fully initiated. Let me know if that works.
03-22-2018
04:28 AM
@umang s Can you please share some more details about your use case?
03-22-2018
04:27 AM
@Mark Lin You shouldn't use the same topic for both of these operations, IMHO. Here is a scenario I can think of. Let's say we are using topic topic1 to fetch the data in an incremental manner. Kafka persists the data for 7 days by default, so let's say you already have the last 7 days of data in the topic. Now you get this request to do the "bulk" data load again, and suddenly you have all the data pulled into this topic, with the last 7 days of data already sitting there. That will cause data duplication in Kafka, and your downstream consumer has to do extra work to make sure the duplicate records are not persisted in the final storage. Also, a bulk load can bypass Kafka altogether; we can use either NiFi or Sqoop to do it, whenever needed. Hope that helps!
03-22-2018
04:18 AM
@JP One way of doing it is using the ExecuteScript processor. Here is a sample solution. I generated a flow file using the GenerateFlowFile processor which has an attribute named "property1" with the value set to "Property2". This flow file then goes to an ExecuteScript processor where I use a Groovy script to create a new property "on the fly"! Here is the Groovy script:
flowFile = session.get()
if(!flowFile) return
// read the value of the existing attribute; that value becomes the name of the new attribute
myAttr = flowFile.getAttribute('property1')
// add the new attribute on the fly and route the flow file to success
flowFile = session.putAttribute(flowFile, myAttr, 'myValue')
session.transfer(flowFile, REL_SUCCESS)
The flow file coming out of this processor now carries the new attribute ("Property2" set to "myValue"). Hope that helps!
03-21-2018
06:05 PM
1 Kudo
@Saikrishna Tarapareddy The FlattenJSON processor "flattens out" any nesting in the JSON documents that are sent to it as input. Let me give you a quick example. My JSON originally looks like this:
{
  "Name": "A",
  "age": "23",
  "gender": "m",
  "score": {
    "sub1": "56",
    "sub2": "66",
    "sub3": "76"
  }
}
Pay special attention to the column "score": it's a nested column! After passing it through FlattenJSON, it is "flattened out" to something like this:
{
  "Name": "A",
  "age": "23",
  "gender": "m",
  "score.sub1": "56",
  "score.sub2": "66",
  "score.sub3": "76"
}
Now have a look at the format, and at the "score" column(s) specifically! This is what FlattenJSON does. Now that we have some level of understanding: do you have any nested column in your input which is not being flattened? PS - Arrays will remain as-is after processing JSON data with FlattenJSON, so don't confuse arrays with nested columns.
03-21-2018
05:22 PM
@Vivek Singh The query in the GetMongo processor should be in Extended JSON format. The format you are mentioning is the Mongo shell format, which is not compatible with the processor. Switch your query to look something like this:
{
  "createdAt": {$ne: null},
  "updatedAt": {$ne: null}
}
03-21-2018
01:57 AM
1 Kudo
"Absolute path" of the flow file is where it is read from and where it is written to! For example "ListFile" lists all the files in a given directory and creates an attribute "absolute.path" which DO NOT include the file name. Another example can be PutHDFS. It writes the file to HDFS and, no prize for guessing, create an attribute called "absolute.path" similar to the previous example. Question is do you have any such processor in your flow? At least the snapshot doesn't show any! Second, do you need to commit the data to disk before writing to MySQL, probably not! So how to write the data to MySQL? I would recommend using PutDatabaseRecord processor! It fits the bill in this use case. Hope that helps!