Created on 12-07-2017 07:35 PM - edited 09-16-2022 01:41 AM
In one of the previous articles Extending SAM with Custom Processors & User Defined Functions we saw how SAM can be extended to incorporate any custom processing logic using Custom Processors & User Defined Functions. That is a very useful feature if one is writing application logic from scratch. However, there are cases where such code is already written and one wants to reuse that code. A good example of this is an existing Apache Storm Spout or Bolt that is interacting with a proprietary system. How would one be able to plug in such a Spout or Bolt in SAM? This is what this article will attempt to explain.
For people who are more hands on and want to try out adding a Custom source to SAM and run it as part of a SAM application we have created an example at SAM Custom Source. Please follow the instructions in README.md to execute the steps on your existing SAM installation. For people who want to first get in to the details, continue reading on the rest of this article. You can always come back and try the example we have added.
Before diving deep into the details let us define a built-in SAM component. A built-in SAM component is a source, processor or sink that is part of SAM distribution and appears in the toolbar on the left hand side of SAM application builder canvas. Examples are Kafka source, Kafka sink, Rules processor, etc. Please refer to the image below to locate the toolbar.
Each such component can be used in a SAM application.. To understand how it all works together, let’s start by taking example of Kafka source. SAM is bootstrapped with a bunch of such built-in components during installation. A typical curl call to register such a component as part of bootstrap looks like below
curl -sS -X POST -F topologyComponentBundle=@$data http://localhost:8080/api/v1/catalog/streams/componentbundles/SOURCE
Notice the “SOURCE” at the end of the url as we are going over the example of Kafka source. For a processor or sink SOURCE should be changed to PROCESSOR or SINK respectively. Let’s go through the -F form data argument named topologyComponentBundle. It refers to a json file at Kafka Source
Json is shown here as well for reference with only one property named topic.
Let’s go over the json properties
type | SOURCE | Indicates that this is a SAM source component. This will be used to determine the section(source/processor/sink) in toolbar under which this component should appear |
subType | KAFKA | This field is used to denote the external system for source/sink and type of processing for processor that this component represents. It’s value will appear under the component icon so that user can identify it. Note that for built-in components we already have their icons. If this were not a built-in component, a default icon would appear depending on if it’s a source, processor or a sink. |
name | Kafka | A human readable name to identify this component. |
streamingEngine | STORM | Indicates the streaming engine this component corresponds to. This will be used by SAM framework to support runners for different streaming engines. |
builtin | true | Indicates if this is a built-in SAM component. |
fieldHintProviderClass | com.hortonworks.streamline.streams.cluster.bundle.impl.KafkaBundleHintProvider | FQCN of the class that provides helpful suggestions for configuration fields for this component. This will be explained in detail below. |
transformationClass | com.hortonworks.streamline.streams.layout.storm.KafkaSpoutFluxComponent | FQCN of the class that provides flux transformation needed to translate this SAM component into underlying STORM Spout or Bolt. This will be explained in detail below. |
mavenDeps | org.apache.kafka:kafka-clients:0.10.2.1,org.apache.storm:storm-kafka-client:STORM_VERSION^org.slf4j:slf4j-log4j12^log4j:log4j^org.apache.zookeeper:zookeeper^org.apache.kafka:kafka-clients | Maven artifacts needed to run this component on the runner implementation(Storm cluster in this case). This will be explained in more detail below. |
topologyComponentUISpecification | JSON | A json adhering to a specification that is honored by SAM application editor canvas. This lets you capture any per instance configuration information needed for this component via GUI when you drag and drop it in a SAM application. This will be explained in more detail below. |
The property builtin should always be false for non built in components that one is aiming to add. Please add -F bundleJar=<path_to_jar_file>. This jar file should contain the class specified as value for transformationClass property.
The first property that needs to be elaborated here is topologyComponentUISpecification. A detailed explanation is provided as part of javadocs here https://github.com/hortonworks/streamline/blob/master/common/src/main/java/com/hortonworks/streamlin...
This json property is expected to be a json object that has a top level property called fields which is expected to be a json array. Each element in this array is a configuration field. Let’s take one field from Kafka source and illustrate its use. For a Kafka source, which translates to a kafka Spout under the hood in a storm topology we need the topic that the Spout will read or consume messages from. The way to express this is by adding a field called topic as below.
{ |
"uiName": "Kafka topic", |
"fieldName": "topic", |
"isOptional": false, |
"tooltip": "Kafka topic to read data from", |
"type": "enumstring", |
"options": [], |
"hint": "schema,override" |
} |
As a result you see a field in the configuration dialog for Kafka source as shown in the image below. Note that because isOptional is set to true it appears under Required tab. Because it's of type enumstring it shows a dropdown to pick values from.
Whenever you have a field like that, GUI will display that field when you drop that SAM component onto a SAM application and let user input a value for that field or select a value from available options for that field, depending on the type of that field. Value entered or selected by user will be stored for that instance of the component for that SAM application such that it can later be used by underlying Storm component(Kafka Spout in this case) at runtime. Below is the table explaining each property used to describe a field. You can also check out different built-in components to get an idea of how different fields are configured. All built-in component jsons are located under the directory <SAM_HOME>/bootstrap on SAM installation. You can also check them out here https://github.com/hortonworks/streamline/tree/master/bootstrap/components
uiName | Name of the field that will be displayed as a text label to recognize this field when a user opens the configuration dialog for that component. |
fieldName | Key used to store the value for this field so that value can be later retrieved to pass it to underlying streaming engine. |
isOptional | If this field is optional configuration or not. If it is true, there will be no validation forcing the user to enter or pick a value for this field. Based on its value it will also decide if this field will appear under Optional or Required tab when you open the configuration dialog. |
tooltip | A text that will appear when user hovers over the field in the Configuration dialog. Typically this should be some text explaining the significance of the field. |
type | Json type of the field. For example, string, number, boolean, object, etc. This makes sure that value entered by user is correctly validated and cast into correct json type while storing. |
options | Json array. If the type is an enumstring for example, then this array is expected to be an array of strings and user will be presented a dropdown to pick one of the values from that array. |
hint | Comma separated string of hints. GUI already supports some built-in hints that do certain operations. For example, if hint field has a value of “schema” then value entered or picked by user for that field has to be name of a schema defined in Schema Registry. If hint field has a value of “password” then value entered by user for the field will not be shown on the form. If hint field has a value of “email” then value entered by user will be validated against a regular expression for an email id. More and more hints are added as the project evolves. To find out all the existing hints used, you can run the command grep -RH "\"hint\":" <SAM_HOME>/bootstrap |
Now, let’s move on to the “transformationClass” property and see how it works in conjunction with different configuration fields defined for that component.
Storm runner uses Flux yaml files for deploying SAM applications as Storm topologies. For people who are not familiar with Flux here is a link to get an overview. https://github.com/hortonworks/storm/tree/HDF-3.0-maint/external/flux
Each built-in SAM component will have an implementation for the interface https://github.com/hortonworks/streamline/blob/master/streams/sdk/src/main/java/com/hortonworks/stre...
Please note that the implementation class needs to have a public no args constructor defined so that SAM can instantiate an object of that class and call methods defined on the interface. The important methods are
withConfig(Map<String, Object> config);
Argument config in the signature above will be java representation of json mentioned above containing key value pairs for all the fields configured by the user in GUI. While deploying a SAM application, SAM will call this method on the implementation first thing after instantiating an object of this class so the implementation can use it later for creating Flux related objects.
List<Map<String, Object>> getReferencedComponents ();
getReferencedComponents should return a list of all components needed to create the top level component(Spout/Bolt). Each element in the list is a map representing the Flux yaml.
Map<String, Object> getComponent ();
getComponent should return a map representing the Flux yaml for the top level component.
You can checkout implementations for built-in components at https://github.com/hortonworks/streamline/tree/master/streams/runners/storm/layout/src/main/java/com... As you might have noticed KafkaSpoutFluxComponent.java which is set as value for “transformationClass” property as mentioned before provides an implementation for the interface.
Let’s move on to the next property “mavenDeps”. Different Storm Spouts and Bolts need different java classes at runtime. For example, Kafka source in SAM that translates to a Kafka Spout will need kafka-clients library since it needs all Kafka consumer classes. The way SAM lets you do that is using a feature in Storm that lets you submit additional jars to Storm by fetching them using maven coordinates during topology submission. You can check out https://github.com/hortonworks/storm/blob/HDF-3.0-maint/bin/storm.py#L272 for details of implementation. Important thing to remember here is the format expected for the value of that property. It is expected to be a comma separated string of maven artifact entries. Each maven artifact entry is of the format
<maven_group_id>:<maven_artifact_id>:<maven_artifact_id_version>^[<maven_group_id_excluded>:<maven_artifact_id_excluded>]^.....followed by more exclusions if any
Note that it expects the artifacts to be published either in maven central repository or hortonworks private/public repositories by default. There is also a way to add other repositories. That, however requires a change in SAM config in Ambari and restart of SAM server. Check out the image below that points to the config in Ambari. Format for the config is comma separated strings where each string is of the form <repo_name>^repo_url
Last property that remains to be covered is “fieldHintProviderClass”. Please note that this property is optional. This is expected to be FQCN of the class that provides suggestions for values for different fields that are defined for this component. Check out the implementation for Kafka source at https://github.com/hortonworks/streamline/blob/master/streams/cluster/src/main/java/com/hortonworks/... It is used by GUI to suggest or auto-populate values for different fields. For example, the “topic” field that we have talked about so far, you might have noticed that GUI shows a drop down for that field. For every built-in SAM component, GUI makes a call to an endpoint requesting suggested values for different fields for that component. If the component has such an implementation then it will provide with a json with entries that GUI can use. Each entry will have a key that will be the same as the value for “fieldName” property for that field so that GUI can tie it back to correct field. The value for each such entry is a suggestion. In case of “topic” field for Kafka source the implementation returns all the topics defined in Kafka cluster. So the value will be a list of strings, where each string is a topic. As a result, user can pick a topic from the list rather than typing it in. This makes it less error prone.
At this point, the question arises what if one wants to have a SAM component that is not a built-in one. This is fairly straightforward. Below is a list of pre-requisites and a run down of steps needed to achieve that.
Pre-requisites
Steps to follow
curl -sS -X POST -i -F topologyComponentBundle=@$data http://localhost:8080/api/v1/catalog/streams/componentbundles/SOURCE -FbundleJar=<path_to_jar_file>
At this point the curl should return a successful response and you should see that component in the toolbar when you navigate to Application Editor Canvas. You can drop that component into a new or an existing SAM app, configure and connect it and deploy the application.