
Kafka to HBase Spark Streaming


New Contributor

Hello Guys,

I am working on a use case where a data source publishes JSON records to Kafka topics. A Scala consumer running in the Spark shell streams those records from the Kafka topics and writes them to HBase; the whole pipeline runs in a Hortonworks environment. After running the consumer I can see the records printed in the Spark shell, but they never reach HBase, and no error is thrown. I split the JSON string into substrings and use the Put API to write each value into its own column, where CUST is the column family and the CUST_**** names are the columns. The table was created in HBase beforehand.
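For context, the split logic in the code below assumes each record is a JSON string with a nested "payload" object of simple "key":value pairs. A hypothetical example (the real field names and values are masked in this thread):

// hypothetical record; real field names and values are masked in this thread
val record = """{"schema":{"type":"struct"},"payload":{"CUST_ID":101,"CUST_NAME":"abc","CUST_CITY":"xyz"}}"""

// the consumer code drops the trailing '}', keeps everything after "payload":,
// splits on commas, and takes the text after the ':' in each pair;
// this only works while none of the values themselves contain a comma or colon
val payload = record.substring(0, record.length - 1).split("\"payload\":")(1)
val firstValue = payload.split(",")(0).split(":")(1) // "101" for the record above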

code:

// imports needed in the spark-shell session (the spark-streaming-kafka-0-10 and HBase client jars
// must already be on the classpath)
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

val ssc = new StreamingContext(sc, Seconds(3))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "1**.**.*.**:***7",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "testkafkatohase25",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("*****")
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
val stream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, consumerStrategy)

val lines = stream.map(_.value())
lines.print()

lines.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    // create the HBase connection on the executor, once per partition
    val hConf = HBaseConfiguration.create()
    val hTable = new HTable(hConf, "*****")

    iter.foreach { record =>
      // pull the comma-separated "key":value pairs out of the "payload" object
      val fields = record.substring(0, record.length - 1).split("\"payload\":")(1).split(",")
      val str1  = fields(0).split(":")(1)
      val str2  = fields(1).split(":")(1)
      val str3  = fields(2).split(":")(1)
      val str4  = fields(3).split(":")(1)
      val str5  = fields(4).split(":")(1)
      val str6  = fields(5).split(":")(1)
      val str7  = fields(6).split(":")(1)
      val str8  = fields(7).split(":")(1)
      val str9  = fields(8).split(":")(1)
      val str10 = fields(9).split(":")(1)
      val str11 = fields(10).split(":")(1).split("}")(0)

      val id = str1
      val thePut = new Put(Bytes.toBytes(id))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_**"), Bytes.toBytes(str1))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_*****"), Bytes.toBytes(str2))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_*****"), Bytes.toBytes(str3))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_*****"), Bytes.toBytes(str4))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_*****"), Bytes.toBytes(str5))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_****"), Bytes.toBytes(str6))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_A****"), Bytes.toBytes(str7))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_****"), Bytes.toBytes(str8))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_****"), Bytes.toBytes(str9))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_****"), Bytes.toBytes(str10))
      thePut.add(Bytes.toBytes("CUST"), Bytes.toBytes("CUST_*****"), Bytes.toBytes(str11))
      hTable.put(thePut)
    }

    // flush the buffered puts; without closing the table the writes may never reach the region servers
    hTable.close()
  }
}

ssc.start()
// ssc.awaitTermination()  // needed when running outside the interactive shell so the job keeps running


Re: Kafka to HBase Spark Streaming

@sohan kanamarlapudi

Which connector are you using for HBase? Have you considered shc?

https://github.com/hortonworks-spark/shc

I recently did something similar, streaming data from Kafka to HBase (though I used Python instead). Here is the GitHub link if you'd like to review:

https://github.com/felixalbani/future-of-data-santiago-e1-spark-nifi

In the project above I used shc, which is very easy to use and worked just fine for me.
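For reference, writing a DataFrame out with shc looks roughly like this; a minimal sketch assuming the shc package is on the classpath, with a placeholder table and columns:

import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

// catalog maps DataFrame columns to an HBase table, row key and column family (names are placeholders)
val catalog =
  s"""{
     |  "table":   {"namespace":"default", "name":"customer"},
     |  "rowkey":  "key",
     |  "columns": {
     |    "key":  {"cf":"rowkey", "col":"key",  "type":"string"},
     |    "name": {"cf":"CUST",   "col":"name", "type":"string"}
     |  }
     |}""".stripMargin

// df is any DataFrame whose columns match the catalog
df.write
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()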

HTH

*** If you found this answer addressed your question, please take a moment to login and click the "accept" link on the answer.

Re: Kafka to HBase Spark Streaming

New Contributor

Hello Guys,

These are the logs I found in the HBase region server:

java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1125)
2018-07-09 21:50:58,047 INFO [regionserver/hortonworks1.*****.com/172.**.*.**:16020-SendThread(hortonworks1.******.com:2181)] zookeeper.ClientCnxn: Opening socket connection to server hortonworks1.******.com/172.**.*.**:2181. Will not attempt to authenticate using SASL (unknown error)
2018-07-09 21:50:58,048 WARN [regionserver/hortonworks1.*****.com/172.**.*.**:16020-SendThread(hortonworks1.*******.com:2181)] zookeeper.ClientCnxn: Session 0x164711f065400b6 for server null, unexpected error, closing socket connection and attempting reconnect
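(The "Connection refused" above is the region server's ZooKeeper client failing to reach the ensemble on port 2181, which would explain writes never landing. The first things to check are that ZooKeeper is actually up and listening on that host and port, and that the Spark executors see the same hbase-site.xml as the cluster. As a sanity check only, the relevant client settings can also be set explicitly; a minimal sketch with placeholder host names:)

import org.apache.hadoop.hbase.HBaseConfiguration

val hConf = HBaseConfiguration.create()
// hbase-site.xml normally supplies these; setting them explicitly (placeholder hosts below)
// only helps rule out a missing or incorrect hbase-site.xml on the executor classpath
hConf.set("hbase.zookeeper.quorum", "zk-host-1,zk-host-2,zk-host-3")
hConf.set("hbase.zookeeper.property.clientPort", "2181")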

Re: Kafka to HBase Spark Streaming

New Contributor

Hey Felix,

Thanks for the reply. I am not using any connector to send records to HBase. I have figured it out, and I am now able to send JSON records from Kafka to HBase with the Spark Scala streaming job.

Thanks

Re: Kafka to HBase Spark Streaming

New Contributor

Hi Sohan and Felix,

I have similar Spark Streaming code and I receive errors at the end. Would you please tell me how you solved yours?

spark-shell --master yarn-client \
  --driver-memory 6g --executor-memory 6g --executor-cores 4 \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=spark_jaas.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=spark_jaas.conf" \
  --jars kafkaspark/spark-streaming-kafka-0-10-assembly_2.11-2.3.0.jar,$hbase_jars \
  --files spark_jaas.conf,/keytab,/etc/hbase/conf/hbase-site.xml \
  /usr/hdp/current/spark-client/lib/spark-examples-1.6.3.2.6.5.0-292-hadoop2.7.3.2.6.5.0-292.jar


import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.sql.{SQLContext, SaveMode}
import spark.implicits._
import org.apache.spark.sql.DataFrame
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.Column
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql.functions._
import org.apache.hadoop._

case class kafkarec(cskey: String, csvalue: String)

var kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "hpserver....",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "dops",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

kafkaParams = kafkaParams + ("security.protocol" -> "SASL_PLAINTEXT")


// val sparkConf = new SparkConf().setAppName("HBaseStream")
// create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sc, Seconds(20))
val topics = Array("d_topic_name")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
).map{ e => kafkarec(e.key(): String, e.value(): String) }

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.io.{LongWritable, Writable, IntWritable, Text}
import org.apache.hadoop.mapred.{TextOutputFormat, JobConf}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.SparkConf
import org.apache.spark.rdd.PairRDDFunctions
import org.apache.spark.storage.StorageLevel
//import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}


######################### it works up to here no error #######################


stream.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    rdd.foreachPartition(iter => {
      val Conf = HBaseConfiguration.create()
      // HTable is deprecated val hTable = new HTable(hConf, "dops:qlsvo_kafka")
      conf.set(TableOutputFormat.OUTPUT_TABLE, "dops:qlsvo_kafka")
      val jobConfig: JobConf = new JobConf(conf, this.getClass)
      jobConfig.set("mapreduce.output.fileoutputformat.outputdir", "hdfs:///user/jamin4/qlsvokafka/UNIT_CONCERN")
      jobConfig.setOutputFormat(classOf[TableOutputFormat])
      jobConfig.set(TableOutputFormat.OUTPUT_TABLE, "dops:qlsvo_kafka")
      iter.foreach(record => {
        val str1 = record.split("\"qls_unit_id\":")(1).split(",")(0).split(":")(1)
        val str2 = record.split("\"plant_id\":")(1).split(",")(1).split(":")(1)
        val str3 = record.split("\"unit_collection_pt_timestamp\":")(1).split(",")(2).split(":")(1)
        val str4 = record.split("\"unit_collection_pt_timestamp_s\":")(1).split(",")(3).split(":")(1)
        val str5 = record.split("\"unit_concern_id\":")(1).split(",")(4).split(":")(1)
        val str6 = record.split("\"collection_point_id\":")(1).split(",")(5).split(":")(1)
        val str7 = record.split("\"collection_point_section_id\":")(1).split(",")(6).split(":")(1)
        val str8 = record.split("\"ii_product_line_group_id\":")(1).split(",")(7).split(":")(1)
        val str9 = record.split("\"ii_plg_plant_id\":")(1).split(",")(8).split(":")(1)
        val str10 = record.split("\"usage_id\":")(1).split(",")(9).split(":")(1)
        val str11 = record.split("\"inspection_item_group_id\":")(1).split(",")(10).split(":")(1).split("}")(0)
        val id_con = str1
        val id = id_con.toString
        val thePut = new Put(Bytes.toBytes(id))
        thePut.add(Bytes.toBytes("UNIT_CONCERN"), Bytes.toBytes("qls_unit_id"), Bytes.toBytes(str1))
        thePut.add(Bytes.toBytes("UNIT_CONCERN"), Bytes.toBytes("plant_id"), Bytes.toBytes(str2))
        thePut.add(Bytes.toBytes("UNIT_CONCERN"), Bytes.toBytes("unit_collection_pt_timestamp"), Bytes.toBytes(str3))
        thePut.add(Bytes.toBytes("UNIT_CONCERN"), Bytes.toBytes("unit_collection_pt_timestamp_s"), Bytes.toBytes(str4))
        thePut.add(Bytes.toBytes("UNIT_CONCERN"), Bytes.toBytes("unit_concern_id"), Bytes.toBytes(str5))
        thePut.add(Bytes.toBytes("UNIT_CONCERN"), Bytes.toBytes("collection_point_id"), Bytes.toBytes(str6))
        thePut.add(Bytes.toBytes("UNIT_CONCERN"), Bytes.toBytes("collection_point_section_id"), Bytes.toBytes(str7))
        thePut.add(Bytes.toBytes("UNIT_CONCERN"), Bytes.toBytes("ii_product_line_group_id"), Bytes.toBytes(str8))
        thePut.add(Bytes.toBytes("UNIT_CONCERN"), Bytes.toBytes("ii_plg_plant_id"), Bytes.toBytes(str9))
        thePut.add(Bytes.toBytes("UNIT_CONCERN"), Bytes.toBytes("usage_id"), Bytes.toBytes(str10))
        thePut.add(Bytes.toBytes("UNIT_CONCERN"), Bytes.toBytes("inspection_item_group_id"), Bytes.toBytes(str11))
        /* hTable.put(thePut); */
      })
    })
  }
})


The ERROR:

<console>:75: error: not found: value HBaseConfiguration
       val Conf = HBaseConfiguration.create()
<console>:77: error: object set is not a member of package org.apache.hadoop.conf
       conf.set(TableOutputFormat.OUTPUT_TABLE, "dops:qlsvo_kafka")
<console>:77: error: not found: value TableOutputFormat
       conf.set(TableOutputFormat.OUTPUT_TABLE, "dops:qlsvo_kafka")
<console>:78: error: package org.apache.hadoop.conf is not a value
       val jobConfig: JobConf = new JobConf(conf, this.getClass)
<console>:80: error: not found: type TableOutputFormat
       jobConfig.setOutputFormat(classOf[TableOutputFormat])
<console>:81: error: not found: value TableOutputFormat
       jobConfig.set(TableOutputFormat.OUTPUT_TABLE, "dops:qlsvo_kafka")
<console>:84: error: value split is not a member of kafkarec
       val str1 = record.split("\"qls_unit_id\":")(1).split(",")(0).split(":")(1)
<console>:99: error: not found: type Put
       val thePut = new Put(Bytes.toBytes(id))
</console>
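For what it's worth, most of those errors have two causes: the HBase imports only resolve if the HBase client jars are actually on the spark-shell classpath, and the closure creates a value named Conf but then references conf, which Scala resolves to the org.apache.hadoop.conf package. The last error appears because the stream was mapped to the kafkarec case class, so each record has no split method and the JSON has to be read from record.csvalue. A minimal sketch of how that block could look once those are fixed, using the old-API TableOutputFormat with saveAsHadoopDataset (table and field names copied from above, everything else an assumption):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat // the mapred variant pairs with JobConf
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf

stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val conf = HBaseConfiguration.create() // lower-case name, distinct from the org.apache.hadoop.conf package
    val jobConf = new JobConf(conf, this.getClass)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "dops:qlsvo_kafka")

    val puts = rdd.map { record =>
      val json = record.csvalue // kafkarec itself has no split(); the JSON string lives in csvalue
      val qlsUnitId = json.split("\"qls_unit_id\":")(1).split(",")(0)
      val put = new Put(Bytes.toBytes(qlsUnitId))
      put.add(Bytes.toBytes("UNIT_CONCERN"), Bytes.toBytes("qls_unit_id"), Bytes.toBytes(qlsUnitId))
      // ... repeat for the remaining columns ...
      (new ImmutableBytesWritable, put)
    }
    puts.saveAsHadoopDataset(jobConf)
  }
}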