Parse json data from kafka topic using pyspark


Hi,

I'm trying to parse JSON data coming in from a Kafka topic into a DataFrame. However, when I query the in-memory table, the schema of the DataFrame looks correct, but all the values are null and I can't work out why.

I am using NiFi to write the data to the Kafka topic, and have configured NiFi to fetch the schema from the Hortonworks Schema Registry. It would be great if someone could show me how to reference that schema in my Python code instead of typing it out explicitly.
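As far as I know Spark has no built-in option that pulls a schema straight from the Hortonworks Schema Registry, so the usual pattern is to fetch the schema text over the registry's REST API and translate it into a `StructType` before starting the stream. The sketch below is only an illustration of that translation step: the example Avro payload, the type mapping, and the helper name `fields_from_avro` are assumptions, not captured registry output.

```python
import json

# Hypothetical mapping from Avro primitive type names (as a registry entry
# might declare them) to PySpark type class names.
AVRO_TO_SPARK = {
    "string": "StringType",
    "int": "IntegerType",
    "long": "LongType",
    "double": "DoubleType",
    "boolean": "BooleanType",
}

def fields_from_avro(avro_schema_text):
    """Turn an Avro record schema (JSON text) into (name, spark_type) pairs."""
    record = json.loads(avro_schema_text)
    return [
        (f["name"], AVRO_TO_SPARK.get(f["type"], "StringType"))
        for f in record.get("fields", [])
    ]

# Example schema text, shaped like what the registry might return
# (this exact payload is an assumption):
avro_text = """{"type": "record", "name": "testdata",
               "fields": [{"name": "index", "type": "string"},
                          {"name": "city",  "type": "string"}]}"""

print(fields_from_avro(avro_text))
# [('index', 'StringType'), ('city', 'StringType')]
```

With pyspark available you would then fold these pairs into a `StructType` (one `schema.add(...)` per field) instead of typing the schema out by hand.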

The JSON data going into the Kafka topic looks like this:

{"index":"0","Conrad":"Persevering system-worthy intranet","address":"8905 Robert Prairie\nJoefort, LA 41089","bs":"envisioneer web-enabled mindshare","city":"Davidland","date_time":"1977-06-26 06:12:48","email":"eric56@parker-robinson.com","paragraph":"Kristine Nash","randomdata":"Growth special factor bit only. Thing agent follow moment seat. Nothing agree that up view write include.","state":"1030.0"}

The code in my Zeppelin notebook is as follows:

%dep 
z.load("org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1")

%pyspark

#Defining my schema

from pyspark.sql.types import StructType, StringType, LongType, IntegerType
from pyspark.sql.functions import from_json, col

schema = StructType() \
    .add("index", IntegerType()) \
    .add("Conrad", StringType()) \
    .add("address", StringType()) \
    .add("bs", StringType()) \
    .add("city", StringType()) \
    .add("date_time", LongType()) \
    .add("email", StringType()) \
    .add("name", StringType()) \
    .add("paragraph", StringType()) \
    .add("randomdata", IntegerType()) \
    .add("state", StringType())

# Read data from kafka topic

lines = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "x.x.x.x:2181") \
    .option("startingOffsets", "latest") \
    .option("subscribe", "testdata") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

# Start the stream and query the in-memory table
query = lines.writeStream.format("memory").queryName("t10").start()
raw = spark.sql("select parsed_value.* from t10")
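One mismatch is visible by eye and easy to confirm without Spark: the sample's `date_time` value is a formatted timestamp string, while the schema above declares it as `LongType` (likewise `randomdata` is free text but declared `IntegerType`). A minimal check of the timestamp, with the value copied from the sample record:

```python
from datetime import datetime

raw_ts = "1977-06-26 06:12:48"  # date_time value from the sample record

# Parses as a formatted timestamp, i.e. a string in the JSON, not a long.
ts = datetime.strptime(raw_ts, "%Y-%m-%d %H:%M:%S")
print(ts.isoformat())  # 1977-06-26T06:12:48
```

In Spark that field fits `StringType` (or `TimestampType` after a `to_timestamp` conversion), but it will never satisfy `LongType`.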