Member since: 05-28-2019
Posts: 46
Kudos Received: 16
Solutions: 2
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 6891 | 01-04-2017 07:03 PM
 | 1433 | 01-04-2017 06:00 PM
10-02-2017
07:52 PM
3 Kudos
The use case is to pull updates from multiple databases using a timestamp column, apply some simple transformations in NiFi, and stream the data into Hive transactional tables using the PutHiveStreaming processor. If this is not done carefully, it can easily lead to platform-wide instability for Hive.
ACID is meant for slowly changing tables, not for hundreds of concurrent queries trying to update the same partition.
ACID tables are bucketed tables. Choose the correct number of buckets and make sure the data is distributed uniformly among them. Otherwise the table can easily develop data skew, and only one CPU core writes to a single bucket.
The transaction manager and lock manager are stored in the Hive metastore. The transaction manager keeps the transaction state (open, committed, aborted) and the lock manager maintains the necessary locks for transactional tables. The recommendation is to separate the Hive, Oozie and Ambari databases and to configure high availability for them.
NiFi can overwhelm Hive ACID tables. NiFi streams data through the Hive Streaming API used by the PutHiveStreaming processor. The default run schedule for timer-driven NiFi processors is 0 sec, which means the processor runs continuously and keeps hitting the Hive metastore. The recommendation is to micro-batch the data from NiFi with a run schedule of around 1 minute or more (the higher the better). Batching 5,000-10,000 records gave the best throughput.
Compaction of the table is necessary for ACID read performance. Compaction can be automatic or on demand.
Enable email alerting on the Hive metastore database for when the count returned by the following query reaches a threshold of around 100,000 transactions.
hive=# select count(*) from TXNS where txn_state='a' and TXN_ID not in (select tc_txnid from txn_components);
count
------
3992
(1 row)
One of the data sources was overwhelming the metastore. After proper batching and scheduling, the metastore was able to clean itself up. Also tune the Hive metastore database for performance.
10/02/2017 (7pm):
hive=# select txn_user, count(*) from txns where txn_state='a' group by txn_user ;
txn_user | count
hive | 73763
nifi | 1241297
(2 rows)
10/02/2017 (9am):
hive=# select txn_user, count(*) from txns where txn_state='a' group by txn_user ;
txn_user | count
hive | 58794
nifi | 26962
(2 rows)
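The threshold check recommended in this post can be scripted once the per-user counts have been fetched from the metastore. The following is only a minimal sketch: the function name users_over_threshold and the way the counts are passed in are assumptions for illustration, not part of Hive or NiFi. The two dictionaries reuse the figures from the query outputs in this post.

```python
# Hypothetical helper: flag metastore users whose aborted-transaction count
# exceeds an alert threshold. The input mirrors the output of
#   select txn_user, count(*) from txns where txn_state='a' group by txn_user;
def users_over_threshold(aborted_by_user, threshold=100_000):
    """Return (user, count) pairs above the threshold, largest first."""
    offenders = [(u, n) for u, n in aborted_by_user.items() if n > threshold]
    return sorted(offenders, key=lambda pair: pair[1], reverse=True)


# Counts copied from the two samples shown in this post.
first_sample = {"hive": 73763, "nifi": 1241297}
second_sample = {"hive": 58794, "nifi": 26962}

print(users_over_threshold(first_sample))
print(users_over_threshold(second_sample))
```

A wrapper that runs the query on a schedule and sends the email would sit around this function; that part depends on the metastore database and mail setup and is left out here.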
09-01-2017
01:43 AM
This article explains how to configure HDF (NiFi) processors to integrate with HDP components.
Integration for the PutHiveStreaming processor:
Prerequisites: In Ambari for HDP, set the following Hive properties.
hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
hive.compactor.initiator.on = true
hive.compactor.worker.threads > 0
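One way to confirm these three settings before wiring up the processor is to read them out of hive-site.xml. The sketch below is an illustration only: the function name check_streaming_prereqs and the inline sample XML are assumptions, and a real hive-site.xml contains many more properties.

```python
import xml.etree.ElementTree as ET

# Illustrative hive-site.xml fragment (assumed content, not a real cluster file).
SAMPLE_HIVE_SITE = """<configuration>
  <property><name>hive.txn.manager</name><value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value></property>
  <property><name>hive.compactor.initiator.on</name><value>true</value></property>
  <property><name>hive.compactor.worker.threads</name><value>1</value></property>
</configuration>"""


def check_streaming_prereqs(xml_text):
    """Return a list of problems; an empty list means all three checks pass."""
    root = ET.fromstring(xml_text)
    props = {p.findtext("name"): (p.findtext("value") or "").strip()
             for p in root.iter("property")}
    problems = []
    if props.get("hive.txn.manager") != "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager":
        problems.append("hive.txn.manager is not DbTxnManager")
    if props.get("hive.compactor.initiator.on", "").lower() != "true":
        problems.append("hive.compactor.initiator.on is not true")
    threads = props.get("hive.compactor.worker.threads", "0")
    if not threads.isdigit() or int(threads) <= 0:
        problems.append("hive.compactor.worker.threads is not > 0")
    return problems


print(check_streaming_prereqs(SAMPLE_HIVE_SITE))
```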
BUG: The PutHiveStreaming processor does not pick up the updated Hive metastore principal from hive-site.xml.
Resolution:
1. Copy hive-site.xml, core-site.xml and hdfs-site.xml to the conf directory of NiFi.
2. Clear the Hive Configuration Resources property.
3. Create an ExecuteScript processor on the canvas, scheduled to run as often as the ticket needs to be refreshed (every hour should do for most setups), with the following Groovy script, replacing nifi@HDF.COM with your principal and /etc/security/keytabs/nifi.headless.keytab with your keytab.
Steps: Create /etc/hdp/tgt.groovy and copy the code below into the file on all NiFi nodes.
Note: Change the Kerberos principal and keytab location as necessary.
import org.apache.nifi.nar.NarClassLoader
import org.apache.nifi.nar.NarClassLoaders

// Find the class loader of the nifi-hive NAR and log in with the keytab inside it
NarClassLoaders.instance.extensionClassLoaders.each { c ->
    if (c instanceof NarClassLoader && c.workingDirectory.absolutePath.contains('nifi-hive')) {
        def originalClassloader = Thread.currentThread().getContextClassLoader()
        Thread.currentThread().setContextClassLoader(c)
        try {
            def configClass = c.loadClass('org.apache.hadoop.conf.Configuration', true)
            def hiveConfigurator = c.loadClass('org.apache.nifi.util.hive.HiveConfigurator', true).newInstance()
            def config = hiveConfigurator.getConfigurationFromFiles('')
            hiveConfigurator.preload(config)
            c.loadClass('org.apache.hadoop.security.UserGroupInformation', true).getMethod('setConfiguration', configClass).invoke(null, config)
            // Replace the principal and keytab path below with your own
            c.loadClass('org.apache.hadoop.security.UserGroupInformation', true).getMethod('loginUserFromKeytab', String.class, String.class).invoke(null, 'nifi@HDF.COM', '/etc/security/keytabs/nifi.headless.keytab')
            log.info('Successfully logged in')
            session.transfer(session.create(), REL_SUCCESS)
        } catch (Exception e) {
            log.error('Unable to login with keytab', e)
            session.transfer(session.create(), REL_FAILURE)
        } finally {
            // Always restore the original context class loader
            Thread.currentThread().setContextClassLoader(originalClassloader)
        }
    }
}
Do the following on all NiFi nodes:
chown nifi:nifi /etc/hdp/tgt.groovy; chmod +x /etc/hdp/tgt.groovy;
cp /etc/hdp/hive-site.xml /etc/nifi/conf/; cp /etc/hdp/core-site.xml /etc/nifi/conf/; cp /etc/hdp/hdfs-site.xml /etc/nifi/conf/;
chown nifi:nifi /etc/nifi/conf/hive-site.xml /etc/nifi/conf/core-site.xml /etc/nifi/conf/hdfs-site.xml;
Integration for the HBase processor:
Add the IP address and hostname of every HDP node to the /etc/hosts file on all NiFi nodes (IP address first, then hostname):
<IpAddress1> <Hostname1>
<IpAddress2> <Hostname2>
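For a cluster with many nodes, the entries can be generated rather than typed by hand. This is a minimal sketch under assumptions: the function name hosts_entries and the example node names and addresses are hypothetical. Note that /etc/hosts expects the IP address first on each line, followed by the hostname.

```python
def hosts_entries(nodes):
    """Format a {hostname: ip} mapping as /etc/hosts lines (IP first, then name)."""
    return [f"{ip}\t{host}" for host, ip in sorted(nodes.items())]


# Hypothetical HDP nodes; replace with the cluster's real names and addresses.
hdp_nodes = {
    "hdp-node1.example.com": "10.0.0.11",
    "hdp-node2.example.com": "10.0.0.12",
}

for line in hosts_entries(hdp_nodes):
    print(line)
```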
Integration for the Kafka processor:
Create the file /etc/hdp/zookeeper-jaas.conf and copy the configuration below into it.
Note: Change the Kerberos principal and keytab location as necessary.
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/etc/security/keytabs/nifi.headless.keytab"
  storeKey=true
  useTicketCache=false
  principal="nifi@HDF.COM";
};
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true
  renewTicket=true
  serviceName="kafka"
  useKeyTab=true
  keyTab="/etc/security/keytabs/nifi.headless.keytab"
  principal="nifi@HDF.COM";
};
Add the configuration below to Advanced nifi-bootstrap.conf in Ambari and restart the NiFi service.
java.arg.20=-Djava.security.auth.login.config=/etc/hdp/zookeeper-jaas.conf
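Because the principal and keytab appear twice in the JAAS file, it can help to render the file from a template so that both sections stay in sync. The following is a sketch only: the names JAAS_TEMPLATE and render_jaas are hypothetical, and the option values are simply the ones from the JAAS file in this article.

```python
# Template mirroring the JAAS file in this article; doubled braces are literal.
JAAS_TEMPLATE = """Client {{
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="{keytab}"
  storeKey=true
  useTicketCache=false
  principal="{principal}";
}};
KafkaClient {{
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true
  renewTicket=true
  serviceName="kafka"
  useKeyTab=true
  keyTab="{keytab}"
  principal="{principal}";
}};
"""


def render_jaas(principal, keytab):
    """Fill in the principal and keytab path for both login sections."""
    return JAAS_TEMPLATE.format(principal=principal, keytab=keytab)


print(render_jaas("nifi@HDF.COM", "/etc/security/keytabs/nifi.headless.keytab"))
```

The rendered text would then be written to /etc/hdp/zookeeper-jaas.conf on each NiFi node.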
08-22-2017
11:14 PM
I have used my Mac laptop with kerberized client clusters before, so the problem is not the Kerberos client on my local Mac. The krb5.conf was copied from the KDC server, and I copied the Storm headless keytab from the sandbox to my local machine.
08-22-2017
11:11 PM
krb5kdc only requires port 88, and it is open. I don't use kadmin, so I didn't check port 749. It looks like a network problem between the local machine and the VM to me as well.
08-22-2017
01:40 PM
The sandbox services are working fine after the Kerberos setup, and I am able to obtain tickets on the VM. The problem is obtaining a ticket on my local Mac to authenticate to the various UIs. I am using a headless keytab to obtain the ticket on the local Mac. Error while obtaining a ticket on the local Mac:
HW:Downloads nbalaji-elangovan$ env KRB5_TRACE=/dev/stdout kinit -kt /Users/nbalaji-elangovan/Downloads/storm.headless.keytab storm-sandbox@EXAMPLE.COM
2017-08-22T09:29:28 set-error: -1765328242: Reached end of credential caches
2017-08-22T09:29:28 set-error: -1765328243: Principal storm-sandbox@EXAMPLE.COM not found in any credential cache
2017-08-22T09:29:28 set-error: -1765328234: Encryption type des-cbc-md5-deprecated not supported
2017-08-22T09:29:28 Adding PA mech: ENCRYPTED_CHALLENGE
2017-08-22T09:29:28 Adding PA mech: ENCRYPTED_TIMESTAMP
2017-08-22T09:29:28 krb5_get_init_creds: loop 1
2017-08-22T09:29:28 KDC sent 0 patypes
2017-08-22T09:29:28 fast disabled, not doing any fast wrapping
2017-08-22T09:29:28 Trying to find service kdc for realm EXAMPLE.COM flags 0
2017-08-22T09:29:28 configuration file for realm EXAMPLE.COM found
2017-08-22T09:29:28 submissing new requests to new host
2017-08-22T09:29:28 connecting to host: udp ::1:kerberos (sandbox-hdf.hortonworks.com) tid: 00000001
2017-08-22T09:29:28 Queuing host in future (in 3s), its the 2 address on the same name: udp 127.0.0.1:kerberos (sandbox-hdf.hortonworks.com) tid: 00000002
2017-08-22T09:29:28 writing packet: udp ::1:kerberos (sandbox-hdf.hortonworks.com) tid: 00000001
2017-08-22T09:29:28 reading packet: udp ::1:kerberos (sandbox-hdf.hortonworks.com) tid: 00000001
2017-08-22T09:29:28 host completed: udp ::1:kerberos (sandbox-hdf.hortonworks.com) tid: 00000001
2017-08-22T09:29:28 set-error: -1765328378: Client unknown
2017-08-22T09:29:28 krb5_sendto_context EXAMPLE.COM done: 0 hosts 1 packets 1 wc: 0.067794 nr: 0.001019 kh: 0.000823 tid: 00000002
2017-08-22T09:29:28 krb5_get_init_creds: loop 2
2017-08-22T09:29:28 krb5_get_init_creds: processing input
2017-08-22T09:29:28 krb5_get_init_creds: got an KRB-ERROR from KDC
2017-08-22T09:29:28 set-error: -1765328378: Client (storm-sandbox@EXAMPLE.COM) unknown
2017-08-22T09:29:28 krb5_get_init_creds: KRB-ERROR -1765328378/Client (storm-sandbox@EXAMPLE.COM) unknown
kinit: krb5_get_init_creds: Client (storm-sandbox@EXAMPLE.COM) unknown
krb5.conf on the local Mac:
HW:Downloads nbalaji-elangovan$ cat /etc/krb5.conf
[libdefaults]
renew_lifetime = 7d
forwardable = true
default_realm = EXAMPLE.COM
ticket_lifetime = 24h
dns_lookup_realm = false
dns_lookup_kdc = false
default_ccache_name = /tmp/krb5cc_%{uid}
#default_tgs_enctypes = aes des3-cbc-sha1 rc4 des-cbc-md5
#default_tkt_enctypes = aes des3-cbc-sha1 rc4 des-cbc-md5
[logging]
default = FILE:/var/log/krb5kdc.log
admin_server = FILE:/var/log/kadmind.log
kdc = FILE:/var/log/krb5kdc.log
[realms]
EXAMPLE.COM = {
admin_server = sandbox-hdf.hortonworks.com
kdc = sandbox-hdf.hortonworks.com
}
/etc/hosts file on the local Mac:
HW:Downloads nbalaji-elangovan$ cat /etc/hosts
##
# Host Database
#
# localhost is used to configure the loopback interface
# when the system is booting. Do not change this entry.
##
127.0.0.1 localhost
#127.0.0.1 sandbox.hortonworks.com
255.255.255.255 broadcasthost
::1 localhost
127.0.0.1 localhost sandbox-hdf.hortonworks.com
#172.17.0.2 sandbox-hdf.hortonworks.com
## vagrant-hostmanager-start id: e133f37a-cb74-4747-9f85-151944869ced
#192.168.66.121 node1
## vagrant-hostmanager-end
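A quick way to see which address the KDC host name from krb5.conf maps to in this hosts file is to parse the active (uncommented) entries. This is a minimal sketch: the function name addresses_for is hypothetical, and the HOSTS text is an excerpt of the file shown in this post.

```python
def addresses_for(hosts_text, name):
    """Return the IP addresses that a hosts-file text maps to `name`."""
    found = []
    for raw in hosts_text.splitlines():
        fields = raw.split("#", 1)[0].split()  # drop comments, split on whitespace
        if len(fields) >= 2 and name in fields[1:]:
            found.append(fields[0])
    return found


# Excerpt of the /etc/hosts content from this post.
HOSTS = """127.0.0.1 localhost
#127.0.0.1 sandbox.hortonworks.com
255.255.255.255 broadcasthost
::1 localhost
127.0.0.1 localhost sandbox-hdf.hortonworks.com
#172.17.0.2 sandbox-hdf.hortonworks.com
"""

print(addresses_for(HOSTS, "sandbox-hdf.hortonworks.com"))
```

With this file the KDC name maps only to a loopback address, which is consistent with the loopback addresses that appear in the kinit trace.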
Labels: Apache Storm, Kerberos
03-13-2017
04:55 AM
Worked. Thanks
03-12-2017
05:12 AM
That doesn't work; I tried it.
03-12-2017
04:51 AM
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext()
sqlc = SQLContext(sc)
df = sqlc.read \
.format("org.apache.phoenix.spark") \
.option("table", "TABLE1") \
.option("zkUrl", "<host>:2181:/hbase-secure") \
.load()
print(df)
Run using:
spark-submit --master local --jars /usr/hdp/current/phoenix-client/phoenix-client.jar,/usr/hdp/current/phoenix-client/lib/phoenix-spark-4.7.0.2.5.3.0-37.jar --conf "spark.executor.extraClassPath=/usr/hdp/current/phoenix-client/phoenix-client.jar" spark_phoenix.py
Error:
17/03/11 23:20:03 INFO ZooKeeper: Initiating client connection, connectString=<host>:2181 sessionTimeout=90000 watcher=org.apache.hadoop.hbase.zookeeper.PendingWatcher@653a27b2
17/03/11 23:20:03 INFO ClientCnxn: Opening socket connection to server <host>/10.18.106.4:2181. Will not attempt to authenticate using SASL (unknown error)
17/03/11 23:20:03 INFO ClientCnxn: Socket connection established to <host>/10.18.106.4:2181, initiating session
17/03/11 23:20:03 INFO ClientCnxn: Session establishment complete on server <host>/10.18.106.4:2181, sessionid = 0x15a7b6220540b7e, negotiated timeout = 40000
Traceback (most recent call last):
File "/d3/app/bin/spark_phoenix.py", line 10, in <module>
.option("zkUrl", "<host>:2181:/hbase-secure") \
File "/usr/hdp/2.5.3.0-37/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 139, in load
File "/usr/hdp/2.5.3.0-37/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/usr/hdp/2.5.3.0-37/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 45, in deco
File "/usr/hdp/2.5.3.0-37/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o43.load.
: java.sql.SQLException: org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=36, exceptions:
Sat Mar 11 23:20:03 EST 2017, null, java.net.SocketTimeoutException: callTimeout=60000, callDuration=68236: row 'SYSTEM:CATALOG,,' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=usor7dhc01w06.use.ucdp.net,16020,1488988574688, seqNum=0
at org.apache.phoenix.query.ConnectionQueryServicesImpl$13.call(ConnectionQueryServicesImpl.java:2590)
at org.apache.phoenix.query.ConnectionQueryServicesImpl$13.call(ConnectionQueryServicesImpl.java:2327)
at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:78)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:2327)
at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:233)
at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.createConnection(PhoenixEmbeddedDriver.java:142)
at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:202)
at java.sql.DriverManager.getConnection(DriverManager.java:664)
at java.sql.DriverManager.getConnection(DriverManager.java:208)
at org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(ConnectionUtil.java:98)
at org.apache.phoenix.mapreduce.util.ConnectionUtil.getInputConnection(ConnectionUtil.java:57)
at org.apache.phoenix.mapreduce.util.ConnectionUtil.getInputConnection(ConnectionUtil.java:45)
at org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getSelectColumnMetadataList(PhoenixConfigurationUtil.java:277)
at org.apache.phoenix.spark.PhoenixRDD.toDataFrame(PhoenixRDD.scala:106)
at org.apache.phoenix.spark.PhoenixRelation.schema(PhoenixRelation.scala:57)
at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:37)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Labels: Apache Phoenix, Apache Spark
02-20-2017
08:11 PM
1 Kudo
How do I submit a Python Spark job with a Kerberos keytab and principal?