
Dataframe is not created with Kafka-Spark-Consumer

Hi All,

As per my requirement, I receive streaming data produced as JSON.

I have written Scala code that streams the data using Kafka-Spark-Consumer.

When I execute the code, the consumed JSON data is saved into HDFS.

But my requirement is to save the data in HDFS as CSV instead of JSON.

Versions:

Spark - 2.0.0

Scala - 2.11.8

Below is the code I used.

Code:

package com.spark.kafka.projects
import java.util.Properties
import java.io.FileInputStream
import java.util.HashMap

import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord }

import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import scala.util.Random
import java.util.Date
import java.text.ParseException
import java.text.SimpleDateFormat
import java.lang.Long


import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.sql.SQLContext
import com.databricks.spark.csv
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
import org.apache.spark.rdd._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrameReader
object SparkKafkaConsumers {
  def main(args: Array[String]): Unit = {
    val Array(filepath) = args
    val (zkQuorum, group, topics, hdfsurl, numThreads, windowinterval) =
      try {
        val prop = new Properties()
        prop.load(new FileInputStream(filepath))
        (
          prop.getProperty("zkQuorum"),
          prop.getProperty("group"),
          prop.getProperty("topics"),
          prop.getProperty("hdfsurl"),
          new Integer(prop.getProperty("numThreads")),
          new Long(prop.getProperty("windowinterval")))
      } catch {
        case e: Exception =>
          e.printStackTrace()
          println("unable to load the propertifile")
          sys.exit(1)
      }

    val sparkConf = new SparkConf().setAppName("SparkKafkaConsumers").setMaster("local[*]").set("spark.driver.allowMultipleContexts", "true");

    val sparkcontext = new SparkContext(sparkConf);
    val sqlcnx = new SQLContext(sparkcontext)
    import sqlcnx.implicits._
    
    val mySchema = StructType(
      StructField("source", IntegerType, true) ::
      StructField("packet_id", IntegerType, true) ::
      StructField("channel", IntegerType, true) ::
      StructField("value", DoubleType, true) ::
      StructField("timestamp", StringType, true) :: Nil)

    val ssc = new StreamingContext(sparkConf, Seconds(windowinterval))
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
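    // Receiver-based Kafka consumer: each element is a (key, message) pair; map(_._2) keeps only the JSON payload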
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    lines.foreachRDD((rdd, time) => {
      val count = rdd.count()
      if (count > 0) {
        // Saving the RDD into HDFS as JSON
        // rdd.saveAsTextFile(hdfsurl + time.milliseconds.toString)

        // Reading the JSON RDD and saving it as CSV
        val dataframe = sqlcnx.read.schema(mySchema).json(rdd)
        dataframe.registerTempTable("jsonTable")
        val dataframe1 = sqlcnx.sql("SELECT source,packet_id,channel,value,timestamp FROM jsonTable")
        dataframe1.rdd.saveAsTextFile(hdfsurl + time.milliseconds.toString)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

I have also changed the above DataFrame code as below:

val dataframe = sqlcnx.read.schema(mySchema).json(rdd)
dataframe.registerTempTable("jsonTable")
val dataframe1 = sqlcnx.sql("SELECT source,packet_id,channel,value,timestamp FROM jsonTable")
dataframe1.write.format("com.databricks.spark.csv").option("header", "true").save(hdfsurl + time.milliseconds.toString)

pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>SparkKafkaProject</groupId>
	<artifactId>SparkKafkaProject</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>SparkKafkaProject</name>
	<url>http://maven.apache.org</url>
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.7</maven.compiler.source>
		<maven.compiler.target>1.7</maven.compiler.target>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.10</artifactId>
			<version>1.6.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming_2.10</artifactId>
			<version>1.6.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming-kafka_2.10</artifactId>
			<version>1.6.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-hive_2.10</artifactId>
			<version>1.6.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-sql_2.10</artifactId>
			<version>1.6.2</version>
		</dependency>
		<dependency>
			<groupId>com.databricks</groupId>
			<artifactId>spark-csv_2.10</artifactId>
			<version>1.4.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.11</artifactId>
			<version>0.8.2.0</version>
			<exclusions>
				<exclusion>
					<artifactId>jmxri</artifactId>
					<groupId>com.sun.jmx</groupId>
				</exclusion>
				<exclusion>
					<artifactId>jmxtools</artifactId>
					<groupId>com.sun.jdmk</groupId>
				</exclusion>
				<exclusion>
					<artifactId>jms</artifactId>
					<groupId>javax.jms</groupId>
				</exclusion>
				<exclusion>
					<artifactId>junit</artifactId>
					<groupId>junit</groupId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>3.8.1</version>
			<scope>test</scope>
		</dependency>
	</dependencies>
	<build>
		<sourceDirectory>src</sourceDirectory>
		<!-- <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> 
			<version>3.3</version> <configuration> <source>1.8</source> <target>1.8</target> 
			</configuration> </plugin> </plugins> -->
		<plugins>
			<plugin>
				<groupId>net.alchim31.maven</groupId>
				<artifactId>scala-maven-plugin</artifactId>
				<version>3.1.3</version>
				<executions>
					<execution>
						<goals>
							<goal>compile</goal>
							<goal>testCompile</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-assembly-plugin</artifactId>
				<version>2.6</version>
				<configuration>
					<!--get all project dependencies -->
					<descriptorRefs>
						<descriptorRef>jar-with-dependencies</descriptorRef>
					</descriptorRefs>
					<!-- MainClass in the manifest makes an executable jar -->
				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id>
						<!--bind to the packaging phase -->
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>

Error:

Exception in thread "streaming-job-executor-0" java.lang.NoSuchMethodError: org.apache.spark.sql.DataFrameReader.json(Lorg/apache/spark/rdd/RDD;)Lorg/apache/spark/sql/DataFrame;
        at com.spark.kafka.projects.SparkKafkaConsumers$$anonfun$main$1.apply(SparkKafkaConsumers.scala:93)
        at com.spark.kafka.projects.SparkKafkaConsumers$$anonfun$main$1.apply(SparkKafkaConsumers.scala:82)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)


Kindly help me with the code. Please correct me if I have made any mistakes in it.

Thanks in advance

Vijay

2 REPLIES


The json method can be used to read from a text file, but it cannot be invoked on an RDD. So you can't do what you are trying to do; you have to parse each JSON string on its own (in a map, for instance).

In order to get a DataFrame, you can convert each JSON string into a case class, and the resulting RDD of case classes can then be transformed into a DataFrame via the toDF method, along the lines of the sketch below.
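Something like this minimal sketch, reusing lines, sqlcnx and hdfsurl from your code. The Packet case class and the json4s parsing are illustrative choices of mine (json4s already ships on Spark's classpath), not the only way to do it:

import org.json4s._
import org.json4s.jackson.JsonMethods

// Hypothetical case class matching the schema in your question; define it at the
// top level of the file (outside main) so toDF can resolve its type information.
case class Packet(source: Int, packet_id: Int, channel: Int, value: Double, timestamp: String)

lines.foreachRDD((rdd, time) => {
  if (!rdd.isEmpty()) {
    // Parse each JSON string into a Packet inside a map
    val packets = rdd.map { json =>
      implicit val formats = DefaultFormats
      JsonMethods.parse(json).extract[Packet]
    }
    // RDD[Packet] -> DataFrame via toDF (relies on the existing import sqlcnx.implicits._),
    // then write it out as CSV with spark-csv
    packets.toDF().write
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .save(hdfsurl + time.milliseconds.toString)
  }
})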

@Marco Gaido

As per the API, the json method can be used for reading an RDD as well. I tried this approach in another application and it worked for me.

If you can suggest changes or correct me in the code, it would be helpful.

Thanks