Member since: 10-27-2016
Posts: 14
Kudos Received: 6
Solutions: 3
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 2240 | 02-22-2017 10:44 AM
 | 15981 | 02-07-2017 02:38 PM
 | 2546 | 01-19-2017 10:29 AM
02-22-2017
10:44 AM
1 Kudo
It could be this one: https://issues.apache.org/jira/browse/SPARK-19541
02-16-2017
09:48 AM
With Spark 2:

Generate test files:

echo "1,2,3" > /tmp/test.csv
echo "1|2|3" > /tmp/test.psv

Read the CSV file:

scala> val t = spark.read.csv("/tmp/test.csv")
t: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 1 more field]
scala> t.show()
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|  1|  2|  3|
+---+---+---+

Read the PSV (pipe-separated) file:

scala> val p = spark.read.option("delimiter","|").csv("/tmp/test.psv")
p: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 1 more field]
scala> p.show()
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|  1|  2|  3|
+---+---+---+

You can also read from "/tmp/test*.csv", but that will read multiple files into the same dataset. For older versions of Spark you can use https://github.com/databricks/spark-csv.
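With spark-csv on an older Spark version, the read looks roughly like this (a minimal sketch, assuming a Spark 1.x shell started with the package on the classpath, e.g. spark-shell --packages com.databricks:spark-csv_2.10:1.5.0; the version coordinates are only an example):

// read the pipe-separated file with the spark-csv data source
val p = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("delimiter", "|")
  .load("/tmp/test.psv")
p.show()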
02-07-2017
02:38 PM
2 Kudos
I think your solution is OK. You probably have a problem with the input data (try calling df.show() and df.printSchema() on the original df before the transformation).
I tried out your code with this class:

package com.hortonworks.spark.example;

import static org.apache.spark.sql.functions.when;
import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.lit;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

public class SparkStarter {

    public static void main(String[] args) {
        new SparkStarter().run();
    }

    private void run() {
        SparkConf conf = new SparkConf().setAppName("test").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Record> data = Arrays.asList(new Record(1, null, "b"), new Record(1, "a", "b"), new Record(0, "b", "c"));
        JavaRDD<Record> distData = sc.parallelize(data);

        SQLContext sql = new SQLContext(sc);
        Dataset<Row> df = sql.createDataFrame(distData, Record.class);

        df.withColumn("id",
                ((Column) when(df.col("deviceFlag").$eq$eq$eq(1),
                        concat(df.col("device"), lit("#"),
                                df.col("domain")))).otherwise(df.col("device")).alias("id")).show();
    }

    public static class Record implements Serializable {
        int deviceFlag;
        String device;
        String domain;

        public int getDeviceFlag() {
            return deviceFlag;
        }

        public void setDeviceFlag(int deviceFlag) {
            this.deviceFlag = deviceFlag;
        }

        public String getDevice() {
            return device;
        }

        public void setDevice(String device) {
            this.device = device;
        }

        public String getDomain() {
            return domain;
        }

        public void setDomain(String domain) {
            this.domain = domain;
        }

        public Record(int deviceFlag, String device, String domain) {
            this.deviceFlag = deviceFlag;
            this.device = device;
            this.domain = domain;
        }
    }
}
And it worked well:

+------+----------+------+----+
|device|deviceFlag|domain|  id|
+------+----------+------+----+
|  null|         1|     b|null|
|     a|         1|     b| a#b|
|     b|         0|     c|   b|
+------+----------+------+----+

What was your problem exactly?
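For comparison, the same transformation is shorter in Scala (a minimal sketch for a Spark 2 spark-shell, where spark.implicits._ is already in scope; the data mirrors the Java Record objects above):

import org.apache.spark.sql.functions.{when, concat, lit, col}

// same sample rows as the Java example
val df = Seq((1, null.asInstanceOf[String], "b"), (1, "a", "b"), (0, "b", "c"))
  .toDF("deviceFlag", "device", "domain")

// concat returns null when device is null, so id stays null for that row
df.withColumn("id",
  when(col("deviceFlag") === 1, concat(col("device"), lit("#"), col("domain")))
    .otherwise(col("device"))).show()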
01-19-2017
10:29 AM
1 Kudo
It depends on your configuration, but you changed the log configuration of the containers (the separate worker processes), not the log configuration of the NodeManager process itself.
Check the YARN_LOG_DIR variable in yarn-env.sh for the location of the NodeManager's log.
01-17-2017
09:03 AM
It is not clear what you are doing. Please describe your problem in more detail:
What is your environment? How did you start the containers? What would you like to achieve? What have you tried so far?
To copy a file from the host machine to a Docker container you can use the docker cp command or use Docker volumes.
If you would like to copy from the host machine to HDFS, the easiest way is to mount a volume into the Docker container and use hdfs dfs -copyFromLocal from inside the container, as the HDFS configuration is most probably already there.
01-06-2017
05:04 PM
This connection error usually means that the interpreter has failed for some reason.
1. First of all, check the interpreter's log in the logs directory.
2. As you use yarn-client mode, I guess Spark has not been configured properly to use YARN. Check that you have the right yarn-site.xml and core-site.xml in your $SPARK_CONF_DIR. You should also check that SPARK_HOME and SPARK_CONF_DIR are set in your zeppelin-env.sh.
3. Usually the spark-submit parameters are visible in the interpreter log, so you can also check the log and try to submit an example application from the command line with the same parameters.
4. Sometimes the spark-submit works well but the YARN application master fails for some reason, so you can also check whether your application shows up on your Spark web UI.
12-22-2016
11:19 AM
Use:

df.map(row => {
  val row1 = row.getAs[String]("vbeln")
  val make = if (row1.toLowerCase == "125") "S" else "its 123"
  (row(0), make, row(0))
}).collect().foreach(println)

Instead of:

df.map(row => {
  val row1 = row.getAs[String]("vbeln")
  val make = if (row1.toLowerCase == "125") "S" else "its 123"
  Row(row(0), make, row(0))
}).collect().foreach(println)

You can map each line to a new one, but you don't need to return a new Row. You should map each record to a new tuple (for simple types) or to a new case class instance, as in the sketch below.
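For example, with a case class instead of a tuple (a minimal sketch; the case class and its field names are made up for illustration, and in a Spark 2 shell spark.implicits._ provides the needed encoder):

// hypothetical case class, named only for this example
case class Result(vbeln: String, make: String, original: String)

df.map(row => {
  val vbeln = row.getAs[String]("vbeln")
  val make = if (vbeln.toLowerCase == "125") "S" else "its 123"
  Result(vbeln, make, vbeln)
}).collect().foreach(println)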
12-21-2016
04:46 PM
1 Kudo
Unfortunately the error itself is not included in the question, but most probably you don't need to map to the Row type. Map each Row to a case class or a tuple.
Tested with Spark 2.0.2:

val spark = session.sparkContext
import session.sqlContext.implicits._
val df: DataFrame = session.sqlContext.read.format("csv").load("file:///tmp/ls.txt")
df.printSchema()
df.show()
val df2 = df.map(r => (r.getString(0), r.getString(0).length))
df2.printSchema()

From the output:

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)

+---+---+
|_c0|_c1|
+---+---+
|  a|  1|
|  b|  2|
|  c|  3|
+---+---+

root
 |-- _1: string (nullable = true)
 |-- _2: integer (nullable = true)
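The tuple gives the generic _1/_2 column names; mapping to a case class yields named columns instead (a short sketch; the case class name and fields here are made up):

// hypothetical case class so the resulting Dataset gets named columns
case class Entry(name: String, length: Int)

val df3 = df.map(r => Entry(r.getString(0), r.getString(0).length))
df3.printSchema()   // prints name (string) and length (integer) instead of _1/_2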
10-27-2016
08:13 AM
Do you mean a standalone Spark cluster or a YARN cluster?