Member since
01-27-2023
229
Posts
73
Kudos Received
45
Solutions
My Accepted Solutions
Views | Posted |
---|---|
674 | 02-23-2024 01:14 AM |
862 | 01-26-2024 01:31 AM |
590 | 11-22-2023 12:28 AM |
1343 | 11-22-2023 12:10 AM |
1556 | 11-06-2023 12:44 AM |
10-19-2023
04:50 AM
I resolved the issue. I had DepartmentId in my Avro schema, but the input contained DepartmentID, and schema field names are case sensitive. Now that the string values are populated in the DepartmentID column, I am using a RouteOnContent processor with ${DepartmentID:equals('Undefined')} to check whether the DepartmentID values are "Undefined" or null. It is not identifying the "Undefined" values. Is the regex statement correct? The content of the flow file is: [ { "TenantId" : "FF369226-0EDA-4D0D-B48C-6B27F404ECCB", "DepartmentID" : "Undefined", "Name": "xyz" } ]
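One thing worth checking here: NiFi Expression Language resolves ${DepartmentID} against a FlowFile attribute, whereas in this flow the value lives in the JSON content. The sketch below is plain Python, not NiFi code; it only illustrates the content-based check and the case sensitivity of the field name, using the sample content from the post. The helper name is_undefined is hypothetical.

```python
import json

# Sample FlowFile content taken from the post.
content = '''[ { "TenantId" : "FF369226-0EDA-4D0D-B48C-6B27F404ECCB",
                 "DepartmentID" : "Undefined", "Name": "xyz" } ]'''


def is_undefined(record, field="DepartmentID"):
    """Return True when the field is missing, null, or the string 'Undefined'."""
    value = record.get(field)
    return value is None or value == "Undefined"


records = json.loads(content)

# One flag per record in the JSON array.
flags = [is_undefined(record) for record in records]

# Field names are case sensitive: "DepartmentId" is not a key of the record.
has_wrong_case_key = "DepartmentId" in records[0]
```

Inside NiFi the same idea would need either a regex written against the content or a step that first copies the field into an attribute; which one fits depends on the flow.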
10-18-2023
11:53 PM
1 Kudo
@Fanxxx, here is how I would do the first POC:
1) GetMongoRecord: execute the count on the first table. Using the property "Query Output Attribute", you save that value directly as an attribute.
2) Connected to the success queue, another GetMongoRecord: execute the count on the second table. Using the property "Query Output Attribute", you save that value directly as an attribute.
3) Connected to the success queue, a RouteOnAttribute: here you define a rule: if count1=count2, do what you want to do; otherwise call the logic for the insert, as you said (using NiFi Expression Language: ${attribute1:equals(${attribute2})}).
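The routing decision in step 3 can be sketched in plain Python as follows. This is only an illustration of the comparison, not NiFi code; the attribute names count1 and count2 and the relationship names "matched" and "unmatched" are assumptions chosen for the example.

```python
def route(attributes):
    """Pick a relationship name from two count attributes.

    FlowFile attributes are strings, so the counts are compared as strings,
    which is also what an equals-style comparison does.
    """
    count1 = attributes.get("count1")
    count2 = attributes.get("count2")
    # Treat a missing count as a mismatch rather than as "equal".
    if count1 is None or count2 is None:
        return "unmatched"
    return "matched" if count1 == count2 else "unmatched"


same = route({"count1": "120", "count2": "120"})
different = route({"count1": "120", "count2": "118"})
missing = route({"count1": "120"})
```

Flow files routed to "unmatched" would then go to the insert logic mentioned in the reply.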
10-12-2023
11:02 AM
@s300570 Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. Thanks.
10-12-2023
04:07 AM
Thank you for the reply, but my main concern is to have a large file before writing it to HDFS.
10-11-2023
01:03 PM
Hello! I have the same problem. Do you know how to set which version of the JDK NiFi uses?
10-11-2023
06:44 AM
1 Kudo
Again, your question is a little vague and does not provide any details, so my answer will be quite generic. You can use any processor you want to extract the data from your source database: GenerateTableFetch+ExecuteSQLRecord, ExecuteSQLRecord, ExecuteSQL, QueryDatabaseTable, QueryDatabaseTableRecord. In all of them you will have to define a DBCP Connection Service so that you can connect to your database. In the Record-based processors, you can define the type of the output (Avro, Parquet, JSON, etc.). Afterwards, you do whatever processing you need and, assuming that you will use a REST API, you can use the InvokeHTTP processor to call that API endpoint with whatever parameters you require. Take note that if those parameters are inside the flowfile content, you will need to extract them as attributes, meaning that you will have to add some extra processing.
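The "extract them as attributes" step can be sketched in plain Python like this. It is only an illustration of the idea, not NiFi code; the field names, the sample record, and the endpoint https://api.example.com/items are all hypothetical.

```python
import json
from urllib.parse import urlencode


def extract_attributes(content, fields):
    """Copy selected top-level JSON fields into a dict of string attributes."""
    record = json.loads(content)
    return {name: str(record[name]) for name in fields if name in record}


def build_url(base_url, attributes):
    """Append the attributes to the endpoint as a URL-encoded query string."""
    return base_url + "?" + urlencode(attributes)


# Hypothetical flow file content and endpoint, for illustration only.
content = '{"id": 42, "status": "new", "payload": "x"}'
attrs = extract_attributes(content, ["id", "status"])
url = build_url("https://api.example.com/items", attrs)
```

In a real flow the extraction would be done by a processor before InvokeHTTP, and the URL property would reference the resulting attributes.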
10-09-2023
02:35 PM
Is this the expected output?

P o w e r T O P

* * * System Information * * *
PowerTOP Version v2.11-1-g7ef7f79 ran at Mon Sep 11 11:41:58 2023
Kernel Version Linux version 5.19.0-1025-aws
System Name 4.11.amazon
CPU Information 1 Intel(R) Xeon(R) CPU E5-2676 v3 @ 2.40GHz
OS Information Ubuntu 22.04.2 LTS
Target: 1 units/s
System: 24.5 wakeup/s
CPU: 0.1% usage
GPU: 0 ops/s
GFX: 0 wakeups/s
VFS: 0 ops/s

* * * Top 10 Power Consumers * * *
Usage Events/s Category Description PW Estimate
0.00% 3.8 kWork psi_avgs_work 15.1 mW
0.00% 3.6 Process [PID 493] /usr/bin/containerd 14.5 mW
0.00% 3.6 Timer tick_sched_timer 14.5 mW
0.00% 2.6 Timer hrtimer_wakeup 10.4 mW
0.00% 1.6 Process [PID 677] /usr/bin/containerd 6.46 mW
0.00% 1.2 Interrupt [4] block(softirq) 4.95 mW
0.00% 1.2 Process [PID 1150] ./prometheus --config.file=prometheus.yml 4.76 mW
0.00% 1.1 Process [PID 27] [kcompactd0] 4.37 mW
0.00% 1 Process [PID 37361] /sbin/multipathd -d -s 4.01 mW
0.00% 0.6 Process [PID 30935] ./prometheus --config.file=prometheus.yml 3.02 mW

* * * Processor Idle State Report * * *
Package 0
C3 (pc3) 0.10%
C6 (pc6) 0.00%
Core 0
C3 (cc3) 38427894381.50%
C6 (cc6) 34.30%
CPU 0
C0 active 38426768533.80%
POLL 0.00% 0.1 ms
C1 99.90% 40.6 ms

* * * Processor Frequency Report * * *
Package 0
Idle inf%
Core 0
Idle inf%
CPU 0
Average 2.4 GHz
Idle inf%

* * * Overview of Software Power Consumers * * *
Usage Wakeups/s GPU ops/s Disk IO/s GFX Wakeups/s Category Description PW Estimate
37.8 us/s 3.8 kWork psi_avgs_work 15.1 mW
56.0 us/s 3.6 Process [PID 493] /usr/bin/containerd 14.5 mW
49.9 us/s 3.6 Timer tick_sched_timer 14.5 mW
66.9 us/s 2.6 Timer hrtimer_wakeup 10.4 mW
88.7 us/s 1.6 Process [PID 677] /usr/bin/containerd 6.46 mW
9.4 us/s 1.2 Interrupt [4] block(softirq) 4.95 mW
15.8 us/s 1.2 Process [PID 1150] ./prometheus --config.file=prometheus.yml 4.76 mW
18.2 us/s 1.1 Process [PID 27] [kcompactd0] 4.37 mW
37.4 us/s 1 Process [PID 37361] /sbin/multipathd -d -s 4.01 mW
287.2 us/s 0.6 Process [PID 30935] ./prometheus --config.file=prometheus.yml 3.02 mW
8.7 us/s 0.7 kWork blk_mq_run_work_fn 2.78 mW
5.5 us/s 0.4 kWork wb_workfn 1.79 mW
11.1 us/s 0.3 kWork flush_memcg_stats_dwork 1.40 mW
22.7 us/s 0.25 Interrupt [3] net_rx(softirq) 1.02 mW
3.2 us/s 0.2 Process [PID 705] /usr/bin/dockerd -H fd:// --containerd=/run/containerd/containerd.sock 795 uW
2.4 us/s 0.2 kWork vmstat_shepherd 794 uW
1.8 us/s 0.2 Process [PID 14] [rcu_sched] 793 uW
0.7 us/s 0.2 Interrupt [2] net tx(softirq) 791 uW
0.5 us/s 0.2 kWork neigh_managed_work 791 uW
13.9 us/s 0.15 Process [PID 37134] sshd: ubuntu@pts/0 614 uW
4.0 us/s 0.15 Process [PID 37494] powertop --csv=powertop212.csv 599 uW
0.6 us/s 0.15 kWork blk_mq_timeout_work 593 uW
11.5 us/s 0.1 Process [PID 37372] /usr/sbin/chronyd -F 1 413 uW
2.5 us/s 0.1 Timer sched_rt_period_timer 399 uW
1.7 us/s 0.1 Timer watchdog_timer_fn 398 uW
1.4 us/s 0.1 Process [PID 31375] /usr/lib/snapd/snapd 397 uW
0.2 us/s 0.1 kWork wb_update_bandwidth_workfn 395 uW
0.1 us/s 0.1 kWork ext4_discard_work 395 uW
191.6 us/s Interrupt [48] timer0 299 uW
3.1 us/s 0.05 Process [PID 37336] /usr/libexec/packagekitd 202 uW
2.9 us/s 0.05 Process [PID 418] /usr/sbin/cron -f -P 202 uW
1.3 us/s 0.05 Process [PID 13] [ksoftirqd/0] 199 uW
0.4 us/s 0.05 kWork ext4_end_io_rsv_work 198 uW
0.1 us/s 0.05 kWork neigh_periodic_work 198 uW
0.1 us/s 0.05 kWork process_srcu 198 uW
41.9 us/s Process [PID 37337] [kworker/0:2] 0 mW
41.7 us/s Timer delayed_work_timer_fn 0 mW
40.3 us/s Interrupt [1] timer(softirq) 0 mW
28.4 us/s Process [PID 36708] [kworker/u30:0] 0 mW
14.2 us/s Timer process_timeout 0 mW
12.1 us/s Timer tcp_orphan_update 0 mW
11.0 us/s Process [PID 124] [jbd2/xvda1-8] 0 mW
10.8 us/s Process [PID 37347] /sbin/multipathd -d -s 0 mW
6.0 us/s Process [PID 37150] [kworker/u30:2] 0 mW
5.7 us/s Interrupt [9] RCU(softirq) 0 mW
5.4 us/s Timer clocksource_watchdog 0 mW
4.9 us/s Timer blk_stat_timer_fn 0 mW
4.9 us/s Interrupt [55] blkif 0 mW
4.3 us/s Process [PID 29] [khugepaged] 0 mW
3.7 us/s Process [PID 41] [kworker/0:1H] 0 mW
3.7 us/s Process [PID 746] /usr/bin/dockerd -H fd:// --containerd=/run/containerd/containerd.sock 0 mW
2.3 us/s Process [PID 15] [migration/0] 0 mW
2.2 us/s Interrupt [56] eth0 0 mW
1.3 us/s Timer mix_interrupt_randomness 0 mW
1.1 us/s Timer blk_rq_timed_out_timer 0 mW
1.0 us/s kWork vmstat_update 0 mW
0.9 us/s Timer writeout_period 0 mW
0.7 us/s Timer commit_timeout 0 mW
0.7 us/s Timer tcp_write_timer 0 mW
0.4 us/s kWork lru_add_drain_per_cpu 0 mW
0.2 us/s Timer tcp_delack_timer 0 mW
0.2 us/s kWork wq_barrier_func 0 mW
0.1 us/s Timer tcp_keepalive_timer 0 mW
0.1 us/s Timer neigh_timer_handler 0 mW
0.0 us/s kWork srcu_invoke_callbacks 0 mW
0.0 us/s Timer srcu_delay_timer 0 mW
The system baseline power is estimated at: 99.1 m W

* * * Device Power Report * * *
Usage Device Name PW Estimate
0.10% CPU misc 99.1 mW
0.10% CPU core 0 mW
0.10% DRAM 0 mW
100.00% PCI Device: Intel Corporation 82371SB PIIX3 ISA [Natoma/Triton II]
100.00% PCI Device: Intel Corporation 440FX - 82441FX PMC [Natoma]
100.00% PCI Device: Intel Corporation 82371AB/EB/MB PIIX4 ACPI
100.00% PCI Device: XenSource Inc. Xen Platform Device
100.00% PCI Device: Intel Corporation 82371SB PIIX3 IDE [Natoma/Triton II]
100.00% PCI Device: Cirrus Logic GD 5446
1.0 pkts/s Network interface: eth0 (vif)
0.0 pkts/s nic:docker0
0.00% runtime-alarmtimer.0.auto
0.00% runtime-reg-dummy
0.00% runtime-kgdboc
0.00% runtime-platform-framebuffer.0
0.00% runtime-eisa.0
0.00% runtime-Fixed MDIO bus.0
0.00% runtime-PNP0800:00
0.00% runtime-PNP0103:00
0.00% runtime-serial8250
0.00% runtime-i8042
0.00% runtime-pcspkr

* * * Process Device Activity * * *
Process Device
systemd /dev/autofs /dev/kmsg /dev/rfkill
acpid /dev/input/event0 /dev/input/event1 /dev/input/event2
systemd-logind /dev/input/event0 /dev/input/event1 /dev/input/event2
systemd-journal /dev/kmsg /dev/kmsg
multipathd /dev/mapper/control /dev/mapper/control

* * * AHCI ALPM Residency Statistics - Not supported on this macine * * *

* * * Software Settings in Need of Tuning * * *
Description Script
VM writeback timeout echo '1500' > '/proc/sys/vm/dirty_writeback_centisecs'
Runtime PM for PCI Device Intel Corporation 82371SB PIIX3 ISA [Natoma/Triton II] echo 'auto' > '/sys/bus/pci/devices/0000:00:01.0/power/control'
Runtime PM for PCI Device Intel Corporation 440FX - 82441FX PMC [Natoma] echo 'auto' > '/sys/bus/pci/devices/0000:00:00.0/power/control'
Runtime PM for PCI Device Intel Corporation 82371AB/EB/MB PIIX4 ACPI echo 'auto' > '/sys/bus/pci/devices/0000:00:01.3/power/control'
Runtime PM for PCI Device XenSource Inc. Xen Platform Device echo 'auto' > '/sys/bus/pci/devices/0000:00:03.0/power/control'
Runtime PM for PCI Device Intel Corporation 82371SB PIIX3 IDE [Natoma/Triton II] echo 'auto' > '/sys/bus/pci/devices/0000:00:01.1/power/control'
Runtime PM for port ata1 of PCI device: Intel Corporation 82371SB PIIX3 IDE [Natoma/Triton II] echo 'auto' > '/sys/bus/pci/devices/0000:00:01.1/ata1/power/control'
Runtime PM for port ata2 of PCI device: Intel Corporation 82371SB PIIX3 IDE [Natoma/Triton II] echo 'auto' > '/sys/bus/pci/devices/0000:00:01.1/ata2/power/control'
Runtime PM for PCI Device Cirrus Logic GD 5446 echo 'auto' > '/sys/bus/pci/devices/0000:00:02.0/power/control'

* * * Untunable Software Issues * * *
Description

* * * Optimal Tuned Software Settings * * *
Description
NMI watchdog should be turned off
10-09-2023
12:36 AM
1 Kudo
Well, the only advice I can give you is to write your processor, see what errors you get, and come back with them. Nobody can write your processor for you if only you know your requirements. What I can suggest, though, is to have a look at the following examples, as they might assist you with what you are trying to achieve:
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-1/ta-p/248922
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-2/ta-p/249018
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-3/ta-p/249148
10-07-2023
09:48 PM
I created this Python ExecuteScript NiFi processor that extracts the files of a ZipFile (including those in subdirectories) into individual FlowFiles. It all happens inside of NiFi. It is not fully tested, but it worked with a simple example in my lab. "Script Body" below:

''' Extract Zip Files '''
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import InputStreamCallback, OutputStreamCallback
import zipfile
from io import BytesIO


class PyInputStreamCallback(InputStreamCallback):
    ''' InputStream Callback '''
    def __init__(self):
        self.zip_file = None

    def process(self, input_stream):
        ''' Process our InputStream '''
        # Buffer the whole archive in memory so zipfile can seek within it
        zip_buffer = BytesIO(IOUtils.toByteArray(input_stream))
        self.zip_file = zipfile.ZipFile(zip_buffer, "r")


class PyOutputStreamCallback(OutputStreamCallback):
    ''' OutputStream Callback '''
    def __init__(self, file):
        self.file = file

    def process(self, output_stream):
        ''' Process our OutputStream '''
        output_stream.write(self.file.read())


# "session" and "REL_SUCCESS" are provided by the ExecuteScript processor
flow_file = session.get()
if flow_file:
    input_stream_callback = PyInputStreamCallback()
    # Keep a reference to the class; one instance is created per zip entry
    output_stream_callback = PyOutputStreamCallback
    session.read(flow_file, input_stream_callback)
    zip_filename = flow_file.getAttribute("filename")
    zip_file = input_stream_callback.zip_file
    # Skip directory entries, whose names end with "/"
    for name in (name for name in zip_file.namelist() if not name.endswith("/")):
        new_flow_file = session.create()
        new_flow_file = session.putAttribute(new_flow_file, "filename", name)
        new_flow_file = session.putAttribute(new_flow_file, "zip_filename", zip_filename)
        new_flow_file = session.write(
            new_flow_file,
            output_stream_callback(zip_file.open(name))
        )
        session.transfer(new_flow_file, REL_SUCCESS)
    zip_file.close()
    # The original zip FlowFile is no longer needed
    session.remove(flow_file)
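To try the extraction logic outside NiFi, the same zipfile and BytesIO approach can be exercised in plain Python. This is a minimal sketch, assuming a small in-memory archive built just for the test; the helper names build_sample_zip and extract_entries and the sample file names are hypothetical and are not part of the script above.

```python
import zipfile
from io import BytesIO


def build_sample_zip():
    """Build a small zip in memory with one top-level file and one nested file."""
    buffer = BytesIO()
    with zipfile.ZipFile(buffer, "w") as archive:
        archive.writestr("a.txt", "alpha")
        archive.writestr("sub/", "")          # explicit directory entry
        archive.writestr("sub/b.txt", "beta")
    return buffer.getvalue()


def extract_entries(zip_bytes):
    """Return a dict of entry name to bytes, skipping directory entries."""
    entries = {}
    with zipfile.ZipFile(BytesIO(zip_bytes), "r") as archive:
        for name in archive.namelist():
            if name.endswith("/"):
                continue  # directories carry no content of their own
            with archive.open(name) as member:
                entries[name] = member.read()
    return entries


entries = extract_entries(build_sample_zip())
```

Each key in entries corresponds to one FlowFile the NiFi script would create, with the key playing the role of the "filename" attribute.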