Contributor
Posts: 44
Registered: ‎01-05-2016
Accepted Solution

Pyspark: Table Dataframe returning empty records from Partitioned Table

Hi all, I think it's time to ask for some help on this, after three days of attempts and extensive searching on the web.

 

Long story short:

 

 

FACTS

----------

- Pyspark with iPython

- version 1.5.0-cdh5.5.1

- I have 2 simple (test) partitioned tables: one external, one managed

- If I query them via Impala or Hive I can see the data. No errors

- If I try to create a Dataframe out of them, no errors. But the column values are NULL, except for the "partitioning" column, which appears correct. The behaviour differs slightly depending on how I create the table. More on this below...

 

 

HOW I CREATED THE TABLES

-------------------------------------------

- Created both of them using Impala

 

Table 1:

CREATE EXTERNAL TABLE mn.opt_tbl_blade
PARTITIONED BY (st_insdt string)
STORED AS AVRO
LOCATION 'hdfs:///user/hive/mn/opt_tbl_blade'
TBLPROPERTIES ('avro.schema.url'='hdfs://172.16.0.10/user/mn/SCHEMAS/opt_tbl_blade/opt_tbl_blade.avsc');

ALTER TABLE mn.opt_tbl_blade ADD PARTITION (st_insdt="2008-02");

 

Table 2:

create table mn.logs (field1 string, field2 string, field3 string)
partitioned by (year string, month string , day string, host string)
row format delimited fields terminated by ',';

 

 

HOW I PUT THE DATA INTO THE TABLES

---------------------------------------------------------

Table 1 (EXTERNAL):

- I just moved the Avro files obtained using Sqoop into the "st_insdt=2008-02" directory in HDFS

- I repeat: I can query the Data from Hue (Impala, Hive)

 

Table 2 (MANAGED):

insert into logs partition (year="2013", month="07", day="28", host="host1") values ("foo","foo","foo");
insert into logs partition (year="2013", month="07", day="28", host="host2") values ("foo","foo","foo");
insert into logs partition (year="2013", month="07", day="29", host="host1") values ("foo","foo","foo");
insert into logs partition (year="2013", month="07", day="29", host="host2") values ("foo","foo","foo");
insert into logs partition (year="2013", month="08", day="01", host="host1") values ("foo","foo","foo");

- Also in this case, a simple query "select * from logs" gives me the right results!

 

 

NOW LET'S LAUNCH PYSPARK AND:

----------------------------------------------------

- Regarding Table 1 (EXTERNAL):

 

In [1]: blade_DF = sqlContext.table("mn.opt_tbl_blade")


In [2]: blade_DF.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- descr: string (nullable = true)
 |-- thickness: double (nullable = true)
 |-- flg_active: string (nullable = true)
 |-- flg_delete: string (nullable = true)
 |-- st_ins_date: string (nullable = true)
 |-- st_upd_date: string (nullable = true)
 |-- st_created_by: long (nullable = true)
 |-- st_last_upd_by: long (nullable = true)
 |-- st_last_upd_oslogin: string (nullable = true)
 |-- st_insdt: string (nullable = true)
 
 
In [3]: blade_DF.count()

Out[3]: 6


In [4]: blade_DF.show()

+----+----+-----+---------+----------+----------+-----------+-----------+-------------+--------------+-------------------+--------+
|  id|name|descr|thickness|flg_active|flg_delete|st_ins_date|st_upd_date|st_created_by|st_last_upd_by|st_last_upd_oslogin|st_insdt|
+----+----+-----+---------+----------+----------+-----------+-----------+-------------+--------------+-------------------+--------+
|null|null| null|     null|      null|      null|       null|       null|         null|          null|               null| 2008-02|
|null|null| null|     null|      null|      null|       null|       null|         null|          null|               null| 2008-02|
|null|null| null|     null|      null|      null|       null|       null|         null|          null|               null| 2008-02|
|null|null| null|     null|      null|      null|       null|       null|         null|          null|               null| 2008-02|
|null|null| null|     null|      null|      null|       null|       null|         null|          null|               null| 2008-02|
|null|null| null|     null|      null|      null|       null|       null|         null|          null|               null| 2008-02|
+----+----+-----+---------+----------+----------+-----------+-----------+-------------+--------------+-------------------+--------+

As you can see, the rows are somehow "sensed": the count is correct (6 records) and the last field on the right (the partitioning field) is correct (this table has just one partition).

 

But all the other fields are NULL. This is definitely not the actual data, and it's not what I see from a "select * from opt_tbl_blade"!!

 

 

- Regarding Table 2 (MANAGED):

 

In [5]: logs_DF = sqlContext.table("mn.logs")

In [6]: logs_DF.show()

+-----------+------+------+----+-----+---+-----+
|     field1|field2|field3|year|month|day| host|
+-----------+------+------+----+-----+---+-----+
|foo,foo,foo|  null|  null|2013|   07| 29|host1|
|foo,foo,foo|  null|  null|2013|   08| 01|host1|
|foo,foo,foo|  null|  null|2013|   07| 29|host2|
|foo,foo,foo|  null|  null|2013|   07| 28|host2|
|foo,foo,foo|  null|  null|2013|   07| 28|host1|
+-----------+------+------+----+-----+---+-----+

This is maybe even stranger. The three fields are "compacted" into one, with just one value and comma separators. But if I run a plain "select * from mn.logs" I see just one "foo" string in each field (field1, field2, field3)!!
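The symptom looks as if the whole delimited line ended up in field1 instead of being split on the declared ',' terminator. A minimal Python sketch of the difference (values taken from the example rows above):

```python
# A raw data-file line for mn.logs ("fields terminated by ','"):
line = "foo,foo,foo"

# What the row should parse to, according to the table's declared delimiter:
expected = line.split(",")

# What the DataFrame actually showed: the unsplit line in field1, NULLs after it.
observed = [line, None, None]

print(expected)  # three separate values
print(observed)  # one compacted value plus two NULLs
```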

 

I strongly suspect I'm doing something wrong with the field definitions/separators, but as I said at the beginning, after extensive searching and tries I have not succeeded.

 

BUT: Please note that IF THE TABLE IS NOT PARTITIONED, IT CAN BE AVRO, IT CAN BE MANAGED, IT CAN BE WHATEVER... Results are correct!!!

 

Can anybody with fresh eyes give me some suggestions please?

 

Thanks in advance for any help...

 

 

 

 

Contributor
Posts: 44
Registered: ‎01-05-2016

Re: Pyspark: Table Dataframe returning empty records from Partitioned Table

For the record, I've just tried test case "Table 2" on a Cloudera Quickstart VM with Spark 1.3.0, and IT WORKS correctly...

 

Haven't tried the Avro files yet (I can't transfer the files right now), but I'm starting to think something may have broken since Spark 1.3.0?

 

Regards

Contributor
Posts: 44
Registered: ‎01-05-2016

Re: Pyspark: Table Dataframe returning empty records from Partitioned Table

Update #2 (then I'm going to wait for some insights...)

 

I also tried test case 1 (the Avro external table) on the Quickstart VM with Spark 1.3.0 (using a different table and different Avro files, but the same logic). IT WORKS too...

 

In this case, just before getting the output, I noticed this warning, which doesn't show up at all with Spark 1.5.0:

 

 

16/01/05 16:00:03 INFO avro.AvroSerDe: Avro schema is {"type":"record",
"name":"cust1","namespace":"default","fields":[{"name":"customer_id",
"type":["null","int"],"doc":"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000",
"default":null},{"name":"customer_fname","type":["null","string"],
"default":null},{"name":"customer_lname","type":["null","string"],
"default":null},{"name":"customer_email","type":["null","string"],
"default":null},{"name":"customer_password","type":["null","string"],
"default":null},{"name":"customer_street","type":["null","string"],
"default":null},{"name":"customer_city","type":["null","string"],
"default":null},{"name":"customer_state","type":["null","string"],
"default":null},{"name":"customer_zipcode","type":["null","string"],
"default":null}]}

16/01/05 16:00:03 WARN avro.AvroDeserializer: Received different schemas.
Have to re-encode: {"type":"record","name":"sqoop_import_customers",
"doc":"Sqoop import of customers","fields":[{"name":"customer_id",
"type":["int","null"],"columnName":"customer_id","sqlType":"4"},
{"name":"customer_fname","type":["string","null"],"columnName":"customer_fname",
"sqlType":"12"},{"name":"customer_lname","type":["string","null"],
"columnName":"customer_lname","sqlType":"12"},{"name":"customer_email",
"type":["string","null"],"columnName":"customer_email","sqlType":"12"},
{"name":"customer_password","type":["string","null"],
"columnName":"customer_password","sqlType":"12"},{"name":"customer_street",
"type":["string","null"],"columnName":"customer_street","sqlType":"12"},
{"name":"customer_city","type":["string","null"],"columnName":"customer_city",
"sqlType":"12"},{"name":"customer_state","type":["string","null"],
"columnName":"customer_state","sqlType":"12"},{"name":"customer_zipcode",
"type":["string","null"],"columnName":"customer_zipcode","sqlType":"12"}],
"tableName":"customers"}
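Trimmed to a single field, the two schemas in that warning differ in the record name and in the order of the union branches. A small sketch (all other fields omitted for brevity) to make the mismatch visible — the "declared"/"embedded" labels are my interpretation: the table's avro.schema.url schema versus the schema carried inside the Sqoop-produced data files:

```python
import json

# The table's declared schema vs. the schema embedded in the data files,
# each trimmed down to the customer_id field.
declared = json.loads('{"type":"record","name":"cust1","namespace":"default",'
                      '"fields":[{"name":"customer_id","type":["null","int"],'
                      '"default":null}]}')
embedded = json.loads('{"type":"record","name":"sqoop_import_customers",'
                      '"fields":[{"name":"customer_id","type":["int","null"],'
                      '"columnName":"customer_id","sqlType":"4"}]}')

# The record names differ, so the deserializer has to resolve the schemas ...
print(declared["name"], "vs", embedded["name"])
# ... and even the shared field lists its union branches in opposite order.
print(declared["fields"][0]["type"], "vs", embedded["fields"][0]["type"])
```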

 

But the Output is correct:

 

 

customer_id customer_fname customer_lname customer_email customer_password customer_street      customer_city customer_state customer_zipcode st_insdt
1           Richard        Hernandez      XXXXXXXXX      XXXXXXXXX         6303 Heather Plaza   Brownsville   TX             78521            2008-02 
2           Mary           Barrett        XXXXXXXXX      XXXXXXXXX         9526 Noble Embers... Littleton     CO             80126            2008-02 
3           Ann            Smith          XXXXXXXXX      XXXXXXXXX         3422 Blue Pioneer... Caguas        PR             00725            2008-02 
4           Mary           Jones          XXXXXXXXX      XXXXXXXXX         8324 Little Common   San Marcos    CA             92069            2008-02 
5           Robert         Hudson         XXXXXXXXX      XXXXXXXXX         10 Crystal River ... Caguas        PR             00725            2008-02 
6           Mary           Smith          XXXXXXXXX      XXXXXXXXX         3151 Sleepy Quail... Passaic       NJ             07055            2008-02 
7           Melissa        Wilcox         XXXXXXXXX      XXXXXXXXX         9453 High Concession Caguas        PR             00725            2008-02 
8           Megan          Smith          XXXXXXXXX      XXXXXXXXX         3047 Foggy Forest... Lawrence      MA             01841            2008-02 
9           Mary           Perez          XXXXXXXXX      XXXXXXXXX         3616 Quaking Street  Caguas        PR             00725            2008-02 
10          Melissa        Smith          XXXXXXXXX      XXXXXXXXX         8598 Harvest Beac... Stafford      VA             22554            2008-02 
11          Mary           Huffman        XXXXXXXXX      XXXXXXXXX         3169 Stony Woods     Caguas        PR             00725            2008-02 
12          Christopher    Smith          XXXXXXXXX      XXXXXXXXX         5594 Jagged Ember... San Antonio   TX             78227            2008-02 
13          Mary           Baldwin        XXXXXXXXX      XXXXXXXXX         7922 Iron Oak Gar... Caguas        PR             00725            2008-02 
14          Katherine      Smith          XXXXXXXXX      XXXXXXXXX         5666 Hazy Pony Sq... Pico Rivera   CA             90660            2008-02 
15          Jane           Luna           XXXXXXXXX      XXXXXXXXX         673 Burning Glen     Fontana       CA             92336            2008-02 
16          Tiffany        Smith          XXXXXXXXX      XXXXXXXXX         6651 Iron Port       Caguas        PR             00725            2008-02 
17          Mary           Robinson       XXXXXXXXX      XXXXXXXXX         1325 Noble Pike      Taylor        MI             48180            2008-02 
18          Robert         Smith          XXXXXXXXX      XXXXXXXXX         2734 Hazy Butterf... Martinez      CA             94553            2008-02 
19          Stephanie      Mitchell       XXXXXXXXX      XXXXXXXXX         3543 Red Treasure... Caguas        PR             00725            2008-02 
20          Mary           Ellis          XXXXXXXXX      XXXXXXXXX         4703 Old Route       West New York NJ             07093            2008-02 

 

Any ideas?

Contributor
Posts: 44
Registered: ‎01-05-2016

Re: Pyspark: Table Dataframe returning empty records from Partitioned Table

[ Edited ]

Finally found a solution (or a workaround, I don't know exactly what to call it), as apparently what I described above was not my fault after all, but was due to something that probably broke after Spark 1.3.0.

 

Using the "Parquet" storage file format apparently does the trick. It works smoothly and with no warnings. I think I'm going to use this file format from now on... At least until they fix the partitioned Avro issue.
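For the mn.logs test table above, the workaround could be sketched as a plain table copy in Impala/Hive — note that mn.logs_parquet is an invented name and the dynamic-partition INSERT is just one possible way to move the rows over:

```sql
-- Hypothetical sketch: same layout as mn.logs, but stored as Parquet.
CREATE TABLE mn.logs_parquet (field1 STRING, field2 STRING, field3 STRING)
PARTITIONED BY (year STRING, month STRING, day STRING, host STRING)
STORED AS PARQUET;

-- Copy the test rows, letting the partition values come from the query itself.
INSERT INTO mn.logs_parquet PARTITION (year, month, day, host)
SELECT field1, field2, field3, year, month, day, host FROM mn.logs;
```

After that, sqlContext.table("mn.logs_parquet") should return the actual values rather than NULLs.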

 

Hope this helps somebody in the future :)

Explorer
Posts: 6
Registered: ‎02-04-2016

Re: Pyspark: Table Dataframe returning empty records from Partitioned Table

Hi all,

 

FrozenWaves' solution works fine for managed tables, but we have a lot of raw data in text-file format (CSV) behind an external table definition that Pyspark isn't picking up either, exactly as described above. Again, accessing the data from Pyspark worked fine when we were running CDH 5.4 and Spark 1.3, but we've recently upgraded to CDH 5.5 and Spark 1.5 in order to run Hue 3.9 and the Spark Livy REST server.

 

Anyone got any ideas, or are we stuck with creating a Parquet managed table to access the data in Pyspark?

Contributor
Posts: 44
Registered: ‎01-05-2016

Re: Pyspark: Table Dataframe returning empty records from Partitioned Table

Hi GeoffD, I don't know if what follows will help, but I've had the chance to try a few different scenarios lately, and I've noticed that a few inconsistencies can sometimes occur with the Parquet file format too.

 

For instance: if you have data coming in from two different sources, e.g. a Sqoop job creating one Parquet file and a Pig job creating another Parquet output, you have to take care to define the names of the output fields as all uppercase.

 

Also, when you create the managed table, you have to refer to a "LIKE PARQUET" file with uppercase field names to be inferred.

 

If you don't take care of all that, what I have experienced is that if you take the output files from several Pig and Sqoop procedures and put them under the table's directory, the "NULL field" issue will happen, even though the schemas are apparently identical.

 

What I have found out is that under some conditions (e.g. when you rename fields in a Sqoop or Pig job), the resulting Parquet files differ: the Sqoop job will ALWAYS create uppercase field names, whereas the corresponding Pig job does not, and keeps the exact case you specified inside the Pig script.

 

Now, I haven't dug deeper into all this, as getting this far is enough for me. But I've started to think that somewhere in the new Spark something goes wrong when parsing the field names, probably due to a case issue. So maybe you'd want to experiment around this with your CSV and plain text files too. Maybe you'll come up with a final solution for plain text files as well :)
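One quick way to check a pair of outputs for this is to compare the two field lists ignoring case. A small sketch, with invented field names standing in for the Sqoop and Pig outputs:

```python
# Hypothetical field lists from two jobs writing into the same table directory.
sqoop_fields = ["CUSTOMER_ID", "CUSTOMER_FNAME"]   # Sqoop upper-cases names
pig_fields   = ["customer_id", "Customer_Fname"]   # Pig keeps the script's casing

def case_only_mismatches(a, b):
    """Pairs of names that match ignoring case but differ in exact spelling."""
    lower_b = {name.lower(): name for name in b}
    return [(x, lower_b[x.lower()])
            for x in a
            if x.lower() in lower_b and x != lower_b[x.lower()]]

# Every pair here differs only by case - exactly the situation that can
# surface as all-NULL columns when both files land under one table directory.
print(case_only_mismatches(sqoop_fields, pig_fields))
```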

 

HTH

New Contributor
Posts: 1
Registered: ‎01-03-2017

Re: Pyspark: Table Dataframe returning empty records from Partitioned Table

Hi,

I have the same problem: when I run the query, I get NULL values.

 

Were you able to solve the problem with the Avro tables?

 

Regards

Contributor
Posts: 44
Registered: ‎01-05-2016

Re: Pyspark: Table Dataframe returning empty records from Partitioned Table

Hi AdrianMonter, sorry to say I haven't found a specific solution for the Avro file format in the meantime.

 

I've been sticking to the Parquet file format since I hit this problem, and for now it covers all my needs... Maybe in the latest CDH/Spark releases this has been fixed? Maybe somebody from @cloudera can tell us something more?

Cloudera Employee
Posts: 20
Registered: ‎01-17-2017

Re: Pyspark: Table Dataframe returning empty records from Partitioned Table

A schema or protocol may not contain multiple definitions of a fullname. Further, a name must be defined before it is used ("before" in the depth-first, left-to-right traversal of the JSON parse tree, where the types attribute of a protocol is always deemed to come "before" the messages attribute.)
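The record-name differences in the warnings above run up against exactly this rule. As a hedged illustration (the schema below is invented, not one from this thread), a duplicate fullname can be spotted by walking the schema JSON depth-first:

```python
import json

# An (intentionally invalid) schema defining the fullname "default.cust1" twice.
schema = json.loads(
    '{"type":"record","name":"cust1","namespace":"default","fields":['
    ' {"name":"a","type":{"type":"record","name":"cust1",'
    '                     "namespace":"default","fields":[]}}]}')

def fullnames(node, acc):
    """Collect every record fullname in depth-first, left-to-right order."""
    if isinstance(node, dict):
        if node.get("type") == "record":
            ns = node.get("namespace", "")
            acc.append(ns + "." + node["name"] if ns else node["name"])
        for field in node.get("fields", []):
            fullnames(field.get("type"), acc)
    return acc

names = fullnames(schema, [])
print(names)                          # 'default.cust1' appears twice
print(len(names) != len(set(names)))  # True: the schema violates the rule
```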
