import com.alibaba.fastjson.{JSON, JSONObject} import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.configuration.Configuration import org.apache.flink.core.fs.Path import org.apache.flink.runtime.state.filesystem.FsStateBackend import org.apache.flink.streaming.api.datastream.DataStreamSource import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction} import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic} import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08 import org.apache.hadoop.security.UserGroupInformation import org.apache.kudu.ColumnSchema import org.apache.kudu.client.{KuduClient, Operation, SessionConfiguration} import java.io.{FileNotFoundException, IOException} import java.security.PrivilegedExceptionAction import java.text.SimpleDateFormat import java.util import java.util.{Date, Properties, UUID} import scala.collection.JavaConversions._ //对应方舟用户行为日志到大数据集群 object UBLKafka2KUDU { def main(args: Array[String]): Unit = { //获取配置文件中相关配置信息 val parameters = ParameterTool.fromArgs(args) val path = parameters.get("path", null) // val path = "E:\\Workspace\\git\\real-time-flink\\real_time_flink\\conf\\config_fzubl.properties" var configParameterTool: ParameterTool = null try { configParameterTool = ParameterTool.fromPropertiesFile(path) } catch { case e: IOException => if (e.isInstanceOf[FileNotFoundException]) { System.out.println("error: configFilePath:" + path + " doesn't exist.") return } } //获取配置文件中kerberos认证相关配置 val hadoop_krb5_conf = configParameterTool.get("hadoop.krb5.conf") // val kafka_jaas = configParameterTool.get("kafka.jaas") //kdc认证 System.setProperty("java.security.krb5.conf", hadoop_krb5_conf) // System.setProperty("java.security.auth.login.config", kafka_jaas) //获取配置文件中kafka相关配置 val kafka_host = configParameterTool.get("kafka.host") val group_id = configParameterTool.get("kafka.group.id") val topic = configParameterTool.get("kafka.topic") val topic_list: util.List[String] = topic.split(",").toList val enable_auto_commit = configParameterTool.get("kafka.enable.auto.commit") val auto_commit_interval_ms = configParameterTool.get("kafka.auto.commit.interval.ms") val zookeeper_connect = configParameterTool.get("zookeeper.connect") val session_timeout_ms = configParameterTool.get("kafka.session.timeout.ms") val auto_offset_reset = configParameterTool.get("kafka.auto.offset.reset") val key_deserializer = configParameterTool.get("kafka.key.deserializer") val value_deserializer = configParameterTool.get("kafka.value.deserializer") val security_protocol = configParameterTool.get("kafka.security.protocol") val ck_path = configParameterTool.get("ck.path") //kdc认证 System.setProperty("java.security.krb5.conf", hadoop_krb5_conf) System.setProperty("java.security.krb5.conf", hadoop_krb5_conf) // kafka consumer配置构建 val props = new Properties props.setProperty("bootstrap.servers", kafka_host) props.setProperty("key.deserializer", key_deserializer) props.setProperty("value.deserializer", value_deserializer) props.setProperty("enable.auto.commit", enable_auto_commit) // kafka 0.8 props.setProperty("auto.commit.enable", enable_auto_commit) props.setProperty("auto.commit.interval.ms", auto_commit_interval_ms) props.setProperty("session.timeout.ms", session_timeout_ms) props.setProperty("zookeeper.connect", zookeeper_connect) props.setProperty("auto.offset.reset", auto_offset_reset) props.setProperty("group.id", group_id) props.setProperty("security.protocol", security_protocol) //创建flink上下文环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //时间语义是以数据自带的时间戳字段为准 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // checkpoint相关配置 // 默认checkpoint功能是disabled的,想要使用的时候需要先启用;每隔1000ms进行启动一个检查点【设置checkpoint的周期】 env.enableCheckpointing(1000) // 设置模式为exactly-once (这是默认值) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // 确保检查点之间有至少500ms的间隔【checkpoint最小间隔】 env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500) // 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】 env.getCheckpointConfig.setCheckpointTimeout(60000) // 同一时间只允许进行一个检查点 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) // 一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint // env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 设置statebackend env.setStateBackend(new FsStateBackend(new Path(ck_path))) // 构建FlinkKafkaConsumer 源端kafka版本为0.8 val myConsumer = new FlinkKafkaConsumer08[String](topic_list, new SimpleStringSchema, props) myConsumer.setCommitOffsetsOnCheckpoints(true) val ds: DataStreamSource[String] = env.addSource(myConsumer) ds.addSink(new KuduSink(configParameterTool)) env.execute("UBKafka2KuduFlink") } // case class Event(xWhen: String, xContext: JSONObject, appId: String, xWho: String, xWhat: String) class KuduSink extends RichSinkFunction[String] { var client: KuduClient = null var sdf: SimpleDateFormat = null var tableName: String = null var databaseName: String = null var configParameterTool: ParameterTool = null def this(prop: ParameterTool) { this() this.configParameterTool = prop } /** * open方法在sink第一次启动时调用,一般用于sink的初始化操作 */ @throws[Exception] @Override override def open(parameters: Configuration): Unit = { super.open(parameters) try this.client = UserGroupInformation.getLoginUser.doAs(new PrivilegedExceptionAction[KuduClient]() { override def run() = { println("invoke run") val kuduHost = configParameterTool.get("kudu.host") new KuduClient.KuduClientBuilder(kuduHost).build } }) catch { case e => e.printStackTrace() } this.databaseName = configParameterTool.get("fangzhouubl.database") this.tableName = configParameterTool.get("fangzhouubl.table") } /** * invoke方法是sink数据处理逻辑的方法,source端传来的数据都在invoke方法中进行处理 * 其中invoke方法中第一个参数类型与RichSinkFunction中的泛型对应。第二个参数 * 为一些上下文信息 */ @Override override def invoke(value: String, context: SinkFunction.Context[_]): Unit = { for (item <- JSON.parseArray(value)) { val value = item.asInstanceOf[JSONObject] optKudu(client, databaseName, tableName, value) } } /** * @param opType 操作类型 * @param tableName 表名 * @param primaryKey 表主键 * @param data 数据 * @return * @throws KuduException */ @throws[Exception] private def optKudu(client: KuduClient, databaseName: String, tableName: String, data: JSONObject): Unit = { val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS") val kuduSession = client.newSession kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH) //拼接kudu所需表名 val formatTableName = "impala::" + databaseName + tableName if (!client.tableExists(formatTableName)) { System.out.println("表:" + formatTableName + "不存在") return } //初始化kudu表操作类 val kuduTable = client.openTable(formatTableName) //kafka日志数据,只考虑insert val opt: Operation = kuduTable.newInsert val row = opt.getRow // id,xwhen,xcontext,appid,xwho,xwhat val columns: util.List[ColumnSchema] = kuduTable.getSchema.getColumns() for (item <- columns) { val colName: String = item.getName val colValue: String = data.getString(colName.toLowerCase()) // kudu里的id列为主键,随机串; opts为当前时间; 这2个数据不来自于 kafka数据 if (!colName.equals("id") && !colName.equals("opts") && colValue != null) { row.addString(colName, colValue) } } row.addString("opts", sdf.format(new Date)) val uuid = UUID.randomUUID().toString.replace("-", "") //使用随机串做主键 row.addString("id", uuid) kuduSession.apply(opt) kuduSession.flush kuduSession.close } } } #kafka kafka.host=ark1:9092 zookeeper.connect=ark1:2181 kafka.group.id=htbd-flink kafka.enable.auto.commit=true kafka.auto.commit.interval.ms=100 kafka.session.timeout.ms=30000 kafka.auto.offset.reset=latest kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer kafka.topic=profile_jstestproject,event_jstestproject kafka.security.protocol=PLAINTEXT #LDAP+sentry hadoop.krb5.conf=/etc/krb5.conf hadoop.krb5.loginUserFromKeytab.user=htbd/htbdtfcx@HTCDH.COM hadoop.krb5.loginUserFromKeytab.path=/etc/security/krb5/htbd.keytab #kafka.jaas=/etc/security/krb5/htbd_kafka_jass.conf ck.path=hdfs://nameservice1/user/htbd/flink/fangzhouubl