Created 08-03-2017 07:23 AM
hello,
i am using hortonworks. i want to fetch twitter tweets and dstore in kafka topic. i am doing using java API. i have written the producer code in eclipse neon, i installed the maven.
i am getting the error below:
Exception in thread "main" java.lang.Error: Unresolved compilation problems: The constructor ProducerConfig(Map<?,?>) is not visible The constructor Producer<String,String>(ProducerConfig) is undefined The method send(KeyedMessage<String,String>) in the type Producer<String,String> is not applicable for the arguments (ProducerRecord<String,String>) at com.saurzcode.twitter.TwitterKafkaProducer.run(TwitterKafkaProducer.java:38) at com.saurzcode.twitter.TwitterKafkaProducer.main(TwitterKafkaProducer.java:76)
following is pom.xml file:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.saurzcode.twitter</groupId> <artifactId>twitter-stream</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <groupId>com.twitter</groupId> <artifactId>hbc-core</artifactId> <!-- or hbc-twitter4j --> <version>2.2.0</version> <!-- or whatever the latest version is --> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.10.0.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.0.0</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> <exclusions> <exclusion> <groupId>javax.jms</groupId> <artifactId>jms</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.21</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.21</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>18.0</version> </dependency> </dependencies> </project>
following is my java code:
package com.saurzcode.twitter; import java.util.Properties; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.clients.producer.*; import com.google.common.collect.Lists; import com.twitter.hbc.ClientBuilder; import com.twitter.hbc.core.Client; import com.twitter.hbc.core.Constants; import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint; import com.twitter.hbc.core.processor.StringDelimitedProcessor; import com.twitter.hbc.httpclient.auth.Authentication; import com.twitter.hbc.httpclient.auth.OAuth1; public class TwitterKafkaProducer { private static final String topic = "twitter"; public static void run(String consumerKey, String consumerSecret, String token, String secret) throws InterruptedException { Properties properties = new Properties(); properties.put("metadata.broker.list", "xxxxx:6667"); properties.put("serializer.class", "kafka.serializer.StringEncoder"); properties.put("client.id","abc"); ProducerConfig producerConfig = new ProducerConfig(properties); kafka.javaapi.producer.Producer<String, String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig); BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000); StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint(); // add some track terms endpoint.trackTerms(Lists.newArrayList("#iphone")); Authentication auth = new OAuth1(consumerKey, consumerSecret, token, secret); // Authentication auth = new BasicAuth(username, password); // Create a new BasicClient. By default gzip is enabled. Client client = new ClientBuilder().hosts(Constants.STREAM_HOST) .endpoint(endpoint).authentication(auth) .processor(new StringDelimitedProcessor(queue)).build(); // Establish a connection client.connect(); // Do whatever needs to be done with messages for (int msgRead = 0; msgRead < 1000; msgRead++) { ProducerRecord<String, String> message = null; try { message = new ProducerRecord<String, String>(topic, queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } producer.send(message); } producer.close(); client.stop(); } public static void main(String[] args) { try { TwitterKafkaProducer.run(args[0],args[1],args[2],args[3]); } catch (InterruptedException e) { System.out.println(e); } } }
Created 08-03-2017 08:50 AM
You are getting the error:
The constructor ProducerConfig(Map<?,?>) is not visible The constructor Producer<String,String>(ProducerConfig) is undefined
Please take a look :
https://www.ashishpaliwal.com/blog/2015/06/kafka-cookboook-simple-producer/
https://github.com/rajkrrsingh/KafkaClients/tree/master/src/main/java
.
- You seems to be mixing the two APIs. "org.apache.kafka" and "kafka.javaapi"
kafka.javaapi.producer.Producer and org.apache.kafka.clients.producer.KafkaProducer<br>
.