<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Re: Kafka Producer is not working in Archives of Support Questions (Read Only)</title>
    <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Kafka-Producer-is-not-working/m-p/103793#M50394</link>
    <description>&lt;P&gt;See this working example of a Kafka Producer&lt;/P&gt;&lt;PRE&gt;package com.dataflowdeveloper.kafka
import java.io.ByteArrayOutputStream
import java.util.HashMap
import com.google.gson.Gson
import org.apache.avro.SchemaBuilder
import org.apache.avro.io.EncoderFactory
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import java.io.{IOException, File, ByteArrayOutputStream}
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.avro.io.EncoderFactory
import org.apache.avro.SchemaBuilder
import org.apache.avro.Schema
/**
  * Created by timothyspann on 4/4/16.
  */
object TwitterKafkaProducer {
  // Gson is never reassigned, so it is held in a val.
  private val gson = new Gson()
  // Category name is kept from the original (previously unused) logger so that
  // any existing log4j configuration keyed on it still applies.
  private val logger: Logger = Logger.getLogger("com.dataflowdeveloper.kafka.KafkaSimulator")

  /**
    * Streams tweets matching a fixed filter list, encodes each one as an Avro
    * record and publishes it to the "meetup" Kafka topic.
    *
    * @param args consumerKey, consumerSecret, accessToken, accessTokenSecret
    *             (in that order); extra arguments are ignored
    */
  def main(args: Array[String]): Unit = {
    // Fail with a usage message instead of a MatchError when credentials are missing.
    if (args.length &amp;lt; 4) {
      System.err.println(
        "Usage: TwitterKafkaProducer consumerKey consumerSecret accessToken accessTokenSecret")
      System.exit(1)
    }
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
    // build a Tweet schema
    val schema = SchemaBuilder
      .record("tweet")
      .fields
      .name("tweet").`type`().stringType().noDefault()
      .name("timestamp").`type`().longType().noDefault()
      .endRecord
    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = Array("hadoop", "hortonworks", "#hadoop", "#bigdata", "#spark", "#hortonworks", "#HDP")
    // Set the system properties so that the Twitter4j library used by the twitter stream
    // can use them to generate OAuth credentials
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
    val sparkConf = new SparkConf().setAppName("Spark Streaming Twitter to Avro to Kafka Producer")
    sparkConf.set("spark.cores.max", "24")
    sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
    sparkConf.set("spark.sql.tungsten.enabled", "true")
    sparkConf.set("spark.eventLog.enabled", "true")
    sparkConf.set("spark.app.id", "TwitterKafkaProducer")
    sparkConf.set("spark.io.compression.codec", "snappy")
    sparkConf.set("spark.rdd.compress", "true")
    sparkConf.set("spark.streaming.backpressure.enabled", "true")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))
    val stream = TwitterUtils.createStream(ssc, None, filters).map(gson.toJson(_))
    stream.foreachRDD((rdd, time) =&amp;gt; {
      // The handler lives inside the batch function: a try/catch wrapped around
      // foreachRDD itself only guards registering the output operation, not the
      // per-batch work that runs after ssc.start().
      try {
        // Collect once and test the result rather than running count() and collect()
        // as two separate jobs over the same batch.
        val tweets = rdd.collect()
        if (tweets.nonEmpty) {
          publishBatch(schema, tweets, time.milliseconds)
        }
      } catch {
        case e: Exception =&amp;gt;
          println("Twitter. Writing files after job. Exception:" + e.getMessage)
          e.printStackTrace()
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * Sends one batch of tweet JSON strings to the "meetup" topic as Avro-encoded
    * byte arrays, then closes the producer so buffered records are delivered and
    * its resources are released.
    *
    * @param schema    Avro schema with the "tweet" and "timestamp" fields
    * @param tweets    JSON text of each tweet in the batch
    * @param timestamp batch time in milliseconds, stored in every record
    */
  private def publishBatch(schema: Schema, tweets: Array[String], timestamp: Long): Unit = {
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "brokerip:6667")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, Array[Byte]](props)
    try {
      tweets.foreach(json =&amp;gt; {
        val payload = serializeTwitter(schema, new GenericRecordBuilder(schema)
          .set("tweet", json)
          .set("timestamp", timestamp)
          .build)
        // An empty payload means encoding failed (already logged); do not publish it.
        if (payload.nonEmpty) {
          // Two-argument constructor: topic and value, no key.
          producer.send(new ProducerRecord[String, Array[Byte]]("meetup", payload))
          // take() tolerates JSON shorter than 512 characters; substring(0, 512) throws.
          println("Sent %s".format(json.take(512)))
        }
      })
    } finally {
      producer.close()
    }
  }

  /**
    * Converts a record to an Avro binary byte array.
    *
    * @param schema schema used to write the record
    * @param tweet  record to encode
    * @return the encoded bytes, or an empty array if encoding failed
    */
  def serializeTwitter(schema: Schema, tweet: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    try {
      // null: no existing encoder instance is offered for reuse
      val encoder = EncoderFactory.get.binaryEncoder(out, null)
      val writer = new GenericDatumWriter[GenericRecord](schema)
      writer.write(tweet, encoder)
      encoder.flush()
      out.toByteArray
    } catch {
      case e: Exception =&amp;gt;
        // Report the failure instead of silently returning partially written bytes.
        logger.error("Failed to Avro-encode tweet record", e)
        Array.empty[Byte]
    } finally {
      out.close()
    }
  }
}
// scalastyle:on println&lt;/PRE&gt;</description>
    <pubDate>Fri, 30 Dec 2016 04:17:52 GMT</pubDate>
    <dc:creator>TimothySpann</dc:creator>
    <dc:date>2016-12-30T04:17:52Z</dc:date>
    <item>
      <title>Kafka Producer is not working</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Kafka-Producer-is-not-working/m-p/103790#M50391</link>
      <description>&lt;PRE&gt;import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;


