Kafka Producer is not working

Contributor
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;


import java.util.Properties;


public class KafkaProducer {
    private static Producer<Integer, String> producer;
    private final Properties properties = new Properties();


    public KafkaProducer() {
        // Old (Scala-based) producer API: brokers are listed via metadata.broker.list
        properties.put("metadata.broker.list", "buckland:6667,laverne:6667,mahoney:6667");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.required.acks", "1");
        // Kerberized cluster; PLAINTEXTSASL is the legacy-API name for SASL over plaintext
        properties.put("security.protocol", "PLAINTEXTSASL");
        // async: messages are batched and sent in the background
        properties.put("producer.type", "async");

        ProducerConfig config = new ProducerConfig(properties);
        producer = new Producer<Integer, String>(config);
    }
    }


    public static void main(String[] args) {
    	System.out.println("in Kakfa Producer ****");
        new KafkaProducer();
        String topic = "dev.raw.mce.energy.gas";
        String msg = "***** message from KafkaProducer class *****";
        KeyedMessage<Integer, String> data = new KeyedMessage<>(topic, msg);
        producer.send(data);
        producer.close();
        System.out.println(" Kakfa Producer is over ****");
    }
}



The producer is submitted with spark-submit:
 spark-submit \
--master local[2] \
--num-executors 2 \
--driver-memory 1g \
--executor-memory 2g \
--conf "spark.driver.extraJavaOptions= -Djava.security.auth.login.config=/tmp/rchamaku/kafka/kafka_client_jaas.conf" \
--class KafkaProducer \
--name "Sample KafkaProducer by Ram" \
 /tmp/rchamaku/kafka/TestKafka-0.0.1-SNAPSHOT-driver.jar 
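
For reference, here is a minimal sketch of pointing the JVM at the same JAAS file from inside the program instead of through the spark-submit flag, assuming the path above and that the file contains a KafkaClient login section. The class name JaasKafkaProducer is only illustrative, and whether the legacy producer picks the property up this way should be verified against your Kafka/HDP build.

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class JaasKafkaProducer {
    public static void main(String[] args) {
        // Same effect as -Djava.security.auth.login.config on the spark-submit line;
        // must be set before the producer (and its SASL login) is created.
        System.setProperty("java.security.auth.login.config",
                "/tmp/rchamaku/kafka/kafka_client_jaas.conf");

        Properties props = new Properties();
        props.put("metadata.broker.list", "buckland:6667,laverne:6667,mahoney:6667");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        props.put("security.protocol", "PLAINTEXTSASL");

        Producer<Integer, String> producer =
                new Producer<Integer, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<Integer, String>(
                "dev.raw.mce.energy.gas", "***** message from JaasKafkaProducer *****"));
        producer.close();
    }
}

Setting the property in code makes the same jar testable with plain java -cp, without relying on the driver JVM options being passed through.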

1 ACCEPTED SOLUTION


Re: Kafka Producer is not working

Super Guru

See this working example of a Kafka producer:

package com.dataflowdeveloper.kafka
import java.io.ByteArrayOutputStream
import java.util.HashMap

import com.google.gson.Gson
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.avro.io.EncoderFactory
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
  * Created by timothyspann on 4/4/16.
  */
object TwitterKafkaProducer {
  private var gson = new Gson()
  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
    val logger: Logger = Logger.getLogger("com.dataflowdeveloper.kafka.KafkaSimulator")
    // build a Tweet schema
    val schema = SchemaBuilder
      .record("tweet")
      .fields
      .name("tweet").`type`().stringType().noDefault()
      .name("timestamp").`type`().longType().noDefault()
      .endRecord
    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = Array("hadoop", "hortonworks", "#hadoop", "#bigdata", "#spark", "#hortonworks", "#HDP")
    // Set the system properties so that the Twitter4j library used by the Twitter stream
    // can use them to generate OAuth credentials
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
    val sparkConf = new SparkConf().setAppName("Spark Streaming Twitter to Avro to Kafka Producer")
    sparkConf.set("spark.cores.max", "24") 
    sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
    sparkConf.set("spark.sql.tungsten.enabled", "true")
    sparkConf.set("spark.eventLog.enabled", "true")
    sparkConf.set("spark.app.id", "TwitterKafkaProducer")
    sparkConf.set("spark.io.compression.codec", "snappy")
    sparkConf.set("spark.rdd.compress", "true")
    sparkConf.set("spark.streaming.backpressure.enabled", "true")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))
    val stream = TwitterUtils.createStream(ssc, None, filters).map(gson.toJson(_))
    try {
      stream.foreachRDD((rdd, time) => {
        val count = rdd.count()
        if (count > 0) {
          val props = new HashMap[String, Object]()
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "brokerip:6667")
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer")
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
          val producer = new KafkaProducer[String, Array[Byte]](props)
          val topList = rdd.collect()
          topList.foreach(a => {
            val tweets = serializeTwitter(schema, new GenericRecordBuilder(schema)
              .set("tweet", a)
              .set("timestamp", time.milliseconds)
              .build)
            val message = new ProducerRecord[String, Array[Byte]]("meetup", null, tweets)
            producer.send(message)
            println("Sent %s".format(a.substring(0, 512)))
          })
        }
      })
    } catch {
      case e: Exception =>
        println("Twitter. Writing files after job. Exception:" + e.getMessage);
        e.printStackTrace();
    }
    ssc.start()
    ssc.awaitTermination()
  }
  // convert to avro byte array
  def serializeTwitter(schema: Schema, tweet: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    try {
      val encoder = EncoderFactory.get.binaryEncoder(out, null)
      val writer = new GenericDatumWriter[GenericRecord](schema)
      writer.write(tweet, encoder)
      encoder.flush
      out.close
    } catch {
      case e: Exception => None
    }
    out.toByteArray
  }
}
// scalastyle:on println
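
As a follow-up, the single-message send from the original question can be sketched against the same new-client API used above. This is only a sketch: the class name NewApiProducer is made up, the broker list and topic come from the question, and on a Kerberized HDP cluster the new client's security.protocol value is typically SASL_PLAINTEXT rather than the legacy PLAINTEXTSASL, so check that against your version.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewApiProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // bootstrap.servers replaces the old metadata.broker.list
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "buckland:6667,laverne:6667,mahoney:6667");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        // New-client name for SASL over plaintext (verify against your Kafka/HDP version)
        props.put("security.protocol", "SASL_PLAINTEXT");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // Block on the returned Future so connectivity or authentication errors surface here
        producer.send(new ProducerRecord<String, String>(
                "dev.raw.mce.energy.gas", "***** message from new-API producer *****")).get();
        producer.close();
    }
}

Blocking on send().get() keeps the example synchronous, so a misconfigured broker list or SASL setup fails loudly instead of the JVM exiting before the asynchronous send completes.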
3 REPLIES

Re: Kafka Producer is not working

New Contributor

What exception are you getting?

Re: Kafka Producer is not working

Super Guru

Are those brokers active?

Make sure you include the ZooKeeper servers as well.
