Community Articles

Find and share helpful community-sourced technical articles.
Announcements
Celebrating as our community reaches 100,000 members! Thank you!
Labels (1)
avatar
Super Guru

Kafka Producer (Python)

yum install -y python-pip
pip install kafka-python

//kafka producer sample code
vim kafka_producer.py
"""Minimal kafka-python producer: publish one test message and confirm delivery."""
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['rkk1.hdp.local:6667'])
topic = "kafkatopic"

# send() is asynchronous: it only queues the record and returns a future.
# Without waiting on that future (or flushing), the interpreter can exit
# before the record ever reaches the broker.
future = producer.send(topic, b'test message')
try:
    # Block until the broker acknowledges the record or the timeout expires.
    metadata = future.get(timeout=10)
    print("sent to %s partition %d offset %d"
          % (metadata.topic, metadata.partition, metadata.offset))
except KafkaError as err:
    # Exit non-zero with a readable message instead of failing silently.
    raise SystemExit("failed to send message: %s" % err)
finally:
    # Always release network resources, on success and on failure alike.
    producer.close()

//run it
python kafka_producer.py

//test it
[root@rkk1 ~]# /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper `hostname`:2181 --topic kafkatopic
{metadata.broker.list=rkk1.hdp.local:6667,rkk2.hdp.local:6667,rkk3.hdp.local:6667, request.timeout.ms=30000, client.id=console-consumer-41051, security.protocol=PLAINTEXT}
test message

Kafka Producer (Scala)

mkdir kafkaproducerscala
cd kafkaproducerscala/
mkdir -p src/main/scala
cd src/main/scala
vim KafkaProducerScala.scala

/**
 * Sends 50 keyed string records ("key1"/"value1" .. "key50"/"value50")
 * to the "kafkatopic" topic and then shuts the producer down.
 */
object KafkaProducerScala extends App {

  import java.util.Properties

  import org.apache.kafka.clients.producer._

  // Client configuration: broker address, acknowledgement setting, and
  // String serializers for both the record key and the record value.
  val props = new Properties()
  props.put("bootstrap.servers", "rkk1:6667")
  props.put("acks", "1")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)

  val topic = "kafkatopic"

  try {
    for (i <- 1 to 50) {
      val record = new ProducerRecord[String, String](topic, "key" + i, "value" + i)
      // send() is asynchronous; records are buffered and sent in the background.
      producer.send(record)
    }
  } finally {
    // Close even if a send throws, so the producer's resources are not
    // leaked when the loop aborts early.
    producer.close()
  }
}
 
 cd -
 vim build.sbt
 // build.sbt for the KafkaProducerScala example.
 // Each expression is separated by a blank line: sbt releases before 0.13.7
 // require this in .sbt files, and newer releases accept it as well.

 // Version of the Kafka client library to compile against.
 val kafkaVersion = "0.9.0.0"

 scalaVersion := "2.11.7"

 libraryDependencies += "org.apache.kafka" % "kafka-clients" % kafkaVersion

 // Also resolve artifacts from the local Maven repository.
 resolvers += Resolver.mavenLocal
 
 sbt package
 sbt run 
18,056 Views
0 Kudos