Member since: 06-16-2020
Posts: 51
Kudos Received: 14
Solutions: 5
My Accepted Solutions
Views | Posted |
---|---|
597 | 10-23-2024 11:21 AM |
573 | 10-22-2024 07:59 AM |
517 | 10-22-2024 07:37 AM |
298 | 10-21-2024 09:25 AM |
2046 | 06-16-2023 07:23 AM |
06-14-2023
12:33 PM
I would do this in a single step with an InvokeScriptedProcessor and the following Groovy code:
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
class GroovyProcessor implements Processor {
    PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
        .name("BATCH_SIZE")
        .displayName("Batch Size")
        .description("The number of incoming FlowFiles to process in a single execution of this processor.")
        .required(true)
        .defaultValue("1000")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()

    ComponentLog log

    void initialize(ProcessorInitializationContext context) { log = context.logger }
    Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
    Collection<ValidationResult> validate(ValidationContext context) { null }
    PropertyDescriptor getPropertyDescriptor(String name) { null }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE]) as List<PropertyDescriptor> }
    String getIdentifier() { null }

    JsonSlurper jsonSlurper = new JsonSlurper()
    JsonOutput jsonOutput = new JsonOutput()

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
            if (!flowFiles) return
            flowFiles.each { flowFile ->
                Map customAttributes = [ "mime.type": "application/json" ]
                List data = null
                session.read(flowFile, {
                    inputStream -> data = jsonSlurper.parseText(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
                } as InputStreamCallback)
                data.each { entry ->
                    entry.VisitList.each { visit ->
                        Map newData = [:]
                        newData.put("employer", entry.employer)
                        newData.put("loc_id", entry.loc_id)
                        newData.put("topId", entry.topId)
                        newData.put("VisitList", [visit])
                        FlowFile newFlowFile = session.create()
                        newFlowFile = session.write(newFlowFile, { outputStream -> outputStream.write(jsonOutput.toJson([newData]).getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback)
                        newFlowFile = session.putAllAttributes(newFlowFile, customAttributes)
                        session.transfer(newFlowFile, REL_SUCCESS)
                    }
                }
                session.remove(flowFile)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}
processor = new GroovyProcessor()
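To make the data transformation in the script above easier to follow outside NiFi, here is a minimal Java sketch of just the splitting step. It assumes the input has already been parsed into a list of maps; the field names (employer, loc_id, topId, VisitList) come from the script, while the class name VisitSplitter, the method split, and the sample values are hypothetical and only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class VisitSplitter {
    // Hypothetical helper: emits one record per VisitList element,
    // copying the parent fields onto each record.
    public static List<Map<String, Object>> split(List<Map<String, Object>> entries) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> entry : entries) {
            List<?> visits = (List<?>) entry.get("VisitList");
            if (visits == null) {
                continue; // nothing to emit for an entry without visits
            }
            for (Object visit : visits) {
                Map<String, Object> record = new LinkedHashMap<>();
                record.put("employer", entry.get("employer"));
                record.put("loc_id", entry.get("loc_id"));
                record.put("topId", entry.get("topId"));
                // Each output record keeps a single-element VisitList.
                record.put("VisitList", Collections.singletonList(visit));
                out.add(record);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up sample entry with two visits.
        Map<String, Object> entry = new LinkedHashMap<>();
        entry.put("employer", "acme");
        entry.put("loc_id", 7);
        entry.put("topId", 1);
        entry.put("VisitList", List.of("visit-a", "visit-b"));
        split(List.of(entry)).forEach(System.out::println);
    }
}
```

In the script, each of these records is additionally wrapped in a one-element JSON array and written to its own FlowFile; the sketch leaves that part out.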
06-05-2023
06:30 AM
@drewski7 The two posts below have solutions for querying the JSON:
https://community.cloudera.com/t5/Community-Articles/Running-SQL-on-FlowFiles-using-QueryRecord-Processor-Apache/ta-p/246671
https://community.cloudera.com/t5/Support-Questions/QueryRecord-processor-issue-with-nested-JSON/td-p/338556
Here is the NiFi doc with more high-level SQL info:
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/latest/org.apache.nifi.processors.standard.QueryRecord/additionalDetails.html
You can find other examples here in the community; just search for something like: queryrecord + json.
06-01-2023
12:40 PM
2 Kudos
@drewski7 You are looking for something like this for "yesterday" (now minus 24 hours, expressed in milliseconds):
${now():minus(86400000):format('MM-dd-yyyy hh:mm:ss') }
Then change 24 hours to 10 hours and add instead of subtract:
${now():plus(36000000):format('MM-dd-yyyy hh:mm:ss') }
Let me know if this adapts to fit your use case!
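The offsets in those expressions are plain millisecond counts. As a rough check of the arithmetic, the Java sketch below derives both constants and applies an offset to a timestamp using the same date pattern. The class name OffsetExample and its methods are hypothetical; this only mirrors the calculation and is not how NiFi evaluates the expression internally.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

public class OffsetExample {
    // Converts a number of hours to milliseconds.
    public static long hoursToMillis(long hours) {
        return TimeUnit.HOURS.toMillis(hours);
    }

    // Formats (nowMillis + offsetMillis) with the given Java date pattern,
    // using the JVM's default time zone.
    public static String shifted(long nowMillis, long offsetMillis, String pattern) {
        return new SimpleDateFormat(pattern).format(new Date(nowMillis + offsetMillis));
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        String pattern = "MM-dd-yyyy hh:mm:ss";
        // 24 hours back, matching minus(86400000)
        System.out.println(shifted(now, -hoursToMillis(24), pattern));
        // 10 hours forward, matching plus(36000000)
        System.out.println(shifted(now, hoursToMillis(10), pattern));
    }
}
```

One thing to watch in the pattern: in Java date patterns, hh is the 12-hour clock and HH is the 24-hour clock, so HH may be the better fit if the output has no AM/PM marker.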
06-01-2023
06:31 AM
1 Kudo
@drewski7 I think the solution you are looking for is to back the map cache with one of the alternative data stores that live outside of NiFi. The options are Redis, HBase, Cassandra, and Couchbase. They give you greater control and are preferred for production and large volumes.
05-02-2023
08:36 AM
I am trying to run a GetSolr query in NiFi for all the Ranger audits being produced. There's a ton of data in the Solr collection. I can't give Solr a larger memory footprint because of the VM it is running on, but I wanted to know if I could configure NiFi more efficiently. Below is a screenshot of what I have.
Labels: Apache NiFi
04-21-2023
03:42 AM
Hello @drewski7 The error you're seeing suggests that Ranger Admin can't find the "XXPolicy" during the import process. This can happen due to a corrupted policy file or an issue with Ranger Admin. Restarting Ranger Admin temporarily fixes the issue, but it's not a permanent solution. To troubleshoot the issue, you can check the policy file for errors, look at the Ranger Admin logs for any warnings, verify that the policy exists in the Ranger database, and ensure that Ranger Admin is configured correctly. If you can't identify the root cause, you can contact Cloudera support for assistance. I hope this will help you.
04-12-2023
07:52 AM
2 Kudos
@drewski7 The removal of quotes from the "command arguments" is expected behavior in the ExecuteStreamCommand processor. This processor was introduced to NiFi more than 10 years ago and was originally designed for a more minimal scope of work, including the expectation that FlowFile content would be passed to the script/command being executed.
Over time, the use cases people tried to solve with ExecuteStreamCommand expanded; however, handling those use cases would potentially break users' already implemented and working dataflows. So rather than change the default behavior, a new property "Command Arguments Strategy" was added, with the original "Command Arguments Property" as the default (legacy method) and a new "Dynamic property arguments" option. This change is part of this Jira and implemented as of Apache NiFi 1.10: https://issues.apache.org/jira/browse/NIFI-3221
In your use case, you'll want to switch to using the "Dynamic property arguments" option. This will then require you to click on the "+" to add a new dynamic property. The property names MUST use this format: command.argument.<num>
So in your case you might try something like:
command.argument.1 = -X POST -H referer:${Referer} -H 'Content-Type: application/json' -d '{"newTopics": [{"name":"testing123","numPartitions":3,"replicationFactor":3}], "allTopicNames":["testing123"]}' --negotiate -u : -b /tmp/cookiejar.txt -c /tmp/cookiejar.txt http://SMM-HOSTNAME:8585/api/v1/admin/topics
If you found that the provided solution(s) assisted you with your query, please take a moment to log in and click Accept as Solution below each response that helped.
Thank you, Matt
04-10-2023
06:28 AM
Switching to a new version of NiFi solved the problem.
01-24-2023
03:15 AM
Dear @drewski7, please use this API to get the result:
curl -X POST "https://dev-cm.sever.com:7183/api/v43/clusters/cluster-name/services/service/commands/restart" -H "accept: application/json" -k -u admin:pssword
Replace the API version (v43), the cluster name, and the service name with your own. If you don't have TLS enabled, change port 7183 to 7180 and remove the -k option.
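To make the replaceable parts of that URL explicit, here is a small Java sketch that only assembles the restart endpoint from its pieces. The class name CmRestartUrl and the method build are hypothetical. Switching the scheme to http when TLS is off is an assumption on my part; the post itself only mentions changing the port and dropping -k.

```java
public class CmRestartUrl {
    // Builds the restart endpoint from the parts that vary per environment.
    // Ports follow the post: 7183 with TLS, 7180 without.
    public static String build(String host, boolean tls, String apiVersion,
                               String cluster, String service) {
        String scheme = tls ? "https" : "http"; // assumption: plain http without TLS
        int port = tls ? 7183 : 7180;
        return String.format("%s://%s:%d/api/%s/clusters/%s/services/%s/commands/restart",
                scheme, host, port, apiVersion, cluster, service);
    }

    public static void main(String[] args) {
        // Same placeholder values as the curl command above.
        System.out.println(build("dev-cm.sever.com", true, "v43", "cluster-name", "service"));
    }
}
```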
12-17-2022
07:44 PM
Yes. To allow the anonymous user, you can define {USER} in the Ranger NiFi policy, and you will no longer see these deny operations.