Member since
02-16-2016
45
Posts
24
Kudos Received
2
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 6416 | 07-28-2016 03:37 PM |
 | 9273 | 02-20-2016 11:34 PM |
03-18-2016
05:11 PM
2 Kudos
No, these are not old files. I run my program with the spark-submit command, and my path variable is the path to my HDFS directory. Also, yes, the content is changing.
03-17-2016
09:22 PM
2 Kudos
One more thing: it works fine, but it still saves the part-0000 file (from the last or first RDD) as well. Is there any way I can get rid of that?
03-17-2016
09:15 PM
2 Kudos
Thank you so much! You saved me a lot of time.
03-17-2016
05:18 PM
2 Kudos
I am writing a simple consumer program using Spark Streaming. My code saves some of the data to the file, but not all of it. Can anyone help me fix this? I am not sure where I am losing the data.
import org.apache.spark.streaming.kafka.*;
import kafka.serializer.StringDecoder;
import java.io.*;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.spark.*;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import scala.Tuple2;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
public class ConsumerFile {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);
        // Arguments: <topics (comma-separated)> <broker list> <output path>
        String topic = args[0];
        final String path = args[2];
        String broker = args[1];
        SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer").setMaster("local[*]");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topic.split(",")));
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", broker);
        JavaPairInputDStream<String, String> kafkaStream = KafkaUtils.createDirectStream(
            ssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams,
            topicsSet
        );
        // Keep only the value of each (key, value) Kafka message
        JavaDStream<String> words = kafkaStream.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> message) {
                return message._2();
            }
        });
        words.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
            public Void call(JavaRDD<String> rdd, Time time) {
                SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());
                // Convert RDD[String] to RDD[JavaRow] to DataFrame
                JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
                    public JavaRow call(String line) throws Exception {
                        String[] fields = line.split(",");
                        JavaRow record = new JavaRow(
                            fields[0], fields[1], fields[2], fields[3], fields[4], fields[5],
                            Long.parseLong(fields[6].trim()), fields[7], fields[8], fields[9],
                            Long.parseLong(fields[10].trim()), Long.parseLong(fields[11].trim()),
                            Long.parseLong(fields[12].trim()), Long.parseLong(fields[13].trim()),
                            fields[14], fields[15], fields[16], fields[17],
                            Long.parseLong(fields[18].trim()), fields[19],
                            Long.parseLong(fields[20].trim()), Long.parseLong(fields[21].trim()));
                        return record;
                    }
                });
                DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRow.class);
                wordsDataFrame.registerTempTable("Data");
                DataFrame wDataFrame = sqlContext.sql("select * from Data");
                // Write non-empty batches as a single part file under `path`
                if (!wDataFrame.rdd().isEmpty()) {
                    wDataFrame.rdd().coalesce(1, true, null).saveAsTextFile(path);
                }
                return null;
            }
        });
        ssc.start();
        ssc.awaitTermination();
    }
}
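A likely cause, though not confirmed in this post, is that every 2-second batch calls saveAsTextFile with the same path, so each batch replaces the output of the one before it and only one batch's data survives. A minimal sketch of one way around that is to derive a separate directory per batch from the batch time. The class BatchPaths, the method forBatch, and the example HDFS paths are hypothetical names for illustration and are not part of Spark:

```java
// Hypothetical helper (not part of Spark): builds one output directory per batch.
final class BatchPaths {
    private BatchPaths() {}

    /** Returns basePath + "/batch-" + batchTimeMs, without doubling a trailing slash. */
    static String forBatch(String basePath, long batchTimeMs) {
        String trimmed = basePath.endsWith("/")
                ? basePath.substring(0, basePath.length() - 1)
                : basePath;
        return trimmed + "/batch-" + batchTimeMs;
    }

    public static void main(String[] args) {
        // Example base paths are assumptions for illustration only.
        System.out.println(forBatch("hdfs:///user/me/out", 1455910293000L));
        System.out.println(forBatch("hdfs:///user/me/out/", 1455910296000L));
    }
}
```

Inside the foreachRDD callback above, this would be used as saveAsTextFile(BatchPaths.forBatch(path, time.milliseconds())), so each batch lands in its own directory instead of reusing path.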
Labels: Apache Spark
02-20-2016
11:34 PM
1 Kudo
I solved that error by adding this dependency to my project:
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
02-19-2016
07:50 PM
2 Kudos
I am using Java to develop a simple Spark Streaming program. I want to read messages from Kafka. I have a topic "test" in Kafka. I can read the messages with the console command, but I cannot read them with my program. I do not get any error either. Here is my code:
import org.apache.spark.streaming.kafka.*;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
public class SparkTest {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);
        // Topic name -> number of consumer threads
        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put("test", new Integer(3));
        SparkConf conf = new SparkConf().setAppName("SparkConsumer").setMaster("local[*]");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(3000));
        JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(ssc, "sandbox.hortonworks.com", "default", map);
        // Keep only the value of each (key, value) Kafka message
        JavaDStream<String> data = kafkaStream.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> message) {
                return message._2();
            }
        });
        data.print();
        ssc.start();
        ssc.awaitTermination();
    }
}
Here is what I get when I run the code:
-------------------------------------------
Time: 1455910293000 ms
-------------------------------------------
-------------------------------------------
Time: 1455910296000 ms
-------------------------------------------
-------------------------------------------
Time: 1455910299000 ms
-------------------------------------------
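One thing worth checking, offered as a guess rather than a confirmed diagnosis: the second argument of KafkaUtils.createStream is the ZooKeeper quorum, which is documented in host:port form, while the code above passes only a hostname. Logging is also switched off with Level.OFF, so a connection failure would not be visible. A minimal sketch of a check for the quorum string follows. The class ZkQuorumCheck and its method are hypothetical, and 2181 is only ZooKeeper's default client port, so the right value for a given cluster may differ:

```java
// Hypothetical helper (not part of Spark or Kafka): checks that every entry
// of a comma-separated ZooKeeper quorum string has the form host:port.
final class ZkQuorumCheck {
    private ZkQuorumCheck() {}

    static boolean everyHostHasPort(String zkQuorum) {
        if (zkQuorum == null || zkQuorum.trim().isEmpty()) {
            return false;
        }
        for (String entry : zkQuorum.split(",")) {
            String e = entry.trim();
            int colon = e.lastIndexOf(':');
            // Reject entries with no host or no port part.
            if (colon <= 0 || colon == e.length() - 1) {
                return false;
            }
            String port = e.substring(colon + 1);
            for (int i = 0; i < port.length(); i++) {
                if (!Character.isDigit(port.charAt(i))) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // 2181 is assumed here because it is ZooKeeper's default client port.
        System.out.println(everyHostHasPort("sandbox.hortonworks.com"));
        System.out.println(everyHostHasPort("sandbox.hortonworks.com:2181"));
    }
}
```

If the check fails for the string passed to createStream, trying the host:port form and re-enabling logging for the run are two low-cost things to test.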
Labels: Apache Kafka, Apache Spark
02-17-2016
06:54 PM
1 Kudo
The HDP version is the latest one, 2.3.2. I use that too, but when I try to run a simple consumer program in Java, it does not work. I use the kafka.consumer package in my code. My producer code works fine; now I want to get that data and do some analysis on it in my consumer code.
02-17-2016
06:33 PM
1 Kudo
Thank you for your response. I realized that I cannot even run a simple Kafka consumer program, even without using Spark. I still get this error: java.lang.NoClassDefFoundError: scala/reflect/ClassManifest. Do you have any idea why? I checked that all of my versions are correct.
02-16-2016
07:02 PM
1 Kudo
@Neeraj Sabharwal thanks, that is a great tutorial, but that example is in Scala as well.