# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Flux definition of the "profiler" topology:
#   kafkaSpout -> splitterBolt -> builderBolt -> { hbaseBolt, kafkaBolt }
#
# Every ${...} token is a placeholder. NOTE(review): presumably these are
# resolved by Flux property substitution before the YAML is parsed - confirm
# against the deployment scripts. Placeholders that are left unquoted below are
# unquoted on purpose: quoting them would turn the substituted value into a
# string.

name: "profiler"

# Topology-level settings, all supplied through placeholders.
config:
  topology.workers: ${profiler.workers}
  topology.acker.executors: ${profiler.executors}
  topology.worker.childopts: ${topology.worker.childopts}
  topology.auto-credentials: ${topology.auto-credentials}
  topology.message.timeout.secs: ${topology.message.timeout.secs}
  topology.max.spout.pending: ${topology.max.spout.pending}
  topology.testing.always.try.serialize: ${topology.testing.always.try.serialize}
  topology.fall.back.on.java.serialization: ${topology.fall.back.on.java.serialization}
  topology.kryo.register: ${topology.kryo.register}

components:

  # Row key builder used by the HBase mapper below.
  - id: "rowKeyBuilder"
    className: "org.apache.metron.profiler.hbase.SaltyRowKeyBuilder"
    properties:
      - name: "saltDivisor"
        value: ${profiler.hbase.salt.divisor}
    configMethods:
      - name: "withPeriodDuration"
        args:
          - ${profiler.period.duration}
          - "${profiler.period.duration.units}"

  # Column builder used by the HBase mapper below; takes the column family.
  - id: "columnBuilder"
    className: "org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder"
    constructorArgs:
      - "${profiler.hbase.column.family}"

  # Mapper handed to hbaseBolt; wired to the two builders above.
  - id: "hbaseMapper"
    className: "org.apache.metron.profiler.bolt.ProfileHBaseMapper"
    properties:
      - name: "rowKeyBuilder"
        ref: "rowKeyBuilder"
      - name: "columnBuilder"
        ref: "columnBuilder"

  # Kafka properties for the spout's consumer (deserializers, consumer group,
  # security protocol). Passed to kafkaConfig below.
  - id: "kafkaProps"
    className: "java.util.HashMap"
    configMethods:
      - name: "put"
        args:
          - "value.deserializer"
          - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
      - name: "put"
        args:
          - "key.deserializer"
          - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
      - name: "put"
        args:
          - "group.id"
          - "profiler"
      - name: "put"
        args:
          - "security.protocol"
          - "${kafka.security.protocol}"

  # The fields to pull out of the kafka messages
  - id: "fields"
    className: "java.util.ArrayList"
    configMethods:
      - name: "add"
        args:
          - "value"

  # Spout configuration; the constructor arguments are positional.
  - id: "kafkaConfig"
    className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
    constructorArgs:
      # kafka consumer properties
      - ref: "kafkaProps"
      # topic name
      - "${profiler.input.topic}"
      # zookeeper hosts
      - "${kafka.zk}"
      # fields to emit
      - ref: "fields"
    configMethods:
      - name: "setFirstPollOffsetStrategy"
        args:
          - "${kafka.start}"

  # Kafka properties for the writer (producer side); passed to kafkaWriter.
  - id: "kafkaWriterProps"
    className: "java.util.HashMap"
    configMethods:
      - name: "put"
        args:
          - "security.protocol"
          - "${kafka.security.protocol}"

  # Writer used by kafkaBolt to publish to the output topic.
  - id: "kafkaWriter"
    className: "org.apache.metron.writer.kafka.KafkaWriter"
    configMethods:
      - name: "withTopic"
        args:
          - "${profiler.output.topic}"
      - name: "withZkQuorum"
        args:
          - "${kafka.zk}"
      - name: "withProducerConfigs"
        args:
          - ref: "kafkaWriterProps"

  # Emitters registered on builderBolt (one per destination).
  - id: "kafkaEmitter"
    className: "org.apache.metron.profiler.bolt.KafkaEmitter"

  - id: "hbaseEmitter"
    className: "org.apache.metron.profiler.bolt.HBaseEmitter"

  # Window settings for builderBolt: (amount, units) pairs.
  - id: "windowDuration"
    className: "org.apache.storm.topology.base.BaseWindowedBolt$Duration"
    constructorArgs:
      - ${profiler.window.duration}
      - "${profiler.window.duration.units}"

  - id: "windowLag"
    className: "org.apache.storm.topology.base.BaseWindowedBolt$Duration"
    constructorArgs:
      - ${profiler.window.lag}
      - "${profiler.window.lag.units}"

spouts:

  # Reads the profiler input topic as configured by kafkaConfig.
  - id: "kafkaSpout"
    className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout"
    constructorArgs:
      - ref: "kafkaConfig"

bolts:

  - id: "splitterBolt"
    className: "org.apache.metron.profiler.bolt.ProfileSplitterBolt"
    constructorArgs:
      - "${kafka.zk}"

  - id: "builderBolt"
    className: "org.apache.metron.profiler.bolt.ProfileBuilderBolt"
    configMethods:
      - name: "withZookeeperUrl"
        args:
          - "${kafka.zk}"
      - name: "withPeriodDuration"
        args:
          - ${profiler.period.duration}
          - "${profiler.period.duration.units}"
      - name: "withProfileTimeToLive"
        args:
          - ${profiler.ttl}
          - "${profiler.ttl.units}"
      # withEmitter is invoked once per emitter: kafka first, then hbase.
      - name: "withEmitter"
        args:
          - ref: "kafkaEmitter"
      - name: "withEmitter"
        args:
          - ref: "hbaseEmitter"
      - name: "withTumblingWindow"
        args:
          - ref: "windowDuration"
      - name: "withLag"
        args:
          - ref: "windowLag"
      - name: "withMaxNumberOfRoutes"
        args:
          - ${profiler.max.routes.per.bolt}
      - name: "withTimestampField"
        args:
          - "timestamp"

  - id: "hbaseBolt"
    className: "org.apache.metron.hbase.bolt.HBaseBolt"
    constructorArgs:
      - "${profiler.hbase.table}"
      - ref: "hbaseMapper"
    configMethods:
      - name: "withTableProvider"
        args:
          - "${hbase.provider.impl}"
      - name: "withBatchSize"
        args:
          - ${profiler.hbase.batch}
      - name: "withFlushIntervalSecs"
        args:
          - ${profiler.hbase.flush.interval.seconds}

  - id: "kafkaBolt"
    className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
    constructorArgs:
      - "${kafka.zk}"
    configMethods:
      - name: "withMessageWriter"
        args:
          - ref: "kafkaWriter"

streams:

  - name: "spout -> splitter"
    from: "kafkaSpout"
    to: "splitterBolt"
    grouping:
      type: LOCAL_OR_SHUFFLE

  # Fields grouping on (entity, profile).
  - name: "splitter -> builder"
    from: "splitterBolt"
    to: "builderBolt"
    grouping:
      type: FIELDS
      args: ["entity", "profile"]

  # builderBolt feeds two downstream bolts, one per named stream.
  - name: "builder -> hbase"
    from: "builderBolt"
    to: "hbaseBolt"
    grouping:
      streamId: "hbase"
      type: LOCAL_OR_SHUFFLE

  - name: "builder -> kafka"
    from: "builderBolt"
    to: "kafkaBolt"
    grouping:
      streamId: "kafka"
      type: LOCAL_OR_SHUFFLE