
Create different schemas at run time for different data frames

Expert Contributor

In my data I have 8 different schemas. I want to create 8 different data frames for them and save them in 8 different Hive tables. So far I have created a super bean class that holds the shared attributes, and each bean class extends it. Based on the type attribute I create different objects. The problem is that I am unable to save them in different data frames. Is there any way I can do that? Here is my code so far, which works fine for one schema.

 xmlData.foreachRDD(
     new Function2<JavaRDD<String>, Time, Void>() {
       public Void call(JavaRDD<String> rdd, Time time) {
         HiveContext hiveContext = JavaHiveContext.getInstance(rdd.context());
         // Convert RDD[String] to RDD[bean] to DataFrame
         JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
           public JavaRow call(String line) throws Exception {
             String[] fields = line.split("\\|");
             // JavaRow is my super class
             JavaRow record = null;
             if (fields[2].trim().equalsIgnoreCase("CDR")) {
               record = new GPRSClass(fields[0], fields[1]);
             }
             if (fields[2].trim().equalsIgnoreCase("Activation")) {
               record = new GbPdpContextActivation(fields[0], fields[1], fields[2], fields[3]);
             }
             return record;
           }
         });

         DataFrame df = hiveContext.createDataFrame(rowRDD, JavaRow.class);
         df.registerTempTable("Consumer");
         System.out.println(df.count() + " ************ Records Received ************");

         df = hiveContext.createDataFrame(rowRDD, GPRSClass.class);
         hiveContext.sql("CREATE TABLE IF NOT EXISTS gprs_data ( processor string, fileName string, type string, version string, id string ) STORED AS ORC");
         df.save("/apps/hive/warehouse/data", "org.apache.spark.sql.hive.orc", SaveMode.Append);
         return null;
       }
     });

5 REPLIES

Master Guru (accepted solution)

So the data is in the same stream? I.e. one row will have one format and the next one will have another? If you had 8 Kafka streams, I suppose you wouldn't be asking.

In that case you have two options:

- Make an identify function, apply it, and then filter the RDD 8 times, once for each type; each time do the correct parsing and persist to SQL.

As an illustration:

val inputStream = ...

val typedStream = inputStream.map(record => (identifyType(record), record))

val type1Stream = typedStream.filter(record => record._1 == "type1")

val type2Stream = typedStream.filter(record => record._1 == "type2")

...

type1Stream.map(record => myParse1Function(record._2)) // then persist as my dataframe in table1

type2Stream.map(record => myParse2Function(record._2)) // then persist as my dataframe in table2
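In the Java API you are using it might look roughly like the sketch below. This is only an untested illustration: it assumes Spark 1.4+ (for DataFrame.write()), reuses your JavaHiveContext.getInstance helper and bean classes from the question, and the second table name is just a placeholder.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.streaming.Time;

// Sketch: filter the same batch once per schema type, parse each subset into its
// own bean class, and append each DataFrame to its own ORC table.
xmlData.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
  public Void call(JavaRDD<String> rdd, Time time) {
    HiveContext hiveContext = JavaHiveContext.getInstance(rdd.context());

    // Subset 1: "CDR" records -> GPRSClass -> gprs_data
    JavaRDD<GPRSClass> gprsRdd = rdd
        .filter(new Function<String, Boolean>() {
          public Boolean call(String line) {
            return line.split("\\|")[2].trim().equalsIgnoreCase("CDR");
          }
        })
        .map(new Function<String, GPRSClass>() {
          public GPRSClass call(String line) {
            String[] fields = line.split("\\|");
            return new GPRSClass(fields[0], fields[1]);
          }
        });
    DataFrame gprsDf = hiveContext.createDataFrame(gprsRdd, GPRSClass.class);
    gprsDf.write().format("orc").mode(SaveMode.Append).saveAsTable("gprs_data");

    // Subset 2: "Activation" records -> GbPdpContextActivation -> its own table
    // ...repeat the same filter/map/createDataFrame/save block for the other 7 types.
    return null;
  }
});

The same filter/map/save block is repeated once per schema type, which is the "filter the stream 8 times" part.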

- Alternatively, make an identify function, apply it, and then group by the type somehow; the problem is how you save the grouped values, since they will all end up in the same executor, I think. It would be a bit more work but more efficient, because above you filter the same stream 8 times.
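Very roughly, and again only as an untested sketch (identifyType is a placeholder), the keying part of that second option could look like this; the catch is that after groupByKey each type's records arrive as a single Iterable on one executor, and you would still have to turn that into per-table writes:

import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

// Tag every record with its schema type, then group so each micro-batch ends up
// with one (type, Iterable<record>) pair per schema.
JavaPairDStream<String, String> byType = xmlData.mapToPair(
    new PairFunction<String, String, String>() {
      public Tuple2<String, String> call(String line) {
        return new Tuple2<String, String>(identifyType(line), line); // identifyType: placeholder
      }
    });
JavaPairDStream<String, Iterable<String>> grouped = byType.groupByKey();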

Unfortunately there is no tee yet that could split a stream apart; that would be exactly what you need (if I understood the question correctly).

https://issues.apache.org/jira/browse/SPARK-13378

Expert Contributor

Yes, the data is in the same stream. For example, one string will have 6 columns and the next one will have 8. Thank you, I will try this and see if it works.

Master Guru

Yeah, the first approach is simple and it is what I did before, so I know it works. Scanning the whole data 8 times is a bit wasteful, but the operation should be very fast (you only parse the dataset once, and filters are quick). GroupBy might be more efficient for a large number of types, but you would need to somehow implement a file save for an array, and it will put everything for one type in the memory of one executor, I think. So it is more work and less robust. If you go the second way, an article here would be cool.
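One small, untested footnote on the cost of the first approach: caching the batch inside foreachRDD keeps the eight filters from re-reading the input (variable names follow the sketch in the first reply):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Time;

xmlData.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
  public Void call(JavaRDD<String> rdd, Time time) {
    // Cache the batch once so the 8 per-type filters reuse it instead of
    // re-reading the input; the parsing still happens only once per subset.
    JavaRDD<String> batch = rdd.cache();
    // ...the 8 filter/map/createDataFrame/save blocks from the earlier sketch,
    // each running against 'batch'...
    batch.unpersist();
    return null;
  }
});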

Expert Contributor

I decided to use multiple streams instead. Life is easier that way. Thank you.

Guru

@hoda moradi

If I understand your question correctly, you could try to use a state management function with updateStateByKey (http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams), where the key is the schema type field (I am assuming this is a String). Create a global map with the schema type field as the key and the corresponding data frame as the value. The function itself would look up the data frame object in the map you created earlier and then operate on that data frame; the data you want to save would also be passed to the function. The stateful function is typically used to keep a running aggregate. However, because it actually partitions the DStream (I believe by creating separate DStreams) based on the key you provide, it should allow you to write generic logic where you look up the specifics (like the target table and columns) at run time. Let me know if that makes sense; I can post some code if not.
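In the meantime, here is a very rough, untested sketch of the keying and state part (Spark 1.x Java API with the Guava Optional; the type-to-table map, the field position, and the running count are placeholder assumptions):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.common.base.Optional;

import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

// Global lookup from schema type to target table (values here are just examples).
final Map<String, String> typeToTable = new HashMap<String, String>();
typeToTable.put("CDR", "gprs_data");
typeToTable.put("Activation", "gb_pdp_context_activation");

// Key every record by its schema type field.
JavaPairDStream<String, String> keyed = xmlData.mapToPair(
    new PairFunction<String, String, String>() {
      public Tuple2<String, String> call(String line) {
        String[] fields = line.split("\\|");
        return new Tuple2<String, String>(fields[2].trim(), line);
      }
    });

// Stateful function per schema type; here the state is just a running record count.
// Note: updateStateByKey requires checkpointing to be enabled on the StreamingContext.
// The per-type DataFrame build/save would still be done in a foreachRDD, using
// typeToTable.get(type) to pick the target table at run time.
JavaPairDStream<String, Integer> countsByType = keyed.updateStateByKey(
    new Function2<List<String>, Optional<Integer>, Optional<Integer>>() {
      public Optional<Integer> call(List<String> newLines, Optional<Integer> state) {
        return Optional.of(state.or(0) + newLines.size());
      }
    });
countsByType.print();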