Member since 09-23-2015 - 70 Posts - 87 Kudos Received - 7 Solutions
03-18-2020
04:41 PM
Thanks for this great tutorial, I got it mostly working. However, the Python workers all failed with the following error message. I am not sure whether it is because the cluster I am working with is kerberized, but it somehow looks related to authentication and authorization:
["PYTHON_WORKER_FACTORY_SECRET"] == client_secret:
File "/data12/yarn/nm/usercache/yolo/appcache/application_1579645850066_329429/container_e40_1579645850066_329429_02_000002/PY_ENV/py36yarn/lib/python3.6/os.py", line 669, in __getitem__
raise KeyError(key) from None
KeyError: 'PYTHON_WORKER_FACTORY_SECRET'
20/03/18 19:25:06 ERROR executor.Executor: Exception in task 2.2 in stage 0.0 (TID 4)
org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:230)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
... 11 more
20/03/18 19:25:06 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 5
20/03/18 19:25:06 INFO executor.Executor: Running task 2.3 in stage 0.0 (TID 5)
09-15-2016
11:33 AM
6 Kudos
The most recent release of Kafka, 0.9, with its comprehensive security implementation has reached an important milestone. In his blog post Kafka Security 101, Ismael from Confluent describes the security features that are part of the release very well. As part II of the previously published post about Kafka Security with Kerberos, this post discusses a sample implementation of a Java Kafka producer with authentication. It is part of a mini series of posts discussing secure HDP clients, connecting services to a secured cluster, and kerberizing the HDP Sandbox (Download HDP Sandbox). As part of this effort, at the end of this post we will also create a Kafka Servlet to publish messages to a secured broker.
Kafka provides SSL and Kerberos authentication; only Kerberos is discussed here. Kafka from now on supports four different communication protocols between Consumers, Producers, and Brokers. Each protocol considers different security aspects, while PLAINTEXT is the old insecure communication protocol:
PLAINTEXT (non-authenticated, non-encrypted)
SSL (SSL authentication, encrypted)
PLAINTEXT+SASL (authentication, non-encrypted)
SSL+SASL (encrypted authentication, encrypted transport)
A Kafka client needs to be configured to use the protocol of the corresponding broker. This tells the client to use authentication for communication with the broker:
Properties props = new Properties();
props.put("security.protocol", "PLAINTEXTSASL");
Kerberos authentication in Java is provided by the Java Authentication and Authorization Service (JAAS), a pluggable authentication framework similar to PAM that supports multiple authentication methods. In this case the authentication method being used is GSS-API for Kerberos.
Demo Setup
For JAAS a proper configuration of GSS is needed, in addition to being in possession of proper credentials, obviously. Some credentials can be created with MIT Kerberos like this (as root):
$ kadmin.local -q "addprinc -pw hadoop kafka-user"
$ kadmin.local -q "xst -k /home/kafka-user/kafka-user.keytab kafka-user@MYCORP.NET"
(Creating a keytab will make the existing password invalid. To change your password back to hadoop use, as root:)
$ kadmin.local -q "cpw -pw hadoop kafka-user"
The keytab created in the second command - basically an encrypted copy of the user's password that can be used for password-less authentication, for example by automated services - is not strictly needed at this point, but we will make use of it later. First we need to prepare a test topic to publish messages to, with proper privileges for our kafka-user:
# Become Kafka admin
$ kinit -kt /etc/security/keytabs/kafka.service.keytab kafka/one.hdp@MYCORP.NET
# Set privileges for kafka-user
$ /usr/hdp/current/kafka-broker/bin/kafka-acls.sh --add --allow-principals user:kafka-user --operation ALL --topic test --authorizer-properties zookeeper.connect=one.hdp:2181
Adding following acls for resource: Topic:test
user:kafka-user has Allow permission for operations: All from hosts: *
Following is list of acls for resource: Topic:test
user:kafka-user has Allow permission for operations: All from hosts: *
As a sample producer we will use this:
package hdp.sample;
import java.util.Date;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class KafkaProducer {
    public static void main(String... args) {
        String topic = args[1];
        Properties props = new Properties();
        props.put("metadata.broker.list", args[0]);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        props.put("security.protocol", "PLAINTEXTSASL");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        for (int i = 0; i < 10; i++) {
            producer.send(new KeyedMessage<String, String>(topic, "Test Date: " + new Date()));
        }
    }
}
With this setup we can go ahead and demonstrate two ways to use a JAAS context to authenticate with the Kafka broker. First we will configure a context to use the existing privileges possessed by the executing user. Next we use a so-called keytab to demonstrate a password-less login for automated producer processes. Finally we will look at a Servlet implementation provided here.
Authentication with User Login
We configure a JAAS config with useKeyTab set to false and useTicketCache set to true, so that the privileges of the currently logged-in user are being used:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=false
useTicketCache=true
serviceName="kafka";
};
We store this in a file under /home/kafka-user/kafka-jaas.conf and execute the producer like this:
# list current user context
$ klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: kafka-user@MYCORP.NET
Valid starting Expires Service principal
21.02.2016 16:13:13 22.02.2016 16:13:13 krbtgt/MYCORP.NET@MYCORP.NET
# execute java producer
$ java -Djava.security.auth.login.config=/home/kafka-user/kafka-jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=false -cp hdp-kafka-sample-1.0-SNAPSHOT.jar:/usr/hdp/current/kafka-broker/libs/* hdp.sample.KafkaProducer one.hdp:6667 test
# consume sample messages for test
$ /usr/hdp/current/kafka-broker/bin/kafka-simple-consumer-shell.sh --broker-list one.hdp:6667 --topic test --security-protocol PLAINTEXTSASL --partition 0
{metadata.broker.list=one.hdp:6667, request.timeout.ms=1000, client.id=SimpleConsumerShell, security.protocol=PLAINTEXTSASL}
Test Date: Sun Feb 21 16:12:05 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
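As a side note, the three -D JVM flags used to start the producer above can also be set from code. The helper below is only an illustrative sketch and not part of the original sample; the file paths are the ones used in this post and may differ in your environment.
public class JaasSettings {
    public static void applyUserLoginSettings() {
        // programmatic equivalent of -Djava.security.auth.login.config, -Djava.security.krb5.conf
        // and -Djavax.security.auth.useSubjectCredsOnly; must run before the producer is created
        System.setProperty("java.security.auth.login.config", "/home/kafka-user/kafka-jaas.conf");
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
    }
}
Calling such a helper at the very start of main() has the same effect as the JVM flags, as long as it runs before the first producer is instantiated.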
Using Keytab to Login
Next we will configure the JAAS context to use a generated keytab file instead of the security context of the executing user. Before we can do this we need to create the keytab, storing it under /home/kafka-user/kafka-user.keytab:
$ kadmin.local -q "xst -k /home/kafka-user/kafka-user.keytab kafka-user@MYCORP.NET"
Authenticating as principal kafka-user/admin@MYCORP.NET with password.
Entry for principal kafka-user@MYCORP.NET with kvno 2, encryption type aes256-cts-hmac-sha1-96 added to keytab WRFILE:/home/kafka-user/kafka-user.keytab.
Entry for principal kafka-user@MYCORP.NET with kvno 2, encryption type aes128-cts-hmac-sha1-96 added to keytab WRFILE:/home/kafka-user/kafka-user.keytab.
Entry for principal kafka-user@MYCORP.NET with kvno 2, encryption type des3-cbc-sha1 added to keytab WRFILE:/home/kafka-user/kafka-user.keytab.
Entry for principal kafka-user@MYCORP.NET with kvno 2, encryption type arcfour-hmac added to keytab WRFILE:/home/kafka-user/kafka-user.keytab.
Entry for principal kafka-user@MYCORP.NET with kvno 2, encryption type des-hmac-sha1 added to keytab WRFILE:/home/kafka-user/kafka-user.keytab.
Entry for principal kafka-user@MYCORP.NET with kvno 2, encryption type des-cbc-md5 added to keytab WRFILE:/home/kafka-user/kafka-user.keytab.
$ chown kafka-user. /home/kafka-user/kafka-user.keytab
The JAAS configuration can now be changed to look like this:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useTicketCache=true
principal="kafka-user@MYCORP.NET"
useKeyTab=true
serviceName="kafka"
keyTab="/home/kafka-user/kafka-user.keytab"
client=true;
};
This will use the keytab stored under /home/kafka-user/kafka-user.keytab, so the user executing the producer does not need to be logged in to any security controller:
$ klist
klist: Credentials cache file '/tmp/krb5cc_0' not found
$ java -Djava.security.auth.login.config=/home/kafka-user/kafka-jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=true -cp hdp-kafka-sample-1.0-SNAPSHOT.jar:/usr/hdp/current/kafka-broker/libs/* hdp.sample.KafkaProducer one.hdp:6667 test
Kafka Producer Servlet
As a last example we will add a Kafka Servlet to the hdp-web-sample project previously described in this post. Our Servlet will get the topic and message as GET parameters. The Servlet looks as follows:
package hdp.webapp;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Properties;
import javax.servlet.Servlet;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class KafkaServlet extends HttpServlet implements Servlet {
    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        String topic = request.getParameter("topic");
        String msg = request.getParameter("msg");
        Properties props = new Properties();
        props.put("metadata.broker.list", "one.hdp:6667");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        props.put("security.protocol", "PLAINTEXTSASL");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        producer.send(new KeyedMessage<String, String>(topic, msg));
        PrintWriter out = response.getWriter();
        out.println("<html>");
        out.println("<head><title>Write to topic: " + topic + "</title></head>");
        out.println("<body><h1>" + msg + "</h1></body>");
        out.println("</html>");
        out.close();
    }
}
Again we are changing the JAAS config of the Tomcat service to be able to make use of the previously generated keytab. The jaas.conf of Tomcat will now contain this:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useTicketCache=true
principal="kafka-user@MYCORP.NET"
useKeyTab=true
serviceName="kafka"
keyTab="/home/kafka-user/kafka-user.keytab"
client=true;
};
com.sun.security.jgss.krb5.initiate {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
principal="tomcat/one.hdp@MYCORP.NET"
useKeyTab=true
keyTab="/etc/tomcat/tomcat.keytab"
storeKey=true;
};
After deploying the web app and restarting Tomcat with this newly adapted JAAS config you should be able to publish messages to a secured broker by triggering the following GET address from a browser: http://one.hdp:8099/hdp-web/kafka?topic=test&msg=Test1 . The response should be a 200 OK.
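If you want to trigger the servlet without a browser, a tiny client like the following works as well. This is just an illustrative sketch and not part of the original web app; host, port, and context path are taken from the URL above and may differ in your setup.
import java.net.HttpURLConnection;
import java.net.URL;
public class KafkaServletClient {
    public static void main(String[] args) throws Exception {
        URL url = new URL("http://one.hdp:8099/hdp-web/kafka?topic=test&msg=Test1");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        // Expect HTTP 200 when the servlet could publish the message to the secured broker.
        System.out.println("HTTP response code: " + conn.getResponseCode());
        conn.disconnect();
    }
}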
You might be having some issues, in particular seeing this Exception:
SEVERE: Servlet.service() for servlet [KafkaServlet] in context with path [/hdp-web] threw exception [Servlet execution threw an exception] with root cause
javax.security.auth.login.LoginException: Unable to obtain password from user
at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:897)
at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:760)
at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
at org.apache.kafka.common.security.kerberos.Login.login(Login.java:298)
at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
at kafka.common.security.LoginManager$.init(LoginManager.scala:36)
at kafka.producer.Producer.<init>(Producer.scala:50)
at kafka.producer.Producer.<init>(Producer.scala:73)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
at hdp.webapp.KafkaServlet.doGet(KafkaServlet.java:33)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:620)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:303)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:220)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:122)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:501)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:171)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:102)
at org.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:950)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:116)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:408)
at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1040)
at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:607)
at org.apache.tomcat.util.net.JIoEndpoint$SocketProcessor.run(JIoEndpoint.java:314)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:745)
If you are seeing the message javax.security.auth.login.LoginException: Unable to obtain password from user, it most likely refers to your keytab file, which acts as the user's password. So make sure that the tomcat user is able to read that file, stored under /home/kafka-user/kafka-user.keytab for example.
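A quick way to verify the file-permission part from Java (purely illustrative, not part of the original post; the keytab path is the one used throughout this article):
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
public class KeytabCheck {
    public static void main(String[] args) {
        // Run this as the same OS user that runs Tomcat to see what the servlet will see.
        Path keytab = Paths.get("/home/kafka-user/kafka-user.keytab");
        System.out.println("keytab exists:   " + Files.exists(keytab));
        System.out.println("keytab readable: " + Files.isReadable(keytab));
    }
}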
Further Readings
Kafka Security 101
Kafka Security
Kafka Sasl/Kerberos and SSL Implementation
Oracle Doc: JAAS Authentication
Krb5LoginModule
Flume with kerberized Kafka
JAAS Login Configuration File
This article was first published under: http://henning.kropponline.de/2016/02/21/secure-kafka-java-producer-with-kerberos/
12-17-2016
06:59 AM
I am getting the following error: Caused by: java.lang.ClassNotFoundException: org.apache.hive.hcatalog.streaming.RecordWriter
Thanks, Rishit Shah
02-20-2016
11:43 PM
2 Kudos
Origin: http://henning.kropponline.de/2015/09/27/storm-serialization-with-avro-using-kryo-serializer/
Working with complex data events can be a challenge when designing Storm topologies for real-time data processing. In such cases emitting single values for multiple and varying event characteristics soon reveals its limitations. For message serialization Storm leverages the Kryo serialization framework, which is used by many other projects. Kryo keeps a registry of serializers being used for corresponding Class types. Mappings in that registry can be overridden or added, making the framework extendable to diverse type serializations.
On the other hand Avro is a very popular "data serialization system" that bridges between many different programming languages and tools. While the fact that data objects can be described in JSON makes it really easy to use, Avro is often being used for its support of schema evolution. With support for schema evolution the same implementation (Storm topology) could be capable of reading different versions of the same data event without adaptation. This makes it a very good fit for Storm as an intermediary between data ingestion points and data storage in today's Enterprise Data Architectures.
Storm Enterprise Data Architecture
The example here does not provide complex event samples to illustrate that point, but it gives an end-to-end implementation of a Storm topology where events get sent to a Kafka queue as Avro objects and are processed natively by a real-time processing topology. The example can be found here.
It's a simple Hive Streaming example where stock events are read from a CSV file and sent to Kafka. Stock events are a flat, non-complex data type, as already mentioned, but we'll still use it to demo serialization with Avro.
Deserialization in Storm
Before we look at the beginning, let's start with the end. When we have everything working properly we should be able to use our defined event object like this in any bolt of the topology:
Stock stock = (Stock) tuple.getValueByField("myobj_fieldname");
// OR by index
Stock stock = (Stock) tuple.getValue(0);
As demonstrated we should be able to cast our object simply from the tuple, as it will already be present inside the tuple; Storm takes care of the serialization for us. Remember that Storm internally is using Kryo for serialization, as described here. It is using this for all data types in a tuple. To make this work with our object described in Avro we simply have to register a custom serializer with Storm's Kryo. The above snippet also implies that if we try to retrieve the data in any other way, for example like this: tuple.getBinary(0), we will receive an error. An Exception in such a case could look like this:
2015-09-23 10:52:57 s.AvroStockDataBolt [ERROR] java.lang.ClassCastException: storm_hive_streaming_example.model.Stock cannot be cast to [B
java.lang.ClassCastException: storm_hive_streaming_example.model.Stock cannot be cast to [B
at backtype.storm.tuple.TupleImpl.getBinary(TupleImpl.java:144) ~[storm-core-0.10.0.2.3.0.0-2557.jar:0.10.0.2.3.0.0-2557]
at storm_hive_streaming_example.FieldEmitBolt.execute(FieldEmitBolt.java:34) ~[stormjar.jar:na]
The sample error message clearly states that the Stock object stored in the tuple simply cannot be cast to a binary. So how do we set things up from the start?
Spout Scheme
Let's return to the beginning of it all, the ingestion of events into a queue, for example. The part responsible for reading an event from a data source, like for example a message broker, is known in Storm as a Spout. Typically we have one spout for a specific data source rather than single-purpose Spouts in a topology. Hence the spout needs to be adaptive to the use case and the events being issued. Storm uses a so-called "Scheme" to configure the data declaration for receiving and emitting events by the Spout.
The Scheme interface declares the method deserialize(byte[] pojoBytes) for deserializing the event collected. It returns a list of objects instead of just one object, as one event could potentially be serialized into several data fields. Here the StockAvroScheme emits the complete Stock object in one field. The second method that needs to be implemented by the Scheme interface is the getOutputFields() method. This method is responsible for advertising the field definition to the receiving bolts. As in the implementation below, the stock object gets sent in one field.
public class StockAvroScheme implements Scheme {
    private static final Logger LOG = LoggerFactory.getLogger(Stock.class);
    // deserializing the message received by the Spout
    public List<Object> deserialize(byte[] pojoBytes) {
        StockAvroSerializer serializer = new StockAvroSerializer(); // Kryo Serializer
        Stock stock = serializer.read(null, new Input(pojoBytes), Stock.class);
        List<Object> values = new ArrayList<>();
        values.add(0, stock);
        return values;
    }
    // defining the output fields of the Spout
    public Fields getOutputFields() {
        return new Fields(new String[]{ FieldNames.STOCK_FIELD });
    }
}
This Scheme can be assigned to the spout as illustrated below by the YAML topology configuration using Storm Flux:
components:
  # defines a scheme for the spout to emit a Stock.class object
  - id: "stockAvroScheme"
    className: "storm_hive_streaming_example.serializer.StockAvroScheme"
  # adding the defined stock scheme to the multi-scheme that can be assigned to the spout
  - id: "stockMultiScheme"
    className: "backtype.storm.spout.SchemeAsMultiScheme"
    constructorArgs:
      - ref: "stockAvroScheme"
  - id: "zkHosts"
    className: "storm.kafka.ZkHosts"
    constructorArgs:
      - "${hive-streaming-example.zk.hosts}"
  # configuring the spout to read bytes from Kafka and emit Stock.class
  - id: "stockSpoutConfig"
    className: "storm.kafka.SpoutConfig"
    constructorArgs:
      - ref: "zkHosts" # brokerHosts
      - "${hive-streaming-example.kafka.topic}" # topic
      - "${hive-streaming-example.kafka.zkRoot}" # zkRoot
      - "${hive-streaming-example.kafka.spoutId}" # id
    properties:
      - name: "scheme"
ref: "stockMultiScheme" # use the stock scheme previously defined Last but not least we still need to register our customer serializer with Storm. Registering the Serializer Tuples
Last but not least we still need to register our custom serializer with Storm.
Registering the Serializer
Tuples are sent to Spouts and Bolts running in separate JVMs, either on the same or on a remote host. When a tuple is sent it needs to get serialized and deserialized prior to being placed on the output collector. For the serialization Storm uses Kryo. In order to use a custom Serializer implementation it needs to get registered with the Kryo instance being used by Storm. This can be done as part of the topology configuration. Here is the configuration definition using Storm Flux:
name: "hive-streaming-example-avro-scheme"
config:
  topology.workers: 1
  # define serializers being used by tuples de-/serializing values. See http://storm.apache.org/documentation/Serialization.html
  topology.kryo.register:
    - storm_hive_streaming_example.model.Stock: storm_hive_streaming_example.serializer.StockAvroSerializer
With this registration of the custom Kryo Serializer the AvroStockDataBolt can simply cast the Stock object from the tuple value and emit it to the FieldEmitBolt, which decomposes the Stock instance into the separate fields used by the HiveBolt. Having both the AvroStockDataBolt and the FieldEmitBolt would not make sense in a real implementation, as the Scheme could obviously already be configured to do all that - deserialize and emit the fields directly to the HiveBolt. Having these two extra bolts is just for demonstration purposes.
Finally we need the custom Kryo Serializer itself, which implements write(Kryo kryo, Output output, Stock object) and read(Kryo kryo, Input input, Class<Stock> type); having a general implementation for generic Avro types would be ideal. It is shown right after the short registration aside below.
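As a short aside (illustrative only, not part of the original topology): if you build the topology configuration in Java instead of Flux, the same Kryo registration can be done on the Storm Config object.
import backtype.storm.Config;
import storm_hive_streaming_example.model.Stock;
import storm_hive_streaming_example.serializer.StockAvroSerializer;
public class TopologyConfigSketch {
    public static Config buildConfig() {
        Config conf = new Config();
        conf.setNumWorkers(1);
        // equivalent of the topology.kryo.register entry in the Flux config above
        conf.registerSerialization(Stock.class, StockAvroSerializer.class);
        return conf;
    }
}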
public class StockAvroSerializer extends Serializer<Stock> {
    private static final Logger LOG = LoggerFactory.getLogger(StockAvroSerializer.class);
    private Schema SCHEMA = Stock.getClassSchema();
    public void write(Kryo kryo, Output output, Stock object) {
        DatumWriter<Stock> writer = new SpecificDatumWriter<>(SCHEMA);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        try {
            writer.write(object, encoder);
            encoder.flush();
        } catch (IOException e) {
            LOG.error(e.toString(), e);
        }
        IOUtils.closeQuietly(out);
        byte[] outBytes = out.toByteArray();
        output.writeInt(outBytes.length, true);
        output.write(outBytes);
    }
    public Stock read(Kryo kryo, Input input, Class<Stock> type) {
        byte[] value = input.getBuffer();
        SpecificDatumReader<Stock> reader = new SpecificDatumReader<>(SCHEMA);
        Stock record = null;
        try {
            record = reader.read(null, DecoderFactory.get().binaryDecoder(value, null));
        } catch (IOException e) {
            LOG.error(e.toString(), e);
        }
        return record;
    }
}
Further Readings
Storm Serialization
Storm Hive Streaming Example (Github)
Storm Flux
Avro Specification
Kafka Storm Starter (Github)
Kryo Serializable
http://www.confluent.io/blog/stream-data-platform-2/
Simple Example Using Kryo
Storm Blueprints: Patterns for Distributed Realtime Computation (Amazon)
Storm Applied: Strategies for Real-Time Event Processing (Amazon)