04:46 AM
1 Kudo
Ok, I fixed it myself actually 🙂 In case someone is interested, here's my solution:
import java.nio.charset.StandardCharsets
import org.apache.nifi.controller.ControllerService
import org.apache.nifi.processor.io.InputStreamCallback
import groovy.sql.Sql
def ff = session.get()
if (!ff) return
def lookup = context.controllerServiceLookup
//def dbServiceName = databaseConnectionPoolName.value
// Find the DBCP controller service by its display name
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs -> lookup.getControllerServiceName(cs) == 'DBCPConnectionPool_GP_prj_gistek_preprod' }
def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
def sql = new Sql(conn)
session.read(ff, { inputStream ->
    //def statement = "INSERT INTO publish.templates (excel_template) VALUES (?)"
    // Groovy turns the GString into a parameterized statement, so the bytes are bound as a parameter
    sql.executeInsert "INSERT INTO publish.templates (excel_template) VALUES (${inputStream.getBytes()})"
} as InputStreamCallback)
// Closing the Sql object closes the connection, which returns it to the pool
sql.close()
session.transfer(ff, REL_SUCCESS)
01:12 AM
Hello! I am having a problem with a script I wrote. I am trying to save the content of the flowfiles I receive, which are Excel files, but I am getting an error (I had to delete the image due to corporate security rules). Here's my script:
import java.nio.charset.StandardCharsets
import org.apache.nifi.controller.ControllerService
import groovy.sql.Sql
def ff = session.get()
def lookup = context.controllerServiceLookup
//def dbServiceName = databaseConnectionPoolName.value
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs -> lookup.getControllerServiceName(cs) == 'DBCPConnectionPool_GP_prj_gistek_preprod' }
def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
def sql = new Sql(conn)
session.read(ff, { inputStream ->
    def statement = "INSERT INTO publish.templates (excel_template) VALUES (?)"
    def params = [inputStream]
    sql.executeInsert(statement, params)
} as InputStreamCallback)
session.transfer(ff, REL_SUCCESS)
session.commit()
I tested the database connection by inserting some test values and it worked fine. Apparently there is a problem with the content of the flowfile. Should I decode/encode it first? Any help would be appreciated, thank you!
- Labels:
Apache NiFi
02:42 AM
Thank you, kind sir! My problem is solved!
02:28 AM
I am using the SplitXml processor to split a flowfile. After some transformations I merge the resulting flowfiles back together using MergeContent. After merging, I noticed that not all of the flowfiles were merged together: for example, I split my flowfile into 100 parts and after merging I got 2 files, each with 50 records of my initial flowfile. When I set the Run Schedule option to 10 seconds instead of 1, it merged all the files together. But what if I don't know how many flowfiles I will get after the split, or how much time the merge processor will need to merge them? I use the flowfile name as the Correlation Attribute Name. Thank you in advance!
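One option that may fit here, assuming the fragments still carry the `fragment.identifier`, `fragment.index` and `fragment.count` attributes that SplitXml writes: MergeContent's Defragment merge strategy waits until all `fragment.count` pieces of a group have arrived instead of merging whatever is queued when the processor runs. The plain-Python sketch below only simulates that idea; `defragment` and the sample data are hypothetical and are not NiFi code.

```python
from collections import defaultdict


def defragment(flowfiles):
    """Group fragments by identifier; emit a merged string only for complete groups."""
    bins = defaultdict(dict)
    merged = []
    for ff in flowfiles:
        attrs = ff["attributes"]
        ident = attrs["fragment.identifier"]
        # Store each fragment under its index so arrival order does not matter
        bins[ident][int(attrs["fragment.index"])] = ff["content"]
        # Release the bin only when every expected fragment is present
        if len(bins[ident]) == int(attrs["fragment.count"]):
            parts = bins.pop(ident)
            merged.append("".join(parts[i] for i in sorted(parts)))
    return merged


# Hypothetical fragments arriving out of order
fragments = [
    {
        "attributes": {
            "fragment.identifier": "a",
            "fragment.index": str(i),
            "fragment.count": "3",
        },
        "content": "<r%d/>" % i,
    }
    for i in (2, 0, 1)
]

result = defragment(fragments)        # all three fragments arrived
partial = defragment(fragments[:2])   # one fragment is still missing
```

With only two of the three fragments, nothing is released, which is the behaviour that removes the dependency on the run schedule.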
- Labels:
Apache NiFi
04:56 AM
Hello! Thank you for your comprehensive answer. But I still don't quite understand two things. 1. What does the & wildcard do? Does it return only the key, or does it return the indices of elements as well, depending on the level n in &n? 2. Are [&4] and [&2] numbers, like array indices, or strings, like the keys in dictionary "key":"value" pairs? Thank you in advance!
03:55 AM
Hello! I've read this and this and this, but the explanations are pretty scarce. I understand what "&" means, but I can't wrap my head around what [&] means =\ Can someone please explain what [&] does in this example? What exactly is happening in these lines?
"@1": "outer[&4].inner[&2].t",
"@(3,eventTypeCode)": "outer[&4].inner[&2].etc"
JSON:
{
  "uID": 1000358546,
  "events": [{
    "eventTypeCode": "FEEDBACK",
    "transports": ["PUSH", "SMS"]
  }, {
    "eventTypeCode": "MARKETING",
    "transports": ["PUSH", "EMAIL"]
  }, {
    "eventTypeCode": "ORDER_STATUS",
    "transports": ["SOC_VK"]
  }]
}
SPEC:
[{
  "operation": "shift",
  "spec": {
    "events": {
      "*": {
        "transports": {
          "*": {
            "*": {
              "@1": "outer[&4].inner[&2].t",
              "@(3,eventTypeCode)": "outer[&4].inner[&2].etc"
            }
          }
        }
      }
    },
    "*": "&"
  }
}]
RESULT:
{
  "uID" : 1000358546,
  "outer" : [ {
    "inner" : [ {
      "t" : "PUSH",
      "etc" : "FEEDBACK"
    }, {
      "t" : "SMS",
      "etc" : "FEEDBACK"
    } ]
  }, {
    "inner" : [ {
      "t" : "PUSH",
      "etc" : "MARKETING"
    }, {
      "t" : "EMAIL",
      "etc" : "MARKETING"
    } ]
  }, {
    "inner" : [ {
      "t" : "SOC_VK",
      "etc" : "ORDER_STATUS"
    } ]
  } ]
}
Thanks in advance!
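One way to read `outer[&4].inner[&2]`, judging only from the RESULT above: `&4` and `&2` appear to resolve to the positions matched by the two array-level `*` wildcards (the event index and the transport index), and the square brackets tell Jolt to use those values as array indices rather than as map keys. The Python sketch below merely emulates that behaviour for this particular input; `slot` and `shift_events` are hypothetical helpers and not part of Jolt.

```python
def slot(lst, idx):
    """Return the dict at lst[idx], growing the list as needed (like writing to outer[idx])."""
    while len(lst) <= idx:
        lst.append({})
    return lst[idx]


def shift_events(doc):
    # "*": "&" copies every other top-level key unchanged
    out = {k: v for k, v in doc.items() if k != "events"}
    for e_idx, event in enumerate(doc["events"]):                 # index used by [&4]
        for t_idx, transport in enumerate(event["transports"]):   # index used by [&2]
            outer_item = slot(out.setdefault("outer", []), e_idx)
            target = slot(outer_item.setdefault("inner", []), t_idx)
            target["t"] = transport                 # "@1": the transport value itself
            target["etc"] = event["eventTypeCode"]  # "@(3,eventTypeCode)"
    return out


doc = {
    "uID": 1000358546,
    "events": [
        {"eventTypeCode": "FEEDBACK", "transports": ["PUSH", "SMS"]},
        {"eventTypeCode": "MARKETING", "transports": ["PUSH", "EMAIL"]},
        {"eventTypeCode": "ORDER_STATUS", "transports": ["SOC_VK"]},
    ],
}
result = shift_events(doc)
```

Without the brackets, the same values would be used as plain keys, so the output would contain objects keyed by "0", "1", ... instead of arrays.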
- Labels:
Apache NiFi
03:17 AM
Hello! I am trying to figure out how to work with the EnforceOrder processor. I have 200 flowfiles and every flowfile has a partition_number attribute. Many flowfiles have the same filename. I want to use EnforceOrder to order them according to the filename and partition_number attributes, but none of the flowfiles ever go to the success relationship queue. Here are my settings: What am I doing wrong?
- Labels:
Apache NiFi
02:55 AM
2 Kudos
I think you should refer to the NiFi cookbook. That is pretty much THE only guide to writing scripts for NiFi.
02:25 AM
Hello! I am trying to read the JSON body of a flowfile and write the length of a JSON array to a new flowfile attribute, but I keep getting the aforementioned error. Here is my script:
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import json
# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        jsn = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        array = json.loads(jsn)  # type: dict
        i = 0
        while i <= 1:  # descend to the second nesting level to count the elements (records in the table)
            root_key = list(array.keys())[0]
            array = array[root_key]
            i += 1
        self.length = str(len(array))
    def get_length_of_array(self):
        return self.length
# end class
flowfile = session.get()
if(flowfile != None):
    flowfile = session.write(flowfile, PyStreamCallback())
    flowfile = session.putAttribute(flowfile, "length", PyStreamCallback().get_length_of_array())
    session.transfer(flowFile, REL_SUCCESS)
Can someone please explain what I am doing wrong? Thank you in advance!
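Two things in the script above look suspicious, although this is only a guess without the error text: `session.putAttribute` reads the length from a second, fresh `PyStreamCallback()` whose `process` method was never called, and the last line uses `flowFile` while the variable is named `flowfile`. The plain-Python sketch below leaves NiFi out entirely and only shows the pattern of keeping a single callback instance; `LengthCallback`, `fake_write` and the sample payload are hypothetical stand-ins, not NiFi APIs.

```python
import io
import json


class LengthCallback(object):
    """Stand-in for the stream callback: remembers the length it computed."""

    def __init__(self):
        self.length = None

    def process(self, input_stream):
        data = json.loads(input_stream.read().decode("utf-8"))
        for _ in range(2):  # descend two nesting levels, as in the original loop
            root_key = list(data.keys())[0]
            data = data[root_key]
        self.length = str(len(data))


def fake_write(callback, payload):
    """Stand-in for session.write(): feeds the content to the callback."""
    callback.process(io.BytesIO(payload))


payload = json.dumps({"root": {"rows": [1, 2, 3]}}).encode("utf-8")

callback = LengthCallback()               # keep a reference to the instance...
fake_write(callback, payload)
same_instance = callback.length           # ...and read the result from that same instance

fresh_instance = LengthCallback().length  # a new instance never saw the content
```

In the original script the fresh instance does not even have a `length` attribute, because `__init__` never sets one, so the lookup would fail rather than return an empty value.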
- Labels:
Apache NiFi
12:33 AM
Hello! I can't access the NiFi UI because the server is probably down, most likely because of the number of files stuck in a queue. How can I find out the current size of the queue, maybe from the logs, or is there some other way to do so? Thank you in advance!
- Labels:
Apache NiFi