Member since: 06-16-2020
Posts: 50
Kudos Received: 14
Solutions: 5
My Accepted Solutions
Views | Posted |
---|---|
307 | 10-23-2024 11:21 AM |
344 | 10-22-2024 07:59 AM |
304 | 10-22-2024 07:37 AM |
164 | 10-21-2024 09:25 AM |
1840 | 06-16-2023 07:23 AM |
06-28-2023 06:19 AM
I am looking at the Kafka policies in my current Ranger instance. There is a policy called "service_all - cluster". In the user interface, this policy shows two allow conditions. However, when I run the API call to get all the policies for Kafka and search for "service_all - cluster", this is the result:
<policies>
<id>11</id>
<guid>dbbd8ed1-2bc6-452d-991e-28082727e3cf</guid>
<isEnabled>true</isEnabled>
<version>1</version>
<service>cm_kafka</service>
<name>service_all - cluster</name>
<policyType>0</policyType>
<policyPriority>0</policyPriority>
<description>Service Policy for all - cluster</description>
<isAuditEnabled>true</isAuditEnabled>
<resources>
<entry>
<key>cluster</key>
<value>
<values>*</values>
<isExcludes>false</isExcludes>
<isRecursive>false</isRecursive>
</value>
</entry>
</resources>
<policyItems>
<accesses>
<type>configure</type>
<isAllowed>true</isAllowed>
</accesses>
<accesses>
<type>describe</type>
<isAllowed>true</isAllowed>
</accesses>
<accesses>
<type>kafka_admin</type>
<isAllowed>true</isAllowed>
</accesses>
<accesses>
<type>create</type>
<isAllowed>true</isAllowed>
</accesses>
<accesses>
<type>idempotent_write</type>
<isAllowed>true</isAllowed>
</accesses>
<accesses>
<type>describe_configs</type>
<isAllowed>true</isAllowed>
</accesses>
<accesses>
<type>alter_configs</type>
<isAllowed>true</isAllowed>
</accesses>
<accesses>
<type>cluster_action</type>
<isAllowed>true</isAllowed>
</accesses>
<accesses>
<type>alter</type>
<isAllowed>true</isAllowed>
</accesses>
<accesses>
<type>publish</type>
<isAllowed>true</isAllowed>
</accesses>
<accesses>
<type>consume</type>
<isAllowed>true</isAllowed>
</accesses>
<accesses>
<type>delete</type>
<isAllowed>true</isAllowed>
</accesses>
<users>cruisecontrol</users>
<users>streamsmsgmgr</users>
<users>kafka</users>
<delegateAdmin>true</delegateAdmin>
</policyItems>
<policyItems>
<accesses>
<type>describe</type>
<isAllowed>true</isAllowed>
</accesses>
<users>rangerlookup</users>
<delegateAdmin>false</delegateAdmin>
</policyItems>
<serviceType>kafka</serviceType>
<options/>
<zoneName/>
<isDenyAllElse>false</isDenyAllElse>
</policies>
Here you can see three extra accesses (publish, consume, and delete) that do not show up in the user interface. Yesterday I reimported all the Kafka policies, which fixed the issue, but after a restart of Ranger it happened again. I checked the underlying database and it is consistent with the user interface, yet the API call still returns those three extra accesses. Does anyone know what happens during a restart that causes the API response to differ from the user interface?
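For anyone who wants to check the API response against the UI programmatically, here is a rough Python sketch. It is only an illustration: `SAMPLE_XML` is a trimmed, hypothetical stand-in for the real response, and the function names are my own, not part of Ranger.

```python
import xml.etree.ElementTree as ET

# Hypothetical, trimmed-down response used only for illustration.
SAMPLE_XML = """
<policies>
  <name>service_all - cluster</name>
  <policyItems>
    <accesses><type>describe</type><isAllowed>true</isAllowed></accesses>
    <accesses><type>publish</type><isAllowed>true</isAllowed></accesses>
    <users>kafka</users>
    <delegateAdmin>true</delegateAdmin>
  </policyItems>
  <policyItems>
    <accesses><type>describe</type><isAllowed>true</isAllowed></accesses>
    <users>rangerlookup</users>
    <delegateAdmin>false</delegateAdmin>
  </policyItems>
</policies>
"""


def access_types_per_item(policy_xml):
    """Return one (users, allowed access types) pair per <policyItems> element."""
    root = ET.fromstring(policy_xml)
    result = []
    for item in root.findall("policyItems"):
        users = sorted(u.text for u in item.findall("users"))
        allowed = sorted(
            a.findtext("type")
            for a in item.findall("accesses")
            if a.findtext("isAllowed") == "true"
        )
        result.append((users, allowed))
    return result


def unexpected_accesses(policy_xml, expected):
    """List access types found in the response that are not in `expected`."""
    found = {t for _, types in access_types_per_item(policy_xml) for t in types}
    return sorted(found - set(expected))


if __name__ == "__main__":
    # `expected` would be the access types visible in the UI (assumed here).
    print(unexpected_accesses(SAMPLE_XML, ["describe"]))
```

Running the same comparison before and after a Ranger restart would at least show exactly which access types appear only in the API response.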
Labels: Apache Ranger
06-21-2023 07:14 AM
@cotopaul - It's taking in JSON and writing to Parquet, and only doing literal value replacements (i.e., adding 5 fields to each record). Three of those fields just add attribute values and literal values to each record, and the other two do minor date manipulation (i.e., converting dates to epoch).
06-21-2023 05:19 AM
@steven-matison - Thanks for the response. If we scope this to just the UpdateRecord processor, for example, is there anything from an infrastructure or configuration standpoint that you know of to make it more efficient, assuming that I can't scale up or tune processor concurrency?
06-20-2023 05:54 AM
I have been using multiple record-oriented processors (ConvertRecord, UpdateRecord, etc.) in various parts of my flow. For example, my UpdateRecord processor takes about 16 seconds to read in a 30 MB flowfile, add some fields to each record, and convert that data to Parquet. I want to reduce this time substantially. My current infrastructure is a 2-node e2-standard-4 cluster in GCP running CentOS 7. Each instance has 4 vCPUs and 16 GB RAM, and each repository (content, provenance, flowfile) has its own SSD persistent drive. Most of the NiFi configs are the defaults. What would you recommend, from either an infrastructure or a NiFi standpoint, to improve performance on these processors?
Labels: Apache NiFi
06-16-2023 07:23 AM
@bhadraka What version of NiFi are you using? In NiFi 1.20.0, you can use the ReplaceText processor after reading in the file. With the line-by-line evaluation mode, there is a drop-down option "Except-Last-Line". You could then configure it to replace all previous lines with empty strings. Here's a screenshot of my ReplaceText processor properties.
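To make the intended effect concrete, here is a small Python sketch of what that configuration is meant to achieve: everything except the last line is dropped. This is not NiFi code; `keep_last_line` is a hypothetical helper, and it assumes the line terminators of the removed lines are dropped as well.

```python
def keep_last_line(text):
    """Return only the last line of `text` (empty string for empty input)."""
    lines = text.splitlines()
    return lines[-1] if lines else ""


if __name__ == "__main__":
    print(keep_last_line("header\nrow 1\nrow 2\nfooter\n"))
```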
06-16-2023 07:05 AM
@Ray82 - I quickly tested @SAMSAL's solution and it worked for me. Make sure that in QueryRecord you are referencing the right attribute names.
06-14-2023 06:57 AM
1 Kudo
@Dracile - Instead, I recommend using the ForkRecord processor. As @steven-matison mentioned, create a Record Reader and a Record Writer, then add a property with the record path /VisitList, set Mode to Split, and set Include Parent Fields to true. The resulting flowfile will look like this:
[ {
"employer" : "98765",
"loc_id" : "312",
"topId" : "Management",
"VisitList" : [ {
"S1" : "HR",
"S2" : "Accountant"
} ]
}, {
"employer" : "98765",
"loc_id" : "312",
"topId" : "Management",
"VisitList" : [ {
"S1" : "Manager",
"S2" : "Sr. Manager"
} ]
} ]
Then you could split on $ using a SplitJson processor or, even better, continue using record-oriented processors for better performance 🙂
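If it helps to see the split logic outside NiFi, here is a rough Python sketch of what Split mode with parent fields included does conceptually. It is not ForkRecord's implementation; `fork_split` is a hypothetical helper, and `EXAMPLE_RECORD` is my assumption of the input, reconstructed from the output above.

```python
# Assumed input record, inferred from the output shown above.
EXAMPLE_RECORD = {
    "employer": "98765",
    "loc_id": "312",
    "topId": "Management",
    "VisitList": [
        {"S1": "HR", "S2": "Accountant"},
        {"S1": "Manager", "S2": "Sr. Manager"},
    ],
}


def fork_split(record, array_field):
    """Emit one record per array element, copying the parent fields each time."""
    parent = {k: v for k, v in record.items() if k != array_field}
    forked = []
    for element in record.get(array_field, []):
        child = dict(parent)             # keep the parent fields
        child[array_field] = [element]   # single-element array per record
        forked.append(child)
    return forked


if __name__ == "__main__":
    import json
    print(json.dumps(fork_split(EXAMPLE_RECORD, "VisitList"), indent=2))
```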
06-02-2023 11:51 AM
Let's say I have JSON that looks similar to this:
[{ "time": "", "value": 92.72, "longitude": "128.732855", "latitude": "-16.1135755", "gathered_time": "1685730397038", "metadata": null }, { "time": "", "value": 92.7, "longitude": "128.732855", "latitude": "-16.1135755", "gathered_time": "1685730397038", "metadata": null }, { "time": "", "value": { "nil": null }, "longitude": "128.732855", "latitude": "-16.1135755", "gathered_time": "1685730397038", "metadata": { "TVPMeasurementMetadata": { "qualifier": { "title": null, "href": null } } } }]
How would I use a record-oriented processor to filter out records that contain this key-value pair: {"value": { "nil": null }}? In the example above, the last record in the JSON array would be filtered out, so the expected output would look something like this:
[{ "time": "", "value": 92.72, "longitude": "128.732855", "latitude": "-16.1135755", "gathered_time": "1685730397038", "metadata": null }, { "time": "", "value": 92.7, "longitude": "128.732855", "latitude": "-16.1135755", "gathered_time": "1685730397038", "metadata": null }]
I don't understand how to write filters. I read about them in the RecordPath Guide but would love to see an example.
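To pin down the behavior I am after, here is the filter expressed as a short Python sketch. It only describes the expected result, not how to do it in NiFi; `drop_nil_values` and `NIL_MARKER` are names I made up, and the sample records are shortened versions of the ones above.

```python
import json

# The "value" content that marks a record for removal.
NIL_MARKER = {"nil": None}

# Shortened versions of the records from the example above.
SAMPLE = json.loads("""
[{ "time": "", "value": 92.72, "metadata": null },
 { "time": "", "value": 92.7, "metadata": null },
 { "time": "", "value": { "nil": null }, "metadata": {} }]
""")


def drop_nil_values(records):
    """Keep only records whose "value" field is not {"nil": null}."""
    return [r for r in records if r.get("value") != NIL_MARKER]


if __name__ == "__main__":
    print(json.dumps(drop_nil_values(SAMPLE)))
```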
Labels: Apache NiFi
06-01-2023 11:16 AM
Is there a way to use Expression Language to convert the current time to a specific time zone? Here is what I have currently, but it gives GMT rather than GMT+10:00: ${now():format("yyyy-MM-dd'T'HH:mm:ss.SSSXXX", "GMT")}
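For reference, this is the output I am trying to get, written as a Python sketch rather than Expression Language. The helper name `now_in_gmt_plus_10` is hypothetical, and the fixed offset of +10:00 ignores any daylight saving rules. Whether the second argument of `format` accepts an ID such as "GMT+10:00" is an assumption I have not verified.

```python
from datetime import datetime, timedelta, timezone

# Fixed offset of ten hours ahead of GMT (no daylight saving handling).
GMT_PLUS_10 = timezone(timedelta(hours=10))


def now_in_gmt_plus_10(now_utc=None):
    """Format a UTC instant (default: now) as ISO 8601 in GMT+10:00."""
    if now_utc is None:
        now_utc = datetime.now(timezone.utc)
    return now_utc.astimezone(GMT_PLUS_10).isoformat(timespec="milliseconds")


if __name__ == "__main__":
    print(now_in_gmt_plus_10())
```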
Labels: Apache NiFi