Created on 12-23-2016 06:56 PM
Kafka Producer (Python)
yum install -y python-pip
pip install kafka-python

//kafka producer sample code
vim kafka_producer.py

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['rkk1.hdp.local:6667'])
topic = "kafkatopic"
producer.send(topic, b'test message')

//run it
python kafka_producer.py

//test it
[root@rkk1 ~]# /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper `hostname`:2181 --topic kafkatopic
{metadata.broker.list=rkk1.hdp.local:6667,rkk2.hdp.local:6667,rkk3.hdp.local:6667, request.timeout.ms=30000, client.id=console-consumer-41051, security.protocol=PLAINTEXT}
test message
Kafka Producer (Scala)
mkdir kafkaproducerscala
cd kafkaproducerscala/
mkdir -p src/main/scala
cd src/main/scala
vim KafkaProducerScala.scala

object KafkaProducerScala extends App {
  import java.util.Properties
  import org.apache.kafka.clients.producer._

  val props = new Properties()
  props.put("bootstrap.servers", "rkk1:6667")
  props.put("acks","1")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  val topic="kafkatopic"

  for(i<- 1 to 50) {
    val record = new ProducerRecord(topic, "key"+i, "value"+i)
    producer.send(record)
  }

  producer.close()
}

cd -
vim build.sbt

val kafkaVersion = "0.9.0.0"

scalaVersion := "2.11.7"

libraryDependencies += "org.apache.kafka" % "kafka-clients" % kafkaVersion

resolvers += Resolver.mavenLocal

sbt package
sbt run