Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant.
Announcements
This board is archived and read-only for historical reference. To ask a new question, please post a new topic on the appropriate active board.

Kafka Producer is not working

avatar
Contributor
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;


import java.util.Properties;


/**
 * Minimal command-line producer built on the legacy {@code kafka.javaapi.producer.Producer} API.
 *
 * <p>Constructing an instance builds the producer configuration and opens the producer;
 * {@link #main(String[])} sends a single test message to a fixed topic and then closes
 * the producer.
 */
public class KafkaProducer {
    // Owned by the instance that configures it. The original kept this in a static field
    // that the instance constructor overwrote, so every construction replaced shared state.
    private final Producer<Integer, String> producer;
    private final Properties properties = new Properties();


    /**
     * Configures and opens the producer against the hard-coded broker list.
     */
    public KafkaProducer() {
        properties.put("metadata.broker.list", "buckland:6667,laverne:6667,mahoney:6667");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.required.acks", "1");

        properties.put("security.protocol", "PLAINTEXTSASL");
        // NOTE(review): with "async" the send call presumably only queues the message and
        // delivery happens in the background — confirm against the Kafka producer docs.
        properties.put("producer.type", "async");

        ProducerConfig config = new ProducerConfig(properties);
        producer = new Producer<Integer, String>(config);
    }


    /**
     * Sends one test message to the {@code dev.raw.mce.energy.gas} topic.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        System.out.println("in Kakfa Producer ****");
        KafkaProducer app = new KafkaProducer();
        try {
            String topic = "dev.raw.mce.energy.gas";
            String msg = "***** message from KafkaProducer class *****";
            // No key is supplied, so only the message value is passed to the producer.
            KeyedMessage<Integer, String> data = new KeyedMessage<>(topic, msg);
            app.producer.send(data);
        } finally {
            // Always release the producer, even when send() throws.
            app.producer.close();
        }
        System.out.println(" Kakfa Producer is over ****");
    }
}



 -- From Program Producer
 # Runs the KafkaProducer main class from the TestKafka driver jar through spark-submit
 # with a local[2] master. The driver JVM is given a JAAS login configuration file via
 # the java.security.auth.login.config system property.
 # NOTE(review): --num-executors and --executor-memory presumably have no effect with a
 # local master — confirm against the spark-submit documentation.
 spark-submit \
--master local[2] \
--num-executors 2 \
--driver-memory 1g \
--executor-memory 2g \
--conf "spark.driver.extraJavaOptions= -Djava.security.auth.login.config=/tmp/rchamaku/kafka/kafka_client_jaas.conf" \
--class KafkaProducer \
--name "Sample KafkaProducer by Ram" \
 /tmp/rchamaku/kafka/TestKafka-0.0.1-SNAPSHOT-driver.jar 

1 ACCEPTED SOLUTION

avatar
Master Guru

See this working example of a Kafka Producer

package com.dataflowdeveloper.kafka
import java.io.ByteArrayOutputStream
import java.util.HashMap
import com.google.gson.Gson
import org.apache.avro.SchemaBuilder
import org.apache.avro.io.EncoderFactory
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import java.io.{IOException, File, ByteArrayOutputStream}
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.avro.io.EncoderFactory
import org.apache.avro.SchemaBuilder
import org.apache.avro.Schema
/**
  * Spark Streaming job that reads a filtered Twitter stream, wraps every tweet (as JSON)
  * in an Avro record together with the batch timestamp, and publishes the binary-encoded
  * record to the Kafka topic `meetup`.
  *
  * Expects four program arguments: Twitter consumer key, consumer secret, access token
  * and access token secret.
  *
  * Created by timothyspann on 4/4/16.
  */
object TwitterKafkaProducer {
  import scala.util.control.NonFatal

  private val gson = new Gson()
  private val logger: Logger = Logger.getLogger("com.dataflowdeveloper.kafka.KafkaSimulator")

  /** Maximum number of characters of each tweet echoed to stdout. */
  private val PreviewLength = 512

  /**
    * Entry point: configures Spark and Twitter4j, then streams tweets to Kafka until the
    * streaming context terminates.
    *
    * @param args consumerKey, consumerSecret, accessToken, accessTokenSecret (in that order)
    * @throws IllegalArgumentException if fewer than four arguments are supplied
    */
  def main(args: Array[String]): Unit = {
    // Fail with a readable message instead of a MatchError from the Array extractor below.
    require(
      args.length >= 4,
      "Usage: TwitterKafkaProducer <consumerKey> <consumerSecret> <accessToken> <accessTokenSecret>")

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)

