Support Questions


Not able to split a column into multiple columns in a Spark DataFrame

Super Collaborator

Hi all,

I want to create a DataFrame in Spark and assign a proper schema to the data. I have multiple files under one HDFS directory, and I am reading all of them with the following command:

%pyspark

logs_df = sqlContext.read.text("hdfs://sandbox.hortonworks.com:8020/tmp/nifioutput")
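
For reference, printSchema() confirms that the text reader puts every line into a single string column named value:

logs_df.printSchema()

root
 |-- value: string (nullable = true)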

Next, I want to derive multiple columns from this single value column. Running this:

%pyspark

from pyspark.sql.functions import split, expr

logs_df.select(expr("(split(value, '|'))[0]").cast("string").alias("IP"),
               expr("(split(value, '|'))[1]").cast("string").alias("Time"),
               expr("(split(value, '|'))[2]").cast("string").alias("Request_Type"),
               expr("(split(value, '|'))[3]").cast("integer").alias("Response_Code"),
               expr("(split(value, '|'))[4]").cast("string").alias("City"),
               expr("(split(value, '|'))[5]").cast("string").alias("Country"),
               expr("(split(value, '|'))[6]").cast("string").alias("Isocode"),
               expr("(split(value, '|'))[7]").cast("double").alias("Latitude"),
               expr("(split(value, '|'))[8]").cast("double").alias("Longitude")).show()

gives me a strange result: it takes only one character per column from each row instead of splitting on the delimiter (i.e. |).

+---+----+------------+-------------+----+-------+-------+--------+---------+
| IP|Time|Request_Type|Response_Code|City|Country|Isocode|Latitude|Longitude|
+---+----+------------+-------------+----+-------+-------+--------+---------+
|  1|   3|           3|         null|   6|      8|      .|     1.0|      8.0|
|  1|   3|           3|         null|   6|      8|      .|     1.0|      8.0|
|  1|   3|           3|         null|   6|      8|      .|     1.0|      8.0|
+---+----+------------+-------------+----+-------+-------+--------+---------+

As you can see, each column holds only one character; the IP column, for example, should contain a full address like 133.68.18.180, not just 1. Is this the right way to create multiple columns out of one? Please help.

PS: I want to avoid regexp_extract here.

1 ACCEPTED SOLUTION

Bernhard Walter

You might want to use "com.databricks:spark-csv_2.10:1.5.0", e.g.

pyspark --packages "com.databricks:spark-csv_2.10:1.5.0"

and then read with an explicit schema (the CSV files in this example are in the folder /tmp/data2):

from pyspark.sql.types import StructType, StructField, DoubleType, StringType

# Explicit schema for the pipe-delimited log data
schema = StructType([
  StructField("IP",             StringType()),
  StructField("Time",           StringType()),
  StructField("Request_Type",   StringType()),
  StructField("Response_Code",  StringType()),
  StructField("City",           StringType()),
  StructField("Country",        StringType()),
  StructField("Isocode",        StringType()),
  StructField("Latitude",       DoubleType()),
  StructField("Longitude",      DoubleType())
])

# The CSV reader treats the delimiter as a literal character, so no regex escaping is needed
logs_df = sqlContext.read\
                    .format("com.databricks.spark.csv")\
                    .schema(schema)\
                    .option("header", "false")\
                    .option("delimiter", "|")\
                    .load("/tmp/data2")
logs_df.show()

Result:

+------------+--------+------------+-------------+------+--------------+-------+--------+---------+
|          IP|    Time|Request_Type|Response_Code|  City|       Country|Isocode|Latitude|Longitude|
+------------+--------+------------+-------------+------+--------------+-------+--------+---------+
|192.168.1.19|13:23:56|         GET|          200|London|United Kingdom|     UK| 51.5074|   0.1278|
|192.168.5.23|09:45:13|        POST|          404|Munich|       Germany|     DE| 48.1351|   11.582|
+------------+--------+------------+-------------+------+--------------+-------+--------+---------+
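
For what it's worth, the reason your split() produced one character per column is that the pattern argument of split() is a regular expression, and | is the regex alternation metacharacter, so it matches the empty string between every pair of characters. If you prefer to stay with the text reader, escaping the pipe should also work. A minimal sketch (untested here, assuming logs_df is the DataFrame from your sqlContext.read.text call with its single value column):

from pyspark.sql.functions import split

# '\\|' escapes the pipe so it is matched literally rather than as regex alternation
parts = split(logs_df["value"], "\\|")

logs_df.select(parts.getItem(0).alias("IP"),
               parts.getItem(1).alias("Time"),
               parts.getItem(2).alias("Request_Type"),
               parts.getItem(3).cast("integer").alias("Response_Code"),
               parts.getItem(4).alias("City"),
               parts.getItem(5).alias("Country"),
               parts.getItem(6).alias("Isocode"),
               parts.getItem(7).cast("double").alias("Latitude"),
               parts.getItem(8).cast("double").alias("Longitude")).show()

Also, on Spark 2.x you would not need the external package: the CSV reader is built in, so spark.read.format("csv") (or spark.read.csv) accepts the same schema and delimiter options.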


2 REPLIES 2


Super Collaborator

Thanks a lot @Bernhard Walter, that works like a charm.