Member since: 06-16-2020
Posts: 50
Kudos Received: 14
Solutions: 5

My Accepted Solutions
| Title | Views | Posted |
| --- | --- | --- |
| | 358 | 10-23-2024 11:21 AM |
| | 401 | 10-22-2024 07:59 AM |
| | 336 | 10-22-2024 07:37 AM |
| | 184 | 10-21-2024 09:25 AM |
| | 1874 | 06-16-2023 07:23 AM |
07-29-2024
07:58 AM
Hey @drewski7, if you are using the Ranger UI to import the policies, it will give you an option to either override the existing policies or not.
05-23-2024
11:44 PM
@drewski7 A few things to check:
- Ensure that the Kerberos tickets are being refreshed properly for the HBase REST server. Stale or expired tickets might cause intermittent authorization issues. Check the Kerberos cache to ensure that it is being updated correctly when group memberships change in LDAP.
- Restart the HBase REST server after making changes to the LDAP group and running the user sync to see if it resolves the inconsistency.
- Analyze the HBase REST server logs more thoroughly, especially the messages related to unauthorized access and Kerberos thread issues. Look for patterns or specific errors that could provide more clues.
- Verify the settings for ranger.plugin.hbase.policy.pollIntervalMs and ranger.plugin.hbase.authorization.cache.max.size again, and experiment with lowering the poll interval to see if it improves the responsiveness of policy changes (see the sketch after this list).
- In the Ranger Admin UI, after running the user sync, manually refresh the policies for HBase and observe whether this action has any immediate effect on the authorization behavior. Confirm that there are no discrepancies between the policies displayed in the Ranger Admin UI and the actual enforcement in HBase.
- Double-check the synchronization between FreeIPA LDAP and Ranger. Ensure that the user sync is not just updating the Ranger Admin UI but is also effectively communicating the changes to all Ranger plugins. Review the user sync logs to verify that all changes are processed correctly without errors.
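As a minimal sketch of the poll-interval change, assuming the HBase plugin picks up its settings from ranger-hbase-security.xml and that the default interval of 30000 ms applies in your environment (verify both against your cluster before changing anything):
<!-- hypothetical excerpt from ranger-hbase-security.xml -->
<property>
  <name>ranger.plugin.hbase.policy.pollIntervalMs</name>
  <!-- lower value = plugin checks Ranger Admin for policy changes more often -->
  <value>5000</value>
</property>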
10-16-2023
02:19 PM
1 Kudo
@techNerd Clearing a processor component's state requires stopping the processor before you can "clear the state". The stopped state is required because the processor may be writing or updating state when you attempt to clear it, which would cause issues; when stopped, there is no need to worry about a race condition between writes and deletes.

That being said, resetting the sequence number stored in state to 0 can be accomplished using the advanced UI of the UpdateAttribute processor and a special reset-seq FlowFile that you feed into the processor at 00:00 each day. The advanced UI of the UpdateAttribute processor works like if/then/else logic: you set up a Rule "reset" with a condition (the "if"). If the condition is true, the Rule's "Actions" are applied. If no Rule's conditions are true, the processor's non-advanced UI properties are applied.

Keep the UpdateAttribute properties the same as you already have, then click "advanced" in the lower left corner of the processor configuration UI to open and configure the Rules (a rough sketch follows below). Now all you need to do is set up a GenerateFlowFile processor that feeds a FlowFile into the UpdateAttribute processor once a day to reset the seq value stored in that UpdateAttribute processor's local state to 0. Optionally, you could add a RouteOnAttribute processor after the UpdateAttribute to route out the reset FlowFile for termination so it does not continue through your dataflow.

If you found any of the suggestions/solutions provided helped you with your issue, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
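A rough sketch of the advanced UI configuration, assuming the daily GenerateFlowFile adds an attribute named resetSeq with value true, the counter attribute is named seq, and "Store State" is set to store state locally so getStateValue() is available (the attribute names here are hypothetical, substitute your own):

Rule: reset
  Condition: ${resetSeq:equals('true')}
  Action:    seq = 0
Base (non-advanced) property, applied when no Rule matches:
  seq = ${getStateValue('seq'):plus(1)}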
09-28-2023
12:55 AM
Thank you so much, SAMSAL, it's working.
06-28-2023
06:19 AM
I am looking at the Kafka policies in my current Ranger instance. There is a policy called "service_all - cluster". When I look in the UI, these are the two allow conditions for this policy. However, when I run this API call to get all the policies for Kafka and search for "service_all - cluster", this is the result:
<policies>
<id>11</id>
<guid>dbbd8ed1-2bc6-452d-991e-28082727e3cf</guid>
<isEnabled>true</isEnabled>
<version>1</version>
<service>cm_kafka</service>
<name>service_all - cluster</name>
<policyType>0</policyType>
<policyPriority>0</policyPriority>
<description>Service Policy for all - cluster</description>
<isAuditEnabled>true</isAuditEnabled>
<resources>
<entry>
<key>cluster</key>
<value>
<values>*</values>
<isExcludes>false</isExcludes>
<isRecursive>false</isRecursive>
</value>
</entry>
</resources>
<policyItems>
<accesses>
<type>configure</type>
<isAllowed>true</isAllowed>
</accesses>
<accesses>
<type>describe</type>
<isAllowed>true</isAllowed>
</accesses>
<accesses>
<type>kafka_admin</type>
<isAllowed>true</isAllowed>
</accesses>
<accesses>
<type>create</type>
<isAllowed>true</isAllowed>
</accesses>
<accesses>
<type>idempotent_write</type>
<isAllowed>true</isAllowed>
</accesses>
<accesses>
<type>describe_configs</type>
<isAllowed>true</isAllowed>
</accesses>
<accesses>
<type>alter_configs</type>
<isAllowed>true</isAllowed>
</accesses>
<accesses>
<type>cluster_action</type>
<isAllowed>true</isAllowed>
</accesses>
<accesses>
<type>alter</type>
<isAllowed>true</isAllowed>
</accesses>
<accesses>
<type>publish</type>
<isAllowed>true</isAllowed>
</accesses>
<accesses>
<type>consume</type>
<isAllowed>true</isAllowed>
</accesses>
<accesses>
<type>delete</type>
<isAllowed>true</isAllowed>
</accesses>
<users>cruisecontrol</users>
<users>streamsmsgmgr</users>
<users>kafka</users>
<delegateAdmin>true</delegateAdmin>
</policyItems>
<policyItems>
<accesses>
<type>describe</type>
<isAllowed>true</isAllowed>
</accesses>
<users>rangerlookup</users>
<delegateAdmin>false</delegateAdmin>
</policyItems>
<serviceType>kafka</serviceType>
<options/>
<zoneName/>
<isDenyAllElse>false</isDenyAllElse>
</policies>
Here you can see there are three extra accesses, called publish, consume, and delete, that aren't showing up in the user interface. Yesterday I did a full reimport of all the policies for Kafka and it fixed the issue, but after a restart of Ranger this happened again. I checked the underlying database and it's consistent with the user interface, but again the API call is adding those three extra accesses. Does anyone know what happens after a restart that causes the API call to differ from the user interface?
Labels:
- Apache Ranger
06-21-2023
07:46 AM
@drewski7, in this case, have a look at @steven-matison's answer, because that is the solution to your problem.
06-20-2023
12:56 AM
Yes, correct... the file is read in the first fragment, updated, and then saved into a local folder, and so on for every fragment. I want to be sure that every update is saved without losing data.
06-16-2023
11:26 AM
You have to create a separate CASE statement for each column you are trying to update, similar to what is done for the shipment number (see the sketch below).
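If this is a plain SQL UPDATE, a minimal sketch of the pattern might look like the following; the table and column names (orders, shipment_number, status, updated_at) are hypothetical and not taken from your actual query, so adjust them to your schema:

UPDATE orders
SET shipment_number = CASE WHEN shipment_number IS NULL THEN 'N/A' ELSE shipment_number END,
    status          = CASE WHEN status IS NULL THEN 'UNKNOWN' ELSE status END,
    updated_at      = CASE WHEN updated_at IS NULL THEN CURRENT_TIMESTAMP ELSE updated_at END;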
06-16-2023
07:23 AM
@bhadraka What version of NiFi are you using? In NiFi 1.20.0, you can use a ReplaceText processor after reading in the file. Using the Line-by-Line evaluation mode, there is a drop-down option "Except-Last-Line". You could then configure it to replace all previous lines with empty strings. Here's a screenshot of my ReplaceText processor properties.
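As a rough sketch of the relevant properties (the search regex and the exact values here are assumptions rather than a copy of the screenshot, so verify them against your data):

Replacement Strategy:          Regex Replace
Evaluation Mode:               Line-by-Line
Line-by-Line Evaluation Mode:  Except-Last-Line
Search Value:                  (.*)
Replacement Value:             (set to an empty string)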
06-14-2023
12:33 PM
I would do this in a single step with an InvokeScriptedProcessor and the following Groovy code:
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
// NiFi API imports required for a scripted Processor implementation
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import org.apache.nifi.processor.util.StandardValidators
class GroovyProcessor implements Processor {
PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("BATCH_SIZE")
.displayName("Batch Size")
.description("The number of incoming FlowFiles to process in a single execution of this processor.")
.required(true)
.defaultValue("1000")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build()
Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description('FlowFiles that were successfully processed are routed here')
.build()
Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description('FlowFiles that were not successfully processed are routed here')
.build()
ComponentLog log
void initialize(ProcessorInitializationContext context) { log = context.logger }
Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
Collection<ValidationResult> validate(ValidationContext context) { null }
PropertyDescriptor getPropertyDescriptor(String name) { null }
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE]) as List<PropertyDescriptor> }
String getIdentifier() { null }
JsonSlurper jsonSlurper = new JsonSlurper()
JsonOutput jsonOutput = new JsonOutput()
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
ProcessSession session = sessionFactory.createSession()
try {
List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
if (!flowFiles) return
flowFiles.each { flowFile ->
Map customAttributes = [ "mime.type": "application/json" ]
List data = null
session.read(flowFile, {
inputStream -> data = jsonSlurper.parseText(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
} as InputStreamCallback)
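// Fan out: create one new FlowFile per visit, copying the parent entry's other fields onto each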
data.each { entry ->
entry.VisitList.each { visit ->
Map newData = [:]
newData.put("employer", entry.employer)
newData.put("loc_id", entry.loc_id)
newData.put("topId", entry.topId)
newData.put("VisitList", [visit])
FlowFile newFlowFile = session.create()
newFlowFile = session.write(newFlowFile, { outputStream -> outputStream.write(jsonOutput.toJson([newData]).getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback)
newFlowFile = session.putAllAttributes(newFlowFile, customAttributes)
session.transfer(newFlowFile, REL_SUCCESS)
}
}
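// Remove the original multi-visit FlowFile; only the per-visit FlowFiles continue downstream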
session.remove(flowFile)
}
session.commit()
} catch (final Throwable t) {
log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
session.rollback(true)
throw t
}
}
}
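// InvokeScriptedProcessor requires the script to assign a Processor instance to the 'processor' variable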
processor = new GroovyProcessor()