1973 Posts
1225 Kudos Received
124 Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 836 | 04-03-2024 06:39 AM |
 | 1614 | 01-12-2024 08:19 AM |
 | 797 | 12-07-2023 01:49 PM |
 | 1385 | 08-02-2023 07:30 AM |
 | 2001 | 03-29-2023 01:22 PM |
01-05-2021
01:10 PM
I only see that configuration parameter in a non-Cloudera schema registry.
01-05-2021
01:05 PM
You can try some of the different CSVReader settings: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-record-serialization-services-nar/1.12.1/org.apache.nifi.csv.CSVReader/index.html
You can also try cleaning up that text first with some preprocessing, for example with JOLT.
01-05-2021
01:01 PM
I would recommend creating a view with that query and having NiFi watch some synthetic column on it. Or, in NiFi, use two QueryDatabaseTableRecord processors: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.12.1/org.apache.nifi.processors.standard.QueryDatabaseTableRecord/index.html
Either way, use QueryDatabaseTableRecord, not QueryDatabaseTable.
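To illustrate the idea of watching a synthetic column, here is a minimal Python sketch of the maximum-value pattern. It is not NiFi code, just the same bookkeeping done by hand. SQLite stands in for your database, and the orders table, the orders_view view and the change_seq column are hypothetical names made up for the example.

```python
import sqlite3


def fetch_new_rows(conn, table, max_value_column, last_seen):
    """Return rows whose max_value_column exceeds last_seen, plus the new high-water mark."""
    # Identifiers cannot be bound as SQL parameters, so this sketch assumes trusted names.
    query = (
        f"SELECT * FROM {table} "
        f"WHERE {max_value_column} > ? ORDER BY {max_value_column}"
    )
    cur = conn.execute(query, (last_seen,))
    columns = [d[0] for d in cur.description]
    rows = [dict(zip(columns, r)) for r in cur.fetchall()]
    if rows:
        # Remember the largest value seen so the next call only returns newer rows.
        last_seen = rows[-1][max_value_column]
    return rows, last_seen


def demo():
    conn = sqlite3.connect(":memory:")
    # Hypothetical source table and a view that exposes a synthetic column to watch.
    conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, amount REAL)")
    conn.executemany("INSERT INTO orders VALUES (?, ?)", [(1, 9.5), (2, 12.0)])
    conn.execute(
        "CREATE VIEW orders_view AS SELECT id AS change_seq, id, amount FROM orders"
    )
    first, state = fetch_new_rows(conn, "orders_view", "change_seq", 0)
    conn.execute("INSERT INTO orders VALUES (3, 7.25)")
    second, state = fetch_new_rows(conn, "orders_view", "change_seq", state)
    return first, second, state
```

The first call returns both existing rows, and the second call returns only the row inserted afterwards, because the stored high-water mark filters out everything already seen.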
01-05-2021
12:52 PM
You generally don't need to run multiple threads for that. Also make sure you size your nodes and cluster appropriately: https://www.datainmotion.dev/2020/07/sizing-your-apache-nifi-cluster-for.html
12-23-2020
09:18 AM
2 Kudos
Sometimes you need real CDC: you have access to transaction change logs, and you use a tool like QLIK REPLICATE or GoldenGate to pump out records to Kafka, where Flink SQL or NiFi can read and process them. Other times you need something easier, just basic changes and inserts from a few tables that you want to receive as events. Apache NiFi can do this easily for you with QueryDatabaseTableRecord. You don't need to know anything but the database connection information, the table name and which field may change. NiFi will query, watch state and give you new records. Nothing is hardcoded; parameterize those values and you have a generic 'Any RDBMS' to 'Any Other Store' data pipeline. We are reading as records, which means each FlowFile in NiFi can hold thousands of records for which we know all the fields, types and schema-related information. The schema can be one that NiFi infers or one we pull from a Schema Registry like Cloudera's amazing Open Source Schema Registry.

Let's see what data is in our PostgreSQL table:

How To

1. QueryDatabaseTableRecord (we will output JSON records, but could have done Parquet, XML, CSV or AVRO)
2. UpdateAttribute - optional - set a table and schema name, can do with parameters as well.
3. MergeRecord - optional - let's batch these up.
4. PutORC - let's send these records to HDFS (which could be on bare metal disks, GCS, S3, Azure or ADLS). This will build us an external Hive table.

PutORC

As you can see, we are looking at the "prices" table and checking maximum values to increment on the updated_on date and the item_id sequential key. We then output JSON records.

We could then:

Add-Ons Examples

PutKudu
PutHDFS (send as JSON, CSV, Parquet) and build an Impala or Hive table on top as external
PutHive3Streaming (Hive 3 ACID Tables)
PutS3
PutAzureDataLakeStorage
PutHBaseRecord
PublishKafkaRecord_2_* - send a copy to Kafka for Flink SQL, Spark Streaming, Spring, etc...
PutBigQueryStreaming (Google)
PutCassandraRecord
PutDatabaseRecord - let's send to another JDBC Datastore
PutDruidRecord - Druid is a cool datastore, check it out on CDP Public Cloud
PutElasticSearchRecord
PutMongoRecord
PutSolrRecord
PutRecord (to many RecordSinkServices like Databases, Kafka, Prometheus, Scripted and Site-to-Site)
PutParquet (store to HDFS as Parquet files)

You can do any number or all of these, or multiple copies of each to other clouds or clusters. You can also do enrichment, transformation, alerts, queries or routing.

These records can also be manipulated ETL/ELT style with Record processing in stream, with options such as:

QueryRecord (use Calcite ANSI SQL to query and transform records and can also change output type)
JoltTransformRecord (use JOLT against any record not just JSON)
LookupRecord (to match against Lookup services like caches, Kudu, REST services, ML models, HBase and more)
PartitionRecord (to break up into like groups)
SplitRecord (to break up record groups into records)
UpdateRecord (update values in fields, often paired with LookupRecord)
ValidateRecord (check against a schema and check for extra fields)
GeoEnrichIPRecord
ConvertRecord (change between types like JSON to CSV)

When you use PutORC, it will give you the details on building your external table. You can do a PutHiveQL to auto-build this table, but most companies want this done by a DBA.

    CREATE EXTERNAL TABLE IF NOT EXISTS `pricesorc`
    (`item_id` BIGINT, `price` DOUBLE, `created_on` BIGINT, `updated_on` BIGINT)
    STORED AS ORC
    LOCATION '/user/tspann/prices'

Part 2 REST to Database

Let's reverse this now. Sometimes you want to take data, say from a REST service, and store it to a JDBC datastore.

1. InvokeHTTP (read from a REST endpoint)
2. PutDatabaseRecord (put JSON to our JDBC store).

That's it to store data to a database. We could add some of the ETL/ELT enrichments mentioned above or others that manipulate content.
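For readers who want to see the Part 2 idea outside of NiFi, here is a minimal Python sketch of the same two steps: take a JSON array of records and write them to a database by matching field names to column names. It is only an analogy, not how PutDatabaseRecord is implemented. SQLite stands in for the JDBC store, and the response body, the prices table and the put_database_records helper are hypothetical; a real flow would get the body from the REST call.

```python
import json
import sqlite3


def put_database_records(conn, table, records):
    """Insert a list of dict records, mapping field names to column names."""
    if not records:
        return 0
    # Use the first record's fields as the column list (assumes a uniform record shape).
    columns = list(records[0].keys())
    placeholders = ", ".join("?" for _ in columns)
    # Table and column names are assumed trusted in this sketch.
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
    conn.executemany(sql, [tuple(rec.get(c) for c in columns) for rec in records])
    conn.commit()
    return len(records)


# Hypothetical REST response body; a real flow would receive this from an HTTP call.
response_body = '[{"item_id": 1, "price": 9.99}, {"item_id": 2, "price": 14.5}]'

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE prices (item_id INTEGER, price REAL)")
inserted = put_database_records(conn, "prices", json.loads(response_body))
```

Because the mapping is by field name, the same helper works for any table whose columns match the incoming fields, which is the property that makes the NiFi flow generic.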
REST Output
Database Connection Pool
Get the REST Data
PutDatabaseRecord

From ApacheCon 2020, John Kuchmek does a great talk on Incrementally Streaming RDBMS Data.

Incrementally Streaming Slides

Resources

https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache_15.html
https://github.com/tspannhw/EverythingApacheNiFi/blob/main/README.md
https://www.youtube.com/watch?v=XsL63ZQYmLE
https://community.cloudera.com/t5/Community-Articles/Change-Data-Capture-CDC-with-Apache-NiFi-Part-1-of-3/ta-p/246623
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-cdc-mysql-nar/1.5.0/org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL/
https://dzone.com/articles/change-data-capture-using-apache-nifi
https://www.linkedin.com/pulse/building-near-real-time-big-data-lake-part-2-boris-tyukin/
https://www.qlik.com/us/data-management/nifi
https://www.cloudera.com/tutorials/cdp-importing-rdbms-data-into-hive.html
https://www.cdata.com/kb/tech/oracledb-jdbc-apache-nifi.rst
https://nathanlabadie.com/apache-nifi-ms-sql-and-kerberos-authentication/
https://dzone.com/articles/lets-build-a-simple-ingest-to-cloud-data-warehouse
https://github.com/tspannhw/EverythingApacheNiFi#etl--elt--cdc--load--ingest
https://www.linkedin.com/pulse/2020-streaming-edge-ai-events-tim-spann/
https://github.com/tspannhw/ApacheConAtHome2020
12-22-2020
09:16 AM
1 Kudo
Smart Stocks with FLaNK (NiFi, Kafka, Flink SQL)

I would like to track stocks from IBM and Cloudera frequently during the day using Apache NiFi to read the REST API. After that I have some Streaming Analytics to perform with Apache Flink SQL, and I also want permanent fast storage in Apache Kudu queried with Apache Impala. Let's build that application cloud native in seconds in AWS or Azure.

Source Code: https://github.com/tspannhw/SmartStocks

To script loading Schemas, Tables and Alerts, see scripts/setup.sh:

Source Code: https://github.com/tspannhw/ApacheConAtHome2020

Kafka Topic
Kafka Schema
Kudu Table
Flink Prep
Flink SQL Client Run
Flink SQL Client Configuration

Once our automated admin has built our Cloud environment and populated it with the goodness of our app, we can bring out continuous SQL.

If you know your data, build a schema, share to the registry

One unique thing we added was a default value in our Avro schema and making it a logicalType for timestamp-millis. This is helpful for Flink SQL timestamp related queries.

    { "name" : "dt", "type" : ["long"], "default": 1, "logicalType": "timestamp-millis"}

You can see the entire schema here: https://raw.githubusercontent.com/tspannhw/SmartStocks/main/stocks.avsc

We will also want a topic for Stock Alerts that we will create later with Flink SQL, so let's define a schema for that as well: https://raw.githubusercontent.com/tspannhw/SmartStocks/main/stockalerts.avsc

For our data today, we will use AVRO data with AVRO schemas for use inside Kafka topics and by whoever will consume it.

How to Build a Smart Stock DataFlow in X Easy Steps

1. Retrieve data from source (example: InvokeHTTP against SSL REST Feed - say TwelveData) with a schedule.
2. Set a Schema Name (UpdateAttribute).
3. ForkRecord: We use this to split out records from the header (/values) using RecordPath syntax.
4. QueryRecord: Convert type and manipulate data with SQL. We aren't doing anything in this one, but this is an option to change fields, add fields, etc.
5. UpdateRecord: In this first one I am setting some fields in the record from attributes and adding a current timestamp. I also reformat the timestamp for conversion.
6. UpdateRecord: I am making dt a numeric UNIX timestamp.
7. UpdateRecord: I am making datetime my formatted String date time.
8. (LookupRecord): I don't have this step yet as I don't have an internal record for this company in my Real-Time Data Mart. Apache Kudu architecture in a CDP public cloud deployment. I will probably add this step to augment or check my data.
9. (ValidateRecord): For less reliable data sources, I may want to validate my data against our schema, otherwise we will get warnings or errors.
10. PublishKafkaRecord_2_0: Convert from JSON to AVRO, send to our Kafka topic with headers, including a reference to the correct schema stocks and its version 1.0.

Now that we are streaming our data to Kafka topics, we can utilize it in Flink SQL Continuous SQL applications, NiFi applications, Spark 3 applications and more. So in this case, CFM NiFi is our Producer and we will have CFM NiFi and CSA Flink SQL as Kafka Consumers.

We can see what our data looks like in the new cleaned up format with all the fields we need.

Viewing, Monitoring, Checking and Alerting On Our Streaming Data in Kafka

Cloudera Streams Messaging Manager solves all of these difficult problems from one easy to use pre-integrated UI. It is pre-wired into my Kafka Datahubs and secured with SDX. I can see my AVRO data with associated stocks schema in the topic, ready to be consumed. I can then monitor who is consuming how much, and if there is a lag or latency.

How to Store Our Streaming Data to Our Real-Time DataMart in the Cloud

Consume stocks AVRO data with the stocks schema, then write to our Real-Time Data Mart in Cloudera Data Platform powered by Apache Impala and Apache Kudu. If something failed or could not connect, let's retry 3 times. We use a parameter for our 3+ Kafka brokers with port.
We could also have parameters for topic names and consumer name. We read from the stocks topic, which uses the stocks schema that is referenced in the Kafka header and automatically read by NiFi. When we sent the message to Kafka, NiFi passed on our schema name through the schema.name attribute. As we can see, it was schema-attached Avro, so we use that Reader and convert to simple JSON with that schema.

Writing to our Cloud Native Real-Time Data Mart could not be simpler: we reference the table stocks that we have created and have permissions to, and use the JSON reader. I like UPSERT since it handles INSERT and UPDATE.

First we need to create our Kudu table, either in Apache Hue from CDP or scripted from the command line.

Example:

    impala-shell -i edge2ai-1.dim.local -d default -f /opt/demo/sql/kudu.sql

    CREATE TABLE stocks (
      uuid STRING,
      `datetime` STRING,
      `symbol` STRING,
      `open` STRING,
      `close` STRING,
      `high` STRING,
      `volume` STRING,
      `ts` TIMESTAMP,
      `dt` TIMESTAMP,
      `low` STRING,
      PRIMARY KEY (uuid,`datetime`)
    )
    PARTITION BY HASH PARTITIONS 4
    STORED AS KUDU
    TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');

Using Apache Hue integrated in CDP, I can examine my Real-Time Data Mart table and then query my table.

My data is now ready for reports, dashboards, applications, notebooks, web applications, mobile apps and machine learning. I can now spin up a Cloudera Visual Application on this table in a few seconds.

Now we can build our streaming analytics application in Flink.

How to Build a Smart Stock Streaming Analytics in X Easy Steps

I can connect to Flink SQL from the command line Flink SQL Client to start exploring my Kafka and Kudu data, create temporary tables and launch some applications (Insert statements). The environment lets me see all the different catalogs available, including registry (Cloudera Cloud Schema Registry), Hive (Cloud Native Database table) and Kudu (Cloudera Real-Time Cloud Data Mart) tables.

Run Flink SQL Client

It's a two step process: first, set up a YARN session.
You may need to add your Kerberos credentials.

    flink-yarn-session -tm 2048 -s 2 -d

Then launch the command line SQL Client.

    flink-sql-client embedded -e sql-env.yaml

Refer to: SQL Client configuration, SQL Client security

Run Flink SQL

Cross Catalog Query to Stocks Kafka Topic

    select * from registry.default_database.stocks;

Cross Catalog Query to Stocks Kudu/Impala Table

    select * from kudu.default_database.`impala::default.stocks`;

Default Catalog

    use catalog default_catalog;

    CREATE TABLE stockEvents (
      symbol STRING,
      uuid STRING,
      ts BIGINT,
      dt BIGINT,
      datetime STRING,
      open STRING,
      close STRING,
      high STRING,
      volume STRING,
      low STRING,
      event_time AS CAST(from_unixtime(floor(ts/1000)) AS TIMESTAMP(3)),
      WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
      'connector.type' = 'kafka',
      'connector.version' = 'universal',
      'connector.topic' = 'stocks',
      'connector.startup-mode' = 'earliest-offset',
      'connector.properties.bootstrap.servers' = 'edge2ai-1.dim.local:9092',
      'format.type' = 'registry',
      'format.registry.properties.schema.registry.url' = 'http://edge2ai-1.dim.local:7788/api/v1'
    );

    show tables;

    Flink SQL> describe stockEvents;
    root
     |-- symbol: STRING
     |-- uuid: STRING
     |-- ts: BIGINT
     |-- dt: BIGINT
     |-- datetime: STRING
     |-- open: STRING
     |-- close: STRING
     |-- high: STRING
     |-- volume: STRING
     |-- low: STRING
     |-- event_time: TIMESTAMP(3) AS CAST(FROM_UNIXTIME(FLOOR(ts / 1000)) AS TIMESTAMP(3))
     |-- WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

We added a watermark and event time pulled from our timestamp.

Simple Select All Query

    select * from default_catalog.default_database.stockEvents;

We can do some interesting queries against this table we created.
Tumbling Window

    SELECT symbol,
           TUMBLE_START(event_time, INTERVAL '1' MINUTE) as tumbleStart,
           TUMBLE_END(event_time, INTERVAL '1' MINUTE) as tumbleEnd,
           AVG(CAST(high as DOUBLE)) as avgHigh
    FROM stockEvents
    WHERE symbol is not null
    GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE), symbol;

Top 3

    SELECT *
    FROM (
      SELECT *,
             ROW_NUMBER() OVER (PARTITION BY window_start ORDER BY num_stocks desc) AS rownum
      FROM (
        SELECT TUMBLE_START(event_time, INTERVAL '10' MINUTE) AS window_start,
               symbol,
               COUNT(*) AS num_stocks
        FROM stockEvents
        GROUP BY symbol, TUMBLE(event_time, INTERVAL '10' MINUTE)
      )
    )
    WHERE rownum <= 3;

Stock Alerts

    INSERT INTO stockalerts /*+ OPTIONS('sink.partitioner'='round-robin') */
    SELECT CAST(symbol as STRING) symbol,
           CAST(uuid as STRING) uuid,
           ts, dt, open, close, high, volume, low, datetime,
           'new-high' message,
           'nh' alertcode,
           CAST(CURRENT_TIMESTAMP AS BIGINT) alerttime
    FROM stocks st
    WHERE symbol is not null
      AND symbol <> 'null'
      AND trim(symbol) <> ''
      AND CAST(close as DOUBLE) > 11;

Monitoring Flink Jobs

Using the CSA Flink Global Dashboard, I can see all my Flink jobs running, including SQL Client jobs, disconnected Flink SQL inserts and deployed Flink applications.

We can also see the data populated in the stockalerts topic. We can run a Flink SQL, Spark 3, NiFi or other application against this data to handle alerts. That may be the next application: I may send those alerts to iPhone messages, Slack messages, a database table and a websockets app.

Data Lineage and Governance

We all know that NiFi has deep data lineage that can be pushed or pulled using REST, Reporting Tasks or CLI to use in audits, metrics and tracking. If I want all the governance data for my entire streaming pipeline, I will use Apache Atlas, which is prewired as part of SDX in my Cloud Data Platform.
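If the tumbling window query above feels abstract, here is a small Python sketch of the same grouping logic, outside of Flink. It floors ts (epoch milliseconds) to whole seconds the way the event_time column does, assigns each event to a one-minute window aligned to the epoch, and averages high per window and symbol. It is only an illustration: it ignores watermarks and late data, and the sample events and the tumbling_avg_high helper are made up for the example.

```python
from collections import defaultdict

WINDOW_MS = 60_000  # one-minute tumbling window, matching INTERVAL '1' MINUTE


def tumbling_avg_high(events, window_ms=WINDOW_MS):
    """Average `high` per (window_start_ms, symbol), skipping events without a symbol."""
    sums = defaultdict(lambda: [0.0, 0])  # key -> [running total, count]
    for e in events:
        if e.get("symbol") is None:
            continue  # same filter as WHERE symbol is not null
        # Floor to whole seconds, like floor(ts/1000) in the event_time column.
        event_time_ms = (e["ts"] // 1000) * 1000
        # Windows are aligned to the epoch, so the start is the previous multiple of window_ms.
        window_start = event_time_ms - (event_time_ms % window_ms)
        bucket = sums[(window_start, e["symbol"])]
        bucket[0] += float(e["high"])  # high arrives as a string, like CAST(high as DOUBLE)
        bucket[1] += 1
    return {key: total / count for key, (total, count) in sums.items()}


# Made-up events with small synthetic timestamps (milliseconds) for readability.
events = [
    {"symbol": "IBM", "ts": 500, "high": "124.0"},
    {"symbol": "IBM", "ts": 30_000, "high": "126.0"},
    {"symbol": "IBM", "ts": 70_000, "high": "130.0"},
    {"symbol": "CLDR", "ts": 10_000, "high": "12.5"},
    {"symbol": None, "ts": 20_000, "high": "1.0"},
]
averages = tumbling_avg_high(events)
```

The two IBM events in the first minute land in the same window and are averaged together, while the third IBM event starts a new window. In Flink the window result is only emitted once the watermark passes the window end, which this sketch does not model.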
References

https://github.com/cloudera/flink-tutorials/tree/master/flink-sql-tutorial
https://github.com/tspannhw/FlinkSQLWithCatalogsDemo
https://github.com/tspannhw/ClouderaFlinkSQLForPartners/blob/main/README.md
https://github.com/tspannhw/ApacheConAtHome2020/tree/main/scripts
https://github.com/tspannhw/SmartWeather
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html
https://www.datainmotion.dev/2020/11/flank-smart-weather-applications-with.html
https://www.datainmotion.dev/2020/11/flank-smart-weather-websocket.html
12-15-2020
08:15 AM
My thought is that it may be guessing the schema wrong. It may not be seeing those separate lines as separate JSON records. Try a schema with just the 3 fields you want.
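To show what I mean by separate lines not being seen as separate JSON, here is a minimal Python sketch, not NiFi code. The sample text and the field names id, name and value are hypothetical, since I don't know your actual data; the point is only that line-delimited JSON has to be parsed one line at a time and can then be projected down to the fields you want.

```python
import json


def read_json_lines(text, wanted_fields):
    """Parse one JSON object per line and keep only the wanted fields."""
    records = []
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue  # skip blank lines between records
        obj = json.loads(line)
        # Missing fields become None instead of raising, similar to a nullable schema field.
        records.append({field: obj.get(field) for field in wanted_fields})
    return records


# Hypothetical input: two JSON objects, one per line, with an extra field on the first.
sample = (
    '{"id": 1, "name": "a", "value": 10, "extra": true}\n'
    '{"id": 2, "name": "b", "value": 20}\n'
)
records = read_json_lines(sample, ["id", "name", "value"])
```

Parsing the whole text as a single JSON document fails because of the second object, which is the kind of mismatch that can confuse schema inference.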
12-15-2020
07:26 AM
MergeRecord to PutORC is fast. PutDatabaseRecord to Hive JDBC can be fast. Are you using an upsert? What version of NiFi? Hive? CDP?
https://github.com/tspannhw/ClouderaPublicCloudCDFWorkshop
https://www.datainmotion.dev/2020/04/streaming-data-with-cloudera-data-flow.html
https://community.cloudera.com/t5/Support-Questions/hive-table-loading-in-NIFI-extremely-slow/td-p/191613
12-15-2020
07:19 AM
It may need an adjustment of the JoltTransformRecord. What settings do you have? What reader are you using? Do you have a schema?
12-11-2020
07:30 AM
You can make a new attribute with UpdateAttribute that merges those two together.