How to use saveAsTextFiles in Spark Streaming


Hi,

It is simple to display the result of an RDD computation, for example:

val sc = new SparkContext(conf)
val textFile = sc.textFile("/root/file/test")
val apps = textFile.map(line => line.split(";")(0))
  .map(p => (p, 1))     // convert to countable tuples
  .reduceByKey(_ + _)   // count keys
  .collect()            // collect the result
apps.foreach(println)

And I get the result in my console. If I want to save the output to a file instead, I call saveAsTextFile on the RDD (before the collect):

textFile.map(line => line.split(";")(0))
  .map(p => (p, 1))
  .reduceByKey(_ + _)
  .saveAsTextFile("/root/file/file1")

But how can I do it with a DStream? This is my code:

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val file = ssc.textFileStream("/root/file/test")
var test = file.map(x => (x.split(" ")(0) + ";" + x.split(" ")(1), 1)).reduceByKey((x, y) => x + y)
test.saveAsTextFiles("/root/file/file1")
sc.stop()

But it doesn't work.

Any help, please!

2 ACCEPTED SOLUTIONS


I don't know why, but I re-ran it and now it works. However, I get an empty _SUCCESS file in the file1 directory.

Here is the complete code:

    def main(args: Array[String]) {
      val conf = new SparkConf()
        .setAppName("File Count")
        .setMaster("local[2]")

      val sc = new SparkContext(conf)
      val ssc = new StreamingContext(sc, Seconds(1))
      val file = ssc.textFileStream("/root/file/test/file")
      file.foreachRDD(t => {
        // t is the RDD holding the current batch
        val test = t.map(x => (x.split(" ")(0) + ";" + x.split(" ")(1), 1)).reduceByKey((x, y) => x + y)
        test.saveAsTextFile("/root/file/file1")
      })
      ssc.start()
      ssc.awaitTermination()
    }
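
The empty _SUCCESS file is what an empty batch leaves behind: when no new files arrive during an interval, the batch RDD is empty and saveAsTextFile still creates the output directory with a _SUCCESS marker and nothing else. Also note that saveAsTextFile fails if the output directory already exists, so writing every batch to the same fixed path only works for the first batch. A minimal sketch of a variant that skips empty batches and writes each batch to its own time-stamped directory (paths and variable names taken from the thread; the time-stamped suffix is my own addition):

    // Skip empty batches and give each batch its own output directory,
    // since saveAsTextFile fails if "/root/file/file1" already exists.
    file.foreachRDD { (rdd, time) =>
      if (!rdd.isEmpty()) {
        val counts = rdd
          .map(x => (x.split(" ")(0) + ";" + x.split(" ")(1), 1))
          .reduceByKey(_ + _)
        counts.saveAsTextFile(s"/root/file/file1-${time.milliseconds}")
      }
    }

This is essentially what the DStream method saveAsTextFiles(prefix, suffix) does for you: it appends the batch time to the prefix, which is why the DStream variant is plural.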


Rising Star

That's because no new files arrived in the directory after the streaming application started; textFileStream only picks up files that appear in the directory after monitoring begins.

 

You can try cp to drop files into the directory after starting the streaming application.
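
For example, a hypothetical snippet (the staging path is my own assumption) that moves a file into the watched directory after ssc.start(); moving or renaming the file in atomically is the usual recommendation, so the stream never picks up a half-written file:

    import java.nio.file.{Files, Paths, StandardCopyOption}

    // Move a finished file from a staging location into the directory
    // monitored by textFileStream; the next batch will pick it up.
    Files.move(
      Paths.get("/tmp/staging/input-0001.txt"),
      Paths.get("/root/file/test/file/input-0001.txt"),
      StandardCopyOption.ATOMIC_MOVE
    )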


12 REPLIES

Rising Star

Spark bundles a handy method for exactly this, foreachRDD:

 

    val file = ssc.textFileStream("/root/file/test")
    file.foreachRDD(t => {
      var test = t.map(...) // do the map/reduce work on the batch RDD here
      test.saveAsTextFile("/root/file/file1")
    })

    sc.stop()



I tried:

    val file = ssc.textFileStream("/root/file/test")
    file.foreachRDD(t => {
      var test = file.map(x => (x.split(" ")(0) + ";" + x.split(" ")(1), 1)).reduceByKey((x, y) => x + y)
      test.saveAsTextFiles("/root/file/file1")
    })

    sc.stop()

But it doesn't work

Rising Star

From your code:

    val textFile = ssc.textFileStream("/root/file/test")

    textFile.foreachRDD(t => {
      val test = t.map(x => (x.split(" ")(0) + ";" + x.split(" ")(1), 1)).reduceByKey((x, y) => x + y)
      test.saveAsTextFile("/root/file/file1")
    })

Mind the t.map(), not file.map().
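
The distinction matters because textFile is a DStream, while the t passed to foreachRDD is the plain RDD holding only the current batch. A small sketch of the types involved (variable names as in the thread; the time-stamped output path is illustrative):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.dstream.DStream

    val textFile: DStream[String] = ssc.textFileStream("/root/file/test")
    textFile.foreachRDD { t: RDD[String] =>
      // t is a plain RDD here, so the RDD method saveAsTextFile applies,
      // not the DStream method saveAsTextFiles.
      t.saveAsTextFile("/root/file/file1-" + System.currentTimeMillis())
    }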


It still doesn't work. What is the problem?

Here is my console output:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/04/06 12:44:42 INFO SparkContext: Running Spark version 1.5.0
16/04/06 12:44:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/04/06 12:44:48 INFO SecurityManager: Changing view acls to: root
16/04/06 12:44:48 INFO SecurityManager: Changing modify acls to: root
16/04/06 12:44:48 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
16/04/06 12:45:00 INFO Slf4jLogger: Slf4jLogger started
16/04/06 12:45:00 INFO Remoting: Starting remoting
16/04/06 12:45:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.31:38825]
16/04/06 12:45:04 INFO Utils: Successfully started service 'sparkDriver' on port 38825.
16/04/06 12:45:04 INFO SparkEnv: Registering MapOutputTracker
16/04/06 12:45:04 INFO SparkEnv: Registering BlockManagerMaster
16/04/06 12:45:05 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-1b896884-d84a-4c39-b9dd-93decdb6ee0b
16/04/06 12:45:05 INFO MemoryStore: MemoryStore started with capacity 1027.3 MB
16/04/06 12:45:06 INFO HttpFileServer: HTTP File server directory is /tmp/spark-14a1c553-e160-4b93-8822-3b943e27edd1/httpd-849fa48d-e2de-46de-845a-a68a02f76b94
16/04/06 12:45:06 INFO HttpServer: Starting HTTP Server
16/04/06 12:45:08 INFO Utils: Successfully started service 'HTTP file server' on port 50992.
16/04/06 12:45:08 INFO SparkEnv: Registering OutputCommitCoordinator
16/04/06 12:45:11 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/04/06 12:45:11 INFO SparkUI: Started SparkUI at http://192.168.1.31:4040
16/04/06 12:45:12 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
16/04/06 12:45:12 INFO Executor: Starting executor ID driver on host localhost
16/04/06 12:45:15 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 42498.
16/04/06 12:45:15 INFO NettyBlockTransferService: Server created on 42498
16/04/06 12:45:15 INFO BlockManagerMaster: Trying to register BlockManager
16/04/06 12:45:15 INFO BlockManagerMasterEndpoint: Registering block manager localhost:42498 with 1027.3 MB RAM, BlockManagerId(driver, localhost, 42498)
16/04/06 12:45:15 INFO BlockManagerMaster: Registered BlockManager
16/04/06 12:45:18 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
16/04/06 12:45:22 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@11fb9657
16/04/06 12:45:23 INFO SparkUI: Stopped Spark web UI at http://192.168.1.31:4040
16/04/06 12:45:23 INFO DAGScheduler: Stopping DAGScheduler
16/04/06 12:45:23 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/04/06 12:45:23 INFO MemoryStore: MemoryStore cleared
16/04/06 12:45:23 INFO BlockManager: BlockManager stopped
16/04/06 12:45:23 INFO BlockManagerMaster: BlockManagerMaster stopped
16/04/06 12:45:23 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/04/06 12:45:23 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/04/06 12:45:23 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/04/06 12:45:23 INFO SparkContext: Successfully stopped SparkContext
16/04/06 12:45:23 INFO ShutdownHookManager: Shutdown hook called
16/04/06 12:45:23 INFO ShutdownHookManager: Deleting directory /tmp/spark-14a1c553-e160-4b93-8822-3b943e27edd1

No file is created; nothing happens. What's wrong?

Rising Star

It seems there is a glitch in your code somewhere. It would be much easier to help if you posted the complete code.


This is my code:

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.scheduler.SparkListener
    import org.apache.spark.scheduler.SparkListenerStageCompleted
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object FileCount {
      def main(args: Array[String]) {
        val conf = new SparkConf()
          .setAppName("File Count")
          .setMaster("local")

        val sc = new SparkContext(conf)
        val ssc = new StreamingContext(sc, Seconds(10))
        val file = ssc.textFileStream("/root/file/test/f3")
        file.foreachRDD(t => {
          val test = t.map(x => (x.split(" ")(0) + ";" + x.split(" ")(1), 1)).reduceByKey((x, y) => x + y)
          test.saveAsTextFile("/root/file/file1")
        })
        sc.stop()
      }
    }

Rising Star

You need to assign a number of threads to Spark when running the master locally; the obvious choice is 2: one to receive the data and one to process it.

 

So the correct setting should be:

    .setMaster("local[2]")

 

 

If your file is not too big, change the batch interval to:

    val ssc = new StreamingContext(sc, Seconds(1))

 

You have stopped the streaming context but forgot to start it:

 

    file.foreachRDD(t => {
      val test = t.map(x => (x.split(" ")(0) + ";" + x.split(" ")(1), 1)).reduceByKey((x, y) => x + y)
      test.saveAsTextFile("/root/file/file1")
    })

    ssc.start()
    ssc.awaitTermination()

 

For now, don't use sc.stop().
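
If you need to shut the application down later, the usual pattern (just a sketch, not part of the fix above) is to stop through the StreamingContext rather than calling sc.stop() directly:

    // Stops the streaming computation gracefully and also stops the
    // underlying SparkContext.
    ssc.stop(stopSparkContext = true, stopGracefully = true)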

 

 


I tried it, and I get:

16/04/06 14:09:52 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@4bf57335
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.util.ThreadUtils$.runInNewThread$default$2()Z
	at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:606)
	at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
	at com.org.file.filecount.FileCount$.main(FileCount.scala:52)
	at com.org.file.filecount.FileCount.main(FileCount.scala)

 

Rising Star
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.util.ThreadUtils$.runInNewThread$default$2()Z

  

Compare your code with the line below:

 

    .setMaster("local[2]")

 

By the way, which version of Spark Streaming are you using?
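
A NoSuchMethodError raised from StreamingContext.start usually means the spark-streaming artifact on the classpath was built against a different Spark version than spark-core. A rough build.sbt sketch pinning both to the same version (1.5.0 is taken from the log output above; adjust to whatever you actually run):

    // build.sbt excerpt: spark-core and spark-streaming must match exactly.
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"      % "1.5.0",
      "org.apache.spark" %% "spark-streaming" % "1.5.0"
    )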