Community Articles

Find and share helpful community-sourced technical articles.
Announcements
Celebrating as our community reaches 100,000 members! Thank you!
Labels (1)
avatar
Expert Contributor

Bridging the Process Time – Event Time gap with Hive (Part 1)

Synopsis

Reconciling the difference between event time and collection/processing time is critical to understand for any system that analyses event data. This is important whether events are processed in batch or near real-time streams. This post focuses on batch processing with Hive, and demonstrates easily replicable mechanisms for bridging this gap.

We will look at the issues surrounding this and prevent two repeatable solution patterns using Hive and Hive ACID. This first post will look at the issue and present the solution using Hive only, and the follow-up article will introduce Hive ACID and a solution using that technology.

Overview

One of the most common big data ingestion cases is event data, and as IoT becomes more important, so does this use case. This is one of the most common Hadoop use cases, but I have not found many detailed step by step patterns for implementing it. In addition, I think it is important to understand some of the thinking around events, and specifically, the gap between event time and processing times.

One of the key considerations in event analysis is the difference between data collection time (process time) and the time that the event occurred (event time.) A more formal definition might be:

Event Time – The time that the event occurred

Processing Time – The time that the event was observed in the processing system

In an ideal world, these two times would be the same or very close. However, in the real world there is always some time lag or “skew”. And, this skew may be significant, and this exists whether you are processing events in batches or in near real-time.

40604-picture1.png

This skew can be caused by many different factors including

Resource Limitations – Bandwidth, CPU, etc. may not allow events to be immediately forwarded and processed.

Software Features/Limitations – Software may be intentionally programmed to queue events and send them at predetermined times. For example, cable TV boxes that report information once or twice a day, or fitness trackers that send some information, such as sleep data only daily.

Network Discontinuity – Any mobile application needs to plan for disruptions in Internet connectivity. Whether because of dead-spots in wireless coverage, airplane-mode, or dead batteries, these interruptions can happen regularly. To mitigate these, any good mobile app will queue event messages for sending the next time that a connection is available, which may be minutes or months!

Time Windows

Much of the current interest is around near real-time ingestion of event data. There are many advantages to this, but a lot of use cases only require event data to be processed in larger windows of data. That’s is the focus of the remainder of this article.

I was surprised to find a lack of posts about the mechanics of dealing with event skew and reporting by event time in batch systems. So, I wanted to layout some repeatable patterns that can be used for this.

As you probably know, event streams are essentially unbounded stream of logs. We often deal with this as a series of bounded datasets each representing some time period. Our main consideration here is a batched process that deals with large windows (15 min to 1 hour), but applies down to any level, since we almost always analyze event data by time in the target system.

The Problems

There are two main issues in dealing with this—Completeness and Restatement.

Completeness—When event data can come in for some time past the end of a time window, it is very difficult to assess the completeness of the data. Most the data may arrive within a period (hour or day) of the time window. However, data may continue to trickle in for quite some time afterwards. This presents issues of

  • Processing and combining data that arrives over time
  • Determining a cutoff when data is considered complete

As we can see in this figure, most event data is received in the few windows after the event time. However, data continues to trickle in, and in fact, 100% theoretical completeness may never be achieved! So, if we were to report on the event data at day 3 and at day 7 the results would be very different.

40602-picture2.png

Restatement—By this we mean the restatement of data that has arrived and been grouped by process time into our desired dimension of event time. This would not be an issue if we could simply scan through all the data each time we want to analyze it, but this becomes unworkable as the historical data grows. We need to find a way to process just the newly arrived data and combine it with the existing data.

Other Requirements

In addition, with dealing with our two main issues, we want to a solution that will

Be Scalable – Any solution must be able to scale to large volumes of data, particularly as event history grows over time. Any solution that relies on completely reprocessing the entire set of data will quickly become unworkable.

Provide the ability to reprocess data – Restating event data by Event Time is pretty straightforward if everything goes right. However, if we determine that source data was corrupt or needs to be reloaded for any reasons, things get messy. In that case, we potentially have data from multiple processing periods co-mingled for the same event time partition. So, to reprocess a process period, we need to separate out those rows for the process period and replace them, while leaving the other rows in the partition intact. Not always an easy task with HDFS!

As an aside, to reprocess data, you need to keep the source around for a while. Pretty obvious, but just saying!

Sample Use Case and Data

For an example use case we will use events arriving for a mobile device representing streaming video viewing events. For this use case, we will receive a set of files hourly and place them in a landing folder in HDFS with an external Hive table laid on top. The processing (collection) time is stamped into the filename using the format YYYYMMDDHH-nnnn.txt. This external table will contain one period’s data at a time and serves as an initial landing zone.

