
newAPIHadoopRDD Spark API doesn't retrieve unflushed data written to HBase table

New Contributor

When reading from an HBase table, a few hundred records that have not yet been persisted (flushed) to HDFS do not show up in Spark. However, the records become visible after a forced flush via the HBase shell or a system-triggered flush (when the MemStore size crosses the configured threshold), and anything written after the initial flush is immediately visible in Spark. Additionally, records sitting in the MemStore before a flush are immediately visible to the HBase client's scan API.
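For reference, here is roughly how we trigger the forced flush programmatically (a sketch; it assumes an already-open org.apache.hadoop.hbase.client.Connection, and is equivalent to flush 'test' in the HBase shell):

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Connection

// Force a MemStore flush of table "test"; after this, the rows
// become visible to the Spark job as described above.
def flushTable(connection: Connection): Unit = {
  val admin = connection.getAdmin
  try admin.flush(TableName.valueOf("test"))
  finally admin.close()
}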

Note: We cannot use the spark-hbase connector because the schema is not known beforehand. Hence, we resort to using the newAPIHadoopRDD API.

Spark read API

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

@transient val conf: Configuration = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM)
conf.set("hbase.zookeeper.property.clientPort", ZOOKEEPER_PORT)
conf.set(TableInputFormat.INPUT_TABLE, "test")
val hbaseRDD = spark.sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
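This is how we observe the discrepancy: counting the RDD misses the unflushed rows that the plain client scan further below does return. A minimal check, reusing hbaseRDD from above:

// Unflushed MemStore rows are absent from this count until a flush occurs.
println(s"Rows visible to Spark: ${hbaseRDD.count()}")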


We have tried passing a custom Scan object, setting and unsetting several flags, but none of them work. Below, we try to force the HBase scan to read from replica 0, to no avail.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.Base64

// Convert a Scan object to the Base64 string expected by TableInputFormat
def convertScanToString(scan: Scan): String = {
  val proto = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
}

@transient val scan = new Scan()
scan.setReplicaId(0) // force reads from the primary replica
@transient val scanStr = convertScanToString(scan)

@transient val conf: Configuration = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM)
conf.set("hbase.zookeeper.property.clientPort", ZOOKEEPER_PORT)
conf.set(TableInputFormat.INPUT_TABLE, "test")
conf.set(TableInputFormat.SCAN, scanStr) // pass the serialized Scan to the input format
spark.sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
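As an aside, HBase ships the same serialization as a utility in TableMapReduceUtil, so the hand-rolled helper above may not be needed (a sketch; whether the method is accessible can depend on the HBase version on the classpath):

import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableMapReduceUtil}

// Same protobuf + Base64 encoding as convertScanToString above.
val replicaScan = new Scan()
replicaScan.setReplicaId(0)
conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(replicaScan))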

 

The following HBase client read API retrieves the records stored in the MemStore before a flush, as expected.

import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}

// Plain HBase client scan: this does see the unflushed MemStore rows.
val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
val table = connection.getTable(TableName.valueOf("test"))
val scanner = table.getScanner(new Scan())
scanner.asScala.foreach(result => println(result))
scanner.close()

 

1 REPLY

Community Manager

@Manju1993 Welcome to our community! To help you get the best possible answer, I have tagged our Spark experts @RangaReddy @Babasaheb who may be able to assist you further.

Please feel free to provide any additional information or details about your query, and we hope that you will find a satisfactory solution to your question.


Cy Jervis, Manager, Community Program
Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.