PySpark UDF, only None is passed to the input

I have trouble with a UDF in my Kafka streaming application. Whenever the UDF is called, only None is passed as its input instead of the actual column value. A TypeError is then raised, because the lookup library expects a str, not None.

 

UDF definition:

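from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def get_asn(ip_addr):
   # imported inside the UDF body, so the import runs where the UDF executes
   from fm_kafka2parquet.asn_lookup import AsnLookup

   result = AsnLookup\
      .get_instance(ASN_DB_PATH)\
      .get().lookup(ip_addr)[0] # first record from tuple is ASN number
   if result is None:
      return "n/a"
   return result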

 

 

UDF call:

# data frame for netflow reading
df = spark \
   .readStream \
   .format("kafka") \
   .option("kafka.bootstrap.servers", CONFIG_KAFKA_BOOTSTRAP) \
   .option("subscribe", CONFIG_KAFKA_TOPIC) \
   .option("startingOffsets", "latest") \
   .load() \
   .selectExpr("CAST(value AS STRING)") \
   .withColumn("net", from_json("value", Structures.get_ipfix_structure())) \
   .select("net.*")

# remove ipfix prefix in case of ipfixv1 collector
df = df.toDF(*[c.replace('ipfix.', '') for c in df.columns])

# enrichment
edf = df \
   .withColumn("sourceAS", get_asn('sourceIPv4Address')) \
   .withColumn("destinationAS", get_asn('destinationIPv4Address'))
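
The renaming step above can be checked in isolation. The following is a minimal sketch, assuming the intent is to strip only a leading `ipfix.` prefix; `strip_prefix` is a hypothetical helper and the column names in the example are made up for illustration, not taken from the real schema.

```python
def strip_prefix(columns, prefix="ipfix."):
    """Remove a leading prefix from each column name; other names are unchanged."""
    renamed = [c[len(prefix):] if c.startswith(prefix) else c for c in columns]
    # Guard against two source columns collapsing into the same name.
    if len(set(renamed)) != len(renamed):
        raise ValueError("renaming would produce duplicate column names")
    return renamed


# Illustrative column names only (assumed, not the actual IPFIX structure).
cols = ["ipfix.sourceIPv4Address", "ipfix.destinationIPv4Address", "flowStartSeconds"]
print(strip_prefix(cols))
```

In the streaming job this could be applied as `df = df.toDF(*strip_prefix(df.columns))`. Unlike `str.replace`, it leaves an `ipfix.` substring in the middle of a name untouched.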

 

 

The job ends with the following error, raised by the pyasn library that the get_asn UDF uses:

TypeError: search_best() argument 1 must be str, not None
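
One way to narrow this down, sketched under assumptions: Spark passes None to a Python UDF whenever the column is null for that row, and `from_json` yields null fields when a record cannot be parsed against the given schema, so the data arriving on the cluster may simply contain nulls in these columns. A null guard keeps the stream running while that is investigated. `safe_asn`, `fake_lookup` and the `lookup` parameter are hypothetical names, not part of the original code; `lookup` stands in for the pyasn-style call and is assumed to return a tuple `(asn, prefix)`.

```python
def safe_asn(ip_addr, lookup):
    """Return the ASN for ip_addr as a string, or "n/a" if it cannot be resolved."""
    # A null column value reaches the UDF as None; skip the lookup in that case.
    if ip_addr is None:
        return "n/a"
    asn = lookup(ip_addr)[0]  # first element of the tuple is the ASN
    if asn is None:
        return "n/a"
    # The UDF is declared with StringType, so convert the ASN explicitly.
    return str(asn)


# Stand-in lookup with made-up values, for illustration only.
def fake_lookup(ip_addr):
    table = {"192.0.2.1": (64500, "192.0.2.0/24")}
    return table.get(ip_addr, (None, None))


print(safe_asn(None, fake_lookup))            # null input
print(safe_asn("192.0.2.1", fake_lookup))     # known address
print(safe_asn("198.51.100.7", fake_lookup))  # unknown address
```

Inside `get_asn`, the same guard would wrap the existing `AsnLookup` call. Counting rows where `sourceIPv4Address` is null before the enrichment step would show whether the nulls come from the input data or from the UDF call itself.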

 

 

 

I suspect the Cloudera platform is the cause, because everything works fine in my local environment.
