Member since
02-24-2018
20
Posts
10
Kudos Received
1
Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
2533 | 04-03-2018 07:29 PM |
04-03-2018
07:29 PM
2 Kudos
I resolved this by using an ExecuteStreamCommand processor that calls a Python script, which converts a JSON line into its respective SQL insert statement. The table of interest in this case is `reddit_post`. Code of the Python script (I'm aware that there is no need for the `INSERT` argument, but it is there because I plan on adding an `UPDATE` option later on):

import json
import argparse
import sys

# For command line arguments
parser = argparse.ArgumentParser(description='Converts JSON to respective SQL statement')
parser.add_argument('statement_type', type=str)
parser.add_argument('table_name', type=str)

# Reading the command line arguments (parse once and reuse the result)
args = parser.parse_args()
statement_type = args.statement_type
table_name = args.table_name

# Initialize SQL statement
statement = ''
for line in sys.stdin:
    # Load JSON line
    lineJSON = json.loads(line)
    # Add table name and SQL syntax
    if statement_type == 'INSERT':
        statement += 'INSERT INTO {} '.format(table_name)
    # Add table parameters and SQL syntax
    statement += '({}) '.format(', '.join(lineJSON.keys()))
    # Add table values and SQL syntax
    # Note that strings are formatted with single quotes, other data types are converted to strings (for the join method)
    statement += "VALUES ({});".format(', '.join("'{0}'".format(value) if isinstance(value, str) else str(value) for value in lineJSON.values()))

# Send statement to stdout
print(statement)
Configuration of ExecuteStreamCommand (note that Argument Delimiter is set to a single space):

Flow snippet:

I hope this can help someone who comes across a similar issue. If you have any advice on how to improve the script, flow, or anything else, please don't hesitate to let me know!
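One possible improvement to the script above, offered as a sketch rather than a tested replacement: the value formatting wraps strings in single quotes without escaping, so a value containing a single quote would produce invalid SQL, and a JSON `null` would be rendered as `None`. The helper names `sql_literal` and `json_line_to_insert` and the sample columns (`title`, `score`, `flair`) are hypothetical and only serve the illustration.

```python
import json


def sql_literal(value):
    """Render a JSON-decoded value as a SQL literal (illustrative helper)."""
    if value is None:
        return 'NULL'
    # Check bool before numbers, because bool is a subclass of int in Python.
    if isinstance(value, bool):
        return 'TRUE' if value else 'FALSE'
    if isinstance(value, (int, float)):
        return str(value)
    # Strings (and anything else, via str()) are quoted; embedded single
    # quotes are doubled, which is the standard SQL escape.
    return "'{}'".format(str(value).replace("'", "''"))


def json_line_to_insert(line, table_name):
    """Build one INSERT statement from a line holding one JSON object."""
    record = json.loads(line)
    columns = ', '.join(record.keys())
    values = ', '.join(sql_literal(v) for v in record.values())
    return 'INSERT INTO {} ({}) VALUES ({});'.format(table_name, columns, values)


if __name__ == '__main__':
    # Sample record with hypothetical column names.
    sample = '{"title": "It\'s here", "score": 5, "flair": null}'
    print(json_line_to_insert(sample, 'reddit_post'))
```

Nested JSON values (lists or objects) would still pass through `str()` here, so they would need their own handling. Column names are also inserted as-is, which is only safe when the JSON keys come from a trusted source.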
04-02-2018
12:15 AM
1 Kudo
Hi Hortonworks! I'm facing issues with the insertion of timestamps into a PostgreSQL database using PutSQL. More specifically, when trying to insert a date in the format '2018-01-31T19:01:09+00:00' into a timestamptz column, I get the following error message:
2018-04-01 19:29:40,091 ERROR [Timer-Driven Process Thread-5] o.apache.nifi.processors.standard.PutSQL PutSQL[id=7997503a-0162-1000-ee81-a0361cad5e0c] Failed to update database for StandardFlowFileRecord[uuid=d02e8b39-e564-4c37-a08a-dab8931e9890,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1522615075930-15, container=default, section=15], offset=11492, length=163],offset=0,name=32836401373126,size=163] due to java.sql.SQLDataException: The value of the sql.args.5.value is '2018-01-31T20:19:35+00:00', which cannot be converted to a timestamp; routing to failure: java.sql.SQLDataException: The value of the sql.args.5.value is '2018-01-31T20:19:35+00:00', which cannot be converted to a timestamp
java.sql.SQLDataException: The value of the sql.args.5.value is '2018-01-31T20:19:35+00:00', which cannot be converted to a timestamp
at org.apache.nifi.processors.standard.PutSQL.setParameters(PutSQL.java:711)
at org.apache.nifi.processors.standard.PutSQL.lambda$null$5(PutSQL.java:313)
at org.apache.nifi.processor.util.pattern.ExceptionHandler.execute(ExceptionHandler.java:127)
at org.apache.nifi.processors.standard.PutSQL.lambda$new$6(PutSQL.java:311)
at org.apache.nifi.processors.standard.PutSQL.lambda$new$9(PutSQL.java:354)
at org.apache.nifi.processor.util.pattern.PutGroup.putFlowFiles(PutGroup.java:91)
at org.apache.nifi.processor.util.pattern.Put.onTrigger(Put.java:101)
at org.apache.nifi.processors.standard.PutSQL.lambda$onTrigger$20(PutSQL.java:574)
at org.apache.nifi.processor.util.pattern.PartialFunctions.onTrigger(PartialFunctions.java:114)
at org.apache.nifi.processor.util.pattern.RollbackOnFailure.onTrigger(RollbackOnFailure.java:184)
at org.apache.nifi.processors.standard.PutSQL.onTrigger(PutSQL.java:574)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1122)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.text.ParseException: Unparseable date: "2018-01-31T20:19:35+00:00"
at java.text.DateFormat.parse(DateFormat.java:366)
at org.apache.nifi.processors.standard.PutSQL.setParameter(PutSQL.java:911)
at org.apache.nifi.processors.standard.PutSQL.setParameters(PutSQL.java:707)
... 21 common frames omitted
I have tested the insertion of '2018-01-31T19:01:09+00:00' into the timestamptz column from the command line and it works perfectly. I have tried various alternative formats such as:
- '2018-01-31 19:01:09+00:00'
- '2018-01-31 19:01:09+00'
- '2018-01-31T19:01:09+00'
- '2018-01-31 19:01:09'
They all fail with the same error in NiFi, even though they are all inserted just fine when performing the INSERT from the command line. Please find a screenshot of my flow attached. Let me know if you need any more details. I have come across various questions regarding this topic, but I still don't get what is going on. Most notably:
- https://stackoverflow.com/questions/42352064/putsql-date-format-error?rq=1
- https://stackoverflow.com/questions/45685703/nifi-putsql-timestamp-datetime-error-cannot-be-converted-error/45701012
- https://stackoverflow.com/questions/11133759/0000-00-00-000000-can-not-be-represented-as-java-sql-timestamp-error
- https://stackoverflow.com/questions/46702722/trying-to-solve-remove-the-timestamp-format-conversion-error-by-using-updateatt?noredirect=1&lq=1
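One thing the attempted formats have in common is that none of them includes milliseconds. As far as I can tell from the PutSQL documentation, when no `sql.args.N.format` attribute is set, a timestamp argument is expected either as epoch milliseconds or as a string in 'yyyy-MM-dd HH:mm:ss.SSS' form; treat that as an assumption to verify against the NiFi version in use. Under that assumption, a minimal sketch of converting the value upstream (for example in a script called by ExecuteStreamCommand) could look like the following. The function names are hypothetical, and `datetime.fromisoformat` requires Python 3.7 or later.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_putsql_timestamp(iso_value):
    """Convert an ISO 8601 string with a UTC offset to 'yyyy-MM-dd HH:mm:ss.SSS' in UTC."""
    moment = datetime.fromisoformat(iso_value).astimezone(timezone.utc)
    millis = moment.microsecond // 1000
    return '{}.{:03d}'.format(moment.strftime('%Y-%m-%d %H:%M:%S'), millis)


def to_epoch_millis(iso_value):
    """Convert an ISO 8601 string with a UTC offset to milliseconds since the Unix epoch."""
    moment = datetime.fromisoformat(iso_value)
    # Integer division of two timedeltas avoids floating point rounding.
    return (moment - EPOCH) // timedelta(milliseconds=1)


if __name__ == '__main__':
    value = '2018-01-31T19:01:09+00:00'
    print(to_putsql_timestamp(value))
    print(to_epoch_millis(value))
```

Both functions assume the input carries an explicit offset, as in the failing value. The formatted string no longer carries an offset, so how it is interpreted depends on the time zone PutSQL and the database session apply; the epoch value avoids that ambiguity.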
Labels: Apache NiFi
04-01-2018
06:57 PM
It did indeed help. I found the following StackOverflow answer helpful too: https://stackoverflow.com/questions/49467969/python-script-using-executestreamcommand
Especially:
Command Arguments: any flags or args, delimited by ; (i.e. /path/to/my_script.py)
Command Path: /path/to/python3
Note the Command Path, which you did not specify in the processor. This also allows the use of, for example, a predefined Anaconda environment. Anyhow, thank you for the help!
03-29-2018
07:21 PM
But by deleting the templates you delete them from the "templates menu" as well. Aren't templates and process groups independent? I can add a template and then wrap it in a process group. Why isn't it enough to remove the particular instance of that template?
03-29-2018
06:37 PM
Ah I see. Thank you! What is the reasoning behind this (feel free to just point me in the right direction if it's elaborate)?
03-29-2018
06:18 PM
I'm a little confused as to why I can't delete an empty process group. I had wrapped a template in a process group but I want to remove it from my "Nifi screen". I'm not sure what to do about the displayed error (attached screenshot) as I have already deleted all components that were inside of the process group. It's also not connected to anything else. Is there no "force delete" or something of that sort? Thanks in advance.
Labels: Apache NiFi
03-24-2018
11:56 PM
1 Kudo
Hi @Alexander Polson, did you end up finding answers to your questions?
03-24-2018
11:50 PM
@Matt Burgess Is there a way of either adding non-pure Python modules or otherwise forcing the script to run under a predefined environment (e.g. a local Anaconda environment)?
03-24-2018
10:42 PM
1 Kudo
@Rahul Soni Thank you for your reply. I want to use ExecuteStreamCommand instead of ExecuteScript because Jython is not an option for me. I am running a wide array of Python commands and they need to be executed under a particular Anaconda environment. I can't seem to find any solid examples of ExecuteStreamCommand; would you mind providing an example or pointing me in the right direction? Edit: just to add, ExecuteProcess is also not an option as it does not allow for an incoming FlowFile.
03-24-2018
05:51 PM
1 Kudo
Albeit obvious, the processor should have the following properties:
- Calls a Python script
- Able to supply the FlowFile to the Python script
- Read the FlowFile from within the Python script
- Update either the original FlowFile or create a new FlowFile from within the Python script
- Output the updated/new FlowFile back into NiFi
Original question (without any responses)
Any pointers/advice/help is appreciated
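For reference, the properties listed above map onto how ExecuteStreamCommand works: it pipes the incoming FlowFile's content to the command's stdin, and whatever the command writes to stdout becomes the content of the FlowFile routed to the "output stream" relationship. A minimal sketch of such a script follows; the `transform` function and the `processed` field are hypothetical placeholders for whatever the real script would do, and the input is assumed to be one JSON object per line.

```python
import json
import sys


def transform(content):
    """Hypothetical transformation: mark every JSON line as processed."""
    out_lines = []
    for line in content.splitlines():
        if not line.strip():
            continue  # skip blank lines instead of failing on them
        record = json.loads(line)
        record['processed'] = True
        out_lines.append(json.dumps(record))
    return '\n'.join(out_lines)


if __name__ == '__main__':
    # FlowFile content arrives on stdin; stdout becomes the new FlowFile content.
    sys.stdout.write(transform(sys.stdin.read()))
```

Keeping the logic in a function separate from the stdin/stdout handling makes the script easy to test outside NiFi, for example with `echo '{"id": 1}' | python3 my_script.py`, where `my_script.py` is a placeholder name.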
Labels: Apache NiFi