
Dataframe is not created with Kafka-Spark-Consumer


New Contributor

Hi All,

As per my requirement, I receive streaming data produced as JSON.

I have written Scala code that streams this data using Kafka-Spark-Consumer.

When I execute the code, the consumed JSON data is saved into HDFS.

However, my requirement is to save a CSV file in HDFS instead of JSON.

Versions:

Spark - 2.0.0

Scala - 2.11.8

Below is the code I used:

Code:

package com.spark.kafka.projects
import java.util.Properties
import java.io.FileInputStream
import java.util.HashMap

import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord }

import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import scala.util.Random
import java.util.Date
import java.text.ParseException
import java.text.SimpleDateFormat
import java.lang.Long


import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.sql.SQLContext
import com.databricks.spark.csv
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
import org.apache.spark.rdd._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrameReader
object SparkKafkaConsumers {
  def main(args: Array[String]): Unit = {
    val Array(filepath) = args
    val (zkQuorum, group, topics, hdfsurl, numThreads, windowinterval) =
      try {
        val prop = new Properties()
        prop.load(new FileInputStream(filepath))
        (
          prop.getProperty("zkQuorum"),
          prop.getProperty("group"),
          prop.getProperty("topics"),
          prop.getProperty("hdfsurl"),
          new Integer(prop.getProperty("numThreads")),
          new Long(prop.getProperty("windowinterval")))
      } catch {
        case e: Exception =>
          e.printStackTrace()
          println("unable to load the propertifile")
          sys.exit(1)
      }

    val sparkConf = new SparkConf().setAppName("SparkKafkaConsumers").setMaster("local[*]").set("spark.driver.allowMultipleContexts", "true");

    val sparkcontext = new SparkContext(sparkConf);
    val sqlcnx = new SQLContext(sparkcontext)
    import sqlcnx.implicits._
    
    val mySchema = StructType(
      StructField("source", IntegerType, true) ::
      StructField("packet_id", IntegerType, true) ::
      StructField("channel", IntegerType, true) ::
      StructField("value", DoubleType, true) ::
      StructField("timestamp", StringType, true) :: Nil)

    val ssc = new StreamingContext(sparkConf, Seconds(windowinterval))
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    lines.foreachRDD((rdd, time) => {
      val count = rdd.count()
      if (count > 0) {
        // Saving the RDD into HDFS as JSON
        // rdd.saveAsTextFile(hdfsurl + time.milliseconds.toString)

        // Reading the JSON RDD and saving it as CSV
        val dataframe = sqlcnx.read.schema(mySchema).json(rdd)
        dataframe.registerTempTable("jsonTable")
        val dataframe1 = sqlcnx.sql("SELECT source,packet_id,channel,value,timestamp FROM jsonTable")
        dataframe1.rdd.saveAsTextFile(hdfsurl + time.milliseconds.toString)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

I have also changed the above DataFrame code as below:

val dataframe = sqlcnx.read.schema(mySchema).json(rdd)
dataframe.registerTempTable("jsonTable")
val dataframe1 = sqlcnx.sql("SELECT source,packet_id,channel,value,timestamp FROM jsonTable")
dataframe1.write.format("com.databricks.spark.csv").option("header", "true").save(hdfsurl + time.milliseconds.toString)

POM.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>SparkKafkaProject</groupId>
	<artifactId>SparkKafkaProject</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>SparkKafkaProject</name>
	<url>http://maven.apache.org</url>
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.7</maven.compiler.source>
		<maven.compiler.target>1.7</maven.compiler.target>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.10</artifactId>
			<version>1.6.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming_2.10</artifactId>
			<version>1.6.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming-kafka_2.10</artifactId>
			<version>1.6.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-hive_2.10</artifactId>
			<version>1.6.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-sql_2.10</artifactId>
			<version>1.6.2</version>
		</dependency>
		<dependency>
			<groupId>com.databricks</groupId>
			<artifactId>spark-csv_2.10</artifactId>
			<version>1.4.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.11</artifactId>
			<version>0.8.2.0</version>
			<exclusions>
				<exclusion>
					<artifactId>jmxri</artifactId>
					<groupId>com.sun.jmx</groupId>
				</exclusion>
				<exclusion>
					<artifactId>jmxtools</artifactId>
					<groupId>com.sun.jdmk</groupId>
				</exclusion>
				<exclusion>
					<artifactId>jms</artifactId>
					<groupId>javax.jms</groupId>
				</exclusion>
				<exclusion>
					<artifactId>junit</artifactId>
					<groupId>junit</groupId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>3.8.1</version>
			<scope>test</scope>
		</dependency>
	</dependencies>
	<build>
		<sourceDirectory>src</sourceDirectory>
		<!-- <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> 
			<version>3.3</version> <configuration> <source>1.8</source> <target>1.8</target> 
			</configuration> </plugin> </plugins> -->
		<plugins>
			<plugin>
				<groupId>net.alchim31.maven</groupId>
				<artifactId>scala-maven-plugin</artifactId>
				<version>3.1.3</version>
				<executions>
					<execution>
						<goals>
							<goal>compile</goal>
							<goal>testCompile</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-assembly-plugin</artifactId>
				<version>2.6</version>
				<configuration>
					<!--get all project dependencies -->
					<descriptorRefs>
						<descriptorRef>jar-with-dependencies</descriptorRef>
					</descriptorRefs>
					<!-- MainClass in manifest makes an executable jar -->


				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id>
						<!--bind to the packaging phase -->
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>

Error:

Exception in thread "streaming-job-executor-0" java.lang.NoSuchMethodError: org.apache.spark.sql.DataFrameReader.json(Lorg/apache/spark/rdd/RDD;)Lorg/apache/spark/sql/DataFrame;
        at com.spark.kafka.projects.SparkKafkaConsumers$$anonfun$main$1.apply(SparkKafkaConsumers.scala:93)
        at com.spark.kafka.projects.SparkKafkaConsumers$$anonfun$main$1.apply(SparkKafkaConsumers.scala:82)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)


Kindly help me with the code, and please correct me if I have made any mistakes in it.

Thanks in advance

Vijay

2 REPLIES

Re: Dataframe is not created with Kafka-Spark-Consumer

Rising Star

The json method can be used to read from a text file, but it cannot be invoked on an RDD, so you can't do what you are trying to do. You have to parse each JSON string on its own (in a map operation, for instance).

In order to get a DataFrame, you can convert each JSON string into a case class; the resulting RDD of case classes can then be transformed into a DataFrame via the toDF method, as in the sketch below.
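
A minimal sketch of that approach, reusing the lines DStream, sqlcnx and hdfsurl from your code. The Record case class is only an illustration that mirrors your schema, and json4s is used purely as an example of per-record parsing (Spark bundles json4s, but you may still need to add it to your POM to compile); use whatever JSON library you prefer and add proper error handling.

// Hypothetical case class matching the schema in the question; define it at
// the top level (outside main) so that toDF can derive the schema from it.
case class Record(source: Int, packet_id: Int, channel: Int, value: Double, timestamp: String)

lines.foreachRDD((rdd, time) => {
  if (!rdd.isEmpty()) {
    import sqlcnx.implicits._

    // Parse each JSON string on its own; records that fail to parse are dropped.
    val records = rdd.mapPartitions { iter =>
      import org.json4s._
      import org.json4s.jackson.JsonMethods._
      implicit val formats = DefaultFormats
      iter.flatMap(jsonStr => scala.util.Try(parse(jsonStr).extract[Record]).toOption)
    }

    // RDD of case classes -> DataFrame via toDF, then write out as CSV.
    val df = records.toDF()
    df.write.format("com.databricks.spark.csv")
      .option("header", "true")
      .save(hdfsurl + time.milliseconds.toString)
  }
})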

Re: Dataframe is not created with Kafka-Spark-Consumer

New Contributor

@Marco Gaido

As per the API, the json method can be used to read an RDD as well. I tried this approach in another application and it worked for me; a rough sketch of that call is below.
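
This is only a minimal, made-up example built on the schema from the code above; as far as I understand, the RDD overload of read.json expects an RDD[String] with one JSON document per element.

// Hypothetical stand-alone example of the RDD overload of read.json;
// the sample record is made up and only shows the expected input shape.
val sampleJson: RDD[String] = sparkcontext.parallelize(Seq(
  """{"source":1,"packet_id":10,"channel":2,"value":3.5,"timestamp":"2018-01-01 00:00:00"}"""))

val df = sqlcnx.read.schema(mySchema).json(sampleJson)
df.show()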

If you could suggest changes or correct my code, it would be very helpful.

Thanks
