error while creating kafka producer from java api

Expert Contributor

Hello,

I am using Hortonworks. I want to fetch tweets from Twitter and store them in a Kafka topic using the Java API. I have written the producer code in Eclipse Neon, and I have installed Maven.

I am getting the error below:

Exception in thread "main" java.lang.Error: Unresolved compilation problems:
    The constructor ProducerConfig(Map<?,?>) is not visible
    The constructor Producer<String,String>(ProducerConfig) is undefined
    The method send(KeyedMessage<String,String>) in the type Producer<String,String> is not applicable for the arguments (ProducerRecord<String,String>)
    at com.saurzcode.twitter.TwitterKafkaProducer.run(TwitterKafkaProducer.java:38)
    at com.saurzcode.twitter.TwitterKafkaProducer.main(TwitterKafkaProducer.java:76)

Following is the pom.xml file:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.saurzcode.twitter</groupId>
    <artifactId>twitter-stream</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>com.twitter</groupId>
            <artifactId>hbc-core</artifactId> <!-- or hbc-twitter4j -->
            <version>2.2.0</version> <!-- or whatever the latest version is -->
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.10.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.jms</groupId>
                    <artifactId>jms</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.21</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>18.0</version>
        </dependency>
    </dependencies>
</project>

Following is my Java code:

package com.saurzcode.twitter;

import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import org.apache.kafka.clients.producer.*;
import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;

public class TwitterKafkaProducer {
    private static final String topic = "twitter";

    public static void run(String consumerKey, String consumerSecret,
            String token, String secret) throws InterruptedException {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", "xxxxx:6667");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("client.id","abc");

        ProducerConfig producerConfig = new ProducerConfig(properties);
        kafka.javaapi.producer.Producer<String, String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig);
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000);
        StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
        // add some track terms
        endpoint.trackTerms(Lists.newArrayList("#iphone"));
        Authentication auth = new OAuth1(consumerKey, consumerSecret, token,
                secret);
        // Authentication auth = new BasicAuth(username, password);
        // Create a new BasicClient. By default gzip is enabled.
        Client client = new ClientBuilder().hosts(Constants.STREAM_HOST)
                .endpoint(endpoint).authentication(auth)
                .processor(new StringDelimitedProcessor(queue)).build();
        // Establish a connection
        client.connect();
        // Do whatever needs to be done with messages
        for (int msgRead = 0; msgRead < 1000; msgRead++) {
            ProducerRecord<String, String> message = null;
            try {
                message = new ProducerRecord<String, String>(topic, queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            producer.send(message);
        }
        producer.close();
        client.stop();
    }

    public static void main(String[] args) {
        try {
            TwitterKafkaProducer.run(args[0],args[1],args[2],args[3]);
        } catch (InterruptedException e) {
            System.out.println(e);
        }
    }
}
1 REPLY

Super Mentor

@heta desai

You are getting the error:

The constructor ProducerConfig(Map<?,?>) is not visible
The constructor Producer<String,String>(ProducerConfig) is undefined

Please take a look at these examples:

https://github.com/bkimminich/apache-kafka-book-examples/blob/master/src/test/kafka/SimpleProducer.j...

https://www.ashishpaliwal.com/blog/2015/06/kafka-cookboook-simple-producer/

https://github.com/rajkrrsingh/KafkaClients/tree/master/src/main/java

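You seem to be mixing the two producer APIs, "kafka.javaapi" (the old producer) and "org.apache.kafka" (the new client):

kafka.javaapi.producer.Producer
and
org.apache.kafka.clients.producer.KafkaProducer

Your code imports ProducerConfig and ProducerRecord from org.apache.kafka.clients.producer but creates a kafka.javaapi.producer.Producer, so the constructors and the send() argument do not match. Use one API consistently.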
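If you go with the new client (org.apache.kafka.clients.producer.KafkaProducer), the property names change as well. Below is a minimal sketch of the configuration, assuming String keys and values as in your code. The class name NewProducerSettings and the method newProducerProperties are hypothetical names used only for illustration, and the broker address is the placeholder from your post.

```java
import java.util.Properties;

class NewProducerSettings {

    // Builds the configuration that the new producer client
    // (org.apache.kafka.clients.producer.KafkaProducer) expects.
    // brokerList is a host:port list, e.g. the "xxxxx:6667" placeholder from the question.
    static Properties newProducerProperties(String brokerList, String clientId) {
        Properties props = new Properties();
        // The new client reads "bootstrap.servers";
        // "metadata.broker.list" is a setting of the old kafka.javaapi producer.
        props.put("bootstrap.servers", brokerList);
        // Key and value serializers are set separately and replace "serializer.class".
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("client.id", clientId);
        return props;
    }

    public static void main(String[] args) {
        Properties props = newProducerProperties("xxxxx:6667", "abc");
        // Print the settings so they can be checked before creating a producer.
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + "=" + props.getProperty(name));
        }
    }
}
```

These properties would then be passed straight to new KafkaProducer<String, String>(props), with no ProducerConfig object in between, and the ProducerRecord you already build can be handed to that producer's send() method. With this route the kafka.javaapi.producer.Producer line is no longer needed.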
