Explorer
Posts: 62
Registered: 01-22-2014

Unable to write data from a TCP port into HDFS in Spark Streaming

Hi,

I am trying to stream data from a TCP port and write it into HDFS using Spark Streaming.

The files are getting created in HDFS, but they are all empty, even though the Spark Streaming console shows that data is being read from the TCP port.

I tried this with Spark 0.9.0, 0.9.1 and 1.0 in the Scala shell on CDH 5. I ran 'nc -lk 9993' in another terminal to stream the data.

Below is the code. Please let me know how this issue could be resolved. Thanks.

import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

// Master "local", application name "NetworkWordCount", 1-second batches
val ssc8 = new StreamingContext("local", "NetworkWordCount", Seconds(1))
// Read lines of text from the socket served by 'nc -lk 9993'
val lines8 = ssc8.socketTextStream("localhost", 9993)

// Split each line into words and count each word per batch
val words8 = lines8.flatMap(_.split(" "))
val pairs8 = words8.map(word => (word, 1))
val wordCounts8 = pairs8.reduceByKey(_ + _)

// Save each batch's counts to HDFS
wordCounts8.saveAsTextFiles("hdfs://Node1:8020/user/root/Spark8")

// Print the first few counts of each batch to the console
wordCounts8.print()

ssc8.start()


Cloudera Employee
Posts: 366
Registered: 07-29-2013

Re: Unable to write data from a TCP port into HDFS in Spark Streaming

Long shot, but are you running locally with a master like "local" or "local[1]"? I ran into something like this, and using "local[2]" or higher resolved it. It seems to need at least two threads.
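A minimal sketch of the suggested change, reusing the names from the first post. The receiver behind socketTextStream holds one thread for as long as the stream runs, so with a single-threaded "local" master nothing is left to process the batches; "local[2]" leaves at least one thread free for that work.

```scala
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

// "local[2]" = two local threads: one is occupied by the socket receiver,
// the other runs the per-batch jobs (flatMap, reduceByKey, saveAsTextFiles, print)
val ssc8 = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1))
val lines8 = ssc8.socketTextStream("localhost", 9993)
```

The rest of the pipeline stays as posted.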

Explorer
Posts: 62
Registered: 01-22-2014

Re: Unable to write data from a TCP port into HDFS in Spark Streaming

Hi,

Thanks for your response.

I tried both local[2] and local[4], but the data still did not get written into HDFS.

There were a couple of improvements, though:

1. Previously the word counts were not displayed on the console; now they are.

2. Along with the _SUCCESS file inside the directory, the part-* files are now also created, but they are all empty.

One more thing: I am not calling ssc8.awaitTermination(). Also, pressing ^D or terminating netcat does not help. Am I missing something here?

The HDFS output files and the console log are below.

HDFS Output Files
--------------------

-rw-r--r--   3 user1 user1          0 2014-06-26 09:19 /user/user1/SparkV/_SUCCESS
-rw-r--r--   3 user1 user1         0 2014-06-26 09:19 /user/user1/SparkV/part-00000
-rw-r--r--   3 user1 user1          0 2014-06-26 09:19 /user/user1/SparkV/part-00001




Spark-Shell Console Log
---------------------


-------------------------------------------
Time: 1403789836000 ms
-------------------------------------------
(f,3)
(fsd,2)
(sdf,2)
(fds,1)
(sd,3)

14/06/26 09:37:16 INFO scheduler.JobScheduler: Finished job streaming job 1403789836000 ms.1 from job set of time 1403789836000 ms
14/06/26 09:37:16 INFO storage.MemoryStore: ensureFreeSpace(8) called with curMem=327, maxMem=286339891
14/06/26 09:37:16 INFO storage.MemoryStore: Block input-0-1403789836000 stored as bytes to memory (size 8.0 B, free 273.1 MB)
14/06/26 09:37:16 INFO storage.BlockManagerInfo: Added input-0-1403789836000 in memory on localhost:49784 (size: 8.0 B, free: 273.1 MB)
14/06/26 09:37:16 INFO storage.BlockManagerMaster: Updated info of block input-0-1403789836000
14/06/26 09:37:16 WARN storage.BlockManager: Block input-0-1403789836000 already exists on this machine; not re-adding it
14/06/26 09:37:16 INFO receiver.BlockGenerator: Pushed block input-0-1403789836000
14/06/26 09:37:16 INFO storage.MemoryStore: ensureFreeSpace(15) called with curMem=335, maxMem=286339891
14/06/26 09:37:16 INFO storage.MemoryStore: Block input-0-1403789836200 stored as bytes to memory (size 15.0 B, free 273.1 MB)
14/06/26 09:37:16 INFO storage.BlockManagerInfo: Added input-0-1403789836200 in memory on localhost:49784 (size: 15.0 B, free: 273.1 MB)
14/06/26 09:37:16 INFO storage.BlockManagerMaster: Updated info of block input-0-1403789836200
14/06/26 09:37:16 WARN storage.BlockManager: Block input-0-1403789836200 already exists on this machine; not re-adding it
14/06/26 09:37:16 INFO receiver.BlockGenerator: Pushed block input-0-1403789836200
14/06/26 09:37:16 INFO storage.MemoryStore: ensureFreeSpace(8) called with curMem=350, maxMem=286339891
14/06/26 09:37:16 INFO storage.MemoryStore: Block input-0-1403789836400 stored as bytes to memory (size 8.0 B, free 273.1 MB)
14/06/26 09:37:16 INFO storage.BlockManagerInfo: Added input-0-1403789836400 in memory on localhost:49784 (size: 8.0 B, free: 273.1 MB)
14/06/26 09:37:16 INFO storage.BlockManagerMaster: Updated info of block input-0-1403789836400
14/06/26 09:37:16 WARN storage.BlockManager: Block input-0-1403789836400 already exists on this machine; not re-adding it
14/06/26 09:37:16 INFO receiver.BlockGenerator: Pushed block input-0-1403789836400
14/06/26 09:37:16 INFO storage.MemoryStore: ensureFreeSpace(9) called with curMem=358, maxMem=286339891
14/06/26 09:37:16 INFO storage.MemoryStore: Block input-0-1403789836600 stored as bytes to memory (size 9.0 B, free 273.1 MB)
14/06/26 09:37:16 INFO storage.BlockManagerInfo: Added input-0-1403789836600 in memory on localhost:49784 (size: 9.0 B, free: 273.1 MB)
14/06/26 09:37:16 INFO storage.BlockManagerMaster: Updated info of block input-0-1403789836600
14/06/26 09:37:16 WARN storage.BlockManager: Block input-0-1403789836600 already exists on this machine; not re-adding it
14/06/26 09:37:16 INFO receiver.BlockGenerator: Pushed block input-0-1403789836600
14/06/26 09:37:17 INFO storage.MemoryStore: ensureFreeSpace(14) called with curMem=367, maxMem=286339891
14/06/26 09:37:17 INFO storage.MemoryStore: Block input-0-1403789836800 stored as bytes to memory (size 14.0 B, free 273.1 MB)
14/06/26 09:37:17 INFO storage.BlockManagerInfo: Added input-0-1403789836800 in memory on localhost:49784 (size: 14.0 B, free: 273.1 MB)
14/06/26 09:37:17 INFO storage.BlockManagerMaster: Updated info of block input-0-1403789836800
14/06/26 09:37:17 WARN storage.BlockManager: Block input-0-1403789836800 already exists on this machine; not re-adding it
14/06/26 09:37:17 INFO receiver.BlockGenerator: Pushed block input-0-1403789836800
14/06/26 09:37:18 INFO scheduler.ReceiverTracker: Stream 0 received 6 blocks
14/06/26 09:37:18 INFO scheduler.JobScheduler: Added jobs for time 1403789838000 ms


Cloudera Employee
Posts: 366
Registered: 07-29-2013

Re: Unable to write data from a TCP port into HDFS in Spark Streaming

Hm, why not call awaitTermination()? I'm not sure what else happens in your program, but if it exits, it will try to shut down the processes. You need it to keep running.
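A minimal sketch of the pattern described in the reply, assuming the same word-count pipeline as the first post; the helper name runUntilStopped is hypothetical. start() launches the receiver and the batch jobs on background threads and returns immediately, while awaitTermination() blocks the calling thread until the context is stopped or a job fails, which keeps the driver alive.

```scala
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

// Hypothetical helper: wires up the word count and keeps the driver alive while it runs
def runUntilStopped(ssc: StreamingContext, host: String, port: Int, outPrefix: String): Unit = {
  val wordCounts = ssc.socketTextStream(host, port)
    .flatMap(_.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)
  wordCounts.saveAsTextFiles(outPrefix) // one directory per batch, named outPrefix-<batch time in ms>
  wordCounts.print()

  ssc.start()            // returns at once; receiving and processing continue in background threads
  ssc.awaitTermination() // blocks here until ssc.stop() is called or a streaming job fails
}
```

Usage would look like runUntilStopped(new StreamingContext("local[2]", "NetworkWordCount", Seconds(1)), "localhost", 9993, "hdfs://Node1:8020/user/root/Spark8").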

Explorer
Posts: 62
Registered: 01-22-2014

Re: Unable to write data from a TCP port into HDFS in Spark Streaming

Hi,

My program does not do anything else. It is the same as the code above, which is the sample program from the Spark programming guide.

At the moment, my program does not call awaitTermination().

I am not able to use awaitTermination(): once I enter start() in the Spark shell, streaming logs begin scrolling in the console and I cannot enter the awaitTermination() call. If I open a new terminal, start another Spark shell and call var.awaitTermination() there, it does not do anything.

Please let me know if my understanding is incorrect or if I am doing anything wrong here.

Cloudera Employee
Posts: 366
Registered: 07-29-2013

Re: Unable to write data from a TCP port into HDFS in Spark Streaming

Sure, but the shell is still parsing your input, so you should still call awaitTermination(). Usually I paste in the whole block of code I want to execute all at once.
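One way to paste a whole block is the :paste command of the Scala REPL that spark-shell is built on: it collects everything until Ctrl-D and then compiles and runs it as one unit, so start() and the waiting call go in together and multi-line method chains are accepted. A sketch under those assumptions, reusing the names from the first post; it uses the awaitTermination(timeoutMs) overload from the Spark 0.9/1.0 API (later releases offer awaitTerminationOrTimeout instead), and the 60-second timeout is an arbitrary choice so that the prompt comes back.

```scala
// Type :paste at the scala> prompt, paste these lines, then press Ctrl-D
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

val ssc8 = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1))
val wordCounts8 = ssc8.socketTextStream("localhost", 9993)
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
wordCounts8.saveAsTextFiles("hdfs://Node1:8020/user/root/Spark8")
wordCounts8.print()

ssc8.start()
ssc8.awaitTermination(60000) // wait up to 60 s (arbitrary) instead of blocking forever
ssc8.stop()                  // stop streaming and its SparkContext so the shell is usable again
```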


Of course, in the long term you would write a standalone program that you compile, rather than just using the shell.
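A minimal sketch of such a standalone program, assuming the same word-count logic as the shell code. The object name NetworkWordCountToHdfs and its argument order are hypothetical. It needs spark-core and spark-streaming on the compile classpath. The master is deliberately not hard-coded; it is supplied at launch, for example with spark-submit's --master local[2] option in Spark 1.0.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

// Hypothetical standalone version of the shell code
object NetworkWordCountToHdfs {
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println("Usage: NetworkWordCountToHdfs <host> <port> <outputPrefix>")
      System.exit(1)
    }
    val Array(host, port, outPrefix) = args.take(3)

    // Master comes from the launcher (e.g. spark-submit --master local[2])
    val conf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    val wordCounts = ssc.socketTextStream(host, port.toInt)
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    wordCounts.saveAsTextFiles(outPrefix)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination() // keep main() from returning while the stream runs
  }
}
```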

Explorer
Posts: 62
Registered: 01-22-2014

Re: Unable to write data from a TCP port into HDFS in Spark Streaming

Hi,

I copied the complete code as a single block and executed it, but still no luck.

Is there any other way of calling awaitTermination()? I cannot use sbt in my organization at the moment, because local repositories are not configured and sbt cannot connect to the internet.