Support Questions
Find answers, ask questions, and share your expertise

Spark Dataframe Query not working

I have been trying to make the following Dataframe query work but its not giving me the results. Can someone please help?

Is there a reference guide to see all the syntax to create dataframe queries? Here this is what I want - My dataframe df has many cols among which 4 are -

  1. id
  2. deviceFlag
  3. device
  4. domain

For all rows, if deviceFlag is set to 1, then form a string "device#domain" and store that value in a new col "id". Else, just take the value in the "device" col and store it in the new "id" col without any transformation. So I am creating a new column with the .withColumn and setting value for each row depending upon the corresponding values of "device" and "deviceFlag" cols in that row.

DataFrame result= df.withColumn("id",
    ((Column) when(df.col("deviceFlag").$eq$eq$eq(1),
        concat(df.col("device"), lit("#"),
            df.col("domain")))).otherwise(df.col("device")).alias("id"));
1 ACCEPTED SOLUTION

Accepted Solutions

Re: Spark Dataframe Query not working

Cloudera Employee

I think your solution is ok. Probably you have problem with the input data (Try to call df.show() and df.printSchema() with the original df before the transformation). I tried out your code with this class:

package com.hortonworks.spark.example;


import static org.apache.spark.sql.functions.when;
import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.lit;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;


public class SparkStarter {
  public static void main(String[] args) {
    new SparkStarter().run();
  }

  private void run() {

    SparkConf conf = new SparkConf().setAppName("test").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);

    List<Record> data = Arrays.asList(new Record(1, null, "b"),new Record(1, "a", "b"), new Record(0, "b", "c"));
    JavaRDD<Record> distData = sc.parallelize(data);
    SQLContext sql = new SQLContext(sc);
    Dataset<Row> df = sql.createDataFrame(distData, Record.class);

    df.withColumn("id",
        ((Column) when(df.col("deviceFlag").$eq$eq$eq(1),
            concat(df.col("device"), lit("#"),
                df.col("domain")))).otherwise(df.col("device")).alias("id")).show();
  }

  public static class Record implements Serializable {
    int deviceFlag;
    String device;
    String domain;

    public int getDeviceFlag() {
      return deviceFlag;
    }

    public void setDeviceFlag(int deviceFlag) {
      this.deviceFlag = deviceFlag;
    }

    public String getDevice() {
      return device;
    }

    public void setDevice(String device) {
      this.device = device;
    }

    public String getDomain() {
      return domain;
    }

    public void setDomain(String domain) {
      this.domain = domain;
    }

    public Record(int deviceFlag, String device, String domain) {
      this.deviceFlag = deviceFlag;
      this.device = device;
      this.domain = domain;

    }
  }
}

And it worked well:

+------+----------+------+----+
|device|deviceFlag|domain|  id|
+------+----------+------+----+
|  null|         1|     b|null|
|     a|         1|     b| a#b|
|     b|         0|     c|   b|
+------+----------+------+----+

What was your problem exactly?

View solution in original post

12 REPLIES 12

Re: Spark Dataframe Query not working

Expert Contributor

Try

  1. df.withColumn("id",
  2. when($"deviceFlag"===1,
  3. concat($"device", lit("#"),
  4. $"domain")).otherwise($"device"));

Re: Spark Dataframe Query not working

