Member since: 08-24-2018 · Posts: 6 · Kudos Received: 0 · Solutions: 0
01-25-2019 09:46 AM
Hello, I'm currently trying to automate integration testing on my NiFi dataflow. I intend to write a script that checks the flowfile contents after each processor in the entire flow. My idea is to connect a new processor (to serve as a sink) after each existing processor and check the structure of the contents there. I've been looking through the NiFi API but can't find a way to do this. Hoping someone could point me to some useful links. Thank you!
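To make it concrete, this is roughly the kind of script I have in mind, using the NiFi REST API (just a sketch against an unsecured instance; the URL, process-group ID, the "success" relationship, and the choice of LogAttribute as the sink are placeholder assumptions, not something I've verified):

import requests

NIFI = "http://localhost:8080/nifi-api"  # placeholder URL, unsecured NiFi assumed
PG_ID = "root"                           # placeholder: process group to instrument

# List every processor in the process group
procs = requests.get(NIFI + "/process-groups/" + PG_ID + "/processors").json()["processors"]

for proc in procs:
    # Create a sink next to each processor (LogAttribute is just a stand-in here)
    sink = requests.post(
        NIFI + "/process-groups/" + PG_ID + "/processors",
        json={"revision": {"version": 0},
              "component": {"type": "org.apache.nifi.processors.standard.LogAttribute",
                            "position": {"x": proc["position"]["x"] + 400,
                                         "y": proc["position"]["y"]}}},
    ).json()

    # Route the processor's success relationship into the sink as well
    # ("success" is assumed; the real relationship names vary per processor)
    requests.post(
        NIFI + "/process-groups/" + PG_ID + "/connections",
        json={"revision": {"version": 0},
              "component": {"source": {"id": proc["id"], "groupId": PG_ID, "type": "PROCESSOR"},
                            "destination": {"id": sink["id"], "groupId": PG_ID, "type": "PROCESSOR"},
                            "selectedRelationships": ["success"]}},
    )

If there's a cleaner way to inspect flowfile contents than wiring in sinks (the provenance API, maybe?), I'd be happy to hear it.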
10-18-2018 09:08 AM
Hi, I'm trying to parse JSON data coming in from a Kafka topic into a dataframe. When I query the in-memory table, the schema of the dataframe looks correct, but all the values are null and I don't really know why. I'm using NiFi to write the data into the Kafka topic, and I've configured NiFi to get the schema from the Hortonworks Schema Registry, so it would also be good if someone could show me how to reference that registry from my Python code instead of typing out the schema explicitly. The JSON data going into the Kafka topic looks like this:

{"index":"0","Conrad":"Persevering system-worthy intranet","address":"8905 Robert Prairie\nJoefort, LA 41089","bs":"envisioneer web-enabled mindshare","city":"Davidland","date_time":"1977-06-26 06:12:48","email":"eric56@parker-robinson.com","paragraph":"Kristine Nash","randomdata":"Growth special factor bit only. Thing agent follow moment seat. Nothing agree that up view write include.","state":"1030.0"}

The code in my Zeppelin notebook is as follows:

%dep
z.load("org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1")

%pyspark
# Define the schema by hand (this is what I'd like to pull from the Schema Registry instead)
from pyspark.sql.types import StructType, StringType, LongType, IntegerType
from pyspark.sql.functions import from_json, col

schema = (StructType()
          .add("index", IntegerType()).add("Conrad", StringType()).add("address", StringType())
          .add("bs", StringType()).add("city", StringType()).add("date_time", LongType())
          .add("email", StringType()).add("name", StringType()).add("paragraph", StringType())
          .add("randomdata", IntegerType()).add("state", StringType()))

# Read data from the Kafka topic and parse the value column as JSON
lines = (spark.readStream.format("kafka")
         .option("kafka.bootstrap.servers", "x.x.x.x:2181")
         .option("startingOffsets", "latest").option("subscribe", "testdata")
         .load()
         .select(from_json(col("value").cast("string"), schema).alias("parsed_value")))

# Start the stream and query the in-memory table
query = lines.writeStream.format("memory").queryName("t10").start()
raw = spark.sql("select parsed_value.* from t10")
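On the Schema Registry part, this is the direction I was thinking of: pulling the Avro schema text over the registry's REST API (a rough sketch only; the host, port, schema name, and the exact endpoint and response fields are assumptions I haven't verified):

%pyspark
import requests

# Placeholder URL; HDF's Schema Registry exposes a REST endpoint along these lines
REGISTRY = "http://registry-host:7788/api/v1/schemaregistry"
resp = requests.get(REGISTRY + "/schemas/testdata/versions/latest").json()
avro_schema_text = resp["schemaText"]  # assumed field name in the response
print(avro_schema_text)

I'd still need to translate that Avro schema text into a Spark StructType, which is the part I haven't figured out.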
10-02-2018 01:14 PM
Hi, I'm planning to automate some ETL jobs on tables that I have in Hive using pyspark. I've been developing my code in Zeppelin with the pyspark interpreter (%pyspark) and want to use Oozie to automate it. As far as I know, Oozie can only schedule Python scripts (.py files), not Zeppelin notebooks, so is there any way to convert my existing Zeppelin notebooks into Python scripts? I'm also not sure whether Oozie can spark-submit a Python script, so that the job takes advantage of Spark and YARN for parallel processing. Thanks!
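One idea I had, since each Zeppelin note is stored on disk as a note.json file, is to pull the %pyspark paragraphs out with a small script (just a sketch; the notebook path and note id are placeholders, not my real setup):

import json

# Placeholder path: Zeppelin keeps each note as note.json under its notebook directory
with open("/usr/hdp/current/zeppelin-server/notebook/<note-id>/note.json") as f:
    note = json.load(f)

with open("etl_job.py", "w") as out:
    for para in note.get("paragraphs", []):
        text = para.get("text") or ""
        if text.startswith("%pyspark"):
            parts = text.split("\n", 1)
            if len(parts) > 1:
                # Drop the %pyspark directive line, keep the code body
                out.write(parts[1] + "\n\n")

From what I've read, an Oozie spark action should then be able to spark-submit the resulting .py file on YARN, but I'd appreciate confirmation.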
09-28-2018 09:21 AM
Hi, I'm trying to test the integration of NiFi with Schema Registry by ingesting a sample CSV file. The file contains a few columns of timestamps, but NiFi doesn't seem to be able to ingest the timestamp data, and I keep getting an error. I followed the instructions and template from the following link: https://community.hortonworks.com/articles/119766/installing-a-local-hortonworks-registry-to-use-wit.html I've attached pictures of the Avro schema that I'm using, the error message that I get, and the NiFi template. Thanks!
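For what it's worth, my understanding is that timestamp columns need an Avro logical type, roughly like the sketch below (the field names are placeholders rather than my actual schema, and fastavro is used here only to check that the schema parses; it's not part of my NiFi flow):

from fastavro import parse_schema

# Placeholder record with a timestamp field declared via an Avro logical type
schema = {
    "type": "record",
    "name": "sample",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "date_time",
         "type": {"type": "long", "logicalType": "timestamp-millis"}},
    ],
}
parse_schema(schema)  # raises if the schema is malformed

I also gather the CSVReader has a Timestamp Format property that has to match the text in the file exactly, so that might be where I'm going wrong.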
08-27-2018 01:39 AM
I've begun exploring Druid since hearing about the Druid + Hive integration. From what I can see, Druid tables offer real-time querying and much faster pre-aggregated queries. With that in mind, I'm curious as to when Hive tables would be used over Druid tables. Maybe when you have to calculate statistics (average, standard deviation)?