Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

trying to stream the data from file to kafka producer and then to topic ?

trying to stream the data from file to kafka producer and then to topic ?

Explorer

I have been trying to stream the data from local file to producer and then producer publish the messages to topic using below code

import org.apache.kafka.clients.producer.Callback;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.BufferedReader;

import java.io.FileInputStream;

import java.io.IOException;

import java.io.InputStreamReader;

import java.util.Properties;

import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaFileProducer extends Thread {

private static final String topicName = "COMPLETED_CLAIM";

public static final String fileName = "/home/i373591/sample/Processed_subject101.dat";

private final KafkaProducer<String, String> producer; private final Boolean isAsync;

public KafkaFileProducer(String topic, Boolean isAsync) {

Properties props = new Properties(); props.put("bootstrap.servers", "host:44445");

props.put("client.id", "DemoProducer"); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("CommonClientConfigs.SECURITY_PROTOCOL_CONFIG", "SSL");

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

producer = new KafkaProducer<String, String>(props);

this.isAsync = isAsync;

}

public void sendMessage(String key, String value) {

long startTime = System.currentTimeMillis();

if (isAsync) {

// Send asynchronously producer.send( new ProducerRecord<String, String>(topicName, key), (Callback) new DemoCallBack(startTime, key, value));

} else {

// Send synchronously try { producer.send( new ProducerRecord<String, String>(topicName, key, value)) .get(); System.out.println("Sent message: (" + key + ", " + value + ")"); }

catch (InterruptedException e)

{ e.printStackTrace(); }

catch (ExecutionException e)

{ e.printStackTrace(); } } }

public static void main(String [] args){

KafkaFileProducer producer = new KafkaFileProducer(topicName, false);

int lineCount = 0;

FileInputStream fis;

BufferedReader br = null;

try { fis = new FileInputStream(fileName); //Construct BufferedReader from InputStreamReader br = new BufferedReader(new InputStreamReader(fis));

String line = null;

while ((line = br.readLine()) != null) {

lineCount++; producer.sendMessage(lineCount+"", line);

} } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); }

finally{ try { br.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } class DemoCallBack implements Callback { private long startTime; private String key; private String message; public DemoCallBack(long startTime, String key, String message) { this.startTime = startTime; this.key = key; this.message = message; } /** * A callback method the user can implement to provide asynchronous handling * of request completion. This method will be called when the record sent to * the server has been acknowledged. Exactly one of the arguments will be * non-null. * * @param metadata * The metadata for the record that was sent (i.e. the partition * and offset). Null if an error occurred. * @param exception * The exception thrown during processing of this record. Null if * no error occurred. */ public void onCompletion(RecordMetadata metadata, Exception exception) { long elapsedTime = System.currentTimeMillis() - startTime; if (metadata != null) { System.out.println("message(" + key + ", " + message + ") sent to partition(" + metadata.partition() + "), " + "offset(" + metadata.offset() + ") in " + elapsedTime + " ms"); } else { exception.printStackTrace(); } } }

getting below error?

Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:437) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:352) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:248) at KafkaFileProducer.sendMessage(KafkaFileProducer.java:45) at KafkaFileProducer.main(KafkaFileProducer.java:70)

Could someone help me out on this ?

Don't have an account?
Coming from Hortonworks? Activate your account here