12-07-2017
07:35 PM
In one of the previous articles, Extending SAM with Custom Processors & User Defined Functions, we saw how SAM can be extended with custom processing logic using Custom Processors and User Defined Functions. That is a very useful feature if you are writing application logic from scratch. However, there are cases where the code is already written and you want to reuse it. A good example is an existing Apache Storm Spout or Bolt that interacts with a proprietary system. How would you plug such a Spout or Bolt into SAM? That is what this article attempts to explain.

If you are more hands-on and want to try adding a custom source to SAM and running it as part of a SAM application, we have created an example at SAM Custom Source. Please follow the instructions in README.md to execute the steps on your existing SAM installation. If you would rather go through the details first, continue reading; you can always come back and try the example afterwards.

Before diving into the details, let us define a built-in SAM component. A built-in SAM component is a source, processor or sink that is part of the SAM distribution and appears in the toolbar on the left-hand side of the SAM application builder canvas. Examples are the Kafka source, Kafka sink, Rules processor, etc. Please refer to the image below to locate the toolbar. Each such component can be used in a SAM application.

To understand how it all works together, let's take the Kafka source as an example. SAM is bootstrapped with a set of such built-in components during installation. A typical curl call to register such a component as part of bootstrap looks like this:

curl -sS -X POST -F topologyComponentBundle=@$data http://localhost:8080/api/v1/catalog/streams/componentbundles/SOURCE

Notice the SOURCE at the end of the URL, since we are going over the example of the Kafka source.
For a processor or sink, SOURCE should be changed to PROCESSOR or SINK respectively.

Let's go through the -F form data argument named topologyComponentBundle. It refers to a JSON file (Kafka Source Json), which is shown here as well for reference, with only one configuration property, named topic. Let's go over its properties:

Property | Value (Kafka source) | Description |
---|---|---|
type | SOURCE | Indicates that this is a SAM source component. It determines the section of the toolbar (source/processor/sink) under which this component appears. |
subType | KAFKA | Denotes the external system for a source or sink, or the type of processing for a processor, that this component represents. Its value appears under the component icon so that users can identify it. Built-in components already have their own icons; a component that is not built in gets a default icon depending on whether it is a source, processor or sink. |
name | Kafka | A human-readable name that identifies this component. |
streamingEngine | STORM | The streaming engine this component corresponds to. The SAM framework uses it to support runners for different streaming engines. |
builtin | true | Whether this is a built-in SAM component. |
fieldHintProviderClass | com.hortonworks.streamline.streams.cluster.bundle.impl.KafkaBundleHintProvider | FQCN of the class that provides helpful suggestions for the configuration fields of this component. Explained in detail below. |
transformationClass | com.hortonworks.streamline.streams.layout.storm.KafkaSpoutFluxComponent | FQCN of the class that provides the Flux transformation needed to translate this SAM component into the underlying Storm Spout or Bolt. Explained in detail below. |
mavenDeps | org.apache.kafka:kafka-clients:0.10.2.1,org.apache.storm:storm-kafka-client:STORM_VERSION^org.slf4j:slf4j-log4j12^log4j:log4j^org.apache.zookeeper:zookeeper^org.apache.kafka:kafka-clients | Maven artifacts needed to run this component on the runner implementation (a Storm cluster in this case). Explained in more detail below. |
topologyComponentUISpecification | JSON | A JSON object adhering to a specification honored by the SAM application editor canvas. It lets you capture, through the GUI, any per-instance configuration this component needs when you drag and drop it into a SAM application. Explained in more detail below. |

For the non built-in components you are adding, the builtin property should always be false. Also add -F bundleJar=@<path_to_jar_file> to the curl call; this jar file must contain the class specified as the value of the transformationClass property.

The first property that needs to be elaborated here is topologyComponentUISpecification. A detailed explanation is provided in the javadocs at https://github.com/hortonworks/streamline/blob/master/common/src/main/java/com/hortonworks/streamline/common/ComponentUISpecification.java This property is expected to be a JSON object with a top-level property called fields, which is a JSON array. Each element of the array is a configuration field. Let's take one field from the Kafka source and illustrate its use. A Kafka source translates to a Kafka Spout in the underlying Storm topology, so we need the topic that the Spout will read (consume) messages from. This is expressed by adding a field called topic as below.

{
  "uiName": "Kafka topic",
  "fieldName": "topic",
  "isOptional": false,
  "tooltip": "Kafka topic to read data from",
  "type": "enumstring",
  "options": [],
  "hint": "schema,override"
}

As a result, you see a field in the configuration dialog for the Kafka source, as shown in the image below. Because isOptional is set to false, it appears under the Required tab. Because it is of type enumstring, it shows a dropdown to pick values from.
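To make the isOptional and enumstring behavior described above concrete, here is a minimal Java sketch that models one element of the fields array. It is not SAM code: UiField, tab() and validate() are hypothetical names, and the real checks are done by SAM's GUI. The sketch only mirrors what the article describes: a field with isOptional set to false lands on the Required tab and must have a value, and an enumstring value must be one of the options. In the Kafka JSON above, options is empty, presumably because the GUI fills the dropdown from the hint provider, so the sketch simply takes the allowed values as a constructor argument.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical model of one element of the "fields" array in
// topologyComponentUISpecification. Not a SAM class; it only mirrors
// the behavior described in this article.
final class UiField {
    final String uiName;
    final String fieldName;
    final boolean isOptional;
    final String type;          // e.g. "string", "number", "enumstring"
    final List<String> options; // allowed values for "enumstring" fields

    UiField(String uiName, String fieldName, boolean isOptional,
            String type, List<String> options) {
        this.uiName = uiName;
        this.fieldName = fieldName;
        this.isOptional = isOptional;
        this.type = type;
        this.options = options == null ? Collections.<String>emptyList() : options;
    }

    /** Tab of the configuration dialog on which the field appears. */
    String tab() {
        return isOptional ? "Optional" : "Required";
    }

    /** Returns null if the value is acceptable, otherwise an error message. */
    String validate(String value) {
        if (value == null || value.isEmpty()) {
            // Only non-optional fields force the user to supply a value.
            return isOptional ? null : uiName + " is required";
        }
        if ("enumstring".equals(type) && !options.contains(value)) {
            return uiName + " must be one of " + options;
        }
        return null;
    }
}
```

For the topic field above, a UiField built with isOptional = false and type "enumstring" reports "Required" from tab(), and validate() rejects an empty value or any topic that is not in the supplied list.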
Whenever you define such a field, the GUI displays it when you drop that SAM component onto a SAM application and lets the user enter a value for it or select one from its available options, depending on the field's type. The value entered or selected by the user is stored for that instance of the component in that SAM application, so that the underlying Storm component (the Kafka Spout in this case) can use it at runtime.

The table below explains each property used to describe a field. You can also look at the built-in components to get an idea of how different fields are configured. All built-in component JSON files are located under the directory <SAM_HOME>/bootstrap on a SAM installation. You can also browse them at https://github.com/hortonworks/streamline/tree/master/bootstrap/components

Property | Description |
---|---|
uiName | Name of the field, displayed as a text label so that users can recognize the field when they open the configuration dialog for that component. |
fieldName | Key used to store the value of this field, so that the value can later be retrieved and passed to the underlying streaming engine. |
isOptional | Whether this field is optional. If true, no validation forces the user to enter or pick a value for it. It also decides whether the field appears under the Optional or the Required tab of the configuration dialog. |
tooltip | Text that appears when the user hovers over the field in the configuration dialog. Typically it explains the significance of the field. |
type | Type of the field, for example a JSON type such as string, number, boolean or object, or enumstring as in the topic field above. It makes sure that the value entered by the user is validated and cast to the correct JSON type before it is stored. |
options | JSON array. If the type is enumstring, for example, this is expected to be an array of strings, and the user is presented with a dropdown to pick one of them. |
hint | Comma-separated string of hints. The GUI already supports some built-in hints that trigger certain behavior. For example, with the hint "schema", the value entered or picked by the user has to be the name of a schema defined in Schema Registry. With "password", the value entered by the user is not shown on the form. With "email", the value entered by the user is validated against a regular expression for email addresses. More hints are added as the project evolves. |

To find all the hints currently in use, you can run the command:

grep -RH "\"hint\":" <SAM_HOME>/bootstrap

Now let's move on to the transformationClass property and see how it works in conjunction with the configuration fields defined for that component. The Storm runner uses Flux YAML files to deploy SAM applications as Storm topologies. If you are not familiar with Flux, here is a link to an overview: https://github.com/hortonworks/storm/tree/HDF-3.0-maint/external/flux

Each built-in SAM component has an implementation of the interface https://github.com/hortonworks/streamline/blob/master/streams/sdk/src/main/java/com/hortonworks/streamline/streams/layout/storm/FluxComponent.java Note that the implementation class needs a public no-args constructor, so that SAM can instantiate it and call the methods defined on the interface. The important methods are:

void withConfig(Map<String, Object> config);
The config argument is the Java representation of the JSON described above, containing key-value pairs for all the fields configured by the user in the GUI. While deploying a SAM application, SAM calls this method first thing after instantiating the class, so that the implementation can use the values later when creating Flux-related objects.
List<Map<String, Object>> getReferencedComponents();
getReferencedComponents should return a list of all the components needed to create the top-level component (Spout or Bolt). Each element in the list is a map representing the Flux YAML for that component.

Map<String, Object> getComponent();
getComponent should return a map representing the Flux YAML for the top-level component.

You can check out the implementations for built-in components at https://github.com/hortonworks/streamline/tree/master/streams/runners/storm/layout/src/main/java/com/hortonworks/streamline/streams/layout/storm As you might have noticed, KafkaSpoutFluxComponent.java, which is set as the value of the transformationClass property mentioned before, provides an implementation of this interface.

Let's move on to the next property, mavenDeps. Different Storm Spouts and Bolts need different Java classes at runtime. For example, the Kafka source in SAM, which translates to a Kafka Spout, needs the kafka-clients library for all the Kafka consumer classes. SAM handles this through a Storm feature that lets you submit additional jars with a topology by fetching them via Maven coordinates at submission time. See https://github.com/hortonworks/storm/blob/HDF-3.0-maint/bin/storm.py#L272 for the implementation details.

The important thing to remember here is the format expected for the value of this property. It is a comma-separated string of Maven artifact entries, where each entry has the format

<maven_group_id>:<maven_artifact_id>:<maven_artifact_id_version>^[<maven_group_id_excluded>:<maven_artifact_id_excluded>]^...

followed by more exclusions, if any. By default, the artifacts are expected to be published either in the Maven central repository or in the Hortonworks private/public repositories. Other repositories can be added as well, but that requires a change to the SAM config in Ambari and a restart of the SAM server. Check out the image below, which points to the config in Ambari.
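Because the mavenDeps format packs artifacts and their exclusions into a single string, a small parser can help sanity-check a value before registering a component. The sketch below is a hypothetical helper (MavenDep and parseAll are not SAM classes); it follows only the format described above: entries separated by commas, each one a groupId:artifactId:version followed by zero or more ^groupId:artifactId exclusions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of SAM: splits a mavenDeps value into
// artifacts and their exclusions, following the documented format
// <group>:<artifact>:<version>^<group>:<artifact>^...
final class MavenDep {
    final String coordinates;      // groupId:artifactId:version
    final List<String> exclusions; // groupId:artifactId entries to exclude

    MavenDep(String coordinates, List<String> exclusions) {
        this.coordinates = coordinates;
        this.exclusions = exclusions;
    }

    static List<MavenDep> parseAll(String mavenDeps) {
        List<MavenDep> deps = new ArrayList<>();
        // Artifact entries are separated by commas.
        for (String entry : mavenDeps.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            // '^' is a regex metacharacter, so it must be escaped for split().
            String[] parts = trimmed.split("\\^");
            requireColonParts(parts[0], 3);
            List<String> exclusions =
                    new ArrayList<>(Arrays.asList(parts).subList(1, parts.length));
            for (String exclusion : exclusions) {
                requireColonParts(exclusion, 2);
            }
            deps.add(new MavenDep(parts[0], exclusions));
        }
        return deps;
    }

    // Checks that a value has exactly `expected` non-empty ':'-separated parts.
    private static void requireColonParts(String value, int expected) {
        String[] pieces = value.split(":", -1);
        boolean ok = pieces.length == expected;
        for (String piece : pieces) {
            ok &= !piece.isEmpty();
        }
        if (!ok) {
            throw new IllegalArgumentException(
                    "Expected " + expected + " ':'-separated parts in '" + value + "'");
        }
    }
}
```

Applied to the Kafka source value from the table above, parseAll returns two entries: kafka-clients with no exclusions, and storm-kafka-client with its four exclusions.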
The format for this config is a comma-separated list of strings, each of the form <repo_name>^<repo_url>.

The last property that remains to be covered is fieldHintProviderClass. Note that this property is optional. It is expected to be the FQCN of a class that suggests values for the different fields defined for this component. Check out the implementation for the Kafka source at https://github.com/hortonworks/streamline/blob/master/streams/cluster/src/main/java/com/hortonworks/streamline/streams/cluster/bundle/impl/KafkaBundleHintProvider.java The GUI uses it to suggest or auto-populate values for fields. Take the topic field we have discussed so far: you might have noticed that the GUI shows a dropdown for it. For every built-in SAM component, the GUI calls an endpoint requesting suggested values for that component's fields. If the component has such an implementation, the endpoint returns JSON with entries the GUI can use. The key of each entry is the same as the fieldName of the corresponding field, so that the GUI can tie it back to the correct field, and the value is the suggestion. For the topic field of the Kafka source, the implementation returns all the topics defined in the Kafka cluster, so the value is a list of strings, one per topic. As a result, the user can pick a topic from the list rather than typing it in, which makes configuration less error-prone.

At this point the question arises: what if you want a SAM component that is not built in? This is fairly straightforward. Below is a list of prerequisites and a rundown of the steps needed to achieve that.

Pre-requisites

1. The SAM component you are trying to add must be backed by a Storm Spout or Bolt: a SAM source has to be a Spout, and a SAM processor or sink has to be a Bolt.
2. It must be possible to create the Spout or Bolt using Flux YAML, so that a corresponding implementation of the FluxComponent interface can be written and provided to SAM.
3. The Spout or Bolt must emit on an explicit output stream provided by SAM. It should declare only one field on that output stream, named streamline-event, and the value emitted should be an object implementing the StreamlineEvent interface. The output streams assigned by SAM can be obtained in the FluxComponent implementation as below, using the cast that matches your component type (conf is the object passed to the withConfig method, as explained above):

((StreamlineSource)conf.get(StormTopologyLayoutConstants.STREAMLINE_COMPONENT_CONF_KEY)).getOutputStreams();
((StreamlineProcessor)conf.get(StormTopologyLayoutConstants.STREAMLINE_COMPONENT_CONF_KEY)).getOutputStreams();

4. All the jars needed to run the Spout or Bolt should be available in a Maven repository configured in the SAM Ambari config, or installed in the local m2 directory of the user running the SAM server process.

Steps to follow

1. Provide an implementation of the FluxComponent interface and package it in a jar file that will be passed to curl. Note that you will need to declare dependencies on SAM SDK, SAM Storm layout and SAM common.
2. Create a JSON file for registering your component and make sure all the properties are defined correctly. Set builtin to false and streamingEngine to STORM. Other important properties are topologyComponentUISpecification, mavenDeps and transformationClass. For sources and processors, topologyComponentUISpecification must include one field with the hint "schema" and another with the hint "schemaVersion,dependsOn-<field_with_hint_schema>", and the corresponding schema must exist in Schema Registry.
This is how SAM figures out the schema of the events flowing out of that component when you drop it into a SAM application, and it enables other features on the fields of that schema, such as applying UDFs and rules. Check out the fields topic and readerSchemaVersion in the JSON for the built-in Kafka source for an example. Note that the underlying Spout or Bolt is expected to emit StreamlineEvents that adhere to the schema and version provided by the user.
3. Register it as a SAM component using curl:

curl -sS -X POST -i -F topologyComponentBundle=@$data -F bundleJar=@<path_to_jar_file> http://localhost:8080/api/v1/catalog/streams/componentbundles/SOURCE

Note that this sample curl has SOURCE at the end of the URL. If you are registering a processor or sink, replace it with PROCESSOR or SINK accordingly.

At this point the curl call should return a successful response, and you should see the component in the toolbar when you navigate to the Application Editor canvas. You can drop it into a new or existing SAM application, configure and connect it, and deploy the application.
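To make step 1 more concrete, here is a minimal sketch of a transformation class for a hypothetical custom spout. To stay self-contained it implements a local stand-in interface, FluxComponentSketch, that mirrors only the three methods discussed above (the real FluxComponent in the SAM SDK may declare more). Everything under com.example, as well as the config keys serviceUrl and outputStreamId, is an assumption for illustration. The returned maps use standard Flux keys: id, className, constructorArgs (including a ref entry) and parallelism.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Local stand-in for SAM's FluxComponent interface, limited to the three
// methods discussed in this article; the real interface may declare more.
interface FluxComponentSketch {
    void withConfig(Map<String, Object> config);
    List<Map<String, Object>> getReferencedComponents();
    Map<String, Object> getComponent();
}

// Hypothetical transformationClass for a custom spout
// com.example.storm.WeatherSpout whose constructor takes a
// com.example.storm.WeatherClientConfig and an output stream id.
class WeatherSourceFluxComponent implements FluxComponentSketch {
    private static final String CLIENT_CONFIG_ID = "weatherClientConfig";
    private Map<String, Object> config = Collections.emptyMap();

    // SAM instantiates the class through a public no-args constructor.
    public WeatherSourceFluxComponent() {
    }

    @Override
    public void withConfig(Map<String, Object> config) {
        // Called first; holds the values the user entered in the GUI.
        this.config = config;
    }

    @Override
    public List<Map<String, Object>> getReferencedComponents() {
        // Flux "components" entry that the spout refers to by id.
        Map<String, Object> clientConfig = new LinkedHashMap<>();
        clientConfig.put("id", CLIENT_CONFIG_ID);
        clientConfig.put("className", "com.example.storm.WeatherClientConfig");
        clientConfig.put("constructorArgs", Collections.singletonList(config.get("serviceUrl")));
        List<Map<String, Object>> components = new ArrayList<>();
        components.add(clientConfig);
        return components;
    }

    @Override
    public Map<String, Object> getComponent() {
        Map<String, Object> clientRef = new HashMap<>();
        clientRef.put("ref", CLIENT_CONFIG_ID);
        // Flux "spouts" entry for the top-level component.
        Map<String, Object> spout = new LinkedHashMap<>();
        spout.put("id", "weatherSpout");
        spout.put("className", "com.example.storm.WeatherSpout");
        // Mixed constructor args: a reference to the component above plus a
        // literal output stream id.
        spout.put("constructorArgs", Arrays.asList(clientRef, config.get("outputStreamId")));
        spout.put("parallelism", 1);
        return spout;
    }
}
```

In a real implementation the output stream id would not come from a plain config key; it would be taken from the SAM-assigned output streams, obtained through the getOutputStreams() call shown in the prerequisites.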
10-31-2017
07:57 PM
SAM's main goal is to let developers, operators and business analysts build, deploy, manage and monitor streaming applications easily, in minutes. This article elaborates on one aspect of SAM: its extensibility. Most streaming use cases are already covered by the built-in SAM processors such as Join, Split, Rules, Aggregate and Normalization. However, from our interactions with customers and other users we realized that, beyond an easy-to-use visual interface for building new streaming applications, there is custom logic that cannot easily be expressed with the existing processors. Extensibility was therefore always an important consideration while designing SAM. How can you hook your custom processing logic into SAM using Custom Processors (CPs) and User Defined Functions (UDFs)? That is what this article attempts to explain.

CPs and UDFs let you define new functions and custom processing in a way that keeps different people in an organization from rewriting the same code and promotes reusing it instead. Many common functions or processing steps needed across an organization can be standardized with these features and reused by whoever needs them. Both are designed with the principle "Write Once. Use Anywhere." in mind. For reference, here is the link to the SAM SDK module: https://github.com/hortonworks/streamline/tree/master/streams/sdk

The remaining content is broken down into three sections, each addressing a different use case of extensibility:

1. UDFs (User Defined Functions)
2. UDAFs (User Defined Aggregate Functions)
3. Custom Processor

1 UDFs (User Defined Functions)

UDFs let you apply functions to the fields of events flowing through a SAM application. UDFs are powered by a built-in SAM processor component named Projection, as shown in the image below. The image above shows where the Projection processor is located under the Processor section of the toolbar on the left-hand side.
It is dropped onto the application and connected to a Kafka source. The image above shows the dialog that opens when you double-click it to configure it. A Projection processor lets you apply UDFs to the event flowing through it and project the fields that make up the output schema of that Projection component. SAM already ships a set of standard built-in UDFs. However, if you want to apply a function that is not covered by them, you can register your own UDF and use it in the Projection processor. Once registered, the UDF appears as one of the items in the dropdown that lists all UDFs (built-in and user-registered). As you can see in the image, the dropdown already has standard functions like IDENTITY, UPPER, etc. Say you want to write a UDF that takes speed and time as input and computes the distance. You can register such a UDF, called COMPUTE_DISTANCE for example, and it will start appearing in that dropdown. As a result of applying this UDF, the event flowing out of this processor will also carry the computed distance as a new field, alongside all the other fields you pick from the input event.

2 UDAFs (User Defined Aggregate Functions)

UDAFs let you apply aggregate functions to the fields of events flowing through a SAM application. UDAFs are powered by a built-in SAM processor component named Aggregate, as shown in the image below. The image above shows where the Aggregate processor is located under the Processor section of the toolbar on the left-hand side. It is dropped onto the application and connected to a Kafka source. The image above shows the dialog that opens when you double-click it to configure it. An Aggregate processor lets you apply aggregate functions to the fields of events over a time window (event or processing time) and/or a count window. SAM already ships with a set of built-in UDAFs.
However, if you want to perform an aggregation that is not covered by the built-in UDAFs shipped with SAM, you can register your own UDAF and use it in the Aggregate processor. Once registered, it appears as one of the items in the dropdown that lists all UDAFs (built-in and user-registered).

Check out the GUI screenshots below to see all the currently registered UDFs and UDAFs and how to add or register one. Notice the Type column, which denotes whether an entry is a normal or an aggregate function. You can also edit or delete any custom function that did not originally ship with SAM. You can navigate to this page by clicking Application Resources under the wrench icon on the left-hand side. Once you click the + icon, you are presented with the dialog shown in the image below. Most of the fields are self-explanatory; please refer to the tooltip on each field for more information. Make sure you select the correct type from the dropdown, depending on whether or not you are registering an aggregate function.

The CLASSNAME field should be the FQCN of the class implementing one of the interfaces at https://github.com/hortonworks/streamline/tree/master/streams/sdk/src/main/java/com/hortonworks/streamline/streams/rule If you pick FUNCTION as the type, then by convention a UDF implementation with a single argument implements the interface in UDF.java; an implementation that needs more than one argument implements UDF2, UDF3… UDFn respectively. If you pick Aggregate as the type, then by convention a UDAF implementation with a single argument implements the interface in UDAF.java, and one with two arguments implements the interface in UDAF2.java. Note that SAM takes care of matching the fields of the event flowing into the Projection or Aggregate processor to the arguments of your UDF or UDAF implementation.
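As an illustration of the two-argument convention, here is a sketch of the COMPUTE_DISTANCE UDF mentioned earlier. UDF2Sketch is a local stand-in whose shape (a single evaluate method taking two inputs) is assumed from the UDF2 naming convention described above; check UDF2.java in the SAM SDK for the actual signature before implementing against it. The units (speed in km/h, time in hours) are also assumptions.

```java
import java.io.Serializable;

// Local stand-in for the SDK's two-argument UDF interface. The shape
// (one evaluate method with two inputs) is an assumption; check UDF2.java
// in the SAM SDK before implementing against the real interface.
interface UDF2Sketch<O, I1, I2> extends Serializable {
    O evaluate(I1 input1, I2 input2);
}

// Hypothetical COMPUTE_DISTANCE UDF: distance = speed * time.
// Assumed units: speed in km/h, time in hours, result in km.
class ComputeDistance implements UDF2Sketch<Double, Double, Double> {
    private static final long serialVersionUID = 1L;

    @Override
    public Double evaluate(Double speed, Double time) {
        if (speed == null || time == null) {
            return null; // missing input: produce no distance rather than fail
        }
        return speed * time;
    }
}
```

Here evaluate(60.0, 1.5) yields 90.0. Once such a class is registered, it is SAM, not your code, that calls evaluate with the event fields mapped to its arguments.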
SAM also takes care of letting you project the value returned by your UDF or UDAF as an output field of the processor. It does so by using reflection to extract information about the arguments and return type of the implementation class in the provided jar, and presenting that information to you in the GUI while you configure the processors.

3 Custom Processor

The previous two sections covered how to apply simple functions to individual fields of an event flowing through the application using UDFs and UDAFs. This section covers the scenario where you want to take the event itself, do some processing, and return a list of events to emit from that component. The idea of a Custom Processor is also "Write Once. Use Anywhere." This will become clearer as we work through an example: a Custom Processor that enriches the event flowing through it with information about the driver. The presumption is that there is an endpoint exposing a service that looks up a driver by driverId and returns the data needed to enrich the event. The Custom Processor implementation expects a field named driverId in the incoming event and returns driverName, driverDOB and driverLicenseNumber.

The first step to using a Custom Processor is writing one. You need to implement the interface at https://github.com/hortonworks/streamline/blob/master/streams/sdk/src/main/java/com/hortonworks/streamline/streams/runtime/CustomProcessorRuntime.java (or the cached variant at https://github.com/hortonworks/streamline/blob/master/streams/sdk/src/main/java/com/hortonworks/streamline/streams/runtime/CacheBackedProcessorRuntime.java if you want to cache results and avoid an expensive lookup per event) and create a jar file that can be uploaded while registering the Custom Processor. Note that the implementation must have a public no-args constructor. The important lifecycle methods of the interface are initialize and process, with the signatures below.
void initialize(Map<String, Object> config);
List<StreamlineEvent> process(StreamlineEvent event) throws ProcessingException;

The initialize method is called once, when the Custom Processor implementation is first instantiated. It is passed all the key-value pairs entered by the user in the GUI that are needed to configure this instance of the Custom Processor. In our example, it will have a value for the key serviceEndpoint. The process method is where all the custom processing logic lies. It is passed a StreamlineEvent as an argument and is expected to return a list of the StreamlineEvents it wants to emit. StreamlineEvent is another interface, an abstraction of the event flowing through SAM. It is part of the SAM SDK module at https://github.com/hortonworks/streamline/blob/master/streams/sdk/src/main/java/com/hortonworks/streamline/streams/StreamlineEvent.java

Now that we have a jar with the implementation, let's go through the registration process. To register a Custom Processor, click Application Resources under the wrench menu, as shown in the image below. If you already have Custom Processors registered, you will see them in a table that lets you edit or delete them. The Edit and Add screens are very similar to each other. Let's go through the Add screen, which opens when you click the + icon as shown below. Most of the fields are self-explanatory; the important ones are NAME, CLASSNAME, INPUTSCHEMA and OUTPUTSCHEMA. NAME uniquely identifies this Custom Processor. CLASSNAME is the FQCN of the class implementing the interface mentioned above. INPUTSCHEMA is the schema of the incoming event expected by this Custom Processor implementation; here we are saying that it expects a mandatory driverId field of type long. OUTPUTSCHEMA is the schema of the events that this implementation returns.
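The initialize/process contract and the driver schemas described above can be sketched as follows. To keep it runnable without the SAM SDK, the sketch uses local stand-ins: CustomProcessorSketch mirrors the two signatures shown above, a plain Map<String, Object> plays the role of StreamlineEvent, and ProcessingException is a local class. The setLookup hook and the stubbed callService method are hypothetical; a real implementation would query the service at serviceEndpoint (or build on CacheBackedProcessorRuntime to cache lookups).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Local stand-ins so the sketch runs without the SAM SDK: a Map plays the
// role of StreamlineEvent, and ProcessingException mimics the SDK exception.
class ProcessingException extends Exception {
    private static final long serialVersionUID = 1L;

    ProcessingException(String message) {
        super(message);
    }

    ProcessingException(String message, Throwable cause) {
        super(message, cause);
    }
}

interface CustomProcessorSketch {
    void initialize(Map<String, Object> config);
    List<Map<String, Object>> process(Map<String, Object> event) throws ProcessingException;
}

// Hypothetical driver enrichment processor from the walkthrough above.
class DriverEnrichmentProcessor implements CustomProcessorSketch {
    private static final String[] OUTPUT_FIELDS = {"driverName", "driverDOB", "driverLicenseNumber"};
    private String serviceEndpoint;
    private Function<Long, Map<String, Object>> lookup;

    public DriverEnrichmentProcessor() {
        // public no-args constructor, as required for Custom Processors
    }

    // Hypothetical test hook that replaces the remote call with a stub.
    void setLookup(Function<Long, Map<String, Object>> lookup) {
        this.lookup = lookup;
    }

    @Override
    public void initialize(Map<String, Object> config) {
        // Called once with the CONFIG FIELDS values entered in the GUI.
        serviceEndpoint = (String) config.get("serviceEndpoint");
        if (serviceEndpoint == null) {
            throw new IllegalArgumentException("serviceEndpoint is required");
        }
        if (lookup == null) {
            lookup = this::callService;
        }
    }

    private Map<String, Object> callService(Long driverId) {
        // A real implementation would query serviceEndpoint for this driverId.
        throw new UnsupportedOperationException("remote lookup not implemented in this sketch");
    }

    @Override
    public List<Map<String, Object>> process(Map<String, Object> event) throws ProcessingException {
        Object driverId = event.get("driverId");
        if (!(driverId instanceof Long)) {
            throw new ProcessingException("expected a long driverId field, got " + driverId);
        }
        Map<String, Object> driver;
        try {
            driver = lookup.apply((Long) driverId);
        } catch (RuntimeException e) {
            throw new ProcessingException("lookup failed for driverId " + driverId, e);
        }
        if (driver == null) {
            throw new ProcessingException("no driver found for driverId " + driverId);
        }
        // Emit one event carrying only the OUTPUTSCHEMA fields.
        Map<String, Object> enriched = new HashMap<>();
        for (String field : OUTPUT_FIELDS) {
            enriched.put(field, driver.get(field));
        }
        return Collections.singletonList(enriched);
    }
}
```

The returned event carries only the three output-schema fields; carrying forward fields of the original input event is handled by SAM through the OUTPUT FIELDS selection, not by the processor itself.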
In this example, the output schema says that every event returned by the process method of this implementation will have driverName, driverDOB and driverLicenseNumber fields (all mandatory and of type string). The image below shows a dialog box for adding CONFIG FIELDS. It lets you add configuration fields whose values you supply when configuring an instance of this Custom Processor in a SAM application. Once you click OK and successfully save the Custom Processor, it appears as a component in the toolbar on the left-hand side, and you can use it in a SAM application. Check out the image below.

This is how we get the "Write Once. Use Anywhere." capability. As long as the semantics of the custom processing and the schemas stay the same, you can configure a Custom Processor differently depending on how it needs to be used in a given context (the context here being a SAM application). For our purpose, let's assume there are two data centers, one in EAST and the other in WEST, each with a different web service endpoint.

Let's now go through the configuration of this Custom Processor within a SAM application. As you can see in the image above, the Custom Processor is connected to a Kafka source. Double-clicking the Custom Processor opens a dialog box that lets you configure it, along with a couple of other things. The value of the Service Endpoint field points to the EAST endpoint, since this SAM application is presumed to process data from the east coast data center. You will also see a section called INPUT SCHEMA MAPPING. It maps fields from the schema of the events emitted by the previous component to the semantically equivalent fields defined in the input schema of this Custom Processor. In our case the field names are exactly the same, which is why you see a mapping from driverId -> driverId.
However, imagine you were also using this Custom Processor to enrich data with driver information from another source, where the field is named driverIdentifier instead of driverId. This mapping lets you tell SAM that driverIdentifier is the same as driverId, and SAM takes care of passing the driverIdentifier field as driverId to the Custom Processor implementation. You can choose such a mapping for every field defined in the input schema of a Custom Processor.

The other section you might have noticed is OUTPUT FIELDS. It lets you pick all the fields you want to include in the event passed on from this Custom Processor to the next component. These make up the output schema shown in the vertical section named Output on the right-hand side of the configuration dialog in the image above. The fields you can pick come from the input schema of the event flowing into this Custom Processor or from the output schema defined for the Custom Processor, which lets you carry forward any field from the original input event. As you can see in the image, all fields from the original event are picked, along with the three fields driverName, driverDOB and driverLicenseNumber defined in the Custom Processor's output schema. Once you click OK and save the SAM application, you are ready to deploy it with the Custom Processor in place.

The next article, which illustrates how SAM can be extended with custom sources and sinks, can be found at Extending SAM with Custom Sources/Sinks. Check it out!
03-20-2017
10:29 PM
Hi Vincent. What is the version of storm-kafka that you are using to build your topology? Can you try to upgrade it to the same version as mentioned by @Jungtaek Lim above?