Leveraging Brickhouse in Spark2 (pivot)

Resources:

  1. https://stackoverflow.com/questions/23025380/how-to-transpose-pivot-data-in-hive
  2. https://github.com/klout/brickhouse
  3. https://community.hortonworks.com/articles/768/using-hive-udfudafudtf-with-sparksql.html

I'm trying to leverage the Brickhouse collect UDAF in my pyspark2 program.

I started the Spark2 shell with:

pyspark2 --jars brickhouse-0.7.1-SNAPSHOT.jar

I did the following to register the function:

spark.sql("create temporary function collect as 'brickhouse.udf.collect.CollectUDAF'")
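
As a sanity check (my own assumption, not a step from the Brickhouse docs), I believe the registration can be verified like this:

# Assumption: list temporary functions matching the name just registered.
spark.sql("SHOW FUNCTIONS LIKE 'collect'").show()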

Then I use the UDAF on a TempView (tab1) with the following schema and rows:

(issue_id : string, field: string, from_val: string)
(Issue1, priority, P1)
(Issue1, severity, Critical)
(Issue2, priority, P3)
(Issue2, severity, Medium)
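
For completeness, tab1 is registered roughly like this (a simplified sketch using the sample rows above; the real data comes from a larger table):

# Simplified sketch: build the sample rows and register the TempView.
rows = [("Issue1", "priority", "P1"),
        ("Issue1", "severity", "Critical"),
        ("Issue2", "priority", "P3"),
        ("Issue2", "severity", "Medium")]
some_df = spark.createDataFrame(rows, ["issue_id", "field", "from_val"])
some_df.createOrReplaceTempView("tab1")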

Query used:

spark.sql("""select issue_id, g_m['priority'] as priority, g_m['severity'] as severity from (
    select issue_id, collect(field, from_val) as g_m from tab1 group_by issue_id
) aa""")

To achieve the following pivot:

issue_id, priority, severity
Issue1, P1, Critical
Issue2, P3, Medium

But what I get is:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/..../pyspark/sql/session.py", line 545, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/opt/..../java_gateway.py", line 1133, in __call__
  File "/opt/..../utils.py", line 73, in deco
    raise ParseException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.ParseException: u"\nmismatched input 'from' expecting {<EOF>, 'WHERE', 'GROUP', 'ORDER', 'HAVING', 'LIMIT', 'LATERAL', 'WINDOW', 'UNION', 'EXCEPT', 'MINUS', 'INTERSECT', 'SORT', 'CLUSTER', 'DISTRIBUTE'}(line 1, pos 88)\n\n== SQL ==\nselect issue_id, created_date, g_m['priority'] as priority, g_m['severity'] as severity from ( select issue_id, to_date(i_created_date) as created_date, collect(field, from_val) as g_m from tab1 group_by issue_id, created_date) aa\n----------------------------------------------------------------------------------------^^^\n"
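
One variant I have not yet tried is spelling GROUP BY as two words, in case the underscore is the problem (untested on my side):

# Untested: same query, with "group by" written as two words.
spark.sql("""select issue_id, g_m['priority'] as priority, g_m['severity'] as severity from (
    select issue_id, collect(field, from_val) as g_m from tab1 group by issue_id
) aa""").show()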

I would have preferred to do this via the DataFrame API:

some_df.groupBy("issue_id").pivot("field").max("from_val")

But this throws an error saying that the aggregate function max cannot be applied to a string column.

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/cloudera/parcels/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904/lib/spark2/python/pyspark/sql/group.py", line 40, in _api
    jdf = getattr(self._jgd, name)(_to_seq(self.sql_ctx._sc, cols))
  File "/opt/cloudera/parcels/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904/lib/spark2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/opt/cloudera/parcels/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904/lib/spark2/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'"from_val" is not a numeric column. Aggregation function can only be applied on a numeric column.;'
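
A workaround I am considering (untested, my own assumption) is to aggregate the string column with first() instead of max(), since first() should not require a numeric column:

from pyspark.sql import functions as F

# Untested: pivot on field, taking the first from_val per issue_id/field pair.
some_df.groupBy("issue_id").pivot("field").agg(F.first("from_val")).show()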

Any idea how I can solve these errors? Or is there a better way to do this?