Member since
02-01-2022
281
Posts
103
Kudos Received
60
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
|  | 1120 | 05-15-2025 05:45 AM |
|  | 4949 | 06-12-2024 06:43 AM |
|  | 7921 | 04-12-2024 06:05 AM |
|  | 5830 | 12-07-2023 04:50 AM |
|  | 3203 | 12-05-2023 06:22 AM |
06-16-2023
06:25 AM
@MOUROU Is your NiFi configured to support OAuth2-based user authentication? It looks more like you are using either the kerberos-provider or the ldap-provider for user authentication. My suggestion to create a client certificate and use an SSLContext service for client authentication in an automated dataflow like this is because: 1. There is no need to ever obtain a token. 2. Certs can be created with a long expiration time. 3. Tokens are NiFi-node specific (the same token is not valid for a different NiFi node in the same NiFi cluster). 4. The same cert works no matter which node in the cluster InvokeHTTP connects with. Matt
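A minimal sketch of point 2 (long-lived certs), assuming openssl is available; the filenames, CN, and keystore password below are hypothetical, not from the original thread, and in production you would likely have the CSR signed by a CA that the NiFi truststore already trusts:

```shell
# Generate a client key and a self-signed cert valid for ~10 years
# (hypothetical filenames and subject CN).
openssl req -x509 -newkey rsa:2048 -nodes \
  -keyout client.key -out client.crt \
  -days 3650 -subj "/CN=nifi-automation-client"

# Bundle the key and cert into a PKCS12 keystore that an
# SSLContextService can reference.
openssl pkcs12 -export -in client.crt -inkey client.key \
  -out client.p12 -name nifi-client -passout pass:changeit

# Confirm the certificate subject and expiry date.
openssl x509 -in client.crt -noout -subject -enddate
```

The resulting client.p12 becomes the keystore of the SSLContextService used by InvokeHTTP on every node, which is what makes the same credential valid cluster-wide.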
06-14-2023
12:33 PM
I would do this in a single step with an InvokeScriptedProcessor and the following Groovy code:
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.*
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import org.apache.nifi.processor.util.StandardValidators
class GroovyProcessor implements Processor {
PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("BATCH_SIZE")
.displayName("Batch Size")
.description("The number of incoming FlowFiles to process in a single execution of this processor.")
.required(true)
.defaultValue("1000")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build()
Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description('FlowFiles that were successfully processed are routed here')
.build()
Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description('FlowFiles that were not successfully processed are routed here')
.build()
ComponentLog log
void initialize(ProcessorInitializationContext context) { log = context.logger }
Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
Collection<ValidationResult> validate(ValidationContext context) { null }
PropertyDescriptor getPropertyDescriptor(String name) { null }
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE]) as List<PropertyDescriptor> }
String getIdentifier() { null }
JsonSlurper jsonSlurper = new JsonSlurper()
JsonOutput jsonOutput = new JsonOutput()
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
ProcessSession session = sessionFactory.createSession()
try {
List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
if (!flowFiles) return
flowFiles.each { flowFile ->
Map customAttributes = [ "mime.type": "application/json" ]
List data = null
session.read(flowFile, {
inputStream -> data = jsonSlurper.parseText(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
} as InputStreamCallback)
data.each { entry ->
entry.VisitList.each { visit ->
Map newData = [:]
newData.put("employer", entry.employer)
newData.put("loc_id", entry.loc_id)
newData.put("topId", entry.topId)
newData.put("VisitList", [visit])
FlowFile newFlowFile = session.create()
newFlowFile = session.write(newFlowFile, { outputStream -> outputStream.write(jsonOutput.toJson([newData]).getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback)
newFlowFile = session.putAllAttributes(newFlowFile, customAttributes)
session.transfer(newFlowFile, REL_SUCCESS)
}
}
session.remove(flowFile)
}
session.commit()
} catch (final Throwable t) {
log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
session.rollback(true)
throw t
}
}
}
processor = new GroovyProcessor()
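To illustrate what the script does (the sample field values are hypothetical, not from the original question): an incoming FlowFile whose content is

```json
[ { "employer": "acme", "loc_id": 7, "topId": 42,
    "VisitList": [ { "visit": 1 }, { "visit": 2 } ] } ]
```

is removed and replaced by one new FlowFile per VisitList entry, each carrying the parent fields plus a single-element VisitList:

```json
[ { "employer": "acme", "loc_id": 7, "topId": 42, "VisitList": [ { "visit": 1 } ] } ]
```

```json
[ { "employer": "acme", "loc_id": 7, "topId": 42, "VisitList": [ { "visit": 2 } ] } ]
```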
06-14-2023
07:10 AM
@Fredb This is a very difficult one to solve. Does anyone know what would cause the execution of the sample_Import_Load.bat to run correctly from the windows command prompt, but fail when executed via the ExecuteStreamCommand processor with these errors? This is most likely caused by permission issues. NiFi requires specific permissions on the files and scripts it touches or executes from within processors. As such, the error is saying the processor cannot find the resources needed to run that .bat file. I do not have any experience with NiFi on Windows, other than to avoid it, but the solution is likely the same as on other operating systems: make sure the nifi user has full ownership of the file(s). Additionally, it is sometimes possible to find deeper errors by watching the nifi-app.log file while testing and/or setting the processor's log level to be more verbose.
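Since the root cause is usually permissions, here is a small Linux-side demonstration of the failure mode (the thread is about Windows, where icacls or the file's Security properties are the analogue; the script path here is hypothetical):

```shell
# Create a tiny stand-in for the batch script.
cat > /tmp/demo_job.sh <<'EOF'
#!/bin/sh
echo "job ran"
EOF

# Strip all permissions: this reproduces the "cannot execute" class of
# error ExecuteStreamCommand reports when the NiFi user lacks access.
chmod 000 /tmp/demo_job.sh
/tmp/demo_job.sh 2>/dev/null || echo "permission denied, as the processor would see"

# Grant read and execute, and the same invocation succeeds.
chmod 755 /tmp/demo_job.sh
/tmp/demo_job.sh
```

The same check applies to every file the script itself reads or writes, not just the script: the NiFi service account needs access to all of them.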
06-13-2023
06:55 AM
1 Kudo
@wert_1311 Your error indicates that two of your roles are missing policies or are incomplete. 1. Data Access Role (arn:aws:iam::8859X2XX911XX:role/Cloudera-datalake-admin-role) is not set up correctly. Please follow the official documentation on the required policies for the Data Access Role. Missing policies (chunked): arn:aws:iam::8859X2XX911XX:role/Cloudera-datalake-admin-role:s3:AbortMultipartUpload:arn:aws:s3:::cdp-my-bucket/hive_replica_functions_dir/* 2. Data Access Role (arn:aws:iam::8859X2XX911XX:role/Cloudera-ranger-audit-role) is not set up correctly. Please follow the official documentation on the required policies for the Data Access Role. Missing policies (chunked): arn:aws:iam::8859X2XX911XX:role/Cloudera-ranger-audit-role:s3:PutObject:arn:aws:s3:::cdp-my-bucket/ranger/audit/* Go back to the quickstart and docs, and make sure you have completed all of the setup steps. Here is a link with more about the credentials: https://docs.cloudera.com/cdp-public-cloud/cloud/requirements-aws/topics/mc-aws-req-credential.html There are steps on that page describing the two roles which have conflicts in your error. The end of each error message indicates which policies are missing.
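The two missing permissions named in the error could be granted with IAM policy statements along these lines (the bucket name and paths are copied from the error message; treat this as a sketch and use the full policy set from the linked Cloudera documentation):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatalakeAdminHiveReplica",
      "Effect": "Allow",
      "Action": ["s3:AbortMultipartUpload"],
      "Resource": ["arn:aws:s3:::cdp-my-bucket/hive_replica_functions_dir/*"]
    },
    {
      "Sid": "RangerAuditWrite",
      "Effect": "Allow",
      "Action": ["s3:PutObject"],
      "Resource": ["arn:aws:s3:::cdp-my-bucket/ranger/audit/*"]
    }
  ]
}
```

The first statement would attach to the Cloudera-datalake-admin-role and the second to the Cloudera-ranger-audit-role; each role needs the rest of the documented policies as well, not just these two statements.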
06-12-2023
04:48 PM
Great article @saketa 👏
06-09-2023
04:11 PM
1 Kudo
Hey @steven-matison and @Former Member, thank you so much for your help. It worked with the StandardProxyConfigurationService controller service; however, I still have issues with the StandardRestrictedSSLContextService controller service. Anyway, thank you so much for the help and the detailed steps, which helped me a lot. Thank you!!
06-08-2023
02:11 AM
1 Kudo
@MattWho Thanks so much for clearing up my confusion about how my jobs run automatically when I restart the server. I have restarted a couple of times but never knew about this feature. You are awesome!!
06-07-2023
12:24 PM
2 Kudos
@SandyClouds Some clarity and additions to @cotopaul's pros and cons:

Single Node:
PROs:
- easy to manage. <-- Setup and managing configuration is easier since you only need to do it on one node. In a cluster, however, all nodes' configuration files will be almost the same (with some variation in hostname properties and certificates if you secure your cluster).
- easy to configure. <-- There are more configurations needed in a cluster setup, but once set up, nothing changes in the user experience when it comes to interacting with the UI.
- no https required. <-- Not sure how this is a PRO. I would not recommend using an unsecured NiFi, as doing so allows anyone access to your dataflows and the data being processed. You can also have an unsecured NiFi cluster, although I do not recommend that either.
CONs:
- in case of issues with the node, your NiFi instance is down. <-- Very true, a single point of failure.
- it uses plenty of resources when it needs to process data, as everything is done on a single node.

Cluster:
PROs:
- redundancy and failover --> when a node goes down, the others will take over and process everything, meaning that you will not get affected. <-- Not completely accurate. Each node in a NiFi cluster is only aware of the FlowFiles queued on that specific node, so each node works on the FlowFiles present on that one node. It is the responsibility of the dataflow designer/builder to build their dataflows in such a way as to ensure distribution of FlowFiles across all nodes. When a node goes down, any FlowFiles currently queued on that down node are not going to be processed by the other nodes. However, the other nodes will continue processing their own data and all new data coming into your dataflows.
- the used resources will be split among all the nodes, meaning that you can cover more use cases than on a single node. <-- Nodes do not share or pool resources across the cluster. If your dataflow(s) are built correctly, the volume of FlowFiles being processed will be distributed across all your nodes, allowing each node to process a smaller subset of the overall FlowFile volume. This means more resources available across your cluster to handle more volume.
- NEW: A NiFi cluster can be accessed via any one of the member nodes. No matter which node's UI you access, you will be presented with stats for all nodes, and there is a cluster UI accessible from the global menu that shows a breakdown of each node. Any change you make from the UI of any one member node is replicated to all nodes.
- NEW: Since all nodes run their own copy of the flow, a catastrophic node failure does not mean loss of all your work, because the same flow.json.gz (which contains everything related to your dataflows) can be retrieved from any of the other nodes in your cluster.
CONs:
- complex setup as it requires ZooKeeper + plenty of other config files. <-- A NiFi cluster requires a multi-node ZooKeeper setup. The ZooKeeper quorum is required for cluster stability and also stores cluster-wide state needed by your dataflows. ZooKeeper is responsible for electing the nodes in your cluster that hold the Cluster Coordinator role and the Primary Node role. If a node that has been assigned one of these roles goes down, ZooKeeper will elect one of the still-running nodes to the role.
- complex to manage --> analysis will be done on X nodes instead of a single node. <-- Not entirely. Yes, you have multiple nodes, and each node produces its own set of NiFi logs. However, if a component within your dataflow produces bulletins (exceptions), the bulletin will report the specific node(s) on which it was produced. Cloudera also offers centralized management of your NiFi cluster deployment via the Cloudera Manager software, which makes deploying and managing a NiFi cluster across multiple nodes easy, sets up and configures ZooKeeper for you, and makes securing your NiFi easy as well by generating the needed certificates/keystores for you.

Hope this helps, Matt
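For a sense of the extra configuration a cluster needs, the core cluster-related entries in nifi.properties look along these lines (hostnames and ports are placeholders; each node sets its own nifi.cluster.node.address):

```properties
# nifi.properties (per node; hosts and ports are hypothetical)
nifi.cluster.is.node=true
nifi.cluster.node.address=nifi-node1.example.com
nifi.cluster.node.protocol.port=11443
nifi.zookeeper.connect.string=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181
nifi.cluster.flow.election.max.wait.time=5 mins
```

The ZooKeeper connect string is the same on every node and points at the external quorum described above.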
06-02-2023
09:00 AM
1 Kudo
Thanks Steven for your recommendation. Here is the result: the InvokeHTTP OAuth2 property does not seem to allow selecting basic authorization from the settings, or I might be missing something. So I had to skip the OAuth2 settings and implement basic auth by sending the client_id and client_secret separately to get a token first, and then send the actual payload later from another InvokeHTTP processor. Thanks for your help
06-01-2023
12:40 PM
2 Kudos
@drewski7 You are looking for something like this, for "yesterday" (24 hours ago): ${now():minus(86400000):format('MM-dd-yyyy hh:mm:ss')} To go 10 hours into the future instead, change the 24-hour offset to a 10-hour one and use plus rather than minus: ${now():plus(36000000):format('MM-dd-yyyy hh:mm:ss')} Let me know if this adapts to fit your use case!
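A quick sanity check of the millisecond offsets used in the expressions above:

```shell
# 24 hours and 10 hours expressed in milliseconds, matching the
# arguments passed to minus() and plus() in the expressions above.
echo $(( 24 * 60 * 60 * 1000 ))   # yesterday offset -> 86400000
echo $(( 10 * 60 * 60 * 1000 ))   # ten-hour offset  -> 36000000
```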