Created 02-06-2017 02:17 AM
I have been trying to make the following DataFrame query work, but it's not giving me the expected results. Can someone please help?
Is there a reference guide covering the syntax for building DataFrame queries? Here is what I want. My DataFrame df has many cols, among which four matter here: id, deviceFlag, device, and domain.
For all rows, if deviceFlag is set to 1, form the string "device#domain" and store that value in a new col "id". Otherwise, just take the value in the "device" col and store it in the new "id" col without any transformation. So I am creating a new column with .withColumn, setting the value for each row depending on the corresponding values of the "device" and "deviceFlag" cols in that row.
DataFrame result = df.withColumn("id",
    ((Column) when(df.col("deviceFlag").$eq$eq$eq(1),
        concat(df.col("device"), lit("#"),
            df.col("domain")))).otherwise(df.col("device")).alias("id"));
Created 02-07-2017 02:38 PM
I think your solution is ok. Probably you have a problem with the input data (try calling df.show() and df.printSchema() on the original df before the transformation). I tried out your code with this class:
package com.hortonworks.spark.example;
import static org.apache.spark.sql.functions.when;
import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.lit;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
public class SparkStarter {
  public static void main(String[] args) {
    new SparkStarter().run();
  }
  private void run() {
    // Local Spark context for a quick self-contained test
    SparkConf conf = new SparkConf().setAppName("test").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);
    // Sample rows, including one with a null device to cover that edge case
    List<Record> data = Arrays.asList(new Record(1, null, "b"), new Record(1, "a", "b"), new Record(0, "b", "c"));
    JavaRDD<Record> distData = sc.parallelize(data);
    SQLContext sql = new SQLContext(sc);
    Dataset<Row> df = sql.createDataFrame(distData, Record.class);
    // The expression from the question: device#domain when deviceFlag == 1, otherwise device
    df.withColumn("id",
        ((Column) when(df.col("deviceFlag").$eq$eq$eq(1),
            concat(df.col("device"), lit("#"),
                df.col("domain")))).otherwise(df.col("device")).alias("id")).show();
  }
  public static class Record implements Serializable {
    int deviceFlag;
    String device;
    String domain;
    public int getDeviceFlag() {
      return deviceFlag;
    }
    public void setDeviceFlag(int deviceFlag) {
      this.deviceFlag = deviceFlag;
    }
    public String getDevice() {
      return device;
    }
    public void setDevice(String device) {
      this.device = device;
    }
    public String getDomain() {
      return domain;
    }
    public void setDomain(String domain) {
      this.domain = domain;
    }
    public Record(int deviceFlag, String device, String domain) {
      this.deviceFlag = deviceFlag;
      this.device = device;
      this.domain = domain;
    }
  }
}
And it worked well:
+------+----------+------+----+
|device|deviceFlag|domain|  id|
+------+----------+------+----+
|  null|         1|     b|null|
|     a|         1|     b| a#b|
|     b|         0|     c|   b|
+------+----------+------+----+
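Note the first row: concat returns null when any of its arguments is null, so a null device yields a null id even though deviceFlag is 1. If nulls are the issue, a minimal sketch of a fallback (my assumption, not part of the original answer) is to wrap the expression in coalesce, here falling back to the domain column:
import static org.apache.spark.sql.functions.coalesce;
// Sketch: if the computed id comes out null (e.g. device is null), fall back to domain
df.withColumn("id",
    coalesce(
        when(df.col("deviceFlag").equalTo(1),
            concat(df.col("device"), lit("#"), df.col("domain")))
            .otherwise(df.col("device")),
        df.col("domain")))
    .show();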
What was your problem exactly?
Created 02-06-2017 03:36 AM
Try
df.withColumn("id",when($"deviceFlag"===1,        concat($"device", lit("#"),$"domain")).otherwise($"device"));Created 02-06-2017 04:18 PM
@Karthik Narayanan - thanks, tried that but it's giving me "Incompatible operand types String and int" at the statement when($"deviceFlag"===1. I am writing in Java - is there something else I am missing?
Created 02-06-2017 04:27 PM
Try it with "1" - deviceFlag may be a string, so it needs to be compared with a string.
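If deviceFlag really is a StringType column, the Java comparison would look like this (a sketch; as it turns out below, deviceFlag is an Int here, so comparing against 1 was fine):
// Sketch: compare against the string literal "1" if deviceFlag is a StringType column
df.withColumn("id",
    when(df.col("deviceFlag").equalTo("1"),
        concat(df.col("device"), lit("#"), df.col("domain")))
    .otherwise(df.col("device")));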
Created on 02-06-2017 07:57 PM - edited 08-18-2019 04:55 AM
This worked well for me in Spark (Scala), which looks to be the same as @Karthik Narayanan's answer:
import org.apache.spark.sql.SparkSession
// In spark-shell, spark already exists; otherwise create it:
val spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
import spark.implicits._
val data = Seq((1111, 1, "device1", "domain1"), (2222, 0, "device2", "domain2"), (3333, 1, "device3", "domain3"), (4444, 0, "device4", "domain4"))
val df = data.toDF("id","deviceFlag","device","domain")
df.show()
val result = df.withColumn("new_id", when($"deviceFlag"===1, concat($"device", lit("#"), $"domain")).otherwise($"device"))
result.show()
This will output:
+----+----------+-------+-------+---------------+
|  id|deviceFlag| device| domain|         new_id|
+----+----------+-------+-------+---------------+
|1111|         1|device1|domain1|device1#domain1|
|2222|         0|device2|domain2|        device2|
|3333|         1|device3|domain3|device3#domain3|
|4444|         0|device4|domain4|        device4|
+----+----------+-------+-------+---------------+
As an alternative (and reference for others), the PySpark syntax will look like this:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import lit, concat
# In the pyspark shell, spark already exists; otherwise create it:
spark = SparkSession.builder.appName("test").getOrCreate()
data = [(1111, 1, 'device1', 'domain1'), (2222, 0, 'device2', 'domain2'), (3333, 1, 'device3', 'domain3'), (4444, 0, 'device4', 'domain4')]
df = spark.createDataFrame(data, ['id', 'deviceFlag', 'device', 'domain'])
df.show()
result = df.withColumn("new_id", F.when(df["deviceFlag"] == 1, concat(df["device"], lit("#"), df["domain"])).otherwise(df["device"]))
result.show()
Hope this helps!
Created 02-06-2017 09:47 PM
Good pick, Dan. Just noticed that "id" was part of his original DF.
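Worth noting for readers: withColumn replaces an existing column of the same name, which is why the answer above uses a fresh name, "new_id", and keeps the original id intact. A minimal Java sketch of the difference:
// withColumn with an existing name replaces that column;
// with a new name it appends one.
Dataset<Row> replaced = df.withColumn("id", lit("x"));     // overwrites the existing "id"
Dataset<Row> appended = df.withColumn("new_id", lit("x")); // adds a new "new_id" column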
Created 02-06-2017 10:17 PM
Thanks for the detailed answer. My code is in Java, so I am running into issues trying to convert this Scala solution to Java. Any idea what the Java version of this could be?
@Karthik Narayanan - deviceFlag is an Int, so your comparison is correct. I just need the Java equivalent of this.
Created 02-07-2017 06:02 AM
df.withColumn("id",when(df.col("deviceFlag").equalTo(1),concat(df.col("device"), lit("#"),df.col("domain"))).otherwise(df.col("device")));Created 02-09-2017 07:09 PM
Thanks for the details, this is helpful