Posts: 1973
Kudos Received: 1225
Solutions: 124
04-15-2021
06:28 AM
Check disk space. See http://apache-nifi-users-list.2361937.n4.nabble.com/Clarifications-on-getting-flowfiles-using-FlowFileFilters-td7333.html
04-13-2021
12:47 AM
Thanks. We have decided to introduce a Priority attribute and use the corresponding prioritizer for the connection.
04-12-2021
04:39 PM
Add more RAM and CPU to your NiFi server, or add more NiFi servers to your cluster. How much is HUGE DATA? You will need enough RAM to process it. Make sure the minimum JVM heap on each node is sized for big workloads: at least 32 GB.
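As a minimal sketch, the JVM heap is set in NiFi's conf/bootstrap.conf on each node; the property keys below are the stock ones, and the 32g values are only the suggested starting point for big workloads, not a required setting:

# conf/bootstrap.conf - JVM memory settings (example values)
java.arg.2=-Xms32g
java.arg.3=-Xmx32g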
04-12-2021
04:37 PM
Put both tables in Kafka topics and have SQL Stream Builder join them with a simple SQL join. Or see: https://community.cloudera.com/t5/Support-Questions/Nifi-how-to-sql-join-two-flowfiles/td-p/298227 http://apache-nifi-users-list.2361937.n4.nabble.com/Joining-two-or-more-flow-files-and-merging-the-content-td10543.html https://medium.com/@surajnagendra/merge-csv-files-apache-nifi-21ba44e1b719
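A minimal sketch of what that SQL Stream Builder join could look like, assuming two hypothetical Kafka-backed tables orders and customers keyed on customer_id (table and column names are made up for illustration):

SELECT o.order_id, o.amount, c.customer_name
FROM orders o
JOIN customers c
  ON o.customer_id = c.customer_id;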
04-12-2021
05:35 AM
@Masi The exception does not appear to be related to Load Balanced connections in NiFi. LB connections utilize NiFi S2S in the background, which does not use MySQL. Matt
04-01-2021
03:51 PM
Move all events to Kafka
03-24-2021
03:23 PM
1 Kudo
What are the column names in your table? Assuming "carId" and "carType", you can use JoltTransformJson or JoltTransformRecord with the following spec:

[
  {
    "operation": "shift",
    "spec": {
      "*": {
        "$": "carId",
        "@": "carType"
      }
    }
  },
  {
    "operation": "shift",
    "spec": {
      "carId": {
        "*": {
          "@": "[&0].carId"
        }
      },
      "carType": {
        "*": {
          "@": "[&0].carType"
        }
      }
    }
  }
]
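For illustration, assuming the incoming flowfile is a simple id-to-type map (the sample values below are made up), the two shift operations above should turn it into an array of records:

Input:
{
  "1": "sedan",
  "2": "suv"
}

Output:
[
  { "carId": "1", "carType": "sedan" },
  { "carId": "2", "carType": "suv" }
]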
03-18-2021
02:06 PM
NiFi versions are tied to Hive versions, so you need a compatible one. Check with your Cloudera team to get the correct version. Using PutHive3Streaming will be faster. So is just PutOrc, PutParquet, or PutHDFS.
03-15-2021
04:48 PM
NiFi for XML / RSS / REST Feed Ingest

I want to retrieve the status from various Cloud providers and services, including Cloudera, AWS, Azure, and Google. I have found many of the available status APIs will return XML/RSS. We love that format for Apache NiFi, so let's do that.

Note: If you are doing development in a non-production environment, try the new NiFi 1.13.1. If you need to run your flows in production on-premise, private cloud, or in the public cloud, then use Cloudera Flow Management.

I have separated the processing module "Status" from the input, so I can pass in the input any way I want. When I move this to a Kubernetes environment, this will become a parameter that I pass in. Stay tuned to Cloudera releases.

The flow is pretty simple to process RSS status data. We call the status URL and in the next step easily convert RSS into JSON for easier processing. I split these records and grab just the fields I like. I can easily add additional fields from my metadata for unique id, timestamp, company name, and service name. PutKudu will store my JSON records as Kudu fields at high speed. If something goes wrong, we will try again. Sometimes, the internet is down! But, without this app, how will we know???

We can run a QueryRecord processor to query live fields from the status messages, and I will send Spark-related ones to my Slack channel. I can add as many ANSI SQL92 Calcite queries as I wish. It's easy.

We were easily able to insert all the status messages into our 'cloudstatus' table. Now, we can query it and use it in reports, dashboards, and visual applications. I don't want to have to go to external sites to get the status alerts, so I will post key ones to a Slack channel. I want to store my status reads in a table for fast analytics and permanent storage. So, I will store it in a Kudu table with Impala on top for fast queries.

CREATE TABLE cloudstatus (
  `uuid` STRING,
  `ts` TIMESTAMP,
  `companyname` STRING,
  `servicename` STRING,
  `title` STRING,
  `description` STRING,
  `pubdate` STRING,
  `link` STRING,
  `guid` STRING,
  PRIMARY KEY (`uuid`, `ts`)
)
PARTITION BY HASH PARTITIONS 4
STORED AS KUDU
TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');

My source code is available here. In the next step, I can write some real-time dashboards with Cloudera Visual Apps, add fast queries on Kafka with Flink SQL, or write some machine learning in Cloudera Machine Learning to finish the application. Join my next live video broadcast to suggest what we do with this data next. Thanks for reading!
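As a small follow-up sketch, the QueryRecord routing mentioned above could be as simple as the query below; the field names follow the cloudstatus columns, and the LIKE pattern is just an assumed way of catching Spark-related items:

SELECT *
FROM FLOWFILE
WHERE title LIKE '%Spark%'
   OR description LIKE '%Spark%'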
03-12-2021
03:56 PM
1 Kudo
Since I could never find the snippet for it, here it goes:

// Count rows written by a query using a QueryExecutionListener
// (assumes the spark-shell `spark` session is in scope)
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
import spark.implicits._

val queryListener = new QueryExecutionListener {
  var numOutputRows: Option[Long] = None
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    // grab the "numOutputRows" metric from the executed plan, if present
    numOutputRows = qe.executedPlan.metrics.get("numOutputRows").map(_.value)
  }
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}

spark.sqlContext.listenerManager.register(queryListener)
try {
  // write some 500 numbers with an unnecessarily complex query
  spark.sparkContext
    .range(0, 1000, 1).toDS
    .map(_ / 2)
    .distinct()
    .write.parquet("file:///tmp/numbers")
} finally {
  spark.sqlContext.listenerManager.unregister(queryListener)
}

queryListener.numOutputRows match {
  case Some(num) => println(s"Wrote $num rows -- should be 500")
  case None => println("the query did not succeed or there's no `numOutputRows` metric")
}

Or, alternatively, with a SparkListener that sums the task output metrics:

import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

val listener = new SparkListener {
  val rowsWritten: AtomicLong = new AtomicLong(0) // AtomicLong is probably unnecessary here
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // accumulate the records written by each finished task
    rowsWritten.addAndGet(taskEnd.taskMetrics.outputMetrics.recordsWritten)
  }
}

spark.sparkContext.addSparkListener(listener)
try {
  // write some 500 numbers with an unnecessarily complex query
  spark.sparkContext
    .range(0, 1000, 1).toDS
    .map(_ / 2)
    .distinct()
    .write.parquet("file:///tmp/numbers")
} finally {
  spark.sparkContext.removeSparkListener(listener)
}

println(s"Wrote ${listener.rowsWritten.get()} rows -- should be 500")