
Overwrite HBase default timestamp with table column value


Hi, I have two DataFrames in PySpark, both loaded from JSON data. I join them and can load the result into HBase. However, instead of the default timestamp that HBase assigns on insert, I want to use the value of my own "date" column (a column in my table) as the timestamp when inserting data into HBase. How can I do that?

Below is my code:

import json
import os
import happybase
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import asc, col, first, struct

conf = SparkConf()
sc = SparkContext("local", "test", conf=conf)
sqlContext = SQLContext(sc)

# Load each JSON dataset and keep one struct per (id, date) pair.
df1 = sqlContext.read.format('json').load('/user/***/df1')
df1 = df1.groupBy("id", "date").agg(first(struct(col("id"), col("name"), col("address"))).alias("table1"))
df2 = sqlContext.read.format('json').load('/user/***/df2')
df2 = df2.groupBy("id", "date").agg(first(struct(col("id"), col("employeeid"), col("designation"))).alias("table2"))

# Join on (id, date) and order the result by date.
df = df1.join(df2, ['id', 'date']).coalesce(1).orderBy(asc("date"))

def pushToHbase(row):
    # Opens one connection per row; HBase assigns the cell timestamp at write time.
    connection = happybase.Connection('hostname')
    table = connection.table('hbasetable')
    table.put(row.id, {"table1:table1": row.table1})
    connection.close()

df.foreach(pushToHbase)
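
One possible direction, as a minimal sketch rather than a confirmed solution: happybase's Table.put() accepts an optional integer timestamp argument, and HBase timestamps are conventionally milliseconds since the epoch. The sketch below assumes the "date" column is a string like "2019-01-31" interpreted as UTC; that format is a guess and would need adjusting to the real data. The helper names to_epoch_millis and put_with_timestamp are hypothetical, and serializing the struct with JSON is also an assumption about how the value should be stored.

```python
import calendar
import json
from datetime import datetime


def to_epoch_millis(date_str, fmt="%Y-%m-%d"):
    """Convert a date string to milliseconds since the Unix epoch (UTC).

    The default format is an assumption; change fmt to match the "date" column.
    """
    parsed = datetime.strptime(date_str, fmt)
    return calendar.timegm(parsed.timetuple()) * 1000


def put_with_timestamp(table, row):
    """Write one row, using its "date" field as the HBase cell timestamp.

    `table` is expected to behave like a happybase Table.
    """
    # Serialize the struct to JSON bytes (assumed storage format).
    payload = json.dumps(row.table1.asDict()).encode("utf-8")
    table.put(
        str(row.id).encode("utf-8"),
        {b"table1:table1": payload},
        timestamp=to_epoch_millis(row.date),
    )
```

Inside pushToHbase, the existing table.put(...) call could then be replaced with put_with_timestamp(table, row). Note that a write carrying an older timestamp than an existing cell version will not show up as the latest value on a default read.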