Created on 01-06-2021 09:16 PM
Other times you need something easier: just basic changes and inserts to some tables you are interested in, delivered as events of new data. Apache NiFi can do this easily for you with QueryDatabaseTableRecord. You don't need to know anything beyond the database connection information, the table name, and which field may change. NiFi will query the table, keep track of state, and hand you the new records. Nothing is hardcoded; parameterize those values and you have a generic Any-RDBMS-to-Any-Other-Store data pipeline. We are reading as records, which means each FlowFile in NiFi can hold thousands of records for which we know all the fields, types, and schema-related information. The schema can be one NiFi infers, or one we pull from a Schema Registry like Cloudera's amazing open source Schema Registry.
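The pattern behind that processor is simple to picture: remember the highest value seen for the maximum-value column and only ask for rows beyond it. Here is a minimal sketch of that incremental-fetch idea in Python, using sqlite3 as a stand-in for any JDBC source; the table and column names are placeholders, and this is an illustration, not NiFi's actual implementation:

```python
import sqlite3

def fetch_new_rows(conn, table, max_value_column, state):
    """Return only rows newer than the last value we saw, then advance
    the state - mimicking the state tracking QueryDatabaseTableRecord
    does for you automatically."""
    last_seen = state.get(max_value_column)
    if last_seen is None:
        # First run: take everything, ordered so the last row holds the max.
        cur = conn.execute(f"SELECT * FROM {table} ORDER BY {max_value_column}")
    else:
        cur = conn.execute(
            f"SELECT * FROM {table} WHERE {max_value_column} > ? "
            f"ORDER BY {max_value_column}",
            (last_seen,),
        )
    rows = cur.fetchall()
    if rows:
        # Persist the new high-water mark for the next run.
        state[max_value_column] = rows[-1][max_value_column]
    return rows
```

Running this twice against a growing table returns only the rows added in between, which is exactly the behavior you get from the processor without writing any of this code.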
Let's see what data is in our PostgreSQL table.
As you can see, we are looking at the prices table and tracking maximum values to increment on: the updated_on date and the item_id sequential key. We then output JSON records.
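With two maximum-value columns like these, each incremental query effectively filters on both last-seen values. A rough sketch of how such a predicate could be assembled (the function is hypothetical and the generated SQL is only an approximation of what NiFi issues; column names come from the prices table above):

```python
def build_incremental_query(table, max_value_columns, state):
    """Assemble a SELECT that only returns rows past the last-seen
    values - roughly the predicate shape QueryDatabaseTableRecord
    maintains across its maximum-value columns."""
    clauses = [
        f"{col} > {state[col]!r}" for col in max_value_columns if col in state
    ]
    where = f" WHERE {' AND '.join(clauses)}" if clauses else ""
    return f"SELECT * FROM {table}{where}"
```

On the first run the state is empty, so the full table is read; afterwards only rows past both the updated_on and item_id high-water marks come back.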
We could then:
PutKudu
PutHDFS - Send as JSON, CSV, or Parquet and build an external Impala or Hive table on top
PutHive3Streaming - Hive 3 ACID Tables
PutS3
PutAzureDataLakeStorage
PutHBaseRecord
PublishKafkaRecord_2_* - Send a copy to Kafka for Flink SQL, Spark Streaming, Spring, etc...
PutBigQueryStreaming - Google
PutCassandraRecord
PutDatabaseRecord - Let's send to another JDBC datastore
PutDruidRecord - Druid is a cool datastore, check it out on CDP Public Cloud
PutElasticSearchRecord
PutMongoRecord
PutSolrRecord
PutRecord - To many RecordSinkServices like Databases, Kafka, Prometheus, Scripted, and Site-to-Site
PutParquet - Store to HDFS as Parquet files
You can do any number or all of these, or send multiple copies of each to other clouds or clusters. You can also add enrichment, transformation, alerting, querying, or routing.
These records can also be manipulated ETL/ELT style with Record processing in stream, with options such as:
QueryRecord - Use Calcite ANSI SQL to query and transform records and can also change output type
JoltTransformRecord - Use JOLT against any record, not just JSON
LookupRecord - To match against Lookup services like caches, Kudu, REST services, ML models, HBase and more
PartitionRecord - To break up into like groups
SplitRecord - To break up record groups into records
UpdateRecord - Update values in fields, often paired with LookupRecord
ValidateRecord - Check against a schema and check for extra fields
GeoEnrichIPRecord
ConvertRecord - Change between types like JSON to CSV
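To make one of those concrete: PartitionRecord splits a batch of records into like groups by one or more field values, emitting one FlowFile per group. A toy sketch of that grouping idea in Python (the field names are illustrative, not tied to the prices table):

```python
from collections import defaultdict

def partition_records(records, field):
    """Group records into one batch per distinct value of `field`,
    the way PartitionRecord produces one FlowFile per partition."""
    partitions = defaultdict(list)
    for record in records:
        partitions[record[field]].append(record)
    return dict(partitions)
```

Downstream, each partition can then be routed, named, or stored differently based on the field value it was grouped on.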
When you use PutORC, it will give you the details on building your external table. You can do a PutHiveQL to auto-build this table, but most companies want this done by a DBA.
CREATE EXTERNAL TABLE IF NOT EXISTS `pricesorc` (`item_id` BIGINT, `price` DOUBLE, `created_on` BIGINT, `updated_on` BIGINT)
STORED AS ORC
LOCATION '/user/tspann/prices'
Let's reverse this now. Sometimes you want to take data, say from a REST service and store it to a JDBC datastore.
InvokeHTTP - read from a REST endpoint
PutDatabaseRecord - put JSON to our JDBC store
That's all it takes to store data in a database. We could add some of the ETL/ELT enrichments mentioned above, or others that manipulate content.
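The same two-step flow, sketched outside NiFi so its shape is visible: fetch JSON from a REST endpoint, then map each record's fields to table columns on insert. The endpoint and table here are placeholders, and sqlite3 stands in for the JDBC store:

```python
import json
import sqlite3

def load_json_records(db, json_text):
    """Insert an array of JSON records into a prices-like table,
    the way PutDatabaseRecord maps record fields to columns."""
    records = json.loads(json_text)
    db.executemany(
        "INSERT INTO prices (item_id, price) VALUES (:item_id, :price)",
        records,
    )
    db.commit()
    return len(records)

# In a real flow the JSON text would come from the REST call, e.g.:
#   body = urllib.request.urlopen("https://example.com/api/prices").read()
#   load_json_records(db, body)
```

In NiFi, of course, both steps are drag-and-drop processors with no code at all; the sketch just shows what the flow is doing on your behalf.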
REST Output
Database Connection Pool
Get the REST Data
PutDatabaseRecord
From ApacheCon 2020, John Kuchmek gives a great talk on Incrementally Streaming RDBMS Data.