Word Count using Spark Streaming in PySpark
This is a WordCount example with the following steps:
- Local file system as the source
- Calculate counts using reduceByKey and store them in a temp table
- Query the running counts through SQL
Setup: Define the function that sets up the StreamingContext
This function must create a new StreamingContext and set it up with all the input streams, transformations, and output operations.
It must not start the StreamingContext; its only purpose is to capture all the setup code necessary for your streaming application in one place.
```python
from pyspark.streaming import StreamingContext

batchIntervalSeconds = 10

def creatingFunc():
    ssc = StreamingContext(sc, batchIntervalSeconds)

    # Set each DStream in this context to remember the RDDs it generated in the last
    # given duration. DStreams remember RDDs only for a limited time and then release
    # them for garbage collection. This method lets the developer specify how long to
    # remember the RDDs (useful for querying old data outside the DStream computation).
    ssc.remember(60)

    lines = ssc.textFileStream("YOUR_S3_PATH_HERE")
    lines.pprint()

    words = lines.flatMap(lambda line: line.split(","))
    pairs = words.map(lambda word: (word, 1))
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)

    def process(time, rdd):
        # Skip empty batches; createDataFrame cannot infer a schema from an empty RDD.
        if not rdd.isEmpty():
            df = sqlContext.createDataFrame(rdd)
            df.registerTempTable("myCounts")

    wordCounts.foreachRDD(process)
    return ssc
```
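The function above assumes a notebook environment where `sc` and `sqlContext` already exist. Outside a notebook, a minimal sketch (the application name is just a placeholder) would create them explicitly first:

```python
from pyspark import SparkContext
from pyspark.sql import SQLContext

# In a notebook these are provided for you; in a standalone script, create them yourself.
sc = SparkContext(appName="StreamingWordCount")  # placeholder app name
sqlContext = SQLContext(sc)
```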
Create a streaming context
If you don't want the streaming application to restart from the checkpoint, make sure to delete the checkpoint directory first; a sketch for doing so follows the snippet below.
```python
checkpointDir = "/LOCAL_DIR/DATA/"
ssc = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc)
```
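For that clean start, one option, assuming `checkpointDir` is a plain local filesystem path (for DBFS or HDFS you would use the corresponding filesystem tooling instead), is to remove the directory before the `getActiveOrCreate` call above:

```python
import shutil

# Remove any previous checkpoint so getActiveOrCreate builds a fresh context
# instead of restoring the old one. Works for local filesystem paths only.
shutil.rmtree(checkpointDir, ignore_errors=True)
```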
Start the streaming context
```python
# This starts the streaming context in the background.
ssc.start()

# This puts the cell on hold until the background streaming thread has started properly.
ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 2)
```
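To confirm the context actually started, one option (PySpark 1.6 or later) is to ask for the active context; this minimal sketch just prints it:

```python
from pyspark.streaming import StreamingContext

# getActive() returns the currently active StreamingContext, or None if none is running.
print(StreamingContext.getActive())
```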
Interactive query
```sql
%sql select * from myCounts
```
```
world |
hello |
```
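The same running counts can also be queried from a Python cell instead of %sql:

```python
# Query the temp table registered by the foreachRDD callback above.
sqlContext.sql("select * from myCounts").show()
```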
Stop the streaming context.
```python
ssc.stop(stopSparkContext=False)
```
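PySpark's stop() also takes a stopGraceFully flag if you want batches that have already been received to finish processing before shutdown; a sketch:

```python
# Keep the SparkContext alive, but drain already-received data before stopping.
ssc.stop(stopSparkContext=False, stopGraceFully=True)
```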
Stop all streaming contexts running in this JVM from Python
This is helpful if you lose the handle to the streaming context, e.g., you detached the notebook from the cluster while streaming was running. In that case, you have lost the handle to the streaming context from the notebook, but the streaming job is still running in the background. If you want to stop the streaming job, you need to access the streaming context in the JVM directly and stop it.
```python
if not sc._jvm.StreamingContext.getActive().isEmpty():
    sc._jvm.StreamingContext.getActive().get().stop(False)
```