
In this article, I'll show how to analyze a real-time data stream using Spark Structured Streaming.

I wanted to provide a quick Structured Streaming example that shows an end-to-end flow from source (Twitter), through Kafka, and then data processing using Spark. To accomplish this, I used Apache NiFi (part of Hortonworks HDF) to capture the Twitter data and send it to Apache Kafka. From there, Spark was used to consume each Twitter payload (as JSON), parse it, and analyze the data in real time.

Before I jump into the technical details, it's good to understand some of the business value of this process. There are many practical applications based on this technology stack as well as the code that I describe below. Here are a few of the business applications that can be implemented using this technology:

  • Security: Real-time log analysis allows organizations to extract IP addresses, ports, services and events. This data can then be aggregated based on a time window, analyzed, and monitored for abnormal activity. Spark Structured Streaming also supports real-time joins with static data, further enriching the logs by incorporating external data such as location, detailed user information, and historical data.
  • Sensors & IoT: When working with sensors, out-of-order data is a challenge. The order of readings is critical for identifying patterns and behavior within an environment. Spark Structured Streaming addresses this by aggregating on event time rather than arrival time, and by using "watermarking", which enables the engine to automatically track the current event time within the data and attempt to clean up old state accordingly.
  • Web Analytics: Structured streaming can be used to route, process, and aggregate clickstream data from your website. This analysis can feed external systems in order to proactively send notifications to users, launch a web form, or trigger an action in a 3rd party system.
  • Call Center: Identify trends related to call volume, response times, emerging topics, at-risk customers and cross-sell opportunities. Spark is capable of processing both structured and unstructured call records to address these business needs.
  • Social Media: Analyze social feeds in real time to detect influencers, trending topics, abnormal volume, or other indicators. All of this can be monitored and aggregated within a defined time window. The example below goes into more detail on this specific use case.
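
To make the watermarking idea from the Sensors & IoT item more concrete, here is a minimal pure-Python sketch. It is a toy model, not Spark's implementation: the WatermarkTracker class and the 60-second delay are hypothetical names and values chosen for illustration, and Spark advances its watermark per trigger rather than per event.

```python
import datetime


class WatermarkTracker(object):
    """Toy model of event-time watermarking; not Spark's implementation."""

    def __init__(self, delay_seconds):
        self.delay = datetime.timedelta(seconds=delay_seconds)
        self.max_event_time = None  # latest event time seen so far

    def accept(self, event_time):
        """Return True if the event is at or after the current watermark."""
        if self.max_event_time is not None:
            watermark = self.max_event_time - self.delay
            if event_time < watermark:
                return False  # too late: state for its window may be gone
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return True


start = datetime.datetime(2017, 3, 27, 21, 0, 0)
tracker = WatermarkTracker(delay_seconds=60)
# Events arrive at offsets 0s, 120s, 90s (out of order), and 30s (too late).
decisions = [tracker.accept(start + datetime.timedelta(seconds=s))
             for s in (0, 120, 90, 30)]
```

The reading at 90 seconds arrives out of order but is still accepted because it is within the allowed delay. The reading at 30 seconds falls behind the watermark and is rejected.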

Here are the technical details associated with this application:

Step 1: Connect to Twitter and stream the data to Kafka

A simple NiFi flow was used to capture the Twitter data using the GetTwitter NiFi processor. This processor consumes data from the Twitter Streaming API. Using PutKafka, I was able to push the JSON payload to a Kafka topic called "dztopic1". Below is a screenshot that shows this NiFi flow:

14112-screen-shot-2017-03-26-at-104203-pm.png

Step 2: Use Spark to read Kafka Stream

Prerequisite: Before you launch Spark, make sure that you have included the required artifact / dependency as described here: spark-sql-kafka-0-10_2.11. To add it from the PySpark command line, you can run something like the following (the trailing version should match your Spark version, which is 2.1.0 in this environment):

./bin/pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

Spark Structured Streaming subscribes to our Kafka topic using the code shown below:

# Consume Kafka topic
events = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "dztopic1")
    .load())

# Cast the JSON payload as a String
events = events.selectExpr("CAST(value AS STRING)")

Step 3: Define Python UDFs

I created two Python functions. The first, parse_json, parses the Twitter JSON payload and extracts each field of interest. The second, convert_twitter_date, converts the Twitter created_at string into a Python datetime, which Spark stores as a timestamp and which is used for windowing. I like using Python UDFs, but note that there are other ways to parse JSON and convert the timestamp field.

Function: parse_json

import json

def parse_json(df):
    # Parse the payload once. Text fields stay unicode: wrapping them in str()
    # fails on non-ASCII characters under Python 2.
    payload = json.loads(df[0])
    twitterid   = str(payload['id'])
    created_at  = payload['created_at']
    tweet       = payload['text']
    screen_name = payload['user']['screen_name']
    return [twitterid, created_at, tweet, screen_name]

Function: convert_twitter_date

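import datetime

def convert_twitter_date(timestamp_str):
    # Twitter format: "Mon Mar 27 21:49:15 +0000 2017". Drop the UTC offset, then parse.
    output_ts = datetime.datetime.strptime(timestamp_str.replace('+0000 ',''), '%a %b %d %H:%M:%S %Y')
    return output_ts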
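
Both functions can be tried outside Spark before registering them as UDFs. The sketch below is self-contained, so it restates compact variants of the two functions. The sample payload is hypothetical and trimmed to the four fields used in this article, and the one-element list stands in for the row that Spark passes to the UDF.

```python
import datetime
import json


def parse_json(row):
    # row[0] holds the JSON string, as in the single-column Kafka dataframe
    payload = json.loads(row[0])
    return [str(payload['id']), payload['created_at'],
            payload['text'], payload['user']['screen_name']]


def convert_twitter_date(timestamp_str):
    # Drop the "+0000 " UTC offset, then parse the remaining fields
    return datetime.datetime.strptime(
        timestamp_str.replace('+0000 ', ''), '%a %b %d %H:%M:%S %Y')


# Hypothetical tweet, including a non-ASCII apostrophe (u'\u2019')
sample = json.dumps({
    "id": 846550000000000000,
    "created_at": "Mon Mar 27 21:49:15 +0000 2017",
    "text": u"It\u2019s streaming",
    "user": {"screen_name": "example_user"},
})

fields = parse_json([sample])
created_at_ts = convert_twitter_date(fields[1])
```

Including a non-ASCII character in the test payload is a quick way to catch encoding problems before the job runs against live data.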

Step 4: Parse JSON within Spark

Once we have the JSON string, the two Python UDFs parse each payload, convert the timestamp, and output the relevant dataframe columns (created_at, screen_name, tweet, and created_at_ts).

from pyspark.sql.functions import col, struct, udf
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

json_schema = StructType([
  StructField("twitterid", StringType(), True),
  StructField("created_at", StringType(), True),
  StructField("tweet", StringType(), True),
  StructField("screen_name", StringType(), True)
])

udf_parse_json = udf(parse_json, json_schema)
udf_convert_twitter_date = udf(convert_twitter_date, TimestampType())

# Pack the row into a struct, parse it, then promote the fields of interest to columns
jsonoutput = (events
    .withColumn("parsed_field", udf_parse_json(struct([events[x] for x in events.columns])))
    .where(col("parsed_field").isNotNull())
    .withColumn("created_at", col("parsed_field.created_at"))
    .withColumn("screen_name", col("parsed_field.screen_name"))
    .withColumn("tweet", col("parsed_field.tweet"))
    .withColumn("created_at_ts", udf_convert_twitter_date(col("parsed_field.created_at"))))

Step 5: Spark Windowing

The goal here is to capture trending screen_names. I chose the window operation to count tweets per screen_name within 1-minute windows that slide every 15 seconds.

# http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.window
# pyspark.sql.functions.window(timeColumn, windowDuration, slideDuration=None, startTime=None)
from pyspark.sql.functions import window

windowedCounts = jsonoutput.groupBy(
    window(jsonoutput.created_at_ts, "1 minutes", "15 seconds"),
    jsonoutput.screen_name
    ).count()
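
Because the slide (15 seconds) is shorter than the window (1 minute), every tweet is counted in four overlapping windows. The following pure-Python sketch illustrates that assignment. It assumes the default startTime, so windows align to the Unix epoch, and the helper name sliding_windows is hypothetical rather than part of the Spark API.

```python
import datetime

EPOCH = datetime.datetime(1970, 1, 1)


def sliding_windows(ts, window_seconds=60, slide_seconds=15):
    """Return the (start, end) pairs of every window containing ts."""
    seconds = int((ts - EPOCH).total_seconds())
    # Start of the most recent window that has already opened
    last_start = seconds - (seconds % slide_seconds)
    # Start of the oldest window that still contains ts
    start = last_start - window_seconds + slide_seconds
    windows = []
    while start <= last_start:
        windows.append((EPOCH + datetime.timedelta(seconds=start),
                        EPOCH + datetime.timedelta(seconds=start + window_seconds)))
        start += slide_seconds
    return windows


tweet_time = datetime.datetime(2017, 3, 27, 21, 49, 20)
windows = sliding_windows(tweet_time)
```

For a tweet created at 21:49:20, the windows start at 21:48:30, 21:48:45, 21:49:00, and 21:49:15, so the same tweet contributes to four rows of the aggregated output.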

Step 6: Start Spark Structured Streaming

Start the Spark Structured Streaming queries. In this case, I am launching two: one that contains results from our time window (query_window) and another (query_json) that contains the parsed JSON records. Typically, you would use a different Spark sink, such as writing the results back to Kafka or persisting to HDFS. For this example (and because it is useful for debugging), I am writing the results to two in-memory tables.

query_window = (windowedCounts
    .writeStream
    .outputMode("complete")
    .format("memory")
    .queryName("myTable_window")
    .start())

query_json = (jsonoutput
    .writeStream
    .outputMode("append")
    .format("memory")
    .queryName("myTable_json")
    .start())
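
The two queries use different output modes: "complete" rewrites the whole aggregated result on every trigger, while "append" emits only the rows that are new since the last trigger. The toy model below illustrates that difference in plain Python. It is a sketch, not Spark code, and the function name run_batches and the sample screen names are made up for illustration.

```python
from collections import Counter


def run_batches(batches):
    """Yield (append_rows, complete_rows) for each simulated micro-batch."""
    totals = Counter()
    for batch in batches:
        totals.update(batch)
        append_rows = list(batch)               # only rows from this batch
        complete_rows = sorted(totals.items())  # the full aggregate so far
        yield append_rows, complete_rows


# Two micro-batches of screen names
results = list(run_batches([["alice", "bob"], ["alice"]]))
```

After the second batch, the append output holds only the one new row, while the complete output holds the updated counts for every screen name seen so far.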

Step 7: Interactively Query Structured Streaming Table

Each in-memory table can be interactively queried to give the current state using standard SparkSQL syntax as shown below.

Query myTable_json: Output 15 records of the in-memory table

spark.sql("select created_at, screen_name, tweet from myTable_json limit 15").show(15,False)

14113-screen-shot-2017-03-27-at-94915-pm.png

Query myTable_json: Output the top 15 Twitter authors

spark.sql("select screen_name, count(*) as count from myTable_json group by screen_name order by count desc limit 15").show(15,False) 

14115-screen-shot-2017-03-27-at-95011-pm.png
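
For readers without a running cluster, the top-authors aggregation can be tried against Python's built-in sqlite3 module as a stand-in for the in-memory streaming table. This is only a sketch: the rows are invented sample data, and SQLite is not Spark SQL, although this particular query runs unchanged.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "create table myTable_json (created_at text, screen_name text, tweet text)")

# Invented sample rows: three tweets by alice, two by bob, one by carol
rows = [
    ("Mon Mar 27 21:49:15 +0000 2017", "alice", "first"),
    ("Mon Mar 27 21:49:16 +0000 2017", "bob", "second"),
    ("Mon Mar 27 21:49:17 +0000 2017", "alice", "third"),
    ("Mon Mar 27 21:49:18 +0000 2017", "carol", "fourth"),
    ("Mon Mar 27 21:49:19 +0000 2017", "alice", "fifth"),
    ("Mon Mar 27 21:49:20 +0000 2017", "bob", "sixth"),
]
conn.executemany("insert into myTable_json values (?, ?, ?)", rows)

top_authors = conn.execute(
    "select screen_name, count(*) as count from myTable_json "
    "group by screen_name order by count desc limit 15").fetchall()
conn.close()
```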

Query myTable_window: Output the window aggregations

spark.sql("select * from myTable_window limit 15").show(15,False)

14116-screen-shot-2017-03-27-at-95030-pm.png

References:

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html

https://hortonworks.com/apache/spark/

Environment:

Apache Spark 2.1.0 (Pyspark with Python 2.7.5), Apache NiFi 1.1.0, Apache Kafka 2.10-0.8.2.1

Comments
Rising Star

Hi,

I am getting some Unicode error messages when running the job.

Run:

./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 file.py

Error:

UnicodeEncodeError: 'ascii' codec can't encode character u'\u2019' in position 39: ordinal not in range(128)

Any help?

How can I fix this issue?

Not applicable

Hello, I think the processing you do here with Python, Kafka, and Spark could be done easily with Apache NiFi processors. Is there any reason to do it this way?

Thank you!

New Contributor

Hi,

Which version of HDF are you using?

I ask because HDF 3.0.2 did not support Spark when I tried to add it with Add Service in the Ambari UI.

Super Guru

You have to run Apache Spark on HDP 2.6.

You should start with HDP cluster then add HDF to it.

see https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.1.1/bk_installing-hdf-and-hdp/content/ch_instal...

Version history: Revision 2 of 2. Last update: 08-17-2019 01:33 PM