
Not able to split the column into multiple columns in Spark Dataframe

Super Collaborator

Hi all,

I want to create a dataframe in Spark and assign proper schema to the data. I have multiple files under one HDFS directory and I am reading all files using the following command:

%pyspark

logs_df = sqlContext.read.text("hdfs://sandbox.hortonworks.com:8020/tmp/nifioutput")

This creates a dataframe with everything stored in a single column (value). Next, I want to derive multiple columns from this single column. Typing this:

%pyspark

from pyspark.sql.functions import split, expr

logs_df.select(
    expr("(split(value, '|'))[0]").cast("string").alias("IP"),
    expr("(split(value, '|'))[1]").cast("string").alias("Time"),
    expr("(split(value, '|'))[2]").cast("string").alias("Request_Type"),
    expr("(split(value, '|'))[3]").cast("integer").alias("Response_Code"),
    expr("(split(value, '|'))[4]").cast("string").alias("City"),
    expr("(split(value, '|'))[5]").cast("string").alias("Country"),
    expr("(split(value, '|'))[6]").cast("string").alias("Isocode"),
    expr("(split(value, '|'))[7]").cast("double").alias("Latitude"),
    expr("(split(value, '|'))[8]").cast("double").alias("Longitude")
).show()

gives me a strange result. It takes only one character from each row instead of splitting on the delimiter (i.e. |), and stores it in the different columns.

+---+----+------------+-------------+----+-------+-------+--------+---------+
| IP|Time|Request_Type|Response_Code|City|Country|Isocode|Latitude|Longitude|
+---+----+------------+-------------+----+-------+-------+--------+---------+
|  1|   3|           3|         null|   6|      8|      .|     1.0|      8.0|
|  1|   3|           3|         null|   6|      8|      .|     1.0|      8.0|
|  1|   3|           3|         null|   6|      8|      .|     1.0|      8.0|
+---+----+------------+-------------+----+-------+-------+--------+---------+

As you can see here, each column holds only a single character; for example, 133.68.18.180 should appear as a complete IP address in the IP column. Is this the right way to create multiple columns out of one? Please help.

PS - I want to avoid regexp_extract for this.
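
(Note on the behaviour above: split() in Spark SQL treats its pattern argument as a Java regular expression, and an unescaped | matches the empty string between every character, which is why each column ends up with a single character. A minimal sketch of the same select with the delimiter escaped, reusing the column names and casts from the post above:)

%pyspark

from pyspark.sql.functions import split

# escape the pipe, since split() interprets its pattern as a regex
parts = split(logs_df["value"], "\\|")

logs_df.select(
    parts[0].cast("string").alias("IP"),
    parts[1].cast("string").alias("Time"),
    parts[2].cast("string").alias("Request_Type"),
    parts[3].cast("integer").alias("Response_Code"),
    parts[4].cast("string").alias("City"),
    parts[5].cast("string").alias("Country"),
    parts[6].cast("string").alias("Isocode"),
    parts[7].cast("double").alias("Latitude"),
    parts[8].cast("double").alias("Longitude")
).show()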

1 ACCEPTED SOLUTION


You might want to use "com.databricks:spark-csv_2.10:1.5.0", e.g.

pyspark --packages "com.databricks:spark-csv_2.10:1.5.0"

and then use (the csv file is in the folder /tmp/data2):

from pyspark.sql.types import StructType, StructField, DoubleType, StringType
schema = StructType([
  StructField("IP",             StringType()),
  StructField("Time",           StringType()),
  StructField("Request_Type",   StringType()),
  StructField("Response_Code",  StringType()),
  StructField("City",           StringType()),
  StructField("Country",        StringType()),
  StructField("Isocode",        StringType()),
  StructField("Latitude",       DoubleType()),
  StructField("Longitude",      DoubleType())
])

logs_df = sqlContext.read\
                    .format("com.databricks.spark.csv")\
                    .schema(schema)\
                    .option("header", "false")\
                    .option("delimiter", "|")\
                    .load("/tmp/data2")
logs_df.show()

Result:

+------------+--------+------------+-------------+------+--------------+-------+--------+---------+
|          IP|    Time|Request_Type|Response_Code|  City|       Country|Isocode|Latitude|Longitude|
+------------+--------+------------+-------------+------+--------------+-------+--------+---------+
|192.168.1.19|13:23:56|         GET|          200|London|United Kingdom|     UK| 51.5074|   0.1278|
|192.168.5.23|09:45:13|        POST|          404|Munich|       Germany|     DE| 48.1351|   11.582|
+------------+--------+------------+-------------+------+--------------+-------+--------+---------+
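
As a side note, on Spark 2.x and later the CSV data source is built in, so the external spark-csv package and the --packages flag are no longer needed. A minimal sketch reusing the schema above, assuming a SparkSession named spark is available:

# Spark 2.x+: built-in CSV reader, same schema and options as above
logs_df = spark.read \
               .schema(schema) \
               .option("header", "false") \
               .option("delimiter", "|") \
               .csv("/tmp/data2")
logs_df.show()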


2 REPLIES

Super Collaborator

Thanks a lot @Bernhard Walter, that works like a charm!