
Leveraging Brickhouse in Spark2 (pivot)

Resources:

  1. https://stackoverflow.com/questions/23025380/how-to-transpose-pivot-data-in-hive
  2. https://github.com/klout/brickhouse
  3. https://community.hortonworks.com/articles/768/using-hive-udfudafudtf-with-sparksql.html

 

I'm trying to leverage the Brickhouse CollectUDAF in my pyspark2 program.

 

I started the Spark2 shell with:

pyspark2 --jars brickhouse-0.7.1-SNAPSHOT.jar
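
In case the class can't be found at registration time, it might also help to put the jar on the driver classpath explicitly (an assumption on my part; whether it's needed depends on the deploy mode):

pyspark2 --jars brickhouse-0.7.1-SNAPSHOT.jar --driver-class-path brickhouse-0.7.1-SNAPSHOT.jar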

 

I did the following to register the function:

spark.sql("create temporary function collect as 'brickhouse.udf.collect.CollectUDAF'")
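
To sanity-check that the registration took in the current session (assuming this Spark build supports SHOW FUNCTIONS LIKE):

spark.sql("show functions like 'collect'").show()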

 

Then I applied the UDAF to a TempView, tab1, with the following schema and sample rows:

(issue_id : string, field: string, from_val: string)
(Issue1, priority, P1)
(Issue1, severity, Critical)
(Issue2, priority, P3)
(Issue2, severity, Medium)
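
For reference, the view can be rebuilt with something like this (a sketch; the literals mirror the sample rows above):

# build the sample rows and expose them to SQL as tab1
df = spark.createDataFrame(
    [("Issue1", "priority", "P1"),
     ("Issue1", "severity", "Critical"),
     ("Issue2", "priority", "P3"),
     ("Issue2", "severity", "Medium")],
    ["issue_id", "field", "from_val"])
df.createOrReplaceTempView("tab1")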

 

Query used (the traceback below came from a slightly fuller variant that also groups by created_date):

 

spark.sql("""select issue_id, g_m['priority'] as priority, g_m['severity'] as severity from (
    select issue_id, collect(field, from_val) as g_m from tab1 group_by issue_id
) aa""")


This is meant to produce the following pivot:

issue_id, priority, severity
Issue1, P1, Critical
Issue2, P3, Medium

 

But what I get is:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/..../pyspark/sql/session.py", line 545, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/opt/..../java_gateway.py", line 1133, in __call__
  File "/opt/..../utils.py", line 73, in deco
    raise ParseException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.ParseException: u"\nmismatched input 'from' expecting {<EOF>, 'WHERE', 'GROUP', 'ORDER', 'HAVING', 'LIMIT', 'LATERAL', 'WINDOW', 'UNION', 'EXCEPT', 'MINUS', 'INTERSECT', 'SORT', 'CLUSTER', 'DISTRIBUTE'}(line 1, pos 88)\n\n== SQL ==\nselect issue_id, created_date, g_m['priority'] as priority, g_m['severity'] as severity from ( select issue_id, to_date(i_created_date) as created_date, collect(field, from_val) as g_m from tab1 group_by issue_id, created_date) aa\n----------------------------------------------------------------------------------------^^^\n"
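
One thing I notice: the subquery spells the clause as group_by (one word), while Spark SQL expects GROUP BY as two words. Could that alone be what trips the parser? The same query with that spelling (a sketch; I haven't confirmed this resolves the error):

spark.sql("""select issue_id, g_m['priority'] as priority, g_m['severity'] as severity from (
    select issue_id, collect(field, from_val) as g_m from tab1 group by issue_id
) aa""").show()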

 

I would have preferred to do this with the DataFrame API:

some_df.groupBy("issue_id").pivot("field").max("from_val")

But this throws an error saying that the aggregate function max can only be applied to numeric columns, and from_val is a string (see the traceback below; a possible workaround follows it).


Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/cloudera/parcels/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904/lib/spark2/python/pyspark/sql/group.py", line 40, in _api
    jdf = getattr(self._jgd, name)(_to_seq(self.sql_ctx._sc, cols))
  File "/opt/cloudera/parcels/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904/lib/spark2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/opt/cloudera/parcels/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904/lib/spark2/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'"from_val" is not a numeric column. Aggregation function can only be applied on a numeric column.;'
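
A possible workaround for the DataFrame route (an assumption on my part, not verified on this exact version): pivot with first() from pyspark.sql.functions instead of the max() shortcut, since first() accepts string columns while GroupedData.max() is numeric-only, per the error above:

from pyspark.sql import functions as F

# some_df holds the (issue_id, field, from_val) rows shown earlier
pivoted = some_df.groupBy("issue_id").pivot("field").agg(F.first("from_val"))
pivoted.show()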

 

Any idea how I can solve this error? Or is there any better way to do this?

 
