Member since
06-14-2023
90
Posts
27
Kudos Received
8
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 3301 | 12-29-2023 09:36 AM |
| | 4485 | 12-28-2023 01:01 PM |
| | 963 | 12-27-2023 12:14 PM |
| | 454 | 12-08-2023 12:47 PM |
| | 1435 | 11-21-2023 10:56 PM |
11-22-2023
09:52 PM
^ I've attached the image above. This is how the data looks. I want to remove the first 7 rows so that the 8th row (the header row) becomes the first row.
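The cleanup described here can be sketched in plain Python. This is only an illustration: the sample text and the `drop_leading_rows` helper are made up, and it assumes each of the 7 unwanted rows occupies exactly one physical line (no quoted multi-line fields).

```python
import csv
import io


def drop_leading_rows(text, skip=7):
    """Return the text with its first `skip` lines removed."""
    lines = text.splitlines(keepends=True)
    return "".join(lines[skip:])


# Hypothetical input: 7 junk lines, then the real header and two records.
sample = "".join("junk line %d\n" % i for i in range(1, 8))
sample += "id,name\n1,alpha\n2,beta\n"

cleaned = drop_leading_rows(sample)

# The 8th line is now the header, so a CSV reader picks it up as such.
rows = list(csv.DictReader(io.StringIO(cleaned)))
```

After the call, `cleaned` starts with the header line and `rows` holds one dict per record.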
11-22-2023
02:46 PM
Yes, it turns out the groovy-yaml module is an optional module in Groovy 3. However, groovy-yaml-3.0.17.jar is actually included in nifi-scripting-nar-1.23.2, yet the ExecuteScript processor still throws the exception I mentioned when it is started. This possibly indicates that other dependencies need to be explicitly included in the classpath (e.g. jackson-databind, jackson-dataformat-yaml, groovy-json). I was hoping to use YamlSlurper followed by JsonBuilder to do YAML-to-JSON conversions. In the end I used the Jackson YAMLFactory and ObjectMapper to achieve the desired result.
11-21-2023
01:56 PM
What's wrong with a 1:1 relationship? If you're concerned about performance, then leveraging "Message Demarcator" for the Consume and Publish will provide the best throughput.
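The idea behind a demarcator is that many messages travel together in one payload, separated by a known delimiter, instead of one payload per message. A rough sketch of that idea in plain Python (not NiFi code; the newline delimiter and the `bundle`/`unbundle` helpers are assumptions for illustration):

```python
# Assumed delimiter; it must never occur inside a message body.
DEMARCATOR = b"\n"


def bundle(messages):
    """Join many small messages into one payload."""
    return DEMARCATOR.join(messages)


def unbundle(payload):
    """Split a bundled payload back into individual messages."""
    return payload.split(DEMARCATOR)


messages = [b'{"id": 1}', b'{"id": 2}', b'{"id": 3}']
payload = bundle(messages)      # one unit of work instead of three
restored = unbundle(payload)
```

Handling one bundled payload rather than three separate ones is where the throughput gain comes from, at the cost of choosing a delimiter that cannot appear in the data.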
10-09-2023
02:35 PM
Is this the expected output?

__________________________________________________________________
P o w e r T O P
____________________________________________________________________
* * * System Information * * *
PowerTOP Version v2.11-1-g7ef7f79 ran at Mon Sep 11 11:41:58 2023
Kernel Version Linux version 5.19.0-1025-aws
System Name 4.11.amazon
CPU Information 1 Intel(R) Xeon(R) CPU E5-2676 v3 @ 2.40GHz
OS Information Ubuntu 22.04.2 LTS
Target: 1 units/s System: 24.5 wakeup/s CPU: 0.1% usage GPU: 0 ops/s GFX: 0 wakeups/s VFS: 0 ops/s
____________________________________________________________________
* * * Top 10 Power Consumers * * *
Usage Events/s Category Description PW Estimate
0.00% 3.8 kWork psi_avgs_work 15.1 mW
0.00% 3.6 Process [PID 493] /usr/bin/containerd 14.5 mW
0.00% 3.6 Timer tick_sched_timer 14.5 mW
0.00% 2.6 Timer hrtimer_wakeup 10.4 mW
0.00% 1.6 Process [PID 677] /usr/bin/containerd 6.46 mW
0.00% 1.2 Interrupt [4] block(softirq) 4.95 mW
0.00% 1.2 Process [PID 1150] ./prometheus --config.file=prometheus.yml 4.76 mW
0.00% 1.1 Process [PID 27] [kcompactd0] 4.37 mW
0.00% 1 Process [PID 37361] /sbin/multipathd -d -s 4.01 mW
0.00% 0.6 Process [PID 30935] ./prometheus --config.file=prometheus.yml 3.02 mW
____________________________________________________________________
* * * Processor Idle State Report * * *
Package 0
C3 (pc3) 0.10%
C6 (pc6) 0.00%
Core 0
C3 (cc3) 38427894381.50%
C6 (cc6) 34.30%
CPU 0
C0 active 38426768533.80%
POLL 0.00% 0.1 ms
C1 99.90% 40.6 ms
____________________________________________________________________
* * * Processor Frequency Report * * *
Package 0
Idle inf%
Core 0
Idle inf%
CPU 0
Average 2.4 GHz
Idle inf%
____________________________________________________________________
* * * Overview of Software Power Consumers * * *
Usage Wakeups/s GPU ops/s Disk IO/s GFX Wakeups/s Category Description PW Estimate
37.8 us/s 3.8 kWork psi_avgs_work 15.1 mW
56.0 us/s 3.6 Process [PID 493] /usr/bin/containerd 14.5 mW
49.9 us/s 3.6 Timer tick_sched_timer 14.5 mW
66.9 us/s 2.6 Timer hrtimer_wakeup 10.4 mW
88.7 us/s 1.6 Process [PID 677] /usr/bin/containerd 6.46 mW
9.4 us/s 1.2 Interrupt [4] block(softirq) 4.95 mW
15.8 us/s 1.2 Process [PID 1150] ./prometheus --config.file=prometheus.yml 4.76 mW
18.2 us/s 1.1 Process [PID 27] [kcompactd0] 4.37 mW
37.4 us/s 1 Process [PID 37361] /sbin/multipathd -d -s 4.01 mW
287.2 us/s 0.6 Process [PID 30935] ./prometheus --config.file=prometheus.yml 3.02 mW
8.7 us/s 0.7 kWork blk_mq_run_work_fn 2.78 mW
5.5 us/s 0.4 kWork wb_workfn 1.79 mW
11.1 us/s 0.3 kWork flush_memcg_stats_dwork 1.40 mW
22.7 us/s 0.25 Interrupt [3] net_rx(softirq) 1.02 mW
3.2 us/s 0.2 Process [PID 705] /usr/bin/dockerd -H fd:// --containerd=/run/containerd/containerd.sock 795 uW
2.4 us/s 0.2 kWork vmstat_shepherd 794 uW
1.8 us/s 0.2 Process [PID 14] [rcu_sched] 793 uW
0.7 us/s 0.2 Interrupt [2] net tx(softirq) 791 uW
0.5 us/s 0.2 kWork neigh_managed_work 791 uW
13.9 us/s 0.15 Process [PID 37134] sshd: ubuntu@pts/0 614 uW
4.0 us/s 0.15 Process [PID 37494] powertop --csv=powertop212.csv 599 uW
0.6 us/s 0.15 kWork blk_mq_timeout_work 593 uW
11.5 us/s 0.1 Process [PID 37372] /usr/sbin/chronyd -F 1 413 uW
2.5 us/s 0.1 Timer sched_rt_period_timer 399 uW
1.7 us/s 0.1 Timer watchdog_timer_fn 398 uW
1.4 us/s 0.1 Process [PID 31375] /usr/lib/snapd/snapd 397 uW
0.2 us/s 0.1 kWork wb_update_bandwidth_workfn 395 uW
0.1 us/s 0.1 kWork ext4_discard_work 395 uW
191.6 us/s Interrupt [48] timer0 299 uW
3.1 us/s 0.05 Process [PID 37336] /usr/libexec/packagekitd 202 uW
2.9 us/s 0.05 Process [PID 418] /usr/sbin/cron -f -P 202 uW
1.3 us/s 0.05 Process [PID 13] [ksoftirqd/0] 199 uW
0.4 us/s 0.05 kWork ext4_end_io_rsv_work 198 uW
0.1 us/s 0.05 kWork neigh_periodic_work 198 uW
0.1 us/s 0.05 kWork process_srcu 198 uW
41.9 us/s Process [PID 37337] [kworker/0:2] 0 mW
41.7 us/s Timer delayed_work_timer_fn 0 mW
40.3 us/s Interrupt [1] timer(softirq) 0 mW
28.4 us/s Process [PID 36708] [kworker/u30:0] 0 mW
14.2 us/s Timer process_timeout 0 mW
12.1 us/s Timer tcp_orphan_update 0 mW
11.0 us/s Process [PID 124] [jbd2/xvda1-8] 0 mW
10.8 us/s Process [PID 37347] /sbin/multipathd -d -s 0 mW
6.0 us/s Process [PID 37150] [kworker/u30:2] 0 mW
5.7 us/s Interrupt [9] RCU(softirq) 0 mW
5.4 us/s Timer clocksource_watchdog 0 mW
4.9 us/s Timer blk_stat_timer_fn 0 mW
4.9 us/s Interrupt [55] blkif 0 mW
4.3 us/s Process [PID 29] [khugepaged] 0 mW
3.7 us/s Process [PID 41] [kworker/0:1H] 0 mW
3.7 us/s Process [PID 746] /usr/bin/dockerd -H fd:// --containerd=/run/containerd/containerd.sock 0 mW
2.3 us/s Process [PID 15] [migration/0] 0 mW
2.2 us/s Interrupt [56] eth0 0 mW
1.3 us/s Timer mix_interrupt_randomness 0 mW
1.1 us/s Timer blk_rq_timed_out_timer 0 mW
1.0 us/s kWork vmstat_update 0 mW
0.9 us/s Timer writeout_period 0 mW
0.7 us/s Timer commit_timeout 0 mW
0.7 us/s Timer tcp_write_timer 0 mW
0.4 us/s kWork lru_add_drain_per_cpu 0 mW
0.2 us/s Timer tcp_delack_timer 0 mW
0.2 us/s kWork wq_barrier_func 0 mW
0.1 us/s Timer tcp_keepalive_timer 0 mW
0.1 us/s Timer neigh_timer_handler 0 mW
0.0 us/s kWork srcu_invoke_callbacks 0 mW
0.0 us/s Timer srcu_delay_timer 0 mW
The system baseline power is estimated at: 99.1 m W
____________________________________________________________________
* * * Device Power Report * * *
Usage Device Name PW Estimate
0.10% CPU misc 99.1 mW
0.10% CPU core 0 mW
0.10% DRAM 0 mW
100.00% PCI Device: Intel Corporation 82371SB PIIX3 ISA [Natoma/Triton II]
100.00% PCI Device: Intel Corporation 440FX - 82441FX PMC [Natoma]
100.00% PCI Device: Intel Corporation 82371AB/EB/MB PIIX4 ACPI
100.00% PCI Device: XenSource Inc. Xen Platform Device
100.00% PCI Device: Intel Corporation 82371SB PIIX3 IDE [Natoma/Triton II]
100.00% PCI Device: Cirrus Logic GD 5446
1.0 pkts/s Network interface: eth0 (vif)
0.0 pkts/s nic:docker0
0.00% runtime-alarmtimer.0.auto
0.00% runtime-reg-dummy
0.00% runtime-kgdboc
0.00% runtime-platform-framebuffer.0
0.00% runtime-eisa.0
0.00% runtime-Fixed MDIO bus.0
0.00% runtime-PNP0800:00
0.00% runtime-PNP0103:00
0.00% runtime-serial8250
0.00% runtime-i8042
0.00% runtime-pcspkr
____________________________________________________________________
* * * Process Device Activity * * *
Process Device
systemd /dev/autofs /dev/kmsg /dev/rfkill
acpid /dev/input/event0 /dev/input/event1 /dev/input/event2
systemd-logind /dev/input/event0 /dev/input/event1 /dev/input/event2
systemd-journal /dev/kmsg /dev/kmsg
multipathd /dev/mapper/control /dev/mapper/control
____________________________________________________________________
* * * AHCI ALPM Residency Statistics - Not supported on this macine * * *
____________________________________________________________________
* * * Software Settings in Need of Tuning * * *
Description Script
VM writeback timeout echo '1500' > '/proc/sys/vm/dirty_writeback_centisecs'
Runtime PM for PCI Device Intel Corporation 82371SB PIIX3 ISA [Natoma/Triton II] echo 'auto' > '/sys/bus/pci/devices/0000:00:01.0/power/control'
Runtime PM for PCI Device Intel Corporation 440FX - 82441FX PMC [Natoma] echo 'auto' > '/sys/bus/pci/devices/0000:00:00.0/power/control'
Runtime PM for PCI Device Intel Corporation 82371AB/EB/MB PIIX4 ACPI echo 'auto' > '/sys/bus/pci/devices/0000:00:01.3/power/control'
Runtime PM for PCI Device XenSource Inc. Xen Platform Device echo 'auto' > '/sys/bus/pci/devices/0000:00:03.0/power/control'
Runtime PM for PCI Device Intel Corporation 82371SB PIIX3 IDE [Natoma/Triton II] echo 'auto' > '/sys/bus/pci/devices/0000:00:01.1/power/control'
Runtime PM for port ata1 of PCI device: Intel Corporation 82371SB PIIX3 IDE [Natoma/Triton II] echo 'auto' > '/sys/bus/pci/devices/0000:00:01.1/ata1/power/control'
Runtime PM for port ata2 of PCI device: Intel Corporation 82371SB PIIX3 IDE [Natoma/Triton II] echo 'auto' > '/sys/bus/pci/devices/0000:00:01.1/ata2/power/control'
Runtime PM for PCI Device Cirrus Logic GD 5446 echo 'auto' > '/sys/bus/pci/devices/0000:00:02.0/power/control'
____________________________________________________________________
* * * Untunable Software Issues * * *
Description
____________________________________________________________________
* * * Optimal Tuned Software Settings * * *
Description
NMI watchdog should be turned off
10-09-2023
11:51 AM
1 Kudo
In my experience, InvokeScriptedProcessor is the closest you'll get to a native (NAR) NiFi processor. With it, you do NOT need to define all three relationships. If your code handles all possible problems correctly, you could have just "success" (or some other name) as your only relationship. From what I've seen, you do need at least one relationship if you're modifying the FlowFile or creating new ones, because session.transfer needs a relationship to send it to. You don't need to transfer the original. If you read the original FlowFile and create one or several new ones, you can dispose of the original with session.remove(your_original_flow_file)
10-08-2023
10:59 PM
It's in Java. It basically needs to split a record based on some input fields, and then also filter records.
10-07-2023
09:48 PM
I created this Python ExecuteScript NiFi processor that extracts the files of a zip archive (including those in subdirectories) into individual FlowFiles. It all happens inside of NiFi. It is not fully tested, but it worked with a simple example in my lab. "Script Body" below:

''' Extract Zip Files '''
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import InputStreamCallback, OutputStreamCallback
import zipfile
from io import BytesIO


class PyInputStreamCallback(InputStreamCallback):
    ''' InputStream Callback '''
    def __init__(self):
        self.zip_file = None

    def process(self, input_stream):
        ''' Process our InputStream '''
        # Buffer the whole archive in memory so zipfile can seek within it
        zip_buffer = BytesIO(IOUtils.toByteArray(input_stream))
        self.zip_file = zipfile.ZipFile(zip_buffer, "r")


class PyOutputStreamCallback(OutputStreamCallback):
    ''' OutputStream Callback '''
    def __init__(self, file):
        self.file = file

    def process(self, output_stream):
        ''' Process our OutputStream '''
        output_stream.write(self.file.read())


flow_file = session.get()
if flow_file:
    input_stream_callback = PyInputStreamCallback()
    # Class reference only; a new instance is created for each zip entry below
    output_stream_callback = PyOutputStreamCallback
    session.read(flow_file, input_stream_callback)
    zip_filename = flow_file.getAttribute("filename")
    zip_file = input_stream_callback.zip_file
    # Skip directory entries, whose names end with "/"
    for name in (name for name in zip_file.namelist() if not name.endswith("/")):
        new_flow_file = session.create()
        new_flow_file = session.putAttribute(new_flow_file, "filename", name)
        new_flow_file = session.putAttribute(new_flow_file, "zip_filename", zip_filename)
        new_flow_file = session.write(
            new_flow_file,
            output_stream_callback(zip_file.open(name))
        )
        session.transfer(new_flow_file, REL_SUCCESS)
    zip_file.close()
    # The original zip FlowFile is no longer needed
    session.remove(flow_file)
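If you want to try the zip-handling part outside of NiFi first, here is a minimal standalone sketch using only the standard library. The `build_sample_zip` and `extract_entries` helpers and the sample file names are made up for illustration; the only piece taken from the script is the `endswith("/")` filter that skips directory entries.

```python
import zipfile
from io import BytesIO


def build_sample_zip():
    """Create a small in-memory archive with a subdirectory (hypothetical data)."""
    buf = BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        zf.writestr("a.txt", "alpha")
        zf.writestr("sub/", "")          # explicit directory entry
        zf.writestr("sub/b.txt", "beta")
    return buf.getvalue()


def extract_entries(zip_bytes):
    """Return {entry name: content bytes} for every non-directory entry."""
    with zipfile.ZipFile(BytesIO(zip_bytes), "r") as zf:
        return {
            name: zf.read(name)
            for name in zf.namelist()
            if not name.endswith("/")    # same filter as the NiFi script
        }


entries = extract_entries(build_sample_zip())
```

Each key in `entries` corresponds to one FlowFile the NiFi script would create, with the key as its "filename" attribute.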
09-18-2023
01:11 PM
Forgot the ";" in the replacement value $1($2='$3');
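To show what a replacement value of that shape does, here is a small Python sketch. The search pattern and the input string are purely hypothetical (the original search value is not shown in this post), and Python writes the back-references as `\1`, `\2`, `\3` where the replacement above uses `$1`, `$2`, `$3`.

```python
import re

# Hypothetical search pattern: three whitespace-separated words.
SEARCH = r"(\w+)\s+(\w+)\s+(\w+)"

# Same shape as $1($2='$3'); including the trailing semicolon.
REPLACEMENT = r"\1(\2='\3');"

result = re.sub(SEARCH, REPLACEMENT, "set name Bob")
```

With this made-up input, `result` is `set(name='Bob');`.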
09-18-2023
12:45 PM
@Kumar_ Check the configuration value boxes to see if you added a line return. If the configuration pop-up box shows more than just a line "1", then you have a line return at the end of line "1". Click on line 2 and hit delete to get rid of the line return if present. If you found any of the suggestions/solutions provided helped you with your issue, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
09-16-2023
04:35 PM
I would do this with a Groovy-based InvokeScriptedProcessor, using this code:

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils

class GroovyProcessor implements Processor {
    PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
        .name("BATCH_SIZE")
        .displayName("Batch Size")
        .description("The number of incoming FlowFiles to process in a single execution of this processor.")
        .required(true)
        .defaultValue("1000")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()

    ComponentLog log

    void initialize(ProcessorInitializationContext context) { log = context.logger }
    Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
    Collection<ValidationResult> validate(ValidationContext context) { null }
    PropertyDescriptor getPropertyDescriptor(String name) { null }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE]) as List<PropertyDescriptor> }
    String getIdentifier() { null }

    JsonSlurper jsonSlurper = new JsonSlurper()
    JsonOutput jsonOutput = new JsonOutput()

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
            if (!flowFiles) return
            flowFiles.each { flowFile ->
                // Parse the incoming FlowFile content as a JSON list of orders
                List data = null
                session.read(flowFile, {
                    inputStream -> data = jsonSlurper.parseText(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
                } as InputStreamCallback)
                // Formatting: one line per order, followed by one line per order item
                List outputData = []
                data.each { order ->
                    outputData.add("${order.orderId} ${order.orderName}")
                    order.orderItems.each { orderItem ->
                        outputData.add("${orderItem.orderItemId} ${orderItem.orderItemName}")
                    }
                }
                // Write the formatted lines to a new FlowFile and drop the original
                FlowFile newFlowFile = session.create()
                newFlowFile = session.write(newFlowFile, { outputStream -> outputStream.write(outputData.join('\n').getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback)
                session.transfer(newFlowFile, REL_SUCCESS)
                session.remove(flowFile)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

processor = new GroovyProcessor()

Don't let all that code scare you when the part that's doing the formatting is only these lines:

This is the generated output:
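The formatting step of the Groovy processor above (the loop that fills outputData) can also be sketched on its own in Python, which makes it easy to try outside of NiFi. The `format_orders` helper and the sample orders below are invented for illustration; they are not the original poster's data or output. Only the field names (orderId, orderName, orderItems, orderItemId, orderItemName) come from the Groovy code.

```python
import json


def format_orders(json_text):
    """Flatten a JSON list of orders into one text line per order and per item."""
    orders = json.loads(json_text)
    lines = []
    for order in orders:
        lines.append("%s %s" % (order["orderId"], order["orderName"]))
        for item in order["orderItems"]:
            lines.append("%s %s" % (item["orderItemId"], item["orderItemName"]))
    return "\n".join(lines)


# Hypothetical input with the same field names the Groovy code reads.
sample = json.dumps([
    {
        "orderId": 1,
        "orderName": "first",
        "orderItems": [
            {"orderItemId": 10, "orderItemName": "widget"},
            {"orderItemId": 11, "orderItemName": "gadget"},
        ],
    },
    {"orderId": 2, "orderName": "second", "orderItems": []},
])

formatted = format_orders(sample)
```

For this made-up input, `formatted` contains four lines: the first order, its two items, and then the second order.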