    // build a Tweet schema
    val schema = SchemaBuilder
      .record("tweet")
      .fields
      .name("tweet").`type`().stringType().noDefault()
      .name("timestamp").`type`().longType().noDefault()
      .endRecord

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = Array("hadoop", "hortonworks", "#hadoop", "#bigdata", "#spark", "#hortonworks", "#HDP")

    // Set the system properties so that the Twitter4j library used by the twitter stream
    // can use them to generate OAuth credentials
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    val sparkConf = new SparkConf()
      .setAppName("Spark Streaming Twitter to Avro to Kafka Producer")
      .set("spark.cores.max", "24")
      .set("spark.serializer", classOf[KryoSerializer].getName)
      .set("spark.sql.tungsten.enabled", "true")
      .set("spark.eventLog.enabled", "true")
      .set("spark.app.id", "TwitterKafkaProducer")
      .set("spark.io.compression.codec", "snappy")
      .set("spark.rdd.compress", "true")
      .set("spark.streaming.backpressure.enabled", "true")

    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))
    val stream = TwitterUtils.createStream(ssc, None, filters).map(gson.toJson(_))

    stream.foreachRDD((rdd, time) => {
      // The handler lives inside the batch function: foreachRDD only registers this
      // function, so a try/catch around the foreachRDD call never sees batch failures.
      try {
        // Collect once and test the result rather than running count() and collect().
        val tweetsJson = rdd.collect()
        if (tweetsJson.nonEmpty) {
          val producer = new KafkaProducer[String, Array[Byte]](producerProps())
          try {
            tweetsJson.foreach { json =>
              val record = new GenericRecordBuilder(schema)
                .set("tweet", json)
                .set("timestamp", time.milliseconds)
                .build
              val payload = serializeTwitter(schema, record)
              producer.send(new ProducerRecord[String, Array[Byte]]("meetup", null, payload))
              // take() tolerates strings shorter than the preview length, unlike
              // substring(0, 512), which throws for them.
              println("Sent %s".format(json.take(PreviewLength)))
            }
          } finally {
            // One producer is created per batch, so release it when the batch is done.
            producer.close()
          }
        }
      } catch {
        case NonFatal(e) =>
          println("Twitter. Writing files after job. Exception:" + e.getMessage)
          e.printStackTrace()
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }

  /** Builds the Kafka producer settings: broker address plus key and value serializers. */
  private def producerProps(): HashMap[String, Object] = {
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "brokerip:6667")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props
  }

  /**
    * Converts a record to an Avro binary byte array.
    *
    * Serialization stays best-effort: a failure is logged and whatever bytes reached the
    * output buffer before the failure are returned.
    *
    * @param schema schema handed to the datum writer
    * @param tweet  record to encode
    * @return the bytes written to the in-memory buffer
    */
  def serializeTwitter(schema: Schema, tweet: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    try {
      val encoder = EncoderFactory.get().binaryEncoder(out, null)
      new GenericDatumWriter[GenericRecord](schema).write(tweet, encoder)
      encoder.flush()
    } catch {
      case NonFatal(e) =>
        logger.warn("Avro serialization failed; returning the bytes written so far", e)
    } finally {
      out.close()
    }
    out.toByteArray
  }
}
// scalastyle:on println

View solution in original post

3 REPLIES 3

avatar

What exception are you getting?

avatar
Master Guru

Are those brokers active?

Make sure you include the zookeepers

avatar
Master Guru

See this working example of a Kafka Producer

package com.dataflowdeveloper.kafka
import java.io.ByteArrayOutputStream
import java.util.HashMap
import com.google.gson.Gson
import org.apache.avro.SchemaBuilder
import org.apache.avro.io.EncoderFactory
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import java.io.{IOException, File, ByteArrayOutputStream}
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.avro.io.EncoderFactory
import org.apache.avro.SchemaBuilder
import org.apache.avro.Schema
/**
  * Spark Streaming job that reads a filtered Twitter stream, wraps every tweet (as JSON)
  * in an Avro record together with the batch timestamp, and publishes the binary-encoded
  * record to the Kafka topic `meetup`.
  *
  * Expects four program arguments: Twitter consumer key, consumer secret, access token
  * and access token secret.
  *
  * Created by timothyspann on 4/4/16.
  */
