Member since: 06-06-2016
Posts: 13
Kudos Received: 4
Solutions: 1
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 1166 | 06-23-2016 06:26 PM |
03-16-2017 10:13 PM
1 Kudo
Is it possible to append to a destination file when using writeStream in Spark 2? Example:
I've got a Kafka topic and a stream running and consuming data as it is written to the topic. I want to perform some transformations and append to an existing CSV file (this can be local for now, but eventually I'd want this to be on HDFS).
query = csv_select \
    .writeStream \
    .format("csv") \
    .option("format", "append") \
    .option("path", "/destination_path/") \
    .option("checkpointLocation", "/checkpoint_path") \
    .outputMode("append") \
    .start()
This is the code I'm using. When I produce new data to the topic, it is written out as a new CSV file in /destination_path. Any advice on how to append to an existing file rather than create a new one every time?
NOTE: I'm using CSV for simplicity here, but an example in Parquet or some other format would be helpful too.
EDIT: The example I'm working from is here: https://databricks.com/blog/2017/01/19/real-time-streaming-etl-structured-streaming-apache-spark-2-1.html
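The behaviour described above matches how the file sink works: each trigger adds new part files to the output directory rather than extending an existing file. One possible workaround, sketched here in plain Python outside Spark, is to concatenate the part files into a single CSV after they are written. The function name, the `part-*.csv` pattern, and the in-memory `already_merged` set are assumptions for illustration. A real job would also need to make sure it only picks up files Spark has finished writing, and files on HDFS would need an HDFS client rather than local file access.

```python
import glob
import os
import tempfile


def append_new_parts(source_dir, target_file, already_merged):
    """Append each not-yet-merged part-*.csv in source_dir to target_file."""
    parts = glob.glob(os.path.join(source_dir, "part-*.csv"))
    # Oldest first; the file name breaks ties between equal modification times.
    parts.sort(key=lambda path: (os.path.getmtime(path), path))
    merged_now = []
    for part in parts:
        if part in already_merged:
            continue
        with open(part, newline="") as src, open(target_file, "a", newline="") as dst:
            dst.write(src.read())
        already_merged.add(part)
        merged_now.append(os.path.basename(part))
    return merged_now


# Demo with two fake part files standing in for two streaming triggers.
with tempfile.TemporaryDirectory() as workdir:
    target = os.path.join(workdir, "combined.csv")
    seen = set()
    with open(os.path.join(workdir, "part-00000-a.csv"), "w", newline="") as f:
        f.write("1,alpha\n")
    first_pass = append_new_parts(workdir, target, seen)
    with open(os.path.join(workdir, "part-00000-b.csv"), "w", newline="") as f:
        f.write("2,beta\n")
    second_pass = append_new_parts(workdir, target, seen)
    with open(target, newline="") as f:
        combined = f.read()
```

The sketch assumes the part files have no header row, as in the query above, so plain concatenation yields a valid CSV.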
Labels: Apache Spark
08-09-2016 11:00 PM
Hi all, I've got a Postgres table with the columns val text, created_at timestamp, and updated_at timestamp. I've got a QueryDatabaseTable processor running on this table using updated_at as the "Maximum-value Columns" value. It returns some rows over and over again because the maximum value is being stored with timestamp(3) precision, whereas the precision in the Postgres table is 6. I checked the query it was running and it was something like: select * from test where updated_at > '2016-08-09 15:52:14.731'; while the updated_at value in the table is '2016-08-09 15:52:14.731234', so the row gets returned over and over again. Is there a workaround for this? I'm not sure whether it's a limitation of the JDBC driver or of the way NiFi stores the max value each time.
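The repeated rows follow directly from the truncation. A minimal Python sketch (not NiFi or JDBC code; the variable names are illustrative) shows that a maximum value kept at millisecond precision always compares as earlier than the microsecond value in the table, so the row satisfies the `>` filter on every run:

```python
from datetime import datetime

# Value stored in the Postgres table (microsecond precision, timestamp(6)).
row_updated_at = datetime(2016, 8, 9, 15, 52, 14, 731234)

# Hypothetical stand-in for the stored maximum: the same instant cut to milliseconds.
stored_max = row_updated_at.replace(
    microsecond=(row_updated_at.microsecond // 1000) * 1000
)

# Equivalent of: select * from test where updated_at > '<stored_max>'
row_is_returned_again = row_updated_at > stored_max

print(stored_max.isoformat(sep=" ", timespec="milliseconds"))
print(row_is_returned_again)
```

Any row whose microsecond part is not a whole number of milliseconds behaves this way, which is why only some values repeat.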
Labels: Apache NiFi
07-19-2016 09:58 PM
Great article. One thing I've found with InferSchemaWithAvro when working with CSVs is that it only seems to infer the type from the first line after the header. The "Number Of Records To Analyze" property specifically states that it only applies to the JSON content type. I'm having some difficulty thinking of a way to work around this. I've got a CSV column with "12345" on the first line and "12345.45" on the second line, and it's inferring the type as LONG when I want it to be DOUBLE.
EDIT: It also doesn't work completely as expected with negative values (STRING instead of numeric).
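For comparison, here is a rough Python sketch of the behaviour I was hoping for: look at several rows and take the widest type seen. This is not the processor's actual algorithm; the function names and the LONG < DOUBLE < STRING widening order are assumptions made for illustration.

```python
def infer_value_type(text):
    """Classify one CSV field as LONG, DOUBLE, or STRING."""
    try:
        int(text)
        return "LONG"
    except ValueError:
        pass
    try:
        float(text)
        return "DOUBLE"
    except ValueError:
        return "STRING"


# Widening order: a column takes the widest type seen in any sampled row.
_RANK = {"LONG": 0, "DOUBLE": 1, "STRING": 2}


def infer_column_type(values):
    """Return the widest type across all sampled values of one column."""
    widest = "LONG"
    for value in values:
        candidate = infer_value_type(value)
        if _RANK[candidate] > _RANK[widest]:
            widest = candidate
    return widest


column = ["12345", "12345.45", "-17"]
first_row_only = infer_column_type(column[:1])  # what sampling one row yields
all_rows = infer_column_type(column)            # what sampling every row yields
```

With only the first row the column comes out as LONG; with all three rows it widens to DOUBLE, and the negative value is still treated as numeric.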
07-13-2016 03:21 PM
1 Kudo
I have a Kafka topic that contains a series of small (<1KB) messages and want to set up a consumer in NiFi to pull this data through and write it to HDFS. I want to do minimal transformation on the data in NiFi. I want to avoid the "small files problem" I've read so much about, and I'm trying to come up with the best method of pushing the messages through to HDFS. (Documented here.)
I've written a small template in NiFi that does it, but it doesn't seem optimal. Basically I have a GetKafka consumer writing to a MergeContent processor with "Minimum Group Size" set to hold the data until it reaches a certain size. The two problems I see with this are:
1. The latest data is stuck in limbo until a certain size is reached.
2. I'm thinking the higher the better for the Minimum Group Size property, but the higher it is, the longer the data is stuck in limbo. The smaller it is, the less optimal the structure will be in HDFS.
The other approach I was playing around with is, instead of holding the data, to write the messages to files locally on my NiFi instance and have a separate ExecuteProcess processor constantly running and appending the files to a single file in HDFS.
Any help is much appreciated! Thank you.
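To make the trade-off concrete, here is a small Python sketch of a buffer that flushes when either a size threshold or an age limit is hit, so the newest data cannot wait indefinitely. It is only an illustration of the idea, not NiFi code: the class name, parameters, and explicit `now` argument are hypothetical. MergeContent's "Max Bin Age" property looks like the closest equivalent of the age limit, but check the documentation for your NiFi version.

```python
class SizeOrAgeBatcher:
    """Buffer small messages; flush when the batch is big enough or too old."""

    def __init__(self, min_bytes, max_age_seconds):
        self.min_bytes = min_bytes
        self.max_age_seconds = max_age_seconds
        self._messages = []
        self._size = 0
        self._oldest = None  # arrival time of the first buffered message

    def add(self, message, now):
        """Add one message (bytes); return a merged batch if a flush is due."""
        if self._oldest is None:
            self._oldest = now
        self._messages.append(message)
        self._size += len(message)
        return self.poll(now)

    def poll(self, now):
        """Flush if the size threshold or the age limit has been reached."""
        if not self._messages:
            return None
        too_old = now - self._oldest >= self.max_age_seconds
        if self._size >= self.min_bytes or too_old:
            batch = b"\n".join(self._messages)
            self._messages, self._size, self._oldest = [], 0, None
            return batch
        return None


batcher = SizeOrAgeBatcher(min_bytes=20, max_age_seconds=60)
held = batcher.add(b"msg-1", now=0)        # below both limits: stays buffered
by_age = batcher.poll(now=61)              # age limit reached: flushed
by_size = batcher.add(b"x" * 25, now=62)   # size limit reached: flushed
```

The size threshold controls how large the files in HDFS get, while the age limit caps how long the latest message can sit in the buffer.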
Labels: Apache Hadoop, Apache Kafka, Apache NiFi
06-23-2016 06:26 PM
1 Kudo
I've had the same problem when pulling data from an ExecuteSQL processor. It's not specifically the solution you were asking for, but there is this processor, which looks just about ready to be merged. Maybe you could pull it down and merge it into your own NiFi instance. https://issues.apache.org/jira/browse/NIFI-1583
06-06-2016 10:10 PM
Thanks a lot. I never noticed the "Prepend" setting in ReplaceText before but this worked a treat! Appreciate it!
06-06-2016 09:57 PM
Thanks Pierre, I'll try out the ReplaceText processor and let you know. Appreciate the quick reply!
06-06-2016 09:48 PM
1 Kudo
Hi all, I've currently got a simple processor set up to consume data from a Kafka topic and write it unmodified to HDFS. The way it works right now is that one message makes up one line of text in a file. Before running the PutHDFS processor, I want to append to each message a timestamp of when the message was processed in NiFi, so that I have on hand, in HDFS, the date and time each message was processed. I'm unsure of the best way to do it. I've currently got a MergeContent processor set up and I think I could do some funky stuff with the demarcator in there, but it feels like there should be a more elegant way to do it. Thanks in advance, any help would be appreciated.
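To show the result I'm after, here is a plain-Python illustration of the transformation (not NiFi configuration). The function name, the comma separator, and the timestamp format are placeholders I picked for the example.

```python
from datetime import datetime


def stamp_lines(content, processed_at, separator=","):
    """Append the processing timestamp to every non-empty line of content."""
    stamp = processed_at.strftime("%Y-%m-%d %H:%M:%S")
    stamped = [f"{line}{separator}{stamp}" for line in content.splitlines() if line]
    return "\n".join(stamped)


example = stamp_lines(
    "first message\nsecond message",
    datetime(2016, 6, 6, 21, 48, 0),
)
print(example)
```

Each message keeps its own line, with the processing time added as a trailing field.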
Labels: Apache NiFi