When Storm developers want to share data across multiple bolts, or cache a large amount of state information in a single bolt, HBase is a common choice. In an unsecured HDP cluster, the related code in the HBase bolt is straightforward:

public class myHBaseBolt implements IRichBolt {
	...
	private OutputCollector collector;
	private Connection connection;
	private Table myTable;
	...
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector; 
		try {
			this.connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
			this.myTable = connection.getTable(TableName.valueOf(MY_TABLE_NAME));
			...
		} catch (Exception e) {
			...
		}
	}
}

However, in a Kerberized HDP cluster, we need to add configuration in our code to enable a secured connection. First, we need to pass the Storm keytab and principal to the HBase client in the Storm bolt through the topology configuration. For example, in the myTopology.java code:

Map<String, Object> mapHbase = new HashMap<String,Object>();
mapHbase.put("storm.keytab.file","/your/storm/keytab/path");
mapHbase.put("storm.kerberos.principal","yourStormPrincipalName");

Config conf = new Config();        
conf.put("hbase.config", mapHbase);
StormSubmitter.submitTopology("myTopology", conf, builder.createTopology());
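A missing keytab path or principal typically only shows up as a login failure once the bolt starts on a worker. As a minimal sketch, the map could be checked before the topology is submitted. The class `KeytabConfigCheck` and its method `problems` are hypothetical helpers written for this illustration, not part of Storm or HBase; only the two key names come from the snippet above.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of Storm or HBase.
final class KeytabConfigCheck {
    static final String KEYTAB_KEY = "storm.keytab.file";
    static final String PRINCIPAL_KEY = "storm.kerberos.principal";

    /** Returns human-readable problems; an empty list means the map looks usable. */
    static List<String> problems(Map<String, Object> hbaseConf) {
        List<String> problems = new ArrayList<>();
        Object keytab = hbaseConf.get(KEYTAB_KEY);
        Object principal = hbaseConf.get(PRINCIPAL_KEY);
        if (keytab == null || keytab.toString().trim().isEmpty()) {
            problems.add("missing " + KEYTAB_KEY);
        } else if (!Files.isReadable(Paths.get(keytab.toString()))) {
            // This only reflects the machine the check runs on, not the worker nodes.
            problems.add("keytab not readable: " + keytab);
        }
        if (principal == null || principal.toString().trim().isEmpty()) {
            problems.add("missing " + PRINCIPAL_KEY);
        }
        return problems;
    }
}
```

One possible use is to call `KeytabConfigCheck.problems(mapHbase)` just before `StormSubmitter.submitTopology(...)` and abort if the list is not empty. The readability check is only meaningful when the submitting host stores the keytab at the same path as the workers.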

Second, in the HBase bolt, we need to use the keytab information to set up the secured connection:

public class myHBaseBolt implements IRichBolt {
	...
	private OutputCollector collector;
	private Connection connection;
	private Table myTable;
	private UserProvider provider;
	// Must match the key used in conf.put("hbase.config", mapHbase) when submitting the topology
	private final String configKey = "hbase.config";

	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
		try {
			final Configuration hbConfig = HBaseConfiguration.create();
			// Copy the settings passed through the topology configuration into the HBase configuration
			Map<String, Object> conf = (Map<String, Object>) stormConf.get(this.configKey);
			for (String key : conf.keySet()) {
				hbConfig.set(key, String.valueOf(conf.get(key)));
			}
			// Log in with the Storm keytab and principal
			this.provider = HBaseSecurityUtil.login(conf, hbConfig);
			// Create the connection as the logged-in user
			this.connection =
			provider.getCurrent().getUGI().doAs(new PrivilegedExceptionAction<Connection>() {
				@Override
				public Connection run() throws Exception {
					return ConnectionFactory.createConnection(hbConfig);
				}
			});

			// Get the table handle as the logged-in user
			this.myTable =
			provider.getCurrent().getUGI().doAs(new PrivilegedExceptionAction<Table>() {
				@Override
				public Table run() throws Exception {
					return connection.getTable(TableName.valueOf(MY_TABLE_NAME));
				}
			});
			...
		} catch (Exception e) {
			...
		}
	}
}

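In short, the bolt logs in to HBase through Storm with HBaseSecurityUtil.login(), using the keytab and principal passed through the topology configuration, and then creates the connection and the table handle inside doAs() so that they run as the logged-in service user.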
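The copy step in prepare() assumes that a map is present under the "hbase.config" key; if it is missing, the loop fails with a NullPointerException that says little about the cause. The sketch below shows one way to make that step fail with a clearer message. The class `NestedConfig` and its method `extract` are hypothetical names used only for this illustration and rely on the standard library alone.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Storm or HBase.
final class NestedConfig {
    /** Reads the nested map stored under configKey and returns its entries as strings. */
    static Map<String, String> extract(Map<?, ?> stormConf, String configKey) {
        Object nested = stormConf.get(configKey);
        if (!(nested instanceof Map)) {
            throw new IllegalArgumentException(
                "Expected a map under '" + configKey + "' in the topology configuration");
        }
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<?, ?> entry : ((Map<?, ?>) nested).entrySet()) {
            // Skip null values instead of copying the literal text "null".
            if (entry.getValue() != null) {
                result.put(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
            }
        }
        return result;
    }
}
```

In the bolt, each entry of the returned map could then be applied with `hbConfig.set(key, value)` in place of the original loop.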

Last update: 05-06-2016 11:41 PM