Community Articles

Find and share helpful community-sourced technical articles.
Announcements
Celebrating as our community reaches 100,000 members! Thank you!
Labels (1)
avatar
Rising Star

The Hive Streaming API enables the near real-time data ingestion into Hive. This two part posting reviews some of the design decisions necessary to produce a health Hive Streaming ingest process from which you can in a near real-time execute queries on the ingested data.

Implementing a real-time Hive Streaming example

Implementing a Hive Streaming data feed requires we make tradeoffs between the load on the NameNode versus the business SLAs for low latency data queries. Hive Streaming is able to work in a near real-time basis through the creation of a new ‘delta’ file on a bucketed Hive table with every table commit. For example, a Hive Streaming client is committing 1 row every second (it can actually commit at much faster rates), then after 1 minute there would exist 60 new delta files added o HDFS. After a day, there would be 86,400 new delta files and 15.5 million after 6 months just on your streamed data. With this many new HDFS Hive delta files, eventually the Hadoop cluster would encounter NameNode problems.

When implementing your Hive Streaming solution it is important to keep in mind the following table list 4 of the most significant implementation concerns.

Implementation Concern Design Decisions to address concern
Need to reduce the latency between ingestion and availability for queries
  • Decrease the number of rows per commit. Consequence – there will most likely be more delta files created which will impact the NameNode.
  • Reduce the column count on streamed table to reduce the amount of data moved around during compaction.
NameNode reaching capacity
  • Increase the number of rows per commit. Consequence
  • Reduction in the number of delta files within HDFS at any point in time, but there could be a longer lag before the data will be accessible to Hive Queries.
  • Based on resources availability schedule regular major compactions. Consequence – If there exists a lot of data to compact and limited cluster sources, it could interfere with the performance of other jobs.
  • Consider reducing the bucket and/or partition counts defined to the table in order to reduce the number of temporary delta files stored in HDFS in between compactions.
Major compactions will consume too much of the cluster’s resources
  • Assign the compaction to its own resource manager queue.
  • Consider reducing the bucket and/or partition counts defined to the table in order to reduce the number of file moves required during compaction.
  • Run compactions on a scheduled or manual basis during off-hours rather than allow automatic execution.
