CDH 6.3.2 Spark Structured Streaming with Kafka using Avro serialization and deserialization

New Contributor

I wrote a test program (below) that sends Avro data to Kafka and reads it back from the same topic. However, the data I read back from Kafka comes out in the wrong format. Could you help me fix it?


import com.sino.data.AppSettings
import com.sksamuel.avro4s.AvroSchema
import org.apache.spark.sql.avro.{from_avro, to_avro}
import org.apache.spark.sql.functions.{col, struct}

case class Person(firstname: String, age: Int)

object Write extends App {

  val spark = AppSettings.getSparkSession()
  spark.sqlContext.setConf("spark.sql.legacy.replaceDatabricksSparkAvro.enabled", "false")

  import spark.implicits._

  val data = List(Person("77558", 23))

  // Serialize the two columns as one Avro record in the Kafka "value" column.
  val ds = spark.sparkContext.parallelize(data).toDS
  ds.select(to_avro(struct("firstname", "age")) as "value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "rep-slave1:9092,rep-slave2:9092,rep-slave3:9092")
    .option("topic", "avro_data_topic22")
    .save()
}

object Read extends App {

  // Hand-written reader schema for the Person record.
  val avroSchema =
    s"""
       |{"type":"record","name":"Person","fields":[{"name":"firstname","type":"string"},{"name":"age","type":"int"}]}
       |""".stripMargin

  val spark = AppSettings.getSparkSession()
  spark.sqlContext.setConf("spark.sql.legacy.replaceDatabricksSparkAvro.enabled", "false")

  // Read the topic back and deserialize the binary value with from_avro.
  spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "rep-slave1:9092,rep-slave2:9092,rep-slave3:9092")
    .option("subscribe", "avro_data_topic22")
    .option("startingOffsets", "earliest") // read from the beginning of the topic
    .load()
    .select(from_avro(col("value"), avroSchema) as 'person)
    .select("person.*")
    .write
    .format("console")
    .save()
}


The data sent to Kafka (raw bytes in the topic):
+----+--------------------+-----------------+---------+------+--------------------+-------------+
| key| value| topic|partition|offset| timestamp|timestampType|
+----+--------------------+-----------------+---------+------+--------------------+-------------+
|null| [00 06 37 37 38 2E]|avro_data_topic22| 0| 0|2021-06-27 00:33:...| 0|
|null|[00 0A 37 37 35 3...|avro_data_topic22| 0| 1|2021-06-27 00:36:...| 0|
+----+--------------------+-----------------+---------+------+--------------------+-------------+

The data read back from Kafka:

+---------+---+
|firstname|age|
+---------+---+
|         | 3|
|         | 5|
+---------+---+
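
For reference, a raw view like the first table can be produced by reading the topic back without from_avro. This is only a sketch, reusing the broker list and topic from the code above:

// Sketch only: show the Kafka records without Avro decoding.
// spark is the same session as in the Read object above.
spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "rep-slave1:9092,rep-slave2:9092,rep-slave3:9092")
  .option("subscribe", "avro_data_topic22")
  .option("startingOffsets", "earliest")
  .load()
  .show() // the binary "value" column is printed as hex bytes; long cells are truncated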


2 Replies

Cloudera Employee

Hi @bruce_yang,
Thank you for reaching out. As I understand it, you need assistance reading Avro data with Kafka. Cloudera has a way to achieve this, and it is explained well in threads [1], [2] and [3]. Please let me know if this helps.


New Contributor
Hi vdadhich,


1. Thanks for your reply, but I still don't have an answer about a simple Spark Structured Streaming example with Kafka using Avro on CDH 6.3.2.

2. I see that the Spark version shipped with CDH 6.3.2 is 2.4.0, and as I recall, Spark 2.4.0 has built-in Avro support.

I mean that Avro should be usable in the same way as JSON.

I used the to_json and from_json functions provided by Spark, and they work fine (see the sketch below). But when I switched from JSON to Avro, the data can still be parsed, yet the values come out wrong.
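
For example, the JSON round trip that works for me looks roughly like this. It is only a sketch from memory: the topic name json_data_topic is just an illustration, and Person and AppSettings are the same as in my code above.

import com.sino.data.AppSettings
import org.apache.spark.sql.functions.{col, from_json, struct, to_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object JsonRoundTrip extends App {

  val spark = AppSettings.getSparkSession()
  import spark.implicits._

  // Schema used to parse the JSON string back into columns.
  val jsonSchema = StructType(Seq(
    StructField("firstname", StringType),
    StructField("age", IntegerType)))

  // Write: encode the two columns as one JSON string in the Kafka value.
  List(Person("77558", 23)).toDS
    .select(to_json(struct("firstname", "age")) as "value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "rep-slave1:9092,rep-slave2:9092,rep-slave3:9092")
    .option("topic", "json_data_topic")
    .save()

  // Read: cast the binary value to string and decode it with from_json.
  spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "rep-slave1:9092,rep-slave2:9092,rep-slave3:9092")
    .option("subscribe", "json_data_topic")
    .option("startingOffsets", "earliest")
    .load()
    .select(from_json(col("value").cast("string"), jsonSchema) as "person")
    .select("person.*")
    .show()
}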



Could you help us check the code, or give me an example of Spark Structured Streaming 2.4.0 with Kafka using Avro?

Thanks.