Created 07-17-2018 10:17 PM
Hello Guys,
The use case I am working on is streaming Kafka records to HBase with Scala code in Spark. The HBase table is created before the Spark application is run in spark-shell. Now I want to stream the same records from Kafka to HBase without specifying the columns and column family in the code. Is there a way to automate this? I am also posting the Scala code I use to stream the records to HBase, where CUST is the column family and CUST_*** are the columns.
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// 3-second micro-batches; sc is the SparkContext that spark-shell provides
val ssc = new StreamingContext(sc, Seconds(3))
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "1**.**.*.**:***7",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "testkafkatohase25",
  "auto.offset.reset" -> "latest",
  // Offsets are never committed below, so each restart falls back to "latest"
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("*****")
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
val stream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, consumerStrategy)
val lines = stream.map(_.value())
lines.print()

lines.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    // Open the HBase connection on the executor, once per partition
    // (Connection and Table objects cannot be serialized from the driver)
    val hConf = HBaseConfiguration.create()
    val connection = ConnectionFactory.createConnection(hConf)
    val hTable = connection.getTable(TableName.valueOf("*****"))
    try {
      iter.foreach { record =>
        // Drop the record's closing brace, keep the text after "payload": and split it into key:value pairs
        val fields = record.substring(0, record.length - 1).split("\"payload\":")(1).split(",")
        val str1 = fields(0).split(":")(1)
        val str2 = fields(1).split(":")(1)
        val str3 = fields(2).split(":")(1)
        val str4 = fields(3).split(":")(1)
        val str5 = fields(4).split(":")(1)
        val str6 = fields(5).split(":")(1)
        val str7 = fields(6).split(":")(1)
        val str8 = fields(7).split(":")(1)
        val str9 = fields(8).split(":")(1)
        val str10 = fields(9).split(":")(1)
        val str11 = fields(10).split(":")(1).split("}")(0)
        // The first payload value is used as the row key
        val thePut = new Put(Bytes.toBytes(str1))
        // Put.add(family, qualifier, value) is deprecated in HBase 1.x and removed in 2.x; addColumn replaces it
        thePut.addColumn(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_**"), Bytes.toBytes(str1))
        thePut.addColumn(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_*****"), Bytes.toBytes(str2))
        thePut.addColumn(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_*****"), Bytes.toBytes(str3))
        thePut.addColumn(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_*****"), Bytes.toBytes(str4))
        thePut.addColumn(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_*****"), Bytes.toBytes(str5))
        thePut.addColumn(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_****"), Bytes.toBytes(str6))
        thePut.addColumn(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_A****"), Bytes.toBytes(str7))
        thePut.addColumn(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_****"), Bytes.toBytes(str8))
        thePut.addColumn(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_****"), Bytes.toBytes(str9))
        thePut.addColumn(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_****"), Bytes.toBytes(str10))
        thePut.addColumn(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_*****"), Bytes.toBytes(str11))
        hTable.put(thePut)
      }
    } finally {
      // Release the table and connection even if a record fails to parse
      hTable.close()
      connection.close()
    }
  }
}
ssc.start()
Thanks
Created 07-20-2018 07:54 PM
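If you are trying to stream records from Kafka to HBase, I'd recommend giving Apache NiFi a look. It's part of our stack and is built for moving data from one system to another. For example, a ConsumeKafka processor can feed PutHBaseJSON, which writes each top-level JSON field as a column qualifier, so the columns do not have to be listed in code.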
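If the job has to stay in Spark, one way to avoid hard-coding the eleven column names is to read the qualifiers from the payload itself. The following is a minimal sketch, assuming the payload is a flat JSON object (no nested objects or arrays) whose field names are the HBase column qualifiers you want, such as the CUST_* names. `PayloadColumns`, `payloadObject` and `columns` are hypothetical names, not part of Spark or HBase; the sketch uses only the Scala standard library.

```scala
import scala.collection.immutable.ListMap
import scala.util.matching.Regex

// Hypothetical helper (not a Spark or HBase API): turns the "payload" object of a
// Kafka record into qualifier -> value pairs so column names need not be hard-coded.
// Assumes a flat JSON payload whose field names are the desired HBase qualifiers.
object PayloadColumns {

  // Locates the "payload": key, tolerating whitespace before the colon.
  private val PayloadKey: Regex = """"payload"\s*:""".r

  // One "key": value pair. The value is a double-quoted string (backslash escapes
  // allowed) or a bare scalar such as 42, true or null.
  private val Pair: Regex = """"([^"\\]*)"\s*:\s*("(?:[^"\\]|\\.)*"|[^,}\s]+)""".r

  /** Text of the JSON object that follows "payload":, braces included. */
  def payloadObject(record: String): Option[String] = {
    val afterKey = PayloadKey.findFirstMatchIn(record).map(_.end).getOrElse(-1)
    if (afterKey < 0) return None
    var start = afterKey
    while (start < record.length && record.charAt(start).isWhitespace) start += 1
    if (start >= record.length || record.charAt(start) != '{') return None

    // Walk forward to the matching close brace, ignoring braces inside strings.
    var depth = 0
    var inString = false
    var escaped = false
    var i = start
    while (i < record.length) {
      val c = record.charAt(i)
      if (escaped) escaped = false
      else if (inString) {
        if (c == '\\') escaped = true
        else if (c == '"') inString = false
      } else if (c == '"') inString = true
      else if (c == '{') depth += 1
      else if (c == '}') {
        depth -= 1
        if (depth == 0) return Some(record.substring(start, i + 1))
      }
      i += 1
    }
    None // unbalanced braces: treat the record as having no payload
  }

  /** Qualifier -> value for every top-level payload field, in record order.
    * Quotes around string values are removed; escape sequences are not decoded. */
  def columns(record: String): ListMap[String, String] =
    payloadObject(record) match {
      case None => ListMap.empty[String, String]
      case Some(obj) =>
        ListMap.empty[String, String] ++ Pair.findAllMatchIn(obj).map { m =>
          val raw = m.group(2)
          val value =
            if (raw.startsWith("\"")) raw.substring(1, raw.length - 1) else raw
          m.group(1) -> value
        }
    }
}
```

Inside `foreachPartition`, the eleven `strN`/`addColumn` pairs could then become `val cols = PayloadColumns.columns(record)`, with `cols.head._2` as the row key (matching the original's use of the first payload field, after checking that `cols` is non-empty) and `cols.foreach { case (q, v) => thePut.addColumn(Bytes.toBytes("CUST"), Bytes.toBytes(q), Bytes.toBytes(v)) }`. The column family is still named once; if it must vary too, it would have to come from configuration or the record. Unlike splitting on "," and ":", the scanner tolerates commas, colons and braces inside quoted values; Spark already ships Jackson, which would be a sturdier JSON parser in production, and the hand-rolled version only keeps the sketch dependency-free.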