Created 12-29-2016 11:26 AM
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

public class KafkaProducer {

    private static Producer<Integer, String> producer;
    private final Properties properties = new Properties();

    public KafkaProducer() {
        properties.put("metadata.broker.list", "buckland:6667,laverne:6667,mahoney:6667");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.required.acks", "1");
        properties.put("security.protocol", "PLAINTEXTSASL");
        properties.put("producer.type", "async");
        ProducerConfig config = new ProducerConfig(properties);
        producer = new Producer<Integer, String>(config);
    }

    public static void main(String[] args) {
        System.out.println("in Kafka Producer ****");
        new KafkaProducer();
        String topic = "dev.raw.mce.energy.gas";
        String msg = "***** message from KafkaProducer class *****";
        KeyedMessage<Integer, String> data = new KeyedMessage<>(topic, msg);
        producer.send(data);
        producer.close();
        System.out.println(" Kafka Producer is over ****");
    }
}

Submitted with:

spark-submit \
  --master local[2] \
  --num-executors 2 \
  --driver-memory 1g \
  --executor-memory 2g \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/tmp/rchamaku/kafka/kafka_client_jaas.conf" \
  --class KafkaProducer \
  --name "Sample KafkaProducer by Ram" \
  /tmp/rchamaku/kafka/TestKafka-0.0.1-SNAPSHOT-driver.jar
Created 12-29-2016 08:17 PM
See this working example of a Kafka Producer
package com.dataflowdeveloper.kafka

import java.io.ByteArrayOutputStream
import java.util.HashMap

import com.google.gson.Gson
import org.apache.avro.SchemaBuilder
import org.apache.avro.io.EncoderFactory
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.avro.Schema

/**
 * Created by timothyspann on 4/4/16.
 */
object TwitterKafkaProducer {
  private var gson = new Gson()

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
    val logger: Logger = Logger.getLogger("com.dataflowdeveloper.kafka.KafkaSimulator")

    // build a Tweet schema (tweet text + timestamp)
    val schema = SchemaBuilder
      .record("tweet")
      .fields
      .name("tweet").`type`().stringType().noDefault()
      .name("timestamp").`type`().longType().noDefault()
      .endRecord

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = Array("hadoop", "hortonworks", "#hadoop", "#bigdata", "#spark", "#hortonworks", "#HDP")

    // Set the system properties so that the Twitter4j library used by the twitter stream
    // can use them to generate OAuth credentials
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    val sparkConf = new SparkConf().setAppName("Spark Streaming Twitter to Avro to Kafka Producer")
    sparkConf.set("spark.cores.max", "24")
    sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
    sparkConf.set("spark.sql.tungsten.enabled", "true")
    sparkConf.set("spark.eventLog.enabled", "true")
    sparkConf.set("spark.app.id", "TwitterKafkaProducer")
    sparkConf.set("spark.io.compression.codec", "snappy")
    sparkConf.set("spark.rdd.compress", "true")
    sparkConf.set("spark.streaming.backpressure.enabled", "true")

    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))
    val stream = TwitterUtils.createStream(ssc, None, filters).map(gson.toJson(_))

    try {
      stream.foreachRDD((rdd, time) => {
        val count = rdd.count()
        if (count > 0) {
          // new Kafka producer API (org.apache.kafka.clients.producer) configuration
          val props = new HashMap[String, Object]()
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "brokerip:6667")
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
          val producer = new KafkaProducer[String, Array[Byte]](props)

          val topList = rdd.collect()
          topList.foreach(a => {
            // serialize each tweet to Avro and send it to the "meetup" topic
            val tweets = serializeTwitter(schema, new GenericRecordBuilder(schema)
              .set("tweet", a)
              .set("timestamp", time.milliseconds)
              .build)
            val message = new ProducerRecord[String, Array[Byte]]("meetup", null, tweets)
            producer.send(message)
            println("Sent %s".format(a.substring(0, 512)))
          })
        }
      })
    } catch {
      case e: Exception =>
        println("Twitter. Writing files after job. Exception:" + e.getMessage)
        e.printStackTrace()
    }

    ssc.start()
    ssc.awaitTermination()
  }

  // convert a GenericRecord to an Avro byte array
  def serializeTwitter(schema: Schema, tweet: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    try {
      val encoder = EncoderFactory.get.binaryEncoder(out, null)
      val writer = new GenericDatumWriter[GenericRecord](schema)
      writer.write(tweet, encoder)
      encoder.flush
      out.close
    } catch {
      case e: Exception => None
    }
    out.toByteArray
  }
}
// scalastyle:on println
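If you want to stay in Java, the same new-producer API (org.apache.kafka.clients.producer) applied to the broker list, topic, and message from your class would look roughly like the sketch below. It is untested against your cluster: the PLAINTEXTSASL value is carried over from your config (stock Apache Kafka clients use SASL_PLAINTEXT instead), and the JAAS file still has to be supplied with -Djava.security.auth.login.config exactly as in your spark-submit command.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewApiProducer {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // broker list, topic, and message taken from the question above
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "buckland:6667,laverne:6667,mahoney:6667");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        // HDP-specific value from the question; plain Apache Kafka expects SASL_PLAINTEXT
        props.put("security.protocol", "PLAINTEXTSASL");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            ProducerRecord<String, String> record = new ProducerRecord<>(
                    "dev.raw.mce.energy.gas", "***** message from KafkaProducer class *****");
            // block on the returned future so any broker or authentication error surfaces right here
            producer.send(record).get();
            System.out.println("message sent");
        } finally {
            producer.close();
        }
    }
}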
Created 12-29-2016 11:38 AM
What exception are you getting?
Created 12-29-2016 08:13 PM
Are those brokers active?
Make sure you include the ZooKeeper servers in your configuration.
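One way to verify the brokers are reachable from the client is to ask the producer for the topic's partition metadata; the call blocks until a broker answers (or max.block.ms expires), so an unreachable or misconfigured broker shows up immediately. A rough sketch with the new producer API, reusing the broker list, topic, and security setting from your post:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.PartitionInfo;

public class BrokerCheck {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "buckland:6667,laverne:6667,mahoney:6667");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put("security.protocol", "PLAINTEXTSASL");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // throws a TimeoutException if no broker can be reached before max.block.ms
            for (PartitionInfo partition : producer.partitionsFor("dev.raw.mce.energy.gas")) {
                System.out.println(partition);
            }
        }
    }
}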