When users interact with services in CDP Public Cloud, those services generate Ranger audits that record each request and the outcome of its policy evaluation.
Administrators can inspect these audits using the Ranger Web UI. The audits are also archived to object storage as JSON-per-line files which can be easily consumed by downstream applications for monitoring purposes.
Audits include a wide range of data points that describe the nature of the request and the result of the policy evaluation (see the Apache wiki for a complete list).
In our example, we use Flink to read these files and calculate the number of denied requests made by each user within a single session.
The complete project can be found on GitHub [link].
Clone this repository to follow along as we break down the application logic.
The Ranger audits are stored in the Data Lake Storage Location in the following layout:
$DATALAKE_STORAGE_LOCATION/ranger/audit
|--atlas (service type)
|----atlas (service name)
|------20221013
|--------atlas_ranger_audit_test-dl-master0.test-env.xxxx-xxxx.cloudera.site.1.log
|--------atlas_ranger_audit_test-dl-master0.test-env.xxxx-xxxx.cloudera.site.log
...
Each line in these files contains a JSON object similar to the following (pretty-printed here for readability):
{
"repo":"cm_kafka",
"reqUser":"wdyson",
"evtTime":"2022-09-15 11:00:16.271",
"action":"publish",
"result":1,
"event_count":1,
...
}
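Because each audit is just a JSON object on its own line, any JSON library can pick one apart. As a quick illustration, here is a minimal, hypothetical sketch using Jackson (the same library the project uses later) that reads a single line and inspects a couple of fields:
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Minimal sketch: parse a single audit line and inspect a couple of fields
public class InspectAuditLine {
    public static void main(String[] args) throws Exception {
        String line = "{\"repo\":\"cm_kafka\",\"reqUser\":\"wdyson\","
                + "\"evtTime\":\"2022-09-15 11:00:16.271\",\"action\":\"publish\","
                + "\"result\":1,\"event_count\":1}";

        JsonNode audit = new ObjectMapper().readTree(line);
        System.out.println(audit.get("reqUser").asText()); // wdyson
        System.out.println(audit.get("result").asInt());   // 1 = allowed, 0 = denied
    }
}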
CC: readme.md
To follow along with this example, you will need a CDP Public Cloud environment with a Streaming Analytics Data Hub cluster and a Kafka topic you can write results to.
You must also ensure that Maven is installed on your developer machine. Once installed, the application can be compiled and packaged using the following command:
$ mvn package
This command will generate a JAR file under ./target/auditsession-*-jar-with-dependencies.jar – we will need this later.
Next, you will need to create the following properties file with parameters that match your environment (replace the values marked with <...>):
audit.path=<base_audit_path_starting_with_one_of_s3a://,abfs://,gs://>
audit.poll=240
session.duration=600
session.output=kafka
kafka.topic=<topic_name>
kafka.bootstrap.servers=<bootstrap_servers>
kafka.security.protocol=SASL_SSL
kafka.sasl.kerberos.service.name=kafka
Save this file with a meaningful filename like flink-audit-sessions.properties.
Finally, you will need to download a copy of your Kerberos keytab from the Cloudera Control Plane – we will configure Flink to use this keytab for authentication:
https://docs.cloudera.com/data-hub/cloud/access-clusters/topics/dh-retrieving-keytabs.html
Copy the JAR file, properties file and keytab file to the management node of your Streaming Analytics Data Hub cluster.
At this point, we're ready to start the example application. SSH onto the management node of the Streaming Analytics Data Hub cluster and run the following command (replacing the values marked with <...>):
$ flink run \
-yD security.kerberos.login.keytab=<path_to_keytab> \
-yD security.kerberos.login.principal=<full_kerberos_principal> \
-yD security.kerberos.login.contexts=KafkaClient \
auditsession-*-jar-with-dependencies.jar \
<properties_file>
If everything was configured correctly, the application should appear in the Flink Web UI – accessible from the Data Hub page in the Cloudera Control Plane – and begin to push results to the configured Kafka topic.
To understand what's going on, we'll break down and explain each part of the application in the sections below:
CC: App.java
Before we can analyse the audits, we need to load them from object storage. Flink can read files in object storage line by line using the StreamExecutionEnvironment.readFile method like so:
// Flink's streaming environment and the job parameters from the properties file
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool params = ParameterTool.fromPropertiesFile(args[0]);

String auditPath = params.getRequired(PARAM_AUDIT_FS_PATH);
int auditPollMs = params.getInt(PARAM_AUDIT_FS_POLL_SECONDS) * 1000;

// Read the audit files line by line, recursing into the nested date directories
FileInputFormat<String> format = new TextInputFormat(new Path(auditPath));
format.setNestedFileEnumeration(true);

DataStream<String> rawAudits = env
        .readFile(format, auditPath, FileProcessingMode.PROCESS_CONTINUOUSLY, auditPollMs);
This produces a DataStream of String objects (rawAudits) – the lines from the files in the audit directory ordered by modification time.
As the audit files are stored in different sub-directories, we must enable enumeration (recursive listing) with format.setNestedFileEnumeration.
We've used the FileProcessingMode.PROCESS_CONTINUOUSLY mode here. In this mode, Flink will periodically poll the path for new audit files.
CC: App.java Audit.java
In our project, we have a simple Java POJO class called Audit which contains the fields included in a typical Ranger audit – the names of these fields match the names used in the JSON objects (more details on the fields can be found in the Apache wiki).
The Audit class also includes a static method called Audit.fromJson that uses Jackson to parse the JSON-encoded audits into an Audit object, returning null on failure.
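For reference, a trimmed-down sketch of what such a class might look like is shown below – the class in the repository carries more of the audit fields and may configure Jackson differently:
import java.text.SimpleDateFormat;
import java.util.Date;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.ObjectMapper;

// Trimmed-down, illustrative sketch of the Audit POJO (see Audit.java in the repository)
@JsonIgnoreProperties(ignoreUnknown = true)
public class Audit {
    public String repo;
    public String reqUser;
    public Date evtTime;      // parsed from e.g. "2022-09-15 11:00:16.271"
    public String action;
    public int result;        // 1 = allowed, 0 = denied
    public long event_count;

    private static final ObjectMapper MAPPER = new ObjectMapper()
            .setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"));

    // Parse a JSON-encoded audit, returning null if the line cannot be parsed
    public static Audit fromJson(String json) {
        try {
            return MAPPER.readValue(json, Audit.class);
        } catch (Exception e) {
            return null;
        }
    }
}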
We can use this method to parse our DataStream of raw JSON-encoded audits like so:
DataStream<Audit> audits = rawAudits
.map((rawAudit) -> Audit.fromJson(rawAudit))
.filter((audit) -> audit != null && audit.reqUser != null);
Note: Some audits may not include a requesting user – we omit these using the filter expression as we require this field to build a user session.
CC: App.java
In our application, we group audits into sessions with a 10-minute gap, i.e. audits generated by a single user where each audit occurred within 10 minutes of at least one other audit in the session. For example, audits from the same user at 10:00, 10:06 and 10:30 would form two sessions: the first two fall within 10 minutes of each other, while the gap before the third exceeds 10 minutes.
Flink automatically records the time at which events are read from a source system (object storage in our case) – this is known as Processing Time.
However, suppose two audits from the same user are read by Flink within a 10-minute interval. Should they be included in the same user session? They may have been created months or years apart, so using Processing Time to build user sessions isn't going to make sense here.
Fortunately, each audit includes the time it was created in the evtTime field – this is known as Event Time in Flink.
To register the Event Time for a stream of events, we can use the DataStream.assignTimestampsAndWatermarks method like so:
DataStream<Audit> auditsWithTime = audits
        .assignTimestampsAndWatermarks(WatermarkStrategy
                .<Audit>forBoundedOutOfOrderness(Duration.ofDays(2))
                .withTimestampAssigner((e, t) -> e.evtTime.getTime()));
The timestamp assigner at the bottom of the expression speaks for itself, returning the evtTime field unmodified. However, the WatermarkStrategy may need a little more explanation:
Let's imagine we need to answer the following question:
"How many audits have occurred before Wednesday this week?"
If audits don't arrive in order, how can we be sure we've received all of the audits that occurred before Wednesday? Flink answers this question with Watermarks. A watermark is a monotonically increasing timestamp that marks the minimum Event Time expected for events that are yet to be read. For the question above, Flink will wait until the watermark has passed Wednesday before committing the result.
In the expression above, we've set our WatermarkStrategy to a bounded out-of-orderness of 2 days. With this strategy, Flink keeps the watermark 2 days behind the maximum Event Time observed so far. For example, if the largest evtTime seen so far is 2022-09-17 11:00, the watermark sits at 2022-09-15 11:00, and any session window that ends before that point can be finalised.
This strategy is quite effective in our case: the audit files are written to date-based directories and picked up in roughly modification-time order, meaning that audits will be read in roughly date order.
CC: App.java DeniedCountAggregate.java WrapUserAndWindowWithCount.java UserSessionCountResult.java
In our application, we are counting the number of denied audits generated by each user within a single user session – keeping things nice and simple.
We do this by keying the stream by the requesting user, grouping each user's audits into event-time session windows, and aggregating a denied count per window:
// sessionGapSeconds is read from the session.duration property (600 seconds in our example)
DataStream<UserSessionCountResult> userSessionDeniedAuditCounts = auditsWithTime
        .keyBy((audit) -> audit.reqUser)
        .window(EventTimeSessionWindows.withGap(Time.seconds(sessionGapSeconds)))
        .aggregate(new DeniedCountAggregate(), new WrapUserAndWindowWithCount())
        .filter((res) -> res.count != 0);
DeniedCountAggregate implements AggregateFunction<> and it keeps a count of the number of denied audits found in the user session so far.
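A minimal sketch of what this class might look like is shown below – the version in the repository may differ in detail; we assume here that an audit's result field is 0 for denied requests:
import org.apache.flink.api.common.functions.AggregateFunction;

// Sketch: count the denied audits (result == 0) seen in a session window
public class DeniedCountAggregate implements AggregateFunction<Audit, Long, Long> {

    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(Audit audit, Long count) {
        return audit.result == 0 ? count + 1 : count;
    }

    @Override
    public Long getResult(Long count) {
        return count;
    }

    @Override
    public Long merge(Long a, Long b) {
        return a + b;
    }
}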
WrapUserAndWindowWithCount extends ProcessWindowFunction<> and takes the final count produced for each user session and wraps it in an object that includes the requesting user and details about the session window. This class produces the final objects returned by the DataStream.
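Again as a rough sketch (assuming UserSessionCountResult simply holds the user, the count and the window – the repository version may differ), it could look something like this:
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Sketch: wrap the per-window denied count with the user and the session window
public class WrapUserAndWindowWithCount
        extends ProcessWindowFunction<Long, UserSessionCountResult, String, TimeWindow> {

    @Override
    public void process(String reqUser, Context context, Iterable<Long> counts,
                        Collector<UserSessionCountResult> out) {
        // The preceding AggregateFunction emits exactly one count per window
        long count = counts.iterator().next();
        out.collect(new UserSessionCountResult(reqUser, count, context.window()));
    }
}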
CC: App.java
Once generated, we output the results to Kafka.
The code required to write our final results stream to Kafka looks something like this:
// readKafkaProperties (a helper in the project) gathers the kafka.* entries from the properties file
Properties kafkaProps = readKafkaProperties(PARAM_KAFKA_PREFIX);

String bootstrapServers = kafkaProps.getProperty("bootstrap.servers");
String topic = kafkaProps.getProperty("topic");

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(bootstrapServers)
        .setRecordSerializer(
            KafkaRecordSerializationSchema.builder()
                .setTopic(topic)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build()
        )
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .setKafkaProducerConfig(kafkaProps)
        .build();
userSessionDeniedAuditCounts
        .map((res) -> String.format("user='%s' denies=%d start=%d end=%d",
                res.reqUser,
                res.count,
                res.window.getStart(),
                res.window.getEnd()))
        .sinkTo(sink);
Note: This example uses the new KafkaSink class, included in the latest version of the Cloudera Runtime.
Note: some changes to the example will cause the existing tests to fail. You can skip the tests like so:
$ mvn package -DskipTests