Created 08-30-2017 09:59 PM
I am trying to run a very simple topology with a spout that generates random data and a partitionPersist that stores the data in SQL.
However, the StateUpdater.updateState method never gets called. Am I missing something?
Originally my project reads from Kafka, performs some aggregations and transformations, and then saves the results into SQL. That was not working, so I built this test, the simplest one I could think of, and it still does not work.
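For reference, the storm-jdbc wiring I am ultimately aiming for looks roughly like the sketch below. This is illustrative only: the data source settings, table name, and field names are placeholders, and kafkaSpout stands in for my real Trident Kafka spout.

// Sketch only; classes come from org.apache.storm.jdbc (common, mapper, trident.state) plus the usual Trident imports.
Map<String, Object> hikariConfigMap = new HashMap<>();
hikariConfigMap.put("dataSourceClassName", "com.mysql.jdbc.jdbc2.optional.MysqlDataSource"); // placeholder
hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");                        // placeholder
hikariConfigMap.put("dataSource.user", "user");
hikariConfigMap.put("dataSource.password", "password");
ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);

// SimpleJdbcMapper reads the column metadata from the target table.
JdbcMapper mapper = new SimpleJdbcMapper("user_details", connectionProvider);

JdbcState.Options options = new JdbcState.Options()
        .withConnectionProvider(connectionProvider)
        .withMapper(mapper)
        .withTableName("user_details")
        .withQueryTimeoutSecs(30);

TridentTopology topology = new TridentTopology();
topology.newStream("users", kafkaSpout) // kafkaSpout is a placeholder for my real Kafka spout
        .partitionPersist(new JdbcStateFactory(options),
                new Fields("user_id", "user_name", "create_date"),
                new JdbcUpdater(), new Fields());

The test below strips all of that away and replaces the JDBC state with logging-only stubs, and even then updateState is never reached.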
I am using Storm 1.1.0 from HDP 2.6.1.
Here are the logs (please see classes below to follow them):
PEEK: UserSpout - DECLAREOUTPUTFIELDS
PEEK: UserSpout - GETCOMPONENTCONFIGURATION
PEEK: UserSpout - GETCOMPONENTCONFIGURATION
PEEK: UserSpout - DECLAREOUTPUTFIELDS
PEEK: JdbcStateFactoryMet - MAKESTATE
PEEK: UserSpout - OPEN
PEEK: UserSpout - NEXTTUPLE - [1, peter, 1504128640671]
PEEK: UserSpout - NEXTTUPLE - [2, bob, 1504128640671]
PEEK: UserSpout - NEXTTUPLE - [2, bob, 1504128640671]
...
Here are the classes:
The spout:
public class UserSpout implements IRichSpout {
    private static final Logger LOG = LoggerFactory.getLogger(UserSpout.class);

    private SpoutOutputCollector collector;

    private static final List<Values> rows = Lists.newArrayList(
            new Values(1, "peter", System.currentTimeMillis()),
            new Values(2, "bob", System.currentTimeMillis()),
            new Values(3, "alice", System.currentTimeMillis()));

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        LOG.info("PEEK: UserSpout - OPEN");
        this.collector = collector;
    }

    public void close() {
    }

    public void nextTuple() {
        try {
            Thread.sleep(500);
            final Random rand = new Random();
            final Values row = rows.get(rand.nextInt(rows.size() - 1));
            LOG.info("PEEK: UserSpout - NEXTTUPLE - " + row);
            this.collector.emit(row);
            Thread.yield();
        } catch (InterruptedException e) {
        }
    }

    public void ack(Object msgId) {
    }

    public void fail(Object msgId) {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        LOG.info("PEEK: UserSpout - DECLAREOUTPUTFIELDS");
        declarer.declare(new Fields("user_id", "user_name", "create_date"));
    }

    public void activate() {
    }

    public void deactivate() {
    }

    public Map<String, Object> getComponentConfiguration() {
        LOG.info("PEEK: UserSpout - GETCOMPONENTCONFIGURATION");
        return null;
    }
}
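Note (unrelated to the updateState problem): rand.nextInt(rows.size() - 1) can only return 0 or 1, so the "alice" row is never emitted. Picking over all three rows would be

    final Values row = rows.get(rand.nextInt(rows.size()));

I left the spout as posted, since that is the code that produced the logs above.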
The main:
public class MainClass {

    public static void main(String[] args) throws Exception {
        TridentTopology topology = new TridentTopology();
        topology.newStream("userSpout", new UserSpout())
                .partitionPersist(new JdbcStateFactoryMet(), new Fields("user_id", "create_date"), new JdbcUpdaterTest());

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("TEST", new Config(), topology.build());
    }

    public static class JdbcStateFactoryMet implements StateFactory {
        private static final Logger LOG = LoggerFactory.getLogger(JdbcStateFactoryMet.class);

        @Override
        public State makeState(Map map, IMetricsContext iMetricsContext, int partitionIndex, int numPartitions) {
            LOG.info("PEEK: JdbcStateFactoryMet - MAKESTATE");
            return new JdbcStateMet();
        }
    }

    public static class JdbcStateMet implements State {
        private static final Logger LOG = LoggerFactory.getLogger(JdbcStateMet.class);

        @Override
        public void beginCommit(Long aLong) {
            LOG.info("PEEK: JdbcStateMet - beginCommit");
        }

        @Override
        public void commit(Long aLong) {
            LOG.info("PEEK: JdbcStateMet - commit");
        }

        public void updateState(List<TridentTuple> tuples) {
            LOG.info("PEEK: JdbcStateMet - UPDATESTATE");
        }
    }

    public static class JdbcUpdaterTest extends BaseStateUpdater<JdbcStateMet> {
        private static final Logger LOG = LoggerFactory.getLogger(JdbcUpdaterTest.class);

        @Override
        public void updateState(JdbcStateMet jdbcState, List<TridentTuple> tuples, TridentCollector collector) {
            LOG.info("PEEK: JdbcUpdaterMet - UPDATESTATE");
            jdbcState.updateState(tuples);
        }
    }
}
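As an aside, a bounded local run can be wrapped like this (just a sketch; the sleep length is arbitrary and only gives the topology time to process a few batches before the cluster is shut down):

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("TEST", new Config(), topology.build());
    Thread.sleep(60_000);            // arbitrary: let a few Trident batches go through
    cluster.killTopology("TEST");
    cluster.shutdown();
    System.exit(0);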
The Maven POM:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>Test</groupId>
    <artifactId>test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <name>${project.artifactId}</name>

    <repositories>
        <repository>
            <id>hortonworks</id>
            <url>http://repo.hortonworks.com/content/repositories/releases/</url>
        </repository>
        <repository>
            <id>hortonworks-Group</id>
            <url>http://repo.hortonworks.com/content/groups/public</url>
        </repository>
        <repository>
            <id>apache</id>
            <url>http://repository.apache.org/content/repositories/releases/</url>
        </repository>
        <repository>
            <id>clojars</id>
            <url>http://clojars.org/repo/</url>
        </repository>
    </repositories>

    <properties>
        <storm.version>1.1.0.2.6.1.0-129</storm.version>
        <log4j.version>1.2.17</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <scope>compile</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>log4j-over-slf4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-jdbc</artifactId>
            <version>${storm.version}</version>
            <scope>compile</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
</project>
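One aside I am unsure about: when a topology is submitted to a real cluster (as in my follow-up below), storm-core is normally declared with provided scope so it is not bundled into the topology jar, while storm-jdbc stays compile so it does get packaged. Here I kept storm-core as compile because of the local runs; the cluster variant would be something like:

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
        <!-- provided: the Storm workers already supply these classes on the cluster -->
        <scope>provided</scope>
    </dependency>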
Created 09-01-2017 04:41 PM
I have been able to run it locally, but it still does not trigger the state updates (updateState) on the cluster. Here is my latest test with its logs; I think it is clearer than the one above. In this case it consumes data from Kafka and uses partitionPersist.
public class Main {
    private static final Logger LOG = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.registerSerialization(AvroPackage.class, AvroPackageSerializer.class);
        conf.registerSerialization(AvroRecord.class, AvroRecordSerializer.class);
        conf.setMessageTimeoutSecs(60);

        TridentTopology topology = new TridentTopology();
        topology.newStream("userSpout", new KafkaTridentSpoutOpaque<>(getConfiguration())).parallelismHint(1)
                .map(t -> {
                    LOG.info("PEEK: " + t.getFields() + " VALUES= " + t);
                    return new Values(1L);
                }, new Fields("FIELD1"))
                .partitionPersist(new JdbcStateFactoryMet(), new Fields("FIELD1"), new JdbcUpdaterMet());

        // Uncomment for local cluster test
        // LocalCluster cluster = new LocalCluster();
        // cluster.submitTopology("processing-TOPIC1", conf, topology.build());
        // Thread.sleep(80000);
        // cluster.killTopology("processing-TOPIC1");
        // cluster.shutdown();
        // System.exit(0);

        StormSubmitter.submitTopology("processing-TOPIC1", conf, topology.build());
    }

    private static KafkaSpoutConfig<String, byte[]> getConfiguration() {
        String bootstrapServers = "***:6667,***:6667,***:6667";
        return new KafkaSpoutConfig.Builder<>(
                bootstrapServers, StringDeserializer.class, ByteArrayDeserializer.class, "TOPIC1")
                .setGroupId("storm")
                .setOffsetCommitPeriodMs(5_000)
                .setMaxUncommittedOffsets(1_000_000)
                .setFirstPollOffsetStrategy(UNCOMMITTED_LATEST)
                .build();
    }

    private static class JdbcUpdaterMet extends BaseStateUpdater<JdbcStateMet> {
        private static final Logger LOG = LoggerFactory.getLogger(JdbcUpdaterMet.class);

        @Override
        public void updateState(JdbcStateMet jdbcState, List<TridentTuple> tuples, TridentCollector collector) {
            LOG.info("PEEK JdbcUpdaterMet updateState");
            jdbcState.updateState(tuples, collector);
        }
    }

    public static class JdbcStateFactoryMet implements StateFactory {
        private static final Logger LOG = LoggerFactory.getLogger(JdbcStateFactoryMet.class);

        @Override
        public State makeState(Map map, IMetricsContext iMetricsContext, int partitionIndex, int numPartitions) {
            LOG.info("PEEK JdbcStateFactoryMet makeState");
            JdbcStateMet state = new JdbcStateMet();
            return state;
        }
    }

    public static class JdbcStateMet implements State {
        private static final Logger LOG = LoggerFactory.getLogger(JdbcStateMet.class);

        protected JdbcStateMet() {
            LOG.info("PEEK JdbcStateMet");
        }

        @Override
        public void beginCommit(Long aLong) {
            LOG.info("PEEK JdbcStateMet beginCommit");
        }

        @Override
        public void commit(Long aLong) {
            LOG.info("PEEK JdbcStateMet commit");
        }

        public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
            LOG.info("PEEK JdbcStateMet updateState");
        }
    }
}
On the cluster, the logs look like the following (the beginCommit / updateState / commit calls are missing):
2017-09-01 17:25:31.435 o.a.s.s.o.a.c.f.s.ConnectionStateManager Thread-11-spout-userSpout-executor[6 6]-EventThread [INFO] State change: CONNECTED
2017-09-01 17:25:45.042 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 0, 19784, null, [B@20489cf1]
2017-09-01 17:25:45.044 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 1, 19309, null, [B@590db609]
2017-09-01 17:25:45.045 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 2, 19785, null, [B@3af3523f]
2017-09-01 17:25:45.706 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 0, 19785, null, [B@e9f8278]
2017-09-01 17:25:45.707 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 1, 19310, null, [B@6ad4284e]
2017-09-01 17:25:45.708 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 2, 19786, null, [B@5e5d66e3]
2017-09-01 17:25:46.704 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 1, 19311, null, [B@7e1fa5bd]
2017-09-01 17:25:46.705 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 2, 19787, null, [B@ce16444]
2017-09-01 17:25:46.707 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 0, 19786, null, [B@47bcabce]
2017-09-01 17:25:48.010 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 1, 19312, null, [B@7263a7a4]
2017-09-01 17:25:48.011 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 2, 19788, null, [B@71d2d618]
2017-09-01 17:25:48.014 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 0, 19787, null, [B@76164f22]
2017-09-01 17:25:48.698 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 1, 19313, null, [B@54d668f5]
2017-09-01 17:25:48.700 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 2, 19789, null, [B@3be7ab4c]
2017-09-01 17:25:48.700 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 0, 19788, null, [B@5ce2c83e]
... and in the local cluster (working as expected):
15023 [Thread-26-spout-userSpout-executor[5 5]-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED
18843 [Thread-28-b-0-executor[4 4]] INFO Main - PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 2, 19790, null, [B@5aa9ce75]
18850 [Thread-28-b-0-executor[4 4]] INFO Main$JdbcStateMet - PEEK JdbcStateMet beginCommit
18850 [Thread-28-b-0-executor[4 4]] INFO Main$JdbcUpdaterMet - PEEK JdbcUpdaterMet updateState
18850 [Thread-28-b-0-executor[4 4]] INFO Main$JdbcStateMet - PEEK JdbcStateMet updateState
18850 [Thread-28-b-0-executor[4 4]] INFO Main$JdbcStateMet - PEEK JdbcStateMet commit
19516 [Thread-28-b-0-executor[4 4]] INFO Main - PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 2, 19791, null, [B@56579121]
19523 [Thread-28-b-0-executor[4 4]] INFO Main$JdbcStateMet - PEEK JdbcStateMet beginCommit
19523 [Thread-28-b-0-executor[4 4]] INFO Main$JdbcUpdaterMet - PEEK JdbcUpdaterMet updateState
19523 [Thread-28-b-0-executor[4 4]] INFO Main$JdbcStateMet - PEEK JdbcStateMet updateState
19523 [Thread-28-b-0-executor[4 4]] INFO Main$JdbcStateMet - PEEK JdbcStateMet commit
20517 [Thread-28-b-0-executor[4 4]] INFO Main - PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 2, 19792, null, [B@3dabd4e4]
20528 [Thread-28-b-0-executor[4 4]] INFO Main$JdbcStateMet - PEEK JdbcStateMet beginCommit
20528 [Thread-28-b-0-executor[4 4]] INFO Main$JdbcUpdaterMet - PEEK JdbcUpdaterMet updateState
20528 [Thread-28-b-0-executor[4 4]] INFO Main$JdbcStateMet - PEEK JdbcStateMet updateState
20528 [Thread-28-b-0-executor[4 4]] INFO Main$JdbcStateMet - PEEK JdbcStateMet commit
21513 [Thread-28-b-0-executor[4 4]] INFO Main - PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 2, 19793, null, [B@2995ba54]
21520 [Thread-28-b-0-executor[4 4]] INFO Main$JdbcStateMet - PEEK JdbcStateMet beginCommit
21520 [Thread-28-b-0-executor[4 4]] INFO Main$JdbcUpdaterMet - PEEK JdbcUpdaterMet updateState
21520 [Thread-28-b-0-executor[4 4]] INFO Main$JdbcStateMet - PEEK JdbcStateMet updateState
21520 [Thread-28-b-0-executor[4 4]] INFO Main$JdbcStateMet - PEEK JdbcStateMet commit
22511 [Thread-28-b-0-executor[4 4]] INFO Main - PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 2, 19794, null, [B@3d5b1375]
22518 [Thread-28-b-0-executor[4 4]] INFO Main$JdbcStateMet - PEEK JdbcStateMet beginCommit
22518 [Thread-28-b-0-executor[4 4]] INFO Main$JdbcUpdaterMet - PEEK JdbcUpdaterMet updateState
22518 [Thread-28-b-0-executor[4 4]] INFO Main$JdbcStateMet - PEEK JdbcStateMet updateState
22518 [Thread-28-b-0-executor[4 4]] INFO Main$JdbcStateMet - PEEK JdbcStateMet commit