Support Questions
Find answers, ask questions, and share your expertise

sending simple message to kafka is not getting through

sending simple message to kafka is not getting through

Explorer

Hi have simple kafka producer program in scala that randomly create click events and and send them to kafka topic. here is the complete code. check that in loop there is one second delay. but when i run this program the loop runs VERY VERY slow. check that when a message is send then a Click message is shown, But a single Click takes around 5 minutes. Also kafka-console-consumer.sh running in other window does not show any activity even when Click are being produced slowly (3-4 minutes i guess for each click in loop) in first window. NO IDEA what is happening.

Note i have tested kafka working and can send and receive messages to topics (hortonworks sandbox)

Note i am producing AVRO records in each click. this code is tested and running as i have tested records are being encoded and decoded. just the code in WHILE LOOP is running very very slowly and also there is no activity in consumer.

Please suggest what should i look into or configure

object StreamingAvroProducer extends App {


	
	
	val brokers = "localhost:6667" 
	val topic = "testing"
	
	
    val random = new Random()


	
  val props = new util.HashMap[String, Object]()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer") 		// Kafka avro message stream comes in as a byte array
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")


  val producer = new KafkaProducer[String, Array[Byte]](props)


  while(true) 
  {
		val clickBytes = serializeClickEvent(newRandomClickEvent) // Avro schema serialization as a byte array
		val message = new ProducerRecord[String, Array[Byte]](topic, null, clickBytes) // Create a new producer record to send the message in
		producer.send(message)
		
		println("Click!")		
		Thread.sleep(1000)
  }
  
	
	
	
	
  // Generate a random click event
  def newRandomClickEvent: ClickEvent = {
    val userId = Random.nextInt(5) 
    val time = Random.nextInt(5).toString() 
	val action = Random.nextInt(5).toString()  
	val destination = Random.nextInt(5).toString()  
	val hotel = Random.nextInt(5).toString()  
    new ClickEvent(userId, time, action, destination, hotel)
  }	
  
  


  def serializeClickEvent(clickEvent: ClickEvent): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get.binaryEncoder(out, null)
    val writer = new SpecificDatumWriter[ClickEvent](ClickEvent.getClassSchema)


    writer.write(clickEvent, encoder)
    encoder.flush
    out.close
    out.toByteArray
  } 
  
  




}  


2 REPLIES 2
Highlighted

Re: sending simple message to kafka is not getting through

Mentor

@Shahzad Aslam what was your solution?

Highlighted

Re: sending simple message to kafka is not getting through

New Contributor

Hi,

Iam having same issue using Java. More than 30secs to send single message and Kafka consumer running on separate window does not show the message.

I am using Kafka 0.10 version. Have you figured out the solution? Any inputs truly appreciated.