Kafka to HBase streaming

New Contributor

Hello Guys,

The use case I am working on is streaming Kafka records to HBase using Scala code in Spark. The table is created in HBase before the Spark application is run in the Spark shell. Now I want to stream the same records from Kafka to HBase without specifying the columns and column family in the code. Is there a way to automate this? Below is the Scala code I currently use to stream the records to HBase, where CUST is the column family and CUST_*** are the columns.

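import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Micro-batches of 3 seconds on the SparkContext provided by the shell
val ssc = new StreamingContext(sc, Seconds(3))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "1**.**.*.**:***7",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "testkafkatohase25",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("*****")
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
val stream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, consumerStrategy)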

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

// Keep only the message value of each Kafka record
val lines = stream.map(_.value)

lines.foreachRDD(rdd => {
  rdd.foreachPartition(iter => {
    // One HBase table handle per partition, created on the executor
    val hConf = HBaseConfiguration.create()
    val hTable = new HTable(hConf, "*****")
    iter.foreach(record => {
      // Split the "payload" object once into its comma-separated key:value pairs
      val fields = record.substring(0, record.length - 1).split("\"payload\":")(1).split(",")
      val str1 = fields(0).split(":")(1)
      val str2 = fields(1).split(":")(1)
      val str3 = fields(2).split(":")(1)
      val str4 = fields(3).split(":")(1)
      val str5 = fields(4).split(":")(1)
      val str6 = fields(5).split(":")(1)
      val str7 = fields(6).split(":")(1)
      val str8 = fields(7).split(":")(1)
      val str9 = fields(8).split(":")(1)
      val str10 = fields(9).split(":")(1)
      val str11 = fields(10).split(":")(1).split("}")(0)
      // The first payload field is used as the row key
      val id = str1
      val thePut = new Put(Bytes.toBytes(id))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_**"), Bytes.toBytes(str1))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_*****"), Bytes.toBytes(str2))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_*****"), Bytes.toBytes(str3))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_*****"), Bytes.toBytes(str4))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_*****"), Bytes.toBytes(str5))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_****"), Bytes.toBytes(str6))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_A****"), Bytes.toBytes(str7))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_****"), Bytes.toBytes(str8))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_****"), Bytes.toBytes(str9))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_****"), Bytes.toBytes(str10))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_*****"), Bytes.toBytes(str11))
      hTable.put(thePut)
    })
    hTable.close()
  })
})

ssc.start()
ssc.awaitTermination()

Re: Kafka to HBase streaming

Expert Contributor

If you are trying to stream records from Kafka to HBase, I'd recommend giving Apache NiFi a look. It's part of our stack and is geared towards moving data from one place to another.
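If you would rather stay in Spark, one possible direction is to read the column names from the record itself instead of hard-coding them. The sketch below is only an illustration under assumptions: it assumes each record carries a flat "payload" object whose keys are the column names, and that neither keys nor values contain commas, colons, or braces (the same assumption the string splitting in the question makes). `PayloadColumns` and `parse` are hypothetical names, not part of any library. Unlike the code in the question, it strips the surrounding double quotes from values.

```scala
object PayloadColumns {
  private val Marker = "\"payload\":"

  /** Returns the (column name, value) pairs found in the "payload" object.
    * Naive string splitting: a flat payload without commas, colons, or
    * braces inside keys or values is assumed. */
  def parse(record: String): Seq[(String, String)] = {
    val start = record.indexOf(Marker)
    if (start < 0) Seq.empty
    else {
      val afterMarker = record.substring(start + Marker.length)
      val open = afterMarker.indexOf('{')
      val close = afterMarker.indexOf('}')
      if (open < 0 || close < open) Seq.empty
      else
        afterMarker
          .substring(open + 1, close)     // text between the payload braces
          .split(",")                     // one "key":value pair per element
          .toSeq
          .map(_.split(":", 2))           // split each pair at the first colon
          .collect { case Array(k, v) => (unquote(k), unquote(v)) }
    }
  }

  // Trims whitespace and removes one pair of surrounding double quotes
  private def unquote(s: String): String =
    s.trim.stripPrefix("\"").stripSuffix("\"")
}
```

Inside `iter.foreach`, each returned pair could then be written with a single loop over `PayloadColumns.parse(record)`, adding one cell per pair to the Put, with the column family name passed in as a parameter rather than written as a literal. Note that HBase only lets column qualifiers vary per row; the column family itself still has to exist on the table before the job writes to it. For anything beyond a flat payload, a real JSON parser would be a safer choice than string splitting.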
