Member since: 09-23-2015
Posts: 70
Kudos Received: 87
Solutions: 7
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 4501 | 09-20-2016 09:11 AM
 | 3735 | 05-17-2016 11:58 AM
 | 2550 | 04-18-2016 07:27 PM
 | 2463 | 04-14-2016 08:25 AM
 | 2600 | 03-24-2016 07:16 PM
03-22-2016
04:17 PM
1 Kudo
In an advanced architecture you would leverage Zookeeper to announce a new model to the topology without taking it offline.
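Just to sketch the idea (a hypothetical, minimal example; the class name, znode path and timeouts are made up and not taken from a real topology): a bolt could keep a watch on a ZooKeeper znode that announces the current model and swap it in whenever the znode data changes, e.g. from the bolt's prepare() method.
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

// Hypothetical helper: watches a znode (e.g. /models/current) whose content contains
// or points to the latest model, and reloads it whenever the znode data changes.
public class ModelWatcher implements Watcher {
    private final ZooKeeper zk;
    private final String modelPath;
    private volatile byte[] currentModelBytes;

    public ModelWatcher(String connectString, String modelPath) throws Exception {
        this.modelPath = modelPath;
        this.zk = new ZooKeeper(connectString, 30000, this);
        reload();
    }

    private void reload() throws Exception {
        // read the announced model and re-register this watcher for the next change
        currentModelBytes = zk.getData(modelPath, this, null);
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
            try {
                reload(); // swap in the newly announced model without restarting the topology
            } catch (Exception e) {
                // keep the previous model if the reload fails; log this in a real bolt
            }
        }
    }

    public byte[] getCurrentModelBytes() {
        return currentModelBytes;
    }
}
In a Storm bolt you would typically create such a watcher in prepare() and score against getCurrentModelBytes() in execute().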
03-22-2016
04:14 PM
3 Kudos
You can use PMML (https://de.wikipedia.org/wiki/Predictive_Model_Markup_Language). Spark supports exporting (though not all) models to PMML: http://spark.apache.org/docs/latest/mllib-pmml-model-export.html (UPDATE: As @Simon Elliston Ball rightfully points out in his answer, in case the PMML model is not supported, the Spark libs can be reused, as most of them have no dependency on the SparkContext.) One way could be to use JPMML with Java in Storm: http://henning.kropponline.de/2015/09/06/jpmml-example-random-forest/ https://github.com/jpmml/jpmml-storm The other could be to use R in Storm. I have seen it done, but don't have a reference at hand.
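To illustrate the JPMML route (a rough sketch only, not taken from the linked examples; it assumes the JPMML-Evaluator API of that time, so exact class and method names may differ between versions), the scoring part inside a bolt could look roughly like this:
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.LinkedHashMap;
import java.util.Map;

import org.dmg.pmml.FieldName;
import org.dmg.pmml.PMML;
import org.jpmml.evaluator.Evaluator;
import org.jpmml.evaluator.FieldValue;
import org.jpmml.evaluator.InputField;
import org.jpmml.evaluator.ModelEvaluatorFactory;
import org.jpmml.model.PMMLUtil;

public class PmmlScoringSketch {
    public static void main(String[] args) throws Exception {
        // load the PMML document, e.g. one exported from Spark MLlib
        PMML pmml;
        try (InputStream is = new FileInputStream("model.pmml")) {
            pmml = PMMLUtil.unmarshal(is);
        }
        Evaluator evaluator = ModelEvaluatorFactory.newInstance().newModelEvaluator(pmml);

        // map raw event values (in Storm: taken from the tuple) onto the model's input fields
        Map<FieldName, FieldValue> arguments = new LinkedHashMap<>();
        for (InputField inputField : evaluator.getInputFields()) {
            Object rawValue = 1.0; // placeholder, e.g. tuple.getValueByField(...)
            arguments.put(inputField.getName(), inputField.prepare(rawValue));
        }

        // score and inspect the target/output fields
        Map<FieldName, ?> results = evaluator.evaluate(arguments);
        System.out.println(results);
    }
}
In a topology you would build the evaluator once (e.g. in the bolt's prepare()) and reuse it for every tuple.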
03-10-2016
09:54 PM
1 Kudo
Thank you for pointing me to that documentation. That helped me with something else.
03-10-2016
09:53 PM
2 Kudos
I was told that to prevent users from being able to append custom configurations to a config file, supports_adding_forbidden can be added to the configuration tag like so:
<configuration supports_final="true" supports_adding_forbidden="true"> ... </configuration>
03-10-2016
08:47 AM
2 Kudos
I created a custom service with config files. How do I disable the Custom config section for that config file in Ambari?
Labels:
- Apache Ambari
02-20-2016
11:43 PM
1 Kudo
Origin: http://henning.kropponline.de/2015/05/19/hivesink-for-flume/
With the most recent release of HDP (v2.2.4) Hive Streaming is shipped as technical preview. It can for example be used with Storm to ingest streaming data collected from Kafka, as demonstrated here. But it also still has some serious limitations and, in the case of Storm, a major bug. Nevertheless Hive Streaming is likely to become the tool of choice when it comes to streamlining data ingestion to Hadoop, so it is worth exploring already today. Flume's upcoming release 1.6 will contain a HiveSink capable of leveraging Hive Streaming for data ingestion. In the following post we will use it as a replacement for the HDFS sink used in a previous post here. Other than replacing the HDFS sink with a HiveSink, none of the previous setup will change, except for the Hive table schema, which needs to be adjusted to meet the requirements that currently exist around Hive Streaming. So let's get started by looking into these restrictions.
Hive Streaming Limitations
The only file format supported is ORC. So the original schema of the stocks table needs to be adjusted to reflect that:
DROP TABLE IF EXISTS stocks;
CREATE EXTERNAL TABLE stocks (
date STRING,
open DOUBLE,
high DOUBLE,
low DOUBLE,
close DOUBLE,
volume BIGINT,
adj_close DOUBLE)
PARTITIONED BY(year STRING)
CLUSTERED BY (date) into 3 buckets
STORED AS ORC
LOCATION '/ingest/stocks';
As you can see from the schema, the table now also is bucketed, which is required by Hive Streaming. Furthermore we need to set the following:
hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
hive.compactor.initiator.on = true
hive.compactor.worker.threads > 0 (e.g. 5)
Configuration diff as given by Ambari:
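For orientation, the HiveSink internally builds on the Hive Streaming API from org.apache.hive.hcatalog.streaming (the same classes show up in the stack trace further below). The following is a minimal, hand-written sketch of writing to the stocks table with that API directly; it is not part of the original post, and exact signatures may differ slightly between Hive/HDP versions:
import java.util.Arrays;

import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.TransactionBatch;

public class StocksStreamingSketch {
    public static void main(String[] args) throws Exception {
        // endpoint = metastore URI, database, table and the partition values (here: year)
        HiveEndPoint endPoint = new HiveEndPoint("thrift://one.hdp:9083", "default",
                "stocks", Arrays.asList("1996"));
        StreamingConnection connection = endPoint.newConnection(true); // auto-create the partition

        // delimited writer matching the columns of the stocks table
        String[] fieldNames = {"date", "open", "high", "low", "close", "volume", "adj_close"};
        DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPoint);

        // fetch a batch of transactions, write a record, commit
        TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
        txnBatch.beginNextTransaction();
        txnBatch.write("1996-01-02,4.5,4.7,4.4,4.6,1000000,4.6".getBytes());
        txnBatch.commit();
        txnBatch.close();
        connection.close();
    }
}
The Flume HiveSink essentially does the same thing for us, batching events per transaction according to batchSize and hive.txnsPerBatchAsk.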
Also important to know is that the current Streaming API only supports delimited input data (CSV, tab separated) or JSON (strict syntax).
Flume Hive Sink
For the Flume Hive Sink the following configurations with their defaults can or must be configured:
hive.metastore
hive.database
hive.table
hive.partition
hive.txnsPerBatchAsk = 100
batchSize = 15000
serializer (delimited | json)
serializer.delimiter = ,
serializer.fieldnames
idleTimeout = 0
callTimeout = 10000
heartBeatInterval = 240
maxOpenConnections
useLocalTimeStamp
timeZone
roundUnit (hour | minute | second)
round
roundValue
With the previous example we can use the following Flume configuration. The batch size and the transactions per batch are not set very high, which would probably be different in a production setup, but this also depends on the expected data stream.
flume-hive-ingest.sources = src1
flume-hive-ingest.channels = chan1
flume-hive-ingest.sinks = sink1
flume-hive-ingest.sources.src1.type = spooldir
flume-hive-ingest.sources.src1.channels = chan1
flume-hive-ingest.sources.src1.spoolDir = /vagrant/flume_log
flume-hive-ingest.sources.src1.interceptors = skipHeadI dateI
flume-hive-ingest.sources.src1.interceptors.skipHeadI.type = regex_filter
flume-hive-ingest.sources.src1.interceptors.skipHeadI.regex = ^Date.*
flume-hive-ingest.sources.src1.interceptors.skipHeadI.excludeEvents = true
flume-hive-ingest.sources.src1.interceptors.dateI.type = regex_extractor
flume-hive-ingest.sources.src1.interceptors.dateI.regex = ^(\d+)-.*
flume-hive-ingest.sources.src1.interceptors.dateI.serializers = y
flume-hive-ingest.sources.src1.interceptors.dateI.serializers.y.name = year
flume-hive-ingest.channels.chan1.type = memory
flume-hive-ingest.channels.chan1.capacity = 1000
flume-hive-ingest.channels.chan1.transactionCapacity = 100
flume-hive-ingest.sinks.sink1.type = hive
flume-hive-ingest.sinks.sink1.channel = chan1
flume-hive-ingest.sinks.sink1.hive.metastore = thrift://one.hdp:9083
flume-hive-ingest.sinks.sink1.hive.database = default
flume-hive-ingest.sinks.sink1.hive.table = stocks
flume-hive-ingest.sinks.sink1.hive.partition = %{year}
flume-hive-ingest.sinks.sink1.hive.txnsPerBatchAsk = 2
flume-hive-ingest.sinks.sink1.batchSize = 10
flume-hive-ingest.sinks.sink1.serializer = delimited
flume-hive-ingest.sinks.sink1.serializer.delimiter = ,
flume-hive-ingest.sinks.sink1.serializer.fieldnames = date,open,high,low,close,volume,adj_close
Before starting a Flume agent with this configuration you might need to set HIVE_HOME and HCAT_HOME, as flume-ng will only put the required Hive jars onto the classpath with this logic:
add_hive_paths(){
if [ -d "${HIVE_HOME}/lib" ]; then
info "Including Hive libraries found via ($HIVE_HOME) for Hive access"
FLUME_CLASSPATH="$FLUME_CLASSPATH:$HIVE_HOME/lib/*"
fi
if [ -d "${HCAT_HOME}/share/hcatalog" ]; then
info "Including HCatalog libraries found via ($HCAT_HOME) for Hive access"
FLUME_CLASSPATH="$FLUME_CLASSPATH:${HCAT_HOME}/share/hcatalog/*"
fi
}
Setting them in my case was pretty straightforward:
export HIVE_HOME=/usr/hdp/current/hive-server2
export HCAT_HOME=/usr/hdp/current/hive-webhcat
Now we can start the flume agent, obviously after we have created the stocks table:
$ hcat -f data/stocks_schema.hive
$ apache-flume-1.6.0-bin/bin/flume-ng agent -f data/flume-file-hive-ingest.conf -n flume-hive-ingest
When working correctly you should be able to see output similar to this, once you copy the stocks data into the spooling directory:
16/05/15 15:19:18 INFO ql.Driver: OK
16/05/15 15:19:18 INFO log.PerfLogger: <PERFLOG method=releaseLocks from=org.apache.hadoop.hive.ql.Driver>
16/05/15 15:19:18 INFO log.PerfLogger: </PERFLOG method=releaseLocks start=1431703158539 end=1431703158543 duration=4 from=org.apache.hadoop.hive.ql.Driver>
16/05/15 15:19:18 INFO log.PerfLogger: </PERFLOG method=Driver.run start=1431703158452 end=1431703158543 duration=91 from=org.apache.hadoop.hive.ql.Driver>
16/05/15 15:19:20 INFO hive.metastore: Trying to connect to metastore with URI thrift://one.hdp:9083
16/05/15 15:19:20 INFO hive.metastore: Connected to metastore.
16/05/15 15:19:20 INFO hive.HiveWriter: Acquired Txn Batch TxnIds=[1743...1744] on endPoint = {metaStoreUri='thrift://one.hdp:9083', database='default', table='stocks', partitionVals=[1996] }. Switching to first txn
16/05/15 15:19:20 INFO hive.HiveWriter: Committing Txn 1742 on EndPoint: {metaStoreUri='thrift://one.hdp:9083', database='default', table='stocks', partitionVals=[1997] }
16/05/15 15:19:20 INFO hive.HiveWriter: Acquired Txn Batch TxnIds=[1745...1746] on endPoint = {metaStoreUri='thrift://one.hdp:9083', database='default', table='stocks', partitionVals=[1997] }. Switching to first txn
Troubleshooting
If something goes wrong, for example with a failing connection to the metastore, please:
- Check the requirements posted here or on the Hive wiki. Also check that your schema is bucketed and read the Exception message carefully.
- Increase the timeout for the HiveWriter to connect to the Metastore and again read the Exception message carefully.
- Make hdfs://tmp/hive and file:///tmp/hive writable (e.g. chmod 777).
A typical error message could look like this:
16/05/15 14:53:39 WARN hive.HiveSink: sink1 : Failed connecting to EndPoint {metaStoreUri='one.hdp:9083', database='default', table='stocks', partitionVals=[${year}] }
org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='one.hdp:9083', database='default', table='stocks', partitionVals=[${year}] }
at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:98)
at org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:343)
at org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:296)
at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:254)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='one.hdp:9083', database='default', table='stocks', partitionVals=[${year}] }
at org.apache.flume.sink.hive.HiveWriter.newConnection(HiveWriter.java:320)
at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:86)
... 6 more
Caused by: java.lang.NullPointerException
at org.apache.thrift.transport.TSocket.open(TSocket.java:168)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:358)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:215)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:161)
at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.getMetaStoreClient(HiveEndPoint.java:448)
at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:274)
at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:243)
at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnectionImpl(HiveEndPoint.java:180)
at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:157)
at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:110)
at org.apache.flume.sink.hive.HiveWriter$6.call(HiveWriter.java:316)
at org.apache.flume.sink.hive.HiveWriter$6.call(HiveWriter.java:313)
at org.apache.flume.sink.hive.HiveWriter$9.call(HiveWriter.java:366)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
... 1 more
16/05/15 14:53:39 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
Further Readings
- Hive Streaming
- Flume HiveSink
- HDP Flume
- Apache Flume: Distributed Log Collection for Hadoop - Second Edition (Amazon)
- Apache Hive Essentials (Amazon)
02-20-2016
11:43 PM
2 Kudos
Origin: http://henning.kropponline.de/2015/09/27/storm-serialization-with-avro-using-kryo-serializer/
Working with complex data events can be a challenge when designing Storm topologies for real-time data processing. In such cases emitting single values for multiple and varying event characteristics soon reveals its limitations. For message serialization Storm leverages the Kryo serialization framework, which is used by many other projects. Kryo keeps a registry of serializers being used for corresponding Class types. Mappings in that registry can be overridden or added, making the framework extensible to diverse type serializations.
On the other hand Avro is a very popular "data serialization system" that bridges between many different programming languages and tools. While the fact that data objects can be described in JSON makes it really easy to use, Avro is often being used for its support of schema evolution. With support for schema evolution the same implementation (Storm topology) could be capable of reading different versions of the same data event without adaptation. This makes it a very good fit for Storm as an intermediary between data ingestion points and data storage in today's Enterprise Data Architectures.
Storm Enterprise Data Architecture
The example here does not provide complex event samples to illustrate that point, but it gives an end-to-end implementation of a Storm topology where events get sent to a Kafka queue as Avro objects and are processed natively by a real-time processing topology. The example can be found here. It's a simple Hive Streaming example where stock events are read from a CSV file and sent to Kafka. Stock events are a flat, non-complex data type, as already mentioned, but we'll still use them to demo serialization with Avro.
Deserialization in Storm
Before we look at the beginning, let's start with the end. When we have everything working properly we should be able to use our defined event object as such in any bolt of the topology:
Stock stock = (Stock) tuple.getValueByField("myobj_fieldname");
// OR by index
Stock stock = (Stock) tuple.getValue(0);
As demonstrated, we should be able to cast our object simply from the tuple, as it will already be present in deserialized form inside the tuple. Storm takes care of the serialization for us. Remember that Storm internally uses Kryo for serialization, as described here. It uses this for all data types in a tuple. To make this work with our object described in Avro we simply have to register a custom serializer with Storm's Kryo. The above snippet also implies that if we try to retrieve the data in any other way, for example like this: tuple.getBinary(0), we will receive an error. An Exception in such a case could look like this:
2015-09-23 10:52:57 s.AvroStockDataBolt [ERROR] java.lang.ClassCastException: storm_hive_streaming_example.model.Stock cannot be cast to [B
java.lang.ClassCastException: storm_hive_streaming_example.model.Stock cannot be cast to [B
at backtype.storm.tuple.TupleImpl.getBinary(TupleImpl.java:144) ~[storm-core-0.10.0.2.3.0.0-2557.jar:0.10.0.2.3.0.0-2557]
at storm_hive_streaming_example.FieldEmitBolt.execute(FieldEmitBolt.java:34) ~[stormjar.jar:na]
The sample error message clearly states that our already deserialized object simply cannot be cast to a binary. So how do we set things up from the start?
Spout Scheme
Let's return to the beginning of it all, the ingestion of events into a queue for example. The part responsible for reading an event off a data source, like for example a message broker, is known in Storm as a Spout. Typically we have one spout for a specific data source rather than having single-purpose Spouts per topology. Hence a spout needs to be adaptive to the use case and the events being issued. Storm uses a so-called "Scheme" to configure the data declaration for receiving and emitting events by the Spout. The Scheme interface declares the method deserialize(byte[] pojoBytes) for deserializing the collected event. It returns a list of objects instead of just one object, as one event could potentially be deserialized into several data fields. Here the StockAvroScheme emits the complete Stock object in one field. The second method that needs to be implemented by the Scheme interface is the getOutputFields() method. This method is responsible for advertising the field definition to the receiving bolts. As in the implementation below, the stock object gets sent in one field.
public class StockAvroScheme implements Scheme {
private static final Logger LOG = LoggerFactory.getLogger(Stock.class);
// deserializing the message received by the Spout
public List<Object> deserialize(byte[] pojoBytes) {
StockAvroSerializer serializer = new StockAvroSerializer(); // Kryo Serializer
Stock stock = serializer.read(null, new Input(pojoBytes), Stock.class);
List<Object> values = new ArrayList<>();
values.add(0, stock);
return values;
}
// defining the output fields of the Spout
public Fields getOutputFields() {
return new Fields(new String[]{ FieldNames.STOCK_FIELD });
}
}
This Scheme can be configured for the spout, as illustrated below, by the YAML topology configuration using Storm Flux:
components:
  # defines a scheme for the spout to emit a Stock.class object
  - id: "stockAvroScheme"
    className: "storm_hive_streaming_example.serializer.StockAvroScheme"
  # adding the defined stock scheme to the multi-scheme that can be assigned to the spout
  - id: "stockMultiScheme"
    className: "backtype.storm.spout.SchemeAsMultiScheme"
    constructorArgs:
      - ref: "stockAvroScheme"
  - id: "zkHosts"
    className: "storm.kafka.ZkHosts"
    constructorArgs:
      - "${hive-streaming-example.zk.hosts}"
  # configuring the spout to read bytes from Kafka and emit Stock.class
  - id: "stockSpoutConfig"
    className: "storm.kafka.SpoutConfig"
    constructorArgs:
      - ref: "zkHosts" # brokerHosts
      - "${hive-streaming-example.kafka.topic}" # topic
      - "${hive-streaming-example.kafka.zkRoot}" # zkRoot
      - "${hive-streaming-example.kafka.spoutId}" # id
    properties:
      - name: "scheme"
ref: "stockMultiScheme" # use the stock scheme previously defined Last but not least we still need to register our customer serializer with Storm. Registering the Serializer Tuples
are send to Spouts and Bolts running in a separate JVMs either on the
same or on a remote host. In case of sending the tuple it needs to get
serialized and deserialized prior to placing the tuple on the the output
collector. For the serialization Storm uses Kryo Serializer. In
order to use a custom Serializer implementation it needs to get
registered with the Kryo instance being used by Strom. This can be done
as part of the topology configuration. Here is the configuration
definition using Storm Flux: name: "hive-streaming-example-avro-scheme"
config:
  topology.workers: 1
  # define serializers being used by tuples de-/serializing values. See http://storm.apache.org/documentation/Serialization.html
  topology.kryo.register:
    - storm_hive_streaming_example.model.Stock: storm_hive_streaming_example.serializer.StockAvroSerializer
With this registration of the custom Kryo Serializer the AvroStockDataBolt can simply cast the Stock object from the tuple value and emit it to the FieldEmitBolt, which decomposes the Stock instance into the separate fields used by the HiveBolt (a hypothetical sketch of such a field-emitting bolt is given at the end of this post). Having the AvroStockDataBolt and FieldEmitBolt would not make sense in a real implementation, as the Scheme could obviously already be configured to do all of that: deserialize and emit fields to the HiveBolt. Having these two extra bolts is just for demonstration purposes.
Finally, the custom Kryo Serializer, which implements write(Kryo kryo, Output output, Stock object) and read(Kryo kryo, Input input, Class<Stock> type). Having a general implementation for generic Avro types would be ideal.
public class StockAvroSerializer extends Serializer<Stock> {
private static final Logger LOG = LoggerFactory.getLogger(StockAvroSerializer.class);
private Schema SCHEMA = Stock.getClassSchema();
public void write(Kryo kryo, Output output, Stock object) {
DatumWriter<Stock> writer = new SpecificDatumWriter<>(SCHEMA);
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
try {
writer.write(object, encoder);
encoder.flush();
} catch (IOException e) {
LOG.error(e.toString(), e);
}
IOUtils.closeQuietly(out);
byte[] outBytes = out.toByteArray();
output.writeInt(outBytes.length, true);
output.write(outBytes);
}
public Stock read(Kryo kryo, Input input, Class<Stock> type) {
// read the length-prefixed bytes written by write() and advance the input position accordingly
int length = input.readInt(true);
byte[] value = input.readBytes(length);
SpecificDatumReader<Stock> reader = new SpecificDatumReader<>(SCHEMA);
Stock record = null;
try {
record = reader.read(null, DecoderFactory.get().binaryDecoder(value, null));
} catch (IOException e) {
LOG.error(e.toString(), e);
}
return record;
}
}
Further Readings
- Storm Serialization
- Storm Hive Streaming Example (Github)
- Storm Flux
- Avro Specification
- Kafka Storm Starter (Github)
- Kryo Serializable
- http://www.confluent.io/blog/stream-data-platform-2/
- Simple Example Using Kryo
- Storm Blueprints: Patterns for Distributed Realtime Computation (Amazon)
- Storm Applied: Strategies for Real-Time Event Processing (Amazon)
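As an appendix, here is a hypothetical sketch of what a field-emitting bolt like the FieldEmitBolt mentioned above could look like; the actual implementation lives in the linked GitHub example, and the Stock getters as well as the field name constant (FieldNames.STOCK_FIELD in the example project) are assumptions here:
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import storm_hive_streaming_example.model.Stock;

public class FieldEmitBoltSketch extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // the Stock object is already deserialized by the registered Kryo serializer;
        // "stock_field" stands in for FieldNames.STOCK_FIELD from the example project
        Stock stock = (Stock) tuple.getValueByField("stock_field");
        collector.emit(new Values(stock.getDate(), stock.getOpen(), stock.getHigh(),
                stock.getLow(), stock.getClose(), stock.getVolume(), stock.getAdjClose()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // field names matching the columns expected by the HiveBolt
        declarer.declare(new Fields("date", "open", "high", "low", "close", "volume", "adj_close"));
    }
}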
02-09-2016
08:36 PM
1 Kudo
Hi, I can't remember how or if I solved it 😉 but could you try to
configure the Hive View with manual configuration instead of the option
to get the cluster configuration? Let me know if this works. Thanks!
12-04-2015
03:24 PM
Is it possible to use config-groups during install with Ambari and/or Ambari Blueprints?
Labels:
- Apache Ambari