Member since 06-14-2023
95 Posts
33 Kudos Received
8 Solutions
My Accepted Solutions
Views | Posted |
---|---|
3843 | 12-29-2023 09:36 AM |
5651 | 12-28-2023 01:01 PM |
1110 | 12-27-2023 12:14 PM |
558 | 12-08-2023 12:47 PM |
1749 | 11-21-2023 10:56 PM |
11-22-2023
11:33 AM
2 Kudos
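If you're confident the data returned is consistent and always has more than 7 lines, then a quick-and-dirty solution would be a Groovy script like this:
import java.nio.charset.StandardCharsets

FlowFile flowFile = session.get()
if (!flowFile) return
// Rewrite the FlowFile content without its first 7 lines
flowFile = session.write(flowFile, { inputStream, outputStream ->
    // Read explicitly as UTF-8 instead of relying on the platform default charset
    List<String> lines = inputStream.readLines(StandardCharsets.UTF_8.name())
    // drop(7) returns an empty list (no error) if there are fewer than 7 lines
    String remaining = lines.drop(7).join("\n")
    outputStream.write(remaining.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)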
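If you want to sanity-check the line-dropping logic outside NiFi first, here's a rough Python equivalent of what the script does to the content. It's only a sketch: `drop_leading_lines` is a hypothetical helper name, and Python's `splitlines()` recognizes a few more line-break characters than Groovy's `readLines()`.

```python
def drop_leading_lines(content: bytes, n: int = 7, encoding: str = "utf-8") -> bytes:
    """Drop the first n lines of content and re-join the rest with newline characters."""
    lines = content.decode(encoding).splitlines()
    # Slicing past the end gives an empty list instead of an error,
    # much like Groovy's drop() when there are fewer than n lines
    return "\n".join(lines[n:]).encode(encoding)


sample = b"h1\nh2\nh3\nh4\nh5\nh6\nh7\nrow1\nrow2\n"
print(drop_leading_lines(sample))  # b'row1\nrow2'
```

Like the Groovy version, the trailing newline of the input is not preserved in the output.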
11-21-2023
10:56 PM
1 Kudo
It could be that NiFi only includes the core Groovy JAR files, and you may need to download the additional ones and add them to the NiFi lib directory so the additional classes are accessible.
11-21-2023
02:29 PM
What you posted looks like AMQP/RabbitMQ-related messages. What does your Python script do and return? You might be able to build much of what that Python script does directly in NiFi, so you don't have to make external calls.
11-21-2023
02:20 PM
A quick Google search shows it's possible to build a DTLS server (listener) in Java, which means it might be possible to create a custom Groovy-based processor to implement it.
11-21-2023
01:56 PM
What's wrong with a 1:1 relationship? If you're concerned about performance, then setting the "Message Demarcator" property on both the Consume and Publish processors will provide the best throughput.
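To illustrate why the demarcator helps: instead of one FlowFile per message, many messages travel inside a single FlowFile separated by the demarcator, and the publish side splits them back out into individual messages. Below is a toy Python model of that idea, not NiFi code; `bundle`/`unbundle` are hypothetical names, and the newline demarcator is an assumption. The one real constraint it shows is that your messages must not contain the demarcator bytes themselves.

```python
from typing import List

# Assumption: newline as the demarcator; messages must not contain it
DEMARCATOR = b"\n"


def bundle(messages: List[bytes], demarcator: bytes = DEMARCATOR) -> bytes:
    """Consume side (toy model): many messages become the content of ONE FlowFile."""
    return demarcator.join(messages)


def unbundle(content: bytes, demarcator: bytes = DEMARCATOR) -> List[bytes]:
    """Publish side (toy model): one FlowFile's content is split back into messages."""
    # This sketch drops empty segments, e.g. from a trailing demarcator
    return [part for part in content.split(demarcator) if part]


messages = [b'{"id": 1}', b'{"id": 2}', b'{"id": 3}']
content = bundle(messages)  # one FlowFile instead of three
print(unbundle(content) == messages)  # True
```

The throughput win comes from NiFi tracking, queuing and committing one FlowFile per batch rather than one per message.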
11-21-2023
01:50 PM
Why do you need to stop/terminate the connection? That sort of defeats how Kafka is meant to function.
10-09-2023
02:35 PM
Is this the expected output?
____________________________________________________________________
P o w e r T O P
____________________________________________________________________
* * * System Information * * *
PowerTOP Version v2.11-1-g7ef7f79 ran at Mon Sep 11 11:41:58 2023
Kernel Version Linux version 5.19.0-1025-aws
System Name 4.11.amazon
CPU Information 1 Intel(R) Xeon(R) CPU E5-2676 v3 @ 2.40GHz
OS Information Ubuntu 22.04.2 LTS
Target: 1 units/s System: 24.5 wakeup/s CPU: 0.1% usage GPU: 0 ops/s GFX: 0 wakeups/s VFS: 0 ops/s
____________________________________________________________________
* * * Top 10 Power Consumers * * *
Usage Events/s Category Description PW Estimate
0.00% 3.8 kWork psi_avgs_work 15.1 mW
0.00% 3.6 Process [PID 493] /usr/bin/containerd 14.5 mW
0.00% 3.6 Timer tick_sched_timer 14.5 mW
0.00% 2.6 Timer hrtimer_wakeup 10.4 mW
0.00% 1.6 Process [PID 677] /usr/bin/containerd 6.46 mW
0.00% 1.2 Interrupt [4] block(softirq) 4.95 mW
0.00% 1.2 Process [PID 1150] ./prometheus --config.file=prometheus.yml 4.76 mW
0.00% 1.1 Process [PID 27] [kcompactd0] 4.37 mW
0.00% 1 Process [PID 37361] /sbin/multipathd -d -s 4.01 mW
0.00% 0.6 Process [PID 30935] ./prometheus --config.file=prometheus.yml 3.02 mW
____________________________________________________________________
* * * Processor Idle State Report * * *
Package 0 C3 (pc3) 0.10% C6 (pc6) 0.00%
Core 0 C3 (cc3) 38427894381.50% C6 (cc6) 34.30%
CPU 0 C0 active 38426768533.80% POLL 0.00% 0.1 ms C1 99.90% 40.6 ms
____________________________________________________________________
* * * Processor Frequency Report * * *
Package 0 Idle inf%
Core 0 Idle inf%
CPU 0 Average 2.4 GHz Idle inf%
____________________________________________________________________
* * * Overview of Software Power Consumers * * *
Usage Wakeups/s GPU ops/s Disk IO/s GFX Wakeups/s Category Description PW Estimate
37.8 us/s 3.8 kWork psi_avgs_work 15.1 mW
56.0 us/s 3.6 Process [PID 493] /usr/bin/containerd 14.5 mW
49.9 us/s 3.6 Timer tick_sched_timer 14.5 mW
66.9 us/s 2.6 Timer hrtimer_wakeup 10.4 mW
88.7 us/s 1.6 Process [PID 677] /usr/bin/containerd 6.46 mW
9.4 us/s 1.2 Interrupt [4] block(softirq) 4.95 mW
15.8 us/s 1.2 Process [PID 1150] ./prometheus --config.file=prometheus.yml 4.76 mW
18.2 us/s 1.1 Process [PID 27] [kcompactd0] 4.37 mW
37.4 us/s 1 Process [PID 37361] /sbin/multipathd -d -s 4.01 mW
287.2 us/s 0.6 Process [PID 30935] ./prometheus --config.file=prometheus.yml 3.02 mW
8.7 us/s 0.7 kWork blk_mq_run_work_fn 2.78 mW
5.5 us/s 0.4 kWork wb_workfn 1.79 mW
11.1 us/s 0.3 kWork flush_memcg_stats_dwork 1.40 mW
22.7 us/s 0.25 Interrupt [3] net_rx(softirq) 1.02 mW
3.2 us/s 0.2 Process [PID 705] /usr/bin/dockerd -H fd:// --containerd=/run/containerd/containerd.sock 795 uW
2.4 us/s 0.2 kWork vmstat_shepherd 794 uW
1.8 us/s 0.2 Process [PID 14] [rcu_sched] 793 uW
0.7 us/s 0.2 Interrupt [2] net tx(softirq) 791 uW
0.5 us/s 0.2 kWork neigh_managed_work 791 uW
13.9 us/s 0.15 Process [PID 37134] sshd: ubuntu@pts/0 614 uW
4.0 us/s 0.15 Process [PID 37494] powertop --csv=powertop212.csv 599 uW
0.6 us/s 0.15 kWork blk_mq_timeout_work 593 uW
11.5 us/s 0.1 Process [PID 37372] /usr/sbin/chronyd -F 1 413 uW
2.5 us/s 0.1 Timer sched_rt_period_timer 399 uW
1.7 us/s 0.1 Timer watchdog_timer_fn 398 uW
1.4 us/s 0.1 Process [PID 31375] /usr/lib/snapd/snapd 397 uW
0.2 us/s 0.1 kWork wb_update_bandwidth_workfn 395 uW
0.1 us/s 0.1 kWork ext4_discard_work 395 uW
191.6 us/s Interrupt [48] timer0 299 uW
3.1 us/s 0.05 Process [PID 37336] /usr/libexec/packagekitd 202 uW
2.9 us/s 0.05 Process [PID 418] /usr/sbin/cron -f -P 202 uW
1.3 us/s 0.05 Process [PID 13] [ksoftirqd/0] 199 uW
0.4 us/s 0.05 kWork ext4_end_io_rsv_work 198 uW
0.1 us/s 0.05 kWork neigh_periodic_work 198 uW
0.1 us/s 0.05 kWork process_srcu 198 uW
41.9 us/s Process [PID 37337] [kworker/0:2] 0 mW
41.7 us/s Timer delayed_work_timer_fn 0 mW
40.3 us/s Interrupt [1] timer(softirq) 0 mW
28.4 us/s Process [PID 36708] [kworker/u30:0] 0 mW
14.2 us/s Timer process_timeout 0 mW
12.1 us/s Timer tcp_orphan_update 0 mW
11.0 us/s Process [PID 124] [jbd2/xvda1-8] 0 mW
10.8 us/s Process [PID 37347] /sbin/multipathd -d -s 0 mW
6.0 us/s Process [PID 37150] [kworker/u30:2] 0 mW
5.7 us/s Interrupt [9] RCU(softirq) 0 mW
5.4 us/s Timer clocksource_watchdog 0 mW
4.9 us/s Timer blk_stat_timer_fn 0 mW
4.9 us/s Interrupt [55] blkif 0 mW
4.3 us/s Process [PID 29] [khugepaged] 0 mW
3.7 us/s Process [PID 41] [kworker/0:1H] 0 mW
3.7 us/s Process [PID 746] /usr/bin/dockerd -H fd:// --containerd=/run/containerd/containerd.sock 0 mW
2.3 us/s Process [PID 15] [migration/0] 0 mW
2.2 us/s Interrupt [56] eth0 0 mW
1.3 us/s Timer mix_interrupt_randomness 0 mW
1.1 us/s Timer blk_rq_timed_out_timer 0 mW
1.0 us/s kWork vmstat_update 0 mW
0.9 us/s Timer writeout_period 0 mW
0.7 us/s Timer commit_timeout 0 mW
0.7 us/s Timer tcp_write_timer 0 mW
0.4 us/s kWork lru_add_drain_per_cpu 0 mW
0.2 us/s Timer tcp_delack_timer 0 mW
0.2 us/s kWork wq_barrier_func 0 mW
0.1 us/s Timer tcp_keepalive_timer 0 mW
0.1 us/s Timer neigh_timer_handler 0 mW
0.0 us/s kWork srcu_invoke_callbacks 0 mW
0.0 us/s Timer srcu_delay_timer 0 mW
The system baseline power is estimated at: 99.1 mW
____________________________________________________________________
* * * Device Power Report * * *
Usage Device Name PW Estimate
0.10% CPU misc 99.1 mW
0.10% CPU core 0 mW
0.10% DRAM 0 mW
100.00% PCI Device: Intel Corporation 82371SB PIIX3 ISA [Natoma/Triton II]
100.00% PCI Device: Intel Corporation 440FX - 82441FX PMC [Natoma]
100.00% PCI Device: Intel Corporation 82371AB/EB/MB PIIX4 ACPI
100.00% PCI Device: XenSource Inc. Xen Platform Device
100.00% PCI Device: Intel Corporation 82371SB PIIX3 IDE [Natoma/Triton II]
100.00% PCI Device: Cirrus Logic GD 5446
1.0 pkts/s Network interface: eth0 (vif)
0.0 pkts/s nic:docker0
0.00% runtime-alarmtimer.0.auto
0.00% runtime-reg-dummy
0.00% runtime-kgdboc
0.00% runtime-platform-framebuffer.0
0.00% runtime-eisa.0
0.00% runtime-Fixed MDIO bus.0
0.00% runtime-PNP0800:00
0.00% runtime-PNP0103:00
0.00% runtime-serial8250
0.00% runtime-i8042
0.00% runtime-pcspkr
____________________________________________________________________
* * * Process Device Activity * * *
Process Device
systemd /dev/autofs /dev/kmsg /dev/rfkill
acpid /dev/input/event0 /dev/input/event1 /dev/input/event2
systemd-logind /dev/input/event0 /dev/input/event1 /dev/input/event2
systemd-journal /dev/kmsg /dev/kmsg
multipathd /dev/mapper/control /dev/mapper/control
____________________________________________________________________
* * * AHCI ALPM Residency Statistics - Not supported on this macine * * *
____________________________________________________________________
* * * Software Settings in Need of Tuning * * *
Description Script
VM writeback timeout echo '1500' > '/proc/sys/vm/dirty_writeback_centisecs'
Runtime PM for PCI Device Intel Corporation 82371SB PIIX3 ISA [Natoma/Triton II] echo 'auto' > '/sys/bus/pci/devices/0000:00:01.0/power/control'
Runtime PM for PCI Device Intel Corporation 440FX - 82441FX PMC [Natoma] echo 'auto' > '/sys/bus/pci/devices/0000:00:00.0/power/control'
Runtime PM for PCI Device Intel Corporation 82371AB/EB/MB PIIX4 ACPI echo 'auto' > '/sys/bus/pci/devices/0000:00:01.3/power/control'
Runtime PM for PCI Device XenSource Inc. Xen Platform Device echo 'auto' > '/sys/bus/pci/devices/0000:00:03.0/power/control'
Runtime PM for PCI Device Intel Corporation 82371SB PIIX3 IDE [Natoma/Triton II] echo 'auto' > '/sys/bus/pci/devices/0000:00:01.1/power/control'
Runtime PM for port ata1 of PCI device: Intel Corporation 82371SB PIIX3 IDE [Natoma/Triton II] echo 'auto' > '/sys/bus/pci/devices/0000:00:01.1/ata1/power/control'
Runtime PM for port ata2 of PCI device: Intel Corporation 82371SB PIIX3 IDE [Natoma/Triton II] echo 'auto' > '/sys/bus/pci/devices/0000:00:01.1/ata2/power/control'
Runtime PM for PCI Device Cirrus Logic GD 5446 echo 'auto' > '/sys/bus/pci/devices/0000:00:02.0/power/control'
____________________________________________________________________
* * * Untunable Software Issues * * *
Description
____________________________________________________________________
* * * Optimal Tuned Software Settings * * *
Description
NMI watchdog should be turned off
10-09-2023
11:51 AM
1 Kudo
In my experience, InvokeScriptedProcessor is the closest you'll get to a native (NAR) NiFi processor. With it, you do NOT need to define all three relationships; if your code handles all possible problems correctly, you could have just "success" (or some other single relationship). From what I've seen, you do need at least one relationship if you're modifying the FlowFile or creating new ones, because each of them has to be sent to a relationship with session.transfer. You don't need to transfer the original: if you read the original FlowFile and create one or several new ones from it, you can dispose of the original with session.remove(your_original_flow_file).
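The rule underneath this is that NiFi won't commit a session while any FlowFile it touched has been neither transferred nor removed. Here's a toy Python model of that bookkeeping, with a single "success" relationship and the original removed rather than routed. `ToySession`, `FlowFile` and `on_trigger` are hypothetical stand-ins written for illustration; this is NOT the NiFi ProcessSession API.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass(eq=False)  # eq=False: FlowFiles compare by identity, like real objects
class FlowFile:
    content: bytes
    attributes: Dict[str, str] = field(default_factory=dict)


class ToySession:
    """Hypothetical stand-in for NiFi's ProcessSession; NOT the real API."""

    def __init__(self, incoming: List[FlowFile]) -> None:
        self.queue = list(incoming)
        self.pending: List[FlowFile] = []  # touched, not yet transferred/removed
        self.routed: Dict[str, List[FlowFile]] = {}

    def get(self) -> Optional[FlowFile]:
        if not self.queue:
            return None
        flow_file = self.queue.pop(0)
        self.pending.append(flow_file)
        return flow_file

    def create(self, content: bytes, attributes: Optional[Dict[str, str]] = None) -> FlowFile:
        flow_file = FlowFile(content, dict(attributes or {}))
        self.pending.append(flow_file)
        return flow_file

    def transfer(self, flow_file: FlowFile, relationship: str) -> None:
        self.pending.remove(flow_file)
        self.routed.setdefault(relationship, []).append(flow_file)

    def remove(self, flow_file: FlowFile) -> None:
        self.pending.remove(flow_file)

    def commit(self) -> None:
        # Models the NiFi rule: nothing may be left without a destination
        if self.pending:
            raise RuntimeError(f"{len(self.pending)} FlowFile(s) neither transferred nor removed")


def on_trigger(session: ToySession) -> None:
    original = session.get()
    if original is None:
        return
    # Create one new FlowFile per line and send each to the only relationship
    for line in original.content.splitlines():
        session.transfer(session.create(line), "success")
    # The original is never transferred; removing it is enough
    session.remove(original)
    session.commit()


session = ToySession([FlowFile(b"a\nb\nc")])
on_trigger(session)
print(list(session.routed), len(session.routed["success"]))  # ['success'] 3
```

Drop the `session.remove(original)` line and `commit()` raises, which is the toy equivalent of the error NiFi gives you when the original is left dangling.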
10-08-2023
10:09 AM
Is this custom processor something you're writing in Java (to be packaged as a NAR) or in Groovy? Ultimately, what is this custom processor trying to achieve?
10-07-2023
09:48 PM
I created this Python script for the ExecuteScript processor (the "python" engine there is Jython) that extracts the files of a ZIP file, including those in subdirectories, into individual FlowFiles. It all happens inside NiFi. It's not fully tested, but it worked with a simple example in my lab. "Script Body" below:
''' Extract Zip Files '''
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import InputStreamCallback, OutputStreamCallback
import zipfile
from io import BytesIO


class PyInputStreamCallback(InputStreamCallback):
    ''' InputStream Callback '''

    def __init__(self):
        self.zip_file = None

    def process(self, input_stream):
        ''' Process our InputStream '''
        # zipfile needs a seekable stream, so buffer the whole ZIP in memory
        zip_buffer = BytesIO(IOUtils.toByteArray(input_stream))
        self.zip_file = zipfile.ZipFile(zip_buffer, "r")


class PyOutputStreamCallback(OutputStreamCallback):
    ''' OutputStream Callback '''

    def __init__(self, file):
        self.file = file

    def process(self, output_stream):
        ''' Process our OutputStream '''
        output_stream.write(self.file.read())


flow_file = session.get()
if flow_file:
    input_stream_callback = PyInputStreamCallback()
    session.read(flow_file, input_stream_callback)
    zip_filename = flow_file.getAttribute("filename")
    zip_file = input_stream_callback.zip_file
    # Skip directory entries (names ending in "/"); files inside subdirectories are kept
    for name in (name for name in zip_file.namelist() if not name.endswith("/")):
        new_flow_file = session.create()
        new_flow_file = session.putAttribute(new_flow_file, "filename", name)
        new_flow_file = session.putAttribute(new_flow_file, "zip_filename", zip_filename)
        entry = zip_file.open(name)
        try:
            new_flow_file = session.write(new_flow_file, PyOutputStreamCallback(entry))
        finally:
            entry.close()
        session.transfer(new_flow_file, REL_SUCCESS)
    zip_file.close()
    # The original ZIP FlowFile is no longer needed once its entries are emitted
    session.remove(flow_file)
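Since the script above only runs inside NiFi, here's the same extraction logic as plain CPython so you can test it against a sample ZIP first. It's a sketch, not the script itself: `extract_entries` is a hypothetical helper, and it returns `(name, content)` pairs where the script creates FlowFiles.

```python
import io
import zipfile
from typing import List, Tuple


def extract_entries(zip_bytes: bytes) -> List[Tuple[str, bytes]]:
    """Return (name, content) for every file entry, including files in subdirectories."""
    entries = []
    with zipfile.ZipFile(io.BytesIO(zip_bytes), "r") as zf:
        for name in zf.namelist():
            if name.endswith("/"):  # directory entry, no content of its own
                continue
            with zf.open(name) as member:
                entries.append((name, member.read()))
    return entries


# Build a small ZIP in memory with a file, an explicit directory and a nested file
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as zf:
    zf.writestr("a.txt", "alpha")
    zf.writestr("sub/", "")
    zf.writestr("sub/b.txt", "beta")

print(extract_entries(buffer.getvalue()))  # [('a.txt', b'alpha'), ('sub/b.txt', b'beta')]
```

Note the entry name keeps its directory prefix ("sub/b.txt"), which is also what ends up in the "filename" attribute in the NiFi script.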