Created on 03-28-2017 01:39 AM - edited 08-17-2019 01:33 PM
In this article, I'll show how to analyze a real-time data stream using Spark Structured Streaming.
I wanted to provide a quick Structured Streaming example that shows an end-to-end flow from source (Twitter), through Kafka, and then data processing using Spark. To accomplish this, I used Apache NiFi (part of Hortonworks HDF) to capture the Twitter data and send it to Apache Kafka. From there, Spark was used to consume each Twitter payload (as JSON), parse it, and analyze the data in real time.
Before I jump into the technical details, it's worth understanding the business value of this process. Many practical applications can be built on this technology stack and the code I describe below, such as spotting trending topics and authors as they emerge from the live Twitter stream.
Here are the technical details associated with this application:
Step 1: Connect to Twitter and stream the data to Kafka
A simple NiFi flow was used to capture the Twitter data using the GetTwitter NiFi processor. This processor consumes data from the Twitter Streaming API. Using PutKafka, I was able to push the JSON payload to a Kafka topic called "dztopic1". Below is a screenshot that shows this NiFi flow:
Step 2: Use Spark to read Kafka Stream
Prerequisite: Before you launch Spark, make sure that you have included the required artifact/dependency, spark-sql-kafka-0-10_2.11. If you want to add it via the PySpark command line, you can run something like this: ./bin/pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
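Note that the spark session object referenced in the code below is created automatically by the PySpark shell. If you instead run the code as a standalone script via spark-submit, you need to build it yourself; a minimal sketch (the application name is just a placeholder):

# Only needed when running as a standalone script; the PySpark shell
# already provides a `spark` SparkSession object.
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("TwitterStructuredStreaming") \
    .getOrCreate()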
Spark Structured Streaming subscribes to our Kafka topic using the code shown below:
# Consume the Kafka topic
events = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "dztopic1") \
    .load()

# Cast the JSON payload as a String
events = events.selectExpr("CAST(value AS STRING)")
Step 3: Define Python UDFs
I created two Python functions. The first, parse_json, parses the Twitter JSON payload and extracts each field of interest. The second, convert_twitter_date, converts the Twitter created_at timestamp into a PySpark timestamp, which is used for windowing. I like using Python UDFs, but note that there are other ways to parse the JSON and convert the timestamp field (a sketch of one UDF-free alternative appears at the end of Step 4).
Function: parse_json
import json

def parse_json(df):
    twitterid = str(json.loads(df[0])['id'])
    created_at = str(json.loads(df[0])['created_at'])
    tweet = str(json.loads(df[0])['text'])
    screen_name = str(json.loads(df[0])['user']['screen_name'])
    return [twitterid, created_at, tweet, screen_name]
Function: convert_twitter_date
import datetime

def convert_twitter_date(timestamp_str):
    output_ts = datetime.datetime.strptime(timestamp_str.replace('+0000 ', ''), '%a %b %d %H:%M:%S %Y')
    return output_ts
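Neither function depends on Spark, so they can be sanity-checked locally before being wired into the stream. A quick check with a fabricated payload (the values below are invented for illustration):

# Quick local test using a made-up payload (not a real tweet)
sample_payload = ('{"id": 1, "created_at": "Tue Mar 28 14:02:00 +0000 2017", '
                  '"text": "hello world", "user": {"screen_name": "someuser"}}')
print(parse_json([sample_payload]))
# ['1', 'Tue Mar 28 14:02:00 +0000 2017', 'hello world', 'someuser']
print(convert_twitter_date('Tue Mar 28 14:02:00 +0000 2017'))
# 2017-03-28 14:02:00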
Step 4: Parse JSON within Spark
Once we have the JSON string, the two Python UDFs are used to parse each payload, convert the timestamp, and output the relevant dataframe columns (created_at, screen_name, tweet, and created_at_ts).
from pyspark.sql.functions import udf, struct, col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

json_schema = StructType([
    StructField("twitterid", StringType(), True),
    StructField("created_at", StringType(), True),
    StructField("tweet", StringType(), True),
    StructField("screen_name", StringType(), True)
])

udf_parse_json = udf(parse_json, json_schema)
udf_convert_twitter_date = udf(convert_twitter_date, TimestampType())

jsonoutput = events.withColumn("parsed_field", udf_parse_json(struct([events[x] for x in events.columns]))) \
    .where(col("parsed_field").isNotNull()) \
    .withColumn("created_at", col("parsed_field.created_at")) \
    .withColumn("screen_name", col("parsed_field.screen_name")) \
    .withColumn("tweet", col("parsed_field.tweet")) \
    .withColumn("created_at_ts", udf_convert_twitter_date(col("parsed_field.created_at")))
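As mentioned in Step 3, the UDF approach is not the only option. Here is a rough, untested sketch of a UDF-free alternative using Spark's built-in from_json and unix_timestamp functions; the field names come from the Twitter payload, and the timestamp pattern is an assumption based on Twitter's created_at format:

from pyspark.sql.functions import from_json, unix_timestamp, col
from pyspark.sql.types import StructType, StructField, StringType

# Schema limited to the fields of interest in the raw tweet
tweet_schema = StructType([
    StructField("id", StringType(), True),
    StructField("created_at", StringType(), True),
    StructField("text", StringType(), True),
    StructField("user", StructType([StructField("screen_name", StringType(), True)]), True)
])

# Sketch only: parse the JSON and timestamp without Python UDFs
alt_output = events \
    .select(from_json(col("value"), tweet_schema).alias("t")) \
    .select(
        col("t.id").alias("twitterid"),
        col("t.created_at").alias("created_at"),
        col("t.text").alias("tweet"),
        col("t.user.screen_name").alias("screen_name")) \
    .withColumn("created_at_ts",
                unix_timestamp(col("created_at"), "EEE MMM dd HH:mm:ss Z yyyy").cast("timestamp"))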
Step 5: Spark Windowing
To capture trending screen_names, I used Spark's window operation to aggregate over 1-minute windows with a slide duration of 15 seconds.
from pyspark.sql.functions import window

# http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.window
# pyspark.sql.functions.window(timeColumn, windowDuration, slideDuration=None, startTime=None)
windowedCounts = jsonoutput.groupBy(
    window(jsonoutput.created_at_ts, "1 minutes", "15 seconds"),
    jsonoutput.screen_name
).count()
Step 6: Start Spark Structured Streaming
Start the Spark Structured Streaming queries. In this case, I am launching two queries: one that contains results from our time window (query_window) and another (query_json) that contains the parsed JSON records. Both tables are stored in memory. Typically, you would use a different Spark sink, such as writing the results back to Kafka or persisting them to HDFS (a sketch follows the code below). For this example (and useful for debugging), I am writing the results to two in-memory tables.
query_window = windowedCounts \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("myTable_window") \
    .start()

query_json = jsonoutput \
    .writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("myTable_json") \
    .start()
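As an illustration of a more production-oriented sink, here is a minimal sketch that persists the parsed tweets to HDFS as Parquet instead of an in-memory table; the output and checkpoint paths are placeholders you would adjust for your cluster:

# Sketch only: write parsed tweets to HDFS as Parquet (paths are placeholders)
query_hdfs = jsonoutput \
    .select("created_at_ts", "screen_name", "tweet") \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/tmp/tweets_parquet") \
    .option("checkpointLocation", "/tmp/tweets_checkpoint") \
    .start()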
Step 7: Interactively Query Structured Streaming Table
Each in-memory table can be interactively queried to give the current state using standard SparkSQL syntax as shown below.
Query myTable_json: Output 15 records of the in-memory table
spark.sql("select created_at, screen_name, tweet from myTable_json limit 15").show(15,False)
Query myTable_json: Output the top 15 Twitter authors
spark.sql("select screen_name, count(*) as count from myTable_json group by screen_name order by count desc limit 15").show(15,False)
Query myTable_window: Output the window aggregations
spark.sql("select created_at, screen_name, tweet from myTable_json limit 15").show(15,False)
References:
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
https://hortonworks.com/apache/spark/
Environment:
Apache Spark 2.1.0 (PySpark with Python 2.7.5), Apache NiFi 1.1.0, Apache Kafka 0.8.2.1 (Scala 2.10)
Created on 09-28-2017 01:40 PM
Hi,
I am getting some Unicode error messages when running the job.
Run:
./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 file.py
Error:
UnicodeEncodeError: 'ascii' codec can't encode character u'\u2019' in position 39: ordinal not in range(128)
Any help?
How can I fix this issue?
Created on 03-16-2018 11:38 AM
Hello, I think the processing you do with Python, Kafka, and Spark would be easy to do with Apache NiFi processors alone. Is there any reason to do it this way?
Thank you!
Created on 03-22-2018 06:36 AM
Hi,
Which version of HDF are you using?
Because HDF 3.0.2 does not support Spark when I try to add it via Add Service in the Ambari UI.
Created on 03-23-2018 02:36 PM
You have to run Apache Spark on HDP 2.6.
You should start with an HDP cluster and then add HDF to it.