import java.util.Properties;


/**
 * Minimal producer that sends a single string message to a Kafka topic through
 * the legacy kafka.javaapi.producer API and then closes the producer.
 */
public class KafkaProducer {
    // Static, but only assigned inside the instance constructor below; main relies on
    // constructing one instance before using it.
    private static Producer&amp;lt;Integer, String&amp;gt; producer;
    private final Properties properties = new Properties();


    /**
     * Fills in the producer configuration (broker list, string encoder, acks,
     * security protocol, async mode) and creates the shared producer.
     */
    public KafkaProducer() {
        properties.put("metadata.broker.list", "buckland:6667,laverne:6667,mahoney:6667");
        // NOTE(review): the producer is typed with Integer keys but only a string
        // encoder is configured; main sends without a key, so presumably the key
        // encoder is never exercised - confirm.
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.required.acks", "1");
      
        // NOTE(review): whether PLAINTEXTSASL is accepted here depends on the broker
        // listeners and the client library build - confirm against the cluster.
        properties.put("security.protocol", "PLAINTEXTSASL");
        // NOTE(review): with the async producer type, send() presumably queues the
        // message and delivery failures may not surface as exceptions - confirm.
        properties.put("producer.type", "async");
        
        ProducerConfig config = new ProducerConfig(properties);
        producer = new Producer&amp;lt;Integer, String&amp;gt;(config);
    }


    /**
     * Prints a start banner, initializes the producer, sends one message to the
     * hard-coded topic, closes the producer and prints an end banner.
     *
     * @param args command-line arguments; not read
     */
    public static void main(String[] args) {
    	System.out.println("in Kakfa Producer ****");
        // Constructed only for its side effect of assigning the static producer.
        new KafkaProducer();
        String topic = "dev.raw.mce.energy.gas";
        String msg = "***** message from KafkaProducer class *****";
        // Two-argument form: topic and message, no key.
        KeyedMessage&amp;lt;Integer, String&amp;gt; data = new KeyedMessage&amp;lt;&amp;gt;(topic, msg);
        producer.send(data);
        producer.close();
        System.out.println(" Kakfa Producer is over ****");
    }
}



 -- From Program Producer
 spark-submit \
