package kafka;

import static org.apache.spark.streaming.kafka.KafkaUtils.createStream;
import static utils.SparkUtilities.addShutdownHook;

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableMap;

import kafka.serializer.StringDecoder;

/**
 * Spark Streaming application that consumes a single Kafka topic through the
 * receiver-based {@code KafkaUtils.createStream} API, lower-cases the value of
 * every message and prints the result for each 5-second batch.
 *
 * <p>The Kafka consumer is configured with {@code security.protocol=PLAINTEXTSASL};
 * the ZooKeeper quorum, broker list and consumer group are hard-coded below.
 *
 * <p>Usage: {@code KafkaKerberosReader <topic>}
 */
public class KafkaKerberosReader {

    // Spark information
    private static SparkConf conf;
    private static final String appName = "KafkaKerberosReader";
    private static JavaStreamingContext context;
    private static final Logger logger =
            LoggerFactory.getLogger(KafkaKerberosReader.class.getSimpleName());

    // Kafka information
    private static final String zkQuorum = "host1:2181,host2:2181,host3:2181";
    private static final String kfkQuorum = "host1:6667,host2:6667,host3:6667";
    private static final String group = "test_spark_kerberos";
    private static final Integer threads = 1;
    private static final Map<String, String> kafkaParams = new HashMap<>();

    /**
     * Entry point. Configures Spark and the Kafka consumer, builds the streaming
     * pipeline for the requested topic, starts it and blocks until termination.
     *
     * @param args exactly one element: the name of the Kafka topic to read
     */
    public static void main(String[] args) {
        // Check that all required args were passed in.
        if (args.length != 1) {
            System.err.println("Usage: KafkaKerberosReader <topic>\n");
            System.exit(1);
        }

        // Configure the application
        configureSpark();

        // Create the context
        context = createContext(args[0]);

        // Start the streaming job and block until it terminates
        context.start();
        context.awaitTermination();
    }

    /**
     * Builds the streaming context and declares the processing pipeline: a Kafka
     * receiver stream on {@code topic} whose message values are lower-cased and
     * printed. The context is returned un-started; the caller must start it.
     *
     * <p>Requires {@link #configureSpark()} to have run first, since it reads the
     * {@code conf} and {@code kafkaParams} that method populates.
     *
     * @param topic name of the Kafka topic to consume
     * @return the configured, not yet started, streaming context
     */
    private static JavaStreamingContext createContext(String topic) {
        logger.info("-------------------------------------------------------");
        logger.info("| Starting: {} |", appName);
        logger.info("-------------------------------------------------------");

        // Create the spark streaming context with a 5-second batch interval
        JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Seconds.apply(5));

        // Read from a Kerberized Kafka
        JavaPairReceiverInputDStream<String, String> kafkaStream = createStream(
                streamingContext,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                ImmutableMap.of(topic, threads),
                StorageLevel.MEMORY_AND_DISK_SER());

        // Each record is a (key, value) pair; only the value is processed.
        kafkaStream.map(message -> message._2().toLowerCase()).print();

        logger.info("-------------------------------------------------------");
        logger.info("| Finished: {} |", appName);
        logger.info("-------------------------------------------------------");

        return streamingContext;
    }

    /**
     * Creates the {@link SparkConf} for this application and fills in the Kafka
     * consumer parameters (group id, offset reset policy, security protocol,
     * broker list and ZooKeeper quorum).
     */
    private static void configureSpark() {
        // SLF4J substitutes "{}" placeholders only; "%s" would be logged verbatim.
        logger.info(">- Initializing '{}'.", appName);

        conf = new SparkConf().setAppName(appName);

        kafkaParams.put("group.id", group);
        kafkaParams.put("auto.offset.reset", "smallest");
        kafkaParams.put("security.protocol", "PLAINTEXTSASL");
        kafkaParams.put("bootstrap.servers", kfkQuorum);
        kafkaParams.put("zookeeper.connect", zkQuorum);

        logger.info(">- Configuration done with the follow properties:");
        logger.info(conf.toDebugString());
    }
}