Created 12-29-2016 11:26 AM
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;

public class KafkaProducer {

    private static Producer<Integer, String> producer;
    private final Properties properties = new Properties();

    public KafkaProducer() {
        properties.put("metadata.broker.list", "buckland:6667,laverne:6667,mahoney:6667");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.required.acks", "1");
        properties.put("security.protocol", "PLAINTEXTSASL");
        properties.put("producer.type", "async");
        ProducerConfig config = new ProducerConfig(properties);
        producer = new Producer<Integer, String>(config);
    }

    public static void main(String[] args) {
        System.out.println("in Kafka Producer ****");
        new KafkaProducer();
        String topic = "dev.raw.mce.energy.gas";
        String msg = "***** message from KafkaProducer class *****";
        KeyedMessage<Integer, String> data = new KeyedMessage<>(topic, msg);
        producer.send(data);
        producer.close();
        System.out.println("Kafka Producer is over ****");
    }
}
-- Submitting the producer program:
spark-submit \
--master local[2] \
--num-executors 2 \
--driver-memory 1g \
--executor-memory 2g \
--conf "spark.driver.extraJavaOptions= -Djava.security.auth.login.config=/tmp/rchamaku/kafka/kafka_client_jaas.conf" \
--class KafkaProducer \
--name "Sample KafkaProducer by Ram" \
/tmp/rchamaku/kafka/TestKafka-0.0.1-SNAPSHOT-driver.jar
Created 12-29-2016 08:17 PM
See this working example of a Kafka producer:
package com.dataflowdeveloper.kafka
import java.io.ByteArrayOutputStream
import java.util.HashMap

import com.google.gson.Gson

import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.avro.io.EncoderFactory

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import org.apache.log4j.{Level, Logger}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Created by timothyspann on 4/4/16.
*/
object TwitterKafkaProducer {
  private val gson = new Gson()

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
    val logger: Logger = Logger.getLogger("com.dataflowdeveloper.kafka.KafkaSimulator")

    // build a Tweet schema
    val schema = SchemaBuilder
      .record("tweet")
      .fields
      .name("tweet").`type`().stringType().noDefault()
      .name("timestamp").`type`().longType().noDefault()
      .endRecord

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = Array("hadoop", "hortonworks", "#hadoop", "#bigdata", "#spark", "#hortonworks", "#HDP")

    // Set the system properties so that the Twitter4j library used by the twitter stream
    // can use them to generate OAuth credentials
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    val sparkConf = new SparkConf().setAppName("Spark Streaming Twitter to Avro to Kafka Producer")
    sparkConf.set("spark.cores.max", "24")
    sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
    sparkConf.set("spark.sql.tungsten.enabled", "true")
    sparkConf.set("spark.eventLog.enabled", "true")
    sparkConf.set("spark.app.id", "TwitterKafkaProducer")
    sparkConf.set("spark.io.compression.codec", "snappy")
    sparkConf.set("spark.rdd.compress", "true")
    sparkConf.set("spark.streaming.backpressure.enabled", "true")

    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))
    val stream = TwitterUtils.createStream(ssc, None, filters).map(gson.toJson(_))

    try {
      stream.foreachRDD((rdd, time) => {
        val count = rdd.count()
        if (count > 0) {
          val props = new HashMap[String, Object]()
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "brokerip:6667")
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer")
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
          val producer = new KafkaProducer[String, Array[Byte]](props)

          val topList = rdd.collect()
          topList.foreach(a => {
            val tweets = serializeTwitter(schema, new GenericRecordBuilder(schema)
              .set("tweet", a)
              .set("timestamp", time.milliseconds)
              .build)
            val message = new ProducerRecord[String, Array[Byte]]("meetup", null, tweets)
            producer.send(message)
            // guard against tweets shorter than 512 characters
            println("Sent %s".format(a.substring(0, Math.min(a.length, 512))))
          })
          producer.close()
        }
      })
    } catch {
      case e: Exception =>
        println("Twitter. Writing files after job. Exception:" + e.getMessage)
        e.printStackTrace()
    }

    ssc.start()
    ssc.awaitTermination()
  }

  // convert a tweet record to an Avro byte array
  def serializeTwitter(schema: Schema, tweet: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    try {
      val encoder = EncoderFactory.get.binaryEncoder(out, null)
      val writer = new GenericDatumWriter[GenericRecord](schema)
      writer.write(tweet, encoder)
      encoder.flush
      out.close
    } catch {
      case e: Exception => e.printStackTrace()
    }
    out.toByteArray
  }
}
// scalastyle:on println
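On the consuming side you can invert serializeTwitter with Avro's GenericDatumReader. This is a minimal sketch, assuming the same tweet schema built above; the deserializeTwitter name is just for illustration, and you could add it next to serializeTwitter:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// convert an Avro byte array (as produced by serializeTwitter) back to a record
def deserializeTwitter(schema: Schema, bytes: Array[Byte]): GenericRecord = {
  val decoder = DecoderFactory.get.binaryDecoder(bytes, null)
  val reader = new GenericDatumReader[GenericRecord](schema)
  reader.read(null, decoder)
}

The fields come back as Avro types, so record.get("tweet").toString returns the original JSON string and record.get("timestamp") the long timestamp.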
Created 12-29-2016 11:38 AM
What exception are you getting?
Created 12-29-2016 08:13 PM
Are those brokers active?
Make sure you include the ZooKeepers.
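A quick way to check broker reachability from code is to ask the new producer API for the topic's partition metadata. This is a minimal sketch, assuming the broker list and topic from the question; partitionsFor blocks until metadata is fetched and throws a TimeoutException if no broker answers:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.producer.KafkaProducer

object BrokerCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // broker list and topic copied from the question; adjust for your cluster
    props.put("bootstrap.servers", "buckland:6667,laverne:6667,mahoney:6667")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // on a SASL-secured cluster you would also set the security protocol and run with the
    // JAAS config, e.g. props.put("security.protocol", "SASL_PLAINTEXT")

    val producer = new KafkaProducer[String, String](props)
    try {
      // blocks until topic metadata arrives, or fails with a TimeoutException
      producer.partitionsFor("dev.raw.mce.energy.gas").asScala
        .foreach(p => println(s"partition ${p.partition()} leader ${p.leader()}"))
    } finally {
      producer.close()
    }
  }
}

If this prints the partition leaders, the brokers are reachable on that listener and the problem is more likely in the SASL/JAAS setup.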