Calculate counts using reduceByKey and store them in a temp table
Querying running counts through SQL
Setup: Define the function that sets up the StreamingContext
This function must create a new StreamingContext and set it up with all of the input streams, transformations, and output operations.
It must not start the StreamingContext, however; its only job is to capture all the setup code for your streaming application in one place.
from pyspark.streaming import StreamingContext

batchIntervalSeconds = 10

def creatingFunc():
    ssc = StreamingContext(sc, batchIntervalSeconds)

    # Make every DStream in this context remember the RDDs it generated during the last 60 seconds.
    # By default a DStream keeps its RDDs only for a limited time and then releases them for garbage
    # collection. remember() lets you specify how long to keep them, which is needed if you
    # want to query old data from outside the DStream computation.
    ssc.remember(60)

    lines = ssc.textFileStream("YOUR_S3_PATH_HERE")
    lines.pprint()

    words = lines.flatMap(lambda line: line.split(","))
    pairs = words.map(lambda word: (word, 1))
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)

    def process(time, rdd):
        # Skip batches with no data: schema inference fails on an empty RDD.
        if rdd.isEmpty():
            return
        df = sqlContext.createDataFrame(rdd)
        df.registerTempTable("myCounts")

    wordCounts.foreachRDD(process)

    return ssc
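To make the transformation chain concrete, here is a plain-Python sketch of what `flatMap`, `map`, and `reduceByKey` compute for a single batch. It does not use Spark at all; the function name `count_words_in_batch` and the sample lines are hypothetical and exist only for illustration.

```python
from collections import defaultdict

def count_words_in_batch(lines):
    """Mimic flatMap -> map -> reduceByKey for one batch of text lines."""
    # flatMap: split every line on commas and flatten the pieces into one list
    words = [word for line in lines for word in line.split(",")]
    # map: pair each word with an initial count of 1
    pairs = [(word, 1) for word in words]
    # reduceByKey: add up the counts of all pairs that share the same key
    counts = defaultdict(int)
    for word, n in pairs:
        counts[word] += n
    return dict(counts)

# Hypothetical batch of two comma-separated lines
batch = ["a,b,a", "b,c"]
print(count_words_in_batch(batch))  # prints {'a': 2, 'b': 2, 'c': 1}
```

In the streaming job the same reduction is applied to the RDD of each batch, and `process` registers the result as the `myCounts` temp table.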
Create a streaming context
If you don't want the streaming application to restart from the checkpoint, make sure to delete the checkpoint directory first.
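The cell that actually builds `ssc` is not shown here. One possible way to create it, assuming PySpark's `StreamingContext.getActiveOrCreate` is available (Spark 1.5 or later), is to hand it the setup function defined above. The helper name `get_or_create_context` and its `checkpoint_dir` parameter are hypothetical; leaving `checkpoint_dir` as `None` means no checkpoint directory is consulted.

```python
def get_or_create_context(creating_func, checkpoint_dir=None):
    """Return the active StreamingContext, or build one with creating_func."""
    # Imported lazily so this helper can be defined before Spark is loaded.
    from pyspark.streaming import StreamingContext
    # Reuses a context that is already started; otherwise restores from
    # checkpoint_dir when it is set and holds valid checkpoint data,
    # and falls back to calling creating_func() to build a fresh context.
    return StreamingContext.getActiveOrCreate(checkpoint_dir, creating_func)

# Usage in the notebook (assumes creatingFunc from the setup section):
# ssc = get_or_create_context(creatingFunc)
```

Routing creation through the setup function keeps re-running the cell safe: an already active context is returned instead of a second one being created.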
# This line starts the streaming context in the background.
ssc.start()
# This ensures the cell is put on hold until the background streaming thread is started properly.
ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 2)
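Once a batch containing data has been processed, the `myCounts` temp table can be queried with SQL from another cell. The sketch below assumes the same `sqlContext` that `process` uses. Because the DataFrame is built from plain tuples without a schema, Spark names the columns `_1` (the word) and `_2` (its count). The helper names, the column aliases, and the `limit` parameter are hypothetical.

```python
def build_counts_query(table="myCounts", limit=10):
    """Build a SQL query that returns the most frequent words first."""
    return (
        "SELECT _1 AS word, _2 AS total "
        "FROM {table} ORDER BY _2 DESC LIMIT {limit}"
    ).format(table=table, limit=int(limit))

def top_counts(sql_context, limit=10):
    """Run the query on the given SQLContext and return the resulting DataFrame."""
    return sql_context.sql(build_counts_query(limit=limit))

# Usage in the notebook, after the stream has processed some data:
# top_counts(sqlContext).show()
```

Each call reflects whichever batch was registered most recently, so re-running the query shows the counts change as new files arrive.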
Stop all streaming contexts running in this JVM from Python
This is helpful if you lose the handle to the streaming context, for example because you detached the notebook from the cluster while streaming was running. In that case the notebook no longer holds a handle to the streaming context, but the streaming job keeps running in the background. To stop the job, access the streaming context in the JVM directly and stop it.
if not sc._jvm.StreamingContext.getActive().isEmpty():
    # stop(False) stops streaming but leaves the underlying SparkContext running.
    sc._jvm.StreamingContext.getActive().get().stop(False)