Member since: 07-18-2016
Posts: 94
Kudos Received: 94
Solutions: 20

My Accepted Solutions
Title | Views | Posted
---|---|---
| 2577 | 08-11-2017 06:04 PM
| 2439 | 08-02-2017 11:22 PM
| 9770 | 07-10-2017 03:36 PM
| 17968 | 03-17-2017 01:27 AM
| 14813 | 02-24-2017 05:35 PM
07-24-2017
12:21 PM
@Hugo Felix Nice! Glad to see that you got it working. If you upgrade to Spark 2.x, then you should not need to add the serde (just something to keep in mind). If you're all set, can you please mark the thread as accepted? Thanks!
07-20-2017
04:13 PM
1 Kudo
@Hugo Felix Here's a test that should help determine whether your syntax is off or your environment is misconfigured. First, create a test Hive table and populate it with data:
CREATE TABLE IF NOT EXISTS testtable
(id string, text string)
STORED AS ORC;
INSERT INTO TABLE testtable VALUES
('1111', 'The service was great, and the agent was very helpful'),
('2222', 'I enjoyed the event but the food was terrible'),
('3333', 'Unhappy with the organization of the event');
Then create a file called "my_py_udf.py" (as shown below). It can be placed anywhere, but in my example I placed it at /tmp/my_py_udf.py.
import sys

for line in sys.stdin:
    id, text = line.replace('\n', ' ').split('\t')
    positive = set(["love", "good", "great", "happy", "cool", "best", "awesome", "nice", "helpful", "enjoyed"])
    negative = set(["hate", "bad", "stupid", "terrible", "unhappy"])
    words = text.split()
    word_count = len(words)
    positive_matches = [1 for word in words if word in positive]
    negative_matches = [-1 for word in words if word in negative]
    st = sum(positive_matches) + sum(negative_matches)
    if st > 0:
        print '\t'.join([text, 'positive', str(word_count)])
    elif st < 0:
        print '\t'.join([text, 'negative', str(word_count)])
    else:
        print '\t'.join([text, 'neutral', str(word_count)])
Then, from within Hive, execute the following commands:
ADD FILE /tmp/my_py_udf.py;
SELECT
TRANSFORM (id, text)
USING 'python my_py_udf.py'
AS (text, sentiment, word_count)
FROM testtable;
Your resulting output should look like this:
The service was great, and the agent was very helpful    positive    10
I enjoyed the event but the food was terrible    neutral    9
Unhappy with the organization of the event    neutral    7
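If you want to sanity-check the script outside of Hive first, here's a minimal local harness (my addition, not part of the original test; it assumes the script sits at /tmp/my_py_udf.py as above and that "python" is the same Python 2 interpreter the cluster uses). It feeds tab-delimited rows to the script over stdin, exactly as Hive's TRANSFORM does:
import subprocess

# Sample rows, tab-delimited just like Hive streams them to TRANSFORM
sample_rows = [
    "1111\tThe service was great, and the agent was very helpful",
    "2222\tI enjoyed the event but the food was terrible",
]

# Launch the UDF script and pipe the rows through its stdin
proc = subprocess.Popen(
    ["python", "/tmp/my_py_udf.py"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    universal_newlines=True,
)
out, _ = proc.communicate("\n".join(sample_rows) + "\n")
print(out)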
07-17-2017
08:02 PM
@Hugo Felix Yeah, that was the next test I had in mind for you to try. Thanks for sharing the results. Can you share the environment you are working in? Are you using Spark 1.6.x or Spark 2.x? Also, what version of HDP are you using?
07-17-2017
04:21 PM
@Hugo Felix What happens if you replace
created_at, user.screen_name, text = line.split('\t')
with
created_at, screen_name, text = line.split('\t')
I do not believe python will be able to parse the user.screen_name variable in the context you are writing it. In my example, the function accepts a tab-delimited argument; when you perform the split('\t'), it parses the argument out into X number of variables. The names of the assigned variables (such as created_at, screen_name, text) are arbitrary (you could name them x, y, z if you wanted, but you would then have to make sure the rest of the python script used the x, y, z variable names). Give that a try and let me know if it helps. Thanks.
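P.S. Here's a tiny standalone illustration of that tab-split assignment (the line content is made up and the variable names are just placeholders):
# One tab-delimited input line, like what Hive's TRANSFORM streams to the script
line = "Mon Jul 17 16:21:00 +0000 2017\tHugoFelix\tI enjoyed the event"

# split('\t') yields one value per tab-separated field; the names are arbitrary
created_at, screen_name, text = line.split('\t')

print screen_name   # -> HugoFelix
print text          # -> I enjoyed the event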
07-12-2017
08:01 PM
2 Kudos
@Hugo Felix It might be an out-of-memory issue, but it could also be a variable/column mismatch. Can you share your python function and the Hive query so that I can review?
07-10-2017
03:36 PM
3 Kudos
@Hugo Felix One option is to implement these functions as a Hive UDF (written in python). For example, your new python function (my_py_udf.py) would look something like this:
import sys

for line in sys.stdin:
    createdAt, screenName, text = line.replace('\n', ' ').split('\t')
    positive = set(["love", "good", "great", "happy", "cool", "best", "awesome", "nice"])
    negative = set(["hate", "bad", "stupid"])
    words = text.split()
    word_count = len(words)
    positive_matches = [1 for word in words if word in positive]
    negative_matches = [-1 for word in words if word in negative]
    st = sum(positive_matches) + sum(negative_matches)
    if st > 0:
        print '\t'.join([text, 'positive', str(word_count)])
    elif st < 0:
        print '\t'.join([text, 'negative', str(word_count)])
    else:
        print '\t'.join([text, 'neutral', str(word_count)])
NOTE: This function combines both of your previous functions into one (since you can calculate wordcount and sentiment in one function). To call this UDF within Hive, run Hive code similar to this:
ADD FILE /home/hive/my_py_udf.py;
SELECT
TRANSFORM (createdAt, screenName, text)
USING 'python my_py_udf.py'
AS text,
    sentiment,
    word_count
FROM tweets;
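One small portability note I'll add: the print statements in my_py_udf.py above are Python 2 syntax (consistent with the Python 2.7 environment mentioned elsewhere in my posts). If "python" on your cluster nodes resolves to Python 3, each output line would need the function form instead, for example:
# Python 3 form of the output lines (only needed if the nodes run Python 3)
print('\t'.join([text, 'positive', str(word_count)]))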
Hope this helps!
03-28-2017
01:39 AM
7 Kudos
In this article, I'll show how to analyze a real-time data stream using Spark Structured Streaming. I wanted to provide a quick Structured Streaming example that shows an end-to-end flow from source (Twitter), through Kafka, and then data processing using Spark. To accomplish this, I used Apache NiFi (part of Hortonworks HDF) to capture the Twitter data and send it to Apache Kafka.
From here, Spark was used to consume each Twitter payload (as JSON), parse, and analyze the data in real-time.
Before I jump into the technical details, it's good to understand some of the business value of this process. There are many practical applications based on this technology stack as well as the code that I describe below. Here are a few of the business applications that can be implemented using this technology:
- Security: Real-time log analysis allows organizations to extract IP addresses, ports, services and events. This data can then be aggregated based on a time window, analyzed, and monitored for abnormal activity. Spark Structured Streaming also supports real-time joins with static data, further enriching the logs by incorporating external data such as location, detailed user information, and historical data.
- Sensors & IoT: When working with sensors, out-of-order data is a challenge. The order of readings is critical for identifying patterns and behavior within an environment. One of the goals of Spark Structured Streaming is to maintain order using "watermarking", which enables the engine to automatically track the current event time within the data and attempt to clean up the old state accordingly.
- Web Analytics: Structured Streaming can be used to route, process, and aggregate clickstream data from your website. This analysis can feed external systems in order to proactively send notifications to users, launch a web form, or trigger an action in a 3rd-party system.
- Call Center: Identify trends related to call volume, response times, emerging topics, at-risk customers, and cross-sell opportunities. Spark is capable of processing both structured and unstructured call records to address these business needs.
- Social Media: Analyze social feeds in real-time to detect influencers, trending topics, abnormal volume, or other indicators. All of this can be monitored and aggregated within a defined time window. The example below will go into more detail on this specific use case.
Here are the technical details associated with this application:

Step 1: Connect to Twitter and stream the data to Kafka
A simple NiFi flow was used to capture the Twitter data using the GetTwitter NiFi processor. This processor consumes data from the Twitter Streaming API. Using PutKafka, I was able to push the JSON payload to a Kafka topic called "dztopic1". Below is a screenshot that shows this NiFi flow:

Step 2: Use Spark to read Kafka Stream
Prerequisite: Before you launch Spark, make sure that you have included the required artifact/dependency as described here: spark-sql-kafka-0-10_2.11. If you want to add this via the PySpark command line, you can run something like this:
./bin/pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
Spark Structured Streaming subscribes to our Kafka topic using the code shown below:
# Consume Kafka topic
events = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "dztopic1") \
    .load()

# Cast the JSON payload as a String
events = events.selectExpr("CAST(value AS STRING)")
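Optional, and not part of the original flow: if you want to confirm that messages are arriving before wiring up the parsing steps, you can temporarily write the raw Kafka values to the console sink:
# Temporary debug query: print incoming Kafka values to the console
raw_query = events.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Stop it once you've confirmed data is flowing
# raw_query.stop()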
Step 3: Define Python UDFs
I created two python functions. The first function, parse_json, parses the Twitter JSON payload and extracts each field of interest. The second function, convert_twitter_date, converts the Twitter created_at timestamp into a pyspark timestamp, which is used for windowing. I like using python UDFs, but note that there are other ways to parse JSON and convert the timestamp field.

Function: parse_json
def parse_json(df):
    twitterid = str(json.loads(df[0])['id'])
    created_at = str(json.loads(df[0])['created_at'])
    tweet = str(json.loads(df[0])['text'])
    screen_name = str(json.loads(df[0])['user']['screen_name'])
    return [twitterid, created_at, tweet, screen_name]
Function: convert_twitter_date
def convert_twitter_date(timestamp_str):
    output_ts = datetime.datetime.strptime(timestamp_str.replace('+0000 ', ''), '%a %b %d %H:%M:%S %Y')
    return output_ts
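For completeness, these snippets assume the following imports (the article does not show them, so treat this block as my reconstruction rather than the original code):
import json
import datetime

from pyspark.sql.functions import udf, struct, col, window
from pyspark.sql.types import StructType, StructField, StringType, TimestampType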
Step 4: Parse JSON within Spark
Once we have the JSON string, I used the two python UDFs to parse each payload, convert the timestamp, and output our relevant dataframe columns (created_at, screen_name, tweet, and created_at_ts).
json_schema = StructType([
    StructField("twitterid", StringType(), True),
    StructField("created_at", StringType(), True),
    StructField("tweet", StringType(), True),
    StructField("screen_name", StringType(), True)
])

udf_parse_json = udf(parse_json, json_schema)
udf_convert_twitter_date = udf(convert_twitter_date, TimestampType())

jsonoutput = events \
    .withColumn("parsed_field", udf_parse_json(struct([events[x] for x in events.columns]))) \
    .where(col("parsed_field").isNotNull()) \
    .withColumn("created_at", col("parsed_field.created_at")) \
    .withColumn("screen_name", col("parsed_field.screen_name")) \
    .withColumn("tweet", col("parsed_field.tweet")) \
    .withColumn("created_at_ts", udf_convert_twitter_date(col("parsed_field.created_at")))
Step 5: Spark Windowing
To capture trending screen_names, I chose to use the window operation to aggregate within 1-minute windows with a slide duration of 15 seconds.
# http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.window
# pyspark.sql.functions.window(timeColumn, windowDuration, slideDuration=None, startTime=None)
windowedCounts = jsonoutput.groupBy(
    window(jsonoutput.created_at_ts, "1 minutes", "15 seconds"),
    jsonoutput.screen_name
).count()
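Since the intro mentions watermarking for out-of-order events, here is a hedged sketch of the same aggregation with a watermark added (the 10-minute threshold and the variable name are my illustrative choices, not something from the original flow; also note that the watermark only lets Spark purge old state when the query is not run in the complete output mode used in Step 6):
# Hypothetical variant (not used in the steps below): same windowed count, with a
# watermark so Spark can eventually drop state for events arriving >10 minutes late
windowedCountsWithWatermark = jsonoutput \
    .withWatermark("created_at_ts", "10 minutes") \
    .groupBy(
        window(jsonoutput.created_at_ts, "1 minutes", "15 seconds"),
        jsonoutput.screen_name
    ).count()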
Step 6: Start Spark Structured Streaming
Start the Spark Structured Streaming queries. In this case, I am launching two queries: one that contains results from our time window (query_window) and another (query_json) that contains the parsed JSON records. Both tables are stored in-memory. Typically, you would use a different Spark sink, such as writing the results back to Kafka or persisting to HDFS. For this example (and useful for debugging), I am writing the results to two in-memory tables.
query_window = windowedCounts \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("myTable_window") \
    .start()

query_json = jsonoutput \
    .writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("myTable_json") \
    .start()
Step 7: Interactively Query the Structured Streaming Tables
Each in-memory table can be interactively queried to give the current state, using standard SparkSQL syntax as shown below.

Query myTable_json: Output 15 records of the in-memory table
spark.sql("select created_at, screen_name, tweet from myTable_json limit 15").show(15,False)

Query myTable_json: Output the top 15 Twitter authors
spark.sql("select screen_name, count(*) as count from myTable_json group by screen_name order by count desc limit 15").show(15,False)

Query myTable_window: Output the window aggregations
spark.sql("select * from myTable_window order by count desc limit 15").show(15,False)
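A quick way to check that the streaming queries are alive and making progress (my addition; isActive and lastProgress are standard StreamingQuery attributes):
# Confirm the query is running and inspect its most recent progress metrics
print(query_json.isActive)
print(query_json.lastProgress)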
References:
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
https://hortonworks.com/apache/spark/

Environment: Apache Spark 2.1.0 (Pyspark with Python 2.7.5), Apache NiFi 1.1.0, Apache Kafka 2.10-0.8.2.1
03-17-2017
01:27 AM
2 Kudos
Hi @Sean Byrne, I also had a similar question, but it's common within distributed systems to see many "part" file outputs. This is because you will typically have many partitions, across multiple nodes, writing to the same output directory (so interference is reduced). However, you can run a Spark job against this directory in order to create one single CSV file. Here's the code:
# Use PySpark to read in all "part" files
allfiles = spark.read.option("header", "false").csv("/destination_path/part-*.csv")

# Output as CSV file
allfiles.coalesce(1).write.format("csv").option("header", "false").save("/destination_path/single_csv_file/")
Another option would be to use format("memory") and then you could execute periodic in-memory queries against the Spark Stream. These queries could save the in-memory table to a single CSV (or other format). If I come across any way to output to a single CSV from Structured Streaming, I will be sure to post it. Hope this is helpful.
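To make that second option concrete, here's a minimal sketch of my own (it assumes a streaming query writing to an in-memory sink registered as "myTable_json"; the table name and paths are illustrative) that snapshots the in-memory table to a single CSV on demand:
# Snapshot the current contents of the in-memory streaming table
snapshot = spark.sql("select * from myTable_json")

# Collapse to one partition and overwrite a single-file CSV output
snapshot.coalesce(1) \
    .write \
    .format("csv") \
    .option("header", "false") \
    .mode("overwrite") \
    .save("/destination_path/single_csv_file/")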
03-08-2017
05:46 PM
2 Kudos
@X Long I do not believe it was removed in Spark 2.1.0. Here's the documentation for Broadcast Variables (for scala, java, and python): http://spark.apache.org/docs/2.1.0/programming-guide.html#broadcast-variables You may also need to adjust the spark.sql.autoBroadcastJoinThreshold parameter if you are running into errors. This parameter sets the max size (in bytes) for a table that will be broadcast to all worker nodes when performing a join. If you are running into an error, can you please post that as well. Thanks!
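In case it helps, here's a minimal PySpark sketch of a broadcast variable (the names and values are illustrative only):
# Broadcast a small lookup dict to all executors, then reference it in a map
lookup = {"a": 1, "b": 2}
broadcast_lookup = spark.sparkContext.broadcast(lookup)

rdd = spark.sparkContext.parallelize(["a", "b", "a", "c"])
mapped = rdd.map(lambda k: broadcast_lookup.value.get(k, 0)).collect()
# mapped -> [1, 2, 1, 0]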
02-27-2017
01:11 PM
2 Kudos
@Aditya Mamidala Here's a working example of foreachPartition that I've used as part of a project. This is part of a Spark Streaming process, where "event" is a DStream, and each stream is written to HBase via Phoenix (JDBC). I have a structure similar to what you tried in your code, where I first use foreachRDD then foreachPartition.
event.map(x => x._2).foreachRDD { rdd =>
  rdd.foreachPartition { rddpartition =>
    val thinUrl = "jdbc:phoenix:phoenix.dev:2181:/hbase"
    val conn = DriverManager.getConnection(thinUrl)
    rddpartition.foreach { record =>
      conn.createStatement().execute("UPSERT INTO myTable VALUES (" + record._1 + ")")
    }
    conn.commit()
  }
}
The full project is located here.