
Spark Streaming job not reading data from Kafka

New Contributor

Hi, 

 

I am trying to run a Spark Streaming job on Cloudera v5.14.4 that will read data from Kafka v0.10. I am using the following versions of the Spark Streaming dependencies:

 

spark-streaming_2.11:2.3.0
spark-streaming-kafka-0-10_2.11:2.3.0

 

The job works fine locally, but on Cloudera it does not start up. There is no Streaming tab in the Application Master UI, and the Stages/Storage tabs are blank.

[screenshot attached]

 

There are no errors in the logs. Any suggestions as to what might be wrong here?

 


Cloudera Employee

Hi somant,

 

There is some information that needs to be provided in order to drive the investigation:

  1. From where are you launching the job? E.g., from a gateway node of your CDH cluster?
  2. Can you please share your spark-submit command?
  3. You say the job is not starting up; do you have any logs (Spark driver logs, YARN logs)?

Thanks

New Contributor

Hi @amallegni,

 

- I am launching the job from one of the cluster nodes.
- My spark-submit command is:

SPARK_KAFKA_VERSION=0.10 spark2-submit \
  --master yarn \
  --deploy-mode cluster \
  --name appName \
  --keytab "****.keytab" \
  --principal "****" \
  --num-executors 3 \
  --driver-memory 4g \
  --executor-memory 3g \
  --executor-cores 3 \
  --files "/path/to/hive-site.xml,/path/to/jaas.conf#jaas.conf" \
  --conf "spark.streaming.unpersist=true" \
  --conf "spark.executor.userClassPathFirst=true" \
  --conf "spark.streaming.kafka.maxRatePerPartition=10" \
  --conf "spark.streaming.kafka.maxRetries=5" \
  --conf "spark.streaming.backpressure.enabled=true" \
  --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=./jaas.conf" \
  --conf "spark.driver.extraJavaOptions=-XX:+UseG1GC -Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=./jaas.conf" \
  --conf spark.yarn.maxAppAttempts=4 \
  --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
  --conf spark.yarn.max.executor.failures=24 \
  --conf spark.yarn.executor.failuresValidityInterval=1h \
  --properties-file /path/to/app.properties

- I am not finding anything in the logs.

 

The only similar issue I found was this one: https://stackoverflow.com/questions/44454520/spark-streaming-is-not-streaming-but-waits-showing-cons.... I also have a Kafka installation at version 0.9, while my job needs 0.10. I'm wondering if there is any way for the Spark job not to depend on the CDH-provided libraries and to use only the packaged dependencies.

Cloudera Employee

Hi @somant,

 

Regarding "I'm wondering if there is any way that the spark job doesn't depend on the CDH provided libraries and only use packaged dependencies": it depends on which library you are referring to.

If you are referring to the Kafka libraries, they shouldn't be loaded on the Spark classpath by default, so you can prepare a fat JAR that includes the Kafka dependencies at the version you need.
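
For example, with an sbt-style build the idea would look roughly like this (a sketch only, assuming sbt with the sbt-assembly plugin; your project may well use Maven, where the equivalent is a shade/assembly configuration):

// build.sbt -- sketch: keep Spark itself "provided", bundle the Kafka 0.10 integration yourself
name := "spark-streaming-kafka010-app"   // hypothetical project name
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // Spark core/streaming are supplied by the CDH cluster at runtime
  "org.apache.spark" %% "spark-streaming" % "2.3.0" % "provided",
  // Bundle the Kafka 0.10 integration (and the kafka-clients it pulls in) into the fat JAR
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0"
)

// With the sbt-assembly plugin enabled in project/plugins.sbt, "sbt assembly" then produces the fat JAR.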

Moreover, did you try specifying fewer options at launch time?
E.g., I would start by removing the G1GC setting and the other advanced options, monitoring the behaviour one step at a time.

New Contributor

Hi @amallegni,

Thanks for replying. I created a simple word count job, and I'm running it with this command:

SPARK_KAFKA_VERSION=0.10 /opt/cloudera/parcels/SPARK2/bin/spark2-submit --num-executors 2 --master local[4] --driver-java-options "-Djava.security.auth.login.config=/apps/raiot/dev/confluent/config/jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/apps/raiot/dev/confluent/config/jaas.conf" spark-dstream-secure-kafka-app-1.0-SNAPSHOT-jar-with-dependencies.jar
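
For reference, the job follows the DirectKafkaWordCount pattern from the kafka-0-10 integration; a minimal sketch of that pattern (parameter values taken from the logs below, broker address is a placeholder) looks roughly like this:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object DirectKafkaWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    // 20-second batches, matching the "Slide time = 20000 ms" seen in the logs
    val ssc = new StreamingContext(sparkConf, Seconds(20))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1.example.com:9093",   // placeholder, not the real brokers
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "db_c1_parser_pap_dev",
      "auto.offset.reset" -> "earliest",
      "security.protocol" -> "SASL_SSL",   // JAAS settings come from java.security.auth.login.config
      "sasl.mechanism" -> "PLAIN"
    )

    // Direct stream over the Kafka 0.10 consumer API
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("Test101"), kafkaParams))

    stream.map(_.value)
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}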

 

Here are the logs:

22/08/02 00:53:18 INFO spark.SparkContext: Running Spark version 2.3.0.cloudera3
22/08/02 00:53:18 INFO spark.SparkContext: Submitted application: DirectKafkaWordCount
22/08/02 00:53:18 INFO spark.SecurityManager: Changing view acls to:
22/08/02 00:53:18 INFO spark.SecurityManager: Changing modify acls to:
22/08/02 00:53:18 INFO spark.SecurityManager: Changing view acls groups to:
22/08/02 00:53:18 INFO spark.SecurityManager: Changing modify acls groups to:
22/08/02 00:53:18 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(user, loaddev); groups with view permissions: Set(); users with modify permissions: Set(skumar116, loaddev); groups with modify permissions: Set()
22/08/02 00:53:18 INFO util.Utils: Successfully started service 'sparkDriver' on port 33638.
22/08/02 00:53:19 INFO spark.SparkEnv: Registering MapOutputTracker
22/08/02 00:53:19 INFO spark.SparkEnv: Registering BlockManagerMaster
22/08/02 00:53:19 INFO storage.BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
22/08/02 00:53:19 INFO storage.BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
22/08/02 00:53:19 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-1811ccb1-0aa0-489c-aec0-e771d23f4a88
22/08/02 00:53:19 INFO memory.MemoryStore: MemoryStore started with capacity 12.3 GB
22/08/02 00:53:19 INFO spark.SparkEnv: Registering OutputCommitCoordinator
22/08/02 00:53:19 INFO util.log: Logging initialized @2986ms
22/08/02 00:53:19 INFO server.Server: jetty-9.3.z-SNAPSHOT
22/08/02 00:53:19 INFO server.Server: Started @3102ms
22/08/02 00:53:19 INFO server.AbstractConnector: Started ServerConnector@1804f60d{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
22/08/02 00:53:19 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@512d92b{/jobs,null,AVAILABLE,@Spark}
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3c321bdb{/jobs/json,null,AVAILABLE,@Spark}
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3abd581e{/jobs/job,null,AVAILABLE,@Spark}
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6f0628de{/jobs/job/json,null,AVAILABLE,@Spark}
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1e392345{/stages,null,AVAILABLE,@Spark}
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4ced35ed{/stages/json,null,AVAILABLE,@Spark}
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7bd69e82{/stages/stage,null,AVAILABLE,@Spark}
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@27dc79f7{/stages/stage/json,null,AVAILABLE,@Spark}
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3aaf4f07{/stages/pool,null,AVAILABLE,@Spark}
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@18e8473e{/stages/pool/json,null,AVAILABLE,@Spark}
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1a38ba58{/storage,null,AVAILABLE,@Spark}
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6058e535{/storage/json,null,AVAILABLE,@Spark}
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1deb2c43{/storage/rdd,null,AVAILABLE,@Spark}
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1cefc4b3{/storage/rdd/json,null,AVAILABLE,@Spark}
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6f6a7463{/environment,null,AVAILABLE,@Spark}
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@79f227a9{/environment/json,null,AVAILABLE,@Spark}
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@50d68830{/executors,null,AVAILABLE,@Spark}
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7674a051{/executors/json,null,AVAILABLE,@Spark}
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6754ef00{/executors/threadDump,null,AVAILABLE,@Spark}
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@323e8306{/executors/threadDump/json,null,AVAILABLE,@Spark}
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4acf72b6{/static,null,AVAILABLE,@Spark}
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7dd712e8{/,null,AVAILABLE,@Spark}
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@22ee2d0{/api,null,AVAILABLE,@Spark}
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4b770e40{/jobs/job/kill,null,AVAILABLE,@Spark}
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@54a3ab8f{/stages/stage/kill,null,AVAILABLE,@Spark}
22/08/02 00:53:19 INFO ui.SparkUI: Bound SparkUI to 0.0.0.0, and started at http://hostname:4040
22/08/02 00:53:19 INFO spark.SparkContext: Added JAR file:/apps/raiot/dev/confluent/build/spark-dstream-secure-kafka-app-1.0-SNAPSHOT-jar-with-dependencies.jar at spark://hostname:33638/jars/spark-dstream-secure-kafka-app-1.0-SNAPSHOT-jar-with-dependencies.jar with timestamp 1659401599561
22/08/02 00:53:19 INFO executor.Executor: Starting executor ID driver on host localhost
22/08/02 00:53:19 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43751.
22/08/02 00:53:19 INFO netty.NettyBlockTransferService: Server created on hostname:43751
22/08/02 00:53:19 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
22/08/02 00:53:19 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(driver, hostname, 43751, None)
22/08/02 00:53:19 INFO storage.BlockManagerMasterEndpoint: Registering block manager hostname:43751 with 12.3 GB RAM, BlockManagerId(driver, hostname, 43751, None)
22/08/02 00:53:19 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, hostname, 43751, None)
22/08/02 00:53:19 INFO storage.BlockManager: external shuffle service port = 7337
22/08/02 00:53:19 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, hostname, 43751, None)
22/08/02 00:53:19 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@340b7ef6{/metrics/json,null,AVAILABLE,@Spark}
22/08/02 00:53:21 INFO scheduler.EventLoggingListener: Logging events to hdfs://nameservice1/user/spark/spark2ApplicationHistory/local-1659401599608
22/08/02 00:53:21 INFO spark.SparkContext: Registered listener com.cloudera.spark.lineage.NavigatorAppListener
22/08/02 00:53:22 WARN kafka010.KafkaUtils: overriding enable.auto.commit to false for executor
22/08/02 00:53:22 WARN kafka010.KafkaUtils: overriding auto.offset.reset to none for executor
22/08/02 00:53:22 WARN kafka010.KafkaUtils: overriding executor group.id to spark-executor-db_c1_parser_pap_dev
22/08/02 00:53:22 WARN kafka010.KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135
22/08/02 00:53:22 INFO kafka010.DirectKafkaInputDStream: Slide time = 20000 ms
22/08/02 00:53:22 INFO kafka010.DirectKafkaInputDStream: Storage level = Serialized 1x Replicated
22/08/02 00:53:22 INFO kafka010.DirectKafkaInputDStream: Checkpoint interval = null
22/08/02 00:53:22 INFO kafka010.DirectKafkaInputDStream: Remember interval = 20000 ms
22/08/02 00:53:22 INFO kafka010.DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@19661827
22/08/02 00:53:22 INFO dstream.MappedDStream: Slide time = 20000 ms
22/08/02 00:53:22 INFO dstream.MappedDStream: Storage level = Serialized 1x Replicated
22/08/02 00:53:22 INFO dstream.MappedDStream: Checkpoint interval = null
22/08/02 00:53:22 INFO dstream.MappedDStream: Remember interval = 20000 ms
22/08/02 00:53:22 INFO dstream.MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@40afb4d2
22/08/02 00:53:22 INFO dstream.FlatMappedDStream: Slide time = 20000 ms
22/08/02 00:53:22 INFO dstream.FlatMappedDStream: Storage level = Serialized 1x Replicated
22/08/02 00:53:22 INFO dstream.FlatMappedDStream: Checkpoint interval = null
22/08/02 00:53:22 INFO dstream.FlatMappedDStream: Remember interval = 20000 ms
22/08/02 00:53:22 INFO dstream.FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@41a0f437
22/08/02 00:53:22 INFO dstream.MappedDStream: Slide time = 20000 ms
22/08/02 00:53:22 INFO dstream.MappedDStream: Storage level = Serialized 1x Replicated
22/08/02 00:53:22 INFO dstream.MappedDStream: Checkpoint interval = null
22/08/02 00:53:22 INFO dstream.MappedDStream: Remember interval = 20000 ms
22/08/02 00:53:22 INFO dstream.MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@63e6273b
22/08/02 00:53:22 INFO dstream.ShuffledDStream: Slide time = 20000 ms
22/08/02 00:53:22 INFO dstream.ShuffledDStream: Storage level = Serialized 1x Replicated
22/08/02 00:53:22 INFO dstream.ShuffledDStream: Checkpoint interval = null
22/08/02 00:53:22 INFO dstream.ShuffledDStream: Remember interval = 20000 ms
22/08/02 00:53:22 INFO dstream.ShuffledDStream: Initialized and validated org.apache.spark.streaming.dstream.ShuffledDStream@4a68ac40
22/08/02 00:53:22 INFO dstream.ForEachDStream: Slide time = 20000 ms
22/08/02 00:53:22 INFO dstream.ForEachDStream: Storage level = Serialized 1x Replicated
22/08/02 00:53:22 INFO dstream.ForEachDStream: Checkpoint interval = null
22/08/02 00:53:22 INFO dstream.ForEachDStream: Remember interval = 20000 ms
22/08/02 00:53:22 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@77dc88fb
22/08/02 00:53:22 INFO consumer.ConsumerConfig: ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [bootstrap-servers]
ssl.keystore.type = JKS
enable.auto.commit = false
sasl.mechanism = PLAIN
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
group.id = db_c1_parser_pap_dev
retry.backoff.ms = 100
ssl.secure.random.implementation = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = SASL_SSL
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = earliest

22/08/02 00:53:22 INFO authenticator.AbstractLogin: Successfully logged in.
22/08/02 00:53:22 INFO consumer.ConsumerConfig: ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [bootstrap-servers]
ssl.keystore.type = JKS
enable.auto.commit = false
sasl.mechanism = PLAIN
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = consumer-1
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
group.id = db_c1_parser_pap_dev
retry.backoff.ms = 100
ssl.secure.random.implementation = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = SASL_SSL
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = earliest

22/08/02 00:53:22 INFO utils.AppInfoParser: Kafka version : 0.10.0-kafka-2.1.0
22/08/02 00:53:22 INFO utils.AppInfoParser: Kafka commitId : unknown

 

I'm running it on one of the cluster nodes, and the job gets stuck at this point. When I run it from my local system, however, it works fine.

 

spark-submit2 --num-executors 2 --master local[4] --driver-java-options "-Djava.security.auth.login.config=C:\\Projects\\LocalJobs\\jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=C:\\Projects\\LocalJobs\\jaas.conf" spark-dstream-secure-kafka-app-1.0-SNAPSHOT-jar-with-dependencies.jar

 

22/08/02 06:35:09 INFO AppInfoParser: Kafka version : 0.10.0-kafka-2.1.0
22/08/02 06:35:09 INFO AppInfoParser: Kafka commitId : unknown
22/08/02 06:35:09 INFO CachedKafkaConsumer: Initial fetch for spark-executor-group_id Test101 1 1405900
22/08/02 06:35:11 INFO AbstractCoordinator: Discovered coordinator broker:9092 (id: 2147483643 rack: null) for group spark-executor-group_id.
22/08/02 06:35:13 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1036 bytes result sent to driver
22/08/02 06:35:13 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, executor driver, partition 2, PROCESS_LOCAL, 7722 bytes)
22/08/02 06:35:13 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
22/08/02 06:35:13 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 3330 ms on localhost (executor driver) (2/3)
22/08/02 06:35:13 INFO KafkaRDD: Beginning offset 1403127 is the same as ending offset skipping Test101 0
22/08/02 06:35:13 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 864 bytes result sent to driver
22/08/02 06:35:13 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 11 ms on localhost (executor driver) (3/3)
22/08/02 06:35:13 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
22/08/02 06:35:13 INFO DAGScheduler: ShuffleMapStage 0 (map at DirectKafkaWordCount.scala:63) finished in 3.517 s
22/08/02 06:35:13 INFO DAGScheduler: looking for newly runnable stages
22/08/02 06:35:13 INFO DAGScheduler: running: Set()
22/08/02 06:35:13 INFO DAGScheduler: waiting: Set(ResultStage 1)
22/08/02 06:35:13 INFO DAGScheduler: failed: Set()
22/08/02 06:35:13 INFO DAGScheduler: Submitting ResultStage 1 (ShuffledRDD[4] at reduceByKey at DirectKafkaWordCount.scala:63), which has no missing parents
22/08/02 06:35:13 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.8 KB, free 1944.9 MB)
22/08/02 06:35:13 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1732.0 B, free 1944.9 MB)
22/08/02 06:35:13 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on LAPTOP-TFO6GTNG.bbrouter:61776 (size: 1732.0 B, free: 1944.9 MB)
22/08/02 06:35:13 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1039
22/08/02 06:35:13 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (ShuffledRDD[4] at reduceByKey at DirectKafkaWordCount.scala:63) (first 15 tasks are for partitions Vector(0))
22/08/02 06:35:13 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
22/08/02 06:35:13 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 3, localhost, executor driver, partition 0, ANY, 7649 bytes)
22/08/02 06:35:13 INFO Executor: Running task 0.0 in stage 1.0 (TID 3)
22/08/02 06:35:13 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 3 blocks
22/08/02 06:35:13 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 4 ms
22/08/02 06:35:13 INFO Executor: Finished task 0.0 in stage 1.0 (TID 3). 1420 bytes result sent to driver
22/08/02 06:35:13 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 3) in 31 ms on localhost (executor driver) (1/1)
22/08/02 06:35:13 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
22/08/02 06:35:13 INFO DAGScheduler: ResultStage 1 (print at DirectKafkaWordCount.scala:64) finished in 0.038 s
22/08/02 06:35:13 INFO DAGScheduler: Job 0 finished: print at DirectKafkaWordCount.scala:64, took 3.615580 s
22/08/02 06:35:13 INFO JobScheduler: Finished job streaming job 1659402300000 ms.0 from job set of time 1659402300000 ms
22/08/02 06:35:13 INFO JobScheduler: Total delay: 13.190 s for time 1659402300000 ms (execution: 3.643 s)
22/08/02 06:35:13 INFO ReceivedBlockTracker: Deleting batches:
22/08/02 06:35:13 INFO InputInfoTracker: remove old batch metadata:
-------------------------------------------
Time: 1659402300000 ms

 

I'm not sure what the issue might be when running it on the Cloudera nodes. Any help is appreciated.

 

Super Collaborator

Hi @somant,

 

Please don't use the open-source libraries; use the cluster-supported Spark/Kafka versions instead.

 

Check the following example code:

 

https://community.cloudera.com/t5/Community-Articles/Running-DirectKafkaWordCount-example-in-CDP/ta-...
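
In practice, "cluster-supported versions" means building against the Cloudera-packaged artifacts rather than the upstream ones. A rough sbt-style sketch (the version strings below are taken from the log output above and should be verified against the parcels actually installed on the cluster):

resolvers += "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies ++= Seq(
  // Matches "Running Spark version 2.3.0.cloudera3" in the driver log; provided by the cluster at runtime
  "org.apache.spark" %% "spark-streaming" % "2.3.0.cloudera3" % "provided",
  // Kafka 0.10 integration built for the same Cloudera Spark release, bundled with the job
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0.cloudera3"
)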