package org.example; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.spark.HBaseContext; import org.apache.hadoop.hbase.spark.JavaHBaseContext; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.security.UserGroupInformation; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.security.PrivilegedExceptionAction; import java.sql.Timestamp; import java.util.*; import static org.apache.commons.lang.math.RandomUtils.nextInt; @Slf4j public class Main { private static final String tableNameStr = "census2"; private static TableName tableName = TableName.valueOf(tableNameStr); private static final byte[] PERSONAL_CF = Bytes.toBytes("personal"); private static final byte[] PROFESSIONAL_CF = Bytes.toBytes("professional"); private static final byte[] NAME_COLUMN = Bytes.toBytes("name"); private static final byte[] FIELD_COLUMN = Bytes.toBytes("field"); private static JavaSparkContext JSC; private static JavaHBaseContext HBASE_CONTEXT; private static Configuration conf; private static Connection connection; private static final Logger LOG = LoggerFactory.getLogger(Main.class); public static void main(String[] args) throws IOException, InterruptedException { System.out.println("You are in main !2"); log.info("Test logging with Slf4j !"); SparkSession spark = SparkSession.builder() .appName("test") .master("yarn") .config("hbase.client.retries.number", "2") // .config("spark.jars", "hbase-spark-1.0.0.jar:hbase-protocol-2.1.0.jar") // I get null pointer exception if I uncomment that .getOrCreate(); log.info("keytabPath : " + System.getProperty("keytabPath")); log.info("principal : " + System.getProperty("principal")); initConnection(); new HBaseContext(spark.sparkContext(), conf, "/tmp/"); deleteTable(); createTable(); populateTable(); String hbaseCatalog = "{\r\n" + "\"table\":{\"name\":\"census2\"},\r\n" + "\"rowkey\":\"id1\",\r\n" + "\"columns\":{\r\n" + "\"ID1\":{\"cf\":\"rowkey\", \"col\":\"id1\", \"type\":\"string\"},\r\n" + "\"NAME\":{\"cf\":\"personal\", \"col\":\"name\", \"type\":\"string\"},\r\n" + "\"FIELD\":{\"cf\":\"personal\", \"col\":\"field\", \"type\":\"string\"},\r\n" + "\"PROFESSIONAL\":{\"cf\":\"professional\", \"col\":\"emailCount\", \"type\":\"string\"}\r\n" + "}\r\n" + "}"; SQLContext sqlContext = new SQLContext(spark); // import org.apache.hadoop.fs.Path // val conf = HBaseConfiguration.create() // conf.addResource(new Path("/path/to/hbase-site.xml")) // new HBaseContext(sc, conf) // "sc" is the SparkContext you created earlier. System.out.println("printing table contents"); withCatalog(hbaseCatalog, sqlContext); } private static Dataset withCatalog(String catalog, SQLContext sqlContext) { Map map = new HashMap<>(); map.put(HBaseTableCatalog.tableCatalog(), catalog); Dataset df = sqlContext.read().options(map) .format("org.apache.hadoop.hbase.spark"). option("hbase.spark.use.hbasecontext", false).load(); System.out.println("Count df : " + df.count()); df.show(100, false); df.createOrReplaceTempView("census2"); Dataset sqlDF1 = sqlContext.sql("SELECT * FROM census2"); sqlDF1.show(100, false); Dataset sqlDF2 = sqlContext.sql("SELECT * FROM census2 WHERE ID1 LIKE '____|001_|%'"); sqlDF2.show(100, false); return df; } private static void createTable() throws IOException { Connection connection = getConnection(); Admin admin = connection.getAdmin(); TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableNameStr)) .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(PERSONAL_CF).build()) .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(PROFESSIONAL_CF).build()) .build(); if (!admin.tableExists(tableDescriptor.getTableName())) { System.out.println("Creating the census2 table. "); admin.createTable(tableDescriptor); System.out.println("Done."); } else { System.out.println("Table already exists."); } } private static void deleteTable() throws IOException { Connection connection = getConnection(); Admin admin = connection.getAdmin(); TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName).build(); if (admin.tableExists(tableDescriptor.getTableName())) { System.out.println("Table exists, Deleting..."); admin.disableTable(tableName); admin.deleteTable(tableName); System.out.println("Done."); } else { System.out.println("Table already exists."); } } private static void populateTable() throws IOException { Connection connection = getConnection(); int numberOfPersons = 200; int nbContactBound = 10; String[] sourcePers = {"REPER", "WEB"}; String[] typeContacts = {"CRCO", "AGENDA", "SCRA"}; String[] sourceContacts = {"appli_contacts", "appli_tel"}; long offset = Timestamp.valueOf("2010-01-01 00:00:00").getTime(); long end = Timestamp.valueOf("2022-01-11 00:00:00").getTime(); long diff = end - offset + 1; Random random = new Random(); Admin admin = connection.getAdmin(); Table table = connection.getTable(tableName); if (admin.tableExists(tableName)) { List putList = new ArrayList<>(); for (int k = 0; k <= numberOfPersons; k++) { String source_personne = sourcePers[random.nextInt(2)]; int nbcontacts = random.nextInt(nbContactBound - 1) + 1; for (int j = 0; j <= nbcontacts; j++) { String uniqueIDcontact = UUID.randomUUID().toString(); String typeContact = typeContacts[random.nextInt(3)]; String sourceContact = sourceContacts[random.nextInt(2)]; Timestamp randTp = new Timestamp(offset + (long) (Math.random() * diff)); String rowkey = source_personne + "|" + String.format("%04d", k) + "|" + sourceContact + "|" + randTp + "|" + uniqueIDcontact + "|" + typeContact; Put put1 = new Put(Bytes.toBytes(rowkey)); put1.addColumn(PERSONAL_CF, NAME_COLUMN, Bytes.toBytes(RandomStringUtils.randomAlphanumeric(10))); put1.addColumn(PERSONAL_CF, FIELD_COLUMN, Bytes.toBytes(RandomStringUtils.randomAlphanumeric(10))); putList.add(put1); } table.put(putList); } } else { System.out.println("Table already exists."); } } public List transformHbaseCatalogToHM(String hbaseCatalog) { // hbaseCatalog : "AA:rowkey.key,AB:d.AB,AC:d.AC,AD:d.AD,AE:d.AE,AF:d.AF,AG:d.AG,AH:d.AH,AI:d.AI,AJ:d.AJ,AK:d.AK,AL:d.AL,AM:d.AM,AN:d.AN,AO:d.AO,AP:d.AP,AQ:d.AQ,AR:d.AR,AS:d.AS,AT:d.AT,AU:d.AU,AV:d.AV,AW:d.AW,AX:d.AX,AY:d.AY,AZ:d.AZ,BA:d.BA,BB:d.BB,BC:d.BC,BD:d.BD,BE:d.BE,BF:d.BF,BG:d.BG,BH:d.BH,BI:d.BI,BQ:d.BQ,DA:d.DA" List list = new ArrayList(); String[] params = hbaseCatalog.split(","); for (String param : params) { String[] mapping = param.split(":"); list.add(new HbaseMapping(mapping[0], mapping[1].split("\\.")[0], mapping[1].split("\\.")[1])); } return list; } private static Connection getConnection() throws IOException { if (connection == null || connection.isAborted() || connection.isClosed()) { try { initConnection(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } return connection; } private static void initConnection() throws InterruptedException, UnknownHostException { conf = HBaseConfiguration.create(); String[] confs = new String[]{ "/etc/hbase/conf/core-site.xml", "/etc/hbase/conf/hdfs-site.xml", "/etc/hbase/conf/hdfs-site.xml", "/etc/hbase/conf/ssl-client.xml"}; for (String confPath : confs) { conf.addResource(new Path(confPath.trim())); } conf.reloadConfiguration(); UserGroupInformation.setConfiguration(conf); Random rd = new Random(); int random_int = rd.nextInt(1000000); String hostname = InetAddress.getLocalHost().getHostName(); // validation du ticket kerberos try { log.info("Connection à kerberos en cours ! principal : " + System.getProperty("principal") + " , keytabPath : " + System.getProperty("keytabPath") + " random int : " + random_int + " hostname : " + hostname); UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(System.getProperty("principal"), System.getProperty("keytabPath")); log.info("Connection à kerberos en cours AFTER ugi ! random int : " + random_int + " hostname : " + hostname); UserGroupInformation.setLoginUser(ugi); ugi.doAs((PrivilegedExceptionAction) () -> { connection = ConnectionFactory.createConnection(conf); log.info("Connection à kerberos terminée after connection ! random int : " + random_int + " hostname : " + hostname); return null; }); } catch (IOException | InterruptedException e1) { // TODO Auto-generated catch block System.out.println("Impossible de se connecter via kerberos !!!" + e1); } } }