
Spark Structured Streaming with Kafka in Python


I'm trying to use Spark Structured Streaming to read data from a Kafka topic and print the batches to the console (CDH 6.3.2 with Spark 2.4.0, Kafka 2.2.1, PySpark 2.4.0).

But nothing happens, except that I was actually able to subscribe to my topic.

What could be the reason behind that?

 

The Spark code is the following:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').appName('firstStream').getOrCreate()

# Subscribe to the topic named 'kafka'
df = spark.readStream.format('kafka') \
    .option('kafka.bootstrap.servers', 'myserver:9092') \
    .option('subscribe', 'kafka') \
    .load()

# Print each micro-batch to the console and block until the query terminates
df.writeStream \
    .format('console') \
    .start() \
    .awaitTermination()
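
For completeness, a variant of the same query: since the Kafka source defaults to startingOffsets = 'latest', the console sink only prints batches for messages produced after the query starts, so reading from 'earliest' and casting the binary key/value columns to strings should make any existing data in the topic visible. This is just a sketch using the same broker and topic as above, not something I've ruled everything else out with:

# Variant: start from the beginning of the topic and decode the payload.
# The Kafka source defaults to startingOffsets = 'latest', so a query
# prints nothing unless new messages arrive after it starts.
df = spark.readStream.format('kafka') \
    .option('kafka.bootstrap.servers', 'myserver:9092') \
    .option('subscribe', 'kafka') \
    .option('startingOffsets', 'earliest') \
    .load() \
    .selectExpr('CAST(key AS STRING)', 'CAST(value AS STRING)')

df.writeStream \
    .format('console') \
    .start() \
    .awaitTermination()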

 

The spark-submit launch log:

root@test-bigdata:~# spark-submit --master local[*] --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 /root/PycharmProjects/BigData101/sparkWordCount.py

------------------------------------------------------------
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-d37513fc-467d-44ca-be0a-a167a291fd3b;1.0
confs: [default]
found org.apache.spark#spark-sql-kafka-0-10_2.11;2.4.0 in central
found org.apache.kafka#kafka-clients;2.0.0 in central
found org.lz4#lz4-java;1.4.0 in central
found org.xerial.snappy#snappy-java;1.1.7.1 in central
found org.slf4j#slf4j-api;1.7.16 in central
found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 487ms :: artifacts dl 12ms
:: modules in use:
org.apache.kafka#kafka-clients;2.0.0 from central in [default]
org.apache.spark#spark-sql-kafka-0-10_2.11;2.4.0 from central in [default]
org.lz4#lz4-java;1.4.0 from central in [default]
org.slf4j#slf4j-api;1.7.16 from central in [default]
org.spark-project.spark#unused;1.0.0 from central in [default]
org.xerial.snappy#snappy-java;1.1.7.1 from central in [default]
---------------------------------------------------------------------
|                  |            modules            ||   artifacts   |
|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
|      default     |   6   |   0   |   0   |   0   ||   6   |   0   |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-d37513fc-467d-44ca-be0a-a167a291fd3b
confs: [default]
0 artifacts copied, 6 already retrieved (0kB/12ms)
20/09/01 13:14:34 INFO spark.SparkContext: Running Spark version 2.4.0-cdh6.3.2
20/09/01 13:14:34 INFO logging.DriverLogger: Added a local log appender at: /tmp/spark-64215e57-c5bd-4b09-b1f1-88f9006dd2bd/driver_logs/driver.log
20/09/01 13:14:34 INFO spark.SparkContext: Submitted application: firstStream
20/09/01 13:14:34 INFO spark.SecurityManager: Changing view acls to: root
20/09/01 13:14:34 INFO spark.SecurityManager: Changing modify acls to: root
20/09/01 13:14:34 INFO spark.SecurityManager: Changing view acls groups to:
20/09/01 13:14:34 INFO spark.SecurityManager: Changing modify acls groups to:
20/09/01 13:14:34 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
20/09/01 13:14:34 INFO util.Utils: Successfully started service 'sparkDriver' on port 36047.
20/09/01 13:14:34 INFO spark.SparkEnv: Registering MapOutputTracker
20/09/01 13:14:34 INFO spark.SparkEnv: Registering BlockManagerMaster
20/09/01 13:14:34 INFO storage.BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/09/01 13:14:34 INFO storage.BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/09/01 13:14:34 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-b633f90b-ef14-48ad-8072-e07dd5a1658c
20/09/01 13:14:35 INFO memory.MemoryStore: MemoryStore started with capacity 366.3 MB
20/09/01 13:14:35 INFO spark.SparkEnv: Registering OutputCommitCoordinator
20/09/01 13:14:35 INFO util.log: Logging initialized @3912ms
20/09/01 13:14:35 INFO server.Server: jetty-9.3.z-SNAPSHOT, build timestamp: 2018-09-04T21:11:46Z, git hash: 3ce520221d0240229c862b122d2b06c12a625732
20/09/01 13:14:35 INFO server.Server: Started @4056ms
20/09/01 13:14:35 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
20/09/01 13:14:35 WARN util.Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
20/09/01 13:14:35 WARN util.Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
20/09/01 13:14:35 WARN util.Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
20/09/01 13:14:35 WARN util.Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
20/09/01 13:14:35 WARN util.Utils: Service 'SparkUI' could not bind on port 4045. Attempting port 4046.
20/09/01 13:14:35 WARN util.Utils: Service 'SparkUI' could not bind on port 4046. Attempting port 4047.
20/09/01 13:14:35 WARN util.Utils: Service 'SparkUI' could not bind on port 4047. Attempting port 4048.
20/09/01 13:14:35 WARN util.Utils: Service 'SparkUI' could not bind on port 4048. Attempting port 4049.
20/09/01 13:14:35 WARN util.Utils: Service 'SparkUI' could not bind on port 4049. Attempting port 4050.
20/09/01 13:14:35 WARN util.Utils: Service 'SparkUI' could not bind on port 4050. Attempting port 4051.
20/09/01 13:14:35 WARN util.Utils: Service 'SparkUI' could not bind on port 4051. Attempting port 4052.
20/09/01 13:14:35 INFO server.AbstractConnector: Started ServerConnector@72ad3c01{HTTP/1.1,[http/1.1]}{0.0.0.0:4052}
20/09/01 13:14:35 INFO util.Utils: Successfully started service 'SparkUI' on port 4052.
20/09/01 13:14:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@52c0d39c{/jobs,null,AVAILABLE,@Spark}
20/09/01 13:14:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@127ebf07{/jobs/json,null,AVAILABLE,@Spark}
20/09/01 13:14:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@32bac42a{/jobs/job,null,AVAILABLE,@Spark}
20/09/01 13:14:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5ce3cf61{/jobs/job/json,null,AVAILABLE,@Spark}
20/09/01 13:14:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3b82d57d{/stages,null,AVAILABLE,@Spark}
20/09/01 13:14:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5ffdddb3{/stages/json,null,AVAILABLE,@Spark}
20/09/01 13:14:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@68296d71{/stages/stage,null,AVAILABLE,@Spark}
20/09/01 13:14:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@36567bc4{/stages/stage/json,null,AVAILABLE,@Spark}
20/09/01 13:14:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1537ca9a{/stages/pool,null,AVAILABLE,@Spark}
20/09/01 13:14:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2689fa41{/stages/pool/json,null,AVAILABLE,@Spark}
20/09/01 13:14:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1836b192{/storage,null,AVAILABLE,@Spark}
20/09/01 13:14:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1dbb710b{/storage/json,null,AVAILABLE,@Spark}
20/09/01 13:14:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@36f31b3c{/storage/rdd,null,AVAILABLE,@Spark}
20/09/01 13:14:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5f41a451{/storage/rdd/json,null,AVAILABLE,@Spark}
20/09/01 13:14:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@72d6340a{/environment,null,AVAILABLE,@Spark}
20/09/01 13:14:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@34a32072{/environment/json,null,AVAILABLE,@Spark}
20/09/01 13:14:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1aa7f721{/executors,null,AVAILABLE,@Spark}
20/09/01 13:14:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4896bb08{/executors/json,null,AVAILABLE,@Spark}
20/09/01 13:14:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@b23a074{/executors/threadDump,null,AVAILABLE,@Spark}
20/09/01 13:14:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@26aea1d2{/executors/threadDump/json,null,AVAILABLE,@Spark}
20/09/01 13:14:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@39554d9{/static,null,AVAILABLE,@Spark}
20/09/01 13:14:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7502dff1{/,null,AVAILABLE,@Spark}
20/09/01 13:14:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4fc5bdcc{/api,null,AVAILABLE,@Spark}
20/09/01 13:14:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@74c54bc8{/jobs/job/kill,null,AVAILABLE,@Spark}
20/09/01 13:14:35 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7dd14bbe{/stages/stage/kill,null,AVAILABLE,@Spark}
20/09/01 13:14:35 INFO ui.SparkUI: Bound SparkUI to 0.0.0.0, and started at http://myserver:4052
20/09/01 13:14:35 INFO spark.SparkContext: Added JAR file:///root/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.4.0.jar at spark://myserver:36047/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.4.0.jar with timestamp 1598966075528
20/09/01 13:14:35 INFO spark.SparkContext: Added JAR file:///root/.ivy2/jars/org.apache.kafka_kafka-clients-2.0.0.jar at spark://myserver:36047/jars/org.apache.kafka_kafka-clients-2.0.0.jar with timestamp 1598966075529
20/09/01 13:14:35 INFO spark.SparkContext: Added JAR file:///root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar at spark://myserver:36047/jars/org.spark-project.spark_unused-1.0.0.jar with timestamp 1598966075529
20/09/01 13:14:35 INFO spark.SparkContext: Added JAR file:///root/.ivy2/jars/org.lz4_lz4-java-1.4.0.jar at spark://myserver:36047/jars/org.lz4_lz4-java-1.4.0.jar with timestamp 1598966075529
20/09/01 13:14:35 INFO spark.SparkContext: Added JAR file:///root/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.7.1.jar at spark://myserver:36047/jars/org.xerial.snappy_snappy-java-1.1.7.1.jar with timestamp 1598966075529
20/09/01 13:14:35 INFO spark.SparkContext: Added JAR file:///root/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar at spark://myserver:36047/jars/org.slf4j_slf4j-api-1.7.16.jar with timestamp 1598966075530
20/09/01 13:14:35 INFO spark.SparkContext: Added file file:///root/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.4.0.jar at file:///root/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.4.0.jar with timestamp 1598966075550
20/09/01 13:14:35 INFO util.Utils: Copying /root/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.4.0.jar to /tmp/spark-64215e57-c5bd-4b09-b1f1-88f9006dd2bd/userFiles-63081020-ede2-4e3d-88f0-645538db27f0/org.apache.spark_spark-sql-kafka-0-10_2.11-2.4.0.jar
20/09/01 13:14:35 INFO spark.SparkContext: Added file file:///root/.ivy2/jars/org.apache.kafka_kafka-clients-2.0.0.jar at file:///root/.ivy2/jars/org.apache.kafka_kafka-clients-2.0.0.jar with timestamp 1598966075586
20/09/01 13:14:35 INFO util.Utils: Copying /root/.ivy2/jars/org.apache.kafka_kafka-clients-2.0.0.jar to /tmp/spark-64215e57-c5bd-4b09-b1f1-88f9006dd2bd/userFiles-63081020-ede2-4e3d-88f0-645538db27f0/org.apache.kafka_kafka-clients-2.0.0.jar
20/09/01 13:14:35 INFO spark.SparkContext: Added file file:///root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar at file:///root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar with timestamp 1598966075607
20/09/01 13:14:35 INFO util.Utils: Copying /root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar to /tmp/spark-64215e57-c5bd-4b09-b1f1-88f9006dd2bd/userFiles-63081020-ede2-4e3d-88f0-645538db27f0/org.spark-project.spark_unused-1.0.0.jar
20/09/01 13:14:35 INFO spark.SparkContext: Added file file:///root/.ivy2/jars/org.lz4_lz4-java-1.4.0.jar at file:///root/.ivy2/jars/org.lz4_lz4-java-1.4.0.jar with timestamp 1598966075624
20/09/01 13:14:35 INFO util.Utils: Copying /root/.ivy2/jars/org.lz4_lz4-java-1.4.0.jar to /tmp/spark-64215e57-c5bd-4b09-b1f1-88f9006dd2bd/userFiles-63081020-ede2-4e3d-88f0-645538db27f0/org.lz4_lz4-java-1.4.0.jar
20/09/01 13:14:35 INFO spark.SparkContext: Added file file:///root/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.7.1.jar at file:///root/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.7.1.jar with timestamp 1598966075640
20/09/01 13:14:35 INFO util.Utils: Copying /root/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.7.1.jar to /tmp/spark-64215e57-c5bd-4b09-b1f1-88f9006dd2bd/userFiles-63081020-ede2-4e3d-88f0-645538db27f0/org.xerial.snappy_snappy-java-1.1.7.1.jar
20/09/01 13:14:35 INFO spark.SparkContext: Added file file:///root/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar at file:///root/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar with timestamp 1598966075678
20/09/01 13:14:35 INFO util.Utils: Copying /root/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar to /tmp/spark-64215e57-c5bd-4b09-b1f1-88f9006dd2bd/userFiles-63081020-ede2-4e3d-88f0-645538db27f0/org.slf4j_slf4j-api-1.7.16.jar
20/09/01 13:14:35 INFO executor.Executor: Starting executor ID driver on host localhost
20/09/01 13:14:35 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 45833.
20/09/01 13:14:35 INFO netty.NettyBlockTransferService: Server created on myserver:45833
20/09/01 13:14:35 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/09/01 13:14:36 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(driver, myserver, 45833, None)
20/09/01 13:14:36 INFO storage.BlockManagerMasterEndpoint: Registering block manager myserver:45833 with 366.3 MB RAM, BlockManagerId(driver, myserver, 45833, None)
20/09/01 13:14:36 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, myserver, 45833, None)
20/09/01 13:14:36 INFO storage.BlockManager: external shuffle service port = 7337
20/09/01 13:14:36 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, myserver, 45833, None)
20/09/01 13:14:36 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2e9a7f14{/metrics/json,null,AVAILABLE,@Spark}
20/09/01 13:14:37 INFO scheduler.EventLoggingListener: Logging events to hdfs://myserver:8020/user/spark/applicationHistory/local-1598966075812
20/09/01 13:14:37 WARN lineage.LineageWriter: Lineage directory /var/log/spark/lineage doesn't exist or is not writable. Lineage for this application will be disabled.
20/09/01 13:14:37 INFO util.Utils: Extension com.cloudera.spark.lineage.NavigatorAppListener not being initialized.
20/09/01 13:14:37 INFO logging.DriverLogger$DfsAsyncWriter: Started driver log file sync to: /user/spark/driverLogs/local-1598966075812_driver.log
20/09/01 13:14:38 INFO internal.SharedState: loading hive config file: file:/etc/hive/conf.cloudera.hive/hive-site.xml
20/09/01 13:14:38 INFO internal.SharedState: spark.sql.warehouse.dir is not set, but hive.metastore.warehouse.dir is set. Setting spark.sql.warehouse.dir to the value of hive.metastore.warehouse.dir ('/user/hive/warehouse').
20/09/01 13:14:38 INFO internal.SharedState: Warehouse path is '/user/hive/warehouse'.
20/09/01 13:14:38 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@45fefd84{/SQL,null,AVAILABLE,@Spark}
20/09/01 13:14:38 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3a2c8b1e{/SQL/json,null,AVAILABLE,@Spark}
20/09/01 13:14:38 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5286f5bd{/SQL/execution,null,AVAILABLE,@Spark}
20/09/01 13:14:38 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@111885ea{/SQL/execution/json,null,AVAILABLE,@Spark}
20/09/01 13:14:38 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7323bf28{/static/sql,null,AVAILABLE,@Spark}
20/09/01 13:14:39 INFO state.StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
20/09/01 13:14:39 WARN lineage.LineageWriter: Lineage directory /var/log/spark/lineage doesn't exist or is not writable. Lineage for this application will be disabled.
20/09/01 13:14:39 INFO util.Utils: Extension com.cloudera.spark.lineage.NavigatorQueryListener not being initialized.
20/09/01 13:14:41 INFO streaming.MicroBatchExecution: Starting [id = 8d82dd12-27cd-4067-9d10-be0a70e865e4, runId = afb8d34f-9ee2-4c00-b472-81dbee88b5cb]. Use hdfs://myserver:8020/tmp/temporary-1676448a-8106-4ddd-9f80-00a18716364d to store the query checkpoint.
20/09/01 13:14:41 INFO streaming.MicroBatchExecution: Using MicroBatchReader [KafkaV2[Subscribe[kafka]]] from DataSourceV2 named 'kafka' [org.apache.spark.sql.kafka010.KafkaSourceProvider@12227294]
20/09/01 13:14:41 INFO streaming.MicroBatchExecution: Starting new streaming query.
20/09/01 13:14:41 INFO streaming.MicroBatchExecution: Stream started from {}
20/09/01 13:14:41 INFO consumer.ConsumerConfig: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [myserver:9092]
check.crcs = true
client.dns.lookup = default
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = spark-kafka-source-d1351e6a-0ac2-4791-a9d8-c188f5d4a16e--1565512910-driver-0
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

20/09/01 13:14:41 INFO utils.AppInfoParser: Kafka version: 2.2.1-cdh6.3.2
20/09/01 13:14:41 INFO utils.AppInfoParser: Kafka commitId: null
20/09/01 13:14:41 INFO consumer.KafkaConsumer: [Consumer clientId=consumer-1, groupId=spark-kafka-source-d1351e6a-0ac2-4791-a9d8-c188f5d4a16e--1565512910-driver-0] Subscribed to topic(s): kafka
20/09/01 13:14:41 INFO clients.Metadata: Cluster ID: gH1lODLfSna0P3kYaXOfDA
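
One way to sanity-check the pipeline while the query above is running (a sketch, assuming the same broker and topic names as above; run it from a second pyspark session launched with the same --packages flag) is to push a couple of test records into the topic with Spark's batch Kafka sink and see whether a micro-batch shows up on the console:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').appName('kafkaTestProducer').getOrCreate()

# Two test records; the Kafka sink requires a string or binary
# 'value' column, and 'key' is optional.
test_df = spark.createDataFrame(
    [('k1', 'hello'), ('k2', 'world')],
    ['key', 'value'])

# Batch-write into the same topic the streaming query subscribes to
test_df.write \
    .format('kafka') \
    .option('kafka.bootstrap.servers', 'myserver:9092') \
    .option('topic', 'kafka') \
    .save()

If the records are written but the console sink still prints nothing, the problem is presumably on the Spark side rather than with the topic itself.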
