Created 04-07-2016 05:00 PM
In my data I have 8 different schemas. I want to create 8 different data frames for them and save them in 8 different tables in Hive. So far I have created a super bean class that holds the shared attributes, and each bean class extends it. Based on the type attribute I create different objects. The problem is that I am unable to save them in different data frames. Is there any way I can do that? Here is my code so far, which works fine for one schema.
xmlData.foreachRDD(
    new Function2<JavaRDD<String>, Time, Void>() {
      public Void call(JavaRDD<String> rdd, Time time) {
        HiveContext hiveContext = JavaHiveContext.getInstance(rdd.context());
        // Convert RDD[String] to RDD[case class] to DataFrame
        JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
          public JavaRow call(String line) throws Exception {
            String[] fields = line.split("\\|");
            // JavaRow is my super class
            JavaRow record = null;
            if (fields[2].trim().equalsIgnoreCase("CDR")) {
              record = new GPRSClass(fields[0], fields[1]);
            }
            if (fields[2].trim().equalsIgnoreCase("Activation")) {
              record = new GbPdpContextActivation(fields[0], fields[1], fields[2], fields[3]);
            }
            return record;
          }
        });
        DataFrame df;
        df = hiveContext.createDataFrame(rowRDD, JavaRow.class);
        df.toDF().registerTempTable("Consumer");
        System.out.println(df.count() + " ************Record Received************");
        df = hiveContext.createDataFrame(rowRDD, GPRSClass.class);
        hiveContext.sql("CREATE TABLE if not exists gprs_data ( processor string, fileName string, type string, version string, id string ) STORED AS ORC ");
        df.save("/apps/hive/warehouse/data", "org.apache.spark.sql.hive.orc", SaveMode.Append);
        return null;
      }
    });
Created 04-07-2016 05:16 PM
So the data is in the same stream? I.e., one row will have one format and the next one will have another? If you had 8 Kafka streams I suppose you wouldn't ask.
In that case you have two options:
- Write an identify function, apply it, and then filter the RDD 8 times, once for each type. Each time, do the correct parsing and persist the result in SQL.
As an illustration:
val inputStream ...
// Tag every record with its type once
val typedStream = inputStream.map(record => (identifyType(record), record))
val type1Stream = typedStream.filter(_._1 == "type1")
val type2Stream = typedStream.filter(_._1 == "type2")
...
// Parse each filtered stream with its own parser, then persist it batch by batch
type1Stream.map(record => myParse1Function(record._2))
  .foreachRDD { rdd => /* persist as my dataframe in table1 */ }
type2Stream.map(record => myParse2Function(record._2))
  .foreachRDD { rdd => /* persist as my dataframe in table2 */ }
- Write an identify function, apply it, and then group by the type somehow. The problem is how to save the grouped values; I think they will all end up in the same executor. It would be a bit more work but more efficient, because the approach above filters the same stream 8 times.
Unfortunately there is no tee yet that could split a stream apart. That would be exactly what you need (if I understood the question correctly).
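The group-by option could look roughly like the sketch below. It is plain Java, not Spark code: an ordinary List stands in for one batch of the stream, and the class name TypeRouter and the "UNKNOWN" bucket are made up for illustration. The only detail taken from the question is that the type is the third pipe-delimited field.

```java
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper; a plain List stands in for one batch of the stream.
class TypeRouter {

    // Assumption from the question's code: the type is the third pipe-delimited field.
    // Lines with fewer than three fields go to a made-up "UNKNOWN" bucket.
    static String identifyType(String line) {
        String[] fields = line.split("\\|");
        if (fields.length < 3) {
            return "UNKNOWN";
        }
        // Normalise case, mirroring the equalsIgnoreCase checks in the question.
        return fields[2].trim().toUpperCase(Locale.ROOT);
    }

    // One pass over the batch: each line lands in the list for its type.
    static Map<String, List<String>> groupByType(List<String> lines) {
        return lines.stream()
                .collect(Collectors.groupingBy(TypeRouter::identifyType));
    }
}
```

Each resulting list would then be parsed with the parser for its type and written to its own table. Note that every list here is held in memory in full, which is the same concern raised above about one type ending up on one executor.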
Created 04-07-2016 05:21 PM
Yes, the data is in the same stream. For example, one string will have 6 columns and the next one will have 8. Thank you, I will try this and see if it works.
Created 04-07-2016 06:16 PM
Yeah, the first approach is simple and it is what I did before, so I know it works. Scanning the whole data 8 times is a bit wasteful, but the operation should be very fast (you only parse the dataset once and filters are quick). Group-by might be more efficient for a large number of types, but you need to somehow implement a file save for an array, and I think it will put everything for one type in the memory of one executor. So it is more work and less robust. If you go that second way, an article here would be cool.
Created 04-07-2016 07:59 PM
I decided to use multiple streams instead. Life is easier that way. Thank you.
Created 04-07-2016 09:20 PM
If I understand your question correctly, you could try a state management function with updateStateByKey (http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams), where the key is the schema type field (I am assuming this is a String). Create a global map with the schema type field as the key and the corresponding data frame as the value. The function itself would look up the data frame object in the map you created earlier and then operate on that data frame; the data you want to save would also have to be passed to the function. The stateful function is typically used to keep a running aggregate. However, because it actually partitions the DStream based on the key you provide (I believe by creating separate DStreams), it should allow you to write generic logic where you look up the specifics (like target table and columns) at run time. Let me know if that makes sense; I can post some code if not.
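The run-time lookup part of this idea might be sketched as follows. This is plain Java rather than Spark code, and it does not call updateStateByKey. As a simplification it stores a target table and column list per type instead of a data frame. The class names SchemaRegistry and Target are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical registry: maps a schema type to the place its records should go.
class SchemaRegistry {

    // Hypothetical description of one target: table name plus ordered column names.
    static final class Target {
        final String table;
        final List<String> columns;

        Target(String table, List<String> columns) {
            this.table = table;
            this.columns = columns;
        }
    }

    private final Map<String, Target> targets = new HashMap<>();

    // Register the table and columns for one schema type (case-insensitive key).
    void register(String type, String table, String... columns) {
        targets.put(normalise(type), new Target(table, Arrays.asList(columns)));
    }

    // Look up the target at run time; returns null when the type was never registered.
    Target lookup(String type) {
        return targets.get(normalise(type));
    }

    private static String normalise(String type) {
        return type.trim().toUpperCase(Locale.ROOT);
    }
}
```

With this in place, generic save logic would only need the type field of a record to find its table, for example registering "CDR" against the gprs_data table from the question and calling lookup with the third field of each line.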