SAM’s main goal is to let developers, operators and business analysts build, deploy, manage and monitor streaming applications in minutes. This article covers one aspect of SAM: its extensibility. Most streaming use cases are already covered by the built-in SAM processors such as Join, Split, Rules, Aggregate and Normalization. However, our interactions with customers and other users showed that, even with an easy-to-use visual interface for building new streaming applications, some custom logic cannot easily be expressed with the existing processors. Extensibility was therefore always an important consideration while designing SAM. How can you hook custom processing logic into SAM using Custom Processors (CPs) and User Defined Functions (UDFs)? That is what this article attempts to explain.

CPs and UDFs let you define new functions and custom processing once, so that different people in an organization reuse the same code instead of rewriting it. Many functions and processing steps that are needed across an organization can be standardized with these features and reused by whoever needs them. Both are designed around the principle of “Write Once. Use Anywhere”. For reference, the SAM SDK module is at https://github.com/hortonworks/streamline/tree/master/streams/sdk

The remaining content is broken down into 3 sections, since each one addresses a different use case of extensibility:

  1. UDFs (User Defined Functions)
  2. UDAFs (User Defined Aggregate Functions)
  3. Custom Processor

1 UDFs (User Defined Functions)

UDFs let you apply functions to the fields of events flowing through a SAM application. UDFs are powered by a built-in SAM processor component named Projection as shown in the image below.

The image above shows where the Projection processor is located, under the Processor section in the toolbar on the left-hand side. It is dropped onto the application and connected to a Kafka source.

Screen Shot 2017-07-31 at 6.57.13 PM.png

The image above shows the dialog that opens when you double-click the processor to configure it. A Projection processor lets you apply UDFs to the events flowing through it and project the fields that make up the output schema of that Projection processor.

SAM already ships with a set of standard built-in UDFs. However, if you want to apply a function that is not covered by them, you can register your own UDF and use it in the Projection processor. Once registered, the UDF appears as an item in the drop-down that lists all UDFs (built-in and user-registered). As you can see in the drop-down in the image, it already has standard functions such as IDENTITY and UPPER.

Let’s say you want to write a UDF that takes speed and time as input and computes the distance. You can register such a UDF under a name like COMPUTE_DISTANCE, and it will start appearing in the drop-down. After this UDF is applied, the event flowing out of the processor carries the computed distance as a new field alongside the other fields you pick from the input event.
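As a minimal sketch of what such a COMPUTE_DISTANCE implementation might look like: the UDF2 interface below is a local stand-in so that the example compiles on its own, and its method name (evaluate) and type-parameter order are assumptions. Check UDF2.java in the SDK module for the real declaration and import it from there in an actual project. The class name ComputeDistance is hypothetical.

```java
// Local stand-in for the SDK's two-argument UDF interface.
// ASSUMPTION: the method name and generic parameter order may differ in the real SDK.
interface UDF2<O, I1, I2> {
    O evaluate(I1 input1, I2 input2);
}

// Hypothetical COMPUTE_DISTANCE implementation: distance = speed * time.
class ComputeDistance implements UDF2<Double, Double, Double> {
    @Override
    public Double evaluate(Double speed, Double time) {
        if (speed == null || time == null) {
            return null; // nothing to compute when either field is missing
        }
        return speed * time;
    }
}

class ComputeDistanceDemo {
    public static void main(String[] args) {
        ComputeDistance udf = new ComputeDistance();
        // speed of 60 units per hour, sustained for 1.5 hours
        System.out.println(udf.evaluate(60.0, 1.5));
    }
}
```

Returning null for missing inputs is a design choice of this sketch, not something the article prescribes; an implementation could equally throw or return a default.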

2 UDAFs (User Defined Aggregate Functions)

UDAFs let you apply aggregate functions to the fields of events flowing through a SAM application. UDAFs are powered by a built-in SAM processor component named Aggregate as shown in the image below.

The image above shows where the Aggregate processor is located, under the Processor section in the toolbar on the left-hand side. It is dropped onto the application and connected to a Kafka source.

Screen Shot 2017-07-31 at 7.44.45 PM.png

The image above shows the dialog that opens when you double-click the processor to configure it.

An Aggregate processor lets you apply aggregate functions to the fields of events over a window defined by time (event time or processing time) and/or count. SAM already ships with a set of built-in UDAFs. However, if you want to perform an aggregation that is not covered by them, you can register your custom UDAF and use it in the Aggregate processor. Once registered, your custom UDAF appears as an item in the drop-down that lists all UDAFs (built-in and user-registered).

The GUI screenshots below show all the currently registered UDFs and UDAFs and how to add or register one. Notice the Type column, which denotes whether an entry is a normal or an aggregate function. You can also edit or delete any custom functions that did not originally ship with SAM. To reach this page, click Application Resources under the wrench icon on the left-hand side.

Screen Shot 2017-07-31 at 8.08.45 PM.png

Screen Shot 2017-07-31 at 8.10.06 PM.png

Once you click on the + icon you will be presented with the dialog shown in the image below.

Screen Shot 2017-07-31 at 8.11.25 PM.png

Most of the fields are self-explanatory; refer to the tooltip on each field for more information. Make sure that you select the correct type from the drop-down, depending on whether you are registering an aggregate function or not. The CLASSNAME field should be the FQCN (fully qualified class name) of the class implementing one of the interfaces at https://github.com/hortonworks/streamline/tree/master/streams/sdk/src/main/java/com/hortonworks/stre...

If you pick the type FUNCTION then, by convention, a UDF implementation with a single argument implements the interface in UDF.java. An implementation that needs more than one argument implements the interface in UDF2, UDF3… UDFn, according to its number of arguments. If you pick the type Aggregate then, by convention, a UDAF implementation with a single argument implements the interface in UDAF.java, and an implementation with two arguments implements the interface in UDAF2.java.

Note that SAM takes care of matching the fields of the event flowing into the Projection or Aggregate processor to the arguments of your UDF or UDAF implementation. It also lets you project the returned value as an output field of the processor. It does so by using reflection to extract the argument and return types from the implementation class provided in the jar, and presenting that information in the GUI screens while you configure the processors.
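To make the single-argument UDAF case more concrete, here is a rough sketch of an aggregate that computes the range (max minus min) of a numeric field over a window. The UDAF interface below is a local stand-in so that the example compiles on its own; its three methods (init, add, result) and its type parameters are assumptions about the SDK contract, so check UDAF.java for the real declaration. The class name SpeedRange is hypothetical.

```java
// Local stand-in for the SDK's single-argument UDAF interface.
// ASSUMPTION: A = running aggregate state, V = input value, R = final result.
interface UDAF<A, V, R> {
    A init();                   // create the empty aggregate for a new window
    A add(A aggregate, V val);  // fold one value into the aggregate
    R result(A aggregate);      // produce the value to emit for the window
}

// Hypothetical aggregate: range (max - min) of a numeric field in a window.
class SpeedRange implements UDAF<double[], Double, Double> {
    @Override
    public double[] init() {
        // index 0 holds the running min, index 1 the running max
        return new double[] {Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY};
    }

    @Override
    public double[] add(double[] aggregate, Double val) {
        if (val != null) {
            aggregate[0] = Math.min(aggregate[0], val);
            aggregate[1] = Math.max(aggregate[1], val);
        }
        return aggregate;
    }

    @Override
    public Double result(double[] aggregate) {
        if (aggregate[1] < aggregate[0]) {
            return null; // no values were seen in this window
        }
        return aggregate[1] - aggregate[0];
    }
}

class SpeedRangeDemo {
    public static void main(String[] args) {
        SpeedRange udaf = new SpeedRange();
        double[] aggregate = udaf.init();
        for (double speed : new double[] {10.0, 25.0, 15.0}) {
            aggregate = udaf.add(aggregate, speed);
        }
        System.out.println(udaf.result(aggregate));
    }
}
```

The demo loop only imitates what a windowing engine would do; in SAM the Aggregate processor, not your code, decides which events belong to a window.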

3 Custom Processor

In the previous two sections we covered how you can apply simple functions to individual fields of an event flowing through the application using UDFs and UDAFs. This section covers the scenario where you want to take the event itself, do some processing on it, and return a list of events to emit from that component.

The idea of a Custom Processor is “Write Once. Use Anywhere”. This will become clearer as we work through an example below: a Custom Processor that enriches the event flowing through it by adding information about the driver. The assumption is that there is an endpoint exposing a service that looks up a driver by driverId and returns the data needed to enrich the event. The Custom Processor implementation will expect a field named driverId in the incoming event and will return driverName, driverDOB and driverLicenseNumber.

The first step in using a Custom Processor is writing one. You need to implement the interface at https://github.com/hortonworks/streamline/blob/master/streams/sdk/src/main/java/com/hortonworks/stre... (or the cached version at https://github.com/hortonworks/streamline/blob/master/streams/sdk/src/main/java/com/hortonworks/stre... if you want to cache results and avoid an expensive lookup per event) and create a jar file that can be uploaded when registering the Custom Processor. Note that the implementation must have a public no-args constructor. The important lifecycle methods of the interface are initialize and process, with the signatures below.

void initialize(Map<String, Object> config);

List<StreamlineEvent> process (StreamlineEvent event) throws ProcessingException;

The initialize method is called once, when the Custom Processor implementation is first instantiated. It is passed all the key-value pairs that the user entered in the GUI and that are needed to configure this instance of the Custom Processor implementation. In our example, it will have a value for the key serviceEndpoint.

The process method is where all the custom processing logic lies. It is passed a StreamlineEvent as an argument and is expected to return a list of StreamlineEvents that it wants to emit. StreamlineEvent is another interface which is an abstraction of the event flowing through SAM. It is a part of SAM SDK module at https://github.com/hortonworks/streamline/blob/master/streams/sdk/src/main/java/com/hortonworks/stre...
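Putting initialize and process together, a driver-enrichment processor could be sketched roughly as follows. Everything except the two method signatures is an assumption made so that the example compiles on its own: StreamlineEvent and ProcessingException here are simplified local stand-ins for the SDK types, CustomProcessorContract is a hypothetical name for the interface, and lookupDriver returns canned data where a real implementation would call the configured serviceEndpoint.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Local stand-in for the SDK exception type (assumption).
class ProcessingException extends Exception {
    ProcessingException(String message) { super(message); }
}

// Simplified, map-backed stand-in for the SDK's StreamlineEvent interface (assumption).
class StreamlineEvent {
    private final Map<String, Object> fields;

    StreamlineEvent(Map<String, Object> fields) {
        this.fields = Collections.unmodifiableMap(new HashMap<>(fields));
    }

    Object get(String fieldName) { return fields.get(fieldName); }
}

// Hypothetical name; holds only the two lifecycle methods quoted in the article.
interface CustomProcessorContract {
    void initialize(Map<String, Object> config);
    List<StreamlineEvent> process(StreamlineEvent event) throws ProcessingException;
}

class DriverEnrichmentProcessor implements CustomProcessorContract {
    private String serviceEndpoint;

    // Public no-args constructor, as the article requires.
    public DriverEnrichmentProcessor() { }

    @Override
    public void initialize(Map<String, Object> config) {
        // Value entered by the user when configuring the processor in the GUI.
        this.serviceEndpoint = (String) config.get("serviceEndpoint");
    }

    @Override
    public List<StreamlineEvent> process(StreamlineEvent event) throws ProcessingException {
        Object driverId = event.get("driverId");
        if (!(driverId instanceof Number)) {
            throw new ProcessingException("Event has no numeric driverId field");
        }
        Map<String, Object> driver = lookupDriver(((Number) driverId).longValue());
        return Collections.singletonList(new StreamlineEvent(driver));
    }

    // PLACEHOLDER: canned data instead of a real call to serviceEndpoint.
    private Map<String, Object> lookupDriver(long driverId) {
        Map<String, Object> driver = new HashMap<>();
        driver.put("driverName", "Driver-" + driverId);
        driver.put("driverDOB", "1970-01-01");
        driver.put("driverLicenseNumber", "LIC-" + driverId);
        return driver;
    }
}

class DriverEnrichmentDemo {
    public static void main(String[] args) throws ProcessingException {
        DriverEnrichmentProcessor processor = new DriverEnrichmentProcessor();
        // Hypothetical endpoint URL, for illustration only.
        processor.initialize(Collections.singletonMap(
                "serviceEndpoint", (Object) "http://east.example.com/drivers"));
        Map<String, Object> input = new HashMap<>();
        input.put("driverId", 42L);
        StreamlineEvent out = processor.process(new StreamlineEvent(input)).get(0);
        System.out.println(out.get("driverName"));
    }
}
```

The sketch returns only the three enrichment fields from process; the fields of the original event are not copied here.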

Now that we have a jar with the implementation, let’s go through the registration process. To register a Custom Processor, click on Application Resources under the wrench menu, as shown in the image below.

Screen Shot 2017-08-01 at 12.15.35 PM.png

If you already have any Custom Processors registered you will see them in a table. It will let you Edit or Delete them. Edit and Add GUI screens are very similar to each other. Let’s go through the Add GUI that opens up when you click on the + icon as shown below.

Screen Shot 2017-08-01 at 12.25.40 PM.png

Screen Shot 2017-08-01 at 12.25.57 PM.png

Most of the fields are self explanatory. However, the important ones are NAME, CLASSNAME, INPUTSCHEMA and OUTPUTSCHEMA.

  • NAME is used to uniquely identify this Custom Processor.
  • CLASSNAME is FQCN of the class implementing the interface as mentioned above.
  • INPUTSCHEMA is the schema of the incoming event that this Custom Processor implementation expects. In our example, it says that the implementation expects a mandatory driverId field of type long.
  • OUTPUTSCHEMA is the schema of the events that this implementation returns. In our example, it says that every event returned by calling the process method on this implementation will have driverName, driverDOB and driverLicenseNumber fields (all mandatory and of type string).

Please check out the image below that shows a dialog box to add CONFIG FIELDS.

Screen Shot 2017-08-01 at 12.37.21 PM.png

It lets you add configuration fields that you can supply values for while configuring an instance of this Custom Processor in a SAM application. Once you click Ok and successfully save the Custom Processor, it will appear as a component under the toolbar on the left hand side and you can use it in a SAM application. Check out the image below.

This is how we get the “Write Once. Use Anywhere” capability. The idea is that as long as the semantics of the custom processing and the schemas stay the same, one can configure a Custom Processor differently depending on the context in which it is used (the context here being a SAM application). For our purpose, let’s assume that there are two data centers, one in EAST and the other in WEST, and that the web service endpoints for EAST and WEST are different. Let’s now go through the configuration of this Custom Processor within a SAM application. As you can see in the image above, the Custom Processor is connected to a Kafka source. Double-clicking the Custom Processor opens a dialog box that lets you configure it, along with a couple of other things.

Screen Shot 2017-08-01 at 1.25.55 PM.png

You can see that the value of the Service Endpoint field points to the EAST endpoint, since this SAM application is presumed to process data from the east coast data center. You will also see a section called INPUT SCHEMA MAPPING. It maps fields in the schema of events emitted by the previous component to the semantically equivalent fields defined in the input schema of this Custom Processor. In our case the field names are exactly the same, which is why you see a mapping from driverId -> driverId. However, imagine a scenario where you also use this Custom Processor to enrich data with driver information from another source, and that source names the field driverIdentifier instead of driverId. The mapping lets you tell SAM that driverIdentifier is the same as driverId, and SAM takes care of passing the driverIdentifier field as driverId to the Custom Processor implementation. You can choose a mapping for every field defined in the input schema of a Custom Processor.
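Conceptually, the input schema mapping amounts to renaming upstream fields to the names the Custom Processor expects before the event reaches process. The sketch below only illustrates that idea with plain maps; it is not SAM’s internal implementation, and the class and method names (InputSchemaMappingDemo, applyMapping) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class InputSchemaMappingDemo {
    /**
     * Builds the event seen by the Custom Processor.
     * mapping: processor input field name -> field name in the upstream event.
     */
    static Map<String, Object> applyMapping(Map<String, Object> upstreamEvent,
                                            Map<String, String> mapping) {
        Map<String, Object> mapped = new HashMap<>();
        for (Map.Entry<String, String> entry : mapping.entrySet()) {
            // Copy the upstream value under the name the processor expects.
            mapped.put(entry.getKey(), upstreamEvent.get(entry.getValue()));
        }
        return mapped;
    }

    public static void main(String[] args) {
        Map<String, Object> upstreamEvent = new HashMap<>();
        upstreamEvent.put("driverIdentifier", 7L);
        upstreamEvent.put("speed", 55);

        Map<String, String> mapping = new HashMap<>();
        mapping.put("driverId", "driverIdentifier");

        // The processor now finds the value under driverId.
        System.out.println(applyMapping(upstreamEvent, mapping).get("driverId"));
    }
}
```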

The other section that you might have noticed is the OUTPUT FIELDS section. It lets you pick the fields to include in the event that is passed from this Custom Processor component to the next component. These fields make up the output schema shown in the vertical section named Output on the right-hand side of the configuration dialog in the image above. You can pick any field from the input schema of the event flowing into this Custom Processor component or from the output schema defined for this Custom Processor, which lets you carry forward any fields from the original input event. As you can see in the image, all fields from the original event are picked, along with the 3 fields driverName, driverDOB and driverLicenseNumber defined in the Custom Processor output schema. Once you click Ok and save the SAM application, you are ready to deploy the SAM application that now uses a Custom Processor.
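The effect of the OUTPUT FIELDS selection can be pictured as choosing names from the union of the input event’s fields and the processor’s output fields. The following is a conceptual sketch with plain maps, not SAM’s actual code; the names OutputFieldsDemo and selectOutputFields are hypothetical, and letting the processor output win on a name clash is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class OutputFieldsDemo {
    static Map<String, Object> selectOutputFields(Map<String, Object> inputEvent,
                                                  Map<String, Object> processorOutput,
                                                  List<String> selectedFields) {
        // Candidate fields: everything from the input event plus the processor output.
        // ASSUMPTION: on a name clash the processor output overrides the input value.
        Map<String, Object> candidates = new LinkedHashMap<>(inputEvent);
        candidates.putAll(processorOutput);

        Map<String, Object> outgoing = new LinkedHashMap<>();
        for (String name : selectedFields) {
            if (candidates.containsKey(name)) {
                outgoing.put(name, candidates.get(name));
            }
        }
        return outgoing;
    }

    public static void main(String[] args) {
        Map<String, Object> inputEvent = new LinkedHashMap<>();
        inputEvent.put("driverId", 42L);
        inputEvent.put("speed", 55);

        Map<String, Object> processorOutput = new LinkedHashMap<>();
        processorOutput.put("driverName", "Driver-42");
        processorOutput.put("driverDOB", "1970-01-01");

        // Carry driverId forward, add driverName, drop speed and driverDOB.
        System.out.println(selectOutputFields(
                inputEvent, processorOutput, Arrays.asList("driverId", "driverName")).keySet());
    }
}
```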

The next article, which illustrates how SAM can be extended with Custom Sources/Sinks, is Extending SAM with Custom Sources/Sinks. Check it out!
