Member since: 01-30-2021
Posts: 22
Kudos Received: 0
Solutions: 0
11-13-2023
07:15 AM
@Arash Did you inspect the FlowFile content (to confirm it is in the expected format, etc.) and the FlowFile attributes (which may give more detail about the conflict)? Additionally, you can set the processor log level to DEBUG and/or monitor nifi-app.log for more details. If this worked before and now doesn't, I would expect something to have changed in the FlowFile.
11-08-2023
07:42 AM
@Arash This error indicates an issue with your SSL Context Service "SSL Service for elasticsearch cluster". If this was previously working, perhaps the SSL certificate has changed? Check whether the Elasticsearch certificate has been renewed and, if so, update the keystores/truststores accordingly and test again.
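One way to check whether a certificate in a keystore or truststore has expired or been replaced is to list the expiry date of each entry. The following is a minimal Groovy sketch, not something taken from this thread: the function name certExpiries, the file path and the password are placeholders, and it assumes the store is in a format the JVM can load (for example JKS or PKCS12).

```groovy
import java.security.KeyStore
import java.security.cert.X509Certificate

// Returns alias -> expiry date for every X.509 certificate entry in the store.
Map<String, Date> certExpiries(KeyStore ks) {
    Map<String, Date> result = [:]
    for (String alias in Collections.list(ks.aliases())) {
        def cert = ks.getCertificate(alias)
        if (cert instanceof X509Certificate) {
            result[alias] = cert.notAfter
        }
    }
    return result
}

// Hypothetical usage: substitute the path, type and password configured
// in your SSL Context Service.
// KeyStore ks = KeyStore.getInstance("JKS")
// new File("/path/to/truststore.jks").withInputStream { ks.load(it, "changeit".toCharArray()) }
// certExpiries(ks).each { alias, expiry -> println "${alias} expires ${expiry}" }
```

Comparing these dates (and the aliases present) with the certificate currently served by the Elasticsearch cluster should show whether the truststore is out of date.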
06-14-2023
02:26 PM
This could possibly be achieved via an InvokeScriptedProcessor, but I would need to know the source and what the expected output would be. For example, taking what you posted, if you want to filter on code=6 and code=8 and only have the values of "other" as individual FlowFiles, then something like this Groovy-based code could achieve that:

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils

class GroovyProcessor implements Processor {
    PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
        .name("BATCH_SIZE")
        .displayName("Batch Size")
        .description("The number of incoming FlowFiles to process in a single execution of this processor.")
        .required(true)
        .defaultValue("1000")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    PropertyDescriptor FILTER_CODES = new PropertyDescriptor.Builder()
        .name("FILTER_CODES")
        .displayName("Filter Codes")
        .description("Codes to Filter On")
        .required(true)
        .defaultValue("6,8")
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()

    ComponentLog log

    void initialize(ProcessorInitializationContext context) { log = context.logger }
    Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
    Collection<ValidationResult> validate(ValidationContext context) { null }
    PropertyDescriptor getPropertyDescriptor(String name) { null }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE, FILTER_CODES]) as List<PropertyDescriptor> }
    String getIdentifier() { null }

    JsonSlurper jsonSlurper = new JsonSlurper()
    JsonOutput jsonOutput = new JsonOutput()

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
            if (!flowFiles) return

            // Parse the comma-separated codes; trim each entry before converting so "6, 8" also works
            String filterCodesString = context.getProperty(FILTER_CODES).getValue()
            List<Integer> filterCodes = filterCodesString.split(",").collect { it.trim() }.findAll { it.matches("\\d+") }.collect { it as Integer }

            flowFiles.each { flowFile ->
                Map customAttributes = [ "mime.type": "application/json" ]
                // Read the incoming FlowFile line by line; each non-empty line is expected to be one JSON object
                session.read(flowFile, { inputStream ->
                    inputStream.eachLine { line ->
                        if (line?.trim()) {
                            Map dataMap = jsonSlurper.parseText(line)
                            if (filterCodes.contains(dataMap.code.toInteger())) {
                                // Emit the value of "other" as its own FlowFile
                                FlowFile newFlowFile = session.create()
                                newFlowFile = session.write(newFlowFile,
                                    {
                                        outputStream -> outputStream.write(jsonOutput.toJson(dataMap.other).getBytes(StandardCharsets.UTF_8))
                                    } as OutputStreamCallback)
                                newFlowFile = session.putAllAttributes(newFlowFile, customAttributes)
                                session.transfer(newFlowFile, REL_SUCCESS)
                            }
                        }
                    }
                } as InputStreamCallback)
                // The original FlowFile is dropped once its matching records have been emitted
                session.remove(flowFile)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

processor = new GroovyProcessor()
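To try the filtering logic outside NiFi, the core of the processor can be reduced to a few lines of plain Groovy. This is only a sketch under assumptions: the sample input is made up, and it assumes (as the processor code does) that the source contains one JSON object per line with "code" and "other" fields. Each configured code is trimmed before conversion so that a value such as "6, 8" also parses.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Hypothetical input: one JSON object per line, each with "code" and "other" fields.
String input = '''{"code": 6, "other": {"a": 1}}
{"code": 7, "other": {"a": 2}}
{"code": "8", "other": {"a": 3}}
'''

// Same parsing as the Filter Codes property: comma-separated integers.
List<Integer> filterCodes = "6, 8".split(",").collect { it.trim() }.findAll { it.matches("\\d+") }.collect { it as Integer }

def slurper = new JsonSlurper()
List<String> outputs = []
input.eachLine { line ->
    if (line?.trim()) {
        def dataMap = slurper.parseText(line)
        // toInteger() handles both numeric and string codes
        if (filterCodes.contains(dataMap.code.toInteger())) {
            // In the processor, each of these strings becomes the content of a new FlowFile
            outputs << JsonOutput.toJson(dataMap.other)
        }
    }
}
outputs.each { println it }
```

With this sample input, only the records with code 6 and code 8 are kept, and only their "other" values are emitted.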
08-25-2022
05:07 PM
@Arash, it's likely that your flow is running on multiple nodes and each node is trying to process every file. When different nodes try to write the same file at the same time, you get these errors. You can change your flow to use a List-Fetch pattern: the ListHDFS processor should run only on the primary node, and its output (the list of files) can be load-balanced across the nodes so that each node fetches different files with FetchHDFS. Cheers, André
12-29-2021
09:31 AM
@Arash What exactly does the agent flow do? TailFile --> RemoteProcessGroup to NiFi?
06-15-2021
06:29 AM
@Arash The "nifi.remote.input.socket.port" property is used so that the receiving NiFi can support the RAW transport protocol. This port has nothing to do with where within your dataflows the transferred data is being ingested.

The Remote Process Group (RPG) that exists within your MiNiFi dataflow acts as a client. Every 30 seconds it executes a background thread that connects to the target NiFi URL configured in the RPG and fetches NiFi Site-To-Site (S2S) details from the target NiFi. These S2S details include, but are not limited to, the following:
1. Hostnames of all nodes in the target NiFi cluster
2. Remote (RAW) input socket port, if configured
3. FlowFile load on each cluster node
4. Remote input and output ports this client is authorized to access

There is no option to configure multiple remote input socket port values. The RPG would not know how to use them even if there were.

In your case, each unique MiNiFi should be pushing to a different Remote input port on the canvas instead of all of them sending to the same input port. A second option is to use a RouteOnAttribute processor after your single Remote input port that routes data based on the "s2s.host" attribute value set on the received FlowFiles. This attribute is set by the sending RPG on each MiNiFi to the hostname of that MiNiFi. You could of course also set a unique attribute on each FlowFile via an UpdateAttribute processor on each MiNiFi before sending through the RPG.

If you found this addressed your query, please take a moment to log in and click "Accept" on solutions that helped you. Thank you, Matt
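For the RouteOnAttribute option described above, the configuration could look roughly like the following. This is a sketch only: the property names and hostnames are hypothetical, and each dynamic property uses NiFi Expression Language to compare the "s2s.host" attribute with one MiNiFi hostname.

```
Routing Strategy : Route to Property name
minifi-a         : ${s2s.host:equals('minifi-a.example.com')}
minifi-b         : ${s2s.host:equals('minifi-b.example.com')}
```

With this routing strategy, each dynamic property becomes a relationship of the same name, and FlowFiles that match none of the expressions go to the "unmatched" relationship.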
05-03-2021
07:13 AM
@Arash A FlowFile consists of two parts:
- FlowFile content - Content resides on disk in the content repository and not in heap memory. Some components may need to load content into memory to perform their function.
- FlowFile attributes/metadata - FlowFiles actively queued in a connection have their attributes/metadata held in heap memory. Swapping is the only mechanism that can move this FlowFile attribute/metadata data out of heap to swap files on disk.

It is important to remember that MiNiFi will only start swapping FlowFiles to disk once the number of FlowFiles queued in a connection reaches the configured swap threshold (default 20,000). Swap files are created in batches of 10,000. So in a smoothly running flow there should be very little, if any, swapping of FlowFile attributes/metadata happening. This should only happen during data bursts.

To keep heap usage down, limit the backpressure object threshold on your connection queues. The default is 10,000, which means a connection would normally never accumulate enough FlowFiles to trigger a swap file anyway. Backpressure is a soft limit, however: if a source processor is allowed to execute because the downstream connection is not yet applying backpressure, and that execution results in 30,000 FlowFiles being created, then all 30,000 are placed on the downstream connection, which would result in swap files being created.

When you are building the dataflow in NiFi that you will use on your MiNiFi agent, be mindful of the above and look at the embedded documentation for the components you will be using in that dataflow. The embedded docs include a resource considerations section under each component if there are known impacts on heap memory or CPU. Commonly used processors that merge or split FlowFiles can have an impact on heap memory if not configured wisely. Hope this helps remove some concern and provides useful insight.
If you found this helpful, please take a moment to login and click accept on this solution. Matt
02-02-2021
06:41 AM
1 Kudo
@Arash In your 4-node NiFi cluster, what value do you have set in the "nifi.remote.input.host" property in the nifi.properties file on each of the 4 nodes? It should be the FQDN of each node and not the same value on all 4 nodes. From the host where MiNiFi is running, can all 4 of those FQDNs be resolved and reached over the network? If not, the MiNiFi RPG will only be able to send successfully to the FQDNs it can reach.

When the RPG is started, it reaches out to the URL configured in the RPG to obtain S2S details from the target host. That target host collects the host details for all currently connected nodes in the cluster and communicates them back to the client (MiNiFi). If all 4 nodes report the same configured FQDN in the "nifi.remote.input.host" property, then the client only knows of one FQDN to which it can send FlowFiles over Site-To-Site (S2S).

To improve redundancy in the RPG, you can provide a comma-separated list of URLs in the RPG configuration, so that if any one node is down, the RPG can try to fetch S2S details from the next host in the list. Hope this helps, Matt
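A quick way to test the name-resolution part of this from the MiNiFi host is a small Groovy script. This is a rough sketch: the function name unresolvable and the node hostnames are hypothetical, and a successful lookup only shows that the name resolves, not that the S2S port on that node is reachable.

```groovy
// Returns the hosts from the list that this machine cannot resolve (DNS or hosts file).
List<String> unresolvable(List<String> hosts) {
    return hosts.findAll { String host ->
        try {
            InetAddress.getByName(host)
            return false
        } catch (UnknownHostException ignored) {
            return true
        }
    }
}

// Hypothetical usage: replace with the nifi.remote.input.host value of each node.
// List<String> nodes = ["nifi-node1.example.com", "nifi-node2.example.com",
//                       "nifi-node3.example.com", "nifi-node4.example.com"]
// unresolvable(nodes).each { println "Cannot resolve ${it}" }
```

Any hostname this reports would need to be fixed in DNS or in the hosts file on the MiNiFi machine before the RPG can send to that node.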