Created on 06-27-2021 07:01 PM - edited 09-16-2022 07:42 AM
I wrote a test program (below) that uses Avro with Kafka. It sends Avro data to a Kafka topic and then reads the data back from the same topic. However, the data I read back from Kafka is wrong. Could you help me fix it?
import com.sino.data.AppSettings
import com.sksamuel.avro4s.AvroSchema
import org.apache.spark.sql.avro.{from_avro, to_avro}
import org.apache.spark.sql.functions.{col, struct}
case class Person(firstname: String, age: Int)

object Write extends App {
  val spark = AppSettings.getSparkSession()
  spark.sqlContext.setConf("spark.sql.legacy.replaceDatabricksSparkAvro.enabled", "false")
  import spark.implicits._

  val data = List(Person("77558", 23))
  val ds = spark.sparkContext.parallelize(data).toDS

  ds.select(to_avro(struct("firstname", "age")) as "value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "rep-slave1:9092,rep-slave2:9092,rep-slave3:9092")
    .option("topic", "avro_data_topic22")
    .save()
}
object Read extends App {
  val avroSchema =
    s"""
       |{"type":"record","name":"Person","fields":[{"name":"firstname","type":"string"},{"name":"age","type":"int"}]}
       |""".stripMargin

  val spark = AppSettings.getSparkSession()
  spark.sqlContext.setConf("spark.sql.legacy.replaceDatabricksSparkAvro.enabled", "false")

  val df = spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "rep-slave1:9092,rep-slave2:9092,rep-slave3:9092")
    .option("subscribe", "avro_data_topic22")
    .option("startingOffsets", "earliest") // From starting
    .load()
    .select(from_avro(col("value"), avroSchema) as 'person)
    .select("person.*")
    .write
    .format("console")
    .save()
}
Data sent to Kafka:
+----+--------------------+-----------------+---------+------+--------------------+-------------+
| key| value| topic|partition|offset| timestamp|timestampType|
+----+--------------------+-----------------+---------+------+--------------------+-------------+
|null| [00 06 37 37 38 2E]|avro_data_topic22| 0| 0|2021-06-27 00:33:...| 0|
|null|[00 0A 37 37 35 3...|avro_data_topic22| 0| 1|2021-06-27 00:36:...| 0|
+----+--------------------+-----------------+---------+------+--------------------+-------------+
Data read back from Kafka:
+---------+---+
|firstname|age|
+---------+---+
| | 3|
| | 5|
+---------+---+
Created 06-28-2021 08:06 AM
Hi @bruce_yang,
Thank you for reaching out. As I understand it, you need help reading Avro data from Kafka. Cloudera has a way to do this, which is explained in threads [1], [2] and [3]. Please let me know if this helps.
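One possible reading of the output in the question is a mismatch between the schema the data was written with and the schema passed to from_avro. The sketch below is not Spark code. It decodes by hand the first value from the dump, [00 06 37 37 38 2E], using the Avro binary encoding rules (zigzag varints, length-prefixed strings, union branch index first). It assumes the writer treated the nullable firstname column as a union ["string","null"], which would account for the leading 00 byte. The object and method names (AvroBytesDemo, decodePlain, decodeUnion) are hypothetical and exist only for illustration.

```scala
object AvroBytesDemo {
  // Decode one Avro zigzag varint starting at `pos`; returns (value, next position).
  def readLong(buf: Array[Byte], pos: Int): (Long, Int) = {
    var shift = 0
    var raw = 0L
    var i = pos
    var more = true
    while (more) {
      val b = buf(i) & 0xff
      raw |= (b & 0x7fL) << shift
      shift += 7
      i += 1
      more = (b & 0x80) != 0 // high bit set means another byte follows
    }
    ((raw >>> 1) ^ -(raw & 1), i) // undo zigzag encoding
  }

  // Avro string: zigzag length followed by that many UTF-8 bytes.
  def readString(buf: Array[Byte], pos: Int): (String, Int) = {
    val (len, start) = readLong(buf, pos)
    (new String(buf, start, len.toInt, "UTF-8"), start + len.toInt)
  }

  // Reader schema from the question: firstname is a plain "string".
  def decodePlain(buf: Array[Byte]): (String, Int) = {
    val (name, p1) = readString(buf, 0)
    val (age, _) = readLong(buf, p1)
    (name, age.toInt)
  }

  // Assumed writer schema: firstname is a union ["string","null"],
  // so a branch index precedes the string.
  def decodeUnion(buf: Array[Byte]): (String, Int) = {
    val (branch, p0) = readLong(buf, 0)
    val (name, p1) =
      if (branch == 0) readString(buf, p0) else (null: String, p0)
    val (age, _) = readLong(buf, p1)
    (name, age.toInt)
  }

  def main(args: Array[String]): Unit = {
    // First value from the dump in the question.
    val bytes = Array(0x00, 0x06, 0x37, 0x37, 0x38, 0x2E).map(_.toByte)
    println(decodePlain(bytes)) // empty firstname, age 3
    println(decodeUnion(bytes)) // firstname "778", age 23
  }
}
```

Decoding with the plain "string" schema gives an empty firstname and age 3, which matches the first row of the console output in the question. Decoding with the union gives "778" and 23. If this reading is right, declaring firstname as ["string","null"] in the schema passed to from_avro would be worth trying.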
Created 06-28-2021 05:35 PM