--master local[2] \
--num-executors 2 \
--driver-memory 1g \
--executor-memory 2g \
--conf "spark.driver.extraJavaOptions= -Djava.security.auth.login.config=/tmp/rchamaku/kafka/kafka_client_jaas.conf" \
--class KafkaProducer \
--name "Sample KafkaProducer by Ram" \
 /tmp/rchamaku/kafka/TestKafka-0.0.1-SNAPSHOT-driver.jar 

&lt;/PRE&gt;</description>
      <pubDate>Thu, 29 Dec 2016 19:26:17 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Kafka-Producer-is-not-working/m-p/103790#M50391</guid>
      <dc:creator>rambabuch</dc:creator>
      <dc:date>2016-12-29T19:26:17Z</dc:date>
    </item>
    <item>
      <title>Re: Kafka Producer is not working</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Kafka-Producer-is-not-working/m-p/103791#M50392</link>
      <description>&lt;P&gt;What exception are you getting?&lt;/P&gt;</description>
      <pubDate>Thu, 29 Dec 2016 19:38:03 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Kafka-Producer-is-not-working/m-p/103791#M50392</guid>
      <dc:creator>sanjeev_verma82</dc:creator>
      <dc:date>2016-12-29T19:38:03Z</dc:date>
    </item>
    <item>
      <title>Re: Kafka Producer is not working</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Kafka-Producer-is-not-working/m-p/103792#M50393</link>
      <description>&lt;P&gt;Are those brokers active?&lt;/P&gt;&lt;P&gt;Make sure you include the zookeepers.&lt;/P&gt;</description>
      <pubDate>Fri, 30 Dec 2016 04:13:13 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Kafka-Producer-is-not-working/m-p/103792#M50393</guid>
      <dc:creator>TimothySpann</dc:creator>
      <dc:date>2016-12-30T04:13:13Z</dc:date>
    </item>
    <item>
      <title>Re: Kafka Producer is not working</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Kafka-Producer-is-not-working/m-p/103793#M50394</link>
      <description>&lt;P&gt;See this working example of a Kafka Producer&lt;/P&gt;&lt;PRE&gt;package com.dataflowdeveloper.kafka
import java.io.ByteArrayOutputStream
import java.util.HashMap
import com.google.gson.Gson
import org.apache.avro.SchemaBuilder
import org.apache.avro.io.EncoderFactory
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import java.io.{IOException, File, ByteArrayOutputStream}
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.avro.io.EncoderFactory
import org.apache.avro.SchemaBuilder
import org.apache.avro.Schema
/**
  * Created by timothyspann on 4/4/16.
  */
object TwitterKafkaProducer {
  // Gson is never reassigned, so it is held in a val.
  private val gson = new Gson()
  // Category name is kept from the original (previously unused) logger so that
  // any existing log4j configuration keyed on it still applies.
  private val logger: Logger = Logger.getLogger("com.dataflowdeveloper.kafka.KafkaSimulator")

