I wanted to provide a quick Structured Streaming example that shows an end-to-end flow from source (Twitter), through Kafka, and then data processing using Spark. To accomplish this, I used Apache NiFi (part of Hortonworks HDF) to capture the Twitter data and send it to Apache Kafka.
From there, Spark was used to consume each Twitter payload (as JSON), parse it, and analyze the data in real time.
Before I jump into the technical details, it's good to understand some of the business value of this process. There are many practical applications based on this technology stack as well as the code that I describe below. Here are a few of the business applications that can be implemented using this technology:
Security: Real-time log analysis allows organizations to extract IP addresses, ports, services and events. This data can then be aggregated based on a time window, analyzed, and monitored for abnormal activity. Spark Structured Streaming also supports real-time joins with static data, further enriching the logs by incorporating external data such as location, detailed user information, and historical data.
Sensors & IoT: When working with sensors, out-of-order data is a challenge. The order of readings is critical for identifying patterns and behavior within an environment. Spark Structured Streaming addresses late and out-of-order data using "watermarking", which enables the engine to automatically track the current event time within the data and attempt to clean up the old state accordingly.
Web Analytics: Structured streaming can be used to route, process, and aggregate clickstream data from your website. This analysis can feed external systems in order to proactively send notifications to users, launch a web form, or trigger an action in a 3rd party system.
Call Center: Identify trends related to call volume, response times, emerging topics, at-risk customers, and cross-sell opportunities. Spark is capable of processing both structured and unstructured call records to address these business needs.
Social Media: Analyze social feeds in real-time to detect influencers, trending topics, abnormal volume, or other indicators. All of this can be monitored and aggregated within a defined time window. The example below will go into more details related to this specific use case.
Here are the technical details associated with this application:
Step 1: Connect to Twitter and stream the data to Kafka
A simple NiFi flow was used to capture the Twitter data using the GetTwitter NiFi processor. This processor consumes data from the Twitter Streaming API. Using PutKafka, I was able to push the JSON payload to a Kafka topic called "dztopic1". Below is a screenshot that shows this NiFi flow:
Step 2: Use Spark to read Kafka Stream
Prerequisite: Before you launch Spark, make sure that you have included the required artifact / dependency as described here: spark-sql-kafka-0-10_2.11. If you want to add it from the PySpark command line, you can run something like this:
./bin/pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
Spark Structured Streaming subscribes to our Kafka topic using the code shown below:
# Consume Kafka topic
events = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "dztopic1")
    .load())
# Cast the JSON payload as a String
events = events.selectExpr("CAST(value AS STRING)")
Step 3: Define Python UDFs
I created two Python functions. The first, parse_json, parses the Twitter JSON payload and extracts each field of interest. The second, convert_twitter_date, converts the Twitter created_at timestamp into a PySpark timestamp, which is used for windowing. I like using Python UDFs, but note that there are other ways to parse JSON and convert the timestamp field.
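Here is a minimal sketch of what these two functions might look like. The set of extracted fields (created_at, screen_name, lang, text) is my assumption for illustration and is not necessarily the exact set of fields of interest; the build_udfs helper is also a hypothetical name, and it imports PySpark lazily so that the plain Python functions can be tried without a Spark session.

```python
import json
from datetime import datetime

# Assumed fields of interest (illustrative only; adjust to your needs).
FIELDS = ("created_at", "screen_name", "lang", "text")


def parse_json(payload):
    """Return a tuple of FIELDS values, or all None if the payload is unusable."""
    empty = (None,) * len(FIELDS)
    try:
        tweet = json.loads(payload)
    except (TypeError, ValueError):
        return empty
    if not isinstance(tweet, dict):
        return empty
    user = tweet.get("user")
    if not isinstance(user, dict):
        user = {}
    return (
        tweet.get("created_at"),
        user.get("screen_name"),
        tweet.get("lang"),
        tweet.get("text"),
    )


def convert_twitter_date(created_at):
    """Parse a value like 'Wed Oct 10 20:19:24 +0000 2018' into a datetime."""
    try:
        return datetime.strptime(created_at, "%a %b %d %H:%M:%S %z %Y")
    except (TypeError, ValueError):
        return None


def build_udfs():
    """Wrap both functions as Spark UDFs (hypothetical helper, needs pyspark)."""
    from pyspark.sql.functions import udf
    from pyspark.sql.types import (StringType, StructField, StructType,
                                   TimestampType)

    schema = StructType([StructField(name, StringType(), True) for name in FIELDS])
    return udf(parse_json, schema), udf(convert_twitter_date, TimestampType())
```

With the UDFs returned by build_udfs, the parsed struct can be selected from the value column of events, and the converted timestamp can then be added as its own column for windowing.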
Start the Spark Structured Streaming query. In this case, I am launching two queries: one that contains results from our time window (query_window) and another (query_json) that contains the parsed JSON records. Both tables are stored in memory. Typically, you would use a different Spark sink, such as writing the results back to Kafka or persisting to HDFS. For this example (and because it is useful for debugging), I am writing the results to two in-memory tables.
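As a rough sketch, the two in-memory queries could be started along these lines. The helper names (windowed_counts, start_memory_query), the created_at_ts column name, and the one-minute window are all assumptions made for illustration; only the query names query_window and query_json come from the description above.

```python
def windowed_counts(parsed, time_col="created_at_ts", duration="1 minute"):
    """Count records per tumbling event-time window (needs pyspark).

    `parsed` is assumed to be a streaming DataFrame that already has a
    timestamp column named `time_col`.
    """
    from pyspark.sql.functions import col, window

    return parsed.groupBy(window(col(time_col), duration)).count()


def start_memory_query(df, name, output_mode):
    """Start a streaming query that writes to an in-memory table called `name`."""
    return (df.writeStream
            .format("memory")
            .queryName(name)
            .outputMode(output_mode)
            .start())


# Example usage, assuming `parsed` holds the parsed tweets:
#   query_json = start_memory_query(parsed, "query_json", "append")
#   query_window = start_memory_query(windowed_counts(parsed),
#                                     "query_window", "complete")
#   spark.sql("select * from query_window").show()
```

The aggregated query uses the complete output mode because the memory sink only accepts append or complete, and a windowed aggregation without a watermark cannot be written in append mode.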