@Karthik Narayanan - thanks, tried that but its giving me - " Incompatible operand types String and int" at the statement - when($"deviceFlag"===1. I am writing in java - is there something else I am missing?

Re: Spark Dataframe Query not working

Expert Contributor

Try with a "1" , deviceflag may be a string so it needs to be compared with string .

Re: Spark Dataframe Query not working

@Mark Wallace

This worked well for me in Spark (Scala), which looks to be the same as @Karthik Narayanan's answer:

import org.apache.spark.sql.SparkSession
import spark.implicits._

val data = Seq((1111, 1, "device1", "domain1"), (2222, 0, "device2", "domain2"), (3333, 1, "device3", "domain3"), (4444, 0, "device4", "domain4"))

val df = data.toDF("id","deviceFlag","device","domain")

df.show()

val result = df.withColumn("new_id", when($"deviceFlag"===1, concat($"device", lit("#"), $"domain")).otherwise($"device"))

result.show()

This will output:

12157-screen-shot-2017-02-06-at-25603-pm.png

As an alternative (and reference for others), the PySpark syntax will look like this:

from pyspark import SparkContext, SparkConf
from pyspark.sql import functions as F
from pyspark.sql.functions import lit, concat

data = [(1111, 1, 'device1', 'domain1'), (2222, 0, 'device2', 'domain2'), (3333, 1, 'device3', 'domain3'), (4444, 0, 'device4', 'domain4')]

df = spark.createDataFrame(data, ['id','deviceFlag','device','domain'])

df.show()

result = df.withColumn("new_id", F.when(df["deviceFlag"]==1, concat(df["device"], lit("#"), df["domain"])).otherwise(df["device"]))

result.show()

Hope this helps!

Re: Spark Dataframe Query not working

Expert Contributor

Good pick Dan. Just noticed that Id was part of his original DF.

Re: Spark Dataframe Query not working

Thanks for the detailed answer. My code is in Java so I am running into issues trying to convert this scala solution into Java. Any idea on what the version Java version of this could be?

@Karthik Narayanan - deviceFlag is Int so your comparison is correct. Just need the Java equivalent of this..

Re: Spark Dataframe Query not working

Expert Contributor
  1. df.withColumn("id",
  2. when(df.col("deviceFlag").equalTo(1),
  3. concat(df.col("device"), lit("#"),
  4. df.col("domain"))).otherwise(df.col("device")));

Re: Spark Dataframe Query not working

Thanks for the details, this is helpful

Re: Spark Dataframe Query not working

Cloudera Employee

I think your solution is ok. Probably you have problem with the input data (Try to call df.show() and df.printSchema() with the original df before the transformation). I tried out your code with this class:

package com.hortonworks.spark.example;


import static org.apache.spark.sql.functions.when;
import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.lit;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;


public class SparkStarter {
  public static void main(String[] args) {
    new SparkStarter().run();
  }

  private void run() {

    SparkConf conf = new SparkConf().setAppName("test").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);

    List<Record> data = Arrays.asList(new Record(1, null, "b"),new Record(1, "a", "b"), new Record(0, "b", "c"));
    JavaRDD<Record> distData = sc.parallelize(data);
    SQLContext sql = new SQLContext(sc);
    Dataset<Row> df = sql.createDataFrame(distData, Record.class);

    df.withColumn("id",
        ((Column) when(df.col("deviceFlag").$eq$eq$eq(1),
            concat(df.col("device"), lit("#"),
                df.col("domain")))).otherwise(df.col("device")).alias("id")).show();
  }

  public static class Record implements Serializable {
    int deviceFlag;
    String device;
    String domain;

    public int getDeviceFlag() {
      return deviceFlag;
    }

    public void setDeviceFlag(int deviceFlag) {
      this.deviceFlag = deviceFlag;
    }

    public String getDevice() {
      return device;
    }

    public void setDevice(String device) {
      this.device = device;
    }

    public String getDomain() {
      return domain;
    }

    public void setDomain(String domain) {
      this.domain = domain;
    }

    public Record(int deviceFlag, String device, String domain) {
      this.deviceFlag = deviceFlag;
      this.device = device;
      this.domain = domain;

    }
  }
}

And it worked well:

+------+----------+------+----+
|device|deviceFlag|domain|  id|
+------+----------+------+----+
|  null|         1|     b|null|
|     a|         1|     b| a#b|
|     b|         0|     c|   b|
+------+----------+------+----+

What was your problem exactly?

View solution in original post