Member since
05-05-2022
20
Posts
3
Kudos Received
2
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
815 | 05-11-2023 04:46 AM | |
3673 | 07-01-2022 02:55 AM |
05-11-2023
04:46 AM
1 Kudo
OK, I fixed it myself, actually 🙂 In case anyone is interested, here's my solution:
import java.nio.charset.StandardCharsets
import org.apache.nifi.controller.ControllerService
import groovy.sql.Sql
def ff = session.get()
if (!ff) return
def lookup = context.controllerServiceLookup
//def dbServiceName = databaseConnectionPoolName.value
// Find the DBCP controller service by its display name
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs -> lookup.getControllerServiceName(cs) == 'DBCPConnectionPool_GP_prj_gistek_preprod' }
def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
def sql = new Sql(conn)
session.read(ff, { inputStream ->
    //def statement = "INSERT INTO publish.templates (excel_template) VALUES (?)"
    // Read the whole content into a byte[]; Groovy Sql binds the GString expression as a query parameter
    sql.executeInsert "INSERT INTO publish.templates (excel_template) VALUES (${inputStream.getBytes()})"
} as InputStreamCallback)
conn?.close()
session.transfer(ff, REL_SUCCESS)
session.commit()
05-11-2023
01:12 AM
Hello! I am having a problem with a script I wrote. I am trying to save the content of the flowfiles I receive, which are Excel files, but I am getting an error (I had to delete the image for corporate security reasons). Here's my script:
import java.nio.charset.StandardCharsets
import org.apache.nifi.controller.ControllerService
import groovy.sql.Sql
def ff = session.get()
if (!ff) return
def lookup = context.controllerServiceLookup
//def dbServiceName = databaseConnectionPoolName.value
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs -> lookup.getControllerServiceName(cs) == 'DBCPConnectionPool_GP_prj_gistek_preprod' }
def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
def sql = new Sql(conn)
session.read(ff, { inputStream ->
    def statement = "INSERT INTO publish.templates (excel_template) VALUES (?)"
    def params = [inputStream]
    sql.executeInsert(statement, params)
} as InputStreamCallback)
conn?.close()
session.transfer(ff, REL_SUCCESS)
session.commit()
I tested the database connection by inserting some test values and it worked fine. Apparently there is some problem with the content of the flowfile; should I decode/encode it first? Any help would be appreciated, thank you!
Labels: Apache NiFi
11-07-2022
02:42 AM
Thank you, kind sir! My problem is solved!
11-07-2022
02:28 AM
I am using the SplitXml processor to split a flowfile. After some transformations I merge those flowfiles back together using MergeContent. After merging, I noticed that not all of the flowfiles were merged together: e.g. I split my flowfile into 100 parts and after merging I got 2 files, each with 50 records of my initial flowfile. When I set the Run Schedule option to 10 seconds instead of 1, it merged all the files together. But what if I don't know how many flowfiles I will get after the split, or how long it will take the merge processor to merge them? I use the flowfile name as the Correlation Attribute Name. Thank you in advance!
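To illustrate a count-based merge (as opposed to relying on the run schedule), here is a rough Python sketch. It assumes every split carries the `fragment.identifier`, `fragment.index` and `fragment.count` attributes that SplitXml is documented to write and that MergeContent's Defragment merge strategy relies on. The `Defragmenter` class, the `make_split` helper and the dict-based flowfiles are hypothetical, made up purely for illustration; this is not NiFi code.

```python
from collections import defaultdict


class Defragmenter:
    """Hypothetical helper: releases a group only once all of its fragments are in."""

    def __init__(self):
        # fragment.identifier -> {fragment.index: content}
        self._bins = defaultdict(dict)

    def offer(self, flowfile):
        """Add one split; return the ordered contents when the group is complete, else None."""
        attrs = flowfile["attributes"]
        group = attrs["fragment.identifier"]
        index = int(attrs["fragment.index"])
        expected = int(attrs["fragment.count"])

        self._bins[group][index] = flowfile["content"]
        if len(self._bins[group]) < expected:
            return None  # keep waiting, no matter how long the remaining splits take

        parts = self._bins.pop(group)
        return [parts[i] for i in sorted(parts)]


def make_split(group, index, count, content):
    """Hypothetical stand-in for a flowfile produced by a split processor."""
    return {
        "attributes": {
            "fragment.identifier": group,
            "fragment.index": str(index),
            "fragment.count": str(count),
        },
        "content": content,
    }


merger = Defragmenter()
splits = [make_split("doc-1", i, 100, "record-%d" % i) for i in range(100)]
# Feed the splits in reverse order to show that arrival order does not matter
results = [merger.offer(s) for s in reversed(splits)]
merged = results[-1]
```

Only the last `offer` call returns a merged list; every earlier call returns `None`, so the number of splits does not need to be known in advance by whoever configures the merge.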
Labels: Apache NiFi
08-01-2022
04:56 AM
Hello! Thank you for your comprehensive answer, but I still don't quite understand two things. 1. What does the & wildcard do? Does it return only the key, or does it return the indices of elements as well, depending on the level n in &n? 2. Are [&4] and [&2] numbers, like array indices, or strings, like the keys in dictionary "key":"value" pairs? Thank you in advance!
07-29-2022
03:55 AM
Hello! I've read this and this and this, but the explanations are pretty scarce. I understand what "&" means, but I can't wrap my head around what [&] means =\ Can someone please explain what [&] does in this example? What exactly is happening in these lines?
"@1": "outer[&4].inner[&2].t",
"@(3,eventTypeCode)": "outer[&4].inner[&2].etc"
JSON:
{
  "uID": 1000358546,
  "events": [{
      "eventTypeCode": "FEEDBACK",
      "transports": ["PUSH", "SMS"]
    }, {
      "eventTypeCode": "MARKETING",
      "transports": ["PUSH", "EMAIL"]
    }, {
      "eventTypeCode": "ORDER_STATUS",
      "transports": ["SOC_VK"]
    }
  ]
}
SPEC:
[{
  "operation": "shift",
  "spec": {
    "events": {
      "*": {
        "transports": {
          "*": {
            "*": {
              "@1": "outer[&4].inner[&2].t",
              "@(3,eventTypeCode)": "outer[&4].inner[&2].etc"
            }
          }
        }
      }
    },
    "*": "&"
  }
}]
RESULT:
{
  "uID" : 1000358546,
  "outer" : [ {
    "inner" : [ {
      "t" : "PUSH",
      "etc" : "FEEDBACK"
    }, {
      "t" : "SMS",
      "etc" : "FEEDBACK"
    } ]
  }, {
    "inner" : [ {
      "t" : "PUSH",
      "etc" : "MARKETING"
    }, {
      "t" : "EMAIL",
      "etc" : "MARKETING"
    } ]
  }, {
    "inner" : [ {
      "t" : "SOC_VK",
      "etc" : "ORDER_STATUS"
    } ]
  } ]
}
Thanks in advance!
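Judging by the input and the result in this post, `[&4]` appears to resolve to the position of the matched element inside `events` and `[&2]` to the position inside `transports`, with the square brackets making those values act as output array indices rather than map keys. A plain-Python sketch of the same reshaping follows. It is an emulation written for illustration, not Jolt itself, and the `shift_events` function name is made up.

```python
def shift_events(doc):
    """Emulate the reshaping done by the shift spec on this particular input shape."""
    out = {}
    for key, value in doc.items():
        if key != "events":
            out[key] = value  # the trailing "*": "&" copies other top-level keys as-is
            continue
        outer = [None] * len(value)
        for i, event in enumerate(value):  # i plays the role of [&4]
            inner = [None] * len(event["transports"])
            for j, transport in enumerate(event["transports"]):  # j plays the role of [&2]
                inner[j] = {
                    "t": transport,                # what "@1" yields in the result
                    "etc": event["eventTypeCode"]  # what "@(3,eventTypeCode)" yields
                }
            outer[i] = {"inner": inner}
        out["outer"] = outer
    return out


source = {
    "uID": 1000358546,
    "events": [
        {"eventTypeCode": "FEEDBACK", "transports": ["PUSH", "SMS"]},
        {"eventTypeCode": "MARKETING", "transports": ["PUSH", "EMAIL"]},
        {"eventTypeCode": "ORDER_STATUS", "transports": ["SOC_VK"]},
    ],
}
result = shift_events(source)
```

In this emulation the indices `i` and `j` are integers used as list positions, which matches the arrays (not objects) seen under `outer` and `inner` in the result.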
Labels: Apache NiFi
07-01-2022
03:17 AM
Hello! I am trying to figure out how to work with the EnforceOrder processor. I have 200 flowfiles, and every flowfile has a partition_number attribute. Many flowfiles have the same filename. I want to use EnforceOrder to merge them according to the filename and partition_number attributes, but none of the flowfiles ever go to the success relationship queue. Here are my settings: What am I doing wrong?
Labels: Apache NiFi
07-01-2022
02:55 AM
2 Kudos
I think you should refer to the NiFi cookbook. That is pretty much THE only guide for writing scripts for NiFi.
06-24-2022
02:25 AM
Hello! I am trying to read the JSON body from a flowfile and write the length of a JSON array of elements to a new flowfile attribute. But somehow I keep getting the aforementioned error. Here is my script:
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import json
# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        jsn = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        array = json.loads(jsn) # type: dict
        i = 0
        while i <= 1: # descend to the second nesting level to count the elements (records in the table)
            root_key = list(array.keys())[0]
            array = array[root_key]
            i += 1
        self.length = str(len(array))
    def get_length_of_array(self):
        return self.length
# end class
flowfile = session.get()
if(flowfile != None):
    flowfile = session.write(flowfile, PyStreamCallback())
    flowfile = session.putAttribute(flowfile, "length", PyStreamCallback().get_length_of_array())
    session.transfer(flowFile, REL_SUCCESS)
Can someone please explain what I am doing wrong? Thank you in advance!
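The counting logic itself can be tried outside NiFi. Below is a small plain-Python sketch, assuming a payload shaped like `{"root": {"table": [...]}}` (a made-up shape, since the real flowfile content is not shown in the post). The function name `second_level_length` is hypothetical. Two things that may be worth checking in the script when comparing: it reads `get_length_of_array()` from a fresh `PyStreamCallback()` instance whose `process()` was never called, and it transfers `flowFile` although the variable is named `flowfile`.

```python
import json


def second_level_length(text):
    """Follow the first key twice, then return the size of whatever is found there."""
    node = json.loads(text)
    for _ in range(2):  # the same two steps as `while i <= 1` in the script
        first_key = next(iter(node))
        node = node[first_key]
    return len(node)


# Made-up sample payload: one root key, one nested key, then the array to count
sample = '{"root": {"table": [{"id": 1}, {"id": 2}, {"id": 3}]}}'
length = str(second_level_length(sample))  # NiFi attribute values are strings
```

Keeping the parsing in a plain function like this makes it possible to test it with sample JSON before wiring it into a callback, where the same callback instance would have to be both passed to the session and queried afterwards.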
Labels: Apache NiFi
06-14-2022
12:33 AM
Hello! I can't access the NiFi UI because the server is probably down, most likely because of the number of files stuck in a queue. So the question is: how can I find out the current size of the queue, perhaps from the logs, or is there some other way to do so? Thank you in advance!
Labels: Apache NiFi