Created on 05-13-2016 07:50 PM
Below are a short Scala program and an SBT build script that produce a Spark application you can submit to YARN and run on the HDP Sandbox with Spark 1.6.
We consume Kafka messages in 2-second microbatches. We set a number of parameters to improve the performance of the Spark SQL writes to HDFS, including enabling Tungsten, Snappy compression, backpressure, and the KryoSerializer.
We set up the SparkContext and the Spark StreamingContext, then use KafkaUtils to prepare a stream of generic Avro data records from a byte array. From there we convert each record to a Scala case class that models the Twitter tweet from our source. We convert the records to a DataFrame, register it as a temp table, and then save the results as ORC, Avro, Parquet, and JSON files.
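For reference, here is a condensed sketch of just the tuning settings mentioned above, as they appear in the full program further down (Spark 1.6 APIs):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("Avro to Kafka Consumer")
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName) // KryoSerializer
sparkConf.set("spark.sql.tungsten.enabled", "true")                // Tungsten
sparkConf.set("spark.io.compression.codec", "snappy")              // Snappy compression
sparkConf.set("spark.sql.parquet.compression.codec", "snappy")
sparkConf.set("spark.streaming.backpressure.enabled", "true")      // Backpressure
val ssc = new StreamingContext(new SparkContext(sparkConf), Seconds(2)) // 2-second microbatches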
To Build this Application:
sbt clean assembly
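The assembly task comes from the sbt-assembly plugin, which the article does not show; a minimal project/plugins.sbt along these lines is assumed (the plugin version here is an assumption, pick one that matches your sbt install):

// project/plugins.sbt -- sbt-assembly provides the "assembly" task used above
// (0.13.0 is an assumed version; match it to your sbt install)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")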
Create Kafka Topic
cd /usr/hdp/current/kafka-broker
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic meetup
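To confirm the topic exists, and to push a quick test message by hand, the standard Kafka command-line tools can be used (a sketch; broker port 6667 matches the HDP Sandbox default used later in the program):

bin/kafka-topics.sh --list --zookeeper localhost:2181
bin/kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic meetup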
To Prepare Hadoop Environment
# Create HDFS Directories (also avro, orc, json, ... -- see the sketch below)
hdfs dfs -mkdir /parquetresults
hdfs dfs -chmod -R 777 /parquetresults
hdfs dfs -ls /
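A sketch of the same commands for all four output formats. Note that the program below writes to relative paths ("orcresults", "avroresults", "parquetresults", "jsonresults"), which resolve under the submitting user's HDFS home directory rather than under /, so adjust the paths to wherever you want the results to land:

for dir in orcresults avroresults parquetresults jsonresults; do
  hdfs dfs -mkdir -p /$dir
  hdfs dfs -chmod -R 777 /$dir
done
hdfs dfs -ls /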
To Deploy to YARN
# Check YARN
yarn application --list
# If need be
# yarn application --kill <id from list>

spark-submit --class "com.sparkdeveloper.receiver.KafkaConsumer" \
  --master yarn --deploy-mode client \
  --driver-memory 512m --executor-memory 512m \
  --conf spark.ui.port=4244 \
  kafkaconsumer.jar
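Once the job has been running for a few batches, a quick way to confirm that data is landing (a sketch; with client deploy mode the driver log prints to the console, and the output directories are relative to the submitting user's HDFS home unless full paths are used):

# Output directories, relative to the submitting user's HDFS home directory
hdfs dfs -ls orcresults parquetresults avroresults jsonresults
# Executor logs, if something looks wrong
yarn logs -applicationId <id from list>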
Spark Scala Program
package com.sparkdeveloper.receiver

import java.io.{ByteArrayOutputStream, File, IOException}
import java.util.HashMap

import org.apache.avro._
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.avro.io._
import org.apache.avro.io.EncoderFactory
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.streaming.kafka._
import kafka.serializer._
import com.databricks.spark.avro._
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

// Case classes modeling the incoming tweet JSON
case class HashtagEntities(text: String, start: Double, end: Double)

case class User(id: Double, name: String, screenName: String, location: String,
                description: String, url: String, statusesCount: Double)

case class Tweet(text: String, createdAt: String, lang: String, source: String,
                 expandedURL: String, url: String, screenName: String, description: String,
                 name: String, retweetCount: Double, timestamp: Long, favoriteCount: Double,
                 user: Option[User], hashtags: HashtagEntities)

/**
 * Created by timothyspann
 */
object KafkaConsumer {
  // Avro schema of the wrapper record on the Kafka topic: a JSON tweet string plus a timestamp
  val tweetSchema = SchemaBuilder
    .record("tweet")
    .fields
    .name("tweet").`type`().stringType().noDefault()
    .name("timestamp").`type`().longType().noDefault()
    .endRecord

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
    val logger: Logger = Logger.getLogger("com.sparkdeveloper.receiver.KafkaConsumer")

    val sparkConf = new SparkConf().setAppName("Avro to Kafka Consumer")
    sparkConf.set("spark.cores.max", "24") // For my sandbox
    sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
    sparkConf.set("spark.sql.tungsten.enabled", "true")
    sparkConf.set("spark.eventLog.enabled", "true")
    sparkConf.set("spark.app.id", "KafkaConsumer") // want to know your app in the UI
    sparkConf.set("spark.io.compression.codec", "snappy")
    sparkConf.set("spark.rdd.compress", "true")
    sparkConf.set("spark.streaming.backpressure.enabled", "true")
    sparkConf.set("spark.sql.parquet.compression.codec", "snappy")
    sparkConf.set("spark.sql.parquet.mergeSchema", "true")
    sparkConf.set("spark.sql.parquet.binaryAsString", "true")

    val sc = new SparkContext(sparkConf)
    sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
    val ssc = new StreamingContext(sc, Seconds(2)) // 2-second microbatches

    try {
      val kafkaConf = Map(
        "metadata.broker.list" -> "sandbox.hortonworks.com:6667",
        "zookeeper.connect" -> "sandbox.hortonworks.com:2181", // Default zookeeper location
        "group.id" -> "KafkaConsumer",
        "zookeeper.connection.timeout.ms" -> "1000")

      val topicMaps = Map("meetup" -> 1)

      // Create a new stream which can decode byte arrays.
      val tweets = KafkaUtils.createStream[String, Array[Byte], DefaultDecoder, DefaultDecoder](
        ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)

      try {
        tweets.foreachRDD((rdd, time) => {
          if (rdd != null) {
            try {
              val sqlContext = new SQLContext(sc)
              import sqlContext.implicits._

              // Avro bytes -> embedded JSON tweet string
              val rdd2 = rdd.map { case (k, v) => parseAVROToString(v) }

              try {
                // JSON tweet string -> Tweet case class (one Jackson mapper per partition)
                val result = rdd2.mapPartitions(records => {
                  val mapper = new ObjectMapper()
                  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
                  mapper.registerModule(DefaultScalaModule)
                  records.flatMap(record => {
                    try {
                      Some(mapper.readValue(record, classOf[Tweet]))
                    } catch {
                      case e: Exception => None
                    }
                  })
                }, true)

                val df1 = result.toDF()
                logger.error("Registered tweets: " + df1.count())
                df1.registerTempTable("tweets")

                // To show how easy it is to write multiple formats
                df1.write.format("orc").mode(org.apache.spark.sql.SaveMode.Append).orc("orcresults")
                df1.write.format("avro").mode(org.apache.spark.sql.SaveMode.Append).avro("avroresults")
                df1.write.format("parquet").mode(org.apache.spark.sql.SaveMode.Append).parquet("parquetresults")
                df1.write.format("json").mode(org.apache.spark.sql.SaveMode.Append).json("jsonresults")
              } catch {
                case e: Exception => None
              }
            } catch {
              case e: Exception => None
            }
          }
        })
      } catch {
        case e: Exception =>
          println("Writing files after job. Exception: " + e.getMessage)
          e.printStackTrace()
      }
    } catch {
      case e: Exception =>
        println("Kafka Stream. Writing files after job. Exception: " + e.getMessage)
        e.printStackTrace()
    }

    ssc.start()
    ssc.awaitTermination()
  }

  // Decode the Avro wrapper record and pull out the embedded JSON tweet string
  def parseAVROToString(rawTweet: Array[Byte]): String = {
    try {
      if (rawTweet.isEmpty) {
        println("Rejected Tweet")
        "Empty"
      } else {
        deserializeTwitter(rawTweet).get("tweet").toString
      }
    } catch {
      case e: Exception => println("Exception: " + e.getMessage); "Empty"
    }
  }

  def deserializeTwitter(tweet: Array[Byte]): GenericRecord = {
    try {
      val reader = new GenericDatumReader[GenericRecord](tweetSchema)
      val decoder = DecoderFactory.get.binaryDecoder(tweet, null)
      reader.read(null, decoder)
    } catch {
      case e: Exception => null
    }
  }
}
// scalastyle:on println

build.sbt

name := "KafkaConsumer"

version := "1.0"

scalaVersion := "2.10.6"

jarName in assembly := "kafkaconsumer.jar"

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0" % "provided"
libraryDependencies += "com.databricks" %% "spark-avro" % "2.0.1"
libraryDependencies += "org.apache.avro" % "avro" % "1.7.6" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.0"
libraryDependencies += "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.13"
libraryDependencies += "com.google.code.gson" % "gson" % "2.3"

mergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
  case "log4j.properties" => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
  case "reference.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}
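A quick way to sanity-check the output, sketched here for reference (Spark 1.6 spark-shell, assuming the relative output paths used in the program above):

// Read some of the written results back in spark-shell (sqlContext is provided by the shell)
val orcTweets = sqlContext.read.format("orc").load("orcresults")
orcTweets.printSchema()
orcTweets.registerTempTable("orc_tweets")
sqlContext.sql("SELECT screenName, text FROM orc_tweets LIMIT 10").show()

val parquetTweets = sqlContext.read.parquet("parquetresults")
println("Parquet rows: " + parquetTweets.count())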
Created on 05-18-2016 12:31 PM
Thanks, Timothy, for this great article.
Are you aware of any method to go directly from the RDD of KafkaAvroDecoder output objects to a Spark DataFrame or Dataset, specifying an Avro read schema?
Stefano
Created on 06-10-2016 03:17 PM
It looks like you have to wait for Spark 2.0 with Structured Streaming.
Created on 07-14-2016 11:18 AM
Hi,
I'm writing code in Zeppelin. Could you please tell me how I can see the output, or the final string that has been converted from my Avro message? Also, I am not able to see a file in any format in my HDFS; what is the location where df1 is stored?
Thanks a lot for this code!
Created on 07-14-2016 11:50 AM
Where are these DataFrames saved?
Created on 08-23-2016 03:54 PM
Once you do a write (df1.write.format("orc").mode(...)), it will save to an HDFS or local directory, depending on whether you have Hadoop enabled. It will create it under the running user, like /root/<your name>, or under a spark user directory if that was created. It doesn't hurt to put in a full path for the write, like /mystuff/awesome.
To read an Avro file, use the Avro tools; see here: https://github.com/airisdata/avroparquet
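For reference, a sketch of inspecting one of the Avro output files with the avro-tools jar (the jar version and the part-file name are placeholders; the actual file names under avroresults will differ):

# Pull one Avro part file out of HDFS and look at it
hdfs dfs -get avroresults/part-r-00000-<some-uuid>.avro .
java -jar avro-tools-1.7.7.jar getschema part-r-00000-<some-uuid>.avro
java -jar avro-tools-1.7.7.jar tojson part-r-00000-<some-uuid>.avro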
Created on 08-23-2016 03:54 PM
Put in a full path; for me, running in Hadoop, it is saved to HDFS. See below.
If you are running standalone, not compiled with Hadoop, it will store to a local file system, probably /<YOURCURRENTUSER>/something or /tmp. Check the Spark history UI.
Created on 10-16-2016 10:40 PM
Hi Timothy, thanks for this detailed article. We have an Avro schema which is very long (116 lines), so using SchemaBuilder to build the entire schema may not be the best option in our case. Could you please guide us on how to approach this? Our aim is to read Avro messages from Kafka, convert them to JSON, and write them to a data source. I also posted a question about this: https://community.hortonworks.com/questions/61827/read-avro-messages-consumed-using-spark-streaming....
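One possible approach, sketched here for reference rather than taken from the article: a long schema does not have to be rebuilt with SchemaBuilder; it can be loaded from an .avsc file with Avro's Schema.Parser and then used exactly like tweetSchema above (the file name here is hypothetical).

import java.io.File
import org.apache.avro.Schema

// Load the full schema from a .avsc file instead of building it in code
// ("tweet.avsc" is a hypothetical path; point it at your own schema file)
val tweetSchema: Schema = new Schema.Parser().parse(new File("tweet.avsc"))
// The GenericDatumReader[GenericRecord](tweetSchema) deserialization code above stays the same.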
Created on 05-10-2017 09:10 AM
Thanks for the very useful article. I am getting the error below when trying to compile.

constructor cannot be instantiated to expected type;
 found   : (T1, T2)
 required: org.apache.kafka.clients.consumer.ConsumerRecord[String,Array[Byte]]
[ERROR] val rdd2 = rdd.map { case (k, v) => parseAVROToString(v) }
Did anybody face this issue? Thanks.
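A likely explanation, added here for reference rather than from the original thread: this error appears when the stream is built with the newer Kafka 0.10 integration (spark-streaming-kafka-0-10), whose direct stream yields ConsumerRecord objects instead of the (key, value) tuples produced by the 0.8 receiver-based createStream used in the article. A sketch of the adjusted mapping under that assumption:

// With the Kafka 0.10 direct stream, each RDD element is a ConsumerRecord, not a tuple,
// so extract the value explicitly instead of pattern matching on (k, v).
val rdd2 = rdd.map(record => parseAVROToString(record.value))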