object TwitterKafkaProducer {
  import scala.util.control.NonFatal

  private val gson = new Gson()
  private val logger: Logger = Logger.getLogger("com.dataflowdeveloper.kafka.KafkaSimulator")

  /** Maximum number of characters of each tweet echoed to stdout. */
  private val PreviewLength = 512

  /**
    * Entry point: configures Spark and Twitter4j, then streams tweets to Kafka until the
    * streaming context terminates.
    *
    * @param args consumerKey, consumerSecret, accessToken, accessTokenSecret (in that order)
    * @throws IllegalArgumentException if fewer than four arguments are supplied
    */
  def main(args: Array[String]): Unit = {
    // Fail with a readable message instead of a MatchError from the Array extractor below.
    require(
      args.length >= 4,
      "Usage: TwitterKafkaProducer <consumerKey> <consumerSecret> <accessToken> <accessTokenSecret>")

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)

    // build a Tweet schema
    val schema = SchemaBuilder
      .record("tweet")
      .fields
      .name("tweet").`type`().stringType().noDefault()
      .name("timestamp").`type`().longType().noDefault()
      .endRecord

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = Array("hadoop", "hortonworks", "#hadoop", "#bigdata", "#spark", "#hortonworks", "#HDP")

    // Set the system properties so that the Twitter4j library used by the twitter stream
    // can use them to generate OAuth credentials
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    val sparkConf = new SparkConf()
      .setAppName("Spark Streaming Twitter to Avro to Kafka Producer")
      .set("spark.cores.max", "24")
      .set("spark.serializer", classOf[KryoSerializer].getName)
      .set("spark.sql.tungsten.enabled", "true")
      .set("spark.eventLog.enabled", "true")
      .set("spark.app.id", "TwitterKafkaProducer")
      .set("spark.io.compression.codec", "snappy")
      .set("spark.rdd.compress", "true")
      .set("spark.streaming.backpressure.enabled", "true")

    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))
    val stream = TwitterUtils.createStream(ssc, None, filters).map(gson.toJson(_))

    stream.foreachRDD((rdd, time) => {
      // The handler lives inside the batch function: foreachRDD only registers this
      // function, so a try/catch around the foreachRDD call never sees batch failures.
      try {
        // Collect once and test the result rather than running count() and collect().
        val tweetsJson = rdd.collect()
        if (tweetsJson.nonEmpty) {
          val producer = new KafkaProducer[String, Array[Byte]](producerProps())
          try {
            tweetsJson.foreach { json =>
              val record = new GenericRecordBuilder(schema)
                .set("tweet", json)
                .set("timestamp", time.milliseconds)
                .build
              val payload = serializeTwitter(schema, record)
              producer.send(new ProducerRecord[String, Array[Byte]]("meetup", null, payload))
              // take() tolerates strings shorter than the preview length, unlike
              // substring(0, 512), which throws for them.
              println("Sent %s".format(json.take(PreviewLength)))
            }
          } finally {
            // One producer is created per batch, so release it when the batch is done.
            producer.close()
          }
        }
      } catch {
        case NonFatal(e) =>
          println("Twitter. Writing files after job. Exception:" + e.getMessage)
          e.printStackTrace()
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }

  /** Builds the Kafka producer settings: broker address plus key and value serializers. */
  private def producerProps(): HashMap[String, Object] = {
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "brokerip:6667")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props
  }

  /**
    * Converts a record to an Avro binary byte array.
    *
    * Serialization stays best-effort: a failure is logged and whatever bytes reached the
    * output buffer before the failure are returned.
    *
    * @param schema schema handed to the datum writer
    * @param tweet  record to encode
    * @return the bytes written to the in-memory buffer
    */
  def serializeTwitter(schema: Schema, tweet: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    try {
      val encoder = EncoderFactory.get().binaryEncoder(out, null)
      new GenericDatumWriter[GenericRecord](schema).write(tweet, encoder)
      encoder.flush()
    } catch {
      case NonFatal(e) =>
        logger.warn("Avro serialization failed; returning the bytes written so far", e)
    } finally {
      out.close()
    }
    out.toByteArray
  }
}
// scalastyle:on println