Member since 05-05-2022
20 Posts
3 Kudos Received
2 Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
779 | 05-11-2023 04:46 AM | |
3519 | 07-01-2022 02:55 AM |
05-11-2023
04:46 AM
1 Kudo
OK, I actually fixed it myself 🙂 In case anyone is interested, here's my solution:

import java.nio.charset.StandardCharsets
import org.apache.nifi.controller.ControllerService
import org.apache.nifi.processor.io.InputStreamCallback
import groovy.sql.Sql

def ff = session.get()
if (!ff) return

// Look up the DBCP controller service by its display name
def lookup = context.controllerServiceLookup
//def dbServiceName = databaseConnectionPoolName.value
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs ->
    lookup.getControllerServiceName(cs) == 'DBCPConnectionPool_GP_prj_gistek_preprod'
}
def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
def sql = new Sql(conn)

// Read the FlowFile content; the GString placeholder is bound as a statement parameter
session.read(ff, { inputStream ->
    //def statement = "INSERT INTO publish.templates (excel_template) VALUES (?)"
    sql.executeInsert "INSERT INTO publish.templates (excel_template) VALUES (${inputStream.getBytes()})"
} as InputStreamCallback)
conn?.close()
session.transfer(ff, REL_SUCCESS)
session.commit()
11-07-2022
02:42 AM
Thank you, kind sir! My problem is solved!
08-01-2022
06:22 AM
1 Kudo
@Brenigan,

1. It depends on the context and the level of &n. In the example above, &1 returns the element in the transports array (e.g. "PUSH"), while &2 returns the numeric index of that element in the array (e.g. 0).
2. &4 and &2 are numeric array indexes. outer[&4] means that the output will be placed at position &4 of an array called outer. That element of the array will have an attribute called inner, and position &2 of the inner array will have two attributes, t and etc, with the specified values.

Cheers, André
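
The output shape that outer[&4].inner[&2] describes can be pictured with a small Python sketch. This is only an illustration, not how JOLT is implemented: the helper name `put_nested`, the indexes 0 and 1, and the values "PUSH" and "example" are all hypothetical, and filling skipped positions with empty objects is a simplification of this sketch.

```python
def put_nested(output, outer_index, inner_index, values):
    """Store `values` at output["outer"][outer_index]["inner"][inner_index]."""
    outer = output.setdefault("outer", [])
    # Grow the outer array until the target position exists.
    while len(outer) <= outer_index:
        outer.append({})
    inner = outer[outer_index].setdefault("inner", [])
    # Grow the inner array the same way.
    while len(inner) <= inner_index:
        inner.append({})
    inner[inner_index].update(values)
    return output


# Hypothetical case: &4 resolves to 0 and &2 resolves to 1.
result = put_nested({}, 0, 1, {"t": "PUSH", "etc": "example"})
print(result)
```

Here the two attributes t and etc end up together in the same element of the inner array, which is what using the same pair of indexes on both output paths achieves.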
07-04-2022
02:00 AM
Thanks. Using examples from that cookbook, I was able to make it work. Here is my working code:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import OutputStreamCallback

# Writes the contents of a local file into the FlowFile
class PyOutputStreamCallback(OutputStreamCallback):
    def __init__(self):
        pass

    def process(self, outputStream):
        with open("D:\\Work\\nifi test\\custom processor input\\random_json.json") as f:
            file_content = f.read()
        outputStream.write(bytearray(file_content.encode('utf-8')))

flowFile = session.create()
if flowFile is not None:
    flowFile = session.write(flowFile, PyOutputStreamCallback())
    flowFile = session.putAttribute(flowFile, "filename", 'input_file.json')
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()

Next, I will figure out whether I can pass the local directory path as a property and read it from there, instead of hardcoding it in the script.
07-01-2022
08:57 AM
1 Kudo
@Brenigan

Are you running your dataflow on a standalone NiFi install or a NiFi cluster install? If it is a multi-node NiFi cluster, are all 200 FlowFiles on the same NiFi node? Does your partition_number start at 0? Do you see your FlowFiles getting routed to the "overtook" relationship after 10 minutes?

Assuming all of the following:
1. All FlowFiles are on the same NiFi node.
2. partition_number starts at "0" and increments consistently by "1".
3. All FlowFiles have the same filename.
4. The "wait" relationship is routed via a connection back to the EnforceOrder processor.

You should be seeing:
1. All FlowFiles routed to the "wait" relationship until a FlowFile with attribute "partition_number" equal to "0" is processed, which will result in that FlowFile routing to success.
2. Other FlowFiles meeting the above 4 criteria will continue to loop through wait until the "partition_number" attribute with value "1" is seen and routed to success.
3. If a FlowFile in incremental order is missing, all FlowFiles with a partition_number higher than the next expected integer will continue to route to the wait relationship.
4. After the configured "wait timeout", any FlowFile that has been waiting this long will be routed to the "overtook" relationship.

You can right-click on a connection holding the FlowFiles and list the queue. From there you can select the "view details" icon on the far left to examine the FlowFile's current attributes. You should see a new attribute "EnforceOrder.expectedOrder" that contains the next expected integer value that the group this FlowFile belongs to is waiting for. You will also find your "partition_number", which will have the current integer for this FlowFile.

If you have your FlowFiles distributed across multiple nodes in a NiFi cluster, you will need to get all FlowFiles with the same "group identifier" moved to the same NiFi node in order to enforce order (you cannot enforce order across different nodes in a NiFi cluster). You can accomplish this by editing the connection feeding your EnforceOrder processor and, under settings, selecting a "Load Balancing Strategy" of "Partition by Attribute" using the "filename" attribute that you are using as your group identifier in the EnforceOrder processor.

If you found this response assisted with your query, please take a moment to login and click on "Accept as Solution" below this post.

Thank you, Matt
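
The release order described above can be sketched in plain Python. This is a simplified simulation, not the EnforceOrder implementation: the function name `enforce_order` and its parameters are hypothetical, it models a single group on a single node, and it leaves out the wait timeout and the "overtook" relationship entirely.

```python
def enforce_order(arrivals, initial_order=0):
    """Simulate which order numbers are released and which keep waiting.

    arrivals: order numbers (e.g. partition_number values) in arrival order.
    Returns (released, waiting, next_expected).
    """
    waiting = set(arrivals)
    released = []
    expected = initial_order
    # Release FlowFiles one by one while the next expected number is present.
    while expected in waiting:
        waiting.remove(expected)
        released.append(expected)
        expected += 1
    # Everything left has a number higher than `expected` and stays in "wait".
    return released, sorted(waiting), expected


# Number 3 never arrives, so 4 and 5 keep looping through "wait".
released, waiting, expected = enforce_order([2, 0, 1, 4, 5])
print(released, waiting, expected)
```

In this sketch `expected` plays the role of the "EnforceOrder.expectedOrder" attribute: it stops at the first missing number, and everything above it keeps waiting.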
06-24-2022
10:15 PM
@Brenigan,

The issue that you are seeing is because you are instantiating PyStreamCallback twice. You should do it once and reference that object in the subsequent calls to the session functions. The code below works as you'd expect:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import json

# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    def __init__(self):
        self.length = 0

    def process(self, inputStream, outputStream):
        # Note: nothing is written to outputStream here, so the FlowFile content ends up empty
        jsn = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        array = json.loads(jsn)  # type: dict
        # Descend two levels, following the first key at each level
        i = 0
        while i <= 1:
            root_key = list(array.keys())[0]
            array = array[root_key]
            i += 1
        self.length = str(len(array))

    def get_length_of_array(self):
        return self.length
# end class

flowfile = session.get()
if flowfile is not None:
    # Create the callback once and reuse the same object to read its state
    reader = PyStreamCallback()
    flowfile = session.write(flowfile, reader)
    flowfile = session.putAttribute(flowfile, "length", reader.get_length_of_array())
    session.transfer(flowfile, REL_SUCCESS)

There is a simpler way to do what you're trying to do, though. For example, say you have the following JSON object in the incoming flowfile:

{
  "root": {
    "items": [1, 2]
  }
}

If you want to set the flowfile "length" attribute with the length of the "items" array, you can simply use the EvaluateJsonPath processor with the following configuration:

Cheers, André
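
For checking the expected value outside NiFi, the array-length logic can be sketched in standard Python. This is a standalone illustration, not part of the script above: the helper `nested_array_length` is hypothetical, and it takes an explicit key path instead of following the first key at each level as the script does.

```python
import json


def nested_array_length(document, path):
    """Parse a JSON string, follow `path` key by key, and return the length found there."""
    node = json.loads(document)
    for key in path:
        node = node[key]
    return len(node)


sample = '{"root": {"items": [1, 2]}}'
length = nested_array_length(sample, ["root", "items"])
print(length)  # prints 2
```

The script stores the value as a string because FlowFile attributes are strings; this sketch returns an integer.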
06-14-2022
02:56 AM
1 Kudo
Those details are only available through the UI or the API, provided the web endpoint is reachable. In this case it seems that either the NiFi service is up but the UI is not available, or the NiFi service is completely down; you can confirm which by checking its process status.

So you need to find a way to bring the NiFi service back, along with the UI, under minimal load. To do that, stop all flows by setting nifi.flowcontroller.autoResumeState to false and then start the service. NiFi will start with all the queued data but will not do any processing. This should give you the UI back, but since you said there is a lot of data, startup will take time.

If you found this response assisted with your issue, please take a moment to login and click on "Accept as Solution" below this post.

Thank you, Chandan
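
As a minimal sketch, the change is a single line in the NiFi properties file. The conf/nifi.properties path assumes a default installation layout; adjust it if your configuration lives elsewhere.

```properties
# conf/nifi.properties (path assumes a default install)
# Components do not return to their last running state on restart;
# queued data is kept but not processed until flows are started again.
nifi.flowcontroller.autoResumeState=false
```

The default value is true, so you will probably want to restore it once the flow is under control again.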
06-05-2022
08:27 AM
I've installed the latest version of NiFi and it solved all the problems! Thank you!
05-31-2022
05:55 PM
Hi, what are you trying to do with those attributes? It doesn't make sense that you would need to create an attribute for each year value.
05-06-2022
11:20 AM
@Brenigan

The ExtractText processor supports 1 to 40 capture groups in a Java regular expression. The user-added property defines the attribute into which the value from capture group 1 will be placed. The processor creates additional attributes by capture group number.

So in your case you added a new property with:

This is a single capture group which reads 4 digits. So in your example (9999, text) this would result in creating these attributes:
number = 9999 <-- always contains the value from capture group 1.
number.1 = 9999 <-- the ".1" signifies the capture group the value came from.

number.0 contains the entire match of the Java regular expression. This attribute is controlled by the "Include Capture Group 0" property. Setting it to false will stop this attribute from being added to your FlowFiles.

To help understand this better, let's look at another example. Suppose your Java regular expression looked like this, with 2 capture groups instead:

Also assume we had "Include Capture Group 0" set to "true". Now, with the same source text of "9999, text", we would expect to see these attributes added:
number = 9999 <-- always contains the value from capture group 1.
number.0 = 9999, text <-- the complete match from the Java regular expression.
number.1 = 9999 <-- the ".1" signifies the capture group the value came from.
number.2 = text <-- the ".2" signifies the capture group the value came from.

Setting "Include Capture Group 0" to "false" would have resulted in "number.0" not being created; however, number, number.1, and number.2 would still have been created. This functionality allows this processor component to handle multiple use cases.

If you found this response assisted with your query, please take a moment to login and click on "Accept as Solution" below this post.

Thank you, Matt
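
The attribute naming described above can be reproduced with a short Python sketch. It uses Python's `re` module as a stand-in for Java regular expressions and is not the processor's code. The helper `extract_attributes` is hypothetical, and so is the pattern `(\d{4}), (\w+)`, which is only an assumed example of an expression with 2 capture groups.

```python
import re


def extract_attributes(name, pattern, text, include_group_zero=True):
    """Build the attribute map for one regex property, following the naming above."""
    match = re.search(pattern, text)
    if match is None:
        return {}
    group_count = match.re.groups
    attributes = {}
    if group_count >= 1:
        # The bare property name always holds the value of capture group 1.
        attributes[name] = match.group(1)
    first = 0 if include_group_zero else 1
    for index in range(first, group_count + 1):
        # "<name>.<n>" holds capture group n; group 0 is the entire match.
        attributes[f"{name}.{index}"] = match.group(index)
    return attributes


attrs = extract_attributes("number", r"(\d{4}), (\w+)", "9999, text")
for key in sorted(attrs):
    print(key, "=", attrs[key])
```

Calling the helper with `include_group_zero=False` drops only the `number.0` entry, matching the behaviour described for "Include Capture Group 0".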