
Consuming Avro events from Kafka in Spark structured streaming


Hi All,

I have designed a NiFi flow that serializes JSON events in Avro format and pushes them into a Kafka topic, and I am trying to consume them in Spark Structured Streaming.

While the Kafka part works fine, Spark Structured Streaming is not able to read the Avro events. It fails with the error below.

[Stage 0:>                                                          (0 + 1) / 1]2019-07-19 16:56:57 ERROR Utils:91 - Aborting task
org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -62
        at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
        at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
        at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)

Spark code

import java.nio.file.{Files, Paths}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro._       // from_avro (spark-avro package)
import org.apache.spark.sql.functions._  // col

val spark = SparkSession.builder.appName("Spark-Kafka-Integration").master("local").getOrCreate()
import spark.implicits._

// Avro schema used to deserialize the Kafka value column
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("schema.avsc")))

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic_name")
  .load()

val df1 = df.select(from_avro(col("value"), jsonFormatSchema).as("data")).select("data.*")

df1.writeStream.format("console").option("truncate", "false").start()
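
As far as I understand, from_avro expects the Kafka value to be plain binary-encoded Avro with no container or header, so I have also been inspecting the first bytes of a few messages in batch mode. This is only a rough diagnostic sketch, reusing the same bootstrap server and topic placeholders as above; an Avro container file would start with the bytes 'O' 'b' 'j' 0x01, and a Confluent wire-format message with a 0x00 magic byte followed by a 4-byte schema id.

// Diagnostic sketch: read a handful of records as a batch and print the leading
// bytes of each value, to see whether the payload is plain binary Avro or
// carries a container/registry header that from_avro cannot parse.
val raw = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic_name")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
  .select("value")
  .limit(5)
  .collect()

raw.foreach { row =>
  val bytes = row.getAs[Array[Byte]]("value")
  println(bytes.take(8).map(b => f"$b%02x").mkString(" "))
}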

Schema used in Spark

{
 "type": "record",
 "name": "kafka_demo_new",
 "fields": [
  {
   "name": "host",
   "type": "string"
  },
  {
   "name": "event",
   "type": "string"
  },
  {
   "name": "connectiontype",
   "type": "string"
  },
  {
   "name": "user",
   "type": "string"
  },
  {
   "name": "eventtimestamp",
   "type": "string"
  }
 ]
}

Sample topic data in Kafka

{"host":"localhost","event":"Qradar_Demo","connectiontype":"tcp/ip","user":"user","eventtimestamp":"2018-05-24 23:15:07"}


HDP - 3.1.0

Kafka - 2.0.0

Spark - 2.4.0

Any help is appreciated.

4 REPLIES


@Jay Kumar SenSharma, can you please help with this?


New Contributor

Did you find a way around this? I am facing the same issue and can't find a way to consume the messages in Spark from NiFi. TIA

Community Manager

@sapnakatti, as this is an older post, you would have a better chance of receiving a resolution by starting a new thread. This will also be an opportunity to provide details specific to your environment that could aid others in assisting you with a more accurate answer to your question. You can link this thread as a reference in your new post.



Regards,

Vidya Sargur,
Community Manager


Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.

New Contributor

Thank you, @VidyaSargur. Will do. Except for details like the schema, this is pretty much my code and issue, so I was wondering whether the OP had found a way around it. I'll start a new thread and include the details from my code.