Member since
10-09-2015
7
Posts
20
Kudos Received
0
Solutions
09-26-2016
08:19 AM
2 Kudos
In some application use cases, developers want to save Spark DataFrame table directly into Phoenix instead of saving into HBase as a intermediate step. In those case, we can use Apache Phoenix-Spark plugin package. The related api is very simple: df.save("org.apache.phoenix.spark", SaveMode.Overwrite, Map("table" -> "OUTPUT_TABLE",
"zkUrl" -> "****:2181:/****"))
However, we need to pay attention that in Apache Phoenix, all the column names by default are considered as uppercase unless you surround it with quotation marks "". Therefore, if you have specified lowercase column name in your Phoenix Schema, you have to do some column names transformation in Spark. The example code is as follows: val oldNames = df.columns
val newNames = oldNames.map(name => col(name).as("\"" + name + "\""))
val df2 = df.select(newNames:_*)
... View more
Labels:
09-06-2016
11:57 PM
1 Kudo
It is a very common operation to do prefix scan in HBase. For example, when reading HBase table from HBase, we may use the following table scan api: val prefixFilter = new PrefixFilter(prefix)
val scan: Scan = new Scan()
scan.setFilter(prefixFilter)
However, the code above may appear to be very slow when scanning a large HBase table. The reason is: we need to set StartRow before using PrefixFilter. Without setting the start row properly, your HBase scan may have to begin with the very first region and waste lots of time to get to the first right place. The recommended way is to use setRowPrefixFilter(byte[] rowPrefix), from its source code below, we can see that it helps us to set up the start row before doing table scan. 409 public Scan setRowPrefixFilter(byte[] rowPrefix) {
410 if (rowPrefix == null) {
411 setStartRow(HConstants.EMPTY_START_ROW);
412 setStopRow(HConstants.EMPTY_END_ROW);
413 } else {
414 this.setStartRow(rowPrefix);
415 this.setStopRow(calculateTheClosestNextRowKeyForPrefix(rowPrefix));
416 }
417 return this;
418 } In addition, if you want to load HBase table into Spark, you can also use the Spark-HBase connector, which support Spark accessing HBase table as external data source. The method buildScan() can do hbase table scan and return RDD as result. Its related source code is here. Thanks to Weiqing Yang and Ted Yu for the kind help.
... View more
Labels:
05-06-2016
11:41 PM
5 Kudos
When Storm developers want to share data across multiple bolts or cache a large amount of state information in a single bolt, one of the common choices is to use HBase. In a unsecured HDP cluster, the related code in the HBase Bolt is very intuitive: public class myHBaseBolt implements IRichBolt {
...
private OutputCollector collector;
private Connection connection;
private Table myTable;
...
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
try {
this.connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
this.myTable = connection.getTable(TableName.valueOf(MY_TABLE_NAME));
...
} catch (Exception e) {
...
}
}
} However, in a Kerberized HDP cluster, we need to configure lots of information in our code to enable secured connection. First, we need to configure storm keytab and principal for the hbase client in storm bolt. For example, in the myTopology.java code: Map<String, Object> mapHbase = new HashMap<String,Object>();
mapHbase.put("storm.keytab.file","/your/storm/keytab/path");
mapHbase.put("storm.kerberos.principal","yourStormPrincipalName"));
Config conf = new Config();
conf.put("hbase.config", mapHbase);
StormSubmitter.submitTopology("myTopology", conf, builder.createTopology());
Second, in the hbase bolt, we need to use the keytabs information to set up secured connection. public class myHBaseBolt implements IRichBolt {
...
private OutputCollector collector;
private Connection connection;
private Table myTable;
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
try {
final Configuration hbConfig = HBaseConfiguration.create();
Map<String, Object> conf = (Map<String, Object>) stormConf.get(this.configKey);
for(String key : conf.keySet()) {
hbConfig.set(key, String.valueOf(conf.get(key)));
}
this.provider = HBaseSecurityUtil.login(conf, hbConfig);
this.connection =
provider.getCurrent().getUGI().doAs(new PrivilegedExceptionAction<Connection>(){
@Override
public Connection run() throws Exception {
return ConnectionFactory.createConnection(hbConfig);
}
});
this.myTable =
provider.getCurrent().getUGI().doAs(new PrivilegedExceptionAction<Table>(){
@Override
public Table run() throws Exception {
return connection.getTable(TableName.valueOf(MY_TABLE_NAME));
}
});
...
} catch (Exception e) {
...
}
}
}
Basically, what we are doing here is to log in hbase through storm with HBaseSecurityUtil.login(), then create connections as service users.
... View more
Labels:
01-05-2016
07:18 PM
2 Kudos
In a Kerberized HDP cluster, I use FreeIPA as LDAP and run HDP 2.3.2 including Storm, Kafka and Ranger. After creating a storm topology as a normal user, I keep getting the following runtime error: 2015-12-30 18:31:53.274 o.a.c.ConnectionState [ERROR] Authentication failed
...
2015-12-30 18:31:53.286 o.a.z.ClientCnxn [WARN] SASL configuration failed: javax.security.auth.login.LoginException: No password provided Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
...
2015-12-30 18:31:53.328 b.s.util [ERROR] Async loop died! java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode ... This problem comes from Kafka and Ranger. As a normal failure recovery mechanism, Kafka will keep creating the topic if it cannot find the designated topic mentioned in the Kafka producer. In a kerberized environment, this CREATE request will be sent to Ranger for approval. However, in HDP 2.3.2, Ranger 0.5.0.2.3 cannot understand/recognize the CREATE action from Kafka. Therefore, this request is blocked. This problem in Ranger is fixed in the latest HDP 2.3.4. In order to solve this problem temporarily, you only need to restart Kafka. It also helps to have a more powerful cluster and larger memory. This solution is still valid in HDP 2.5. Thanks to Sumit Mohanty, Madhan Neethiraj and Sriharsha Chintalapani for the kind help!
... View more
Labels:
10-28-2015
10:55 PM
1 Kudo
Or if storm is accessing Hbase via Phoenix user can add phoenix dependencies to the project.
... View more