Created on 04-05-2016 08:26 AM - edited 09-16-2022 03:12 AM
Hi,
It is simple to display the result of an RDD, for example:
val sc = new SparkContext(conf)
val textFile = sc.textFile("/root/file/test")
val apps = textFile.map (line => line.split(";")(0))
    .map(p=>(p,1)) // convert to countable tuples
    .reduceByKey(_+_) // count keys
    .collect() // collect the result
apps.foreach(println)
And I have the result in my console. If I want to save the output to a file, I do:
apps.saveAsTextFiles("/root/file/file1")
But how can I do it now with a DStream? This is my code:
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val file = ssc.textFileStream("/root/file/test")
var test = file.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y)
test.saveAsTextFiles("/root/file/file1")
sc.stop()
  }
}
But it doesn't work.
Any help, please!
Created 04-07-2016 04:13 AM
I don't know why, but I re-ran it and it works. However, I have an empty _SUCCESS file in the directory file1.
Here is the complete code:
def main(args: Array[String]) {
  val conf = new SparkConf()
    .setAppName("File Count")
    .setMaster("local[2]")
  val sc = new SparkContext(conf)
  val ssc = new StreamingContext(sc, Seconds(1))
  val file = ssc.textFileStream("/root/file/test/file")
  file.foreachRDD(t => {
    val test = t.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y)
    test.saveAsTextFile("/root/file/file1")
  })
  ssc.start()
  ssc.awaitTermination()
}
Created 04-07-2016 04:26 AM
That's because no new files arrive in the directory after the streaming application starts. textFileStream only picks up files that appear in the monitored directory while the stream is running.
You can try "cp" to drop files into the directory after starting the streaming application.
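As a rough sketch of the same idea in Scala: the Spark Streaming guide recommends that files enter the monitored directory by an atomic move or rename, so a half-written file is never picked up. The object name DropFileSketch, the method dropInto and the staging directory are made up for illustration, and the sketch assumes a local filesystem with both directories on the same volume.

```scala
import java.nio.file.{Files, Path, Paths, StandardCopyOption}

object DropFileSketch {
  // Copy `source` into a staging directory first, then move the finished copy
  // into the watched directory in a single step.
  // Assumption: stagingDir and watchedDir are on the same filesystem, otherwise
  // ATOMIC_MOVE throws AtomicMoveNotSupportedException.
  def dropInto(source: Path, stagingDir: Path, watchedDir: Path): Path = {
    val staged = Files.copy(
      source,
      stagingDir.resolve(source.getFileName),
      StandardCopyOption.REPLACE_EXISTING)
    Files.move(
      staged,
      watchedDir.resolve(source.getFileName),
      StandardCopyOption.ATOMIC_MOVE)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical usage: <source file> <staging dir> <watched dir>
    val dropped = dropInto(Paths.get(args(0)), Paths.get(args(1)), Paths.get(args(2)))
    println("Dropped " + dropped)
  }
}
```

Run it only after the streaming application has started, so the file counts as new for the next batch.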
Created on 04-05-2016 11:51 PM - edited 04-05-2016 11:51 PM
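Spark Streaming comes with a handy method for this, foreachRDD:
val file = ssc.textFileStream("/root/file/test")
file.foreachRDD(t => {
  val test = t.map(x => x) // identity placeholder: do the map stuff here
  test.saveAsTextFile("/root/file/file1") // t is an RDD, so the method is saveAsTextFile
})
ssc.start()            // nothing runs until the streaming context is started
ssc.awaitTermination() // keep the application alive while batches are processed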
Created 04-06-2016 01:22 AM
I tried:
val file = ssc.textFileStream("/root/file/test")
file.foreachRDD(t => {
  var test = file.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y)
  test.saveAsTextFiles("/root/file/file1")
})
sc.stop()
But it doesn't work.
Created on 04-06-2016 03:38 AM - edited 04-06-2016 03:41 AM
From your code:
val textFile = ssc.textFileStream("/root/file/test")
textFile.foreachRDD(t => {
  val test = t.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y)
  test.saveAsTextFile("/root/file/file1")
})
Mind that it is t.map( ), not file.map( ): inside foreachRDD you work on t, the RDD of the current batch, not on the DStream.
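To check what the map and reduceByKey steps should produce for one batch without starting Spark, here is a small plain-Scala sketch that does the same counting on an ordinary collection. The object name BatchCountSketch, its helpers and the sample lines are invented for illustration. It also skips lines with fewer than two space-separated fields, which would otherwise throw an ArrayIndexOutOfBoundsException in x.split(" ")(1).

```scala
object BatchCountSketch {
  // Build the "first;second" key used in the thread.
  // Returns None when the line has fewer than two space-separated fields.
  def toKey(line: String): Option[String] = {
    val fields = line.split(" ")
    if (fields.length >= 2) Some(fields(0) + ";" + fields(1)) else None
  }

  // Local stand-in for map(key => (key, 1)).reduceByKey(_ + _) on one batch.
  def countKeys(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(line => toKey(line).toList) // drop malformed lines
      .map(key => (key, 1))                // convert to countable tuples
      .groupBy(_._1)                       // group the tuples by key
      .map { case (key, pairs) => (key, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    // Made-up sample input: two lines share the key "a;b", one line is too short.
    val sample = Seq("a b x", "a b y", "c d z", "short")
    countKeys(sample).foreach(println)
  }
}
```

In the streaming job the same key logic would sit inside t.map(...), with reduceByKey doing the grouping and summing.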
Created 04-06-2016 04:51 AM
It does not work.
What is the problem?
Here is my console output:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/04/06 12:44:42 INFO SparkContext: Running Spark version 1.5.0
16/04/06 12:44:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/04/06 12:44:48 INFO SecurityManager: Changing view acls to: root
16/04/06 12:44:48 INFO SecurityManager: Changing modify acls to: root
16/04/06 12:44:48 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
16/04/06 12:45:00 INFO Slf4jLogger: Slf4jLogger started
16/04/06 12:45:00 INFO Remoting: Starting remoting
16/04/06 12:45:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.31:38825]
16/04/06 12:45:04 INFO Utils: Successfully started service 'sparkDriver' on port 38825.
16/04/06 12:45:04 INFO SparkEnv: Registering MapOutputTracker
16/04/06 12:45:04 INFO SparkEnv: Registering BlockManagerMaster
16/04/06 12:45:05 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-1b896884-d84a-4c39-b9dd-93decdb6ee0b
16/04/06 12:45:05 INFO MemoryStore: MemoryStore started with capacity 1027.3 MB
16/04/06 12:45:06 INFO HttpFileServer: HTTP File server directory is /tmp/spark-14a1c553-e160-4b93-8822-3b943e27edd1/httpd-849fa48d-e2de-46de-845a-a68a02f76b94
16/04/06 12:45:06 INFO HttpServer: Starting HTTP Server
16/04/06 12:45:08 INFO Utils: Successfully started service 'HTTP file server' on port 50992.
16/04/06 12:45:08 INFO SparkEnv: Registering OutputCommitCoordinator
16/04/06 12:45:11 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/04/06 12:45:11 INFO SparkUI: Started SparkUI at http://192.168.1.31:4040
16/04/06 12:45:12 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
16/04/06 12:45:12 INFO Executor: Starting executor ID driver on host localhost
16/04/06 12:45:15 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 42498.
16/04/06 12:45:15 INFO NettyBlockTransferService: Server created on 42498
16/04/06 12:45:15 INFO BlockManagerMaster: Trying to register BlockManager
16/04/06 12:45:15 INFO BlockManagerMasterEndpoint: Registering block manager localhost:42498 with 1027.3 MB RAM, BlockManagerId(driver, localhost, 42498)
16/04/06 12:45:15 INFO BlockManagerMaster: Registered BlockManager
16/04/06 12:45:18 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
16/04/06 12:45:22 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@11fb9657
16/04/06 12:45:23 INFO SparkUI: Stopped Spark web UI at http://192.168.1.31:4040
16/04/06 12:45:23 INFO DAGScheduler: Stopping DAGScheduler
16/04/06 12:45:23 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/04/06 12:45:23 INFO MemoryStore: MemoryStore cleared
16/04/06 12:45:23 INFO BlockManager: BlockManager stopped
16/04/06 12:45:23 INFO BlockManagerMaster: BlockManagerMaster stopped
16/04/06 12:45:23 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/04/06 12:45:23 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/04/06 12:45:23 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/04/06 12:45:23 INFO SparkContext: Successfully stopped SparkContext
16/04/06 12:45:23 INFO ShutdownHookManager: Shutdown hook called
16/04/06 12:45:23 INFO ShutdownHookManager: Deleting directory /tmp/spark-14a1c553-e160-4b93-8822-3b943e27edd1
No file is created; nothing happens.
What's wrong?
Created on 04-06-2016 04:54 AM - edited 04-06-2016 04:55 AM
It seems there is some glitch in your code. It would be much easier if you could post it.
Created 04-06-2016 04:59 AM
This is my code:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.scheduler.SparkListenerStageCompleted
import org.apache.spark.streaming.StreamingContext._ 
import org.apache.spark.streaming.{Seconds, StreamingContext}
object FileCount {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("File Count")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))
    val file = ssc.textFileStream("/root/file/test/f3")
    file.foreachRDD(t => {
      val test = t.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y)
      test.saveAsTextFile("/root/file/file1")
    })
    sc.stop()
  }
}
Created on 04-06-2016 05:10 AM - edited 04-06-2016 05:13 AM
You need to assign a number of threads to Spark when running the master locally. The most obvious choice is 2: one to receive the data and one to process it.
So the correct code should be:
    .setMaster("local[2]")
If your file is not too big, change the batch interval to:
val ssc = new StreamingContext(sc, Seconds(1))
You have stopped the context but forgot to start the streaming:
file.foreachRDD(t => {
  val test = t.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y)
  test.saveAsTextFile("/root/file/file1")
})
ssc.start()
ssc.awaitTermination()
For now, don't use sc.stop().
Created 04-06-2016 06:20 AM
I tried it, and I get:
16/04/06 14:09:52 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@4bf57335
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.util.ThreadUtils$.runInNewThread$default$2()Z
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:606)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
    at com.org.file.filecount.FileCount$.main(FileCount.scala:52)
    at com.org.file.filecount.FileCount.main(FileCount.scala)
Created 04-07-2016 01:57 AM
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.util.ThreadUtils$.runInNewThread$default$2()Z
Compare your code with the line below:
    .setMaster("local[2]")
By the way, which version of Spark Streaming are you using?
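The version question matters because a NoSuchMethodError at runtime usually means the spark-core and spark-streaming jars on the classpath come from different Spark releases. A minimal build.sbt sketch that pins both to one version could look like this. It assumes the project is built with sbt, which the thread does not say, and takes 1.5.0 from the "Running Spark version 1.5.0" line in the console output; the name sparkVersion is just illustrative.

```scala
// build.sbt (sketch): keep every Spark module on the same version.
// Assumption: 1.5.0 matches the Spark version reported in the console log.
val sparkVersion = "1.5.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion
)
```

With Maven the idea is the same: use one version property for both artifacts.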
