Join JSON property in array of objects, set it to attribute


I have an Avro schema that I can get from a registry:

{
    "name": "bar_tbl",
    "fields": [
        { "name": "foo1", "type": "string" },
        { "name": "foo2", "type": "string" }
    ]
}

I know how to use `EvaluateJsonPath` and get `name` and `fields`.

I'd like to use `UpdateAttribute` to set an attribute to the following statement, so that I can use it later downstream in another processor:

select foo1, foo2 from bar_tbl

How can I achieve this?

Or, more directly: I have a REST API that takes a URL-encoded SQL statement. How can I feed the SQL statement to `InvokeHttp`?

1 ACCEPTED SOLUTION


I was able to achieve this with `ExecuteScript`, using Python (Jython) as the script engine.

import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets

flow_file = session.get()

if flow_file is not None:

    # get the flowfile content as json
    stream_content = session.read(flow_file)
    text_content = IOUtils.toString(stream_content, StandardCharsets.UTF_8)
    json_content = json.loads(text_content)
    
    # close the stream; without this, `putAttribute` will fail
    stream_content.close()
    
    # get table name and fields
    table_name = json_content["name"]
    fields = json_content["fields"]
    
    # join the column names with ", " so the result reads "foo1, foo2"
    columns = ", ".join(col["name"] for col in fields)
    
    # format the select statement
    select_statement = "select " + columns + " from " + table_name
    
    # set the select statement as an attr
    flow_file = session.putAttribute(flow_file, "select_statement", select_statement)
    
    # finalize and pass it on
    session.transfer(flow_file, REL_SUCCESS)
    session.commit()
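
For the second part of the question (the URL-encoded statement), here is a minimal sketch that is not part of the original script. It pulls the statement-building logic into a plain function so it can be tried outside NiFi, then percent-encodes the result. The names `build_select` and `schema_text` are made up for illustration, and the sample schema is the one from the question. The import fallback is there because `ExecuteScript` runs Jython (Python 2), where `quote` lives in `urllib` rather than `urllib.parse`.

```python
import json

try:
    from urllib.parse import quote  # Python 3
except ImportError:
    from urllib import quote  # Python 2 / Jython


def build_select(schema_text):
    """Build 'select <cols> from <table>' from a schema JSON string.

    Hypothetical helper: assumes the JSON has a top-level "name" and a
    "fields" list whose items each carry a "name", as in the question.
    """
    schema = json.loads(schema_text)
    columns = ", ".join(field["name"] for field in schema["fields"])
    return "select " + columns + " from " + schema["name"]


# sample schema taken from the question
schema_text = """
{
    "name": "bar_tbl",
    "fields": [
        { "name": "foo1", "type": "string" },
        { "name": "foo2", "type": "string" }
    ]
}
"""

statement = build_select(schema_text)
# safe="" percent-encodes every reserved character, including "/" and ","
encoded = quote(statement, safe="")

print(statement)  # select foo1, foo2 from bar_tbl
print(encoded)    # select%20foo1%2C%20foo2%20from%20bar_tbl
```

Inside the flow, it may be simpler to keep the attribute unencoded and let the Expression Language do the encoding where the URL is built, for example `${select_statement:urlEncode()}` in the `InvokeHttp` URL property. How the statement is passed (query parameter, path, or body) depends on the REST API, so treat that part as an assumption.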
