Pyspark 2.4.3, Read Avro format message from Kafka - Pyspark Structured streaming

I am trying to read Avro messages from Kafka using PySpark 2.4.3. Following the Stack Overflow question referenced below, I am able to convert data into Avro format with to_avro, and that code works as expected. However, from_avro does not work and fails with the error shown below. Are there any other modules that support reading Avro messages streamed from Kafka? This is a Cloudera distribution environment. Please advise.

 

Reference : Pyspark 2.4.0, read avro from kafka with read stream - Python

Environment Details :

Spark :

 / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.1.2.6.1.0-129
      /_/

Using Python version 3.6.1 (default, Jul 24 2019 04:52:09)

Pyspark :

pyspark 2.4.3

Spark_submit :

/usr/hdp/2.6.1.0-129/spark2/bin/pyspark --packages org.apache.spark:spark-avro_2.11:2.4.3 --conf spark.ui.port=4064


to_avro

from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.functions import col, struct


def from_avro(col, jsonFormatSchema):
    # Call the Scala from_avro in the spark-avro package object through py4j.
    sc = SparkContext._active_spark_context
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").from_avro
    return Column(f(_to_java_column(col), jsonFormatSchema))


def to_avro(col):
    # Call the Scala to_avro in the spark-avro package object through py4j.
    sc = SparkContext._active_spark_context
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").to_avro
    return Column(f(_to_java_column(col)))


avro_type_struct = """
{
  "type": "record",
  "name": "struct",
  "fields": [
    {"name": "col1", "type": "long"},
    {"name": "col2", "type": "string"}
  ]
}"""
df = spark.range(10).select(struct(
    col("id"),
    col("id").cast("string").alias("id2")
).alias("struct"))
avro_struct_df = df.select(to_avro(col("struct")).alias("avro"))
avro_struct_df.show(3)
+----------+
|      avro|
+----------+
|[00 02 30]|
|[02 02 31]|
|[04 02 32]|
+----------+
only showing top 3 rows
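
As a sanity check on the to_avro output above, the three byte arrays can be decoded by hand. The following is a minimal sketch in plain Python, assuming the standard Avro binary encoding (a long is a zigzag varint, a string is a long length followed by UTF-8 bytes, and a record is its fields written in order). The helper names read_long, read_string and decode_record are hypothetical and are not part of PySpark or spark-avro.

```python
def read_long(buf, pos):
    """Decode one Avro long (zigzag varint) starting at buf[pos]."""
    shift = 0
    raw = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift   # low 7 bits carry data
        if not byte & 0x80:             # high bit clear: last byte
            break
        shift += 7
    # Undo zigzag encoding: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
    return (raw >> 1) ^ -(raw & 1), pos


def read_string(buf, pos):
    """Decode one Avro string: a long length, then that many UTF-8 bytes."""
    length, pos = read_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


def decode_record(buf):
    """Decode a record with fields (col1: long, col2: string), in that order."""
    col1, pos = read_long(buf, 0)
    col2, pos = read_string(buf, pos)
    return {"col1": col1, "col2": col2}


# The three rows printed by avro_struct_df.show(3), as hex strings.
rows = [bytes.fromhex(h) for h in ("000230", "020231", "040232")]
decoded = [decode_record(r) for r in rows]
print(decoded)
```

The decoded values are 0, 1, 2 paired with the strings "0", "1", "2", which matches spark.range(10) and its string cast, so the to_avro side appears to produce valid Avro for this schema.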

from_avro:

avro_struct_df.select(from_avro("avro", avro_type_struct)).show(3)

Error Message :

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/2.6.1.0-129/spark2/python/pyspark/sql/dataframe.py", line 993, in select
    jdf = self._jdf.select(self._jcols(*cols))
  File "/usr/hdp/2.6.1.0-129/spark2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/hdp/2.6.1.0-129/spark2/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/hdp/2.6.1.0-129/spark2/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o61.select.
: java.lang.NoSuchMethodError: org.apache.avro.Schema.getLogicalType()Lorg/apache/avro/LogicalType;
        at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:66)
        at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala
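
One possible reading of this error: a NoSuchMethodError on org.apache.avro.Schema.getLogicalType() usually means the Avro jar on the classpath is older than the one spark-avro was compiled against (logical types arrived in Avro 1.8, as far as I know). The shell banner above reports Spark 2.1.1.2.6.1.0-129, while the --packages coordinate requests spark-avro 2.4.3, so the runtime and the package may simply not match. The sketch below only compares the two version strings quoted in this post. The function names major_minor and package_matches_runtime are hypothetical helpers, not part of any Spark API.

```python
import re


def major_minor(version):
    """Return (major, minor) from a version string such as '2.1.1.2.6.1.0-129'."""
    match = re.match(r"(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError("unrecognised version string: %r" % version)
    return int(match.group(1)), int(match.group(2))


def package_matches_runtime(runtime_version, package_coordinate):
    """Check that a group:artifact:version coordinate targets the same
    major.minor release line as the running Spark."""
    package_version = package_coordinate.rsplit(":", 1)[-1]
    return major_minor(runtime_version) == major_minor(package_version)


# Values copied from the environment details in this post.
runtime = "2.1.1.2.6.1.0-129"
package = "org.apache.spark:spark-avro_2.11:2.4.3"
print(package_matches_runtime(runtime, package))
```

In a live session, spark.version could supply the runtime value instead of the hard-coded string. A mismatch here does not prove the cause, but it is a cheap first thing to rule out before looking for another Avro module.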