Posts: 1973
Kudos Received: 1225
Solutions: 124
My Accepted Solutions
Views | Posted
---|---
789 | 04-03-2024 06:39 AM
1528 | 01-12-2024 08:19 AM
782 | 12-07-2023 01:49 PM
1343 | 08-02-2023 07:30 AM
1947 | 03-29-2023 01:22 PM
01-11-2021
07:11 AM
https://www.datainmotion.dev/2020/12/simple-change-data-capture-cdc-with-sql.html https://www.datainmotion.dev/2020/07/ingesting-all-weather-data-with-apache.html https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache_15.html
01-08-2021
09:42 AM
Have you tried to upgrade? Is the latest file completed or still in process? Does it have read permissions? Make sure you schedule your cron job to run after the file is completed.
01-06-2021
09:16 PM
Sometimes you need real CDC: you have access to the transaction change logs and use a tool like Qlik Replicate or GoldenGate to pump records out to Kafka, and then Flink SQL or NiFi can read and process them. Other times you need something easier, just basic inserts and changes on a few tables whose new data you want to receive as events. Apache NiFi can do this easily for you with QueryDatabaseTableRecord. You don't need to know anything beyond the database connection information, the table name, and which field may change. NiFi will query the table, track state, and give you only the new records. Nothing is hardcoded; parameterize those values and you have a generic any-RDBMS-to-any-other-store data pipeline.

We are reading as records, which means each FlowFile in NiFi can hold thousands of records for which we know all the fields, types, and schema-related information. The schema can be one that NiFi infers or one we pull from a Schema Registry like Cloudera's amazing open source Schema Registry. Let's see what data is in our PostgreSQL table.

How To

QueryDatabaseTableRecord - We will output JSON records, but could have used Parquet, XML, CSV, or Avro.
UpdateAttribute (optional) - Set a table and schema name; this can be done with parameters as well.
MergeRecord (optional) - Batch the records up.
PutORC - Send these records to HDFS (which could be on bare-metal disks, GCS, S3, Azure, or ADLS). This will build us an external Hive table.

As you can see, we are looking at the prices table and checking maximum values to increment on the updated_on date and the item_id sequential key (a conceptual sketch of this incremental fetch follows the table DDL below). We then output JSON records. We could then send them on with any of the following.

Add-On Examples

PutKudu
PutHDFS - Send as JSON, CSV, or Parquet and build an Impala or Hive external table on top.
PutHive3Streaming - Hive 3 ACID tables.
PutS3
PutAzureDataLakeStorage
PutHBaseRecord
PublishKafkaRecord_2_* - Send a copy to Kafka for Flink SQL, Spark Streaming, Spring, etc.
PutBigQueryStreaming - Google BigQuery.
PutCassandraRecord
PutDatabaseRecord - Send to another JDBC datastore.
PutDruidRecord - Druid is a cool datastore; check it out on CDP Public Cloud.
PutElasticSearchRecord
PutMongoRecord
PutSolrRecord
PutRecord - To many RecordSinkServices such as databases, Kafka, Prometheus, scripted sinks, and Site-to-Site.
PutParquet - Store to HDFS as Parquet files.

You can do any number or all of these, or multiple copies of each to other clouds or clusters. You can also add enrichment, transformation, alerts, queries, or routing. These records can also be manipulated ETL/ELT style with in-stream record processing, with options such as:

QueryRecord - Use Calcite ANSI SQL to query and transform records; it can also change the output type (see the SQL sketch after this list).
JoltTransformRecord - Use JOLT against any record, not just JSON.
LookupRecord - Match against lookup services such as caches, Kudu, REST services, ML models, HBase, and more.
PartitionRecord - Break records up into like groups.
SplitRecord - Break record groups into individual records.
UpdateRecord - Update values in fields, often paired with LookupRecord.
ValidateRecord - Check against a schema and check for extra fields.
GeoEnrichIPRecord
ConvertRecord - Change between types, such as JSON to CSV.
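To make the QueryRecord option concrete, here is a minimal sketch of the Calcite SQL you could put in a QueryRecord dynamic property. The property name (for example, `filtered`) becomes an outbound relationship, FLOWFILE is the built-in table name for the incoming records, and the price threshold is purely illustrative for the prices schema used in this article.

```sql
-- Minimal QueryRecord sketch: this SQL would be the value of a dynamic property
-- (e.g. "filtered"); QueryRecord exposes the incoming records as the FLOWFILE table.
SELECT item_id,
       price,
       updated_on
FROM FLOWFILE
WHERE price > 100.0
ORDER BY updated_on DESC
```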
When you use PutORC, it will give you the details on building your external table. You can do a PutHiveQL to auto-build this table, but most companies want this done by a DBA.

CREATE EXTERNAL TABLE IF NOT EXISTS `pricesorc` (`item_id` BIGINT, `price` DOUBLE, `created_on` BIGINT, `updated_on` BIGINT)
STORED AS ORC
LOCATION '/user/tspann/prices'
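To make the incremental fetch described above concrete, here is a rough sketch. The source-table DDL is an assumption about how the prices table might be defined in PostgreSQL, and the WHERE clause NiFi generates depends on the Maximum-value Columns setting and the state it has stored, so treat the SELECT as conceptual rather than the literal SQL NiFi emits.

```sql
-- Assumed shape of the source prices table (column names come from the flow above;
-- the serial/timestamp types are an assumption for illustration).
CREATE TABLE prices (
  item_id    BIGSERIAL PRIMARY KEY,
  price      DOUBLE PRECISION,
  created_on TIMESTAMP NOT NULL DEFAULT now(),
  updated_on TIMESTAMP NOT NULL DEFAULT now()
);

-- Conceptually, each run of QueryDatabaseTableRecord only fetches rows beyond the
-- maximum values it stored in state on the previous run, roughly like this:
SELECT item_id, price, created_on, updated_on
FROM prices
WHERE updated_on > '2021-01-05 13:00:00'  -- last stored max updated_on
  AND item_id > 12345;                    -- last stored max item_id
```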
Part 2: REST to Database

Let's reverse this now. Sometimes you want to take data, say from a REST service, and store it in a JDBC datastore.

InvokeHTTP - Read from a REST endpoint.
PutDatabaseRecord - Put the JSON records into our JDBC store.

That's all it takes to store data to a database (a sketch of a possible target table follows the resource list below). We could add some of the ETL/ELT enrichments mentioned above, or others that manipulate content.

From ApacheCon 2020, John Kuchmek gives a great talk on Incrementally Streaming RDBMS Data (Incrementally Streaming Slides).

Resources

https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache_15.html
https://github.com/tspannhw/EverythingApacheNiFi/blob/main/README.md
https://www.youtube.com/watch?v=XsL63ZQYmLE
https://community.cloudera.com/t5/Community-Articles/Change-Data-Capture-CDC-with-Apache-NiFi-Part-1-of-3/ta-p/246623
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-cdc-mysql-nar/1.5.0/org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL/
https://dzone.com/articles/change-data-capture-using-apache-nifi
https://www.linkedin.com/pulse/building-near-real-time-big-data-lake-part-2-boris-tyukin/
https://www.qlik.com/us/data-management/nifi
https://www.cloudera.com/tutorials/cdp-importing-rdbms-data-into-hive.html
https://www.cdata.com/kb/tech/oracledb-jdbc-apache-nifi.rst
https://nathanlabadie.com/apache-nifi-ms-sql-and-kerberos-authentication/
https://dzone.com/articles/lets-build-a-simple-ingest-to-cloud-data-warehouse
https://github.com/tspannhw/EverythingApacheNiFi#etl--elt--cdc--load--ingest
https://www.linkedin.com/pulse/2020-streaming-edge-ai-events-tim-spann/
https://github.com/tspannhw/ApacheConAtHome2020
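Returning to the Part 2 flow above: PutDatabaseRecord maps record field names to column names, so a minimal target table for the REST JSON might look like the sketch below. The table and column names are hypothetical, chosen only for illustration; they are not taken from the post.

```sql
-- Hypothetical JDBC target table for PutDatabaseRecord; the incoming JSON records
-- are expected to have fields matching these column names.
CREATE TABLE rest_events (
  event_id   BIGINT PRIMARY KEY,
  event_time TIMESTAMP,
  payload    VARCHAR(4000)
);
```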
01-06-2021
06:16 AM
Cloudera Schema Registry is not the same as Confluent Schema Registry; they have some differences, and that seems to be one of them. The Cloudera registry will be connected to the Hive, Spark, and Flink metastores, which will eliminate the need for such workarounds. Cloudera's Schema Registry does not register schemas automatically at this time.
01-05-2021
01:44 PM
You may want to look at Flink SQL for joins and lookups (see the Flink SQL sketch below). Also look at NiFi's LookupRecord:
https://www.datainmotion.dev/2019/03/properties-file-lookup-augmentation-of.html
https://www.datainmotion.dev/2020/12/simple-change-data-capture-cdc-with-sql.html
LookupRecord to SQL: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-lookup-services-nar/1.12.1/org.apache.nifi.lookup.db.DatabaseRecordLookupService/index.html
https://www.datainmotion.dev/2020/10/top-25-use-cases-of-cloudera-flow.html
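As a rough illustration of the Flink SQL lookup pattern mentioned above: the table and column names below are made up, and it assumes `orders` is a streaming table with a processing-time attribute (`proc_time`) and `customers` is registered as a lookup-capable table (for example, a JDBC connector table).

```sql
-- Sketch of a Flink SQL lookup join: each streaming order row is enriched with the
-- current matching customer row at processing time.
SELECT o.order_id,
       o.amount,
       c.country
FROM orders AS o
JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.customer_id = c.customer_id;
```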
01-05-2021
01:41 PM
https://www.progress.com/tutorials/jdbc/ingest-salesforce-data-incrementally-into-hive-using-apache-nifi https://www.datainmotion.dev/2019/11/ingest-salesforce-data-into-hive-using.html
01-05-2021
01:40 PM
Some thoughts: maybe QueryRecord, maybe PartitionRecord to get one row at a time, perhaps JoltTransformRecord processing. See these articles:
https://www.datainmotion.dev/2020/12/smart-stocks-with-flank-nifi-kafka.html
https://www.datainmotion.dev/2020/12/simple-change-data-capture-cdc-with-sql.html
https://www.datainmotion.dev/2020/12/ingesting-websocket-data-for-live-stock.html
01-05-2021
01:34 PM
Arrays are a bit tricky, but a JSON record reader and writer may work better.
01-05-2021
01:31 PM
Check for errors in the logs. Stop the processor and make sure it's really stopped. Check the offsets in storage; perhaps they are corrupt, wrong, or have been updated by someone or something else.
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-azure-nar/1.12.1/org.apache.nifi.processors.azure.eventhub.ConsumeAzureEventHub/index.html
01-05-2021
01:28 PM
ExecuteSQLRecord or PutDatabaseRecord may be easier.