Member since: 05-26-2017
Posts: 12
Kudos Received: 2
Solutions: 0
03-28-2018
02:08 PM
@JJ Meyer
03-23-2018
08:00 PM
I am using Spark Streaming with Kafka and creating a direct stream with the code below:

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> conf.getString("kafka.brokers"),
  "zookeeper.connect" -> conf.getString("kafka.zookeeper"),
  "group.id" -> conf.getString("kafka.consumergroups"),
  "auto.offset.reset" -> args(1),
  "enable.auto.commit" -> (conf.getString("kafka.autoCommit").toBoolean: java.lang.Boolean),
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "security.protocol" -> "SASL_PLAINTEXT",
  "session.timeout.ms" -> args(2),
  "max.poll.records" -> args(3),
  "request.timeout.ms" -> args(4),
  "fetch.max.wait.ms" -> args(5))

val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
After some processing we commit the offsets using the commitAsync API:

try {
  messages.foreachRDD { rdd =>
    // Offset ranges are available on the driver from the direct stream's RDDs.
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }
} catch {
  case e: Throwable => e.printStackTrace()
}
The error below causes the job to crash:

18/03/20 10:43:30 INFO ConsumerCoordinator: Revoking previously assigned partitions [TOPIC_NAME-3, TOPIC_NAME-5, TOPIC_NAME-4] for group 21_feb_reload_2
18/03/20 10:43:30 INFO AbstractCoordinator: (Re-)joining group 21_feb_reload_2
18/03/20 10:43:30 INFO AbstractCoordinator: (Re-)joining group 21_feb_reload_2
18/03/20 10:44:00 INFO AbstractCoordinator: Successfully joined group 21_feb_reload_2 with generation 20714
18/03/20 10:44:00 INFO ConsumerCoordinator: Setting newly assigned partitions [TOPIC_NAME-1, TOPIC_NAME-0, TOPIC_NAME-2] for group 21_feb_reload_2
18/03/20 10:44:00 ERROR JobScheduler: Error generating jobs for time 1521557010000 ms
java.lang.IllegalStateException: No current assignment for partition TOPIC_NAME-4
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315)
at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214)
at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1$anonfun$1$anonfun$apply$7.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1$anonfun$1$anonfun$apply$7.apply(DStream.scala:341)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1$anonfun$1.apply(DStream.scala:340)
at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1$anonfun$1.apply(DStream.scala:340)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1.apply(DStream.scala:335)
at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1.apply(DStream.scala:333)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1$anonfun$1$anonfun$apply$7.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1$anonfun$1$anonfun$apply$7.apply(DStream.scala:341)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1$anonfun$1.apply(DStream.scala:340)
at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1$anonfun$1.apply(DStream.scala:340)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1.apply(DStream.scala:335)
at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1.apply(DStream.scala:333)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at org.apache.spark.streaming.DStreamGraph$anonfun$1.apply(DStreamGraph.scala:117)
at org.apache.spark.streaming.DStreamGraph$anonfun$1.apply(DStreamGraph.scala:116)
at scala.collection.TraversableLike$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
at org.apache.spark.streaming.scheduler.JobGenerator$anonfun$3.apply(JobGenerator.scala:249)
at org.apache.spark.streaming.scheduler.JobGenerator$anonfun$3.apply(JobGenerator.scala:247)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$processEvent(JobGenerator.scala:183)
at org.apache.spark.streaming.scheduler.JobGenerator$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:48)
18/03/20 10:44:00 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalStateException: No current assignment for partition
My findings:
1. A similar issue is described in the post "Kafka Spark Stream throws Exception: No current assignment for partition", but it does not give much explanation of why to use Assign rather than Subscribe.
2. Trying to make sure there is no rebalancing, I increased session.timeout.ms to almost my batch duration, as my processing completes in less than 2 minutes (the batch duration). session.timeout.ms is the amount of time a consumer can be out of contact with the brokers while still being considered alive (https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html).
3. I came across rebalance listeners with the methods:
   a. onPartitionsRevoked
   b. onPartitionsAssigned
   I was unable to understand how I can use the first one, which commits offsets just before rebalancing; a sketch of what I have in mind is below. Any inputs will be much appreciated.
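For illustration only, here is a minimal sketch of committing offsets from onPartitionsRevoked with the plain Kafka consumer API. The consumer, the topic name, and the currentOffsets map are assumptions, and as far as I can tell the direct stream's ConsumerStrategies.Subscribe does not expose a hook for passing such a listener, so this only shows the idea behind point 3.

import java.util
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

// Sketch only: commit whatever has been processed so far before the revoked
// partitions are handed to another consumer in the group.
class CommitOnRevoke(
    consumer: KafkaConsumer[String, String],
    currentOffsets: scala.collection.mutable.Map[TopicPartition, OffsetAndMetadata])
  extends ConsumerRebalanceListener {

  override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
    // Synchronous commit so the offsets are persisted before the rebalance completes.
    consumer.commitSync(currentOffsets.asJava)
  }

  override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
    // Nothing to do here; newly assigned partitions resume from their committed offsets.
  }
}

// Usage with a plain consumer (topic name assumed):
// consumer.subscribe(util.Arrays.asList("TOPIC_NAME"), new CommitOnRevoke(consumer, currentOffsets))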
Labels:
- Apache Kafka
- Apache Spark
10-14-2017
12:37 PM
Hivectx.sql(s).show was not giving the expected output, i.e., it returned a blank table. I passed the source and target database names through arguments with spark-submit. I was able to find the issue: it worked after I converted dtshipment_date and thedate both to timestamp for the comparison. When running the query in the Hive CLI it was able to compare dtshipment_date (date type) with thedate (timestamp) and return the records on the join, but through Spark SQL it did not give the same output until I converted both to timestamp.
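A minimal sketch of the join condition after the change, reusing the table aliases and variables from the original query (everything else is trimmed for brevity):

// Sketch only: cast both join keys to timestamp so Spark SQL and the Hive CLI
// compare them the same way.
val q = s"""
  select a.*, b.thedate
  from $targetdb.table1 a
  inner join $sourcedb.table3 b
    on cast(a.dtshipment_date as timestamp) = cast(b.thedate as timestamp)
"""
val df = Hivectx.sql(q)
df.show()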
10-11-2017
12:54 PM
1 Kudo
The query below produces results when executed in the Hive CLI, but not through Spark using HiveContext.

val s = s"""select cast(date as String) as date_key, field1, field2, field3, field4,
  COUNT(CASE WHEN m.field5 in ('Core','A') THEN a.strRXNUM END) AS result1,
  COUNT(CASE WHEN m.field6 in ('Advanced','D') THEN a.strRXNUM END) AS result2,
  COUNT(CASE WHEN m.field7 in ('Custom Core','g') THEN a.strRXNUM END) AS result3,
  COUNT(CASE WHEN m.field8 in ('Unassigned') THEN a.strRXNUM END) AS result4,
  COUNT(a.strRXNUM) AS result5
  from $targetdb.table1 a
  left join $sourcedb.table2 m ON a.strTHERAPY_TYPE = m.strTherapyType
  inner join $sourcedb.table3 b on cast(a.dtshipment_date as string) = cast(b.thedate as string)
  where a.date between '2015-01-01' and date_sub(current_date(), 1)
  group by cast(date as String), field1, field2, field3, field4"""

val df = Hivectx.sql(s)
Labels:
- Apache Hive
- Apache Spark
09-28-2017
04:58 AM
1 Kudo
Hello Everyone, We are using Scala with Maven to build Spark applications, with Git as the code repository and Jenkins integrated with Git to build the jar. I am not sure how to use Jenkins to deploy our apps on the cluster. Can anyone explain what the next step could be? Does Jenkins support deployment of Spark apps like it does for other apps? Thanks
09-26-2017
06:24 PM
Hi Riccardo, Could you confirm which version of HDP you are using? Thanks
09-25-2017
07:06 PM
We have tested this in HDP 2.6.2 and the issue still exists. Thanks, Varun Joshi
06-20-2017
04:45 PM
I am getting the error below about specifying a connection manager, even though I have added the jar file at /usr/hdp/current/sqoop_client/lib. Can anyone help? I have used the same command as given above. Error:

WARN sqoop.ConnFactory: Parameter --driver is set to an explicit driver however appropriate connection manager is not being set (via --connection-manager). Sqoop is going to fall back to org.apache.sqoop.manager.GenericJdbcManager. Please specify explicitly which connection manager should be used next time.
17/06/20 11:28:48 INFO manager.SqlManager: Using default fetchSize of 1000
17/06/20 11:28:48 INFO tool.CodeGenTool: Beginning code generation
17/06/20 11:28:48 ERROR sqoop.Sqoop: Got exception running Sqoop: java.lang.RuntimeException: Could not load db driver class: com.microsoft.jdbc.sqlserver.SQLServerDriver
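For reference, a sketch of the invocation shape the warning is pointing at, with placeholder connection details; it assumes the added jar actually provides com.microsoft.sqlserver.jdbc.SQLServerDriver (the class named in the error, com.microsoft.jdbc.sqlserver.SQLServerDriver, comes from the much older Microsoft driver):

sqoop import \
  --connect "jdbc:sqlserver://<host>:1433;databaseName=<db>" \
  --driver com.microsoft.sqlserver.jdbc.SQLServerDriver \
  --connection-manager org.apache.sqoop.manager.GenericJdbcManager \
  --table <table_name> \
  --username <user> \
  -P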
06-20-2017
01:05 PM
Hi @Rajendra Manjunath, Did it work for you? Please confirm. Thanks
05-31-2017
09:37 PM
Hello Ayub, Why have you given 'hive' as an example of a Hadoop user group? Please help me understand how I can run this using Hive as the user. Thanks