  /**
    * Streams tweets matching a fixed filter list, encodes each one as an Avro
    * record and publishes it to the "meetup" Kafka topic.
    *
    * @param args consumerKey, consumerSecret, accessToken, accessTokenSecret
    *             (in that order); extra arguments are ignored
    */
  def main(args: Array[String]): Unit = {
    // Fail with a usage message instead of a MatchError when credentials are missing.
    if (args.length &amp;lt; 4) {
      System.err.println(
        "Usage: TwitterKafkaProducer consumerKey consumerSecret accessToken accessTokenSecret")
      System.exit(1)
    }
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
    // build a Tweet schema
    val schema = SchemaBuilder
      .record("tweet")
      .fields
      .name("tweet").`type`().stringType().noDefault()
      .name("timestamp").`type`().longType().noDefault()
      .endRecord
    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = Array("hadoop", "hortonworks", "#hadoop", "#bigdata", "#spark", "#hortonworks", "#HDP")
    // Set the system properties so that the Twitter4j library used by the twitter stream
    // can use them to generate OAuth credentials
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
    val sparkConf = new SparkConf().setAppName("Spark Streaming Twitter to Avro to Kafka Producer")
    sparkConf.set("spark.cores.max", "24")
    sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
    sparkConf.set("spark.sql.tungsten.enabled", "true")
    sparkConf.set("spark.eventLog.enabled", "true")
    sparkConf.set("spark.app.id", "TwitterKafkaProducer")
    sparkConf.set("spark.io.compression.codec", "snappy")
    sparkConf.set("spark.rdd.compress", "true")
    sparkConf.set("spark.streaming.backpressure.enabled", "true")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))
    val stream = TwitterUtils.createStream(ssc, None, filters).map(gson.toJson(_))
    stream.foreachRDD((rdd, time) =&amp;gt; {
      // The handler lives inside the batch function: a try/catch wrapped around
      // foreachRDD itself only guards registering the output operation, not the
      // per-batch work that runs after ssc.start().
      try {
        // Collect once and test the result rather than running count() and collect()
        // as two separate jobs over the same batch.
        val tweets = rdd.collect()
        if (tweets.nonEmpty) {
          publishBatch(schema, tweets, time.milliseconds)
        }
      } catch {
        case e: Exception =&amp;gt;
          println("Twitter. Writing files after job. Exception:" + e.getMessage)
          e.printStackTrace()
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * Sends one batch of tweet JSON strings to the "meetup" topic as Avro-encoded
    * byte arrays, then closes the producer so buffered records are delivered and
    * its resources are released.
    *
    * @param schema    Avro schema with the "tweet" and "timestamp" fields
    * @param tweets    JSON text of each tweet in the batch
    * @param timestamp batch time in milliseconds, stored in every record
    */
  private def publishBatch(schema: Schema, tweets: Array[String], timestamp: Long): Unit = {
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "brokerip:6667")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, Array[Byte]](props)
    try {
      tweets.foreach(json =&amp;gt; {
        val payload = serializeTwitter(schema, new GenericRecordBuilder(schema)
          .set("tweet", json)
          .set("timestamp", timestamp)
          .build)
        // An empty payload means encoding failed (already logged); do not publish it.
        if (payload.nonEmpty) {
          // Two-argument constructor: topic and value, no key.
          producer.send(new ProducerRecord[String, Array[Byte]]("meetup", payload))
          // take() tolerates JSON shorter than 512 characters; substring(0, 512) throws.
          println("Sent %s".format(json.take(512)))
        }
      })
    } finally {
      producer.close()
    }
  }

  /**
    * Converts a record to an Avro binary byte array.
    *
    * @param schema schema used to write the record
    * @param tweet  record to encode
    * @return the encoded bytes, or an empty array if encoding failed
    */
  def serializeTwitter(schema: Schema, tweet: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    try {
      // null: no existing encoder instance is offered for reuse
      val encoder = EncoderFactory.get.binaryEncoder(out, null)
      val writer = new GenericDatumWriter[GenericRecord](schema)
      writer.write(tweet, encoder)
      encoder.flush()
      out.toByteArray
    } catch {
      case e: Exception =&amp;gt;
        // Report the failure instead of silently returning partially written bytes.
        logger.error("Failed to Avro-encode tweet record", e)
        Array.empty[Byte]
    } finally {
      out.close()
    }
  }
}
// scalastyle:on println&lt;/PRE&gt;</description>
      <pubDate>Fri, 30 Dec 2016 04:17:52 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Kafka-Producer-is-not-working/m-p/103793#M50394</guid>
      <dc:creator>TimothySpann</dc:creator>
      <dc:date>2016-12-30T04:17:52Z</dc:date>
    </item>
  </channel>
</rss>

