Member since 02-01-2022
285 posts, 103 kudos received, 60 solutions
My Accepted Solutions
| Views | Posted |
|---|---|
| 1182 | 05-15-2025 05:45 AM |
| 5122 | 06-12-2024 06:43 AM |
| 8119 | 04-12-2024 06:05 AM |
| 5995 | 12-07-2023 04:50 AM |
| 3299 | 12-05-2023 06:22 AM |
06-21-2023
06:40 AM
@DTM In that case, you would need to use the DBCPConnectionPool controller service with a JDBC connection to your AWS Postgres (RDS) instance. This requires network permissions that allow NiFi to reach the RDS endpoint. If you can't use DBCP, you will have to put something between RDS and NiFi. For example, NiFi could use InvokeHTTP to send (POST) data to an EC2 instance running some kind of API that handles the database connectivity.
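Before configuring the DBCP connection pool, it can help to confirm that the NiFi host can reach the RDS endpoint at all. The Java sketch below is a plain TCP check, not a NiFi feature; the host and port it is given are placeholders (5432 is PostgreSQL's default port). If the check fails, the cause is more likely security groups, routing, or DNS than the JDBC settings, whose connection URL would typically take the form `jdbc:postgresql://<rds-endpoint>:5432/<database>`.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class RdsReachability {
    private RdsReachability() {}

    // Returns true if a TCP connection to host:port is accepted within timeoutMillis.
    // This only proves network reachability; it does not authenticate against PostgreSQL.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Timeouts, refused connections and unresolvable hosts all end up here
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder defaults: pass your RDS endpoint (and optionally a port) as arguments
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 5432;
        System.out.println(host + ":" + port + " reachable=" + canConnect(host, port, 3000));
    }
}
```

Run it on the NiFi host itself (for example `java RdsReachability <rds-endpoint> 5432`), since reachability from your workstation says nothing about the NiFi network.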
06-16-2023
11:26 AM
You have to create a separate CASE expression for each column you are trying to update, similar to what is done for the shipment number.
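To illustrate the pattern, here is a Java sketch that generates a parameterized `UPDATE` with one `CASE` expression per column. The original query is not shown, so the table name `shipments`, key column `id`, and column names used below are hypothetical. Each `CASE` maps a key to its new value and falls back to the column's current value with `ELSE`.

```java
import java.util.List;
import java.util.StringJoiner;

final class CaseUpdateBuilder {
    private CaseUpdateBuilder() {}

    // Builds a parameterized UPDATE with one CASE expression per column.
    // Bind order: for each column, for each row: (key, new value); then each key once for the IN list.
    // Table and column names are concatenated (identifiers cannot be bound), so pass only trusted names.
    static String build(String table, String keyColumn, List<String> columns, int rowCount) {
        if (columns.isEmpty() || rowCount < 1) {
            throw new IllegalArgumentException("need at least one column and one row");
        }
        StringJoiner setClauses = new StringJoiner(", ");
        for (String column : columns) {
            StringBuilder clause = new StringBuilder(column).append(" = CASE ").append(keyColumn);
            for (int i = 0; i < rowCount; i++) {
                clause.append(" WHEN ? THEN ?");
            }
            // Rows not matched by any WHEN keep their current value
            clause.append(" ELSE ").append(column).append(" END");
            setClauses.add(clause);
        }
        StringJoiner keys = new StringJoiner(", ", "(", ")");
        for (int i = 0; i < rowCount; i++) {
            keys.add("?");
        }
        return "UPDATE " + table + " SET " + setClauses + " WHERE " + keyColumn + " IN " + keys;
    }
}
```

For example, `build("shipments", "id", Arrays.asList("shipment_number", "status"), 2)` returns `UPDATE shipments SET shipment_number = CASE id WHEN ? THEN ? WHEN ? THEN ? ELSE shipment_number END, status = CASE id WHEN ? THEN ? WHEN ? THEN ? ELSE status END WHERE id IN (?, ?)`.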
06-16-2023
06:25 AM
@MOUROU Is your NiFi configured to support OAuth2-based user authentication? It looks more like you are using either the kerberos-provider or the ldap-provider for user authentication. For an automated dataflow like this, I suggest creating a client certificate and using an SSLContext service for client authentication, because:
1. There is never a need to obtain a token.
2. Certificates can be created with a long expiration time.
3. Tokens are NiFi-node specific (the same token is not valid on a different node in the same NiFi cluster).
4. The same certificate works no matter which node in the cluster InvokeHTTP connects to.

Matt
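In NiFi the SSLContext service is configured through keystore and truststore properties rather than code, but the plain-JSSE Java sketch below shows what a client-certificate setup amounts to: a key manager that presents the client certificate and a trust manager that validates the NiFi nodes' certificates. The PKCS12 store type, file paths, and passwords are assumptions for illustration, and this is not NiFi's own implementation.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

final class ClientCertSslContext {
    private ClientCertSslContext() {}

    // Loads a keystore or truststore from disk; PKCS12 is assumed here.
    static KeyStore load(Path path, char[] password) throws Exception {
        KeyStore store = KeyStore.getInstance("PKCS12");
        try (InputStream in = Files.newInputStream(path)) {
            store.load(in, password);
        }
        return store;
    }

    // keyStore holds the client certificate and private key presented to NiFi;
    // trustStore holds the CA (or node certificates) used to verify the NiFi nodes.
    static SSLContext build(KeyStore keyStore, char[] keyPassword, KeyStore trustStore) throws Exception {
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(keyStore, keyPassword);
        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trustStore);
        SSLContext context = SSLContext.getInstance("TLS");
        context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
        return context;
    }
}
```

Because the certificate rather than a per-node token identifies the client, one context built this way can be reused against every node in the cluster.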
06-14-2023
12:33 PM
I would do this in a single step with an InvokeScriptedProcessor and the following Groovy code, which splits each entry's `VisitList` into one FlowFile per visit while repeating `employer`, `loc_id`, and `topId`:

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
// NiFi API classes; the scripting engine may pre-import some of these, listed explicitly for clarity
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.*
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import org.apache.nifi.processor.util.StandardValidators

class GroovyProcessor implements Processor {

    PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
            .name("BATCH_SIZE")
            .displayName("Batch Size")
            .description("The number of incoming FlowFiles to process in a single execution of this processor.")
            .required(true)
            .defaultValue("1000")
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description('FlowFiles that were successfully processed are routed here')
            .build()

    Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description('FlowFiles that were not successfully processed are routed here')
            .build()

    ComponentLog log
    JsonSlurper jsonSlurper = new JsonSlurper()

    void initialize(ProcessorInitializationContext context) { log = context.logger }
    Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
    Collection<ValidationResult> validate(ValidationContext context) { null }
    PropertyDescriptor getPropertyDescriptor(String name) { null }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE]) as List<PropertyDescriptor> }
    String getIdentifier() { null }

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            // Pull up to BATCH_SIZE FlowFiles from the incoming queue
            List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
            if (!flowFiles) return
            flowFiles.each { flowFile ->
                Map customAttributes = ["mime.type": "application/json"]
                List data = null
                // The content is expected to be a JSON array of entries
                session.read(flowFile, { inputStream ->
                    data = jsonSlurper.parseText(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
                } as InputStreamCallback)
                data.each { entry ->
                    // One output FlowFile per visit, repeating the parent entry's fields
                    entry.VisitList.each { visit ->
                        Map newData = [
                                employer : entry.employer,
                                loc_id   : entry.loc_id,
                                topId    : entry.topId,
                                VisitList: [visit]
                        ]
                        // Creating the child from the parent keeps its attributes and provenance lineage
                        FlowFile newFlowFile = session.create(flowFile)
                        newFlowFile = session.write(newFlowFile, { outputStream ->
                            outputStream.write(JsonOutput.toJson([newData]).getBytes(StandardCharsets.UTF_8))
                        } as OutputStreamCallback)
                        newFlowFile = session.putAllAttributes(newFlowFile, customAttributes)
                        session.transfer(newFlowFile, REL_SUCCESS)
                    }
                }
                // The original FlowFile has been fully split, so drop it
                session.remove(flowFile)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

processor = new GroovyProcessor()
```
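To make the split concrete without a running NiFi, the following Java sketch applies the same transformation to already-parsed JSON (lists and maps): every element of an entry's `VisitList` becomes its own one-record array that repeats `employer`, `loc_id`, and `topId`. It is a standalone illustration, not part of the processor; JSON parsing and FlowFile handling are left out, and skipping entries without a list-valued `VisitList` is a choice made for this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class VisitSplitter {
    private VisitSplitter() {}

    // Each returned element corresponds to one output FlowFile's content:
    // a one-element array holding a copy of the entry with a single visit.
    static List<List<Map<String, Object>>> split(List<Map<String, Object>> entries) {
        List<List<Map<String, Object>>> outputs = new ArrayList<>();
        for (Map<String, Object> entry : entries) {
            Object visitList = entry.get("VisitList");
            if (!(visitList instanceof List)) {
                continue; // this sketch skips entries whose VisitList is missing or not an array
            }
            for (Object visit : (List<?>) visitList) {
                Map<String, Object> record = new LinkedHashMap<>();
                record.put("employer", entry.get("employer"));
                record.put("loc_id", entry.get("loc_id"));
                record.put("topId", entry.get("topId"));
                record.put("VisitList", Collections.singletonList(visit));
                outputs.add(Collections.singletonList(record));
            }
        }
        return outputs;
    }
}
```

An entry with two visits therefore yields two outputs, each carrying the same `employer`, `loc_id`, and `topId` and a `VisitList` of length one.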
06-14-2023
07:10 AM
@Fredb "This is a very difficult one to solve. Does anyone know what would cause the execution of the sample_Import_Load.bat to run correctly from the Windows command prompt, but fail when executed via the ExecuteStreamCommand processor with these errors?"

This is most likely caused by permission issues. NiFi requires specific permissions on the files and scripts that its processors read or execute. Here, the error is saying that the processor cannot find the resources it needs to run that .bat file. I do not have any experience with NiFi on Windows, other than to avoid it, but the solution is likely the same as on other operating systems: make sure the NiFi user has full ownership of the file(s). Additionally, you can sometimes find deeper errors by watching the nifi-app.log file while testing and/or by setting the processor's log level to be more verbose.
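One way to test the ownership theory is to inspect the script from the account NiFi runs under. The Java sketch below is not a NiFi feature; it reports the file's owner and whether the current JVM user can read and execute it. Running it as the NiFi service account, with the full path to sample_Import_Load.bat as the argument, approximates what ExecuteStreamCommand sees. The readable/executable flags are Java's view of access and may not reflect every Windows policy.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

final class ScriptAccessCheck {
    private ScriptAccessCheck() {}

    // Summarizes the owner and access flags of a file for the user running this JVM.
    static String report(Path file) {
        String user = System.getProperty("user.name");
        if (!Files.exists(file)) {
            return file + ": not found (or not visible to " + user + ")";
        }
        String owner;
        try {
            owner = Files.getOwner(file).getName();
        } catch (IOException | UnsupportedOperationException e) {
            owner = "unknown (" + e.getMessage() + ")";
        }
        return String.format("%s: user=%s owner=%s readable=%b executable=%b",
                file, user, owner, Files.isReadable(file), Files.isExecutable(file));
    }

    public static void main(String[] args) {
        // Pass the full path to the .bat file; the default is only a placeholder
        Path target = Paths.get(args.length > 0 ? args[0] : "sample_Import_Load.bat");
        System.out.println(report(target));
    }
}
```

If the report shows a different owner or `executable=false` when run as the NiFi account, but not as your own user, that supports the permissions explanation.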
06-13-2023
06:55 AM
1 Kudo
@wert_1311 Your error indicates that two of your roles are missing policies or are not completely set up:
1. Data Access Role (arn:aws:iam::8859X2XX911XX:role/Cloudera-datalake-admin-role) is not set up correctly. Please follow the official documentation on required policies for Data Access Role. Missing policies (chunked): arn:aws:iam::8859X2XX911XX:role/Cloudera-datalake-admin-role:s3:AbortMultipartUpload:arn:aws:s3:::cdp-my-bucket/hive_replica_functions_dir/*
2. Data Access Role (arn:aws:iam::8859X2XX911XX:role/Cloudera-ranger-audit-role) is not set up correctly. Please follow the official documentation on required policies for Data Access Role. Missing policies (chunked): arn:aws:iam::8859X2XX911XX:role/Cloudera-ranger-audit-role:s3:PutObject:arn:aws:s3:::cdp-my-bucket/ranger/audit/*

Go back to the quickstart and docs, and make sure you completed all of the setup steps. Here is a link with more about the credentials: https://docs.cloudera.com/cdp-public-cloud/cloud/requirements-aws/topics/mc-aws-req-credential.html

The page above describes the two roles named in your error. The end of each error message indicates which policies are missing.
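Because the tail of each message names the missing action and resource, the Java sketch below turns one `Missing policies (chunked)` entry into an IAM policy statement that you could compare against the role. The `<role ARN>:<action>:<resource ARN>` layout is inferred from the two messages above, not from documented Cloudera output, so treat the parser as an assumption; the official policy templates in the linked docs remain the reference for the full set of permissions.

```java
final class MissingPolicy {
    final String roleArn;
    final String action;
    final String resource;

    private MissingPolicy(String roleArn, String action, String resource) {
        this.roleArn = roleArn;
        this.action = action;
        this.resource = resource;
    }

    // Assumed layout (inferred from the error text): <role ARN>:<service>:<Action>:<resource ARN>
    // IAM role names cannot contain ':', so the first ':' after "role/" ends the role ARN.
    static MissingPolicy parse(String entry) {
        int roleStart = entry.indexOf(":role/");
        int roleEnd = roleStart < 0 ? -1 : entry.indexOf(':', roleStart + ":role/".length());
        if (roleEnd < 0) {
            throw new IllegalArgumentException("no role ARN found in: " + entry);
        }
        String rest = entry.substring(roleEnd + 1); // e.g. s3:PutObject:arn:aws:s3:::bucket/path/*
        int serviceEnd = rest.indexOf(':');
        int actionEnd = serviceEnd < 0 ? -1 : rest.indexOf(':', serviceEnd + 1);
        if (actionEnd < 0) {
            throw new IllegalArgumentException("no action/resource found in: " + entry);
        }
        return new MissingPolicy(entry.substring(0, roleEnd),
                rest.substring(0, actionEnd), rest.substring(actionEnd + 1));
    }

    // Renders a single IAM statement that grants the missing action on the resource.
    String toStatementJson() {
        return String.format("{\"Effect\": \"Allow\", \"Action\": \"%s\", \"Resource\": \"%s\"}", action, resource);
    }
}
```

In an actual IAM policy, such statements sit inside a document of the form `{"Version": "2012-10-17", "Statement": [...]}` attached to the role given by `roleArn`.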
06-12-2023
04:48 PM
Great article @saketa 👏
06-09-2023
04:11 PM
1 Kudo
Hey @steven-matison and @Former Member, thank you so much for your help! It worked with the StandardProxyConfigurationService controller service; however, I still have issues with the StandardRestrictedSSLContextService controller service. Anyway, thank you so much for the help and the detailed steps, which helped me a lot. Thank you!!
06-08-2023
02:11 AM
1 Kudo
@MattWho Thanks so much for clearing up my confusion about how my jobs start running automatically when I restart the server. I have restarted a couple of times but never knew about this feature. You are awesome!!