Explorer
Posts: 33
Registered: ‎03-08-2016
Accepted Solution

How to use saveAsTextFiles in spark streaming

Hi,

It is simple to display the result of an RDD computation, for example:

val sc = new SparkContext(conf)
val textFile = sc.textFile("/root/file/test")
val apps = textFile.map(line => line.split(";")(0))
  .map(p => (p, 1))   // convert to countable tuples
  .reduceByKey(_ + _) // count keys
apps.collect().foreach(println) // collect and print the result

This prints the result to my console. And if I want to save the output to a file, I do:

apps.saveAsTextFile("/root/file/file1")

But how can I do it now with a DStream? This is my code:

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val file = ssc.textFileStream("/root/file/test")
var test = file.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y)
test.saveAsTextFiles("/root/file/file1")
sc.stop()

But it doesn't work.

Any help, please!

Cloudera Employee
Posts: 35
Registered: ‎04-05-2016

Re: How to use saveAsTextFiles in spark streaming

[ Edited ]

Spark bundles a handy method for this, foreachRDD:

val file = ssc.textFileStream("/root/file/test")
file.foreachRDD(t => {
  val test = t.map(x => x) // do the map/reduce work on the batch RDD t here
  test.saveAsTextFile("/root/file/file1")
})
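
By the way, you can also skip foreachRDD entirely: the DStream API has saveAsTextFiles(prefix, [suffix]), which saves each batch to a new time-stamped directory. A rough, untested sketch (counts is just an illustrative name):

val counts = file.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey(_+_)
// each batch is written to a directory like /root/file/file1-<time in ms>.out
counts.saveAsTextFiles("/root/file/file1", "out")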
Explorer
Posts: 33
Registered: ‎03-08-2016

Re: How to use saveAsTextFiles in spark streaming

I tried:

val file = ssc.textFileStream("/root/file/test")
file.foreachRDD(t => {
  var test = file.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y)
  test.saveAsTextFiles("/root/file/file1")
})

sc.stop()

But it doesn't work.

Cloudera Employee
Posts: 35
Registered: ‎04-05-2016

Re: How to use saveAsTextFiles in spark streaming

[ Edited ]

From your code:

val textFile = ssc.textFileStream("/root/file/test")

textFile.foreachRDD(t => {
  val test = t.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y)
  test.saveAsTextFile("/root/file/file1")
})

Mind the t.map(), not file.map(): inside foreachRDD you are handed t, the RDD for the current batch, so the transformation must be applied to t. Mapping the file DStream there just defines another stream that is never materialized.

Explorer
Posts: 33
Registered: ‎03-08-2016

Re: How to use saveAsTextFiles in spark streaming

It does not work. What is the problem?

Here is my console output:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/04/06 12:44:42 INFO SparkContext: Running Spark version 1.5.0
16/04/06 12:44:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/04/06 12:44:48 INFO SecurityManager: Changing view acls to: root
16/04/06 12:44:48 INFO SecurityManager: Changing modify acls to: root
16/04/06 12:44:48 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
16/04/06 12:45:00 INFO Slf4jLogger: Slf4jLogger started
16/04/06 12:45:00 INFO Remoting: Starting remoting
16/04/06 12:45:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.31:38825]
16/04/06 12:45:04 INFO Utils: Successfully started service 'sparkDriver' on port 38825.
16/04/06 12:45:04 INFO SparkEnv: Registering MapOutputTracker
16/04/06 12:45:04 INFO SparkEnv: Registering BlockManagerMaster
16/04/06 12:45:05 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-1b896884-d84a-4c39-b9dd-93decdb6ee0b
16/04/06 12:45:05 INFO MemoryStore: MemoryStore started with capacity 1027.3 MB
16/04/06 12:45:06 INFO HttpFileServer: HTTP File server directory is /tmp/spark-14a1c553-e160-4b93-8822-3b943e27edd1/httpd-849fa48d-e2de-46de-845a-a68a02f76b94
16/04/06 12:45:06 INFO HttpServer: Starting HTTP Server
16/04/06 12:45:08 INFO Utils: Successfully started service 'HTTP file server' on port 50992.
16/04/06 12:45:08 INFO SparkEnv: Registering OutputCommitCoordinator
16/04/06 12:45:11 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/04/06 12:45:11 INFO SparkUI: Started SparkUI at http://192.168.1.31:4040
16/04/06 12:45:12 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
16/04/06 12:45:12 INFO Executor: Starting executor ID driver on host localhost
16/04/06 12:45:15 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 42498.
16/04/06 12:45:15 INFO NettyBlockTransferService: Server created on 42498
16/04/06 12:45:15 INFO BlockManagerMaster: Trying to register BlockManager
16/04/06 12:45:15 INFO BlockManagerMasterEndpoint: Registering block manager localhost:42498 with 1027.3 MB RAM, BlockManagerId(driver, localhost, 42498)
16/04/06 12:45:15 INFO BlockManagerMaster: Registered BlockManager
16/04/06 12:45:18 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
16/04/06 12:45:22 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@11fb9657
16/04/06 12:45:23 INFO SparkUI: Stopped Spark web UI at http://192.168.1.31:4040
16/04/06 12:45:23 INFO DAGScheduler: Stopping DAGScheduler
16/04/06 12:45:23 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/04/06 12:45:23 INFO MemoryStore: MemoryStore cleared
16/04/06 12:45:23 INFO BlockManager: BlockManager stopped
16/04/06 12:45:23 INFO BlockManagerMaster: BlockManagerMaster stopped
16/04/06 12:45:23 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/04/06 12:45:23 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/04/06 12:45:23 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/04/06 12:45:23 INFO SparkContext: Successfully stopped SparkContext
16/04/06 12:45:23 INFO ShutdownHookManager: Shutdown hook called
16/04/06 12:45:23 INFO ShutdownHookManager: Deleting directory /tmp/spark-14a1c553-e160-4b93-8822-3b943e27edd1

No file is created; nothing happens.

What's wrong?

Cloudera Employee
Posts: 35
Registered: ‎04-05-2016

Re: How to use saveAsTextFiles in spark streaming

[ Edited ]

It seems there is some glitch in your code. It would be much easier to help if you could post your full code.

Explorer
Posts: 33
Registered: ‎03-08-2016

Re: How to use saveAsTextFiles in spark streaming

This is my code:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.scheduler.SparkListenerStageCompleted
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileCount {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("File Count")
      .setMaster("local")

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))
    val file = ssc.textFileStream("/root/file/test/f3")
    file.foreachRDD(t => {
      val test = t.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y)
      test.saveAsTextFile("/root/file/file1")
    })
    sc.stop()
  }
}

Cloudera Employee
Posts: 35
Registered: ‎04-05-2016

Re: How to use saveAsTextFiles in spark streaming

[ Edited ]

You need to assign a number of threads to Spark when running the master locally; the most obvious choice is 2, one to receive the data and one to process it.

So the correct code should be:

    .setMaster("local[2]")
If your file is not too big, change to:

    val ssc = new StreamingContext(sc, Seconds(1))

You stopped the streaming context but forgot to start it:

file.foreachRDD(t => {
  val test = t.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y)
  test.saveAsTextFile("/root/file/file1")
})

ssc.start()
ssc.awaitTermination()

For now, don't use sc.stop().
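
Putting these pieces together, a minimal corrected version of your program might look like this (an untested sketch, with the same paths and splitting as in your post):

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileCount {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("File Count")
      .setMaster("local[2]") // one thread to receive, one to process

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))

    val file = ssc.textFileStream("/root/file/test/f3")
    file.foreachRDD(t => {
      val test = t.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y)
      test.saveAsTextFile("/root/file/file1")
    })

    ssc.start()            // start the streaming computation
    ssc.awaitTermination() // run until killed
  }
}

One caveat: saveAsTextFile with a fixed path will typically fail on a later batch because the output directory already exists; the DStream method saveAsTextFiles avoids this by writing a new time-stamped directory per batch.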
Explorer
Posts: 33
Registered: ‎03-08-2016

Re: How to use saveAsTextFiles in spark streaming

I tried it, and I get:

16/04/06 14:09:52 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@4bf57335
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.util.ThreadUtils$.runInNewThread$default$2()Z
	at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:606)
	at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
	at com.org.file.filecount.FileCount$.main(FileCount.scala:52)
	at com.org.file.filecount.FileCount.main(FileCount.scala)

Cloudera Employee
Posts: 35
Registered: ‎04-05-2016

Re: How to use saveAsTextFiles in spark streaming

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.util.ThreadUtils$.runInNewThread$default$2()Z

Compare your code with the line below:

    .setMaster("local[2]")

BTW, which version of Spark Streaming are you using?
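
For what it's worth, a NoSuchMethodError like this one usually means the application was compiled against one Spark version but is running on another (your log shows a 1.5.0 runtime). Make sure the build dependencies match the runtime; for example, with sbt (assuming Spark 1.5.0; adjust to your cluster's version):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "1.5.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.5.0" % "provided"
)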