Need to maximize per thread throughput
  • Increase the number of rows per commit operations. This recommendation may seem contradictory, but it is true that data throughput is maximized by committing more rows at one time and data availability is maximized by decreasing the number of committed rows at one time.
  • A Hive streaming example with the Hive API

    Let’s apply the above Hive Streaming design considerations in a simple example to write 10 columns of data at a rate of over 1 row per 100ms and see how the above concepts can apply to reality.

    Hive Streaming Required Configuration Settings

    The following table represents a list of settings available in Hive 1.2 (HDP 2.4) and which are required to support the ability to select Hive Streaming for your tables and which will support our example’s requirements.

    Configuration Value Description
    Hive.support.concurrency true
    Hive.enforce.bucketing true Ensures that hive correctly assigns the streamed row into the correct Hive bucket.
    Hive.exec.dynamic.partition.mode nonstrict Allows for the dynamic creation of partitions. Be careful that the streamed data ingested a low cardinality.
    Hive.txn.manager org.apache.hadoop.hive.ql.lockmgr.DbTxnManager The class to process the hive transactions necessary to support Hive streaming
    Hive.compactor.initiator.on True ???
    Hive.compactor.worker.threads 1 Controlls the number of worker threads controlling the compaction process.
    Hive.txn.timeout Default 300 seconds Sometimes it is necessary to increase the value to avoid a large number of aborted transactions when the cluster is heavily taxed.

    Creating the destination table for the Hive stream

    Now that the system is configured to support Hive Streaming, it is time to create a transactional Hive table. The Hive table requirements to support Hive Streaming include:

    • Hive table must be stored in ORC format
    • Hive table must be bucketed. If desired, the table may also support partitioning along with the bucket definition.
    • There must be sufficient disk temporary space to support compaction operations
    • Need to specify the table as ‘transactional’
    CREATE TABLE acidtest (a INT, b1 STRING, b2 STRING,
    b3 STRING,b4 STRING,b5 STRING,
    b6 STRING,b7 STRING,b8 STRING,
    b9 STRING)
    CLUSTERED BY(a) INTO 4 BUCKETS
    STORED AS ORC
    tblproperties("transactional"="true")";
    

    In our example above, we see a CREATE TABLE nearly identical to most of the other Hive tables we have already created with one notable exception; we are specifying a table property of “transactional” equals to true. This property is necessary in order to tell Hive to utilize the transactional features to support streaming in Hive.

    The BUCKET count of 4 was selected to keep the number of delta bucket files to a minimum yet still support table JOINS to other tables such as CUSTOMER_INFORMATION table with 8 buckets. As 4 is a multiple of the 8 buckets found in the JOINed table, we will also with this schema be able to support real-time table BUCKET JOINs.

    In different experiments performed to research for this blog, it was found that dropping the “transactional introduces inconsistent results. The problem can be in some instances it will appear to work, so make absolutely certain that the “transactional” property is defined to your Hive streaming table.

    Establishing the Hive Connections

    Now that we have our destination Hive Streaming table created and available to Hive, it is now time to start creating our Hive Streaming client. For this blog posting to illustrate the concepts behind Hive Streaming, we will use the base Hive Streaming API. But, be aware that both Flume and Storm have components to support at a higher level the Hive Streaming process.

    For the example the next sections will walk through the full code and support files are available through in the github WriteHDFS.java available for your download.

    Our first step to create our streaming client is to define the HiveEndPoint connection to the thrift service.

    HiveEndPoint("thrift://"+hostname+":9083", "HIVE_DATABASE","HIVE_TABLE_NAME", null);

    The hostname and port name to configure for our connection can be retrieved from within Ambari by searching for the property value ‘hive.metastore.uris’, and should not be confused with the hiveserver2 hostname.

    Hive Streaming Writers

    Once the endpoint connection established we need to assign it to our streaming writer and specify the columns in our destination table.

    At the time of this writing, there are 3 writer classes available:

    • DelimitedInputWriter (the writer used for this blog’s example) – Outputs the column values as a delimited string where you can specify the desired delimiter.
    • StrictJsonWriter – Converts the JSON string passed to the writer into an Object using JsonSerde.
    • AbstractRecordWriter – A base class available to create your own customer writers
    String[] colFlds = new String[]{"a", "b1", "b2", "b3", "b4","b5", "b6", "b7", "b8", "b9"};
    writer = new DelimitedInputWriter(colFlds, ",", endPt);

    The DelimitedInputWriter which we are using for this example contains 3 parameters:

    • A String[] array containing a list of the column names defined to the table. You need to make certain that these column names match up with the CREATE TABLE columns or you could encounter some errors.
    • The string delimiter between each field. In this example, we are just doing a comma separated list.
    • The HiveEndPoint connection reference

    Defining the Hive Transactions and writing to the stream

    Our next step is to open a collection of batch ids to use for the hive streaming. Hive defaults to 1,000 open transactions at one time, though this value may be specified by changing the hive property ‘hive.txn.max.open.batch’. Once there exists a pool of open transactions, the ‘beginNextTransaction()’ operation is able to start a transaction for Hive Streaming to write. Keep in mind that attempting to call ‘beginNextTransaction()’ when there are no more remaining transactions will produce an error, so it is useful to check ‘remaining Transactions()’ is greater than 0 before trying to start a new transaction.

    TransactionBatch txnBatch =
    connection.fetchTransactionBatch(maxBatchGroups, writer);
    txnBatch.beginNextTransaction();
    
    for (int i = 0; i < writeRows; ++i) {
        writeStream(i, connection, writer, txnBatch, **false**);
        if (currentBatchSize < 1) {
            System.out.println(">"+threadId
                " Beginning Transaction Commit:"+i+" Transaction State:"
                +txnBatch.getCurrentTransactionState());
            writer.flush();
            txnBatch.commit();
    
            if (txnBatch.remainingTransactions() > 0) {
                System.out.println(">"+threadId+" ->"
                    +i+" txnBatch transactions remaining:"
                    + txnBatch.remainingTransactions());
                txnBatch.beginNextTransaction();
                currentBatchSize = maxBatchSize;
            } else {
                System.out.println(">"+threadId
                    +" Refreshing the transaction group count");
                txnBatch = connection.fetchTransactionBatch(maxBatchGroups,writer);
                txnBatch.beginNextTransaction();
                currentBatchSize = maxBatchSize;
            }
        }
        --currentBatchSize;
    }
    writer.flush();
    txnBatch.commit();
    txnBatch.close();

    Some important implementation details to keep in mind include:

    • The data is not available for queries until after the flush() and commit() operations have completed.
    • When the fetchTransactionBatch operation executes the system opens ‘maxBatchGroups’ count transactions. In order to close them, make certain to execute the txnBatch.close() at the end of Hive Stream method as shown at the bottom of the example.

    Physical Data Changes as the Hive Streaming example executes

    To really understand how Hive Streaming works, it is useful to monitor the effect each step of the flow on the HDFS physical storage layer. There are two physical measures to look at as the Hive Streaming process executes:

    • Count and size of the delta files
    • Number of rows accessible to Hive Queries

    Looking at the physical data flow, there are 4 high level steps described below which should clarify how Hive Streaming impacts the processing flow.

    Step 1: After the table create and before the commit() operation:

    Right before the commit() operation and right after the flush, you will start to see the delta files appear in the warehouse subdirectory for your hive stream table.

    [hive@server1 ~]$ hadoop fs -ls -R /apps/hive/warehouse/acidtest
    -rw-r--r-- 3 mjohnson hdfs 4 2016-03-27 17:12     /apps/hive/warehouse/acidtest/_orc_acid_version
    drwxrwxrwx - mjohnson hdfs 0 2016-03-27 17:12     /apps/hive/warehouse/acidtest/delta_3238501_3239500
    -rw-r--r-- 3 mjohnson hdfs 2792 2016-03-27 17:12  /apps/hive/warehouse/acidtest/delta_3238501_3239500/bucket_00000
    -rw-r--r-- 3 mjohnson hdfs 8 2016-03-27 17:12     /apps/hive/warehouse/acidtest/delta_3238501_3239500/bucket_00000_flush_length

    We can see that in bucket #0 2792 bytes of temporary data has been written to HDFS. Though at this point in time because the commit() has not run, you will see 0 record count upon executing a select count(*) from actidtest.

    Step 2: Immediately after the ‘commit()’ instruction

    However, as soon as the commit has completed, then we find that the ‘select count(*)…’ now returns 10,0001 rows as being part of the acidtest table. There are also 999 transactions remaining, so we do not need to call the method connection.fetchTransactionBatch(maxBatchGroups, writer); yet.

    Step 3: ‘maxBatchGroups’ commit() operations have completed, and no more open transactions

    Now that the maxBatchGroups (generally 1,000) commit() operations have completed and there are no more remainingTransactions, it is now necessary to pull an additional group of Transaction batches by calling ‘‘connection.fetchTransactionBatch( maxBatchGroups, writer);’ again. If we execute the command ‘SHOW TRANSACTIONS;’ before and after calling ‘fetchTransactionBatch’ we would see that the total number of OPEN transactions would increase by roughly the amount of the ‘maxBatchGroups’ variable value.

    As the stream continues to ingest data you will notice that the simple delta file list displayed below now has duplicated references to the same bucket (bucket #0).

    [hive@server1 ~]$ hadoop fs -ls -R /apps/hive/warehouse/acidtest
    -rw-r--r-- 3 mjohnson hdfs 4 2016-03-27 17:35     /apps/hive/warehouse/acidtest/_orc_acid_version
    drwxrwxrwx - mjohnson hdfs 0 2016-03-27 17:37     /apps/hive/warehouse/acidtest/delta_3296501_3297500
    -rw-r--r-- 3 mjohnson hdfs 1521 2016-03-27 17:37  /apps/hive/warehouse/acidtest/delta_3296501_3297500/bucket_00000
    -rw-r--r-- 3 mjohnson hdfs 8 2016-03-27 17:37     /apps/hive/warehouse/acidtest/delta_3296501_3297500/bucket_00000_flush_length
    drwxrwxrwx - mjohnson hdfs 0 2016-03-27 17:37     /apps/hive/warehouse/acidtest/delta_3297501_3298500
    -rw-r--r-- 3 mjohnson hdfs 1545 2016-03-27 17:37  /apps/hive/warehouse/acidtest/delta_3297501_3298500/bucket_00001
    -rw-r--r-- 3 mjohnson hdfs 8 2016-03-27 17:37     /apps/hive/warehouse/acidtest/delta_3297501_3298500/bucket_00001_flush_length
    drwxrwxrwx - mjohnson hdfs 0 2016-03-27 17:37     /apps/hive/warehouse/acidtest/delta_3298501_3299500
    -rw-r--r-- 3 mjohnson hdfs 1518 2016-03-27 17:37  /apps/hive/warehouse/acidtest/delta_3298501_3299500/bucket_00000
    -rw-r--r-- 3 mjohnson hdfs 8 2016-03-27 17:37     /apps/hive/warehouse/acidtest/delta_3298501_3299500/bucket_00000_flush_length
    drwxrwxrwx - mjohnson hdfs 0 2016-03-27 17:37     /apps/hive/warehouse/acidtest/delta_3299501_3300500
    -rw-r--r-- 3 mjohnson hdfs 1519 2016-03-27 17:37  /apps/hive/warehouse/acidtest/delta_3299501_3300500/bucket_00000
    -rw-r--r-- 3 mjohnson hdfs 8 2016-03-27 17:37     /apps/hive/warehouse/acidtest/delta_3299501_3300500/bucket_00000_flush_length
    

    As we see in the file listing above, there are 3 delta files for the transaction range of 3299501-3300500 and bucket #0. This is because there were multiple commits assigning values to bucket #0 for the open transaction range. If we increased the number of rows included in each commit as mentioned earlier in the Implementation section of this blog or played around with the maxBatchGroups value, we might actually see fewer of these small temporary delta files.

    The good news is while the hive streaming commit operations are adding new delta files, the compaction functionality is also running in the back ground trying to consolidate all of these files for more efficient Namenode operations and queries.

    Step 4: Completion of the Hive Stream process

    In many instances, the Hive Stream ingestion process never actually ends. New delta files are continually getting created in HDFS by the commit() step and the compaction processes are continually consolidating the delta files. As each transaction gets committed, it gets closed from the TRANSACTIONS list.

    In a healthy system, you will find that over time the number of transactions reported by the command ‘SHOW TRANSACTIONS’ should be less than the value you have assigned to ‘maxBatchGroups’. If the number is greater than the ‘maxBatchGroups’ then it is possible that for one of the transactional tables that the ingest process was stopped without executing a ‘txnBatch.close();’ operation which returns the unused transactions.

    Hive Streaming Compaction

    Hive Compaction Design considerations

    The Compaction process has a set of cleaner processes running in the background during the ingest process looking for opportunities to compact the delta files based on the rules you specify.

    The first thing to keep in mind is that there are two forms of Compaction; ‘minor’ and ‘major’. A ‘minor’ compaction will just consolidate the delta files. This approach does not have to worry about consolidating all of the delta files along with a large set of base bucket files and is thus the least disruptive to the system resources. ‘major’ compaction consolidates all of the delta files just like the ‘minor’ compaction and in addition it consolidates the delta files with the base to produce a very clean physical layout for the hive table. However, major compactions can take minutes to hours and can consume a lot of disk, network, memory and CPU resources, so they should be invoked carefully.

    To provide greater control over the compaction process and avoid impacting other processes in addition to the compactor configuration options available, it is also possible to invoke compaction automatically by the cleaner threads or manually initiated when system load is low.

    The primary compaction configuration triggers to review when implementing or tuning your compaction processes are:

    • hive.compactor.initiator.on
    • hive.compactor.cleaner.run.interval
    • hive.compactor.delta.num.threshold - Number of delta directories in a table or partition that will trigger a minor compaction.
    • hive.compactor.delta.pct.threshold - Percentage (fractional) size of the delta files relative to the base that will trigger a major compaction. 1 = 100%, so the default 0.1 = 10%.
    • hive.compactor.abortedtxn.threshold - Number of aborted transactions involving a given table or partition that will trigger a major compaction

    A Hive Compaction Manual example

    In our example we have turned off major compaction as it should only run during off load periods. We take a look at the delta files for our table in hdfs and see that there are over 300 delta files and 5 base files.

    [hive@server1 ~]$ hadoop fs -ls -R /apps/hive/warehouse/acidtest
    -rw-r--r-- 3 mjohnson hdfs 4 2016-03-27 13:17      /apps/hive/warehouse/acidtest/_orc_acid_version
    drwxrwxrwx - mjohnson hdfs 0 2016-03-27 13:18      /apps/hive/warehouse/acidtest/delta_2113501_2123500
    -rw-r--r-- 3 mjohnson hdfs 482990 2016-03-27 13:18 /apps/hive/warehouse/acidtest/delta_2113501_2123500/bucket_00002
    -rw-r--r-- 3 mjohnson hdfs 1600 2016-03-27 13:18   /apps/hive/warehouse/acidtest/delta_2113501_2123500/bucket_00002_flush_length
    drwxrwxrwx - mjohnson hdfs 0 2016-03-27 13:17      /apps/hive/warehouse/acidtest/delta_2123501_2133500
    -rw-r--r-- 3 mjohnson hdfs 482784 2016-03-27 13:18 /apps/hive/warehouse/acidtest/delta_2123501_2133500/bucket_00001
    -rw-r--r-- 3 mjohnson hdfs 1600 2016-03-27 13:18   /apps/hive/warehouse/acidtest/delta_2123501_2133500/bucket_00001_flush_length
    drwxrwxrwx - mjohnson hdfs 0 2016-03-27 13:17      /apps/hive/warehouse/acidtest/delta_2133501_2143500
    -rw-r--r-- 3 mjohnson hdfs 482110 2016-03-27 13:18 /apps/hive/warehouse/acidtest/delta_2133501_2143500/bucket_00001
    -rw-r--r-- 3 mjohnson hdfs 1600 2016-03-27 13:18   /apps/hive/warehouse/acidtest/delta_2133501_2143500/bucket_00001_flush_length
    drwxrwxrwx - mjohnson hdfs 0 2016-03-27 13:17      /apps/hive/warehouse/acidtest/delta_2143501_2153500
    -rw-r--r-- 3 mjohnson hdfs 476285 2016-03-27 13:18 /apps/hive/warehouse/acidtest/delta_2143501_2153500/bucket_00000
    -rw-r--r-- 3 mjohnson hdfs 1600 2016-03-27 13:18   /apps/hive/warehouse/acidtest/delta_2143501_2153500/bucket_00000_flush_length
    drwxrwxrwx - mjohnson hdfs 0 2016-03-27 13:17      /apps/hive/warehouse/acidtest/delta_2153501_2163500
    

    A decision has been been made to run the major compaction manually during the even lull, so we execute the “ALTER TABLE {tablename} COMPACT ‘major’” command to place the compaction job into the queue for processing. A compaction resource management queue was defined with a limited quota resource, so the compaction will not impact other jobs.

    hive> alter table acidtest compact 'major';
    Compaction enqueued.
    OK
    Time taken: 0.037 seconds
    hive> show compactions;
    OK
    Database Table Partition Type State Worker Start Time
    default acidtest NULL MAJOR working server2.hdp-26 1459100244000
    Time taken: 0.019 seconds, Fetched: 2 row(s)
    hive> show compactions;
    OK
    Database Table Partition Type State Worker Start Time
    Time taken: 0.016 seconds, Fetched: 1 row(s)
    hive>;

    The outstanding table compaction jobs are visible by executing the command line “SHOW COMPACTIONS as illustrated in the example above. Or the ‘major’ compaction is also visible through the Applications history log. After the ‘major’ compaction has completed, all of the delta files available at the time the compaction was initiated will have rolled up into the ‘base’ tables.

    [hive@server1 ~]$ hadoop fs -ls -R /apps/hive/warehouse/acidtest
    -rw-r--r-- 3 mjohnson hdfs 4 2016-03-27 13:17      /apps/hive/warehouse/acidtest_orc_acid_version
    drwxrwxrwx - mjohnson hdfs 0 2016-03-27 13:37      /apps/hive/warehouse/acidtest/base_2213500
    -rw-r--r-- 3 mjohnson hdfs 72704 2016-03-27 13:37  /apps/hive/warehouse/acidtest/base_2213500/bucket_00000
    -rw-r--r-- 3 mjohnson hdfs 436159 2016-03-27 13:37 /apps/hive/warehouse/acidtest/base_2213500/bucket_00001
    -rw-r--r-- 3 mjohnson hdfs 219572 2016-03-27 13:37 /apps/hive/warehouse/acidtest/base_2213500/bucket_00002
    [hive@server1 ~]$

    The end result of this example is that 305 consolidated to just 5 files. While 300 files will not impact the NameNode performance, it will most likely improve query performance as the Hive engine will have fewer files to scan to execute the query.

    Next Steps

    After completing these examples, you should have a good idea on how to setup Hive streaming in your environment. For some additional information on Hive Streaming check out the Bibliography section at the bottom of this article.

    Bibliography

    Hopefully, the example and source code supplied with this blog posting are sufficient to get you started with Hive Streaming and avoid potential problems. In addition to this blog posting some other resources which are useful references include:

    35,179 Views
    Comments
    avatar
    Explorer

    This is really good example of how streaming API can be used with hive. I tried to run this example code outside from the edge node, It is throwing following error:

    hdfs.DFSClient: Exception in createBlockOutputStream org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=xxxx/xxx.yy.zz.135:50010] at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:534) at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1601) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1342) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1295) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:463)

    hdfs.DFSClient: Abandoning BP-

    Does it mean that hive stream can work only from edge node?

    Thanks

    Arun

    avatar
    Explorer

    Thanks for such a nice and detailed blog. I am looking for a solution to avoid duplicate records during hive streaming. Can anybody please help me ?

    Version history
    Last update:
    ‎08-04-2016 10:37 PM
    Updated by:
    Contributors