Kafka to HBase streaming

Hello Guys,

The use case I am working on streams Kafka records to HBase using Scala code in Spark. The HBase table is created before the Spark application is run in the Spark shell. Now I want to stream the same records from Kafka to HBase without specifying the columns and the column family in the code. Is there a way to automate this? Below is the Scala code I currently use to stream the records to HBase, where CUST is the column family and CUST_*** are the columns.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// sc is the SparkContext provided by the Spark shell; batches are 3 seconds long.
val ssc = new StreamingContext(sc, Seconds(3))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "1**.**.*.**:***7",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "testkafkatohase25",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("*****")
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
val stream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, consumerStrategy)

// Keep only the message value (the JSON string).
val lines = stream.map(_.value())
lines.print()

lines.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    // One HBase connection per partition, opened on the executor.
    // Uses the Connection/Table client API (HBase 1.0 and later) instead of the deprecated HTable constructor.
    val hConf = HBaseConfiguration.create()
    val connection = ConnectionFactory.createConnection(hConf)
    val hTable = connection.getTable(TableName.valueOf("*****"))
    try {
      iter.foreach { record =>
        // Drop the trailing '}' of the record, take the text after "payload": and split it into its "key":value fields.
        val fields = record.substring(0, record.length - 1).split("\"payload\":")(1).split(",")
        val str1 = fields(0).split(":")(1)
        val str2 = fields(1).split(":")(1)
        val str3 = fields(2).split(":")(1)
        val str4 = fields(3).split(":")(1)
        val str5 = fields(4).split(":")(1)
        val str6 = fields(5).split(":")(1)
        val str7 = fields(6).split(":")(1)
        val str8 = fields(7).split(":")(1)
        val str9 = fields(8).split(":")(1)
        val str10 = fields(9).split(":")(1)
        // The last field still carries the closing '}' of the payload object.
        val str11 = fields(10).split(":")(1).split("}")(0)

        // The first payload field is used as the row key.
        val thePut = new Put(Bytes.toBytes(str1))
        // addColumn replaces the deprecated Put.add(family, qualifier, value).
        thePut.addColumn(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_**"), Bytes.toBytes(str1))
        thePut.addColumn(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_*****"), Bytes.toBytes(str2))
        thePut.addColumn(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_*****"), Bytes.toBytes(str3))
        thePut.addColumn(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_*****"), Bytes.toBytes(str4))
        thePut.addColumn(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_*****"), Bytes.toBytes(str5))
        thePut.addColumn(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_****"), Bytes.toBytes(str6))
        thePut.addColumn(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_A****"), Bytes.toBytes(str7))
        thePut.addColumn(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_****"), Bytes.toBytes(str8))
        thePut.addColumn(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_****"), Bytes.toBytes(str9))
        thePut.addColumn(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_****"), Bytes.toBytes(str10))
        thePut.addColumn(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_*****"), Bytes.toBytes(str11))
        hTable.put(thePut)
      }
    } finally {
      // Release the table and the connection even if a record fails to parse.
      hTable.close()
      connection.close()
    }
  }
}

ssc.start()

Thanks

1 REPLY

Expert Contributor

If you are trying to stream records from Kafka to HBase, I'd recommend giving Apache NiFi a look. It's part of our stack and is geared towards moving data from one place to another.
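
If you would rather stay in Spark, one possible approach is to read the column names from the payload itself instead of hardcoding them. The sketch below is only an illustration under a few assumptions: each record ends with a flat "payload" object (no nested objects or arrays), every payload key is meant to become a column qualifier, and the column family is the part of the key before the first underscore (so a key like CUST_ID goes to family CUST). The object name PayloadColumns, the method columns, and the sample field names are hypothetical and not taken from your code.

```scala
object PayloadColumns {
  // Matches one flat "key":value pair. The value is either a quoted string
  // or a bare token such as a number, true, false or null.
  private val Pair = """"([^"]+)"\s*:\s*("(?:[^"\\]|\\.)*"|[^,}\s]+)""".r

  /** Returns (family, qualifier, value) triples for every field found after "payload":. */
  def columns(record: String): List[(String, String, String)] = {
    val marker = "\"payload\":"
    val start = record.indexOf(marker)
    if (start < 0) Nil // no payload in this record
    else {
      val payload = record.substring(start + marker.length)
      Pair.findAllMatchIn(payload).map { m =>
        val qualifier = m.group(1)
        // Strip the surrounding quotes from string values.
        val value = m.group(2).stripPrefix("\"").stripSuffix("\"")
        // Assumption: the family is the key's prefix before the first underscore.
        val family = qualifier.takeWhile(_ != '_')
        (family, qualifier, value)
      }.toList
    }
  }
}
```

Inside iter.foreach you could then loop over PayloadColumns.columns(record) and call thePut.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value)) for each triple, using the value of the first triple as the row key. Column qualifiers can be created on the fly this way, but the column family itself still has to exist in the HBase table before the write.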