Introduction

When users interact with services in CDP Public Cloud, the services generate audit events that detail:

  • The resource the user is attempting to access
  • The action the user is attempting to perform (e.g. SELECT)
  • The result of the policy evaluation (i.e. Allowed or Denied)
  • The time the request was made

Administrators can inspect these audits using the Ranger Web UI. The audits are also archived to object storage as JSON-per-line files which can be easily consumed by downstream applications for monitoring purposes.

Audits include a wide range of data points that describe the nature of the request and the result of the policy evaluation (see the Apache wiki for a complete list).

In our example, we use Flink to read these files and calculate the number of denied requests made by each user within a single session.

Flink example

The complete project can be found on GitHub [link].

Clone this repository to follow along as we break down the application logic.

Data Source

The Ranger audits are stored in the Data Lake Storage Location in the following layout:

$DATALAKE_STORAGE_LOCATION/ranger/audit
|--atlas (service type)
|----atlas (service name)
|------20221013
|--------atlas_ranger_audit_test-dl-master0.test-env.xxxx-xxxx.cloudera.site.1.log
|--------atlas_ranger_audit_test-dl-master0.test-env.xxxx-xxxx.cloudera.site.log
...

Each line in these files contains a JSON object similar to the following (pretty-printed):

{
  "repo":"cm_kafka",
  "reqUser":"wdyson",
  "evtTime":"2022-09-15 11:00:16.271",
  "action":"publish",
  "result":1,
  "event_count":1,
  ...
}
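
Note: The result field records the outcome of the policy evaluation – 1 indicates the request was allowed and 0 indicates it was denied. We will rely on this field later when counting denied requests.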

Setup required

CC: readme.md

To follow along with this example, you will need:

  • CDP Environment
  • Streaming Analytics Data Hub cluster

You must also ensure that Maven is installed on your developer machine. Once Maven is installed, the application can be compiled and packaged using the following command:

$ mvn package

This command will generate a JAR file under ./target/auditsession-*-jar-with-dependencies.jar – we will need this later.

Next, you will need to create the following properties file with parameters that match your environment (replace the values marked with <...>):

audit.path=<base_audit_path_starting_with_one_of_s3a://,abfs://,gs://>
audit.poll=240
session.duration=600
session.output=kafka
kafka.topic=<topic_name>
kafka.bootstrap.servers=<bootstrap_servers>
kafka.security.protocol=SASL_SSL
kafka.sasl.kerberos.service.name=kafka
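
Here, audit.poll controls how often (in seconds) Flink polls the audit path for new files, and session.duration is the session gap in seconds – 600 seconds matches the 10-minute gap we use when windowing the audits into sessions later on.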

Save this file with a meaningful filename like flink-audit-sessions.properties.

Finally, you will need to download a copy of your Kerberos keytab from the Cloudera Control Plane – we will configure Flink to use this keytab for authentication:

https://docs.cloudera.com/data-hub/cloud/access-clusters/topics/dh-retrieving-keytabs.html

Copy the JAR file, properties file and keytab file to the management node of your Streaming Analytics Data Hub cluster.

At this point, we're ready to start the example application. SSH onto the management node of the Streaming Analytics Data Hub cluster and run the following command (replacing the values marked with <...>):

$ flink run \
-yD security.kerberos.login.keytab=<path_to_keytab> \
-yD security.kerberos.login.principal=<full_kerberos_principal> \
-yD security.kerberos.login.contexts=KafkaClient \
auditsession-*-jar-with-dependencies.jar \
<properties_file>

If everything was configured correctly, the application should appear in the Flink Web UI – accessible from the Data Hub page in the Cloudera Control Plane – and begin to push results to the configured Kafka topic.

To understand what's going on, we'll break down and explain each part of the application in the sections below:

Read Ranger audits from object storage

CC: App.java

Before we can analyse the audits, we need to load them from object storage. Flink allows you to read files in object storage line by line using the StreamExecutionEnvironment.readFile method like so:

ParameterTool params = ParameterTool.fromPropertiesFile(args[0]);

String auditPath = params.getRequired(PARAM_AUDIT_FS_PATH);
int auditPollMs = params.getInt(PARAM_AUDIT_FS_POLL_SECONDS)*1000;

FileInputFormat<String> format = new TextInputFormat(new Path(auditPath));
format.setNestedFileEnumeration(true);

DataStream<String> rawAudits = env
    .readFile(format, auditPath, FileProcessingMode.PROCESS_CONTINUOUSLY, auditPollMs);

This produces a DataStream of String objects (rawAudits) – the lines from the files in the audit directory ordered by modification time.

As the audit files are stored in different sub-directories, we must enable nested file enumeration (recursive listing) with format.setNestedFileEnumeration(true).

We've used the FileProcessingMode.PROCESS_CONTINUOUSLY mode here. In this mode, Flink will periodically poll the path for new audit files.

Parsing the raw audits

CC: App.java Audit.java

In our project, we have a simple POJO Java class called Audit which contains the fields included in a typical Ranger audit – the names of these fields match the names used in the JSON objects (more details on the fields can be found in the Apache wiki).

The Audit class also includes a static method called Audit.fromJson that uses Jackson to parse the JSON-encoded audits into an Audit object, returning null on failure.
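
For illustration, a stripped-down sketch of what such a class could look like is shown below – the actual Audit.java in the repository models more fields and may differ in detail, but the idea is the same:

// Illustrative sketch only – see Audit.java in the repository for the real class
import java.util.Date;

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.ObjectMapper;

@JsonIgnoreProperties(ignoreUnknown = true) // skip audit fields we don't model
public class Audit {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public String repo;     // service name, e.g. "cm_kafka"
    public String reqUser;  // requesting user
    public String action;   // action attempted, e.g. "publish"
    public int result;      // policy evaluation result (1 = allowed, 0 = denied)

    // evtTime appears as "2022-09-15 11:00:16.271" in the audit JSON
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss.SSS")
    public Date evtTime;

    // Parse a JSON-encoded audit line, returning null if it cannot be parsed
    public static Audit fromJson(String json) {
        try {
            return MAPPER.readValue(json, Audit.class);
        } catch (Exception e) {
            return null;
        }
    }
}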

We can use this method to parse our DataStream of raw JSON-encoded audits like so:

DataStream<Audit> audits = rawAudits
  .map((rawAudit) -> Audit.fromJson(rawAudit))
  .filter((audit) -> audit != null && audit.reqUser != null);

Note: Some audits may not include a requesting user – we omit these using the filter expression as we require this field to build a user session.

Assign event times to each audit

CC: App.java

In our application, we group audits into sessions with a 10-minute gap, i.e. audits generated by a single user that each occurred within 10 minutes of at least one other audit in the session.

Flink automatically records the time at which events are read from a source system (object storage in our case) – this time is known as Processing Time.

However, suppose two audits from the same user are read by Flink within 10 minutes of each other. Should they be included in the same user session? The audits may have been created months or years apart, so using Processing Time to build user sessions doesn't make sense here.

Fortunately, each audit includes the time it was created in the evtTime field – this is known as Event Time in Flink.

To register the Event Time for a stream of events, we can use the DataStream.assignTimestampsAndWatermarks method like so:

DataStream<Audit> auditsWithTime = audits
    .assignTimestampsAndWatermarks(WatermarkStrategy
        .<Audit>forBoundedOutOfOrderness(Duration.ofDays(2))
        .withTimestampAssigner((e, t) -> e.evtTime.getTime()));

The timestamp assigner at the bottom of the expression is straightforward: it simply returns the evtTime field as a timestamp in milliseconds. The WatermarkStrategy, however, takes a bit more explaining.

Let's imagine we need to answer the following question:

"How many audits have occurred before Wednesday this week?"

If audits don't arrive in order, how can we be sure we've received all of the audits that occurred before Wednesday? Flink answers this question with watermarks. A watermark is a monotonically increasing timestamp that marks the minimum Event Time expected for events that have yet to be read. For the question above, Flink will wait until the watermark has passed Wednesday before committing the result.

In the expression above, we've set our WatermarkStrategy to a bounded out-of-orderness of 2 days. With this strategy, Flink keeps the watermark for the audits 2 days behind the maximum Event Time observed so far. For example, if the latest evtTime seen so far is 2022-09-15 11:00, the watermark sits at 2022-09-13 11:00.

This strategy is quite effective in our case as:

  1. Services write at least one audit file per day
  2. Flink reads the files in object storage in modification-time order

This means that audits will be read in roughly chronological order.

Create session windows using Event Time

CC: App.java DeniedCountAggregate.java WrapUserAndWindowWithCount.java UserSessionCountResult.java

In our application, we are counting the number of denied audits generated by each user within a single user session – keeping things nice and simple.

We do this by:

  1. Setting the key of the stream to the reqUser field – the requesting user that triggered the audit
  2. Windowing audits into sessions using the Event Time
  3. Aggregating the windows using the DeniedCountAggregate and WrapUserAndWindowWithCount classes

DataStream<UserSessionCountResult> userSessionDeniedAuditCounts = auditsWithTime
    .keyBy((audit) -> audit.reqUser)
    .window(EventTimeSessionWindows.withGap(Time.seconds(sessionGapSeconds)))
    .aggregate(new DeniedCountAggregate(), new WrapUserAndWindowWithCount())
    .filter((res) -> res.count != 0);

DeniedCountAggregate implements AggregateFunction<> and keeps a running count of the number of denied audits found in the user session so far.
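
For illustration, a minimal sketch of what this aggregate function could look like is shown below, assuming a result value of 0 marks a denied request – the actual DeniedCountAggregate.java in the repository may differ in detail:

// Illustrative sketch only – see DeniedCountAggregate.java in the repository
import org.apache.flink.api.common.functions.AggregateFunction;

public class DeniedCountAggregate implements AggregateFunction<Audit, Long, Long> {

    @Override
    public Long createAccumulator() {
        return 0L; // start each session window with zero denied audits
    }

    @Override
    public Long add(Audit audit, Long accumulator) {
        // result == 0 indicates the request was denied
        return audit.result == 0 ? accumulator + 1 : accumulator;
    }

    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }

    @Override
    public Long merge(Long a, Long b) {
        return a + b;
    }
}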

WrapUserAndWindowWithCount extends ProcessWindowFunction<> and takes the final count produced for each user session, wrapping it in an object that includes the requesting user and details about the session window. These wrapped objects are the final results returned by the DataStream.
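
Again for illustration, a sketch of what these two classes could look like is shown below (in the repository each class lives in its own file, and the real implementations may differ):

// Illustrative sketch only – see WrapUserAndWindowWithCount.java in the repository
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WrapUserAndWindowWithCount
        extends ProcessWindowFunction<Long, UserSessionCountResult, String, TimeWindow> {

    @Override
    public void process(String reqUser, Context context,
                        Iterable<Long> counts, Collector<UserSessionCountResult> out) {
        // The aggregate stage emits exactly one count per session window
        long count = counts.iterator().next();
        out.collect(new UserSessionCountResult(reqUser, count, context.window()));
    }
}

// Illustrative sketch only – see UserSessionCountResult.java in the repository;
// the fields match those used when formatting the Kafka message in App.java
public class UserSessionCountResult {
    public String reqUser;
    public long count;
    public TimeWindow window;

    public UserSessionCountResult() {}

    public UserSessionCountResult(String reqUser, long count, TimeWindow window) {
        this.reqUser = reqUser;
        this.count = count;
        this.window = window;
    }
}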

Write the results to Kafka

CC: App.java

Once generated, we output the results to Kafka.

The code required to write our final results stream to Kafka looks something like this:

Properties kafkaProps = readKafkaProperties(PARAM_KAFKA_PREFIX);

String bootstrapServers = kafkaProps.getProperty("bootstrap.servers");
String topic = kafkaProps.getProperty("topic");

KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers(bootstrapServers)
    .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
            .setTopic(topic)
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
    )
    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .setKafkaProducerConfig(kafkaProps)
    .build();

userSessionDeniedAuditCounts
    .map((res) -> String.format("user='%s' denies=%d start=%d end=%d",
        res.reqUser,
        res.count,
        res.window.getStart(),
        res.window.getEnd()))
    .sinkTo(sink);

Note: This example uses the new KafkaSink class, included in the latest version of the Cloudera Runtime.

Exercises

  • Modify the aggregator so that it counts the total number of Hive audits instead of denied audits
  • Filter all results that contain a count of less than 5 before writing the stream to Kafka
  • Change the test case in TestAuditSession.java to pass with the new modifications

Note: Some of these changes will cause the existing tests to fail. You can skip the tests like so:

$ mvn package -DskipTests

Resources

  • [link] Example code used in this article
  • [link] Cloudera Flink tutorials
  • [link] Flink documentation on watermarks