Created 07-22-2019 08:10 PM
Hi All,
I have designed a NiFi flow to push JSON events serialized in Avro format into a Kafka topic, and I am then trying to consume them in Spark Structured Streaming.
While the Kafka part works fine, Spark Structured Streaming is not able to read the Avro events. It fails with the error below.
[Stage 0:> (0 + 1) / 1]2019-07-19 16:56:57 ERROR Utils:91 - Aborting task
org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -62
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
Spark code
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder.appName("Spark-Kafka-Integration").master("local").getOrCreate()
import spark.implicits._

// Reader schema for from_avro, loaded from the .avsc file
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("schema.avsc")))

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic_name")
  .load()

// Decode the Kafka value column as binary Avro against the schema
val df1 = df.select(from_avro(col("value"), jsonFormatSchema).as("data")).select("data.*")

df1.writeStream
  .format("console")
  .option("truncate", "false")
  .start()
  .awaitTermination()
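One thing worth checking: in Spark 2.4, from_avro only handles schema-less binary Avro. If the producer wrote the Avro container-file format (which embeds the schema, as NiFi record writers can do) or the Confluent Schema Registry wire format (a 5-byte header before the payload), decoding fails with errors like the one above. A quick, hedged sketch for telling the framings apart by their leading bytes (the made-up sample bytes stand in for a record value pulled off the topic; the magic byte patterns themselves are real):

```scala
// Heuristic check of the leading bytes of a Kafka record value:
// "Obj\x01" is the Avro container-file magic; a single 0x00 magic byte
// starts the Confluent Schema Registry wire format.
def avroFraming(bytes: Array[Byte]): String = {
  val containerMagic = Array('O'.toByte, 'b'.toByte, 'j'.toByte, 1.toByte)
  if (bytes.length >= 4 && bytes.take(4).sameElements(containerMagic))
    "Avro container file (schema embedded) -- not what from_avro expects"
  else if (bytes.nonEmpty && bytes(0) == 0.toByte)
    "possibly Confluent wire format (magic byte + 4-byte schema id)"
  else
    "possibly plain binary Avro (what from_avro expects)"
}

// Example with hypothetical bytes standing in for a record value:
println(avroFraming(Array('O'.toByte, 'b'.toByte, 'j'.toByte, 1.toByte, 2.toByte)))
```

In practice you would run this on the raw bytes of one message, e.g. consumed with a plain Kafka console/CLI consumer, before trying to decode the stream.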
Schema used in Spark
{
  "type": "record",
  "name": "kafka_demo_new",
  "fields": [
    { "name": "host", "type": "string" },
    { "name": "event", "type": "string" },
    { "name": "connectiontype", "type": "string" },
    { "name": "user", "type": "string" },
    { "name": "eventtimestamp", "type": "string" }
  ]
}
Sample topic data in Kafka
{"host":"localhost","event":"Qradar_Demo","connectiontype":"tcp/ip","user":"user","eventtimestamp":"2018-05-24 23:15:07"}
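Not a fix, but the -62 in the error may be a telling clue. Binary Avro encodes a string as a zigzag-varint length followed by the bytes, and zigzag-decoding the ASCII code of '{' (0x7B = 123), the opening character of a JSON object like the sample above, yields exactly -62. A minimal check of that arithmetic (zigzagDecode here is just an illustration, not Avro's actual decoder class):

```scala
// Avro's binary decoder reads a string length as a zigzag varint.
// Single-byte zigzag decoding: (n >>> 1) ^ -(n & 1).
def zigzagDecode(n: Int): Int = (n >>> 1) ^ -(n & 1)

val firstByte = '{'.toInt  // 0x7B = 123, the opening brace of a JSON object
println(zigzagDecode(firstByte))  // prints -62, matching "Length is negative: -62"
```

If that is what is happening, the topic may actually contain the JSON text itself rather than its binary Avro encoding, which would point at the NiFi writer configuration rather than the Spark code.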
HDP - 3.1.0
Kafka - 2.0.0
Spark - 2.4.0
Any help is appreciated.
Created 07-22-2019 08:10 PM
@Jay Kumar SenSharma can you please help with this?
Created 01-12-2023 11:42 PM
Did you find a way around this? I am facing the same issue and can't find a way to consume the messages in Spark from NiFi. TIA
Created 01-13-2023 12:55 AM
@sapnakatti, as this is an older post, you would have a better chance of receiving a resolution by starting a new thread. This will also be an opportunity to provide details specific to your environment that could aid others in assisting you with a more accurate answer to your question. You can link this thread as a reference in your new post.
Regards,
Vidya Sargur
Created 01-13-2023 01:44 AM
Thank you @VidyaSargur. Will do. Except for details like the schema, this is pretty much my code and issue, so I was wondering if the OP had found a way around it. I'll start a new thread and include the details from my code.