Support Questions
Find answers, ask questions, and share your expertise

How to print the values of two different readStream with pyspark?



I have a problem printing my DataFrame in PySpark. I'm reading JSON messages from two different Kafka topics and joining the two streams; since they share some columns with equal names and values, I joined on those columns.
Unfortunately the output shows the column headers but no rows of values.

This is the output:

+---+----+----+----+----------+----+----------+----+----------+-------------+
| id|Date|Name|Hour|Last Price|Var%|Last Value|Type|Volatility|Value_at_risk|
+---+----+----+----+----------+----+----------+----+----------+-------------+
+---+----+----+----+----------+----+----------+----+----------+-------------+


This is my code:

# Imports used below
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructType

# Schema for the first topic (Be_borsa)
jsonschema = StructType().add("id", StringType()) \
    .add("Date", StringType()) \
    .add("Name", StringType()) \
    .add("Hour", StringType()) \
    .add("Last Price", StringType()) \
    .add("Var%", StringType()) \
    .add("Last Value", StringType())

# Schema for the second topic (Be_teleborsa)
jsonschema1 = StructType().add("id", StringType()) \
    .add("Date", StringType()) \
    .add("Name", StringType()) \
    .add("Volatility", StringType()) \
    .add("Value_at_risk", StringType())

# Subscribe to the first topic
# (note: 2181 is normally the ZooKeeper port; Kafka brokers usually listen on 9092)
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:2181") \
    .option("subscribe", "Be_borsa") \
    .option("startingOffsets", "earliest") \
    .load()

# Parse the Kafka value as JSON and flatten the struct into top-level columns
df = df.select(from_json(col("value").cast("string"), jsonschema).alias("parsed_m_values"))
df = df.select("parsed_m_values.*")

# Subscribe to the second topic
df1 = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:2181") \
    .option("subscribe", "Be_teleborsa") \
    .option("startingOffsets", "earliest") \
    .load()

df1 = df1.select(from_json(col("value").cast("string"), jsonschema1).alias("parsed_m1_values"))
df1 = df1.select("parsed_m1_values.*")

# Stream-stream inner join on the shared columns
dfResult = df.join(df1, ["id", "Date", "Name"])

dfResult.createOrReplaceTempView("table1")  # this view is never queried below

# Start the stream and query the in-memory table
query = dfResult.writeStream.format("memory").queryName("t10").start()

# start() returns immediately, so this select may run before any batch is processed
raw = spark.sql("select * from t10")
raw.show()
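For context on the empty output: start() launches the streaming query asynchronously, so a select on the in-memory table taken immediately afterwards can run before any micro-batch has been written. Here is a plain-Python sketch of that timing issue (no Spark involved; the worker thread is only a stand-in for the streaming query, and the record is one of the sample rows below):

```python
import threading
import time

results = []  # stands in for the in-memory "t10" table

def stream_worker():
    # Simulates the streaming query filling the sink some time after start()
    time.sleep(0.2)
    results.append({"id": "Be_20200330", "Name": "Be"})

t = threading.Thread(target=stream_worker)
t.start()                        # like writeStream...start(): returns immediately

snapshot_before = list(results)  # like raw.show() right after start(): still empty

t.join()                         # like waiting for the query to process data
snapshot_after = list(results)   # now the row is there
```

In Spark terms, waiting would be something like query.processAllAvailable() (or re-running the select later) before calling show().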


These are the two JSON files that I am consuming from two different topics:

{"id": "Be_20200330", "Date": "2020-03-30", "Name": "Be", "Hour": "15.49.24", "Last Price": "0,862", "Var%": "-1,93", "Last Value": "1.020"}

{"id": "Be_20200330", "Date": "2020-03-30", "Name": "Be", "Volatility": "2,352", "Value_at_risk": "5,471"}
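As a sanity check on the join itself, here is a plain-Python sketch (not Spark) that merges the two sample records above on the same keys used in df.join; it shows the single joined row the query should eventually produce once both streams deliver matching rows:

```python
import json

# The two payloads from the question, one per topic
m = json.loads('{"id": "Be_20200330", "Date": "2020-03-30", "Name": "Be", '
               '"Hour": "15.49.24", "Last Price": "0,862", "Var%": "-1,93", '
               '"Last Value": "1.020"}')
m1 = json.loads('{"id": "Be_20200330", "Date": "2020-03-30", "Name": "Be", '
                '"Volatility": "2,352", "Value_at_risk": "5,471"}')

join_keys = ["id", "Date", "Name"]

# An inner join keeps the pair only when the key columns match
joined = None
if all(m[k] == m1[k] for k in join_keys):
    joined = {**m, **m1}  # shared keys appear once, like df.join(df1, join_keys)
```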

 

This is the result that I would like to achieve.

id | Date | Name | Hour | Last Price | Var% | Last Value | Volatility |Value_at_risk
----------------------- RESULTS -------------------------------------


How can I achieve this with PySpark? Thanks.
