Posts: 1973
Kudos Received: 1225
Solutions: 124
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 1978 | 04-03-2024 06:39 AM |
| | 3122 | 01-12-2024 08:19 AM |
| | 1700 | 12-07-2023 01:49 PM |
| | 2477 | 08-02-2023 07:30 AM |
| | 3466 | 03-29-2023 01:22 PM |
03-15-2021
04:48 PM
NiFi for XML / RSS / REST Feed Ingest

I want to retrieve the status from various cloud providers and services, including Cloudera, AWS, Azure, and Google. I have found that many of the available status APIs return XML/RSS. We love that format for Apache NiFi, so let's do that.

Note: If you are doing development in a non-production environment, try the new NiFi 1.13.1. If you need to run your flows in production on-premise, in a private cloud, or in the public cloud, then use Cloudera Flow Management.

I have separated the processing module "Status" from the input, so I can pass in the input any way I want. When I move this to a Kubernetes environment, this will become a parameter that I pass in. Stay tuned to Cloudera releases.

The flow to process RSS status data is pretty simple. We call the status URL and, in the next step, easily convert RSS into JSON for easier processing. I split these records and grab just the fields I like. I can easily add additional fields from my metadata for unique id, timestamp, company name, and service name. PutKudu will store my JSON records as Kudu fields at high speed. If something goes wrong, we will try again. Sometimes the internet is down! But without this app, how would we know?

We can run a QueryRecord processor to query live fields from the status messages, and I will send Spark-related ones to my Slack channel. I can add as many ANSI SQL92 Calcite queries as I wish (see the example queries at the end of this post). It's easy.

We were easily able to insert all the status messages into our 'cloudstatus' table. Now we can query it and use it in reports, dashboards, and visual applications. I don't want to have to go to external sites to get the status alerts, so I will post key ones to a Slack channel.

I want to store my status reads in a table for fast analytics and permanent storage, so I will store them in a Kudu table with Impala on top for fast queries.

```sql
CREATE TABLE cloudstatus (
  `uuid` STRING,
  `ts` TIMESTAMP,
  `companyname` STRING,
  `servicename` STRING,
  `title` STRING,
  `description` STRING,
  `pubdate` STRING,
  `link` STRING,
  `guid` STRING,
  PRIMARY KEY (`uuid`, `ts`)
)
PARTITION BY HASH PARTITIONS 4
STORED AS KUDU
TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');
```

My source code is available here. In the next step, I can write a real-time dashboard with Cloudera Visual Apps, add fast queries on Kafka with Flink SQL, or write some machine learning in Cloudera Machine Learning to finish the application. Join my next live video broadcast to suggest what we do with this data next. Thanks for reading!
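To make the QueryRecord and Impala steps above concrete, here are two illustrative queries. They are rough sketches of my own rather than queries pulled from the flow itself, and they assume the field names from the cloudstatus table above (title, description, companyname, servicename, pubdate, ts).

```sql
-- Illustrative QueryRecord (Apache Calcite) query: route status items that mention Spark.
-- In QueryRecord, the incoming records are queried as the FLOWFILE table.
SELECT *
FROM FLOWFILE
WHERE UPPER(title) LIKE '%SPARK%'
   OR UPPER(description) LIKE '%SPARK%'
```

Once the records land in Kudu, a quick Impala sanity check on the table could look like:

```sql
-- Illustrative Impala query against the cloudstatus Kudu table
SELECT companyname, servicename, title, pubdate
FROM cloudstatus
ORDER BY ts DESC
LIMIT 25;
```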
03-12-2021
03:56 PM
1 Kudo
Since I could never find the snippet for it, here it goes:

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
import spark.implicits._ // for .toDS

// Capture the "numOutputRows" metric from the last successful query
val queryListener = new QueryExecutionListener {
  var numOutputRows: Option[Long] = None

  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    numOutputRows = qe.executedPlan.metrics.get("numOutputRows").map(_.value)
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}

spark.sqlContext.listenerManager.register(queryListener)
try {
  // write some 500 numbers with an unnecessarily complex query
  spark.sparkContext
    .range(0, 1000, 1).toDS
    .map(_ / 2)
    .distinct()
    .write.parquet("file:///tmp/numbers")
} finally {
  spark.sqlContext.listenerManager.unregister(queryListener)
}

queryListener.numOutputRows match {
  case Some(num) => println(s"Wrote $num rows -- should be 500")
  case None      => println("the query did not succeed or there's no `numOutputRows` metric")
}
```

Or, alternatively:

```scala
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import spark.implicits._ // for .toDS

// Accumulate the records written across all tasks
val listener = new SparkListener {
  val rowsWritten: AtomicLong = new AtomicLong(0) // probably unnecessary

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    rowsWritten.addAndGet(taskEnd.taskMetrics.outputMetrics.recordsWritten)
  }
}

spark.sparkContext.addSparkListener(listener)
try {
  // write some 500 numbers with an unnecessarily complex query
  spark.sparkContext
    .range(0, 1000, 1).toDS
    .map(_ / 2)
    .distinct()
    .write.parquet("file:///tmp/numbers")
} finally {
  spark.sparkContext.removeSparkListener(listener)
}

println(s"Wrote ${listener.rowsWritten.get()} rows -- should be 500")
```
02-26-2021
06:32 AM
If you add the custom timestamp format to the record reader and writers, it should do this automatically for all timestamp fields.
02-23-2021
11:28 AM
@TimothySpann Thank you for your answer. In the end, I used a Postgres table.
02-23-2021
07:12 AM
1 Kudo
I have downloaded 10,000 processor flows, so it could be your local workstation or firewall. Also, for future notice, templates will be removed; please use NiFi Registry. https://www.datainmotion.dev/2019/11/nifi-toolkit-cli-for-nifi-110.html https://www.datainmotion.dev/2020/10/automating-building-migration-backup.html
02-23-2021
02:01 AM
I'm new to NiFi, and I'm not sure your data flow has the same conditions as mine, but I had the same issue with the exception you mentioned. I'm using Oracle 11g XE, and there was no invalid query or invalid data. In addition, I had another problem: the Oracle session used by PutSQL got locked when I let a lot of flowfiles flow to the PutSQL processor, e.g., 5,000 flowfiles in 0.5 seconds. I spent all day today trying to fix this, modifying almost every property of every processor connected to the flow, and even of the DBCP controller service... and finally found the cause. The PutSQL processor has a property named 'Support Fragmented Transactions'. I don't know much about how it works yet, but when I set it to false, the problem was solved, although processing took somewhat longer than before. I'm not a NiFi expert, but I hope this might be helpful for you.
02-22-2021
07:08 AM
Hi, it turns out it was a permissions issue. I had created a policy to allow the user to view data provenance inside the Process Group, but for that to work you also need to add a global policy allowing that user to view data provenance. It seems like a bit of a hack to me, but that's what I had to do to get it working. Thanks for your help.
02-22-2021
07:07 AM
1 Kudo
New Features of Apache NiFi 1.13.0
Check it out: https://twitter.com/pvillard31/status/1361569608327716867?s=27
Download today: Apache NiFi Downloads
Release Notes: Release Notes (Apache NiFi)
Migration: Migration Guidance
New Features
ListenFTP
UpdateHiveTable - Hive DDL changes - Hive Update Schema i.e. Data Drift i.e. Hive Schema Migration!!!!
SampleRecord - different sampling approaches to records (Interval Sampling, Probabilistic Sampling, Reservoir Sampling)
CDC updates
Kudu updates
AMQP and MQTT integration upgrades
ConsumeMQTT - record readers and writers added
HTTP access to NiFi is now configured by default to accept connections to 127.0.0.1/localhost only. If you want to allow broader access over HTTP for some reason, and you understand the security implications, you can still control that by changing the 'nifi.web.http.host' property in nifi.properties as always. That said, take the time to configure proper HTTPS. We offer detailed instructions and tooling to assist.
ConsumeMQTT - add record reader/writer
Work on the combined MiNiFi/NiFi code base continues, including the ability to run NiFi with no GUI.
Support for Kudu dates
Updated GRPC versions
Apache Calcite update
PutDatabaseRecord update
Here is an example for NiFi ETL Flow:
Example NiFi 1.13.0 Flow:
ConsumeMQTT: now with readers
UpdateAttribute: set record.sink.name to kafka and recordreader.name to json.
SampleRecord: sample a few of the records
PutRecord: Use reader and destination service
UpdateHiveTable: new sink
Consume from MQTT and read and write to/from records.
Some example attributes from a running flow:
Connection pools for DatabaseRecordSinks can be JDBC, Hadoop, and Hive.
FreeFormTextRecordSetWriter is great for writing any format.
RecordSinkService, we will pick Kafka as our destination.
KafkaRecordSink from PutRecord
The reader will pick JSON in our example based on our UpdateAttribute; we can dynamically change this as data streams.
ReaderLookup - lets you pick a reader based on an attribute.
We have defined readers for Parquet, JSON, AVRO, XML, and CSV; no matter the type, I can automagically read it. Great for reusing code and great for cases like our new ListenFTP where you may get sent tons of different files to process. Use one FLOW!
RecordSinkService can help you make all our flows generic so you can drop in different sinks/destinations for your writers based on what the data coming in is. This is revolutionary for code reuse.
We can write our output in a custom format that could look like a document, HTML, fixed-width, a form letter, weird delimiter, or whatever you need.
Sample records using different methods.
We use the RecordSinkServiceLookup to allow us to change our sink location dynamically; we are passing in an attribute to choose Kafka.
We have pushed our data to Kafka using KafkaRecordSink. We can see our data easily in Streams Messaging Manager (SMM).
With a RecordReaderFactory, you can pick readers like the new WindowsEventLogReader.
As another output, we can UpdateHiveTable from our data and change the table as needed.
Straight From Release Notes: New Feature
[NIFI-7386] - AzureStorageCredentialsControllerService should also connect to storage emulator
[NIFI-7429] - Add Status History capabilities for system-level metrics
[NIFI-7549] - Adding Hazelcast based implementation for DistributedMapCacheClient
[NIFI-7624] - Build a ListenFTP processor
[NIFI-7745] - Add a SampleRecord processor
[NIFI-7796] - Add Prometheus metrics for total bytes received and bytes sent for components
[NIFI-7801] - Add acknowledgment check to Splunk
[NIFI-7821] - Create a Cassandra implementation of DistributedMapCacheClient
[NIFI-7879] - Create record path function for UUID v5
[NIFI-7906] - Add graph processor with the flexibility to query graph database conditioned on flowfile content and attributes
[NIFI-7989] - Add Hive "data drift" processor
[NIFI-8136] - Allow State Management to be tied to Process Session
[NIFI-8142] - Add "on conflict do nothing" feature to PutDatabaseRecord (see the sketch after this list)
[NIFI-8146] - Allow RecordPath to be used for specifying operation type and data fields when using PutDatabaseRecord
[NIFI-8175] - Add a WindowsEventLogReader
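For NIFI-8142, here is a rough sketch of what "on conflict do nothing" means on the database side. This is illustrative only: PutDatabaseRecord generates the statement for you, the table and column names below are made up, and the syntax shown is the PostgreSQL form.

```sql
-- Hypothetical shape of the statement behind "on conflict do nothing" (PostgreSQL syntax);
-- rows that would violate a unique or primary-key constraint are skipped instead of failing the batch
INSERT INTO status_events (event_id, event_time, message)
VALUES ('abc-123', TIMESTAMP '2021-02-22 07:07:00', 'Service degraded')
ON CONFLICT DO NOTHING;
```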
An update on Cloudera Flow Management!
Cloudera Flow Management on DataHub Public Cloud
This minor update has some Schema Registry and Atlas integration updates.
What's New in Cloudera DataFlow for Data Hub 7.2.7
Supported NiFi Processors
If that wasn't enough, a new version of MiNiFi C++ Agent!
Cloudera Edge Manager 1.2.2 Release
February 15, 2021
CEM MiNiFi C++ Agent - 1.21.01 release includes:
Support for JSON output in the Consume Windows Event Log processor
Full Expression Language support on Windows
Full S3 support (List, Fetch, Get, Put)
MiNiFi C++ download locations
MiNiFi C++ agent updates
Remember when you are done.
02-22-2021
07:00 AM
Most file formats can be extracted via Apache Tika: https://community.cloudera.com/t5/Community-Articles/ExtractText-NiFi-Custom-Processor-Powered-by-Apache-Tika/ta-p/249392
02-22-2021
06:59 AM
Look up the proper Java regex you need: https://examples.javacodegeeks.com/core-java/util/regex/list-files-with-regular-expression-filtering/ https://www.freeformatter.com/java-regex-tester.html https://regexr.com/