1973 Posts | 1225 Kudos Received | 124 Solutions
01-26-2021
11:40 AM
Automating Starting Services in Apache NiFi and Applying Parameters

Automate all the things! You can call these commands interactively or script all of them with awesome DevOps tools. @Andre Araujo and @dchaffey can tell you more about that.

Enable All NiFi Services on the Canvas

By running this three times, I catch any stubborn services or ones that needed something else running first. This could be put into a loop that checks the status before trying again (see the sketch after this post's reference list).

nifi pg-list
nifi pg-status
nifi pg-get-services

The NiFi CLI has interactive help available and also some good documentation: NiFi CLI Toolkit Guide.

/opt/demo/nifi-toolkit-1.12.1/bin/cli.sh nifi pg-enable-services -u http://edge2ai-1.dim.local:8080 --processGroupId root
/opt/demo/nifi-toolkit-1.12.1/bin/cli.sh nifi pg-enable-services -u http://edge2ai-1.dim.local:8080 --processGroupId root
/opt/demo/nifi-toolkit-1.12.1/bin/cli.sh nifi pg-enable-services -u http://edge2ai-1.dim.local:8080 --processGroupId root

We could then start a process group if we wanted:

nifi pg-start -u http://edge2ai-1.dim.local:8080 -pgid 2c1860b3-7f21-36f4-a0b8-b415c652fc62

List all process groups:

/opt/demo/nifi-toolkit-1.12.1/bin/cli.sh nifi pg-list -u http://edge2ai-1.dim.local:8080

List parameters:

/opt/demo/nifi-toolkit-1.12.1/bin/cli.sh nifi list-param-contexts -u http://edge2ai-1.dim.local:8080 -verbose

Set the parameter context for a process group; you can loop to do them all.
pgid => process group id
pcid => parameter context id

I need to put this in a shell or Python script:

/opt/demo/nifi-toolkit-1.12.1/bin/cli.sh nifi pg-set-param-context -u http://edge2ai-1.dim.local:8080 -verbose -pgid 2c1860b3-7f21-36f4-a0b8-b415c652fc62 -pcid 39f0f296-0177-1000-ffff-ffffdccb6d90

Example setupnifi.sh (Github Link)

You could also use the NiFi REST API or Dan's awesome Python API, NiPyApi: A Python Client SDK for Apache NiFi.

References
DevOps: Working with Parameter Contexts
NiFi Toolkit CLI
No More Spaghetti Flows
Report on this Apache NiFi
Everything Apache NiFi
Cloudera Data Platform - Using Apache NiFi REST API in the Public Cloud
Using NiFi CLI to Restore NiFi Flows From Backups
Automating the Building, Migration, Backup, Restore and Testing of Streaming Applications
Apache NiFi Toolkit Guide
An overview of Apache NiFi and Toolkit CLI deployments
Automate workflow deployment in Apache NiFi with the NiFi Registry
DevOps for Apache NiFi 1.7 and More
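Here is a minimal shell sketch of that retry loop, using the toolkit path and URL from the commands above; the retry count, sleep, and the pg-list check between passes are arbitrary choices, not a definitive script:

#!/bin/bash
# Sketch: run pg-enable-services a few times so services that depend on other
# services get enabled on a later pass.
CLI=/opt/demo/nifi-toolkit-1.12.1/bin/cli.sh
URL=http://edge2ai-1.dim.local:8080

for attempt in 1 2 3
do
    echo "pg-enable-services pass ${attempt}"
    "${CLI}" nifi pg-enable-services -u "${URL}" --processGroupId root
    # List process groups between passes so you can eyeball what is still stopped
    "${CLI}" nifi pg-list -u "${URL}"
    sleep 10
done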
01-21-2021
07:08 AM
Hi Timothy, will you please help me with this question?
01-13-2021
07:10 AM
Here are a few examples of moving Flume flows to NiFi. https://www.datainmotion.dev/2019/08/migrating-apache-flume-flows-to-apache.html https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache.html
01-11-2021
07:11 AM
https://www.datainmotion.dev/2020/12/simple-change-data-capture-cdc-with-sql.html https://www.datainmotion.dev/2020/07/ingesting-all-weather-data-with-apache.html https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache_15.html
01-06-2021
09:16 PM
Sometimes you need real CDC: you have access to transaction change logs, and you use a tool like Qlik Replicate or GoldenGate to pump records out to Kafka, where Flink SQL or NiFi can read and process them. Other times you need something easier: just the basic changes and inserts from a few tables you are interested in, delivered as events. Apache NiFi can do this easily for you with QueryDatabaseTableRecord. You don't need to know anything but the database connection information, the table name, and which field may change. NiFi will query, watch state, and give you new records. Nothing is hardcoded; parameterize those values and you have a generic "any RDBMS to any other store" data pipeline.

We are reading as records, which means each FlowFile in NiFi can hold thousands of records for which we know all the fields, types, and schema-related information. The schema can be one NiFi infers or one we pull from a schema registry like Cloudera's amazing open-source Schema Registry.

Let's see what data is in our PostgreSQL table.

How To

QueryDatabaseTableRecord - We will output JSON records, but could have done Parquet, XML, CSV, or Avro.
UpdateAttribute (optional) - Set a table and schema name; can do with parameters as well.
MergeRecord (optional) - Let's batch these up.
PutORC - Let's send these records to HDFS (which could be on bare-metal disks, GCS, S3, Azure, or ADLS). This will build us an external Hive table.

PutORC

As you can see, we are looking at the prices table and checking maximum values to increment on the updated_on date and the item_id sequential key. We then output JSON records.

We could then:

Add-On Examples

PutKudu
PutHDFS - Send as JSON, CSV, or Parquet and build an Impala or Hive external table on top
PutHive3Streaming - Hive 3 ACID tables
PutS3
PutAzureDataLakeStorage
PutHBaseRecord
PublishKafkaRecord_2_* - Send a copy to Kafka for Flink SQL, Spark Streaming, Spring, etc.
PutBigQueryStreaming - Google
PutCassandraRecord
PutDatabaseRecord - Let's send to another JDBC datastore
PutDruidRecord - Druid is a cool datastore; check it out on CDP Public Cloud
PutElasticSearchRecord
PutMongoRecord
PutSolrRecord
PutRecord - To many RecordSinkServices like databases, Kafka, Prometheus, scripted sinks, and Site-to-Site
PutParquet - Store to HDFS as Parquet files

You can do any number or all of these, or multiple copies of each, to other clouds or clusters. You can also add enrichment, transformation, alerts, queries, or routing. These records can also be manipulated ETL/ELT style with in-stream record processing, with options such as:

QueryRecord - Use Calcite ANSI SQL to query and transform records; can also change the output type
JoltTransformRecord - Use JOLT against any record, not just JSON
LookupRecord - Match against lookup services like caches, Kudu, REST services, ML models, HBase, and more
PartitionRecord - Break records up into like groups
SplitRecord - Break record groups into individual records
UpdateRecord - Update values in fields; often paired with LookupRecord
ValidateRecord - Check against a schema and check for extra fields
GeoEnrichIPRecord
ConvertRecord - Change between types, like JSON to CSV

When you use PutORC, it will give you the details for building your external table. You can do a PutHiveQL to auto-build this table, but most companies want this done by a DBA (a scripted beeline version is sketched after the resource list below).

CREATE EXTERNAL TABLE IF NOT EXISTS `pricesorc` (`item_id` BIGINT, `price` DOUBLE, `created_on` BIGINT, `updated_on` BIGINT)
STORED AS ORC
LOCATION '/user/tspann/prices'

Part 2: REST to Database

Let's reverse this now. Sometimes you want to take data, say from a REST service, and store it in a JDBC datastore.

InvokeHTTP - Read from a REST endpoint
PutDatabaseRecord - Put JSON into our JDBC store

That's it to store data to a database. We could add some of the ETL/ELT enrichments mentioned above or others that manipulate content.

REST Output
Database Connection Pool
Get the REST Data
PutDatabaseRecord

From ApacheCon 2020, John Kuchmek does a great talk on Incrementally Streaming RDBMS Data.

Incrementally Streaming Slides

Resources
https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache_15.html
https://github.com/tspannhw/EverythingApacheNiFi/blob/main/README.md
https://www.youtube.com/watch?v=XsL63ZQYmLE
https://community.cloudera.com/t5/Community-Articles/Change-Data-Capture-CDC-with-Apache-NiFi-Part-1-of-3/ta-p/246623
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-cdc-mysql-nar/1.5.0/org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL/
https://dzone.com/articles/change-data-capture-using-apache-nifi
https://www.linkedin.com/pulse/building-near-real-time-big-data-lake-part-2-boris-tyukin/
https://www.qlik.com/us/data-management/nifi
https://www.cloudera.com/tutorials/cdp-importing-rdbms-data-into-hive.html
https://www.cdata.com/kb/tech/oracledb-jdbc-apache-nifi.rst
https://nathanlabadie.com/apache-nifi-ms-sql-and-kerberos-authentication/
https://dzone.com/articles/lets-build-a-simple-ingest-to-cloud-data-warehouse
https://github.com/tspannhw/EverythingApacheNiFi#etl--elt--cdc--load--ingest
https://www.linkedin.com/pulse/2020-streaming-edge-ai-events-tim-spann/
https://github.com/tspannhw/ApacheConAtHome2020
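As mentioned above, if you do want to script that external table creation rather than hand it to a DBA, a minimal beeline sketch could look like the following; the HiveServer2 JDBC URL is a placeholder, and the DDL is the one PutORC generated for the prices table:

#!/bin/bash
# Placeholder HiveServer2 JDBC URL - replace with your own
HIVE_URL="jdbc:hive2://hiveserver:10000/default"

# Create the external table over the ORC files that PutORC wrote to HDFS
beeline -u "${HIVE_URL}" -e "
CREATE EXTERNAL TABLE IF NOT EXISTS pricesorc (
  item_id BIGINT,
  price DOUBLE,
  created_on BIGINT,
  updated_on BIGINT)
STORED AS ORC
LOCATION '/user/tspann/prices'"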
01-05-2021
09:47 PM
1 Kudo
Thank you, Matt! Altering the "Max Wait Time" value was a game-changer. I still need to tune it, but the thread problem is fixed now.
12-24-2020
01:22 PM
I believe I found a solution. I ended up writing the raw ORC files to HDFS (via PutHDFS) and then loading them into Hive internal tables (via PutHive3QL). The command to load data into a Hive table from an existing file is:
LOAD DATA INPATH 'hdfs:///data/orc_file_name' OVERWRITE INTO TABLE hivedatabasename.tablename
12-22-2020
11:42 AM
Amazing work here sir!
12-11-2020
07:30 AM
You can make a new attribute with UpdateAttribute that merges those two together; for example:
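A minimal sketch with hypothetical attribute names first_part and second_part (substitute your own): in UpdateAttribute, add a dynamic property named for the new attribute, say merged_value, and set its value to the Expression Language string ${first_part}${second_part}. Any literal separator can go between the two expressions, such as ${first_part}_${second_part}.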
11-12-2020
11:19 AM
1 Kudo
[FLaNK] Smart Weather Applications with Flink SQL
Sometimes you want to acquire, route, transform, live query, and analyze all the weather data in the United States while those reports happen. With FLaNK, it's a trivial process to do.
From Kafka to Kudu for Any Schema of Any Type of Data - No Code, Two Steps
The Schema Registry has fully Swagger-ized, runnable REST API documentation. Integration, DevOps, and migration can be done with a simple script.
Here are your schemas; upload, edit, and compare them.
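As a minimal sketch of that kind of script, you could hit the registry's REST API with curl; the host, port, and endpoint path below are assumptions based on a default Cloudera/Hortonworks Schema Registry install, so confirm them against your own Swagger UI:

#!/bin/bash
# Assumed Schema Registry endpoint - adjust host and port for your cluster
REGISTRY="http://edge2ai-1.dim.local:7788"

# List all registered schemas as JSON
curl -s "${REGISTRY}/api/v1/schemaregistry/schemas"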
Validating Data Against a Schema With Your Approved Level of Tolerance. You want extra fields allowed? You've got it:
Feed that data to beautiful visual applications running in Cloudera Machine Learning.
You like drill-down maps, you got them:
Query your data fast with Apache Hue against Apache Kudu tables through Apache Impala:
Let's ingest all the US weather stations even though they are a zipped directory of a ton of XML files:
Weather Ingest is Easy Automagically!
View All Your Topic Data Enabled by Schema Registry Even in Avro Format:
Reference:
Ingesting All Weather Data with Apache NiFi (https://www.datainmotion.dev/2020/07/ingesting-all-weather-data-with-apache.html)
(Screenshots: source, build, query, Kafka insert, and schema registry views)
SQL
INSERT INTO weathernj
SELECT `location`, station_id,latitude,longitude,observation_time,weather,
temperature_string, temp_f,temp_c,relative_humidity,wind_string,wind_dir,wind_degrees,wind_mph,
wind_kt, pressure_in,dewpoint_string,dewpoint_f,dewpoint_c
FROM weather
WHERE
`location` is not null and `location` <> 'null' and trim(`location`) <> '' and `location` like '%NJ';
Example Slack Output
=========================================================
http://forecast.weather.gov/images/wtf/small/ovc.png
Location Cincinnati/Northern Kentucky International Airport, KY
Station KCVG
Temperature: 49.0 F (9.4 C)
Humdity: 83
Wind East at 3.5 MPH (3 KT)
Overcast
Dewpoint 44.1 F (6.7 C)
Observed at Tue, 27 Oct 2020 11:52:00 -0400
---- tracking info ----
UUID: 2cb6bd67-148c-497d-badf-dfffb4906b89
Kafka offset: 0
Kafka Timestamp: 1603818351260
=========================================================
[FLaNK] Smart Weather Websocket Application - Kafka Consumer
This is based on Koji Kawamura's excellent gist:
As part of my Smart Weather Application, I wanted to display weather information on a webpage as it arrives, using WebSockets. Koji has an excellent NiFi flow that does this. I tweaked it and added some things since I am not using Zeppelin. I am hosting my webpage with NiFi as well.
We simply supply a webpage that makes a WebSocket connection to NiFi and NiFi keeps a cache in HBase to know what the client is doing. This cache is updated by consuming from Kafka. We can then feed events as they happen to the page.
Here is the JavaScript for the web page interface to WebSockets:
<script>
    // Send a typed message over the WebSocket connection
    function sendMessage(type, payload) {
        websocket.send(makeMessage(type, payload));
    }

    // Wrap the payload in a {type, payload} envelope as a JSON string
    function makeMessage(type, payload) {
        return JSON.stringify({
            'type': type,
            'payload': payload
        });
    }

    // Connect to the WebSocket endpoint hosted by NiFi
    var wsUri = "ws://edge2ai-1.dim.local:9091/test";
    var websocket = new WebSocket(wsUri);

    // Once connected, publish the text typed into the kafkamessage input field
    websocket.onopen = function(evt) {
        sendMessage('publish', {
            "message": document.getElementById("kafkamessage").value
        });
    };

    websocket.onerror = function(evt) { console.log('ERR', evt); };

    // Each incoming message is an array of weather readings; render them into the results element
    websocket.onmessage = function(evt) {
        var dataPoints = JSON.parse(evt.data);
        var output = document.getElementById("results");
        var dataBuffer = "<p>";
        for (var i = 0; i < dataPoints.length; i++) {
            dataBuffer += " <img src=\"" + dataPoints[i].icon_url_base + dataPoints[i].icon_url_name + "\"> " +
                dataPoints[i].location + dataPoints[i].station_id + "@" + dataPoints[i].latitude + ":" +
                dataPoints[i].longitude + "@" + dataPoints[i].observation_time +
                dataPoints[i].temperature_string + "," + dataPoints[i].relative_humidity + "," +
                dataPoints[i].wind_string + "<br>";
        }
        output.innerHTML = output.innerHTML + dataBuffer + "</p><br>";
    };
</script>

Video Walkthrough: https://www.twitch.tv/videos/797412192?es_id=bbacb7cb39
Source Code: https://github.com/tspannhw/SmartWeather/tree/main

Kafka Topic
weathernj

Schema
The schema registry has a live Swagger interface to its REST API.
NiFi Flow Overview
Ingest All US Weather Data from Zipped XML via REST
As Data Streams In, We Can Govern It
Ingested Data Is Validated Against Its Schema, Then Pushed to Kafka as Avro
We consume that Kafka data and store it in Kudu for analytics.
We host a web page for our WebSockets application in NiFi with 4 simple processors.
Listen and Put Web Socket Messages Between NiFi Server and Web Application
Kafka Data is Cached for Websocket Applications
Set the Port for WebSockets via Jetty Web Server
Use HBase As Our Cache
We can monitor our Flink SQL application from the Global Flink Dashboard
We can query our weather data stored in Apache Kudu via Apache Impala through Apache Hue.
Kudu Visualizations of Our Weather Data in Cloudera Visual Applications