Created on 02-20-2016 11:43 PM - edited 08-17-2019 01:08 PM
Origin: http://henning.kropponline.de/2015/05/19/hivesink-for-flume/
With the most recent release of HDP (v2.2.4), Hive Streaming is shipped as a technical preview. It can, for example, be used with Storm to ingest streaming data collected from Kafka, as demonstrated here. But it still has some serious limitations and, in the case of Storm, a major bug. Nevertheless, Hive Streaming is likely to become the tool of choice for streamlining data ingestion into Hadoop, so it is worth exploring already today.
Flume's upcoming release 1.6 will contain a HiveSink capable of leveraging Hive Streaming for data ingestion. In the following post we will use it as a replacement for the HDFS sink used in a previous post here. Other than replacing the HDFS sink with a HiveSink, none of the previous setup will change, except for the Hive table schema, which needs to be adjusted to meet the requirements that currently exist around Hive Streaming. So let's get started by looking into these restrictions.
The only file format supported is ORC. So the original schema of the stocks table needs to be adjusted to reflect that:
DROP TABLE IF EXISTS stocks;
CREATE EXTERNAL TABLE stocks (
    date STRING,
    open DOUBLE,
    high DOUBLE,
    low DOUBLE,
    close DOUBLE,
    volume BIGINT,
    adj_close DOUBLE)
PARTITIONED BY(year STRING)
CLUSTERED BY (date) into 3 buckets
STORED AS ORC
LOCATION '/ingest/stocks';
As you can see from the schema, the table is now also bucketed, which Hive Streaming requires. Furthermore, we need to set the following:
Configuration diff as given by Ambari:
It is also important to know that the current Streaming API only supports delimited input data (CSV, tab-separated) or JSON (strict syntax).
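For delimited input, each record is split on the delimiter and its fields are matched to the configured field names in order, so a stray header row or a record with a missing column will not line up with the table. The following is a minimal pre-flight sketch, assuming comma-separated stock files whose header row starts with "Date" and the seven columns of the stocks table; check_records and EXPECTED_FIELDS are hypothetical names, not part of Flume or Hive.

```shell
#!/usr/bin/env bash
# Hypothetical pre-flight check (not part of Flume or Hive).
# Assumption: comma-separated input with the seven columns of the stocks table.
EXPECTED_FIELDS=7

# check_records FILE
# Skips header lines starting with "Date" and prints "line N: M fields" for
# every other record whose field count differs from EXPECTED_FIELDS.
# Returns 1 if any such record was found, 0 otherwise.
check_records() {
  awk -F',' -v want="$EXPECTED_FIELDS" '
    /^Date/ { next }                                  # header row
    NF != want { printf "line %d: %d fields\n", NR, NF; bad = 1 }
    END { exit bad }                                  # 0 when bad was never set
  ' "$1"
}
```

For example, running check_records on a downloaded stocks file before copying it into the spooling directory prints nothing and returns 0 when every data record has seven fields.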
For the Flume HiveSink, the following properties (with their defaults) can or must be configured:
With the previous example, we can use the following Flume configuration. The batch size and the number of transactions per batch are deliberately kept low here; in a production setup they would probably be different, but that also depends on the expected data stream.
flume-hive-ingest.sources = src1
flume-hive-ingest.channels = chan1
flume-hive-ingest.sinks = sink1
flume-hive-ingest.sources.src1.type = spooldir
flume-hive-ingest.sources.src1.channels = chan1
flume-hive-ingest.sources.src1.spoolDir = /vagrant/flume_log
flume-hive-ingest.sources.src1.interceptors = skipHeadI dateI
flume-hive-ingest.sources.src1.interceptors.skipHeadI.type = regex_filter
flume-hive-ingest.sources.src1.interceptors.skipHeadI.regex = ^Date.*
flume-hive-ingest.sources.src1.interceptors.skipHeadI.excludeEvents = true
flume-hive-ingest.sources.src1.interceptors.dateI.type = regex_extractor
flume-hive-ingest.sources.src1.interceptors.dateI.regex = ^(\d+)-.*
flume-hive-ingest.sources.src1.interceptors.dateI.serializers = y
flume-hive-ingest.sources.src1.interceptors.dateI.serializers.y.name = year
flume-hive-ingest.channels.chan1.type = memory
flume-hive-ingest.channels.chan1.capacity = 1000
flume-hive-ingest.channels.chan1.transactionCapacity = 100
flume-hive-ingest.sinks.sink1.type = hive
flume-hive-ingest.sinks.sink1.channel = chan1
flume-hive-ingest.sinks.sink1.hive.metastore = thrift://one.hdp:9083
flume-hive-ingest.sinks.sink1.hive.database = default
flume-hive-ingest.sinks.sink1.hive.table = stocks
flume-hive-ingest.sinks.sink1.hive.partition = %{year}
flume-hive-ingest.sinks.sink1.hive.txnsPerBatchAsk = 2
flume-hive-ingest.sinks.sink1.batchSize = 10
flume-hive-ingest.sinks.sink1.serializer = delimited
flume-hive-ingest.sinks.sink1.serializer.delimiter = ,
flume-hive-ingest.sinks.sink1.serializer.fieldnames = date,open,high,low,close,volume,adj_close
Before starting a Flume agent with this configuration, you might need to set HIVE_HOME and HCAT_HOME, as flume-ng only puts the required Hive jars on the classpath using this logic:
add_hive_paths(){
  if [ -d "${HIVE_HOME}/lib" ]; then
    info "Including Hive libraries found via ($HIVE_HOME) for Hive access"
    FLUME_CLASSPATH="$FLUME_CLASSPATH:$HIVE_HOME/lib/*"
  fi
  if [ -d "${HCAT_HOME}/share/hcatalog" ]; then
    info "Including HCatalog libraries found via ($HCAT_HOME) for Hive access"
    FLUME_CLASSPATH="$FLUME_CLASSPATH:${HCAT_HOME}/share/hcatalog/*"
  fi
}
Setting them in my case was pretty straightforward:
export HIVE_HOME=/usr/hdp/current/hive-server2
export HCAT_HOME=/usr/hdp/current/hive-webhcat
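If add_hive_paths does not find these directories, it silently skips them, and the agent typically fails later with a ClassNotFoundException for the Hive streaming classes. The following is a minimal sketch of a pre-flight check, assuming a bash shell; check_hive_paths is a hypothetical helper that repeats the two directory tests from add_hive_paths and also looks for a jar matching hive-hcatalog-streaming*.jar, a name pattern assumed from Apache Hive binary distributions that may differ on your installation.

```shell
#!/usr/bin/env bash
# Hypothetical pre-flight helper, not part of flume-ng.
# Assumption: the streaming API ships as hive-hcatalog-streaming*.jar
# under $HCAT_HOME/share/hcatalog.
check_hive_paths() {
  local status=0 jar_found=0 jar
  # Same directory test as add_hive_paths for the Hive libraries
  if [ -d "${HIVE_HOME}/lib" ]; then
    echo "OK      ${HIVE_HOME}/lib"
  else
    echo "MISSING HIVE_HOME/lib (HIVE_HOME='${HIVE_HOME}')"
    status=1
  fi
  # Same directory test as add_hive_paths for the HCatalog libraries
  if [ -d "${HCAT_HOME}/share/hcatalog" ]; then
    echo "OK      ${HCAT_HOME}/share/hcatalog"
  else
    echo "MISSING HCAT_HOME/share/hcatalog (HCAT_HOME='${HCAT_HOME}')"
    status=1
  fi
  # An unmatched glob stays literal, so test each candidate with -e
  for jar in "${HCAT_HOME}"/share/hcatalog/hive-hcatalog-streaming*.jar; do
    if [ -e "$jar" ]; then
      jar_found=1
      break
    fi
  done
  if [ "$jar_found" -eq 1 ]; then
    echo "OK      streaming jar: $jar"
  else
    echo "MISSING hive-hcatalog-streaming*.jar under HCAT_HOME/share/hcatalog"
    status=1
  fi
  return "$status"
}
```

After the two export lines, running check_hive_paths should print only OK lines; any MISSING line points at a directory or jar that flume-ng would not put on the classpath.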
Now we can start the Flume agent, after first creating the stocks table:
$ hcat -f data/stocks_schema.hive
$ apache-flume-1.6.0-bin/bin/flume-ng agent -f data/flume-file-hive-ingest.conf -n flume-hive-ingest
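The spooling directory source expects every file to be complete and immutable once it appears in the spool directory: Flume logs an error and stops processing if a file is written to after being placed there, or if a file name is reused later. The following is a minimal sketch for handing the stocks data over safely, assuming a bash shell and a staging directory on the same filesystem as the spool directory; spool_file and the staging directory are hypothetical, not part of Flume.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Flume): hands a finished file to the
# spooldir source in a single rename. The staging directory is assumed to be
# on the same filesystem as the spool directory, so mv is a rename rather
# than a copy and the source never sees a half-written file.
spool_file() {
  local src="$1" spool_dir="$2" staging_dir="$3" staged
  [ -f "$src" ] && [ -d "$spool_dir" ] || return 1
  mkdir -p "$staging_dir" || return 1
  # mktemp appends a random suffix, which makes it very unlikely that the
  # spool directory receives a file name it has already processed.
  staged=$(mktemp "$staging_dir/$(basename "$src").XXXXXX") || return 1
  cp "$src" "$staged" || return 1
  chmod 644 "$staged"   # mktemp creates mode 600; let the agent's user read it
  mv "$staged" "$spool_dir/" || return 1
  echo "$spool_dir/$(basename "$staged")"
}
```

For example, spool_file table.csv /vagrant/flume_log /vagrant/flume_staging copies table.csv into the hypothetical staging directory /vagrant/flume_staging and then moves it into the spool directory used in the configuration above.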
If everything works correctly, you should see output similar to the following once you copy the stocks data into the spooling directory:
16/05/15 15:19:18 INFO ql.Driver: OK
16/05/15 15:19:18 INFO log.PerfLogger: <PERFLOG method=releaseLocks from=org.apache.hadoop.hive.ql.Driver>
16/05/15 15:19:18 INFO log.PerfLogger: </PERFLOG method=releaseLocks start=1431703158539 end=1431703158543 duration=4 from=org.apache.hadoop.hive.ql.Driver>
16/05/15 15:19:18 INFO log.PerfLogger: </PERFLOG method=Driver.run start=1431703158452 end=1431703158543 duration=91 from=org.apache.hadoop.hive.ql.Driver>
16/05/15 15:19:20 INFO hive.metastore: Trying to connect to metastore with URI thirft://one.hdp:9083
16/05/15 15:19:20 INFO hive.metastore: Connected to metastore.
16/05/15 15:19:20 INFO hive.HiveWriter: Acquired Txn Batch TxnIds=[1743...1744] on endPoint = {metaStoreUri='thirft://one.hdp:9083', database='default', table='stocks', partitionVals=[1996] }. Switching to first txn
16/05/15 15:19:20 INFO hive.HiveWriter: Committing Txn 1742 on EndPoint: {metaStoreUri='thirft://one.hdp:9083', database='default', table='stocks', partitionVals=[1997] }
16/05/15 15:19:20 INFO hive.HiveWriter: Acquired Txn Batch TxnIds=[1745...1746] on endPoint = {metaStoreUri='thirft://one.hdp:9083', database='default', table='stocks', partitionVals=[1997] }. Switching to first txn
If something goes wrong, for example with a failing connection to the metastore, please:
A typical error message could look like this:
16/05/15 14:53:39 WARN hive.HiveSink: sink1 : Failed connecting to EndPoint {metaStoreUri='one.hdp:9083', database='default', table='stocks', partitionVals=[${year}] }
org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='one.hdp:9083', database='default', table='stocks', partitionVals=[${year}] }
    at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:98)
    at org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:343)
    at org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:296)
    at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:254)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='one.hdp:9083', database='default', table='stocks', partitionVals=[${year}] }
    at org.apache.flume.sink.hive.HiveWriter.newConnection(HiveWriter.java:320)
    at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:86)
    ... 6 more
Caused by: java.lang.NullPointerException
    at org.apache.thrift.transport.TSocket.open(TSocket.java:168)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:358)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:215)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:161)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.getMetaStoreClient(HiveEndPoint.java:448)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:274)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:243)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnectionImpl(HiveEndPoint.java:180)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:157)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:110)
    at org.apache.flume.sink.hive.HiveWriter$6.call(HiveWriter.java:316)
    at org.apache.flume.sink.hive.HiveWriter$6.call(HiveWriter.java:313)
    at org.apache.flume.sink.hive.HiveWriter$9.call(HiveWriter.java:366)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    ... 1 more
16/05/15 14:53:39 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
Created on 12-17-2016 06:57 AM
Can you specify where exactly we need to set
add_hive_paths(){
  if [ -d "${HIVE_HOME}/lib" ]; then
    info "Including Hive libraries found via ($HIVE_HOME) for Hive access"
    FLUME_CLASSPATH="$FLUME_CLASSPATH:$HIVE_HOME/lib/*"
  fi
  if [ -d "${HCAT_HOME}/share/hcatalog" ]; then
    info "Including HCatalog libraries found via ($HCAT_HOME) for Hive access"
    FLUME_CLASSPATH="$FLUME_CLASSPATH:${HCAT_HOME}/share/hcatalog/*"
  fi
}
Is it in flume-env.sh?
Right now I can only see:
# Note that the Flume conf directory is always included in the classpath.
#FLUME_CLASSPATH=""
# export HIVE_HOME=/usr/lib/hive
# export HCAT_HOME=/usr/lib/hive-hcatalog
Kindly help.
Created on 12-17-2016 06:59 AM
I am getting the following error:
Caused by: java.lang.ClassNotFoundException: org.apache.hive.hcatalog.streaming.RecordWriter
thanks,
Rishit Shah
