Community Articles

Find and share helpful community-sourced technical articles.
Labels (1)
avatar
Guru

Word Count using Spark Streaming in Pyspark

This is a WordCount example with the following

  • Local File System as a source
  • Calculate counts using reduceByKey and store them in a temp table
  • Querying running counts through SQL

Setup: Define the function that sets up the StreamingContext

This function has to create a new StreamingContext and set it up with all the input stream, transformation and output operations.

But it must not start the StreamingContext, it is just to capture all the setup code necessary for your streaming application in one place.

from pyspark.streaming import StreamingContext
batchIntervalSeconds = 10 

def creatingFunc():
  ssc = StreamingContext(sc, batchIntervalSeconds)
  # Set each DStreams in this context to remember RDDs it generated in the last given duration.
  # DStreams remember RDDs only for a limited duration of time and releases them for garbage
  # collection. This method allows the developer to specify how long to remember the RDDs (
  # if the developer wishes to query old data outside the DStream computation).
  ssc.remember(60)
  
  lines = ssc.textFileStream("YOUR_S3_PATH_HERE")
  lines.pprint()

  words = lines.flatMap(lambda line: line.split(","))
  pairs = words.map(lambda word: (word, 1))
  wordCounts = pairs.reduceByKey(lambda x, y: x + y)

  def process(time, rdd):
    df = sqlContext.createDataFrame(rdd)
    df.registerTempTable("myCounts")
  
  wordCounts.foreachRDD(process)
  
  return ssc

Create a streaming context

If you don't want the streaming application to restart from the checkpoint, make sure to delete the checkpoint directory

checkpointDir = "/LOCAL_DIR/DATA/"
ssc = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc)

Start the streaming context

# This line starts the streaming context in the background.
ssc.start()

# This ensures the cell is put on hold until the background streaming thread is started properly.
ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 2)
------------------------------------------- Time: 2015-10-07 20:57:00 ------------------------------------------- hello,world hello,world hello,world hello,world hello,world hello,world hello,world hello,world hello,world hello,world ... ------------------------------------------- Time: 2015-10-07 20:57:10 ------------------------------------------- hello,world hello,world hello,world hello,world hello,world hello,world hello,world hello,world hello,world hello,world ...

Interactive query

%sql select * from myCounts
world
hello

Stop the streaming context.

ssc.stop(stopSparkContext=False)

Stop all streaming contexts running in this JVM from python

This is helpful if you loose the handle to the streaming context. E.g: you detached the notebook from the cluster while streaming is running. In this case, you lost the handle to the streaming context from the notebook but the streaming job is running in background. If you want to stop the streaming job, then you need to access the streaming context in the jvm directly and stop it.


if( not sc._jvm.StreamingContext.getActive().isEmpty() 😞 
	sc._jvm.StreamingContext.getActive().get().stop(False)
13,188 Views