Member since: 06-16-2020
Posts: 51
Kudos Received: 14
Solutions: 5
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 475 | 10-23-2024 11:21 AM |
| | 479 | 10-22-2024 07:59 AM |
| | 421 | 10-22-2024 07:37 AM |
| | 241 | 10-21-2024 09:25 AM |
| | 1968 | 06-16-2023 07:23 AM |
12-10-2024 06:02 PM
I need to authenticate to a Kafka broker using OAuth. Looking at the Apache NiFi issue tracker, I see this ticket to add SASL/OAUTHBEARER support to the Kafka 3 processors, which now have an allowable-values list of SASL mechanisms: https://issues.apache.org/jira/browse/NIFI-7421
Does anyone know whether this will ever be implemented? If not, does anyone have ideas for how I could meet this requirement? @MattWho @SAMSAL
Labels: Apache NiFi
11-04-2024 06:02 AM
The problem was that my StandardRestrictedSSLContextService didn't include a keystore. For some reason, I thought this was only one-way SSL communication. Once I added the keystore to the context service, it authenticated correctly! Thanks @MattWho!
10-30-2024 06:30 PM
All, I have two separate NiFi instances and I want to push provenance events from one instance to the other. The source NiFi's hostname is nifi8443 and the target NiFi's hostname is nifi8444. Both instances are running over HTTPS.

On the target NiFi, I set up an input port called Test. On the source NiFi, I configured a SiteToSiteProvenanceReportingTask.

Here are a couple of things I did. I realized that the reporting task on nifi8443 is essentially acting as a client and nifi8444 is acting as the server. I added the nifi8444 server certificate to a truststore.p12 file on nifi8443 and created the corresponding SSLContextService pointing to that truststore file. That resolved the SSL issue.

I then read that on the target node (i.e. nifi8444) I had to create an identity for the full DN of the source (i.e. nifi8443) certificate. So on nifi8444, I created a new user with this name:
CN=nifi8443, O=NiFi Server, L=San Francisco, ST=California, C=US

Then I added that identity to two policies: the first is retrieve-site-to-site, and the second is the policy on the input port.

I thought those were all the requirements. However, I am currently getting this error when I start the reporting task:
SiteToSiteProvenanceReportingTask[id=d9ece17e-0192-1000-9050-2a4a5a2f9e02] Unable to refresh remote group peers due to: response code 401:Unauthorized with explanation: null

It seems that the identity from nifi8443 is not being authenticated correctly, even though I checked the server certificate with keytool -list -v -keystore nifi8443.p12.

@MattWho @SAMSAL - Do you know why I am getting a 401 Unauthorized error?
Labels: Apache NiFi
10-23-2024 07:00 PM
2 Kudos
@Mikkkk Please check my screenshot below. I changed the "Replacement Value Strategy" property to "Literal Value". I also updated the dynamic user property to this:
${field.value:replaceAll('[\\p{Punct}]', ' ')}

How it works:
- ${field.value}: In NiFi's Expression Language, field.value refers to the current value of the field being processed in a record (for example, the value of the ENTITY_NAME field if that's what you're working on).
- :replaceAll('[\\p{Punct}]', ' '): This function searches field.value for any punctuation character (matched by the [\\p{Punct}] regex) and replaces each occurrence with a space.

Please "Accept the solution" if it helped!
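If you want to check the regex behaviour outside NiFi, here is a minimal Python sketch of the same replacement. It assumes Java's default \p{Punct} class (the 32 ASCII punctuation characters), which matches Python's string.punctuation; the function name replace_punctuation and the sample value are only illustrative and are not part of NiFi.

```python
import re
import string

# Assumption: Java's \p{Punct} (default, non-Unicode mode) is the 32 ASCII
# punctuation characters, the same set as Python's string.punctuation.
PUNCT_PATTERN = re.compile("[" + re.escape(string.punctuation) + "]")


def replace_punctuation(value: str, replacement: str = " ") -> str:
    """Replace every punctuation character with `replacement` (a space by default)."""
    return PUNCT_PATTERN.sub(replacement, value)


# Hypothetical field value, for illustration only.
cleaned = replace_punctuation("ACME, Inc.")
print(repr(cleaned))
```

Note that each punctuation character becomes its own space, so a value like "ACME, Inc." ends up with a double space after "ACME".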
10-23-2024 11:32 AM
1 Kudo
@Shalexey Can you provide an example of what you mean by filter? The LookupRecord processor's primary function is to extract one or more fields from a record and look up a value for those fields in a LookupService. If you are trying to filter, you can use another NiFi processor before the LookupRecord processor. Here are some examples:

1. RouteOnAttribute
Routes flow files based on attribute values by evaluating expressions. It allows you to define conditions (e.g., ${status:equals('SUCCESS')}) that send data to different relationships, such as matched or unmatched.

2. RouteOnContent
Filters flow files based on patterns within the content. This processor is ideal for detecting specific keywords, tags, or values inside text, XML, or JSON data and routing them accordingly.

3. FilterCSV
Filters rows in CSV files based on specific column values or positions, passing only records that meet the defined criteria (e.g., status = 'active'). As far as I know this is not part of the standard Apache NiFi processor set, so check that it exists in your installation; QueryRecord with a CSV reader covers the same need.

4. QueryRecord
Uses SQL queries to filter structured data (JSON, CSV, Avro) within flow files. You can select or discard records by writing SQL queries (e.g., SELECT * FROM FLOWFILE WHERE type = 'transaction').
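To make the QueryRecord idea concrete, here is a rough Python sketch that runs the same kind of SQL over a handful of in-memory records using sqlite3. This is only an analogy for how the filter behaves, not how NiFi executes it; the sample records, the column names and the query_records helper are all made up for illustration.

```python
import sqlite3

# Hypothetical records standing in for the contents of one flow file.
records = [
    {"id": 1, "type": "transaction", "status": "active"},
    {"id": 2, "type": "refund", "status": "active"},
    {"id": 3, "type": "transaction", "status": "inactive"},
]


def query_records(rows, sql):
    """Load rows into an in-memory table named FLOWFILE and run `sql` against it."""
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    conn.execute("CREATE TABLE FLOWFILE (id INTEGER, type TEXT, status TEXT)")
    conn.executemany("INSERT INTO FLOWFILE VALUES (:id, :type, :status)", rows)
    result = [dict(row) for row in conn.execute(sql)]
    conn.close()
    return result


# Only the records matching the WHERE clause would continue on to LookupRecord.
matched = query_records(records, "SELECT * FROM FLOWFILE WHERE type = 'transaction'")
print(matched)
```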
10-23-2024 11:21 AM
3 Kudos
@MDTechie - Here is a solution for the question you asked. I usually like to build Jolt transforms step by step. Please see the Jolt spec below, along with an explanation of each step.

[
  {
    "operation": "shift",
    "spec": {
      "id": "id",
      "categories": {
        "*": {
          "@": "categories[]",
          "subcategories": {
            "*": {
              "@": "categories[]"
            }
          }
        }
      }
    }
  },
  {
    "operation": "remove",
    "spec": {
      "categories": {
        "*": {
          "subcategories": ""
        }
      }
    }
  },
  {
    "operation": "shift",
    "spec": {
      "categories": {
        "*": {
          "code": "categories[&1].categoryCode",
          "name": "categories[&1].categoryName",
          "@(2,id)": "categories[&1].individualId"
        }
      }
    }
  },
  {
    "operation": "shift",
    "spec": {
      "categories": {
        "*": ""
      }
    }
  }
]

The first operation:
- Copies the id field from the original input to the top level of the output.
- Iterates over the categories array and copies each category object to a new array called categories[].
- If a category contains a subcategories array, copies each subcategory object into the same categories[] array, effectively flattening the nested structure.

The second operation:
- Looks inside each category object in the categories[] array.
- Removes the subcategories field if it exists.

The third operation:
- Iterates over the categories[] array, processing each category object.
- Renames the code field to categoryCode and the name field to categoryName to match the new output schema.
- Adds a new field called individualId to each category object, using the id value from two levels up (the root of the document at that point).

The fourth operation:
- Takes each object from the categories[] array and moves it to the root level.
- As a result, the categories array wrapper is removed, leaving a flat array of individual objects.

I suggest working through it one operation at a time in a Jolt tester to understand it a little better. Hope this helps! 🙂 If you found the solution helpful, please "Accept as Solution".
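For anyone who finds the Jolt syntax hard to follow, here is a plain Python sketch of what the four operations are meant to achieve together. It is not Jolt and is not something NiFi runs; the input shape (id, categories, code, name, subcategories) is inferred from the spec, the sample values are invented, and the output ordering (each category followed by its own subcategories) is my assumption.

```python
def flatten_categories(doc):
    """Flatten categories and their subcategories into one list of renamed objects."""
    flat = []
    for category in doc.get("categories", []):
        # Operations 1 and 2: the category and each of its subcategories end up
        # side by side, and the nested subcategories field is dropped.
        for item in [category, *category.get("subcategories", [])]:
            # Operation 3: rename code/name and attach the root id.
            flat.append({
                "categoryCode": item.get("code"),
                "categoryName": item.get("name"),
                "individualId": doc.get("id"),
            })
    # Operation 4: the result is a bare list, with no "categories" wrapper.
    return flat


# Hypothetical input, assumed from the field names used in the spec.
sample = {
    "id": "123",
    "categories": [
        {
            "code": "A",
            "name": "Category A",
            "subcategories": [{"code": "A1", "name": "Subcategory A1"}],
        },
        {"code": "B", "name": "Category B"},
    ],
}

flattened = flatten_categories(sample)
print(flattened)
```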
10-22-2024 07:59 AM
1 Kudo
@RanjitMohapatra NiFi expects the Jolt specification to be an array of objects when the transform type is set to Chain. This is typical with JoltTransformJSON because it supports multiple transformations chained together.

Also, if you open your JoltTransformJSON processor and click the Advanced option, it brings up a nice Jolt spec tester that is consistent with how NiFi will handle the transformation.

The solution is simply to wrap the spec in an array:

[
  {
    "operation": "shift",
    "spec": {
      "ts": "ts",
      "sourceDeviceId": "source.deviceId",
      "sourceName": "source.name",
      "sourceType": "source.type",
      "locationLat": "location.lat",
      "locationLon": "location.lon",
      "locationId": "location.id",
      "recordDate": "recordDate",
      "_class": "_class",
      "status": "status",
      "power": "power",
      "current": "current",
      "intensity": "intensity",
      "voltage": "voltage"
    }
  }
]

Please accept the solution!
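If you generate specs outside NiFi, a small helper like the one below can do the wrapping for you. This is just a sketch in Python; as_chain_spec is a hypothetical name, and the only thing it does is turn a single operation object into a one-element array and leave existing arrays alone.

```python
import json


def as_chain_spec(spec_text: str) -> str:
    """Return the spec as a JSON array, wrapping a single operation object if needed."""
    spec = json.loads(spec_text)
    if isinstance(spec, dict):
        spec = [spec]  # a lone operation becomes a one-step chain
    return json.dumps(spec, indent=2)


# A shortened, illustrative version of the shift operation above.
single_operation = '{"operation": "shift", "spec": {"ts": "ts", "sourceName": "source.name"}}'
print(as_chain_spec(single_operation))
```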
10-22-2024 07:37 AM
2 Kudos
@rajivswe_2k7 I would avoid putting a lot of data in flowfile attributes because of memory concerns. What you could do instead is change the InvokeHTTP property called "Response Generation Required" from False to True. Then, no matter what the status code of the HTTP response is, the response is routed to the Response relationship with the full response body in the flowfile's content and the response status code in the flowfile's attributes.

You could then use a RouteOnAttribute processor to filter for certain response codes, and you will have your full response in the content of the flowfile, where it should be 🙂

Please accept this solution if you find it helpful!
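Here is a rough Python sketch of the routing decision that RouteOnAttribute would make after InvokeHTTP. It assumes the status code is available in the invokehttp.status.code attribute, which InvokeHTTP writes; the route names (success, client_error, server_error, other) and the function route_response are hypothetical and only show the idea.

```python
def route_response(attributes: dict) -> str:
    """Pick a route name from the HTTP status code stored in the flowfile attributes."""
    code = int(attributes["invokehttp.status.code"])
    if 200 <= code < 300:
        return "success"
    if 400 <= code < 500:
        return "client_error"
    if code >= 500:
        return "server_error"
    return "other"


# The response body stays in the flowfile content; only the small status
# attribute is inspected for routing.
route = route_response({"invokehttp.status.code": "503"})
print(route)
```

In RouteOnAttribute itself, each of these branches would be one dynamic property whose Expression Language condition tests the same attribute.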
10-21-2024 09:34 AM
@nifier What does your configuration for the FetchS3Object processor look like? I would make sure you're pointing to the correct region in the processor, and that your AWSCredentialsProviderControllerService or the credentials set directly on the processor are configured correctly.
10-21-2024 09:25 AM
1 Kudo
@xtd Based on your requirements, it looks like you have some unneeded processors. Here is what my flow currently looks like:

1. ExecuteSQL
This returns all the rows in the source table in Avro format. If you look at the attributes, by default it writes an attribute called executesql.row.count, which you can use as your source row count. So part of the left side of your flow can be removed.

2. PutDatabaseRecord
This populates the target table with all the records. The flowfile goes to the next processor only on success.

3. CalculateRecordStats
This creates a new attribute called record.count that holds the number of records in the flowfile.

4. UpdateAttribute
This prepares for PutSQL and creates the corresponding attributes.

5. PutSQL
This inserts a new row into the table with input_count and output_count.

With the record-oriented processors, comparing counts is mostly unnecessary, because populating the target table either succeeds or fails. If it succeeds, you can assume the row count will be the same; if it fails, you can assume nothing was populated. However, the approach above gets you the counts either way 🙂

Please accept this solution if it helped!
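The same sequence can be sketched outside NiFi to show where each count comes from. The Python below uses an in-memory SQLite database purely as a stand-in; the table names (source_table, target_table, load_audit), the columns and the copy_and_audit function are assumptions for illustration, with comments marking which processor each step corresponds to.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE source_table (id INTEGER, name TEXT);
CREATE TABLE target_table (id INTEGER, name TEXT);
CREATE TABLE load_audit (input_count INTEGER, output_count INTEGER);
INSERT INTO source_table VALUES (1, 'a'), (2, 'b'), (3, 'c');
""")


def copy_and_audit(conn):
    """Copy all source rows to the target and record both counts in an audit table."""
    # ExecuteSQL: read every source row; the row count plays the role of
    # the executesql.row.count attribute.
    rows = conn.execute("SELECT id, name FROM source_table").fetchall()
    input_count = len(rows)

    # PutDatabaseRecord: write all records in one transaction, so the load
    # either fully succeeds or raises and writes nothing.
    with conn:
        conn.executemany("INSERT INTO target_table VALUES (?, ?)", rows)

    # CalculateRecordStats: count the records that were passed on (record.count).
    output_count = len(rows)

    # UpdateAttribute + PutSQL: store both counts as a new audit row.
    with conn:
        conn.execute("INSERT INTO load_audit VALUES (?, ?)", (input_count, output_count))
    return input_count, output_count


counts = copy_and_audit(conn)
print(counts)
```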