Member since: 02-06-2017
Posts: 14
Kudos Received: 0
Solutions: 0
02-26-2017
02:19 AM
Thanks. The application is in Java, so what is going wrong in the join statement above?
02-25-2017
10:25 PM
I have the following join, which makes my Spark application hang here and never produce a result. Is an OR condition supported in Spark DataFrame joins?

DataFrame DFJoin = DF1.join(DF2,
        DF1.col("device").equalTo(DF2.col("id"))
            .or(DF1.col("device").equalTo(DF2.col("new_id"))),
        "inner");
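An OR condition in the join expression is accepted by the DataFrame API, but as far as I know it prevents Spark from planning an equi-join, so on large inputs it can fall back to a much slower nested-loop style join. A minimal sketch of one workaround, assuming DF1 and DF2 are as in the question: join on each key separately and union the results.

// Sketch only (Spark 1.x DataFrame API); DF1/DF2 and the column names are taken from the question.
// Each join stays an equi-join, which Spark can execute as a hash or sort-merge join.
DataFrame joinOnId = DF1.join(DF2, DF1.col("device").equalTo(DF2.col("id")), "inner");
DataFrame joinOnNewId = DF1.join(DF2, DF1.col("device").equalTo(DF2.col("new_id")), "inner");
// dropDuplicates() removes rows that matched both conditions and would otherwise appear twice.
DataFrame DFJoin = joinOnId.unionAll(joinOnNewId).dropDuplicates();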
Labels:
- Apache Spark
02-15-2017
10:27 PM
Thanks! Does it then write to the checkpoint directory after parsing every message (i.e., after processing each streaming RDD)? Is it an HDFS path? Will hitting HDFS every time slow down the process? Also, if there are many Kafka consumer groups, will this create a separate checkpoint directory for each consumer group?
02-09-2017
08:29 PM
I have a Spark consumer that reads messages from Kafka using KafkaUtils.createDirectStream() for a set of 100 topics, then processes the messages and persists them in the data lake. However, if my Spark consumer suddenly crashes or dies and isn't back up for a few hours or a day, the millions of messages that arrived in the Kafka topics during that window are never picked up when I restart the consumer. How can I get the older messages without impacting performance?
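One way to handle this (a sketch under assumptions, not a definitive fix): enable Spark Streaming checkpointing so the direct stream's offsets survive a restart, and set auto.offset.reset for the case where no offsets exist yet. This assumes the Spark 1.x direct stream API for Kafka 0.8 that the question appears to use; the checkpoint path, broker list, and topic name below are hypothetical stand-ins.

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class KafkaDirectStreamSketch {

    // Hypothetical checkpoint path; one directory per streaming application.
    private static final String CHECKPOINT_DIR = "hdfs:///user/spark/checkpoints/kafka-direct";

    private static JavaStreamingContext createContext() {
        SparkConf conf = new SparkConf().setAppName("kafka-direct-consumer");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
        jssc.checkpoint(CHECKPOINT_DIR); // offsets and DAG metadata are saved here once per batch

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "broker1:9092"); // hypothetical broker
        // Only consulted when there are no stored offsets yet: start from the earliest
        // retained messages instead of the default "largest" (latest only).
        kafkaParams.put("auto.offset.reset", "smallest");

        Set<String> topics = new HashSet<>(Arrays.asList("topic1")); // stand-in for the 100 topics

        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
                kafkaParams, topics);

        messages.print(); // placeholder for the real parse-and-persist logic

        return jssc;
    }

    public static void main(String[] args) throws Exception {
        // On restart the context is rebuilt from the checkpoint, so the stream resumes from
        // the last recorded offsets instead of skipping everything that arrived while down.
        Function0<JavaStreamingContext> createFn = KafkaDirectStreamSketch::createContext;
        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(CHECKPOINT_DIR, createFn);
        jssc.start();
        jssc.awaitTermination();
    }
}

When catching up on a large backlog, spark.streaming.kafka.maxRatePerPartition can cap how fast the first batches after a restart pull data, so the catch-up does not overwhelm the downstream persistence step.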
Labels:
- Apache Kafka
- Apache Spark
02-09-2017
07:08 PM
Thanks, this worked. It turned out a when() method was declared in my own class and returned null, hence the NPE.
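For anyone else who hits this: the when() in that expression is meant to be the static helper from org.apache.spark.sql.functions, so a locally declared when() that returns null shadows it and triggers the NPE at the call site. A minimal sketch of the intended usage, with the column names from the thread:

import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.when;

// withColumn already names the new column, so no extra alias or cast is needed.
df.withColumn("id",
        when(df.col("deviceFlag").equalTo(1),
                concat(df.col("device"), lit("#"), df.col("domain")))
            .otherwise(df.col("device")))
  .show();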
02-07-2017
10:55 PM
Some more testing showed that if I just add the new column with concat, it concatenates the values and shows up fine. But when using when() and otherwise() it throws an NPE.
02-07-2017
09:51 PM
I am getting an NPE as below. df is a DataFrame that I get from previous joins with other DataFrames:

DataFrame df = df1.join(....)
// On doing this I get the following NPE
df.withColumn("id",
        ((Column) when(df.col("deviceFlag").$eq$eq$eq(1),
                concat(df.col("device"), lit("#"),
                        df.col("domain")))).otherwise(df.col("device")).alias("id")).show();

Line 235 in ApplDriver is:
df.col("domain")))).otherwise(df.col("device")).alias("id")).show();

I have checked some input values of domain and device and they look fine. The NPE suggests it is expecting some object, but the columns are there: "id" is a new column anyway, so it is not defined earlier, and df is of course the resulting DataFrame from the previous steps. So what can cause the NPE? The job does not go forward because of it.
17/02/07 15:39:18 ERROR ApplicationMaster: User class threw exception: java.lang.NullPointerException
java.lang.NullPointerException
at com.Driver.ApplDriver.lambda$main$1acd672$1(ApplDriver.java:235)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:227)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:226)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
02-07-2017
07:55 PM
I have a few DataFrames that I load from Hive tables before creating my JavaDStream. My Spark Streaming code joins against this reference data to produce the final DataFrame to persist in Hive. Every hour the reference-data Hive tables are refreshed with new data and reloaded by separate Hive jobs (not the Spark job) externally. This means my streaming job should also refresh the DataFrames every hour, or periodically, to pick up the latest data. Since I cannot stop my streaming job and restart it to fetch the Hive tables again and load them into DataFrames, I need a way to refresh these DataFrames on the fly.

// This is where I get the messages from Kafka
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream......

// Get all Hive tables (runs only once when the Spark job is submitted, outside the streaming loop)
DataFrame superbowlDF = hiveContext.table("nfl.superbowl").cache();
DataFrame patriotsDF = hiveContext.table("nfl.patriots").cache();
DataFrame falconsDF = hiveContext.table("nfl.falcons").cache();

// Streaming loop - creates RDDs for all streaming messages, runs continuously
JavaDStream<team> NFLTeams = messages.map...etc.

How can I refresh patriotsDF, falconsDF, superbowlDF, etc. while the streaming job is still running? Could another process refresh and reload these DataFrames and share them with this process, so new messages can use them in the joins seamlessly?
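One possible approach (a sketch, not a definitive answer): keep the reference DataFrames in a mutable holder and re-read them from Hive inside the streaming loop once the refresh interval has elapsed. The foreachRDD body runs on the driver, so no second process is needed. The holder array, timing variables, and the one-hour interval are illustrative; hiveContext, NFLTeams, and the table names are taken from the snippet above.

// Illustrative sketch only; assumes the variables from the question are in scope.
final long refreshIntervalMs = 60 * 60 * 1000L;                       // one hour
final java.util.concurrent.atomic.AtomicLong lastRefresh =
        new java.util.concurrent.atomic.AtomicLong(System.currentTimeMillis());

// A final array lets the foreachRDD closure swap in freshly loaded DataFrames.
final DataFrame[] refData = new DataFrame[] {
        hiveContext.table("nfl.superbowl").cache(),
        hiveContext.table("nfl.patriots").cache(),
        hiveContext.table("nfl.falcons").cache()
};

NFLTeams.foreachRDD(rdd -> {
    long now = System.currentTimeMillis();
    if (now - lastRefresh.get() >= refreshIntervalMs) {
        for (DataFrame old : refData) {
            old.unpersist();                                           // drop the stale cached copy
        }
        // refreshTable invalidates cached metadata for file-backed tables before re-reading.
        hiveContext.refreshTable("nfl.superbowl");
        hiveContext.refreshTable("nfl.patriots");
        hiveContext.refreshTable("nfl.falcons");
        refData[0] = hiveContext.table("nfl.superbowl").cache();
        refData[1] = hiveContext.table("nfl.patriots").cache();
        refData[2] = hiveContext.table("nfl.falcons").cache();
        lastRefresh.set(now);
    }
    // ... convert rdd to a DataFrame, join against refData[0..2], and persist as before ...
});

A variant of the same idea is to trigger the reload from a marker the external Hive jobs write (for example a flag file or partition) instead of a fixed one-hour timer, so the streaming job only reloads when the tables have actually changed.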
02-07-2017
07:38 PM
Labels:
- Apache Spark