Created 02-15-2023 01:54 PM
I have an Avro schema that I can get from a registry:
{
  "name": "bar_tbl",
  "fields": [
    { "name": "foo1", "type": "string" },
    { "name": "foo2", "type": "string" }
  ]
}
I know how to use `EvaluateJsonPath` and get `name` and `fields`.
I'd like to set an attribute with `UpdateAttribute` to equal:
select foo1, foo2 from bar_tbl
to use it later downstream in another processor.
How can I achieve this?
Or, more directly: I have a REST API that takes a URL-encoded SQL statement. How can I feed the SQL statement to `InvokeHTTP`?
Created 02-16-2023 06:38 AM
I was able to achieve this with `ExecuteScript` using Python (the Jython script engine):
import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets

flow_file = session.get()
if flow_file is not None:
    # read the flowfile content (the Avro schema) and parse it as JSON
    stream_content = session.read(flow_file)
    text_content = IOUtils.toString(stream_content, StandardCharsets.UTF_8)
    json_content = json.loads(text_content)
    # close the stream; without this, `putAttribute` will fail
    stream_content.close()
    # get the table name and the list of field definitions
    table_name = json_content["name"]
    fields = json_content["fields"]
    # join the field names as "foo1, foo2"
    columns = ", ".join(col["name"] for col in fields)
    # format the select statement
    select_statement = "select " + columns + " from " + table_name
    # set the select statement as an attribute on the flowfile
    flow_file = session.putAttribute(flow_file, "select_statement", select_statement)
    # finalize and pass it on
    session.transfer(flow_file, REL_SUCCESS)
    session.commit()
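For anyone who wants to check the string-building logic outside NiFi, or who needs the URL-encoded form mentioned in the question, here is a minimal standalone sketch. The helper names `build_select` and `url_encode_sql` are hypothetical (they are not part of NiFi), and the sample schema is the one from the question. It assumes the REST API expects percent-encoding with spaces as `%20`; that is a guess, since the API itself is not described in the thread.

```python
import json

try:
    from urllib.parse import quote  # Python 3
except ImportError:
    from urllib import quote  # Python 2.7 / Jython


def build_select(schema_text):
    """Build 'select <cols> from <table>' from an Avro-style schema string.

    Hypothetical helper: assumes the schema has top-level "name" and "fields".
    """
    schema = json.loads(schema_text)
    columns = ", ".join(field["name"] for field in schema["fields"])
    return "select " + columns + " from " + schema["name"]


def url_encode_sql(sql):
    """Percent-encode the statement (hypothetical helper).

    safe="" also encodes "/", and spaces become %20 rather than "+".
    """
    return quote(sql, safe="")


# Sample schema taken from the question
schema_text = (
    '{"name": "bar_tbl", "fields": ['
    '{"name": "foo1", "type": "string"}, '
    '{"name": "foo2", "type": "string"}]}'
)

statement = build_select(schema_text)
encoded = url_encode_sql(statement)
print(statement)
print(encoded)
```

Inside a flow, the Expression Language function `urlEncode` applied to the attribute, e.g. `${select_statement:urlEncode()}` in the `InvokeHTTP` URL property, may be enough to do the encoding without extra scripting.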