Unable to read Kafka topic messages

Contributor

Hi, 

 

I am working on a Kafka use case in my project, where the scenario is as follows:

Step 1: I update records of a table (let's say CUSTOMER) through a web portal.

Step 2: A Spark Streaming job is running that captures the DES (Data Event Streaming) eventId related to the update above.

Step 3: The job connects to the broker on port 9092, pulls the messages, processes them, and inserts them as records into an RDBMS table.
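
To give a clearer picture, a simplified sketch of this kind of job is below. This is illustrative only and assumes the Structured Streaming Kafka source (spark-sql-kafka-0-10); the broker address, topic, JDBC URL, table name, and credentials are placeholders, not my exact code:

```scala
// Illustrative sketch only: assumes the Structured Streaming Kafka source
// (spark-sql-kafka-0-10). Broker address, topic, JDBC URL, table name and
// credentials are placeholders, not the values from the actual job.
import org.apache.spark.sql.{DataFrame, SparkSession}

object TableUpdateStream {

  // Process one micro-batch: append the records to an RDBMS table over JDBC.
  def writeBatch(batch: DataFrame, batchId: Long): Unit =
    batch.write
      .format("jdbc")
      .option("url", "jdbc:postgresql://db-host:5432/appdb") // placeholder JDBC URL
      .option("dbtable", "customer_events")                  // placeholder target table
      .option("user", "app_user")
      .option("password", sys.env.getOrElse("DB_PASSWORD", ""))
      .mode("append")
      .save()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("table-update-stream")
      .getOrCreate()

    // Read the DES events from the Kafka topic exposed by the broker.
    val events = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker-host:9092") // placeholder broker address
      .option("subscribe", "table-update")                   // placeholder topic name
      .option("startingOffsets", "latest")
      .load()
      .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")

    val query = events.writeStream
      .foreachBatch(writeBatch _)
      .start()

    query.awaitTermination()
  }
}
```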

 

This entire scenario is executed on a Spark cluster set up inside Kubernetes.

 

The same job runs perfectly fine outside Kubernetes. But inside Kubernetes, although the job connects to the broker on port 9092 without any issues, it does not read any real-time events, and I get a loop of the messages below:

 

19/12/23 16:00:00.180 DEBUG Fetcher: [Consumer clientId=consumer-1, groupId=test] Added READ_UNCOMMITTED fetch request for partition table-update-0 at offset 102 to node 10.20.0.44:29092 (id: 1 rack: null)
19/12/23 16:00:00.180 DEBUG FetchSessionHandler: [Consumer clientId=consumer-1, groupId=test] Built incremental fetch (sessionId=96537117, epoch=359) for node 1. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
19/12/23 16:00:00.180 DEBUG Fetcher: [Consumer clientId=consumer-1, groupId=test] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(table-update-0)) to broker 10.20.0.44:29092 (id: 1 rack: null)
19/12/23 16:00:00.246 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 0
19/12/23 16:00:00.683 DEBUG FetchSessionHandler: [Consumer clientId=consumer-1, groupId=test] Node 1 sent an incremental fetch response for session 96537117 with 0 response partition(s), 1 implied partition(s)

 

I need your help in understanding what could be wrong with the setup in Kubernetes. 

 

Please let me know if any additional information is required. 

 

Thanks and Regards, 

Sudhindra


3 REPLIES

Rising Star

Hello 

 

Could you please verify that Spark Streaming is connecting to the right Kafka broker host? Check whether 10.20.0.44:29092 is the correct IP:port.

 

Also, please monitor the Kafka broker logs to verify that the Spark Streaming job is connected to the broker.
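
If it helps, one quick way to isolate the problem is to run a bare Kafka consumer from inside one of the Kubernetes pods and see whether it receives anything. A rough sketch (broker address, topic, and group id are placeholders to adjust for your environment):

```scala
// Standalone connectivity check, run from inside the Kubernetes pod.
// Broker address, topic, and group id are placeholders.
import java.time.Duration
import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

object BrokerCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "10.20.0.44:29092")
    props.put("group.id", "connectivity-check")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "earliest") // start from the beginning so existing messages show up

    val consumer = new KafkaConsumer[String, String](props)
    try {
      consumer.subscribe(Collections.singletonList("table-update"))
      // Poll a few times and print whatever comes back.
      for (_ <- 1 to 5) {
        val records = consumer.poll(Duration.ofSeconds(5))
        println(s"polled ${records.count()} record(s)")
        records.asScala.foreach(r => println(s"offset=${r.offset()} value=${r.value()}"))
      }
    } finally {
      consumer.close()
    }
  }
}
```

If this consumer also sits at zero records from inside the pod, the problem is likely in the Kafka/networking setup rather than in the Spark job itself.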

Contributor

Hi @senthh

 

Thanks a lot for your reply. I have monitored the Spark Streaming logs and verified that the connection to the broker is established correctly.

 

The logs given below confirm this:

 

The interesting thing is that the same Spark Streaming job works outside the Kubernetes setup without any issues.

 

Please help!!

 

19/12/23 15:56:59.562 INFO AppInfoParser: Kafka version: 2.2.1
19/12/23 15:56:59.563 INFO AppInfoParser: Kafka commitId: 55783d3133a5a49a
19/12/23 15:56:59.566 DEBUG KafkaConsumer: [Consumer clientId=consumer-1, groupId=test] Kafka consumer initialized
19/12/23 15:56:59.569 INFO KafkaConsumer: [Consumer clientId=consumer-1, groupId=test] Subscribed to partition(s): table-update-0
19/12/23 15:56:59.593 DEBUG NetworkClient: [Consumer clientId=consumer-1, groupId=test] Initialize connection to node 10.20.0.44:29092 (id: -1 rack: null) for sending metadata request
19/12/23 15:56:59.596 DEBUG NetworkClient: [Consumer clientId=consumer-1, groupId=test] Initiating connection to node 10.20.0.44:29092 (id: -1 rack: null) using address /10.20.0.44
19/12/23 15:56:59.640 DEBUG Metrics: Added sensor with name node--1.bytes-sent
19/12/23 15:56:59.642 DEBUG Metrics: Added sensor with name node--1.bytes-received
19/12/23 15:56:59.642 DEBUG Metrics: Added sensor with name node--1.latency
19/12/23 15:56:59.643 DEBUG Selector: [Consumer clientId=consumer-1, groupId=test] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
19/12/23 15:56:59.928 DEBUG NetworkClient: [Consumer clientId=consumer-1, groupId=test] Completed connection to node -1. Fetching API versions.
19/12/23 15:56:59.929 DEBUG NetworkClient: [Consumer clientId=consumer-1, groupId=test] Initiating API versions fetch from node -1.
19/12/23 15:56:59.962 DEBUG NetworkClient: [Consumer clientId=consumer-1, groupId=test] Recorded API versions for node -1: (Produce(0): 0 to 7 [usable: 7], Fetch(1): 0 to 10 [usable: 10], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 7 [usable: 7], LeaderAndIsr(4): 0 to 2 [usable: 2], StopReplica(5): 0 to 1 [usable: 1], UpdateMetadata(6): 0 to 5 [usable: 5], ControlledShutdown(7): 0 to 2 [usable: 2], OffsetCommit(8): 0 to 6 [usable: 6], OffsetFetch(9): 0 to 5 [usable: 5], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11):

Contributor

Hi All,

 

The issue has been fixed. It was caused by an incorrectly set Spark executor JVM option.
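
For anyone hitting something similar: the setting involved is the one that passes extra JVM options to the executors, spark.executor.extraJavaOptions. Below is just an illustration of where it lives; the option values shown are placeholders, not the exact value that caused my problem:

```scala
// Illustration only (e.g. in spark-shell or the job's setup code): the value of
// spark.executor.extraJavaOptions is handed verbatim to every executor JVM, so a
// malformed or environment-specific entry here can break executors even though the
// driver connects to Kafka without errors. The flags below are placeholders.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("table-update-stream")
  .config("spark.executor.extraJavaOptions",
    "-XX:+UseG1GC -Dlog4j.configuration=file:/opt/spark/conf/log4j.properties")
  .getOrCreate()
```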

 

Thanks and Regards,

Sudhindra