We are also going to assume that we need to save this data in detail, and that analysis will be done directly on the detailed data. Thus, we need to restate the data by event time in the detail store.

Raw Input Source Format

Of particular interest is the event_time columns which is an ISO timestamp in the form:

YYYY-MM-DDTHH:MM:SS.sssZ

CREATE EXTERNAL TABLE video_events_stg (
  device_id string,
  event_type string,
  event_time string,
  play_time_ms bigint,  
  buffer_time_ms bigint) 
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/landing/video_events_stg'; 
https://raw.githubusercontent.com/screamingweasel/sample-data/master/schema/video_events_stg.hql

Detailed Table Format

CREATE TABLE video_events (
  device_id string,
  event_type string,
  event_time string,
  play_time_ms bigint,
  buffer_time_ms bigint)
PARTITIONED BY (
   event_year string,
   event_month string,
   event_day string,
   event_hour string,
  process_time string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
wget https://raw.githubusercontent.com/screamingweasel/sample-data/master/schema/video_events.hql

Sample Data

I have put together three files, each containing one hour of processing data. You can pull them from GitHub and load the first hour into hdfs.

mkdir -p /tmp/video
cd /tmp/video
wget
https://raw.githubusercontent.com/screamingweasel/sample-data/master/video/2017011200-00001.txt
wget
https://raw.githubusercontent.com/screamingweasel/sample-data/master/video/2017011201-00001.txt
wget https://raw.githubusercontent.com/screamingweasel/sample-data/master/video/2017011202-00001.txt
hadoop fs -rm -r /landing/video_events_stg
hadoop fs -mkdir -p /landing/video_events_stg
hadoop fs -put /tmp/video/2017011200.00001.txt /landing/video_events_stg/

Solutions

Let’s look at two possible solutions that meet our criteria above. The first utilizes Hive without the newer ACID features. The second post in this series details how to solve this using Hive ACID. Per our requirements, both will have to restate the data as it is ingested into the detailed Hive table and both must support reprocessing of data.

Solution 1

This solution uses pure Hive and does not rely on the newer ACID transaction feature. As noted one hour’s worth of raw input may contain data from any number of event times. We want to reorganize this and store it in the detailed table partitioned by event time for easy reporting. This can be visualized as:

40605-picture3.png

Loading Restatement

We are going to achieve this through Hive Dynamic Partitioning. Later versions of Hive (0.13+) support efficient dynamic partitioning that can accomplish this. Dynamic partitioning is, unfortunately, a bit slower than inserting to a static fixed partition. Our approach of incrementally ingesting should mitigate this, but you would need to benchmark this with your volume.

set hive.exec.dynamic.partition.mode=nonstrict;
set hive.optimize.sort.dynamic.partition=true;
INSERT INTO TABLE default.video_events
PARTITION (event_year, event_month, event_day, event_hour, process_time)
SELECT device_id,event_type,
CAST(regexp_replace(regexp_replace(event_time,'Z',''),'T',' ') as
timestamp) as event_time,
play_time_ms,
buffer_time_ms,
substr(event_time,1,4)  AS event_year,
substr(event_time,6,2)  AS event_month,
substr(event_time,9,2)  AS event_day,substr(event_time,12,2) AS event_hour,substr(regexp_extract(input__file__name, '.*\/(.*)', 1),1,10) AS process_timeFROM default.video_events_stg;

You can see from a “show partitions” that three partitions were created, one for each event time period.

Show partitions default.video_events;
event_year=2017/event_month=01/event_day=11/event_hour=21/process_time=2017011200
event_year=2017/event_month=01/event_day=11/event_hour=22/process_time=2017011200
event_year=2017/event_month=01/event_day=11/event_hour=23/process_time=2017011200
Now let’s process the rest of the data and see the results:
hadoop fs -rm -skipTrash /landing/video_events_stg/*
hadoop fs -put /tmp/video/2017011201-00001.txt /landing/video_events_stg/
hive -f video_events_insert.hql
hadoop fs -rm -skipTrash /landing/video_events_stg/*
hadoop fs -put /tmp/video/2017011202-00001.txt
/landing/video_events_stg/
hive -f video_events_insert.hql

show partitions default.video_events;

  • event_year=2017/event_month=01/event_day=11/event_hour=21/process_time=2017011200
  • event_year=2017/event_month=01/event_day=11/event_hour=22/process_time=2017011200
  • event_year=2017/event_month=01/event_day=11/event_hour=22/process_time=2017011201
  • event_year=2017/event_month=01/event_day=11/event_hour=23/process_time=2017011200
  • event_year=2017/event_month=01/event_day=11/event_hour=23/process_time=2017011201
  • event_year=2017/event_month=01/event_day=11/event_hour=23/process_time=2017011202
  • event_year=2017/event_month=01/event_day=12/event_hour=00/process_time=2017011201
  • event_year=2017/event_month=01/event_day=12/event_hour=00/process_time=2017011202
  • event_year=2017/event_month=01/event_day=12/event_hour=01/process_time=2017011202

select count(*) from default.video_events

3000

So, we can see that our new data is being nicely added by event time. Note that now there are multiple partitions for the event hour, each corresponding to a processing event. We will see how that is used in the next section.

Reprocessing

In order to reprocess input data for a specific process period, we need to be able to identify that data in the restated detail and remove it before reprocessing. The approach we are going to take here is to keep the process period as part of the partition scheme, so that those partitions can be easily identified.

In this case, the partitioning would be:

  • Event Year
  • Event Month
  • Event Day
  • Event Hour
  • Process Timestamp (concatenated)

Ex.

year=2017/month=01/day=10/hour=01/process_date=2017011202

year=2017/month=01/day=12/hour=01/process_date=2017011202

year=2017/month=01/day=12/hour=02/process_date=2017011202

This makes it fairly simple to reprocess a period of source data.

  • 1.List all the partitions of the table and identify ones from the specific processing hour to be reprocessed.
  • 2.Manually drop those partitions.
  • 3.Restore the input data and reprocess the input data as normal

Let’s assume that the data for hour 2017-01-12 01 was incorrect and needs reprocessed. From the show partitions statement, we can see that there are three partitions containing data from that processing time.

  • event_year=2017/event_month=01/event_day=11/event_hour=22/process_time=2017011201
  • event_year=2017/event_month=01/event_day=11/event_hour=23/process_time=2017011201
  • event_year=2017/event_month=01/event_day=12/event_hour=00/process_time=2017011201

Let’s drop ‘em and see what we get

ALTER TABLE default.video_events DROP PARTITION (event_year='2017',event_month='01',event_day='1',event_hour='22',process_time='2017011201');
ALTER TABLE default.video_events DROP PARTITION (event_year='2017',event_month='01',event_day='11',event_hour='23',process_time='2017011201');
ALTER TABLE default.video_events DROP PARTITION (event_year='2017',event_month='01',event_day='12',event_hour='00',process_time='2017011201');

show partitions video_events;

  • event_year=2017/event_month=01/event_day=11/event_hour=21/process_time=2017011200
  • event_year=2017/event_month=01/event_day=11/event_hour=22/process_time=2017011200
  • event_year=2017/event_month=01/event_day=11/event_hour=23/process_time=2017011200
  • event_year=2017/event_month=01/event_day=11/event_hour=23/process_time=2017011202
  • event_year=2017/event_month=01/event_day=12/event_hour=00/process_time=2017011202
  • event_year=2017/event_month=01/event_day=12/event_hour=01/process_time=2017011202

select count(*) from default.video_events

2000

Now, finally let’s put that data back and reprocess it.

hadoop fs -rm -skipTrash /landing/video_events_stg/*
hadoop fs -put /tmp/video/2017011201-00001.txt /landing/video_events_stg/
hive -f video_events_insert.hql

show partitions default.video_events;

  • event_year=2017/event_month=01/event_day=11/event_hour=21/process_time=2017011200
  • event_year=2017/event_month=01/event_day=11/event_hour=22/process_time=2017011200
  • event_year=2017/event_month=01/event_day=11/event_hour=22/process_time=2017011201
  • event_year=2017/event_month=01/event_day=11/event_hour=23/process_time=2017011200
  • event_year=2017/event_month=01/event_day=11/event_hour=23/process_time=2017011201
  • event_year=2017/event_month=01/event_day=11/event_hour=23/process_time=2017011202
  • event_year=2017/event_month=01/event_day=12/event_hour=00/process_time=2017011201
  • event_year=2017/event_month=01/event_day=12/event_hour=00/process_time=2017011202
  • event_year=2017/event_month=01/event_day=12/event_hour=01/process_time=2017011202

select count(*) from default.video_events

3000

Comments on this Solution

One drawback of this solution is that you may end up with small files as event trickle in for older event times. For example, if you only get a handful of events that come in 4 weeks after the event time, you are going to get some very small files, indeed! Our next solution will overcome that issue by using Hive ACID.

Conclusion

When handling event data, we must always be aware of the skew between event time and processing time in order to provide accurate analytics. Our solution to restating the data in terms of event time must be scalable, performant, and allow for reprocessing of data.

We looked at one solution using plain Hive and partitioning. In the next of this series we will look at Hive ACID transactions to develop a more advanced and simpler solution.

Accompanying files can be found at:

https://github.com/screamingweasel/articles/tree/master/event_processing_part_1


picture1.png
1,830 Views
0 Kudos
Version history
Last update:
‎08-17-2019 10:50 AM
Updated by:
Contributors