NiFi - Postgres SQL insert into table as select?


Hi,

How can we implement the following Postgres SQL statement in NiFi?

insert into kum.checkmk_hosts (
    ctime,
    host_state,
    host,
    host_icons,
    num_services_ok,
    num_services_warn,
    num_services_unknown,
    num_services_crit,
    num_services_pending
)
select
    ctime::timestamp,
    host_state,
    host,
    host_icons,
    num_services_ok,
    num_services_warn,
    num_services_unknown,
    num_services_crit,
    num_services_pending
from kum.ext_checkmk_hosts
where date(ctime) >= current_date;

3 REPLIES

Master Guru

Can you provide some more details about your use case? It appears that you'd like to do the above, except that the "ext_checkmk_hosts" table is not on the same Postgres instance as checkmk_hosts, meaning you can't just run that SQL statement as-is.

If my assumption is correct, then you should be able to use QueryDatabaseTable for the "select" part of your statement. Configure it with a database connection pool, the table name, a max-value column (in your case, ctime), etc. Each time it runs, it will fetch all the rows from ext_checkmk_hosts whose ctime value is greater than the maximum value seen during the previous run.
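As a rough sketch (the connection pool is whatever DBCPConnectionPool you configure for the source database; the schema-qualified table name is taken from your statement), the relevant QueryDatabaseTable properties would be along these lines:

Database Connection Pooling Service: DBCPConnectionPool pointing at the source database
Table Name: kum.ext_checkmk_hosts
Columns to Return: ctime, host_state, host, host_icons, num_services_ok, num_services_warn, num_services_unknown, num_services_crit, num_services_pending
Maximum-value Columns: ctime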

QueryDatabaseTable outputs its records in Avro format. Prior to NiFi 1.2.0, you can follow it with ConvertAvroToJSON, then ConvertJSONToSQL, then PutSQL. That flow performs one INSERT per record from the source table, so it will be slower than a single INSERT INTO ... SELECT statement.
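If you take that pre-1.2.0 route, the key settings would be roughly as follows (a sketch, not a full configuration):

ConvertJSONToSQL - JDBC Connection Pool: pool for the target database, Statement Type: INSERT, Table Name: checkmk_hosts
PutSQL - JDBC Connection Pool: the same target pool, Batch Size: something reasonable (e.g. 100) to cut down on round trips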

As of NiFi 1.2.0, you can connect your QueryDatabaseTable to a PutDatabaseRecord processor, using an AvroReader configured with a schema specifying the field names and their datatypes. One possible schema (based on a glance at your field names) might be:

{
  "type": "record",
  "name": "ServiceStatusRecord",
  "fields" : [
    {"name": "ctime", "type": "long", logicalType: "timestamp-millis"},
    {"name": "host_state", "type": ["null", "string"]},
    {"name": "host", "type": ["null", "string"]},
    {"name": "host_icons", "type": ["null", "string"]},
    {"name": "num_services_ok", "type": "int"},
    {"name": "num_services_warn", "type": "int"},
    {"name": "num_services_unknown", "type": "int"},
    {"name": "num_services_crit", "type": "int"},
    {"name": "num_services_pending", "type": "int"}
  ]
}

You would point PutDatabaseRecord at your target database, setting the table name to checkmk_hosts, etc.
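As a sketch (the reader is whatever you name your AvroReader controller service; the schema and table names come from your statement):

Record Reader: AvroReader configured with the schema above
Statement Type: INSERT
Database Connection Pooling Service: pool for the target database
Schema Name: kum
Table Name: checkmk_hosts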


We access JSON files in HDFS as an external table in HAWQ (external table: ext_checkmk_hosts). The "insert into checkmk_hosts ... select ..." statement loads the new data into a HAWQ table (checkmk_hosts) in the same database.
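For context, the external table is defined roughly along these lines (the PXF host, port, HDFS path, and column types below are placeholders, not our exact DDL):

CREATE EXTERNAL TABLE kum.ext_checkmk_hosts (
    ctime text,
    host_state text,
    host text,
    host_icons text,
    num_services_ok int,
    num_services_warn int,
    num_services_unknown int,
    num_services_crit int,
    num_services_pending int
)
LOCATION ('pxf://namenode-host:51200/data/checkmk/hosts/*.json?PROFILE=Json')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');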

We want to schedule all of the jobs with NiFi:

1. Load the data from the source system (InvokeHTTP)
2. Convert the JSON output (JoltTransformJSON)
3. Add a key:value pair with the timestamp (JoltTransformJSON) - see the Jolt sketch below
4. Store the data in HDFS (PutHDFS)
5. Access all of the HDFS JSON files in HAWQ (external table)
6. Insert the new data into the HAWQ table (insert into checkmk_hosts ... select ... from ext_checkmk_hosts)
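For step 3, a minimal Jolt specification (Chain DSL) that adds the timestamp could look like the sketch below, assuming your NiFi version supports Expression Language inside the Jolt Specification property; the field name and the now() call are just an illustration:

[
  {
    "operation": "default",
    "spec": {
      "ctime": "${now():toNumber()}"
    }
  }
]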

Master Guru

Are you using the JDBC driver for HAWQ? If so, you should be able to use PutSQL to execute your INSERT ... SELECT statement as-is. If you have tried this, what error(s) are you getting, or did you have trouble configuring anything?
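As a sketch of how that could be scheduled (an assumption about your flow, not the only way to do it): use a GenerateFlowFile processor with CRON-driven scheduling and put the INSERT ... SELECT statement from your original post into its Custom Text property (or set the flow file content with ReplaceText if your version lacks Custom Text), then route it to PutSQL:

GenerateFlowFile - Scheduling Strategy: CRON driven, Custom Text: the INSERT ... SELECT statement above
PutSQL - JDBC Connection Pool: the HAWQ connection pool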