Member since
09-23-2015
70
Posts
87
Kudos Received
7
Solutions
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 2067 | 09-20-2016 09:11 AM
 | 1274 | 05-17-2016 11:58 AM
 | 1008 | 04-18-2016 07:27 PM
 | 1084 | 04-14-2016 08:25 AM
 | 1322 | 03-24-2016 07:16 PM
09-26-2016
03:05 PM
3 Kudos
Controlling the environment of an application is vital for its functionality and stability. Especially in a distributed environment it is important for developers to have control over the versions of dependencies. In such a scenario it is a critical task to ensure that possibly conflicting requirements of multiple applications do not disturb each other. That is why frameworks like YARN ensure that each application is executed in a self-contained environment - typically in a Linux Container or Docker Container - that is controlled by the developer. In this post we show what this means for Python environments being used by Spark.
YARN Application Deployment
As mentioned earlier, YARN executes each application in a self-contained environment on each host. This ensures the execution in a controlled environment managed by individual developers. The way this works, in a nutshell, is that the dependencies of an application are distributed to each node, typically via HDFS. This figure simplifies the fact that HDFS is actually being used to distribute the application. See HDFS distributed cache for reference.
The files are uploaded to a staging folder /user/${username}/.${application} of the submitting user in HDFS. Because of the distributed architecture of HDFS it is ensured that multiple nodes have local copies of the files. In fact, to ensure that a large fraction of the cluster has a local copy of the application files and does not need to download them over the network, the HDFS replication factor is set much higher for these files than the default of 3. Often a number between 10 and 20 is chosen for the replication factor.
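If you want to verify this behavior on your own cluster, the replication factor of the staged files can be checked with a small script like the following (a minimal sketch; the staging path is an assumption and depends on the submitting user and framework, for Spark typically a .sparkStaging folder):
import subprocess
# assumption: Spark stages its application files under /user/<user>/.sparkStaging/<applicationId>/
staging_glob = "/user/vagrant/.sparkStaging/*/*"
# "%r" prints the replication factor, "%n" the file name; hdfs expands the glob itself
out = subprocess.check_output(["hdfs", "dfs", "-stat", "%r %n", staging_glob])
print(out.decode())
The replication reported for the staged archives should be noticeably higher than the cluster default.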
During the preparation of the container on a node, you will notice in the logs that commands similar to the below example are being executed:
ln -sf "/hadoop/yarn/local/usercache/vagrant/filecache/72/pyspark.zip" "pyspark.zip"
The folder /hadoop/yarn/local/ is the configured location on each node where YARN stores its needed files and logs locally. Creating a symbolic link like this inside the container makes the content of the zip file available. It is being referenced as "pyspark.zip".
Using Conda Env
For application developers this means that they can package and ship their controlled environment with each application. Other solutions like NFS or Amazon EFS shares are not needed, especially since solutions like shared folders make for a bad architecture that is not designed to scale very well and that makes the development of applications less agile.
The following example demonstrates the use of conda env to transport a Python environment with a PySpark application that needs to be executed. This sample application uses the NLTK package, with the additional requirement of making tokenizer and tagger resources available to the application as well. Our sample application:
import os
import sys
from pyspark import SparkContext
from pyspark import SparkConf
conf = SparkConf()
conf.setAppName("spark-ntlk-env")
sc = SparkContext(conf=conf)
data = sc.textFile('hdfs:///user/vagrant/1970-Nixon.txt')
def word_tokenize(x):
import nltk
return nltk.word_tokenize(x)
def pos_tag(x):
import nltk
return nltk.pos_tag([x])
words = data.flatMap(word_tokenize)
words.saveAsTextFile('hdfs:///user/vagrant/nixon_tokens')
pos_word = words.map(pos_tag)
pos_word.saveAsTextFile('hdfs:///user/vagrant/nixon_token_pos')
Preparing the sample input data
For our example we use the samples provided by NLTK (http://www.nltk.org/nltk_data/) and upload them to HDFS:
(nltk_env)$ python -m nltk.downloader -d nltk_data all
(nltk_env)$ hdfs dfs -put nltk_data/corpora/state_union/1970-Nixon.txt /user/vagrant/
No Hard (Absolute) Links!
Before we actually go and create our environment, let's first take a quick moment to recap how an environment is typically composed. On a machine the environment is made up of variables linking to different target folders containing executables or other resource files. So if you execute a command it is either referenced from your PATH, PYTHON_LIBRARY, or any other defined variable. These variables link to files in directories like /usr/bin, /usr/local/bin or any other referenced location. They are called hard links or absolute references as they start from root /.
Environments using hard links are not easily transportable, as they make strict assumptions about the overall execution environment (your OS, for example) they are being used in. Therefore it is necessary to use relative links in a transportable/relocatable environment.
This is especially true for conda env, as it creates hard links by default. By making the conda env relocatable it can be used in an application by referencing it from the application root . (current dir) instead of the overall root /. By using the --copy option during the creation of the environment, packages are copied instead of linked.
Creating our relocatable environment together with nltk and numpy:
conda create -n nltk_env --copy -y -q python=3 nltk numpy
Fetching package metadata .......
Solving package specifications: ..........
Package plan for installation in environment /home/datalab_user01/anaconda2/envs/nltk_env:
The following packages will be downloaded:
package | build
---------------------------|-----------------
python-3.5.2 | 0 17.2 MB
nltk-3.2.1 | py35_0 1.8 MB
numpy-1.11.1 | py35_0 6.1 MB
setuptools-23.0.0 | py35_0 460 KB
wheel-0.29.0 | py35_0 82 KB
pip-8.1.2 | py35_0 1.6 MB
------------------------------------------------------------
Total: 27.2 MB
The following NEW packages will be INSTALLED:
mkl: 11.3.3-0 (copy)
nltk: 3.2.1-py35_0 (copy)
numpy: 1.11.1-py35_0 (copy)
openssl: 1.0.2h-1 (copy)
pip: 8.1.2-py35_0 (copy)
python: 3.5.2-0 (copy)
readline: 6.2-2 (copy)
setuptools: 23.0.0-py35_0 (copy)
sqlite: 3.13.0-0 (copy)
tk: 8.5.18-0 (copy)
wheel: 0.29.0-py35_0 (copy)
xz: 5.2.2-0 (copy)
zlib: 1.2.8-3 (copy)
#
# To activate this environment, use:
# $ source activate nltk_env
#
# To deactivate this environment, use:
# $ source deactivate
#
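To double-check that the environment really consists of copies rather than links back into the central package cache, a quick script like the following can help (a minimal sketch; the environment path is an assumption based on the default Anaconda location used above):
import os
env_root = os.path.expanduser("~/anaconda2/envs/nltk_env")  # assumption: default Anaconda env location
sym_links, hard_links = [], []
for root, dirs, files in os.walk(env_root):
    for name in files:
        path = os.path.join(root, name)
        if os.path.islink(path):
            sym_links.append(path)
        elif os.stat(path).st_nlink > 1:
            hard_links.append(path)
print("symlinked files: %d, hard linked files: %d" % (len(sym_links), len(hard_links)))
For an environment created with --copy the hard link count in particular should be zero, since nothing should point back into the central package cache anymore.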
This also works for different Python versions, 3.x or 2.x!
Zip it and Ship it!
Now that we have our relocatable environment all set, we are able to package it and ship it as part of our sample PySpark job.
$ cd ~/anaconda2/envs/
$ zip -r nltk_env.zip nltk_env
To make this available during the execution of our application in a YARN container, we have to, for one, distribute the package and, for another, change the default Python environment of Spark to our location. The variable controlling the Python environment for Python applications in Spark is named PYSPARK_PYTHON.
PYSPARK_PYTHON=./NLTK/nltk_env/bin/python spark-submit --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./NLTK/nltk_env/bin/python --master yarn-cluster --archives nltk_env.zip#NLTK spark_nltk_sample.py
Our virtual environment is linked under the name NLTK, which is why the path in PYSPARK_PYTHON points to ./NLTK/content/of/zip/... . The exact command being executed during container creation is something like this:
ln -sf "/hadoop/yarn/local/usercache/vagrant/filecache/71/nltk_env.zip" "NLTK"
Shipping additional resources with an application is controlled by the --files and --archives options as shown here. The options being used here are documented in the Spark YARN Configuration and Spark Environment Variables pages for reference.
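When debugging a setup like this it can be useful to see what actually ends up inside the containers. A small diagnostic job such as the following prints the Python interpreter and the working directory content of an executor (a minimal sketch; the application name and function name are illustrative):
from pyspark import SparkConf
from pyspark import SparkContext
conf = SparkConf()
conf.setAppName("container-env-check")
sc = SparkContext(conf=conf)
def describe_container(_):
    # imports inside the function so they are resolved on the executor
    import os
    import sys
    # sys.executable is the interpreter the executor actually uses,
    # os.listdir('.') shows what YARN linked into the container working directory
    return [(sys.executable, sorted(os.listdir('.')))]
print(sc.parallelize([0], 1).mapPartitions(describe_container).collect())
Submitted with the same --archives option as above, it should report an interpreter path inside ./NLTK/nltk_env/bin/ and list the NLTK link in the container working directory.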
Packaging tokenizer and taggers
Doing just the above will unfortunately fail, because using the NLTK parser in the way we are using it in the example program has some additional dependencies. If you have followed the above steps, submitting it to your YARN cluster will result in the following exception at container level:
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
...
raise LookupError(resource_not_found)
LookupError:
**********************************************************************
Resource 'taggers/averaged_perceptron_tagger/averaged_perceptron
_tagger.pickle' not found. Please use the NLTK Downloader to
obtain the resource: >>> nltk.download()
Searched in:
- '/home/nltk_data'
- '/usr/share/nltk_data'
- '/usr/local/share/nltk_data'
- '/usr/lib/nltk_data'
- '/usr/local/lib/nltk_data'
**********************************************************************
at org.apache.spark.api.python.PythonRunner$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
The problem is that NLTK expects the following resource, tokenizers/punkt/english.pickle, to be available in one of the following locations:
Searched in:
- '/home/nltk_data'
- '/usr/share/nltk_data'
- '/usr/local/share/nltk_data'
- '/usr/lib/nltk_data'
- '/usr/local/lib/nltk_data'
The good thing about this is that by now we should know how we can ship the required dependency to our application. We can do it the same way we did with our Python environment. Again it is important to check how the resource is going to be referenced. NLTK expects it by default relative to the current location under tokenizers/punkt/english.pickle, which is why we navigate into the folder for packaging and reference the zip file with tokenizers.zip#tokenizers.
(nltk_env)$ cd nltk_data/tokenizers/
(nltk_env)$ zip -r ../../tokenizers.zip *
(nltk_env)$ cd ../../
(nltk_env)$ cd nltk_data/taggers/
(nltk_env)$ zip -r ../../taggers.zip *
(nltk_env)$ cd ../../
At a later point our program will expect a tagger in the same fashion already demonstrated in the above snippet.
Using YARN Locations
We can ship those zip resources the same way we shipped our conda env. In addition, environment variables can be used to control resource discovery and allocation. For NLTK you can use the environment variable NLTK_DATA to control the path. Setting this in Spark can be done similarly to the way we set PYSPARK_PYTHON:
--conf spark.yarn.appMasterEnv.NLTK_DATA=./
Additionally YARN exposes the container path via the environment variable PWD. This can be used in NLTK to add it to the search path as follows:
def word_tokenize(x):
import nltk
nltk.data.path.append(os.environ.get('PWD'))
return nltk.word_tokenize(x)
The submission of your application now becomes:
PYSPARK_PYTHON=./NLTK/nltk_env/bin/python spark-submit --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./NLTK/nltk_env/bin/python --conf spark.yarn.appMasterEnv.NLTK_DATA=./ --master yarn-cluster --archives nltk_env.zip#NLTK,tokenizers.zip#tokenizers,taggers.zip#taggers spark_nltk_sample.py
The expected result should be something like the following:
(nltk_env)$ hdfs dfs -cat /user/datalab_user01/nixon_tokens/* | head -n 20
Annual
Message
to
the
Congress
on
the
State
of
the
Union
.
January
22
,
1970
Mr.
Speaker
,
Mr.
And:
(nltk_env)$ hdfs dfs -cat /user/datalab_user01/nixon_token_pos/* | head -n 20
[(u'Annual', 'JJ')]
[(u'Message', 'NN')]
[(u'to', 'TO')]
[(u'the', 'DT')]
[(u'Congress', 'NNP')]
[(u'on', 'IN')]
[(u'the', 'DT')]
[(u'State', 'NNP')]
[(u'of', 'IN')]
[(u'the', 'DT')]
[(u'Union', 'NN')]
[(u'.', '.')]
[(u'January', 'NNP')]
[(u'22', 'CD')]
[(u',', ',')]
[(u'1970', 'CD')]
[(u'Mr.', 'NNP')]
[(u'Speaker', 'NN')]
[(u',', ',')]
[(u'Mr.', 'NNP')]
Further Readings
PySpark Internals
Spark NLTK Example
NLTK Data
Virtualenv in Hadoop Streaming
Conda Intro
Conda Env with Spark
Python Env support in Spark (SPARK-13587)
This post was first published here: http://henning.kropponline.de/2016/09/24/running-pyspark-with-conda-env/
09-20-2016
09:52 AM
Well, you would have the login, but not the Kerberos init. You would still have two realms with user credentials, the KRB5 realm and the LDAP realm, depending on your setup. Actually the KRB5 realm can be included inside LDAP, or put differently, Kerberos can be configured to use LDAP as its user DB; that would give you the possibility to combine both. This is essentially what FreeIPA is.
09-20-2016
09:49 AM
Yes with pam_ldap integration: http://www.tldp.org/HOWTO/archived/LDAP-Implementation-HOWTO/pamnss.html
09-20-2016
09:11 AM
A list of recommended tools is:
SSSD https://fedorahosted.org/sssd/ / https://help.ubuntu.com/lts/serverguide/sssd-ad.html
FreeIPA (introduces an additional realm besides AD and needs a trust to be established between the two) https://www.freeipa.org/page/Main_Page
Winutils
Centrify (commercial) https://www.centrify.com/
VAS / Quest (commercial) https://software.dell.com/products/authentication-services/
....
Please check the material of this workshop for reference: https://community.hortonworks.com/articles/1143/cheatsheet-on-configuring-authentication-authoriza.html
https://community.hortonworks.com/repos/4465/workshops-on-how-to-setup-security-on-hadoop-using.html
09-19-2016
02:29 PM
1 Kudo
This does not sound like a good idea. Edge nodes by definition typically just hold client programs, not services like the DataNode or NodeManager. YARN manages the resource allocation based on data and utilization of the nodes, which is why it is often also not a good idea to run NodeManagers without DataNodes on one machine. Concerning your "But can i .. bring the data to the edge .. run the task if all other nodes are busy?": YARN does the resource negotiation and scheduling for distributed frameworks like MR and Spark. I would advise not to do this manually but to let YARN do this for you. I hope this helps?
09-15-2016
05:43 PM
Currently Spark does not support deployment to YARN from a SparkContext. Use spark-submit instead. For unit testing it is recommended to use the [local] runner. The problem is that you cannot set the Hadoop conf from outside the SparkContext; it is read from the *-site.xml configs under HADOOP_HOME during spark-submit. So you cannot point to your remote cluster from Eclipse unless you set up the correct *-site.xml configs on your laptop and use spark-submit. SparkSubmit is available as a Java class, but I doubt that you will achieve what you are looking for with it. But you would be able to launch a Spark job from Eclipse to a remote cluster, if this is sufficient for you. Have a look at the Oozie Spark launcher as an example. SparkContext is changing dramatically in Spark 2, I think in favor of a SparkClient that supports multiple SparkContexts, but I am not sure what the situation is with that.
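As an illustration of the local runner: a minimal PySpark test setup needs no cluster or *-site.xml configuration at all (a sketch; the computation and assertion are just placeholders):
from pyspark import SparkConf, SparkContext
# local[*] runs Spark inside the current process, so no cluster configuration is needed
conf = SparkConf().setMaster("local[*]").setAppName("unit-test")
sc = SparkContext(conf=conf)
result = sc.parallelize([1, 2, 3]).map(lambda x: x * 2).collect()
assert result == [2, 4, 6]
sc.stop()
The same idea applies to Java/Scala tests by setting the master to local[*] on the SparkConf.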
09-15-2016
11:48 AM
1 Kudo
@Smart Solutions could you please check if this article is of any help for you: https://community.hortonworks.com/content/kbentry/56704/secure-kafka-java-producer-with-kerberos.html
09-15-2016
11:33 AM
6 Kudos
The most recent release of Kafka, 0.9, with its comprehensive security implementation has reached an important milestone. In his blog post Kafka Security 101, Ismael from Confluent describes the security features that are part of the release very well. As part II of the previously published post about Kafka Security with Kerberos, this post discusses a sample implementation of a Java Kafka producer with authentication. It is part of a mini series of posts discussing secure HDP clients, connecting services to a secured cluster, and kerberizing the HDP Sandbox (Download HDP Sandbox). At the end of this post we will also create a Kafka Servlet to publish messages to a secured broker.
Kafka provides SSL and Kerberos authentication. Only Kerberos is discussed here. Kafka from now on supports four different communication protocols between Consumers, Producers, and Brokers. Each protocol considers different security aspects, while PLAINTEXT is the old insecure communication protocol.
PLAINTEXT (non-authenticated, non-encrypted)
SSL (SSL authentication, encrypted)
PLAINTEXT+SASL (authentication, non-encrypted)
SSL+SASL (encrypted authentication, encrypted transport)
A
Kafka client needs to be configured to use the protocol of the
corresponding broker. This tells the client to use authentication for
communication with the broker:
Properties props = new Properties();
props.put("security.protocol", "PLAINTEXTSASL"); Making use of Kerberos authentication in Java is provided by the Java Authentication and Authorization Service (JAAS)
which is a pluggable authentication method similar to PAM supporting
multiple authentication methods. In this case the authentication method
being used is GSS-API for Kerberos. Demo Setup For
JAAS a proper configuration of GSS would be needed in addition to being
in possession of proper credentials, obviously. Some credentials can be
created with MIT Kerberos like this: (as root)
$ kadmin.local -q "addprinc -pw hadoop kafka-user"
$ kadmin.local -q "xst -k /home/kafka-user/kafka-user.keytab kafka-user@MYCORP.NET"
(Creating a keytab will make the existing password invalid. To change your password back to hadoop use as root:)
$ kadmin.local -q "cpw -pw hadoop hdfs-user" The last line is
not necessarily needed as it creates us a so called keytab - basically
an encrypted password of the user - that can be used for password less
authentication for example for automated services. We will make use of
that here as well. First we need to prepare a test topic to publish messages with proper privileges for our kafka-user: # Become Kafka admin
$ kinit -kt /etc/security/keytabs/kafka.service.keytab kafka/one.hdp@MYCORP.NET
# Set privileges for kafka-user
$ /usr/hdp/current/kafka-broker/bin/kafka-acls.sh --add --allow-principals user:kafka-user --operation ALL --topic test --authorizer-properties zookeeper.connect=one.hdp:2181
Adding following acls for resource: Topic:test
user:kafka-user has Allow permission for operations: All from hosts: *
Following is list of acls for resource: Topic:test
user:kafka-user has Allow permission for operations: All from hosts: *
As a sample producer we will use this:
package hdp.sample;
import java.util.Date;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class KafkaProducer {
public static void main(String... args) {
String topic = args[1];
Properties props = new Properties();
props.put("metadata.broker.list", args[0]);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
props.put("security.protocol", "PLAINTEXTSASL");
ProducerConfig config = new ProducerConfig(props);
Producer producer = new Producer<String, String>(config);
for (int i = 0; i < 10; i++){
producer.send(new KeyedMessage<String, String>(topic, "Test Date: " + new Date()));
}
}
}
With this setup we can go ahead and demonstrate two ways to use a JAAS context to authenticate with the Kafka broker. First we will configure a context to use the existing privileges possessed by the executing user. Next we use a so-called keytab to demonstrate a password-less login for automated producer processes. Finally we will look at a Servlet implementation provided here.
Authentication with User Login
We configure a JAAS config with useKeyTab set to false and useTicketCache set to true, so that the privileges of the current user are being used:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=false
useTicketCache=true
serviceName="kafka";
};
We store this in a file under /home/kafka-user/kafka-jaas.conf and execute the producer like this:
# list current user context
$ klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: kafka-user@MYCORP.NET
Valid starting Expires Service principal
21.02.2016 16:13:13 22.02.2016 16:13:13 krbtgt/MYCORP.NET@MYCORP.NET
# execute java producer
$ java -Djava.security.auth.login.config=/home/kafka-user/kafka-jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=false -cp hdp-kafka-sample-1.0-SNAPSHOT.jar:/usr/hdp/current/kafka-broker/libs/* hdp.sample.KafkaProducer one.hdp:6667 test
# consume sample messages for test
$ /usr/hdp/current/kafka-broker/bin/kafka-simple-consumer-shell.sh --broker-list one.hdp:6667 --topic test --security-protocol PLAINTEXTSASL --partition 0
{metadata.broker.list=one.hdp:6667, request.timeout.ms=1000, client.id=SimpleConsumerShell, security.protocol=PLAINTEXTSASL}
Test Date: Sun Feb 21 16:12:05 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
Test Date: Sun Feb 21 16:12:06 UTC 2016
Using Keytab to Login
Next
we will configure the JAAS context to use a generated keytab file
instead of the security context of the executing user. Before we can do
this we need to create the keytab, storing it under /home/kafka-user/kafka-user.keytab:
$ kadmin.local -q "xst -k /home/kafka-user/kafka-user.keytab kafka-user@MYCORP.NET"
Authenticating as principal kafka-user/admin@MYCORP.NET with password.
Entry for principal kafka-user@MYCORP.NET with kvno 2, encryption type aes256-cts-hmac-sha1-96 added to keytab WRFILE:/home/kafka-user/kafka-user.keytab.
Entry for principal kafka-user@MYCORP.NET with kvno 2, encryption type aes128-cts-hmac-sha1-96 added to keytab WRFILE:/home/kafka-user/kafka-user.keytab.
Entry for principal kafka-user@MYCORP.NET with kvno 2, encryption type des3-cbc-sha1 added to keytab WRFILE:/home/kafka-user/kafka-user.keytab.
Entry for principal kafka-user@MYCORP.NET with kvno 2, encryption type arcfour-hmac added to keytab WRFILE:/home/kafka-user/kafka-user.keytab.
Entry for principal kafka-user@MYCORP.NET with kvno 2, encryption type des-hmac-sha1 added to keytab WRFILE:/home/kafka-user/kafka-user.keytab.
Entry for principal kafka-user@MYCORP.NET with kvno 2, encryption type des-cbc-md5 added to keytab WRFILE:/home/kafka-user/kafka-user.keytab.
$ chown kafka-user. /home/kafka-user/kafka-user.keytab
The JAAS configuration can now be changed to look like this:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useTicketCache=true
principal="kafka-user@MYCORP.NET"
useKeyTab=true
serviceName="kafka"
keyTab="/home/kafka-user/kafka-user.keytab"
client=true;
};
This will use the keytab stored under /home/kafka-user/kafka-user.keytab, while the user executing the producer does not need to be logged in to any security controller:
$ klist
klist: Credentials cache file '/tmp/krb5cc_0' not found
$ java -Djava.security.auth.login.config=/home/kafka-user/kafka-jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=true -cp hdp-kafka-sample-1.0-SNAPSHOT.jar:/usr/hdp/current/kafka-broker/libs/* hdp.sample.KafkaProducer one.hdp:6667 test
Kafka Producer Servlet
In a last example we will add a Kafka Servlet to the hdp-web-sample project previously described in this post. Our Servlet will get the topic and message as GET parameters. The Servlet looks as follows:
package hdp.webapp;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Properties;
import javax.servlet.Servlet;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class KafkaServlet extends HttpServlet implements Servlet {
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
String topic = request.getParameter("topic");
String msg = request.getParameter("msg");
Properties props = new Properties();
props.put("metadata.broker.list", "one.hdp:6667");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
props.put("security.protocol", "PLAINTEXTSASL");
ProducerConfig config = new ProducerConfig(props);
Producer producer = new Producer<String, String>(config);
producer.send(new KeyedMessage<String, String>(topic, msg));
PrintWriter out = response.getWriter();
out.println("<html>");
out.println("<head><title>Write to topic: "+ topic +"</title></head>");
out.println("<body><h1>/"+ msg +"</h1>");
out.println("</html>");
out.close();
}
}
Again we are changing the JAAS config of the Tomcat service to be able to make use of the previously generated keytab. The jaas.conf of Tomcat will now contain this:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useTicketCache=true
principal="kafka-user@MYCORP.NET"
useKeyTab=true
serviceName="kafka"
keyTab="/home/kafka-user/kafka-user.keytab"
client=true;
};
com.sun.security.jgss.krb5.initiate {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
principal="tomcat/one.hdp@MYCORP.NET"
useKeyTab=true
keyTab="/etc/tomcat/tomcat.keytab"
storeKey=true;
};
After deploying the web app and restarting Tomcat with this newly adapted JAAS config, you should be able to publish messages to a secured broker by triggering the following GET address from a browser: http://one.hdp:8099/hdp-web/kafka?topic=test&msg=Test1 . The response should be a 200 OK.
You might be having some issues and in particular seeing this exception:
SEVERE: Servlet.service() for servlet [KafkaServlet] in context with path [/hdp-web] threw exception [Servlet execution threw an exception] with root cause
javax.security.auth.login.LoginException: Unable to obtain password from user
at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:897)
at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:760)
at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
at org.apache.kafka.common.security.kerberos.Login.login(Login.java:298)
at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
at kafka.common.security.LoginManager$.init(LoginManager.scala:36)
at kafka.producer.Producer.<init>(Producer.scala:50)
at kafka.producer.Producer.<init>(Producer.scala:73)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
at hdp.webapp.KafkaServlet.doGet(KafkaServlet.java:33)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:620)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:303)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:220)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:122)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:501)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:171)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:102)
at org.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:950)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:116)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:408)
at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1040)
at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:607)
at org.apache.tomcat.util.net.JIoEndpoint$SocketProcessor.run(JIoEndpoint.java:314)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:745)
If you are seeing the message javax.security.auth.login.LoginException: Unable to obtain password from user, it likely refers to your keytab file, which acts as the user's password. So make sure that the tomcat user is able to read that file stored under /home/kafka-user/kafka-user.keytab, for example.
Further Readings
Kafka Security 101
Kafka Security
Kafka Sasl/Kerberos and SSL Implementation
Oracle Doc: JAAS Authentication
Krb5LoginModule
Flume with kerberized Kafka
JAAS Login Configuration File
This article was first published under: http://henning.kropponline.de/2016/02/21/secure-kafka-java-producer-with-kerberos/
05-17-2016
11:58 AM
2 Kudos
Hi @sarfarazkhan pathan, if I am not mistaken this is just a warning. You typically can install the cluster with users already present from previous installs. In some cases this obviously can cause some issues, but in general it doesn't. Also, if you add and remove users frequently from multiple installs, you are increasing the uid count of your system. So it should be safe to ignore the warning. You can read here about cleaning up nodes from previous installs: http://henning.kropponline.de/2016/04/24/uninstalling-and-cleaning-a-hdp-node/ EDIT: BTW, to be safe, a restart of the Ambari agent and simply re-running the checks should be enough. No restart of the node or similar should be required. Regards
04-22-2016
07:49 PM
2 Kudos
To me it looks like in the Ranger admin config your ambari_ranger_admin and ambari_ranger_password are empty. Could you please check if they exist and are not blank? Please note that the ambari_ranger_admin is different from the Ranger admin_user. But both are defined in the Ranger configuration in Ambari.
04-18-2016
08:46 PM
@Benjamin Leonhardi Try this https://github.com/hkropp/vagrant-hdp/blob/master/bin/ambari-shell.jar and run it with % java -jar ambari-shell.jar --ambari.host=
04-18-2016
07:27 PM
1 Kudo
No, Ranger does not have a shell. But that is an interesting idea. If you are interested, Ambari does have a shell for its REST API. It uses Spring Shell. Check out this https://cwiki.apache.org/confluence/display/AMBARI/Ambari+Shell and this http://docs.spring.io/spring-shell/docs/current/reference/htmlsingle/
04-14-2016
08:25 AM
1. First you need to make sure that your non-root user has sufficient sudoers rights. Please check this document: http://docs.hortonworks.com/HDPDocuments/Ambari-2.2.1.1/bk_Ambari_Security_Guide/content/_configuring_ambari_for_non-root.html
2. Next, I suspect you set up passwordless SSH for the root user, not the non-root user? I always prefer manual agent registration, but that is just me. Please check: http://docs.hortonworks.com/HDPDocuments/Ambari-2.2.1.1/bk_ambari_reference_guide/content/_install_the_ambari_agents_manually.html
A simple sed helps to do the manual registration:
$ sed -i 's/hostname=localhost/hostname=<ambari_server_fqdn>/' /etc/ambari-agent/conf/ambari-agent.ini
04-12-2016
05:52 PM
The provided workaround by @Alessio Ubaldi seems to work. But you should also try to first upgrade to 2.3.7.
04-11-2016
02:40 PM
1 Kudo
Can I configure two authentication providers for Knox, and if so, how would that work?
Labels:
- Apache Knox
04-06-2016
09:51 AM
3 Kudos
The error message in /var/log/knox/gateway.log says that the certificate used by Knox will only become valid in the future:
Failed to start gateway: org.apache.hadoop.gateway.services.ServiceLifecycleException: Gateway SSL Certificate is not yet valid. Server will not start.
-> "not yet valid"
Knox refuses to start, because using such a certificate will result in an SSL exception for almost any client. You will need to check the certificate you are using for Knox. This is stored as gateway-identity in gateway.jks under /var/lib/knox/data*/keystore. Please refer to this: http://knox.apache.org/books/knox-0-6-0/user-guide.html#Management+of+Security+Artifacts
What should also work: if you simply remove the gateway-identity from the keystore, Knox should create a self-signed certificate for you upon start.
Could you share how the certificate was generated? Did you change it after the install? Are you using ntp?
04-05-2016
02:24 PM
1 Kudo
Can you also provide what you find in /var/log/knox/gateway.log
04-02-2016
09:32 PM
1 Kudo
Actually this does not quite answer the question, but gives a good hint to dfs.internal.nameservices. The parameter is needed to distinguish between the local nameservice and other configured nameservices, but does not by itself support distcp between two HA clusters. dfs.internal.nameservices for example is relevant for DataNodes so they don't register with the other cluster.
To support distcp between multiple HA clusters you simply have to define multiple nameservices, as in this example:
<configuration>
<!-- services -->
<property>
<name>dfs.nameservices</name>
<value>serviceId1,serviceId2</value>
</property>
<!-- serviceId2 properties -->
<property>
<name>dfs.client.failover.proxy.provider.nameservices2</name>
<value>org.apache.hadoop.hdfs.server
.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
<name>dfs.ha.namenodes.serviceId2</name>
<value>nn201,nn202</value>
</property>
<property>
<name>dfs.namenode.rpc-address.serviceId2.nn201</name>
<value>nn201.pro.net:8020</value>
</property>
<property>
<name>dfs.namenode.servicerpc-address.serviceId2.nn201</name>
<value>nn201.pro.net:54321</value>
</property>
<property>
<name>dfs.namenode.http-address.serviceId2.nn201</name>
<value>nn201.pro.net:50070</value>
</property>
<property>
<name>dfs.namenode.https-address.serviceId2.nn201</name>
<value>nn201.prod.com:50470</value>
</property>
<property>
<name>dfs.namenode.rpc-address.serviceId2.nn202</name>
<value>nn202.pro.net:8020</value>
</property>
<property>
<name>dfs.namenode.servicerpc-address.serviceId2.nn202</name>
<value>nn202.pro.net:54321</value>
</property>
<property>
<name>dfs.namenode.http-address.serviceId2.nn202</name>
<value>nn202.pro.net:50070</value>
</property>
<property>
<name>dfs.namenode.https-address.serviceId2.nn202</name>
<value>nn202.prod.net:50470</value>
</property>
<!-- serviceId1 -->
<property>
<name>dfs.client.failover.proxy.provider.nameservices1</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.
ConfiguredFailoverProxyProvider</value>
</property>
<property>
<name>dfs.ha.namenodes.nameservices1</name>
<value>nn101,nn102</value>
</property>
<property>
<name>dfs.namenode.rpc-address.serviceId1.nn101</name>
<value>nn101.poc.net:8020</value>
</property>
<property>
<name>dfs.namenode.servicerpc-address.serviceId1.nn101</name>
<value>nn101.poc.net:54321</value>
</property>
<property>
<name>dfs.namenode.http-address.serviceId1.nn101</name>
<value>nn101.poc.net:50070</value>
</property>
<property>
<name>dfs.namenode.https-address.serviceId1.nn101</name>
<value>nn101.poc.net:50470</value>
</property>
<property>
<name>dfs.namenode.rpc-address.serviceId1.nn102</name>
<value>nn102.poc.net:8020</value>
</property>
<property>
<name>dfs.namenode.servicerpc-address.serviceId1.nn102</name>
<value>nn102.poc.net:54321</value>
</property>
<property>
<name>dfs.namenode.http-address.serviceId1.nn102</name>
<value>nn102.poc.net:50070</value>
</property>
<property>
<name>dfs.namenode.https-address.serviceId1.nn102</name>
<value>nn102.poc.net:50470</value>
</property>
</configuration>
Adding this to the hdfs-site config makes both nameservices serviceId1,serviceId2 available.
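Once both nameservices are defined, distcp can address them by nameservice URI. A minimal sketch (paths are illustrative; it simply shells out to the distcp command):
import subprocess
# assumption: both nameservices from the hdfs-site snippet above resolve on this client
subprocess.check_call(["hadoop", "distcp", "hdfs://serviceId1/tmp/source", "hdfs://serviceId2/tmp/target"])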
03-24-2016
07:16 PM
1 Kudo
HCatalog does not support writing into a bucketed table. HCat explicitly checks if a table is bucketed, and if so disables storing to it, to avoid writing to the table in a destructive way. From HCatOutputFormat:
if (sd.getBucketCols() != null && !sd.getBucketCols().isEmpty()) {
throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into a partition with bucket definition from Pig/Mapreduce is not supported");
}
03-22-2016
07:22 PM
1 Kudo
+1 for the aspect of reusing the Spark code itself
03-22-2016
04:17 PM
1 Kudo
In an advanced architecture you would leverage Zookeeper to announce a new model to the topology without taking it offline.
03-22-2016
04:14 PM
3 Kudos
You can use PMML (https://de.wikipedia.org/wiki/Predictive_Model_Markup_Language). Spark supports exporting (not all) models to PMML: http://spark.apache.org/docs/latest/mllib-pmml-model-export.html (UPDATE: As @Simon Elliston Ball rightfully points out in his answer, in case the PMML model is not supported, the Spark libs can be reused, as most of them have no dependency on the SparkContext.) One way could be to use JPMML with Java in Storm: http://henning.kropponline.de/2015/09/06/jpmml-example-random-forest/ https://github.com/jpmml/jpmml-storm The other could be to use R in Storm. I have seen it done, but don't have a reference at hand.
03-10-2016
09:54 PM
1 Kudo
Thank you for pointing me to that documentation. That helped me with something else.
03-10-2016
09:53 PM
2 Kudos
I was told that to prevent users from being able to append custom configurations to a config file, supports_adding_forbidden can be added to the configuration tag like so: <configuration supports_final="true" supports_adding_forbidden="true"> ... </configuration>
03-10-2016
08:47 AM
2 Kudos
I created a custom service with config files. How do I disable the Custom config section for that config file in Ambari?
Labels:
- Apache Ambari
02-20-2016
11:43 PM
1 Kudo
Origin: http://henning.kropponline.de/2015/05/19/hivesink-for-flume/
With the most recent release of HDP (v2.2.4) Hive Streaming is shipped as technical preview. It can for example be used with Storm to ingest streaming data collected from Kafka, as demonstrated here. But it also still has some serious limitations and, in case of Storm, a major bug. Nevertheless Hive Streaming is likely to become the tool of choice when it comes to streamlining data ingestion into Hadoop. So it is worth exploring already today.
Flume's upcoming release 1.6 will contain a HiveSink capable of leveraging Hive Streaming for data ingestion. In the following post we will use it as a replacement for the HDFS sink used in a previous post here. Other than replacing the HDFS sink with a HiveSink, none of the previous setup will change, except for the Hive table schema, which needs to be adjusted as part of the requirements that currently exist around Hive Streaming. So let's get started by looking into these restrictions.
Hive Streaming Limitations
The only file format supported is ORC. So the original schema of the stocks table needs to be adjusted to reflect that:
DROP TABLE IF EXISTS stocks;
CREATE EXTERNAL TABLE stocks (
date STRING,
open DOUBLE,
high DOUBLE,
low DOUBLE,
close DOUBLE,
volume BIGINT,
adj_close DOUBLE)
PARTITIONED BY(year STRING)
CLUSTERED BY (date) into 3 buckets
STORED AS ORC
LOCATION '/ingest/stocks';
As you can see from the schema, the table now also is bucketed, which is required by Hive Streaming. Furthermore we need to set the following:
hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
hive.compactor.initiator.on = true
hive.compactor.worker.threads > 0 (eg. 5)
Configuration diff as given by Ambari.
Also
important to know is that the current Streaming API only supports
delimited input data (CSV, tab separated) or JSON (strict syntax).
Flume Hive Sink
For the Flume Hive Sink the following configurations with their defaults can or must be configured:
hive.metastore
hive.database
hive.table
hive.partition
hive.txnsPerBatchAsk = 100
batchSize = 15000
serializer (delimited | json)
serializer.delimiter = ,
serializer.fieldnames
idleTimeout = 0
callTimeout = 10000
heartBeatInterval = 240
maxOpenConnections
useLocalTimeStamp
timeZone
roundUnit (hour | minute | second)
round
roundValue
With the previous example
we can use the following Flume configuration. The batch size and the
transactions per batch are not set very high, which would probably be different in a production setup, but this also depends on the data stream to expect.
flume-hive-ingest.sources = src1
flume-hive-ingest.channels = chan1
flume-hive-ingest.sinks = sink1
flume-hive-ingest.sources.src1.type = spooldir
flume-hive-ingest.sources.src1.channels = chan1
flume-hive-ingest.sources.src1.spoolDir = /vagrant/flume_log
flume-hive-ingest.sources.src1.interceptors = skipHeadI dateI
flume-hive-ingest.sources.src1.interceptors.skipHeadI.type = regex_filter
flume-hive-ingest.sources.src1.interceptors.skipHeadI.regex = ^Date.*
flume-hive-ingest.sources.src1.interceptors.skipHeadI.excludeEvents = true
flume-hive-ingest.sources.src1.interceptors.dateI.type = regex_extractor
flume-hive-ingest.sources.src1.interceptors.dateI.regex = ^(\d+)-.*
flume-hive-ingest.sources.src1.interceptors.dateI.serializers = y
flume-hive-ingest.sources.src1.interceptors.dateI.serializers.y.name = year
flume-hive-ingest.channels.chan1.type = memory
flume-hive-ingest.channels.chan1.capacity = 1000
flume-hive-ingest.channels.chan1.transactionCapacity = 100
flume-hive-ingest.sinks.sink1.type = hive
flume-hive-ingest.sinks.sink1.channel = chan1
flume-hive-ingest.sinks.sink1.hive.metastore = thirft://one.hdp:9083
flume-hive-ingest.sinks.sink1.hive.database = default
flume-hive-ingest.sinks.sink1.hive.table = stocks
flume-hive-ingest.sinks.sink1.hive.partition = %{year}
flume-hive-ingest.sinks.sink1.hive.txnsPerBatchAsk = 2
flume-hive-ingest.sinks.sink1.batchSize = 10
flume-hive-ingest.sinks.sink1.serializer = delimited
flume-hive-ingest.sinks.sink1.serializer.delimiter = ,
flume-hive-ingest.sinks.sink1.serializer.fieldnames = date,open,high,low,close,volume,adj_close
Before starting a Flume agent with this configuration you might need to set HIVE_HOME and HCAT_HOME, as flume-ng will only put the required Hive jars into the classpath with this logic:
add_hive_paths(){
if [ -d "${HIVE_HOME}/lib" ]; then
info "Including Hive libraries found via ($HIVE_HOME) for Hive access"
FLUME_CLASSPATH="$FLUME_CLASSPATH:$HIVE_HOME/lib/*"
fi
if [ -d "${HCAT_HOME}/share/hcatalog" ]; then
info "Including HCatalog libraries found via ($HCAT_HOME) for Hive access"
FLUME_CLASSPATH="$FLUME_CLASSPATH:${HCAT_HOME}/share/hcatalog/*"
fi
}
Setting them in my case was pretty straightforward:
export HIVE_HOME=/usr/hdp/current/hive-server2
export HCAT_HOME=/usr/hdp/current/hive-webhcat
Now we can start the Flume agent, obviously after we have created the stocks table:
$ hcat -f data/stocks_schema.hive
$ apache-flume-1.6.0-bin/bin/flume-ng agent -f data/flume-file-hive-ingest.conf -n flume-hive-ingest
When working correctly you should be able to see output similar to this, once you copy the stocks data into the spooling directory:
16/05/15 15:19:18 INFO ql.Driver: OK
16/05/15 15:19:18 INFO log.PerfLogger: <PERFLOG method=releaseLocks from=org.apache.hadoop.hive.ql.Driver>
16/05/15 15:19:18 INFO log.PerfLogger: </PERFLOG method=releaseLocks start=1431703158539 end=1431703158543 duration=4 from=org.apache.hadoop.hive.ql.Driver>
16/05/15 15:19:18 INFO log.PerfLogger: </PERFLOG method=Driver.run start=1431703158452 end=1431703158543 duration=91 from=org.apache.hadoop.hive.ql.Driver>
16/05/15 15:19:20 INFO hive.metastore: Trying to connect to metastore with URI thirft://one.hdp:9083
16/05/15 15:19:20 INFO hive.metastore: Connected to metastore.
16/05/15 15:19:20 INFO hive.HiveWriter: Acquired Txn Batch TxnIds=[1743...1744] on endPoint = {metaStoreUri='thirft://one.hdp:9083', database='default', table='stocks', partitionVals=[1996] }. Switching to first txn
16/05/15 15:19:20 INFO hive.HiveWriter: Committing Txn 1742 on EndPoint: {metaStoreUri='thirft://one.hdp:9083', database='default', table='stocks', partitionVals=[1997] }
16/05/15 15:19:20 INFO hive.HiveWriter: Acquired Txn Batch TxnIds=[1745...1746] on endPoint = {metaStoreUri='thirft://one.hdp:9083', database='default', table='stocks', partitionVals=[1997] }. Switching to first txn
Troubleshooting
If something goes wrong, for example with a failing connection to the metastore, please:
Check the requirements posted here or on the Hive wiki. Also check that your schema is bucketed and read the exception message carefully.
Increase the timeout for the HiveWriter to connect to the Metastore and again read the exception message carefully.
Make hdfs://tmp/hive and file:///tmp/hive writable (eg. chmod 777).
A typical error message could look like this:
16/05/15 14:53:39 WARN hive.HiveSink: sink1 : Failed connecting to EndPoint {metaStoreUri='one.hdp:9083', database='default', table='stocks', partitionVals=[${year}] }
org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='one.hdp:9083', database='default', table='stocks', partitionVals=[${year}] }
at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:98)
at org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:343)
at org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:296)
at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:254)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='one.hdp:9083', database='default', table='stocks', partitionVals=[${year}] }
at org.apache.flume.sink.hive.HiveWriter.newConnection(HiveWriter.java:320)
at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:86)
... 6 more
Caused by: java.lang.NullPointerException
at org.apache.thrift.transport.TSocket.open(TSocket.java:168)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:358)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:215)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:161)
at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.getMetaStoreClient(HiveEndPoint.java:448)
at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:274)
at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:243)
at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnectionImpl(HiveEndPoint.java:180)
at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:157)
at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:110)
at org.apache.flume.sink.hive.HiveWriter$6.call(HiveWriter.java:316)
at org.apache.flume.sink.hive.HiveWriter$6.call(HiveWriter.java:313)
at org.apache.flume.sink.hive.HiveWriter$9.call(HiveWriter.java:366)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
... 1 more
16/05/15 14:53:39 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
Further Readings
Hive Streaming
Flume HiveSink
HDP Flume
Apache Flume: Distributed Log Collection for Hadoop - Second Edition (Amazon)
Apache Hive Essentials (Amazon)
02-20-2016
11:43 PM
2 Kudos
Origin: http://henning.kropponline.de/2015/09/27/storm-serialization-with-avro-using-kryo-serializer/ Working with complex data events can be a challenge designing Storm
topologies for real-time data processing. In such cases emitting single
values for multiple and varying event characteristics soon reveals its
limitations. For message serialization Storm leverages the Kryo
serialization framework used by many other projects. Kryo keeps a
registry of serializers being used for corresponding Class types.
Mappings in that registry can be overridden or added making the
framework extendable to diverse type serializations. On the other hand Avro is a very popular "data serialization system"
that bridges between many different programming languages and tools.
While the fact that data objects can be described in JSON makes it
really easy to use, Avro is often being used for its support of schema
evolution. With support for schema evolution the same implementation
(Storm topology) could be capable of reading different versions of the
same data event without adaptation. This makes it a very good fit for
Storm as an intermediary between data ingestion points and data storage
in today's Enterprise Data Architectures.
Storm Enterprise Data Architecture
The example here does not provide complex event samples to illustrate that point, but it gives an end-to-end implementation of a Storm topology where events get sent to a Kafka queue as Avro objects and are processed natively by a real-time processing topology. The example can be found here.
It's a simple Hive Streaming example where stock events are read from a CSV file and sent to Kafka. Stock events are a flat, non-complex data type as already mentioned, but we'll still use them to demo serialization using Avro.
Deserialization in Storm
Before
we look at the beginning, let's start with the end. When we have
everything working properly we should be able to use our defined event
object as such in any bolt that is part of the topology:
Stock stock = (Stock) tuple.getValueByField("myobj_fieldname");
// OR by index //
Stock stock = (Stock) tuple.getValue(0);
As demonstrated we should be
able to cast our object simply from the tuple as it will already be
present in serialized form inside the tuple. Storm will take care of the
serialization for us. Remember Storm internally is using Kryo for
Serialization as described here.
It is using this for all data types in a tuple. To make this work with
our object described in Avro we simply have to register a custom serializer with Storm's Kryo.
The above snippet also implies that if we try to retrieve the data in any other way, for example like this: tuple.getBinary(0), we will receive an error. An exception in such a case could look like this:
2015-09-23 10:52:57 s.AvroStockDataBolt [ERROR] java.lang.ClassCastException: storm_hive_streaming_example.model.Stock cannot be cast to [B
java.lang.ClassCastException: storm_hive_streaming_example.model.Stock cannot be cast to [B
at backtype.storm.tuple.TupleImpl.getBinary(TupleImpl.java:144) ~[storm-core-0.10.0.2.3.0.0-2557.jar:0.10.0.2.3.0.0-2557]
at storm_hive_streaming_example.FieldEmitBolt.execute(FieldEmitBolt.java:34) ~[stormjar.jar:na]
The sample error message clearly states that our object, already deserialized by Storm, simply cannot be cast to a binary. So how do we set things up from the start?
Spout Scheme
Let's
return to the beginning of all, the ingestion of events into a queue
for example. The part responsible for reading an event off a data source, like for example a message broker, is known in Storm as a Spout. Typically we have one spout for a specific data source, rather than single-purpose spouts per topology. Hence a spout needs to be adaptive to the use case and the events being issued. Storm uses so
called "Scheme" to configure the data declaration of receiving and
emitting events by the Spout. The Scheme interface declares the method deserialize(byte[] pojoBytes)
for deserializing the event collected. It returns a list of objects
instead of just one object as one event could potentially be serialized
into several data fields. Here the StockAvroScheme emits the complete Stock object in one field. The second method that needs to be implemented by the Scheme interface is the getOutputFields()
method. This method is responsible for advertising the field definition
to the receiving bolts. As in the implementation below, the stock object gets sent in one field.
public class StockAvroScheme implements Scheme {
private static final Logger LOG = LoggerFactory.getLogger(Stock.class);
// deserializing the message recieved by the Spout
public List<Object> deserialize(byte[] pojoBytes) {
StockAvroSerializer serializer = new StockAvroSerializer(); // Kryo Serializer
Stock stock = serializer.read(null, new Input(pojoBytes), Stock.class);
List<Object> values = new ArrayList<>();
values.add(0, stock);
return values;
}
// defining the output fields of the Spout
public Fields getOutputFields() {
return new Fields(new String[]{ FieldNames.STOCK_FIELD });
}
}
This Scheme can be used as illustrated below by the YAML topology configuration using Storm Flux:
components:
# defines a scheme for the spout to emit a Stock.class object
- id: "stockAvroScheme"
className: "storm_hive_streaming_example.serializer.StockAvroScheme"
# adding the defined stock scheme to the multi-scheme that can be assigned to the spout
- id: "stockMultiScheme"
className: "backtype.storm.spout.SchemeAsMultiScheme"
constructorArgs:
- ref: "stockAvroScheme"
- id: "zkHosts"
className: "storm.kafka.ZkHosts"
constructorArgs:
- "${hive-streaming-example.zk.hosts}"
# configuring the spout to read bytes from Kafka and emit Stock.class
- id: "stockSpoutConfig"
className: "storm.kafka.SpoutConfig"
constructorArgs:
- ref: "zkHosts" # brokerHosts
- "${hive-streaming-example.kafka.topic}" # topic
- "${hive-streaming-example.kafka.zkRoot}" # zkRoot
- "${hive-streaming-example.kafka.spoutId}" # id
properties:
- name: "scheme"
ref: "stockMultiScheme" # use the stock scheme previously defined Last but not least we still need to register our customer serializer with Storm. Registering the Serializer Tuples
are send to Spouts and Bolts running in a separate JVMs either on the
same or on a remote host. In case of sending the tuple it needs to get
serialized and deserialized prior to placing the tuple on the the output
collector. For the serialization Storm uses Kryo Serializer. In
order to use a custom Serializer implementation it needs to get
registered with the Kryo instance being used by Storm. This can be done
as part of the topology configuration. Here is the configuration
definition using Storm Flux:
name: "hive-streaming-example-avro-scheme"
config:
topology.workers: 1
# define serializers being used by tuples de-/serializing values. See http://storm.apache.org/documentation/Serialization.html
topology.kryo.register:
- storm_hive_streaming_example.model.Stock: storm_hive_streaming_example.serializer.StockAvroSerializer
With this registration of the custom Kryo Serializer the AvroStockDataBolt can simply cast the Stock object from the tuple value and emit it to the FieldEmitBolt, which decomposes the Stock instance into the separate fields being used by the HiveBolt. Having the AvroStockDataBolt and FieldEmitBolt would not make sense in a real implementation, as the Scheme could obviously already be configured to do all that - deserialize and emit fields to the HiveBolt. Having these two extra bolts is just for demonstration purposes. Finally, the custom Kryo Serializer, which implements write(Kryo kryo, Output output, Stock object) and read(Kryo kryo, Input input, Class<Stock> type). Having a general implementation for generic Avro types would be ideal.
public class StockAvroSerializer extends Serializer<Stock> {
private static final Logger LOG = LoggerFactory.getLogger(StockAvroSerializer.class);
private Schema SCHEMA = Stock.getClassSchema();
public void write(Kryo kryo, Output output, Stock object) {
DatumWriter<Stock> writer = new SpecificDatumWriter<>(SCHEMA);
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
try {
writer.write(object, encoder);
encoder.flush();
} catch (IOException e) {
LOG.error(e.toString(), e);
}
IOUtils.closeQuietly(out);
byte[] outBytes = out.toByteArray();
output.writeInt(outBytes.length, true);
output.write(outBytes);
}
public Stock read(Kryo kryo, Input input, Class<Stock> type) {
byte[] value = input.getBuffer();
SpecificDatumReader<Stock> reader = new SpecificDatumReader<>(SCHEMA);
Stock record = null;
try {
record = reader.read(null, DecoderFactory.get().binaryDecoder(value, null));
} catch (IOException e) {
LOG.error(e.toString(), e);
}
return record;
}
}
Further Readings
Storm Serialization
Storm Hive Streaming Example (Github)
Storm Flux
Avro Specification
Kafka Storm Starter (Github)
Kryo Serializable
http://www.confluent.io/blog/stream-data-platform-2/
Simple Example Using Kryo
Storm Blueprints: Patterns for Distributed Realtime Computation (Amazon)
Storm Applied: Strategies for Real-Time Event Processing (Amazon)
02-20-2016
09:54 AM
3 Kudos
Repo Description
Repo Info
Github Repo URL: https://github.com/hkropp/storm-hive-streaming-example
Github account name: hkropp
Repo name: storm-hive-streaming-example
02-09-2016
08:36 PM
1 Kudo
Hi, I can't remember how or if I solved it 😉 but could you try to
configure the Hive View with manual configuration instead of the option
to get the cluster configuration? Let me know if this works. Thanks!
02-09-2016
08:36 PM
1 Kudo
Hi, I can't remember how or if I solved it 😉 but could you try to configure the Hive View with manual configuration instead of the option to get the cluster configuration? Let me know if this works. Thanks!