
Kafka to HBase Spark Streaming

Hello Guys,

I am working on a use case where a data source pushes JSON data into Kafka topics. I am using Scala consumer code running in spark-shell to stream those records from the Kafka topics and write them to HBase; the whole setup runs in a Hortonworks environment. After running the consumer code I can see the records printing in spark-shell, but they never reach HBase, and no error is thrown. I split the JSON string into substrings and use the Put API to write the values into individual columns, where CUST is the column family and the CUST_**** fields are the columns. The table was created in HBase beforehand.

code:

// imports needed in spark-shell for the snippet below
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

val ssc = new StreamingContext(sc, Seconds(3))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "1**.**.*.**:***7",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "testkafkatohase25",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("*****")
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
val stream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, consumerStrategy)

val lines = stream.map(_.value())
lines.print()

lines.foreachRDD { rdd =>
  rdd.foreachPartition(iter => {
    // one HBase table handle per partition
    val hConf = HBaseConfiguration.create()
    val hTable = new HTable(hConf, "*****")
    iter.foreach(record => {
      // drop the last character and take the part after "payload":, then split positionally
      val payload = record.substring(0, record.length - 1).split("\"payload\":")(1)
      val str1 = payload.split(",")(0).split(":")(1)
      val str2 = payload.split(",")(1).split(":")(1)
      val str3 = payload.split(",")(2).split(":")(1)
      val str4 = payload.split(",")(3).split(":")(1)
      val str5 = payload.split(",")(4).split(":")(1)
      val str6 = payload.split(",")(5).split(":")(1)
      val str7 = payload.split(",")(6).split(":")(1)
      val str8 = payload.split(",")(7).split(":")(1)
      val str9 = payload.split(",")(8).split(":")(1)
      val str10 = payload.split(",")(9).split(":")(1)
      val str11 = payload.split(",")(10).split(":")(1).split("}")(0)
      val id = str1
      val thePut = new Put(Bytes.toBytes(id))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_**"), Bytes.toBytes(str1))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_*****"), Bytes.toBytes(str2))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_*****"), Bytes.toBytes(str3))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_*****"), Bytes.toBytes(str4))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_*****"), Bytes.toBytes(str5))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_****"), Bytes.toBytes(str6))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_A****"), Bytes.toBytes(str7))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_****"), Bytes.toBytes(str8))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_****"), Bytes.toBytes(str9))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_****"), Bytes.toBytes(str10))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_*****"), Bytes.toBytes(str11))
      hTable.put(thePut)
    })
    // flush any buffered puts and release the table handle
    hTable.close()
  })
}

ssc.start()
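
For reference, since the positional splits above are fragile if the field order ever changes, here is a rough sketch of the same write that parses the payload with Jackson (which Spark already bundles) instead. The cust_id / cust_name field names and the customer_table name are only hypothetical placeholders for the masked values above:

import com.fasterxml.jackson.databind.ObjectMapper

lines.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    val hConf = HBaseConfiguration.create()
    val hTable = new HTable(hConf, "customer_table")   // hypothetical table name
    val mapper = new ObjectMapper()                    // Jackson ships with Spark
    iter.foreach { record =>
      // parse the whole message and descend into the "payload" object; no manual brace stripping needed
      val payload = mapper.readTree(record).get("payload")
      val custId   = payload.get("cust_id").asText()   // hypothetical field name
      val custName = payload.get("cust_name").asText() // hypothetical field name
      val thePut = new Put(Bytes.toBytes(custId))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_ID"), Bytes.toBytes(custId))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_NAME"), Bytes.toBytes(custName))
      hTable.put(thePut)
    }
    hTable.close()
  }
}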

4 Replies

@sohan kanamarlapudi

Which connector are you using for HBase? Have you considered shc?

https://github.com/hortonworks-spark/shc

I recently did something similar, streaming data from Kafka to HBase (but I used Python instead). Here is the GitHub link if you would like to review it:

https://github.com/felixalbani/future-of-data-santiago-e1-spark-nifi

In the project above I used shc, which is very easy to use and worked just fine for me.
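
A minimal write with shc looks roughly like the sketch below (adapted from the shc README; the customer_table name, the cust_id / cust_name fields, and the CUST_NAME qualifier are placeholders, so map them to your actual schema):

import spark.implicits._
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

// catalog describing how DataFrame columns map to the HBase row key and column family
val catalog = s"""{
  "table":{"namespace":"default", "name":"customer_table"},
  "rowkey":"key",
  "columns":{
    "cust_id":{"cf":"rowkey", "col":"key", "type":"string"},
    "cust_name":{"cf":"CUST", "col":"CUST_NAME", "type":"string"}
  }
}"""

// stand-in DataFrame for the parsed Kafka records
val df = Seq(("1", "sample-name")).toDF("cust_id", "cust_name")

df.write
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()

shc then builds the Puts and manages the HBase connections for you, so there is no manual Bytes.toBytes bookkeeping.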

HTH

*** If this answer addressed your question, please take a moment to log in and click the "Accept" link on the answer.

Hello Guys,

These are the logs I found in HBase:

java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1125)
2018-07-09 21:50:58,047 INFO [regionserver/hortonworks1.*****.com/172.**.*.**:16020-SendThread(hortonworks1.******.com:2181)] zookeeper.ClientCnxn: Opening socket connection to server hortonworks1.******.com/172.**.*.**:2181. Will not attempt to authenticate using SASL (unknown error)
2018-07-09 21:50:58,048 WARN [regionserver/hortonworks1.*****.com/172.**.*.**:16020-SendThread(hortonworks1.*******.com:2181)] zookeeper.ClientCnxn: Session 0x164711f065400b6 for server null, unexpected error, closing socket connection and attempting reconnect

Hey Felix,

Thanks for the reply. I am not using any connector to send records to HBase. I have figured it out, and I am now able to send the JSON records from Kafka to HBase using the Spark Scala streaming code.

Thanks


Hi Sohan and Felix,

I have similar Spark Streaming code and I receive errors at the end. Would you please tell me how you solved yours?

spark-shell --master yarn-client \
  --driver-memory 6g --executor-memory 6g --executor-cores 4 \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=spark_jaas.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=spark_jaas.conf" \
  --jars kafkaspark/spark-streaming-kafka-0-10-assembly_2.11-2.3.0.jar,$hbase_jars \
  --files spark_jaas.conf,/keytab,/etc/hbase/conf/hbase-site.xml \
  /usr/hdp/current/spark-client/lib/spark-examples-1.6.3.2.6.5.0-292-hadoop2.7.3.2.6.5.0-292.jar


import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.sql.{SQLContext, SaveMode}
import spark.implicits._
import org.apache.spark.sql.DataFrame
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.Column
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql.functions._
import org.apache.hadoop._

case class kafkarec(cskey: String, csvalue: String)

var kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "hpserver....",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "dops",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
kafkaParams = kafkaParams + ("security.protocol" -> "SASL_PLAINTEXT")

// val sparkConf = new SparkConf().setAppName("HBaseStream")
// create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sc, Seconds(20))
val topics = Array("d_topic_name")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
).map { e => kafkarec(e.key(): String, e.value(): String) }

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.io.{LongWritable, Writable, IntWritable, Text}
import org.apache.hadoop.mapred.{TextOutputFormat, JobConf}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.SparkConf
import org.apache.spark.rdd.PairRDDFunctions
import org.apache.spark.storage.StorageLevel
//import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

// ######################### it works up to here, no error #########################


stream.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    rdd.foreachPartition(iter => {
      val Conf = HBaseConfiguration.create()
      // HTable is deprecated: val hTable = new HTable(hConf, "dops:qlsvo_kafka")
      conf.set(TableOutputFormat.OUTPUT_TABLE, "dops:qlsvo_kafka")
      val jobConfig: JobConf = new JobConf(conf, this.getClass)
      jobConfig.set("mapreduce.output.fileoutputformat.outputdir", "hdfs:///user/jamin4/qlsvokafka/UNIT_CONCERN")
      jobConfig.setOutputFormat(classOf[TableOutputFormat])
      jobConfig.set(TableOutputFormat.OUTPUT_TABLE, "dops:qlsvo_kafka")
      iter.foreach(record => {
        val str1 = record.split("\"qls_unit_id\":")(1).split(",")(0).split(":")(1)
        val str2 = record.split("\"plant_id\":")(1).split(",")(1).split(":")(1)
        val str3 = record.split("\"unit_collection_pt_timestamp\":")(1).split(",")(2).split(":")(1)
        val str4 = record.split("\"unit_collection_pt_timestamp_s\":")(1).split(",")(3).split(":")(1)
        val str5 = record.split("\"unit_concern_id\":")(1).split(",")(4).split(":")(1)
        val str6 = record.split("\"collection_point_id\":")(1).split(",")(5).split(":")(1)
        val str7 = record.split("\"collection_point_section_id\":")(1).split(",")(6).split(":")(1)
        val str8 = record.split("\"ii_product_line_group_id\":")(1).split(",")(7).split(":")(1)
        val str9 = record.split("\"ii_plg_plant_id\":")(1).split(",")(8).split(":")(1)
        val str10 = record.split("\"usage_id\":")(1).split(",")(9).split(":")(1)
        val str11 = record.split("\"inspection_item_group_id\":")(1).split(",")(10).split(":")(1).split("}")(0)
        val id_con = str1
        val id = id_con.toString
        val thePut = new Put(Bytes.toBytes(id))
        thePut.add(Bytes.toBytes("UNIT_CONCERN"), Bytes.toBytes("qls_unit_id"), Bytes.toBytes(str1))
        thePut.add(Bytes.toBytes("UNIT_CONCERN"), Bytes.toBytes("plant_id"), Bytes.toBytes(str2))
        thePut.add(Bytes.toBytes("UNIT_CONCERN"), Bytes.toBytes("unit_collection_pt_timestamp"), Bytes.toBytes(str3))
        thePut.add(Bytes.toBytes("UNIT_CONCERN"), Bytes.toBytes("unit_collection_pt_timestamp_s"), Bytes.toBytes(str4))
        thePut.add(Bytes.toBytes("UNIT_CONCERN"), Bytes.toBytes("unit_concern_id"), Bytes.toBytes(str5))
        thePut.add(Bytes.toBytes("UNIT_CONCERN"), Bytes.toBytes("collection_point_id"), Bytes.toBytes(str6))
        thePut.add(Bytes.toBytes("UNIT_CONCERN"), Bytes.toBytes("collection_point_section_id"), Bytes.toBytes(str7))
        thePut.add(Bytes.toBytes("UNIT_CONCERN"), Bytes.toBytes("ii_product_line_group_id"), Bytes.toBytes(str8))
        thePut.add(Bytes.toBytes("UNIT_CONCERN"), Bytes.toBytes("ii_plg_plant_id"), Bytes.toBytes(str9))
        thePut.add(Bytes.toBytes("UNIT_CONCERN"), Bytes.toBytes("usage_id"), Bytes.toBytes(str10))
        thePut.add(Bytes.toBytes("UNIT_CONCERN"), Bytes.toBytes("inspection_item_group_id"), Bytes.toBytes(str11))
        /* hTable.put(thePut) */
      })
    })
  }
})


The ERROR:

<console>:75: error: not found: value HBaseConfiguration
       val Conf = HBaseConfiguration.create()
<console>:77: error: object set is not a member of package org.apache.hadoop.conf
       conf.set(TableOutputFormat.OUTPUT_TABLE, "dops:qlsvo_kafka")
<console>:77: error: not found: value TableOutputFormat
       conf.set(TableOutputFormat.OUTPUT_TABLE, "dops:qlsvo_kafka")
<console>:78: error: package org.apache.hadoop.conf is not a value
       val jobConfig: JobConf = new JobConf(conf, this.getClass)
<console>:80: error: not found: type TableOutputFormat
       jobConfig.setOutputFormat(classOf[TableOutputFormat])
<console>:81: error: not found: value TableOutputFormat
       jobConfig.set(TableOutputFormat.OUTPUT_TABLE, "dops:qlsvo_kafka")
<console>:84: error: value split is not a member of kafkarec
       val str1 = record.split("\"qls_unit_id\":")(1).split(",")(0).split(":")(1)
<console>:99: error: not found: type Put
       val thePut = new Put(Bytes.toBytes(id))
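
Two things I notice re-reading the snippet, though this is only a guess and not a verified fix: the stream was mapped to kafkarec objects, so record is not a String (hence "value split is not a member of kafkarec") and the JSON would have to come from record.csvalue; and the configuration is declared as Conf but referenced as conf, which with import org.apache.hadoop._ in scope resolves to the org.apache.hadoop.conf package. A rough sketch of the partition closure under those assumptions, writing through a plain HBase Table instead of TableOutputFormat, might look like this:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

stream.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    rdd.foreachPartition(iter => {
      // hbase-site.xml shipped via --files must be on the executor classpath
      val hbaseConf = HBaseConfiguration.create()
      val connection = ConnectionFactory.createConnection(hbaseConf)
      val table = connection.getTable(TableName.valueOf("dops:qlsvo_kafka"))
      try {
        iter.foreach { record =>
          // record is a kafkarec, so the JSON payload lives in csvalue
          val json = record.csvalue
          val qlsUnitId = json.split("\"qls_unit_id\":")(1).split(",")(0)
          val put = new Put(Bytes.toBytes(qlsUnitId))
          put.addColumn(Bytes.toBytes("UNIT_CONCERN"), Bytes.toBytes("qls_unit_id"), Bytes.toBytes(qlsUnitId))
          // ... addColumn for the remaining fields as in the snippet above ...
          table.put(put)
        }
      } finally {
        table.close()
        connection.close()
      }
    })
  }
})

On a Kerberized cluster the executors also need a valid HBase login for this to work, which the spark_jaas.conf and keytab shipped with --files would presumably provide.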
