06-28-2021
05:35 PM
Hi vdadhich,
1. Thanks for your reply, but I still don't have an answer about using Avro with Spark Structured Streaming and Kafka on CDH 6.3.2.
2. The Spark version shipped with CDH 6.3.2 is 2.4.0, and as far as I remember, Spark 2.4.0 has built-in Avro support, so I expected to use the Avro format the same way as JSON. The to_json and from_json functions provided by Spark work fine for me. After switching from JSON to Avro, the data can still be parsed, but the resulting values are wrong. Could you help check the code, or give me an example of Spark Structured Streaming 2.4.0 with Kafka using Avro? Thanks.
06-27-2021
07:01 PM
I wrote the test program below to try Avro with Kafka: it sends an Avro record to a Kafka topic and then reads the data back from the same topic. But the data I read back from Kafka is wrong. Could you help me fix it?
import com.sino.data.AppSettings
import com.sksamuel.avro4s.AvroSchema
import org.apache.spark.sql.avro.{from_avro, to_avro}
import org.apache.spark.sql.functions.{col, struct}

case class Person(firstname: String, age: Int)

object Write extends App {
  val spark = AppSettings.getSparkSession()
  spark.sqlContext.setConf("spark.sql.legacy.replaceDatabricksSparkAvro.enabled", "false")
  import spark.implicits._

  val data = List(Person("77558", 23))
  val ds = spark.sparkContext.parallelize(data).toDS

  ds.select(to_avro(struct("firstname", "age")) as "value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "rep-slave1:9092,rep-slave2:9092,rep-slave3:9092")
    .option("topic", "avro_data_topic22")
    .save()
}

object Read extends App {
  val avroSchema =
    s"""
       |{"type":"record","name":"Person","fields":[{"name":"firstname","type":"string"},{"name":"age","type":"int"}]}
       |""".stripMargin

  val spark = AppSettings.getSparkSession()
  spark.sqlContext.setConf("spark.sql.legacy.replaceDatabricksSparkAvro.enabled", "false")

  val df = spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "rep-slave1:9092,rep-slave2:9092,rep-slave3:9092")
    .option("subscribe", "avro_data_topic22")
    .option("startingOffsets", "earliest") // From starting
    .load()
    .select(from_avro(col("value"), avroSchema) as 'person)
    .select("person.*")
    .write
    .format("console")
    .save()
}
Data sent to Kafka:
+----+--------------------+-----------------+---------+------+--------------------+-------------+
| key|               value|            topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----------------+---------+------+--------------------+-------------+
|null| [00 06 37 37 38 2E]|avro_data_topic22|        0|     0|2021-06-27 00:33:...|            0|
|null|[00 0A 37 37 35 3...|avro_data_topic22|        0|     1|2021-06-27 00:36:...|            0|
+----+--------------------+-----------------+---------+------+--------------------+-------------+
Data read back from Kafka:
+---------+---+
|firstname|age|
+---------+---+
|         |  3|
|         |  5|
+---------+---+
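One possible reading of the bytes above, offered as a sketch and not a confirmed diagnosis: the first message starts with 00, which looks like an Avro union branch index. That would fit a writer that encoded firstname as a nullable union such as ["string","null"] (an assumption here), while the reader schema declares a plain "string". The standalone snippet below hand-decodes the first row both ways using Avro's zigzag varint encoding. It does not use Spark, and the object and helper names (AvroBytesSketch, decodePlain, decodeNullable) are made up for illustration.

```scala
import java.nio.charset.StandardCharsets

object AvroBytesSketch {
  // Read one Avro int/long (zigzag varint) at `pos`; returns (value, next position).
  def readLong(buf: Array[Byte], pos: Int): (Long, Int) = {
    var raw = 0L
    var shift = 0
    var i = pos
    var more = true
    while (more) {
      val b = buf(i) & 0xff
      raw |= (b & 0x7fL) << shift
      shift += 7
      i += 1
      more = (b & 0x80) != 0
    }
    ((raw >>> 1) ^ -(raw & 1), i) // undo zigzag
  }

  // Read one Avro string: a length followed by that many UTF-8 bytes.
  def readString(buf: Array[Byte], pos: Int): (String, Int) = {
    val (len, start) = readLong(buf, pos)
    (new String(buf, start, len.toInt, StandardCharsets.UTF_8), start + len.toInt)
  }

  // Reader schema from the post: firstname is a plain "string", age is "int".
  def decodePlain(buf: Array[Byte]): (String, Int) = {
    val (name, p) = readString(buf, 0)
    (name, readLong(buf, p)._1.toInt)
  }

  // ASSUMED writer schema: firstname is ["string","null"], so a branch index comes first.
  def decodeNullable(buf: Array[Byte]): (Option[String], Int) = {
    val (branch, p0) = readLong(buf, 0)
    if (branch == 0) {
      val (name, p1) = readString(buf, p0)
      (Some(name), readLong(buf, p1)._1.toInt)
    } else {
      (None, readLong(buf, p0)._1.toInt)
    }
  }

  // Value bytes of the first Kafka row shown above.
  val firstRow: Array[Byte] = Array(0x00, 0x06, 0x37, 0x37, 0x38, 0x2E).map(_.toByte)

  def main(args: Array[String]): Unit = {
    println(decodePlain(firstRow))    // (,3)
    println(decodeNullable(firstRow)) // (Some(778),23)
  }
}
```

Decoding with the plain "string" schema gives an empty firstname and age 3, which matches the first row of the console output, while the union reading gives a non-empty name and age 23. If that assumption holds, a reader schema that declares firstname as ["string","null"] may be worth trying.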