
Error while sending data from Spark to Kafka


Hello,

I am using Spark 3.0.0.

I am trying to send processed log data from Spark to a Kafka topic, but I keep getting this exception:

ERROR MicroBatchExecution: Query [id = 7de19d2f-2480-4c70-8bbc-2db8ae0d66df, runId = 5d22ca35-eec3-4c36-a8bb-a57fa9335d56] terminated with error
org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;
at org.apache.spark.sql.kafka010.KafkaWriter$.validateQuery(KafkaWriter.scala:59)
at org.apache.spark.sql.kafka010.KafkaStreamingWrite.<init>(KafkaStreamingWrite.scala:42)
at org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable$$anon$2.buildForStreaming(KafkaSourceProvider.scala:414)
at org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:620)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Required attribute 'value' not found;
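
The exception is raised by KafkaWriter.validateQuery (visible in the trace above), which checks the schema of the DataFrame being written: the Kafka sink requires a string or binary column named value for the message payload, while key and topic are optional. A minimal sketch of the shape it accepts, where inputDf and its columns are placeholders rather than anything from the code below:

import org.apache.spark.sql.functions._

// The Kafka sink reads these specially named columns:
//   value (required) - the message payload, string or binary
//   key   (optional) - the partitioning key
//   topic (optional) - overrides the "topic" write option
val kafkaReady = inputDf.select(
  col("id").cast("string").as("key"),
  col("payload").cast("string").as("value")
)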

Here is my source code:

structuredStreaming.scala

package com.spark.sparkstreaming

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Locale
import java.util.regex.{Matcher, Pattern}

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

import Utilities._

object structuredStreaming {

  case class LogEntry(ip: String, client: String, user: String, dateTime: Timestamp,
                      request: String, status: String, bytes: String, referer: String, agent: String)

  val logPattern = apacheLogPattern()
  val datePattern = Pattern.compile("\\[(.*?) .+]")

  // Pull the timestamp out of an Apache log date field such as
  // [21/Jul/2020:10:15:32 +0000].
  def parseDateField(field: String): Option[Timestamp] = {
    val dateMatcher = datePattern.matcher(field)
    if (dateMatcher.find) {
      val dateString = dateMatcher.group(1)
      val dateFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH)
      val date = dateFormat.parse(dateString)
      Option(new java.sql.Timestamp(date.getTime))
    } else {
      None
    }
  }

  // Parse one raw log line (wrapped in a Row) into a LogEntry, if it matches.
  def parseLog(x: Row): Option[LogEntry] = {
    val matcher: Matcher = logPattern.matcher(x.getString(0))
    if (matcher.matches()) {
      Some(LogEntry(
        matcher.group(1),
        matcher.group(2),
        matcher.group(3),
        parseDateField(matcher.group(4)).getOrElse(null),
        matcher.group(5),
        matcher.group(6),
        matcher.group(7),
        matcher.group(8),
        matcher.group(9)
      ))
    } else {
      None
    }
  }

  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .appName("StructuredStreaming")
      .master("local[*]")
      .config("spark.sql.streaming.checkpointLocation", "/home/UDHAV.MAHATA/Documents/Checkpoints")
      .getOrCreate()

    setupLogging()

    val rawData = spark.readStream.text("/home/UDHAV.MAHATA/Documents/Spark/logs")

    import spark.implicits._

    val structuredData = rawData.flatMap(parseLog).select("status", "dateTime")
    val windowed = structuredData.withWatermark("dateTime", "10 minutes")
      .groupBy($"status", window($"dateTime", "1 hour"))
      .count()
    //val query = windowed.writeStream.outputMode("complete").format("console").start()
    val query = windowed
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "sink")
      .start()
    query.awaitTermination()
    spark.stop()
  }
}
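
For what it's worth, the windowed DataFrame above ends with columns status, window, and count, and none of them is named value, which is what the validateQuery check above is complaining about. One possible fix (a sketch, assuming a JSON payload is acceptable for the sink topic) is to fold each row into a value column before writing:

val toKafka = windowed.selectExpr(
  "CAST(status AS STRING) AS key",  // optional partitioning key
  "to_json(struct(*)) AS value"     // required payload column
)

val query = toKafka
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "sink")
  .start()

With the watermark already in place, the default append output mode should emit each window's count once the watermark passes the end of the window.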

Utilities.scala

package com.spark.sparkstreaming

import java.util.regex.Pattern

object Utilities {

  // Silence Spark's INFO/WARN log spam so only errors reach the console.
  def setupLogging() = {
    import org.apache.log4j.{Level, Logger}
    val rootLogger = Logger.getRootLogger()
    rootLogger.setLevel(Level.ERROR)
  }

  // Load Twitter credentials ("key value" per line) into system properties.
  def setupTwitter() = {
    import scala.io.Source
    for (line <- Source.fromFile("/home/UDHAV.MAHATA/Downloads/SparkStreaming/twitter.txt").getLines) {
      val fields = line.split(" ")
      if (fields.length == 2) {
        System.setProperty("twitter4j.oauth." + fields(0), fields(1))
      }
    }
  }

  // Regular expression for the Apache combined access-log format; each
  // parenthesised group corresponds to one LogEntry field.
  def apacheLogPattern(): Pattern = {
    val ddd = "\\d{1,3}"
    val ip = s"($ddd\\.$ddd\\.$ddd\\.$ddd)?"
    val client = "(\\S+)"
    val user = "(\\S+)"
    val dateTime = "(\\[.+?\\])"
    val request = "\"(.*?)\""
    val status = "(\\d{3})"
    val bytes = "(\\S+)"
    val referer = "\"(.*?)\""
    val agent = "\"(.*?)\""
    val regex = s"$ip $client $user $dateTime $request $status $bytes $referer $agent"
    Pattern.compile(regex)
  }
}
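
As a quick sanity check of apacheLogPattern (the log line below is an invented example, not real data), a combined-format line yields the nine capture groups parseLog expects:

val sample = "1.2.3.4 - - [21/Jul/2020:10:15:32 +0000] " +
  "\"GET /index.html HTTP/1.1\" 200 1043 \"-\" \"Mozilla/5.0\""
val m = Utilities.apacheLogPattern().matcher(sample)
if (m.matches()) {
  println(m.group(6)) // status code: 200
}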

Can anyone tell me what "Required attribute 'value' not found" means and how I can resolve this issue?

Thank you
