Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

Create different schemas at run time for different data frames

Solved Go to solution
Highlighted

Create different schemas at run time for different data frames

Rising Star

In my data I have 8 different schemas. I want to create 8 different data frame for them and save them in 8 different tables in hive. So far I created a super bean class which holds shared attributes and each bean class extends it. Based on the type attribute I created different objects. The problem is I am unable to save them in different data frame. Is there any way I can do that? Here is my code so far, which works fine for one schema.

 xmlData.foreachRDD(
    	      new Function2<JavaRDD<String>, Time, Void>() {
    	        public Void call(JavaRDD<String> rdd, Time time) {
    	          HiveContext hiveContext = JavaHiveContext.getInstance(rdd.context());
    	       // Convert RDD[String] to RDD[case class] to DataFrame
    	          JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
    	            public JavaRow call(String line) throws Exception{
    	            	String[] fields = line.split("\\|");
			//JavaRow is my super class
    	            	JavaRow record = null;
    		 		   if(fields[2].trim().equalsIgnoreCase("CDR")){

    		 			  record = new GPRSClass(fields[0], fields[1]);
    		 		   }

    		 	if(fields[2].trim().equalsIgnoreCase("Activation")){
    	 record = new GbPdpContextActivation(fields[0], fields[1], fields[2], fields[3]); }
    	              return record;}});
    	           DataFrame df;
    	          df = hiveContext.createDataFrame(rowRDD, JavaRow.class);
    	          df.toDF().registerTempTable("Consumer");
    	          System.out.println(df.count()+" ************Record  Recived************");
    	          df = hiveContext.createDataFrame(rowRDD, GPRSClass.class);
     	          hiveContext.sql("CREATE  TABLE if not exists gprs_data ( processor string, fileName string, type string, version string, id string )STORED AS ORC ");
df.save("/apps/hive/warehouse/data", "org.apache.spark.sql.hive.orc",SaveMode.Append);
 }
      return null; } });
1 ACCEPTED SOLUTION

Accepted Solutions
Highlighted

Re: Create different schemas at run time for different data frames

so the data is in the same stream? I.e. one row will have one format and the second one will have another? If you have 8 Kafka Streams I suppose you wouldn't ask.

In that case you have two chances:

- Make a identify function apply it and then filter the RDD 8 times for each type then each time do the correct parsing and persisting in SQL.

As an illustration

val inputStream ...

inputStream.map(record => ( identifyType(record), record))

type1Stream = inputStream.filter ( record._1 == "type1" );

type2Stream = inputStream.filter ( record._1 == "type2" );

...

type1Stream.map(record => myParse1Function(record._2);

type1Stream.map(persist as my dataframe in table1);

type2Stream.map(record => myParse2Function(record._2);

type2Stream.map(persist as my dataframe in table2);

- Make a identify function apply it and then group by the type somehow, problem is how do you save the grouped by values they will all end up in the same executor I think. Would be a bit more work but more efficient because above you filter the same stream 8 times.

Unfortunately there is no tee yet that could split apart a stream. That would be exactly what you need. ( If I understood the question correctly )

https://issues.apache.org/jira/browse/SPARK-13378

View solution in original post

5 REPLIES 5
Highlighted

Re: Create different schemas at run time for different data frames

so the data is in the same stream? I.e. one row will have one format and the second one will have another? If you have 8 Kafka Streams I suppose you wouldn't ask.

In that case you have two chances:

- Make a identify function apply it and then filter the RDD 8 times for each type then each time do the correct parsing and persisting in SQL.

As an illustration

val inputStream ...

inputStream.map(record => ( identifyType(record), record))

type1Stream = inputStream.filter ( record._1 == "type1" );

type2Stream = inputStream.filter ( record._1 == "type2" );

...

type1Stream.map(record => myParse1Function(record._2);

type1Stream.map(persist as my dataframe in table1);

type2Stream.map(record => myParse2Function(record._2);

type2Stream.map(persist as my dataframe in table2);

- Make a identify function apply it and then group by the type somehow, problem is how do you save the grouped by values they will all end up in the same executor I think. Would be a bit more work but more efficient because above you filter the same stream 8 times.

Unfortunately there is no tee yet that could split apart a stream. That would be exactly what you need. ( If I understood the question correctly )

https://issues.apache.org/jira/browse/SPARK-13378

View solution in original post

Highlighted

Re: Create different schemas at run time for different data frames

Rising Star

yes the data is in the same stream. For example, one string will have 6 columns and the second one will have 8 . thank you I will try this see if it is gonna work.

Highlighted

Re: Create different schemas at run time for different data frames

Yeah the first approach is simple and what I did before so I know it works. Scanning the whole data 8 times is a bit wasteful but the operation should be very fast ( you only parse the dataset once and filters are quick) . Groupby might be more efficient for large number of types but you need to somehow implement a file save for an array and he will put everything for one type in memory of one executor I think. So more work and less robust. If you go that second way an article here would be cool.

Highlighted

Re: Create different schemas at run time for different data frames

Rising Star

I decided to use multiple streams instead. life is easier that way. thank u

Highlighted

Re: Create different schemas at run time for different data frames

Guru

@hoda moradi

If I understand your question correctly, you could try to use a state management function with UpdateStateByKey (http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams) where the key is the schema type field (I am assuming this is a String). Create a global map with the schema type field as the key and the corresponding data frame as the value. The function itself would look up the data frame object in the map you created earlier and then operate on that data frame, the data you want to save should now also be passed to the function. The stateful function is typically used to keep a running aggregate. However, because it actually partitions the DStream (I believe by creating separate DStreams) based on the key you provide it should allow you to write generic logic where you lookup the specifics (like target table and columns) at run time. Let me know if that makes sense, I can post some code if not.

Don't have an account?
Coming from Hortonworks? Activate your account here