Created on 08-04-2016 10:37 PM
The Hive Streaming API enables near real-time data ingestion into Hive. This two-part posting reviews some of the design decisions necessary to produce a healthy Hive Streaming ingest process from which you can execute queries on the ingested data in near real time.
Implementing a Hive Streaming data feed requires making tradeoffs between the load on the NameNode and the business SLAs for low-latency data queries. Hive Streaming works on a near real-time basis by creating a new 'delta' file on a bucketed Hive table with every table commit. For example, if a Hive Streaming client commits 1 row every second (it can actually commit at much faster rates), then after 1 minute there would be 60 new delta files added to HDFS. After a day there would be 86,400 new delta files, and 15.5 million after 6 months, just for your streamed data. With this many new HDFS Hive delta files, the Hadoop cluster would eventually encounter NameNode problems.
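To make that growth rate concrete, here is a minimal back-of-the-envelope sketch; the one-delta-file-per-commit and one-commit-per-second figures simply restate the example above:

```java
public class DeltaFileEstimate {
    public static void main(String[] args) {
        // Assumption from the example: one delta file is created per commit,
        // and the client commits once per second around the clock.
        long commitsPerSecond = 1;
        long secondsPerDay = 24L * 60 * 60;
        long deltaFilesPerDay = commitsPerSecond * secondsPerDay;   // 86,400
        long deltaFilesSixMonths = deltaFilesPerDay * 180;          // ~15.5 million

        // Every one of these files is metadata the NameNode must track, which is
        // why commit batching and compaction matter for a long-running stream.
        System.out.println("Delta files per day:      " + deltaFilesPerDay);
        System.out.println("Delta files in 6 months:  " + deltaFilesSixMonths);
    }
}
```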
When implementing your Hive Streaming solution, it is important to keep in mind the following table, which lists 4 of the most significant implementation concerns.
Implementation Concern | Design Decisions to address concern |
---|---|
Need to reduce the latency between ingestion and availability for queries | Commit smaller batches more frequently, accepting that each commit creates additional delta files that must later be compacted. |
NameNode reaching capacity | Increase the number of rows written per commit and rely on minor/major compaction to consolidate the delta files and keep the HDFS file count down. |
Major compactions will consume too much of the cluster’s resources | Disable automatic major compaction on the streamed table and trigger it manually during off-peak periods, ideally in a dedicated resource manager queue. |
Need to maximize per thread throughput | Write multiple rows per transaction and tune the size of the open-transaction pool (hive.txn.max.open.batch / maxBatchGroups). |
Let’s apply the above Hive Streaming design considerations in a simple example that writes 10 columns of data at a rate of more than 1 row per 100ms, and see how these concepts apply in practice.
The following table lists the settings available in Hive 1.2 (HDP 2.4) that are required to enable Hive Streaming for your tables and that support our example's requirements.
Configuration | Value | Description |
---|---|---|
hive.support.concurrency | true | Enables the lock manager support required for Hive transactions. |
hive.enforce.bucketing | true | Ensures that Hive correctly assigns each streamed row to the correct Hive bucket. |
hive.exec.dynamic.partition.mode | nonstrict | Allows for the dynamic creation of partitions. Be careful that the streamed data being ingested has a low partition cardinality. |
hive.txn.manager | org.apache.hadoop.hive.ql.lockmgr.DbTxnManager | The class that processes the Hive transactions necessary to support Hive Streaming. |
hive.compactor.initiator.on | true | Runs the compaction initiator and cleaner threads on the metastore so that compactions are triggered automatically. |
hive.compactor.worker.threads | 1 | Controls the number of worker threads that run the compaction process. |
hive.txn.timeout | 300 seconds (default) | Sometimes it is necessary to increase this value to avoid a large number of aborted transactions when the cluster is heavily taxed. |
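For reference, here is a minimal hive-site.xml sketch of the same settings, as they would typically be applied through Ambari on the HiveServer2 and metastore hosts (hive.txn.timeout is left at its default and omitted):

```xml
<!-- Sketch: Hive Streaming / ACID support properties from the table above -->
<property>
  <name>hive.support.concurrency</name>
  <value>true</value>
</property>
<property>
  <name>hive.enforce.bucketing</name>
  <value>true</value>
</property>
<property>
  <name>hive.exec.dynamic.partition.mode</name>
  <value>nonstrict</value>
</property>
<property>
  <name>hive.txn.manager</name>
  <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
  <name>hive.compactor.initiator.on</name>
  <value>true</value>
</property>
<property>
  <name>hive.compactor.worker.threads</name>
  <value>1</value>
</property>
```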
Now that the system is configured to support Hive Streaming, it is time to create a transactional Hive table. The Hive table requirements to support Hive Streaming include: the table must be stored as ORC, it must be bucketed (CLUSTERED BY) and not sorted, and it must declare the table property "transactional"="true".
```sql
CREATE TABLE acidtest (a INT, b1 STRING, b2 STRING, b3 STRING, b4 STRING, b5 STRING,
  b6 STRING, b7 STRING, b8 STRING, b9 STRING)
CLUSTERED BY (a) INTO 4 BUCKETS
STORED AS ORC
tblproperties("transactional"="true");
```
In our example above, we see a CREATE TABLE statement nearly identical to most of the other Hive tables we have already created, with one notable exception: we are specifying a table property of "transactional" set to true. This property is necessary in order to tell Hive to utilize the transactional features that support streaming in Hive.
The BUCKET count of 4 was selected to keep the number of delta bucket files to a minimum yet still support table JOINs to other tables, such as a CUSTOMER_INFORMATION table with 8 buckets. Because 8 is a multiple of the 4 buckets in our streamed table, this schema will also be able to support real-time bucketed table JOINs.
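As an illustration of that bucket-count reasoning, a hypothetical CUSTOMER_INFORMATION table and the standard Hive bucket map join switch might look like the following sketch (the column names and query are assumptions, not part of the original example):

```sql
-- Hypothetical 8-bucket dimension table bucketed on the join key
CREATE TABLE customer_information (
  customer_id   INT,
  customer_name STRING,
  customer_tier STRING
)
CLUSTERED BY (customer_id) INTO 8 BUCKETS
STORED AS ORC;

-- Let Hive exploit the compatible bucket counts (4 and 8) on the join key
SET hive.optimize.bucketmapjoin=true;

SELECT s.a, c.customer_name
FROM acidtest s
JOIN customer_information c ON s.a = c.customer_id;
```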
In the different experiments performed while researching this blog, it was found that dropping the "transactional" property introduces inconsistent results. The problem is that in some instances it will appear to work, so make absolutely certain that the "transactional" property is defined on your Hive Streaming table.
Now that we have our destination Hive Streaming table created and available to Hive, it is time to start creating our Hive Streaming client. To illustrate the concepts behind Hive Streaming, this blog posting uses the base Hive Streaming API. But be aware that both Flume and Storm have components that support the Hive Streaming process at a higher level.
The next sections walk through the example; the full code and supporting files are available for download from GitHub as WriteHDFS.java.
Our first step in creating our streaming client is to define the HiveEndPoint connection to the metastore's Thrift service.
HiveEndPoint("thrift://"+hostname+":9083", "HIVE_DATABASE","HIVE_TABLE_NAME", null);
The hostname and port to configure for our connection can be retrieved from within Ambari by searching for the property 'hive.metastore.uris'; this should not be confused with the HiveServer2 hostname.
Once the endpoint connection is established, we need to assign it to our streaming writer and specify the columns in our destination table. At the time of this writing, there are 3 writer classes available: DelimitedInputWriter (for delimited text records), StrictJsonWriter (for JSON records), and AbstractRecordWriter (a base class for building custom writers).
```java
String[] colFlds = new String[]{"a", "b1", "b2", "b3", "b4", "b5", "b6", "b7", "b8", "b9"};
writer = new DelimitedInputWriter(colFlds, ",", endPt);
```
The DelimitedInputWriter which we are using for this example takes 3 parameters: the array of column names in the destination table, the field delimiter used in the incoming records, and the HiveEndPoint the writer is bound to.
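Pulling the endpoint, connection, and writer together, a minimal setup sketch using the same API calls might look like the following; the class name, the 'default' database, and the command-line handling are assumptions for illustration:

```java
import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;

public class AcidTestStreamSetup {
    public static void main(String[] args) throws Exception {
        // hive.metastore.uris host passed on the command line in this sketch
        String metastoreUri = "thrift://" + args[0] + ":9083";

        // Endpoint: metastore URI, database, table, partition values (null = unpartitioned table)
        HiveEndPoint endPt = new HiveEndPoint(metastoreUri, "default", "acidtest", null);

        // Open the streaming connection; 'true' asks Hive to create the partition if missing
        StreamingConnection connection = endPt.newConnection(true);

        // Writer: destination column names, field delimiter, and the endpoint it belongs to
        String[] colFlds = {"a", "b1", "b2", "b3", "b4", "b5", "b6", "b7", "b8", "b9"};
        DelimitedInputWriter writer = new DelimitedInputWriter(colFlds, ",", endPt);

        // The 'connection' and 'writer' are what the transaction batch logic in the
        // next section is built on (connection.fetchTransactionBatch(..., writer)).
        connection.close();
    }
}
```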
Our next step is to open a pool of transaction batch ids to use for Hive Streaming. Hive defaults to 1,000 open transactions at one time, though this value may be changed through the Hive property 'hive.txn.max.open.batch'. Once there exists a pool of open transactions, the 'beginNextTransaction()' operation is able to start a transaction for Hive Streaming to write into. Keep in mind that attempting to call 'beginNextTransaction()' when there are no remaining transactions will produce an error, so it is useful to check that 'remainingTransactions()' is greater than 0 before trying to start a new transaction.
```java
TransactionBatch txnBatch = connection.fetchTransactionBatch(maxBatchGroups, writer);
txnBatch.beginNextTransaction();
for (int i = 0; i < writeRows; ++i) {
    writeStream(i, connection, writer, txnBatch, false);
    if (currentBatchSize < 1) {
        System.out.println(">" + threadId + " Beginning Transaction Commit:" + i
                + " Transaction State:" + txnBatch.getCurrentTransactionState());
        writer.flush();
        txnBatch.commit();
        if (txnBatch.remainingTransactions() > 0) {
            System.out.println(">" + threadId + " ->" + i
                    + " txnBatch transactions remaining:" + txnBatch.remainingTransactions());
            txnBatch.beginNextTransaction();
            currentBatchSize = maxBatchSize;
        } else {
            System.out.println(">" + threadId + " Refreshing the transaction group count");
            txnBatch = connection.fetchTransactionBatch(maxBatchGroups, writer);
            txnBatch.beginNextTransaction();
            currentBatchSize = maxBatchSize;
        }
    }
    --currentBatchSize;
}
writer.flush();
txnBatch.commit();
txnBatch.close();
```
Some important implementation details to keep in mind include:
- call writer.flush() before each commit() so that buffered rows are written to the delta files;
- check remainingTransactions() before calling beginNextTransaction(), and fetch a new transaction batch when the pool is exhausted;
- close the transaction batch (txnBatch.close()) when the stream completes so that unused transactions are returned.
To really understand how Hive Streaming works, it is useful to monitor the effect of each step of the flow on the HDFS physical storage layer. There are two physical measures to look at as the Hive Streaming process executes: the delta files appearing under the table's warehouse directory in HDFS, and the row count visible to queries (for example, a SELECT COUNT(*)) against the table.
Looking at the physical data flow, there are 4 high-level steps, described below, which should clarify how Hive Streaming impacts the processing flow.
Step 1: After the table create and before the commit() operation:
Right before the commit() operation and right after the flush, you will start to see the delta files appear in the warehouse subdirectory for your Hive Streaming table.
```
[hive@server1 ~]$ hadoop fs -ls -R /apps/hive/warehouse/acidtest
-rw-r--r--   3 mjohnson hdfs       4 2016-03-27 17:12 /apps/hive/warehouse/acidtest/_orc_acid_version
drwxrwxrwx   - mjohnson hdfs       0 2016-03-27 17:12 /apps/hive/warehouse/acidtest/delta_3238501_3239500
-rw-r--r--   3 mjohnson hdfs    2792 2016-03-27 17:12 /apps/hive/warehouse/acidtest/delta_3238501_3239500/bucket_00000
-rw-r--r--   3 mjohnson hdfs       8 2016-03-27 17:12 /apps/hive/warehouse/acidtest/delta_3238501_3239500/bucket_00000_flush_length
```
We can see that in bucket #0, 2792 bytes of temporary data have been written to HDFS. However, because the commit() has not run at this point in time, you will see a record count of 0 upon executing a `select count(*) from acidtest`.
Step 2: Immediately after the ‘commit()’ instruction
However, as soon as the commit has completed, we find that the 'select count(*)…' now returns 10,001 rows as being part of the acidtest table. There are also 999 transactions remaining, so we do not need to call the method connection.fetchTransactionBatch(maxBatchGroups, writer) yet.
Step 3: ‘maxBatchGroups’ commit() operations have completed, and no more open transactions
Now that the maxBatchGroups (generally 1,000) commit() operations have completed and there are no more remaining transactions, it is necessary to pull an additional group of transaction batches by calling 'connection.fetchTransactionBatch(maxBatchGroups, writer);' again. If we execute the command 'SHOW TRANSACTIONS;' before and after calling 'fetchTransactionBatch', we would see that the total number of OPEN transactions increases by roughly the value of the 'maxBatchGroups' variable.
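One simple way to observe this from the Hive CLI or beeline is to run SHOW TRANSACTIONS on either side of the fetch (the actual output is cluster specific and omitted here):

```sql
-- Before the client refreshes its pool: note the number of OPEN transactions.
SHOW TRANSACTIONS;

-- The client then calls connection.fetchTransactionBatch(maxBatchGroups, writer).

-- Afterwards the OPEN transaction count should be higher by roughly maxBatchGroups.
SHOW TRANSACTIONS;
```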
As the stream continues to ingest data, you will notice that the delta file list displayed below now has multiple references to the same bucket (bucket #0).
```
[hive@server1 ~]$ hadoop fs -ls -R /apps/hive/warehouse/acidtest
-rw-r--r--   3 mjohnson hdfs       4 2016-03-27 17:35 /apps/hive/warehouse/acidtest/_orc_acid_version
drwxrwxrwx   - mjohnson hdfs       0 2016-03-27 17:37 /apps/hive/warehouse/acidtest/delta_3296501_3297500
-rw-r--r--   3 mjohnson hdfs    1521 2016-03-27 17:37 /apps/hive/warehouse/acidtest/delta_3296501_3297500/bucket_00000
-rw-r--r--   3 mjohnson hdfs       8 2016-03-27 17:37 /apps/hive/warehouse/acidtest/delta_3296501_3297500/bucket_00000_flush_length
drwxrwxrwx   - mjohnson hdfs       0 2016-03-27 17:37 /apps/hive/warehouse/acidtest/delta_3297501_3298500
-rw-r--r--   3 mjohnson hdfs    1545 2016-03-27 17:37 /apps/hive/warehouse/acidtest/delta_3297501_3298500/bucket_00001
-rw-r--r--   3 mjohnson hdfs       8 2016-03-27 17:37 /apps/hive/warehouse/acidtest/delta_3297501_3298500/bucket_00001_flush_length
drwxrwxrwx   - mjohnson hdfs       0 2016-03-27 17:37 /apps/hive/warehouse/acidtest/delta_3298501_3299500
-rw-r--r--   3 mjohnson hdfs    1518 2016-03-27 17:37 /apps/hive/warehouse/acidtest/delta_3298501_3299500/bucket_00000
-rw-r--r--   3 mjohnson hdfs       8 2016-03-27 17:37 /apps/hive/warehouse/acidtest/delta_3298501_3299500/bucket_00000_flush_length
drwxrwxrwx   - mjohnson hdfs       0 2016-03-27 17:37 /apps/hive/warehouse/acidtest/delta_3299501_3300500
-rw-r--r--   3 mjohnson hdfs    1519 2016-03-27 17:37 /apps/hive/warehouse/acidtest/delta_3299501_3300500/bucket_00000
-rw-r--r--   3 mjohnson hdfs       8 2016-03-27 17:37 /apps/hive/warehouse/acidtest/delta_3299501_3300500/bucket_00000_flush_length
```
As we see in the file listing above, there are 3 delta files for bucket #0 spread across the transaction ranges 3296501 through 3300500. This is because there were multiple commits assigning values to bucket #0 within the open transaction range. If we increased the number of rows included in each commit, as mentioned earlier in the Implementation section of this blog, or adjusted the maxBatchGroups value, we might see fewer of these small temporary delta files.
The good news is that while the Hive Streaming commit operations are adding new delta files, the compaction functionality is also running in the background, consolidating these files for more efficient NameNode operation and queries.
Step 4: Completion of the Hive Stream process
In many instances, the Hive Streaming ingestion process never actually ends. New delta files are continually created in HDFS by the commit() step, and the compaction processes continually consolidate the delta files. As each transaction gets committed, it is closed and removed from the TRANSACTIONS list.
In a healthy system, you will find that over time the number of transactions reported by the command 'SHOW TRANSACTIONS' should be less than the value you have assigned to 'maxBatchGroups'. If the number is greater than 'maxBatchGroups', then it is possible that, for one of the transactional tables, the ingest process was stopped without executing a 'txnBatch.close();' operation, which returns the unused transactions.
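One way to guard against leaving transactions open is to always close the batch and the connection on shutdown. Below is a minimal sketch that assumes the txnBatch and connection objects from the earlier code; the class and method names are hypothetical:

```java
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.TransactionBatch;

public final class StreamShutdown {
    // Closes the transaction batch (returning any unused OPEN transactions to Hive)
    // and then releases the streaming connection.
    public static void shutdown(TransactionBatch txnBatch, StreamingConnection connection) {
        try {
            if (txnBatch != null) {
                txnBatch.close();
            }
        } catch (Exception e) {
            System.err.println("Failed to close transaction batch: " + e.getMessage());
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
}
```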
Compaction runs a set of background cleaner processes during ingest, looking for opportunities to compact the delta files based on the rules you specify.
The first thing to keep in mind is that there are two forms of compaction: 'minor' and 'major'. A 'minor' compaction consolidates only the delta files. It does not have to merge all of the delta files with a potentially large set of base bucket files, and is thus the least disruptive to system resources. A 'major' compaction consolidates all of the delta files just like a 'minor' compaction, and in addition it merges the delta files into the base to produce a very clean physical layout for the Hive table. However, major compactions can take minutes to hours and can consume a lot of disk, network, memory and CPU resources, so they should be invoked carefully.
To provide greater control over the compaction process and avoid impacting other workloads, compaction can, in addition to the compactor configuration options available, either be triggered automatically by the background compactor threads or be initiated manually when system load is low.
The primary compaction configuration triggers to review when implementing or tuning your compaction processes are shown below.
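The exact set varies by Hive release, but for Hive 1.2 the commonly documented triggers are sketched below in hive-site.xml form; the values shown are believed to be the defaults rather than recommendations:

```xml
<!-- Sketch: compaction trigger thresholds, shown with their believed Hive 1.2 defaults -->
<property>
  <name>hive.compactor.check.interval</name>
  <value>300</value>   <!-- seconds between checks for tables/partitions needing compaction -->
</property>
<property>
  <name>hive.compactor.delta.num.threshold</name>
  <value>10</value>    <!-- number of delta directories that triggers a minor compaction -->
</property>
<property>
  <name>hive.compactor.delta.pct.threshold</name>
  <value>0.1</value>   <!-- delta size relative to base that triggers a major compaction -->
</property>
<property>
  <name>hive.compactor.abortedtxn.threshold</name>
  <value>1000</value>  <!-- aborted transactions on a table that trigger a major compaction -->
</property>
```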
In our example, we have turned off automatic major compaction because it should only run during off-peak periods. Taking a look at the delta files for our table in HDFS, we see that there are over 300 delta files and 5 base files.
```
[hive@server1 ~]$ hadoop fs -ls -R /apps/hive/warehouse/acidtest
-rw-r--r--   3 mjohnson hdfs       4 2016-03-27 13:17 /apps/hive/warehouse/acidtest/_orc_acid_version
drwxrwxrwx   - mjohnson hdfs       0 2016-03-27 13:18 /apps/hive/warehouse/acidtest/delta_2113501_2123500
-rw-r--r--   3 mjohnson hdfs  482990 2016-03-27 13:18 /apps/hive/warehouse/acidtest/delta_2113501_2123500/bucket_00002
-rw-r--r--   3 mjohnson hdfs    1600 2016-03-27 13:18 /apps/hive/warehouse/acidtest/delta_2113501_2123500/bucket_00002_flush_length
drwxrwxrwx   - mjohnson hdfs       0 2016-03-27 13:17 /apps/hive/warehouse/acidtest/delta_2123501_2133500
-rw-r--r--   3 mjohnson hdfs  482784 2016-03-27 13:18 /apps/hive/warehouse/acidtest/delta_2123501_2133500/bucket_00001
-rw-r--r--   3 mjohnson hdfs    1600 2016-03-27 13:18 /apps/hive/warehouse/acidtest/delta_2123501_2133500/bucket_00001_flush_length
drwxrwxrwx   - mjohnson hdfs       0 2016-03-27 13:17 /apps/hive/warehouse/acidtest/delta_2133501_2143500
-rw-r--r--   3 mjohnson hdfs  482110 2016-03-27 13:18 /apps/hive/warehouse/acidtest/delta_2133501_2143500/bucket_00001
-rw-r--r--   3 mjohnson hdfs    1600 2016-03-27 13:18 /apps/hive/warehouse/acidtest/delta_2133501_2143500/bucket_00001_flush_length
drwxrwxrwx   - mjohnson hdfs       0 2016-03-27 13:17 /apps/hive/warehouse/acidtest/delta_2143501_2153500
-rw-r--r--   3 mjohnson hdfs  476285 2016-03-27 13:18 /apps/hive/warehouse/acidtest/delta_2143501_2153500/bucket_00000
-rw-r--r--   3 mjohnson hdfs    1600 2016-03-27 13:18 /apps/hive/warehouse/acidtest/delta_2143501_2153500/bucket_00000_flush_length
drwxrwxrwx   - mjohnson hdfs       0 2016-03-27 13:17 /apps/hive/warehouse/acidtest/delta_2153501_2163500
```
A decision has been made to run the major compaction manually during the evening lull, so we execute the "ALTER TABLE {tablename} COMPACT 'major'" command to place the compaction job into the queue for processing. A compaction resource management queue was defined with a limited quota, so the compaction will not impact other jobs.
```
hive> alter table acidtest compact 'major';
Compaction enqueued.
OK
Time taken: 0.037 seconds
hive> show compactions;
OK
Database  Table     Partition  Type   State    Worker          Start Time
default   acidtest  NULL       MAJOR  working  server2.hdp-26  1459100244000
Time taken: 0.019 seconds, Fetched: 2 row(s)
hive> show compactions;
OK
Database  Table     Partition  Type   State    Worker          Start Time
Time taken: 0.016 seconds, Fetched: 1 row(s)
hive>
```
The outstanding table compaction jobs are visible by executing the command "SHOW COMPACTIONS", as illustrated in the example above; the 'major' compaction is also visible through the application history log. After the 'major' compaction has completed, all of the delta files available at the time the compaction was initiated will have been rolled up into the 'base' files.
```
[hive@server1 ~]$ hadoop fs -ls -R /apps/hive/warehouse/acidtest
-rw-r--r--   3 mjohnson hdfs       4 2016-03-27 13:17 /apps/hive/warehouse/acidtest/_orc_acid_version
drwxrwxrwx   - mjohnson hdfs       0 2016-03-27 13:37 /apps/hive/warehouse/acidtest/base_2213500
-rw-r--r--   3 mjohnson hdfs   72704 2016-03-27 13:37 /apps/hive/warehouse/acidtest/base_2213500/bucket_00000
-rw-r--r--   3 mjohnson hdfs  436159 2016-03-27 13:37 /apps/hive/warehouse/acidtest/base_2213500/bucket_00001
-rw-r--r--   3 mjohnson hdfs  219572 2016-03-27 13:37 /apps/hive/warehouse/acidtest/base_2213500/bucket_00002
[hive@server1 ~]$
```
The end result of this example is that 305 files were consolidated into just 5. While 300 files will not noticeably impact NameNode performance, the consolidation will most likely improve query performance, as the Hive engine has fewer files to scan when executing a query.
After completing these examples, you should have a good idea of how to set up Hive Streaming in your own environment. Hopefully, the example and source code supplied with this blog posting are sufficient to get you started with Hive Streaming and to help you avoid potential problems; the references in the bibliography at the bottom of this article are useful starting points for further reading.
Created on 07-06-2017 01:49 AM
This is a really good example of how the streaming API can be used with Hive. I tried to run this example code from outside the edge node, and it is throwing the following error:
```
hdfs.DFSClient: Exception in createBlockOutputStream
org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=xxxx/xxx.yy.zz.135:50010]
        at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:534)
        at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1601)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1342)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1295)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:463)
```
hdfs.DFSClient: Abandoning BP-
Does it mean that Hive Streaming can work only from the edge node?
Thanks
Arun
Created on 04-20-2021 07:33 PM
Thanks for such a nice and detailed blog. I am looking for a solution to avoid duplicate records during Hive Streaming. Can anybody please help me?