Created on 04-28-2017 06:19 PM
Interest in time series databases and historian augmentation/replacement is at an all-time high with the explosion of IoT use cases. With so many devices sending out unbounded streams of data, data scientists have new opportunities for predictive failure analysis, but taking advantage of them requires a robust, scalable architecture to land that data while at the same time providing a responsive environment that gives a high number of concurrent users access to it.
This article discusses how features of Apache Phoenix and Hive can be combined to construct a robust, time series oriented architecture that enables a scalable streaming ingest layer while simultaneously providing a low latency data exploration layer for data scientists and BI reports.
The examples referenced in this article are a subset of a larger CDR (call detail record) processing project I’ve been working on for a while now. This article focuses on the downstream data collection, dealing specifically with Phoenix and Hive. Data is streamed into a cdr_fact table that contains references to various dimensions such as geography, customer and vendor information, and call termination details. The ETL and data enrichment are handled by an upstream Storm topology, which is not covered in this article.
The architecture described here uses Phoenix/HBase as the high-performance data ingestion layer, and I’ll use some of its features to boost read/write performance for temporally oriented data. Hive with LLAP acts as the low latency data serving layer that hosts aggregate rollups across dimensions. LLAP provides data caching and higher concurrency, while Hive offers superior SQL semantics and a more mature UDF ecosystem. I use the Phoenix Storage Handler for Hive to bridge communication between these two services.
I started with a Phoenix table that looks like this:
CREATE TABLE IF NOT EXISTS CDR_FACT(
    ID bigint not null,
    CALL_TS timestamp not null,
    GEO_ID integer,
    CUST_ID smallint,
    VEND_ID smallint,
    CUST_REL_ID smallint,
    VEND_REL_ID smallint,
    RTE tinyint,
    CNCT tinyint,
    EARLY_EVENT tinyint,
    DURATION double,
    I_PDD bigint,
    E_PDD bigint,
    constraint pk primary key(ID, CALL_TS ROW_TIMESTAMP))
IMMUTABLE_ROWS=true,SALT_BUCKETS=6,COMPRESSION='Snappy';
The ROW_TIMESTAMP designation maps CALL_TS to HBase’s native cell timestamp. This lets queries take advantage of the time range optimizations HBase provides on store files, as well as various query optimizations built into Phoenix.
I chose to use SALT_BUCKETS here to distribute data evenly across the region servers of my cluster. Salting prepends a hash-derived byte to each row key, which keeps monotonically increasing keys from hotspotting a single region. This can improve write performance at the cost of read performance down the line, since range scans must fan out to every bucket, but because I’m pushing ad-hoc browsing of data into the Hive layer, I found this to be a fair trade-off.
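To make the time range idea concrete, here is a toy Python model of store file pruning. It assumes each store file is summarized only by the min/max cell timestamps it contains; StoreFile and files_to_scan are hypothetical names, not HBase APIs, and real HBase has additional read-path machinery that is not modeled here.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List


@dataclass(frozen=True)
class StoreFile:
    """Toy stand-in for an HBase store file: only its name and the
    min/max cell timestamps it holds are modeled."""
    name: str
    min_ts: datetime
    max_ts: datetime


def files_to_scan(files: List[StoreFile], start: datetime, end: datetime) -> List[str]:
    """Names of files whose [min_ts, max_ts] range overlaps the half-open
    query range [start, end); every other file can be skipped."""
    return [f.name for f in files if f.min_ts < end and f.max_ts >= start]


# Hypothetical layout: one file per hour of 2017-04-27.
base = datetime(2017, 4, 27)
files = [
    StoreFile(f"hfile-{h:02d}", base + timedelta(hours=h),
              base + timedelta(hours=h, minutes=59, seconds=59))
    for h in range(24)
]

# A trailing-hour query ending at 18:30 only needs two of the 24 files.
print(files_to_scan(files, base + timedelta(hours=17, minutes=30),
                    base + timedelta(hours=18, minutes=30)))
# → ['hfile-17', 'hfile-18']
```

Because CALL_TS doubles as the cell timestamp, a predicate on CALL_TS can be turned into this kind of file-level time range check instead of a full scan.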
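The write/read trade-off of salting can be sketched in a few lines of Python. This is a simplified illustration, assuming a CRC32 stand-in hash computed over only the ID portion of the row key; Phoenix uses its own hash and key encoding, and the helper names here are hypothetical.

```python
import struct
import zlib
from collections import Counter

SALT_BUCKETS = 6  # same value as SALT_BUCKETS=6 in the CDR_FACT DDL


def encode_id(call_id: int) -> bytes:
    # 8-byte big-endian encoding of a non-negative BIGINT id. Phoenix also
    # flips the sign bit when encoding; that detail is omitted here.
    return struct.pack(">q", call_id)


def salt_byte(row_key: bytes, buckets: int = SALT_BUCKETS) -> int:
    # Stand-in hash (CRC32). Phoenix uses its own hash function, but the
    # principle is the same: hash of the key bytes modulo the bucket count.
    return zlib.crc32(row_key) % buckets


def salted_key(row_key: bytes) -> bytes:
    # The salt byte is prepended, so consecutive ids land in different buckets.
    return bytes([salt_byte(row_key)]) + row_key


def scan_ranges(start_id: int, stop_id: int):
    # Read-side cost: one logical range scan becomes one scan per bucket.
    return [(bytes([b]) + encode_id(start_id), bytes([b]) + encode_id(stop_id))
            for b in range(SALT_BUCKETS)]


# Monotonically increasing ids, as a streaming ingest would produce them.
keys = [salted_key(encode_id(i)) for i in range(10_000)]
print(sorted(Counter(k[0] for k in keys).items()))  # rows per bucket
print(len(scan_ranges(100, 200)))                    # scans needed for one id range
```

Writes are spread across buckets instead of all hitting the region holding the newest key, while any range read has to be issued once per bucket and merged.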
In Hive, I’ll define an external table that overlays my Phoenix table using the Phoenix Storage Handler for Hive. This allows Hive to read data directly from Phoenix tables and take advantage of some optimizations built into the Phoenix layer while doing so. The storage handler .jar isn’t on Hive’s classpath by default, so you will need to follow the instructions on the Phoenix website to set this up.
create external table if not exists CDR_FACT (
    id_col bigint,
    call_ts timestamp,
    geo_id int,
    cust_id smallint,
    vend_id smallint,
    cust_rel_id smallint,
    vend_rel_id smallint,
    rte tinyint,
    cnct tinyint,
    early_event tinyint,
    duration double,
    i_pdd bigint,
    e_pdd bigint)
STORED BY 'org.apache.phoenix.hive.PhoenixStorageHandler'
TBLPROPERTIES (
    "phoenix.table.name" = "CDR_FACT",
    "phoenix.zookeeper.quorum" = "zk1.field.hortonworks.com",
    "phoenix.zookeeper.znode.parent" = "/hbase-unsecure",
    "phoenix.zookeeper.client.port" = "2181",
    "phoenix.rowkeys" = "ID, CALL_TS",
    "phoenix.column.mapping" = "id_col:ID, call_ts:CALL_TS, geo_id:GEO_ID, cust_id:CUST_ID, vend_id:VEND_ID, cust_rel_id:CUST_REL_ID, vend_rel_id:VEND_REL_ID, rte:RTE, cnct:CNCT, early_event:EARLY_EVENT, duration:DURATION, i_pdd:I_PDD, e_pdd:E_PDD");
Note the case-sensitive column name mapping between Hive and Phoenix columns. If you don’t get this right, you won’t see an exception when creating the table, but you will get a ColumnNotFoundException when you query it.
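Since a bad mapping only shows up at query time, it can help to sanity-check the "phoenix.column.mapping" string before running the DDL. Below is a hypothetical Python helper (check_mapping is not part of Phoenix or Hive), assuming the Phoenix column names are the upper-case names Phoenix stores for the unquoted identifiers in the CREATE TABLE statement.

```python
from typing import Dict, List


def parse_column_mapping(mapping: str) -> Dict[str, str]:
    """Parse a "hive_col:PHOENIX_COL, ..." string into {hive_col: PHOENIX_COL}."""
    pairs: Dict[str, str] = {}
    for entry in mapping.split(","):
        hive_col, phoenix_col = (part.strip() for part in entry.split(":", 1))
        pairs[hive_col] = phoenix_col
    return pairs


def check_mapping(mapping: str, hive_cols: List[str], phoenix_cols: List[str]) -> List[str]:
    """List problems that would otherwise only surface at query time."""
    known_hive = {c.lower() for c in hive_cols}  # Hive column names are case-insensitive
    known_phoenix = set(phoenix_cols)            # compare Phoenix names exactly
    problems = []
    for hive_col, phoenix_col in parse_column_mapping(mapping).items():
        if hive_col.lower() not in known_hive:
            problems.append(f"unknown Hive column: {hive_col}")
        if phoenix_col not in known_phoenix:
            problems.append(f"unknown Phoenix column (check case): {phoenix_col}")
    return problems


# Lower-case Phoenix names are a typical mistake: Phoenix upper-cases
# unquoted identifiers from the DDL.
print(check_mapping("id_col:ID, geo_id:geo_id", ["id_col", "geo_id"], ["ID", "GEO_ID"]))
# → ['unknown Phoenix column (check case): geo_id']
```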
You can actually create a Phoenix table managed by Hive, but in my case the Phoenix table was already in place and populated before I started working on this. I also like the flexibility that comes with being able to drop the Hive overlay without affecting the underlying Phoenix table. Doing it this way may also offer additional Phoenix configuration options not exposed through the Hive interface.
The next step is to periodically fetch data from my Phoenix table, roll up summaries across the various dimensions, and store those in Hive-managed ORC tables to be served through LLAP. I created a Hive UDF called timemachine to aid in creating these rollups. It also gives me a way to re-poll a trailing window of records from Phoenix, so my rollups can pick up records that arrived late to my Phoenix data store.
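The UDF's source isn't shown here, so the following Python function is only a guess at its behavior, inferred from how it is called in the queries of this article: timemachine(ts, interval, offset) appears to truncate a timestamp to the start of its interval-minute bucket and then shift it by offset whole buckets. Treat the function as a hypothetical re-creation, not the actual UDF.

```python
from datetime import datetime, timedelta


def timemachine(ts: datetime, interval_minutes: int, offset: int) -> datetime:
    """Hypothetical re-creation of how timemachine() behaves in this article:
    truncate ts to the start of its interval_minutes bucket, then shift the
    result by `offset` whole buckets (negative offsets move into the past).
    Buckets are aligned to midnight, so interval_minutes should divide a day."""
    bucket = timedelta(minutes=interval_minutes)
    midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    floored = midnight + ((ts - midnight) // bucket) * bucket
    return floored + offset * bucket


now = datetime(2017, 4, 27, 18, 47, 31)
print(timemachine(now, 5, 0))     # → 2017-04-27 18:45:00 (5-minute bucket start)
print(timemachine(now, 5, -12))   # → 2017-04-27 17:45:00 (rollup window lower bound)
print(timemachine(now, 60, -24))  # → 2017-04-26 18:00:00 (24-hour ASR query lower bound)
```

Under this reading, timemachine(current_timestamp, 5, -12) returns the start of the 5-minute bucket one hour earlier, so each run recomputes the last twelve complete buckets plus the current partial one.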
hive> create temporary function timemachine as 'com.github.rtempleton.HiveUDFs.TimeMachine' using jar 'hdfs:///udf/HiveUDFs-0.0.1-SNAPSHOT.jar';
Here I group records into 5 minute “buckets” by geography, customer id and vendor id and store the results into a temporary table. This queries the trailing 60 minutes of data to accommodate any records which may have arrived late.
hive> create temporary table cdr_summary_temp as
      select timemachine(call_ts, 5, 0) as call_ts, geo_id, cust_id, vend_id,
             count(*) as cnt, sum(cnct) as sum_cnct, sum(early_event) as sum_ee,
             sum(rte) as sum_rte, max(rte) as max_rte, sum(duration) as sum_dur,
             sum(i_pdd) as sum_ipdd, sum(e_pdd) as sum_epdd
      from cdr_fact
      where call_ts >= unix_timestamp(timemachine(current_timestamp, 5, -12))
      group by timemachine(call_ts, 5, 0), geo_id, cust_id, vend_id;
Table default.cdr_summary_temp stats: [numFiles=1, numRows=68707, totalSize=661741, rawDataSize=4031686]
OK
Time taken: 33.769 seconds
Note that the timemachine UDF call has to be wrapped in unix_timestamp(). This is due to the way Phoenix serializes timestamps, so we need to account for it when applying the predicate against the Phoenix-backed table.
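Stripped of Hive and Phoenix, the rollup query above is a filtered group-by with sum/max aggregates. The Python sketch below mirrors it over in-memory records; the Cdr class, rollup function and sample values are hypothetical, and records are assumed to be already loaded from CDR_FACT.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Iterable, Tuple


@dataclass
class Cdr:
    # The CDR_FACT columns that the rollup reads.
    call_ts: datetime
    geo_id: int
    cust_id: int
    vend_id: int
    rte: int
    cnct: int
    early_event: int
    duration: float
    i_pdd: int
    e_pdd: int


Key = Tuple[datetime, int, int, int]  # (bucket start, geo_id, cust_id, vend_id)


def floor_to(ts: datetime, minutes: int) -> datetime:
    # Start of the enclosing bucket; assumes `minutes` divides 60.
    return ts.replace(minute=ts.minute - ts.minute % minutes, second=0, microsecond=0)


def rollup(records: Iterable[Cdr], since: datetime, minutes: int = 5) -> Dict[Key, dict]:
    out: Dict[Key, dict] = {}
    for r in records:
        if r.call_ts < since:
            continue  # the WHERE call_ts >= ... predicate
        key = (floor_to(r.call_ts, minutes), r.geo_id, r.cust_id, r.vend_id)
        agg = out.setdefault(key, {"cnt": 0, "sum_cnct": 0, "sum_ee": 0, "sum_rte": 0,
                                   "max_rte": r.rte, "sum_dur": 0.0,
                                   "sum_ipdd": 0, "sum_epdd": 0})
        agg["cnt"] += 1  # count(*)
        agg["sum_cnct"] += r.cnct
        agg["sum_ee"] += r.early_event
        agg["sum_rte"] += r.rte
        agg["max_rte"] = max(agg["max_rte"], r.rte)
        agg["sum_dur"] += r.duration
        agg["sum_ipdd"] += r.i_pdd
        agg["sum_epdd"] += r.e_pdd
    return out


# Hypothetical sample records for one geo/customer/vendor combination.
t0 = datetime(2017, 4, 27, 18, 0)
sample = [
    Cdr(t0 + timedelta(minutes=1), 1, 10, 100, 1, 1, 0, 30.0, 5, 7),
    Cdr(t0 + timedelta(minutes=3), 1, 10, 100, 2, 0, 1, 0.0, 4, 6),
    Cdr(t0 + timedelta(minutes=6), 1, 10, 100, 1, 1, 0, 12.5, 3, 3),
    Cdr(t0 - timedelta(minutes=90), 1, 10, 100, 1, 1, 0, 9.0, 1, 1),  # outside the window
]
summary = rollup(sample, since=t0 - timedelta(minutes=60))
print(len(summary))  # two 5-minute buckets survive the filter
```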
At this point my summary data is stored in a temp table, which may contain different values from previously calculated rollups because of late-arriving records. I’ll use the new SQL MERGE functionality in Hive to move these records into a permanent Hive-managed table backed by ORC. Using ORC as the storage format allows this table to benefit from the caching features offered by LLAP.
I know users will tend to view this data along the geography dimension, but because of the high cardinality of geo_id, I will bucket on it rather than partition the table by it.
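The reasoning behind bucketing instead of partitioning can be shown with a little arithmetic in Python. The bucket function below is a simplified model of classic Hive bucketing for an INT column (newer Hive releases with bucketing_version=2 hash with Murmur3 instead), and the 20,000 geography ids are a hypothetical cardinality.

```python
from collections import Counter

NUM_BUCKETS = 32  # CLUSTERED BY (geo_id) INTO 32 BUCKETS


def bucket_for(geo_id: int, num_buckets: int = NUM_BUCKETS) -> int:
    # Simplified model of classic Hive bucketing for an INT column: the hash of
    # an int is the value itself, masked to non-negative, modulo the bucket count.
    return (geo_id & 0x7FFFFFFF) % num_buckets


geo_ids = range(1, 20_001)  # hypothetical: 20,000 distinct geography ids

partition_dirs = len(set(geo_ids))                      # PARTITIONED BY: one directory per value
bucket_sizes = Counter(bucket_for(g) for g in geo_ids)  # CLUSTERED BY: a fixed 32 buckets

print(partition_dirs, len(bucket_sizes))  # → 20000 32
# All rows for a given geo_id, e.g. 1234, live in a single bucket.
print(bucket_for(1234))  # → 18
```

Partitioning would create one directory per distinct geo_id, whereas bucketing keeps the file count fixed no matter how many geographies exist.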
hive> CREATE TABLE cdr_summary(
          call_ts timestamp,
          geo_id int,
          cust_id smallint,
          vend_id smallint,
          cnt bigint,
          sum_cnct bigint,
          sum_ee bigint,
          sum_rte bigint,
          max_rte tinyint,
          sum_dur double,
          sum_ipdd bigint,
          sum_epdd bigint)
      CLUSTERED BY (geo_id) INTO 32 BUCKETS
      stored as ORC
      TBLPROPERTIES ("orc.compress"="ZLIB", "transactional"="true");
This CREATE statement is only a one-time step; with the table in place, I can go ahead and execute my merge statement.
hive> merge into cdr_summary as t
      using cdr_summary_temp as s
      on s.call_ts = t.call_ts and s.geo_id = t.geo_id
         and s.cust_id = t.cust_id and s.vend_id = t.vend_id
      when matched then update set
          cnt = s.cnt, sum_rte = s.sum_rte, max_rte = s.max_rte,
          sum_cnct = s.sum_cnct, sum_ee = s.sum_ee, sum_dur = s.sum_dur,
          sum_ipdd = s.sum_ipdd, sum_epdd = s.sum_epdd
      when not matched then insert values (
          s.call_ts, s.geo_id, s.cust_id, s.vend_id, s.cnt, s.sum_cnct,
          s.sum_ee, s.sum_rte, s.max_rte, s.sum_dur, s.sum_ipdd, s.sum_epdd);
Table default.cdr_summary stats: [numFiles=256, numRows=0, totalSize=3066701, rawDataSize=0]
OK
Time taken: 26.313 seconds
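Because every rollup run overlaps the previous ones, the MERGE has to be safe to repeat. Here is a small in-memory Python model of its matched/not-matched semantics, using a dict keyed on the four ON-clause columns. The merge function and sample rows are hypothetical, and Hive's ACID implementation works very differently underneath.

```python
from typing import Dict, Tuple

Key = Tuple[str, int, int, int]  # (call_ts bucket, geo_id, cust_id, vend_id)


def merge(target: Dict[Key, dict], source: Dict[Key, dict]) -> Tuple[int, int]:
    """In-memory model of the MERGE: rows are matched on the four ON-clause
    columns; matches get their measures overwritten, the rest are inserted.
    Returns (updated, inserted)."""
    updated = inserted = 0
    for key, measures in source.items():
        if key in target:
            updated += 1   # WHEN MATCHED THEN UPDATE SET cnt = s.cnt, ...
        else:
            inserted += 1  # WHEN NOT MATCHED THEN INSERT VALUES (...)
        target[key] = dict(measures)
    return updated, inserted


summary = {}
run1 = {("2017-04-27 18:00:00", 1, 10, 100): {"cnt": 5}}
run2 = {("2017-04-27 18:00:00", 1, 10, 100): {"cnt": 6},  # a late record arrived
        ("2017-04-27 18:05:00", 1, 10, 100): {"cnt": 2}}  # a new bucket
print(merge(summary, run1))  # → (0, 1)
print(merge(summary, run2))  # → (1, 1)
print(merge(summary, run2))  # → (2, 0): re-running leaves the values unchanged
```

Since the recomputed rollup overwrites rather than adds to the stored measures, reprocessing the trailing window never double-counts.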
You can see that the combined execution time of the rollup and the merge statement is right around a minute (33.8 s + 26.3 s), so it’s feasible to schedule this to run every few minutes to keep the summary table up to date. I can encapsulate the UDF declaration, rollup and merge queries in a SQL script and schedule it with Oozie or a local cron job. I used a temporary table in the rollup query specifically to avoid having to clean up after the fact, since Hive drops temporary tables when the session ends.
Now that I have some data to work with, an interesting metric to track in the summarized data set is the Answer Seizure Ratio (ASR), the ratio of completed calls to total routing attempts. It can be used to gauge how efficient a telco routing network is and to help determine when downstream vendors aren’t completing calls. This information allows admins to make adjustments to routing tables in trouble areas. We can look at this over time and across area codes, putting the data in one-hour buckets.
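The ASR arithmetic, including the adjusted variant that removes early events from the denominator, can be sketched in Python. The function names are hypothetical; the point worth showing is that the hourly ratio comes from summed counts, not from an average of the 5-minute ratios. The per-row split below is invented, while the totals (34 attempts, 7 connects, 3 early events) are the counts implied by the ratios of the AB/403 row in the results that follow.

```python
import math


def asr(attempts: int, connects: int) -> float:
    """Answer Seizure Ratio: completed calls / total routing attempts."""
    return connects / attempts if attempts else 0.0


def adjusted_asr(attempts: int, connects: int, early_events: int) -> float:
    """ASR with early events excluded from the denominator, mirroring
    sum(s.sum_cnct)/(sum(cnt)-sum(s.sum_ee)) in the query."""
    denom = attempts - early_events
    return connects / denom if denom else 0.0


# Hypothetical 5-minute summary rows for one hour and one area code.
rows = [
    {"cnt": 10, "sum_cnct": 2, "sum_ee": 1},
    {"cnt": 24, "sum_cnct": 5, "sum_ee": 2},
]
cnt = sum(r["cnt"] for r in rows)
cnct = sum(r["sum_cnct"] for r in rows)
ee = sum(r["sum_ee"] for r in rows)

# Sum the counts first, then divide (as the SQL does); averaging per-row
# ratios would weight a quiet 5-minute bucket as heavily as a busy one.
print(asr(cnt, cnct), adjusted_asr(cnt, cnct, ee))
naive = sum(asr(r["cnt"], r["sum_cnct"]) for r in rows) / len(rows)
print(math.isclose(naive, asr(cnt, cnct)))  # → False
```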
hive> select timemachine(s.call_ts, 60, 0), g.state, g.npa,
             sum(cnt) as totalRouteAttempts,
             sum(s.sum_cnct)/sum(cnt) as ASR,
             sum(s.sum_cnct)/(sum(cnt)-sum(s.sum_ee)) as AdjustedASR
      from cdr_summary s
      inner join geography_dim g on s.geo_id = g.geo_id
      where s.call_ts >= timemachine(current_timestamp, 60, -24)
      group by timemachine(s.call_ts, 60, 0), g.state, g.npa;
2017-04-27 18:00:00  AB  403   34  0.20588235294117646  0.22580645161290322
2017-04-27 18:00:00  AB  780  117  0.358974358974359    0.4158415841584158
2017-04-27 18:00:00  AK  907  198  0.3383838383838384   0.6836734693877551
2017-04-27 18:00:00  AL  205  158  0.5189873417721519   0.6356589147286822
2017-04-27 18:00:00  AL  251  115  0.5391304347826087   0.6739130434782609
2017-04-27 18:00:00  AL  256  106  0.44339622641509435  0.5662650602409639
2017-04-27 18:00:00  AL  334  130  0.4307692307692308   0.5137614678899083
2017-04-27 18:00:00  AR  479   50  0.5                  0.6097560975609756
2017-04-27 18:00:00  AR  501   83  0.4457831325301205   0.6166666666666667
This executes against the abbreviated dataset rather than the full fidelity fact table so the result is almost immediate. A BI report riding on top of this will quickly be able to locate problem areas and allow us to drill in to identify the troublesome vendor.
hive> select s.vend_id, sum(s.cnt) as totalRouteAttempts,
             sum(s.sum_cnct) as totalConnects,
             sum(s.sum_cnct)/sum(cnt) as ASR
      from cdr_summary s
      inner join geography_dim g on s.geo_id = g.geo_id
      where timemachine(s.call_ts, 60, 0) = '2017-04-27 18:00:00' and g.npa = '907'
      group by vend_id
      order by totalRouteAttempts desc;
9154  89  15  0.16853932584269662
8573  41  29  0.7073170731707317
9172  27   0  0.0
8805  17  12  0.7058823529411765
9171  14  10  0.7142857142857143
8737   1   1  1.0
9149   1   0  0.0
I’ve left out the vendor details on purpose, but these could easily be added via a second join. I’ll also point out that the dimension tables referenced in these queries are externally defined Hive tables over Phoenix, so it’s possible to join between Hive-managed tables and those residing in Phoenix. In the example above, vendor 9154 was given 89 calls to route but connected only 15 of them, and vendor 9172 connected 0 of 27 route attempts. With this information, the user can modify the call routing table to temporarily pull those vendors or push them further down the list, which should bring the ASR for the given area code back within an acceptable range. The customer could also analyze a vendor’s historic performance to optimize route placement without ever having to drop down into the high-fidelity fact table. When such a query is necessary, however, the customer would have well-defined predicates to apply so that it executes efficiently against that data in Phoenix.
There are a number of enhancements in Phoenix (secondary indexes), HBase (date tiered compaction, read/write buffer allocation) and Hive (table partitioning) not addressed in this article, but I think this architecture serves as a great jumping-off point for building a time series oriented data collection and processing solution. This design lets you leverage the strengths of both systems: HBase for fast and scalable ingest, and Hive with LLAP for high-concurrency, low latency ad-hoc data access.
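The routing decision described above can be sketched in Python using the NPA 907 drill-down results. The ASR threshold, the minimum-attempts cutoff and the idea that the current route order equals the row order are all hypothetical placeholders, not values or logic from this article.

```python
from typing import List, NamedTuple


class VendorStats(NamedTuple):
    vend_id: int
    attempts: int
    connects: int


# Per-vendor rows from the NPA 907 drill-down above (18:00 hour).
NPA_907 = [
    VendorStats(9154, 89, 15), VendorStats(8573, 41, 29), VendorStats(9172, 27, 0),
    VendorStats(8805, 17, 12), VendorStats(9171, 14, 10), VendorStats(8737, 1, 1),
    VendorStats(9149, 1, 0),
]


def underperformers(rows: List[VendorStats], min_asr: float = 0.3,
                    min_attempts: int = 10) -> List[int]:
    """Vendors with enough traffic to judge and an ASR below min_asr.
    Both thresholds are hypothetical placeholders."""
    return [r.vend_id for r in rows
            if r.attempts >= min_attempts and r.connects / r.attempts < min_asr]


def demote(route_order: List[int], flagged: List[int]) -> List[int]:
    """Keep the existing order but push flagged vendors to the end of the list."""
    return ([v for v in route_order if v not in flagged]
            + [v for v in route_order if v in flagged])


flagged = underperformers(NPA_907)
print(flagged)  # → [9154, 9172]
print(demote([r.vend_id for r in NPA_907], flagged))
# → [8573, 8805, 9171, 8737, 9149, 9154, 9172]
```

The minimum-attempts cutoff keeps vendors such as 9149, with a single attempt, from being judged on too little traffic.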