
Kafka Producer is not working

Contributor
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;


import java.util.Properties;


public class KafkaProducer {
    private static Producer<Integer, String> producer;
    private final Properties properties = new Properties();


    public KafkaProducer() {
        properties.put("metadata.broker.list", "buckland:6667,laverne:6667,mahoney:6667");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.required.acks", "1");
      
        properties.put("security.protocol", "PLAINTEXTSASL");
        properties.put("producer.type", "async");
        
        ProducerConfig config = new ProducerConfig(properties);
        producer = new Producer<Integer, String>(config);
    }


    public static void main(String[] args) {
    	System.out.println("in Kakfa Producer ****");
        new KafkaProducer();
        String topic = "dev.raw.mce.energy.gas";
        String msg = "***** message from KafkaProducer class *****";
        KeyedMessage<Integer, String> data = new KeyedMessage<>(topic, msg);
        producer.send(data);
        producer.close();
        System.out.println(" Kakfa Producer is over ****");
    }
}



Launch command for the producer program:

spark-submit \
  --master local[2] \
  --num-executors 2 \
  --driver-memory 1g \
  --executor-memory 2g \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/tmp/rchamaku/kafka/kafka_client_jaas.conf" \
  --class KafkaProducer \
  --name "Sample KafkaProducer by Ram" \
  /tmp/rchamaku/kafka/TestKafka-0.0.1-SNAPSHOT-driver.jar

1 ACCEPTED SOLUTION

Master Guru

See this working example of a Kafka Producer

package com.dataflowdeveloper.kafka

import java.io.ByteArrayOutputStream
import java.util.HashMap

import com.google.gson.Gson
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.avro.io.EncoderFactory
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
  * Created by timothyspann on 4/4/16.
  */
object TwitterKafkaProducer {
  private var gson = new Gson()
  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
    val logger: Logger = Logger.getLogger("com.dataflowdeveloper.kafka.KafkaSimulator")
    // build a Tweet schema
    val schema = SchemaBuilder
      .record("tweet")
      .fields
      .name("tweet").`type`().stringType().noDefault()
      .name("timestamp").`type`().longType().noDefault()
      .endRecord
    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = Array("hadoop", "hortonworks", "#hadoop", "#bigdata", "#spark", "#hortonworks", "#HDP")
    // Set the system properties so that the Twitter4j library used by the Twitter stream
    // can use them to generate OAuth credentials
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
    val sparkConf = new SparkConf().setAppName("Spark Streaming Twitter to Avro to Kafka Producer")
    sparkConf.set("spark.cores.max", "24") 
    sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
    sparkConf.set("spark.sql.tungsten.enabled", "true")
    sparkConf.set("spark.eventLog.enabled", "true")
    sparkConf.set("spark.app.id", "TwitterKafkaProducer")
    sparkConf.set("spark.io.compression.codec", "snappy")
    sparkConf.set("spark.rdd.compress", "true")
    sparkConf.set("spark.streaming.backpressure.enabled", "true")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))
    val stream = TwitterUtils.createStream(ssc, None, filters).map(gson.toJson(_))
    try {
      stream.foreachRDD((rdd, time) => {
        val count = rdd.count()
        if (count > 0) {
          val props = new HashMap[String, Object]()
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "brokerip:6667")
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer")
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
          val producer = new KafkaProducer[String, Array[Byte]](props)
          val topList = rdd.collect()
          topList.foreach(a => {
            val tweets = serializeTwitter(schema, new GenericRecordBuilder(schema)
              .set("tweet", a)
              .set("timestamp", time.milliseconds)
              .build)
            val message = new ProducerRecord[String, Array[Byte]]("meetup", null, tweets)
            producer.send(message)
            println("Sent %s".format(a.substring(0, 512)))
          })
        }
      })
    } catch {
      case e: Exception =>
        println("Twitter. Writing files after job. Exception:" + e.getMessage);
        e.printStackTrace();
    }
    ssc.start()
    ssc.awaitTermination()
  }
  // convert to avro byte array
  def serializeTwitter(schema: Schema, tweet: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    try {
      val encoder = EncoderFactory.get.binaryEncoder(out, null)
      val writer = new GenericDatumWriter[GenericRecord](schema)
      writer.write(tweet, encoder)
      encoder.flush
      out.close
    } catch {
      case e: Exception => None
    }
    out.toByteArray
  }
}


3 REPLIES

Contributor

What exception are you getting?

Master Guru

Are those brokers active?

Make sure you include the ZooKeeper hosts as well.
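
For example (the ZooKeeper host, the HDP script path, and the client-sasl.properties file name below are placeholders; adjust them to your cluster), something like the following can confirm that the topic exists in ZooKeeper and that the brokers accept a test message:

# describe the topic as registered in ZooKeeper
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper zkhost:2181 --describe --topic dev.raw.mce.energy.gas

# smoke-test the brokers; on a SASL-secured cluster, point --producer.config at a
# properties file containing security.protocol=PLAINTEXTSASL
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list buckland:6667,laverne:6667,mahoney:6667 --topic dev.raw.mce.energy.gas --producer.config client-sasl.properties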
