When Storm developers want to share data across multiple bolts or cache a large amount of state in a single bolt, HBase is a common choice. In an unsecured HDP cluster, the related code in the HBase bolt is straightforward:

public class myHBaseBolt implements IRichBolt {
	...
	private OutputCollector collector;
	private Connection connection;
	private Table myTable;
	...
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector; 
		try {
			this.connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
			this.myTable = connection.getTable(TableName.valueOf(MY_TABLE_NAME));
			...
		} catch (Exception e) {
			...
		}
	}
}

However, in a Kerberized HDP cluster, the code needs additional configuration to establish a secure connection. First, we need to pass the Storm keytab and principal to the HBase client used by the bolt. For example, in the myTopology.java code:

Map<String, Object> mapHbase = new HashMap<String,Object>();
mapHbase.put("storm.keytab.file","/your/storm/keytab/path");
mapHbase.put("storm.kerberos.principal","yourStormPrincipalName");

Config conf = new Config();        
conf.put("hbase.config", mapHbase);
StormSubmitter.submitTopology("myTopology", conf, builder.createTopology());
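
To illustrate how these settings travel with the topology, here is a minimal sketch of the round trip using only java.util. A plain Map<String, String> stands in for the HBase Configuration object, and the class name HBaseConfigRoundTrip and the helper flatten() are hypothetical names chosen for this example, not part of Storm or HBase. It only shows that the nested map stored under "hbase.config" can be read back and copied key by key as strings.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class HBaseConfigRoundTrip {
    // Key under which the topology stores the HBase client settings.
    static final String CONFIG_KEY = "hbase.config";

    // Hypothetical helper: copies the nested settings into a flat String map.
    // The result is a stand-in for the HBase Configuration object.
    @SuppressWarnings("unchecked")
    static Map<String, String> flatten(Map<String, Object> stormConf) {
        Object nested = stormConf.get(CONFIG_KEY);
        if (!(nested instanceof Map)) {
            throw new IllegalArgumentException("Missing topology config entry: " + CONFIG_KEY);
        }
        Map<String, String> flat = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : ((Map<String, Object>) nested).entrySet()) {
            // Every value is stored as a String, whatever its original type.
            flat.put(e.getKey(), String.valueOf(e.getValue()));
        }
        return flat;
    }

    public static void main(String[] args) {
        // Same placeholder values as in the topology code above.
        Map<String, Object> mapHbase = new HashMap<>();
        mapHbase.put("storm.keytab.file", "/your/storm/keytab/path");
        mapHbase.put("storm.kerberos.principal", "yourStormPrincipalName");

        Map<String, Object> stormConf = new HashMap<>();
        stormConf.put(CONFIG_KEY, mapHbase);

        System.out.println(flatten(stormConf));
    }
}
```

Failing fast when the "hbase.config" entry is missing is a design choice of this sketch. It turns a later NullPointerException into an error message that names the missing key.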

Second, in the HBase bolt, we use the keytab information to set up the secured connection.

public class myHBaseBolt implements IRichBolt {
	...
	private OutputCollector collector;
	private Connection connection;
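	private Table myTable;
	// Fields used in prepare(); the type and the key value are assumptions, since the original elides them.
	private UserProvider provider;                // returned by HBaseSecurityUtil.login()
	private String configKey = "hbase.config";    // must match the key set in the topology Config

	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
		try {
			final Configuration hbConfig = HBaseConfiguration.create();
			// Copy the settings passed in by the topology into the HBase client configuration.
			Map<String, Object> conf = (Map<String, Object>) stormConf.get(this.configKey);
			for (String key : conf.keySet()) {
				hbConfig.set(key, String.valueOf(conf.get(key)));
			}
			// Log in with the Storm keytab and principal.
			this.provider = HBaseSecurityUtil.login(conf, hbConfig);
			// Create the connection as the logged-in user.
			this.connection =
			provider.getCurrent().getUGI().doAs(new PrivilegedExceptionAction<Connection>() {
				@Override
				public Connection run() throws Exception {
					return ConnectionFactory.createConnection(hbConfig);
				}
			});

			// Open the table as the same user.
			this.myTable =
			provider.getCurrent().getUGI().doAs(new PrivilegedExceptionAction<Table>() {
				@Override
				public Table run() throws Exception {
					return connection.getTable(TableName.valueOf(MY_TABLE_NAME));
				}
			});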
			...
		} catch (Exception e) {
			...
		}
	}
}

Basically, what we are doing here is logging in to HBase from Storm with HBaseSecurityUtil.login(), then creating the connection and the table handle inside doAs() so that both run as the logged-in service user.
