Member since: 06-14-2023
Posts: 95
Kudos Received: 33
Solutions: 8
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 3843 | 12-29-2023 09:36 AM |
| | 5653 | 12-28-2023 01:01 PM |
| | 1111 | 12-27-2023 12:14 PM |
| | 558 | 12-08-2023 12:47 PM |
| | 1749 | 11-21-2023 10:56 PM |
12-11-2023
04:07 PM
@edtech Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. Thanks.
12-08-2023
01:52 PM
I don't see EvaluateJsonPath offering options for this, so it might be something you'd have to handle on your own. Personally, I'd do it via a Groovy scripted processor for greater control and performance.
12-08-2023
01:45 PM
If the input will always be like your example, I would use Groovy to make the transformation. The following Groovy-based InvokeScriptedProcessor should create the output you posted:

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.*
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.StreamCallback
import org.apache.nifi.processor.util.StandardValidators

class GroovyProcessor implements Processor {

    PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
        .name("BATCH_SIZE")
        .displayName("Batch Size")
        .description("The number of incoming FlowFiles to process in a single execution of this processor.")
        .required(true)
        .defaultValue("100")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()

    ComponentLog log
    JsonSlurper jsonSlurper = new JsonSlurper()

    void initialize(ProcessorInitializationContext context) {
        log = context.logger
    }

    Set<Relationship> getRelationships() {
        Set<Relationship> relationships = new HashSet<>()
        relationships.add(REL_FAILURE)
        relationships.add(REL_SUCCESS)
        return relationships
    }

    List<PropertyDescriptor> getPropertyDescriptors() {
        List<PropertyDescriptor> descriptors = new ArrayList<>()
        descriptors.add(BATCH_SIZE)
        return Collections.unmodifiableList(descriptors)
    }

    // The remaining Processor interface methods are required but not needed here
    Collection<ValidationResult> validate(ValidationContext context) { }
    PropertyDescriptor getPropertyDescriptor(String name) { }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    String getIdentifier() { }
    void onScheduled(ProcessContext context) throws ProcessException { }
    void onUnscheduled(ProcessContext context) throws ProcessException { }
    void onStopped(ProcessContext context) throws ProcessException { }
    void setLogger(ComponentLog logger) { }

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
            if (!flowFiles) return
            flowFiles.each { FlowFile flowFile ->
                Map customAttributes = [ "mime.type": "application/json" ]
                flowFile = session.write(flowFile, { inputStream, outputStream ->
                    List<Map> data = jsonSlurper.parse(inputStream)
                    data = data.collect { Map resource ->
                        // Tags is a JSON fragment, so wrap it in braces to parse it as an object
                        Map tags = jsonSlurper.parseText("{\"${resource.Tags}\"}")
                        [
                            "Name": tags.Name,
                            "Owner": tags.Owner,
                            "ResourceId": resource.ResourceId,
                            "Resourcename": resource.ResourceId.split("/").last(),
                            "Tags": resource.Tags
                        ]
                    }
                    outputStream.write(JsonOutput.toJson(data).getBytes(StandardCharsets.UTF_8))
                } as StreamCallback)
                session.putAllAttributes(flowFile, customAttributes)
                session.transfer(flowFile, REL_SUCCESS)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

processor = new GroovyProcessor()
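In case it helps, this is the input shape the script assumes (the values here are made up for illustration). The key point is that Tags arrives as a JSON fragment without the surrounding braces, which is why the script wraps it before parsing:

[ {
    "ResourceId": "/subscriptions/123/resourceGroups/rg1/providers/Microsoft.Compute/virtualMachines/vm01",
    "Tags": "Name\": \"vm01\", \"Owner\": \"alice"
} ]

For that input, the script would emit:

[ {
    "Name": "vm01",
    "Owner": "alice",
    "ResourceId": "/subscriptions/123/resourceGroups/rg1/providers/Microsoft.Compute/virtualMachines/vm01",
    "Resourcename": "vm01",
    "Tags": "Name\": \"vm01\", \"Owner\": \"alice"
} ]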
12-08-2023
01:07 PM
What processor are you using to send the data?
12-08-2023
12:47 PM
A similar question was recently asked. Kafka connections are meant to be persistent. If you want to handle what you're asking, you'll have to custom-build a solution that monitors the queues and stops/starts the processors. All of this can be achieved via the NiFi REST API.
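To make that concrete, here is a minimal Groovy sketch of such a client. It is not from the original thread: the base URL and the connection/processor UUIDs are placeholders, an unsecured NiFi is assumed (a secured instance would also need authentication), and the stop/start rule is just an example policy.

import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Placeholders: adjust the base URL and UUIDs for your environment
def base = "http://localhost:8080/nifi-api"
def connectionId = "your-connection-uuid"
def processorId = "your-processor-uuid"
def slurper = new JsonSlurper()

// Check how many FlowFiles are currently queued in the connection
def connection = slurper.parse(new URL("${base}/connections/${connectionId}"))
int queued = connection.status.aggregateSnapshot.flowFilesQueued

// Fetch the processor's current revision, which NiFi requires for any state change
def processor = slurper.parse(new URL("${base}/processors/${processorId}"))

// Example policy: stop the processor when the queue is empty, start it otherwise
def body = JsonOutput.toJson([
    revision: processor.revision,
    state   : queued == 0 ? "STOPPED" : "RUNNING"
])
def conn = new URL("${base}/processors/${processorId}/run-status").openConnection()
conn.requestMethod = "PUT"
conn.doOutput = true
conn.setRequestProperty("Content-Type", "application/json")
conn.outputStream.withWriter { it << body }
println "Run-status update returned HTTP ${conn.responseCode}"

Run on a schedule, this gives you the monitor-and-toggle behavior described above.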
12-07-2023
04:05 PM
Have you tried playing with those settings? Or perhaps an instance with a faster disk, since disk I/O might be the bottleneck?
12-05-2023
04:41 AM
@SAMSAL, thank you. This works.
12-01-2023
08:57 AM
@Jisson I don't see ExecuteStreamCommand in the thread dump provided. Let's first clarify what you mean by "stuck".

When the processor is in this "stuck" state, does it indicate that it has an active thread? A NiFi processor shows a small number in its upper-right corner when it has active threads; an ExecuteStreamCommand processor with one active thread, for example, displays a "1" there.

If your processor has no active threads, it is not stuck/hung; it simply does not have a thread with which to execute the command. This can happen when all threads from NiFi's max timer driven thread pool are already in use by other components, which we would call a thread-starved processor. If your CPU load average is good, you could increase the size of the thread pool to see if that helps. Out of the box, NiFi sets the "Maximum Timer Driven Thread Count" pool to 10. You can change this from the NiFi UI --> global menu (upper-right corner) --> Controller Settings --> General tab.

If your processor does show an active thread, I'd expect to see that thread in the thread dump. Also keep in mind that a single thread dump is not very useful: a thread may not be hung, just long-running. Taking a series of thread dumps spread out over time (one way to script this is sketched after this reply) and comparing them lets you see whether the thread's stack changes, which would indicate a slow thread rather than a hung one.

In the case of your ExecuteStreamCommand processor, it calls a custom Python script and waits for that script to return. If the thread dumps show it waiting on your Python script, the challenge becomes figuring out why the script is hanging or suddenly taking a very long time, and that is not something that can be troubleshot through NiFi.

Hope this helps you in your troubleshooting journey. If you found any of the suggestions/solutions provided helped you with your issue, please take a moment to log in and click "Accept as Solution" on one or more of them that helped.

Thank you,

Matt
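For the thread dump series mentioned above, a minimal Groovy sketch (the NiFi install path and the one-minute interval are assumptions; adjust for your environment):

// Capture five NiFi thread dumps, one minute apart, for later comparison
5.times { i ->
    ["/opt/nifi/bin/nifi.sh", "dump", "/tmp/nifi-thread-dump-${i}.txt"].execute().waitFor()
    if (i < 4) sleep(60000)
}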
12-01-2023
08:21 AM
@SAMSAL The managed authorizer uses the file-access-policy-provider (which generates authorizations.xml if it does not already exist) and then a user-group-provider. In your case, the ldap-user-group-provider would make the most sense.

You may also want to use the composite-configurable-user-group-provider, configured with both the ldap-user-group-provider and the file-user-group-provider. Having both a file-based provider and an LDAP provider allows syncing users and groups from LDAP automatically, while the file provider lets you manually add non-LDAP user/client identities for authorization as well. Non-LDAP client/user identities might be certificate-based clients, such as other NiFi nodes/instances, etc.

Within the file-access-policy-provider you define the initial admin identity, which could be set to your LDAP user account's identity. Then, on first startup with the managed authorizer, NiFi generates the authorizations.xml file seeded with the policies necessary for that initial admin identity to act as admin. So you could skip the single-user-provider step. A rough sketch of the authorizers.xml wiring follows below.

Matt
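For illustration only, the authorizers.xml wiring described above might look roughly like this; the initial admin identity is a placeholder, and the LDAP provider's connection and search properties are omitted:

<authorizers>
    <userGroupProvider>
        <identifier>ldap-user-group-provider</identifier>
        <class>org.apache.nifi.ldap.tenants.LdapUserGroupProvider</class>
        <!-- LDAP connection and search properties omitted for brevity -->
    </userGroupProvider>
    <accessPolicyProvider>
        <identifier>file-access-policy-provider</identifier>
        <class>org.apache.nifi.authorization.FileAccessPolicyProvider</class>
        <property name="User Group Provider">ldap-user-group-provider</property>
        <property name="Authorizations File">./conf/authorizations.xml</property>
        <!-- Placeholder: use your LDAP account's full identity string -->
        <property name="Initial Admin Identity">cn=myadmin,ou=users,dc=example,dc=com</property>
    </accessPolicyProvider>
    <authorizer>
        <identifier>managed-authorizer</identifier>
        <class>org.apache.nifi.authorization.StandardManagedAuthorizer</class>
        <property name="Access Policy Provider">file-access-policy-provider</property>
    </authorizer>
</authorizers>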
11-30-2023
12:26 AM
Yeah, I tried using KeyScanOptions as well, but I got the same error.