Member since: 02-01-2022
Posts: 274
Kudos Received: 97
Solutions: 60
My Accepted Solutions
Views | Posted
---|---
438 | 05-15-2025 05:45 AM
3469 | 06-12-2024 06:43 AM
6094 | 04-12-2024 06:05 AM
4174 | 12-07-2023 04:50 AM
2241 | 12-05-2023 06:22 AM
06-21-2023
06:40 AM
@DTM In that case you would need to use the DBCPConnectionPool controller service and JDBC to reach the AWS RDS PostgreSQL instance. This requires network permissions that allow the NiFi hosts to reach the RDS endpoint. If you can't use DBCP, you will have to put something between RDS and NiFi. For example, NiFi could use InvokeHTTP to send/post data to an EC2 instance running some kind of API that can handle the database connectivity.
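If you go the DBCP route, here is a rough sketch of the controller service setup (property names from the standard DBCPConnectionPool service; the endpoint, database, and driver path below are placeholders you would replace with your own):
DBCPConnectionPool
  Database Connection URL: jdbc:postgresql://<rds-endpoint>:5432/<database>
  Database Driver Class Name: org.postgresql.Driver
  Database Driver Location(s): /opt/nifi/drivers/postgresql-42.x.jar
  Database User / Password: <credentials>
Processors such as PutDatabaseRecord, PutSQL, or ExecuteSQL then reference this service through their Database Connection Pooling Service property.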
06-20-2023
06:31 AM
@Phil_I_AM You should be able to use InvokeHTTP to build any REST API call. https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.17.0/org.apache.nifi.processors.standard.InvokeHTTP/index.html The approach I recommend is to start with a fully working Postman API call with a known URL, GET/POST parameters, and the required authentication headers. With that working call and its details in hand, work to duplicate the setup in InvokeHTTP until it is operational.
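As a rough sketch (property names from the 1.17.0 InvokeHTTP documentation linked above; the URL and header values are placeholders), a POST call might be configured like:
InvokeHTTP
  HTTP Method: POST
  Remote URL: https://api.example.com/v1/items
  Basic Authentication Username / Password: <only if the API uses basic auth>
Any dynamic property you add to the processor is sent as a request header, so an Authorization or API-key header from your working Postman call can be reproduced that way, and the flowfile content becomes the request body for POST/PUT.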
06-16-2023
11:26 AM
You have to create a separate CASE statement for each column you are trying to update, similar to what is done for the shipment number.
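For example (hypothetical table and column names, just to show the shape), each column gets its own CASE expression inside the same UPDATE:
UPDATE shipments
SET shipment_number = CASE WHEN status = 'SHIPPED' THEN new_shipment_number ELSE shipment_number END,
    carrier         = CASE WHEN status = 'SHIPPED' THEN new_carrier         ELSE carrier         END
WHERE order_id = 12345;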
06-16-2023
06:25 AM
@MOUROU Is your NiFi configured to support OAuth2-based user authentication? It looks more like you are using either the kerberos-provider or the ldap-provider for user authentication. My suggestion to create a client certificate and use an SSLContextService for client authentication in an automated dataflow like this is because:
1. There is no need to obtain a token, ever.
2. Certs can be created with a long expiration time.
3. Tokens are NiFi node specific (the same token is not valid for a different NiFi node in the same NiFi cluster).
4. The same cert works no matter which node in the cluster InvokeHTTP connects with.
Matt
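As a rough sketch of that setup (property names from StandardSSLContextService and InvokeHTTP; the file paths, passwords, and store types below are placeholders for your own client certificate):
StandardSSLContextService
  Keystore Filename: /opt/nifi/certs/client-cert.p12
  Keystore Password: <password>
  Keystore Type: PKCS12
  Truststore Filename: /opt/nifi/certs/truststore.jks
  Truststore Password: <password>
  Truststore Type: JKS
InvokeHTTP
  SSL Context Service: (the service above)
The DN of the client certificate then needs to be authorized in NiFi like any other user so the automated flow has the required policies.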
06-16-2023
05:39 AM
@nuxeo-nifi Wanted to first make some suggestions to help us better respond:
- Include a screenshot of your entire flow.
- Include as much detail as possible about how certain parts are completed. For example: how is the CSV processed?
- Indicate what you have tried and what you expect to see "toward the end of processing". For example: a single update statement with fail and success counts, or inserting failures into one table and errors into another.
Not knowing this, we have to make assumptions that could result in an inaccurate solution, or turn the post into a long, drawn-out dialogue rather than a simple question with a direct answer/solution. Making those assumptions, I will assume that at the bottom of your flow you have a success and a failure relationship. One suggestion would be to use MergeRecord/MergeContent to obtain the counts, then perhaps ReplaceText to fabricate the counts into a correctly shaped flowfile, and route it to an ExecuteSQL processor to execute your SQL statements. An alternative solution could be to send errors and successes to separate ExecuteSQL processors so that each flowfile simply executes a SQL statement that increments the existing count. This saves the need to merge and compute totals. Maybe something like these in each ExecuteSQL:
UPDATE table SET success = success + 1 WHERE tablename = 'something'
UPDATE table SET errors = errors + 1 WHERE tablename = 'something'
06-14-2023
03:04 PM
Does it need to be ECMA? I can probably whip something up tomorrow using Groovy.
06-14-2023
12:33 PM
I would do this in a single step with an InvokeScriptedProcessor and the following Groovy code:
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.*
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import org.apache.nifi.processor.util.StandardValidators
class GroovyProcessor implements Processor {
PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("BATCH_SIZE")
.displayName("Batch Size")
.description("The number of incoming FlowFiles to process in a single execution of this processor.")
.required(true)
.defaultValue("1000")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build()
Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description('FlowFiles that were successfully processed are routed here')
.build()
Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description('FlowFiles that were not successfully processed are routed here')
.build()
ComponentLog log
void initialize(ProcessorInitializationContext context) { log = context.logger }
Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
Collection<ValidationResult> validate(ValidationContext context) { null }
PropertyDescriptor getPropertyDescriptor(String name) { null }
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE]) as List<PropertyDescriptor> }
String getIdentifier() { null }
JsonSlurper jsonSlurper = new JsonSlurper()
JsonOutput jsonOutput = new JsonOutput()
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
ProcessSession session = sessionFactory.createSession()
try {
List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
if (!flowFiles) return
flowFiles.each { flowFile ->
Map customAttributes = [ "mime.type": "application/json" ]
List data = null
session.read(flowFile, {
inputStream -> data = jsonSlurper.parseText(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
} as InputStreamCallback)
// For each top-level entry, emit one new FlowFile per element of its VisitList
data.each { entry ->
entry.VisitList.each { visit ->
Map newData = [:]
newData.put("employer", entry.employer)
newData.put("loc_id", entry.loc_id)
newData.put("topId", entry.topId)
newData.put("VisitList", [visit])
FlowFile newFlowFile = session.create()
newFlowFile = session.write(newFlowFile, { outputStream -> outputStream.write(jsonOutput.toJson([newData]).getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback)
newFlowFile = session.putAllAttributes(newFlowFile, customAttributes)
session.transfer(newFlowFile, REL_SUCCESS)
}
}
// Drop the original FlowFile now that it has been split
session.remove(flowFile)
}
session.commit()
} catch (final Throwable t) {
log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
session.rollback(true)
throw t
}
}
}
processor = new GroovyProcessor()
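To try it, paste the script into the Script Body property of an InvokeScriptedProcessor and set Script Engine to Groovy; once the processor validates, the success and failure relationships defined in the script appear on the processor and can be connected like any other.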
06-14-2023
07:10 AM
@Fredb This is a very difficult one to solve. "Does anyone know what would cause the execution of the sample_Import_Load.bat to run correctly from the windows command prompt, but fail when executed via the ExecuteStreamCommand processor with these errors?" This is most likely caused by permission issues. NiFi requires specific permissions on any files and scripts it touches or executes from within processors, and the error is saying the processor cannot find the resources needed to run that .bat file. I do not have any experience with NiFi on Windows, other than to avoid it, but the solution is likely the same as on other operating systems: make sure the NiFi service user has full ownership of the file(s). Additionally, it is sometimes possible to find deeper errors by watching the nifi-app.log file while testing and/or setting the processor's log level to a more verbose level.
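For example, assuming the NiFi service runs as a dedicated account (the account name and path below are placeholders, adjust to your own), you could grant it full control of the script from an elevated command prompt:
icacls "C:\scripts\sample_Import_Load.bat" /grant "nifi_svc:(F)"
and do the same for any files the batch script itself reads or writes.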
06-13-2023
06:55 AM
1 Kudo
@wert_1311 Your error indicates that two of your roles are missing policies or are incomplete.
1. Data Access Role (arn:aws:iam::8859X2XX911XX:role/Cloudera-datalake-admin-role) is not set up correctly. Please follow the official documentation on required policies for the Data Access Role. Missing policies (chunked): arn:aws:iam::8859X2XX911XX:role/Cloudera-datalake-admin-role:s3:AbortMultipartUpload:arn:aws:s3:::cdp-my-bucket/hive_replica_functions_dir/*
2. Data Access Role (arn:aws:iam::8859X2XX911XX:role/Cloudera-ranger-audit-role) is not set up correctly. Please follow the official documentation on required policies for the Data Access Role. Missing policies (chunked): arn:aws:iam::8859X2XX911XX:role/Cloudera-ranger-audit-role:s3:PutObject:arn:aws:s3:::cdp-my-bucket/ranger/audit/*
Go back to the quickstart and the docs, and make sure you completed all of the setup steps. Here is a link with more about the credentials: https://docs.cloudera.com/cdp-public-cloud/cloud/requirements-aws/topics/mc-aws-req-credential.html The page above describes the two roles flagged in your error, and at the end of each message the error indicates which policies are missing.
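As a minimal sketch (built only from the actions and resources named in your error; adjust the account and bucket to your own), the missing pieces are IAM policy statements along these lines attached to the respective roles:
{
  "Effect": "Allow",
  "Action": "s3:AbortMultipartUpload",
  "Resource": "arn:aws:s3:::cdp-my-bucket/hive_replica_functions_dir/*"
}
for the Cloudera-datalake-admin-role, and the same shape with "s3:PutObject" on "arn:aws:s3:::cdp-my-bucket/ranger/audit/*" for the Cloudera-ranger-audit-role.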