Support Questions

Storm trident partitionPersist not updating state

New Contributor

I am trying to run a very simple topology with a spout that generates random data and a partitionPersist that stores the data in SQL.

However, StateUpdater.updateState never gets called. Am I missing something?

Originally my project reads from Kafka, performs some aggregations and transformations, and then saves the results into SQL. Since that was not working, I built this test, the simplest one I could think of, and it still does not work.

Is there a simple demo that I can run locally that stores results into SQL?

I am using Storm version 1.1.0 from HDP 2.6.1.

Here are the logs (please see classes below to follow them):

PEEK: UserSpout - DECLAREOUTPUTFIELDS
PEEK: UserSpout - GETCOMPONENTCONFIGURATION
PEEK: UserSpout - GETCOMPONENTCONFIGURATION
PEEK: UserSpout - DECLAREOUTPUTFIELDS
PEEK: JdbcStateFactoryMet - MAKESTATE
PEEK: UserSpout - OPEN
PEEK: UserSpout - NEXTTUPLE - [1, peter, 1504128640671]
PEEK: UserSpout - NEXTTUPLE - [2, bob, 1504128640671]
PEEK: UserSpout - NEXTTUPLE - [2, bob, 1504128640671]
...

Here are the classes:

The spout:

public class UserSpout implements IRichSpout {    
    private SpoutOutputCollector collector;
    private static final List<Values> rows = Lists.newArrayList(
            new Values(1, "peter", System.currentTimeMillis()),
            new Values(2, "bob", System.currentTimeMillis()),
            new Values(3, "alice", System.currentTimeMillis()));

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void close() {
    }

    public void nextTuple() {
        try {
            Thread.sleep(500);
            final Random rand = new Random();
            // nextInt's upper bound is exclusive; use rows.size() so every
            // row (including "alice") can be emitted.
            final Values row = rows.get(rand.nextInt(rows.size()));
            this.collector.emit(row);
            Thread.yield();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt status
        }
    }

    public void ack(Object msgId) {
    }

    public void fail(Object msgId) {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("user_id", "user_name", "create_date"));
    }

    public void activate() {
    }

    public void deactivate() {
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
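
A side note on the row selection in nextTuple: Random.nextInt(bound) returns values in [0, bound), so the bound itself is never produced; with a three-element list the bound has to be rows.size() (i.e. 3) for every user, including "alice", to be eligible. A self-contained check (plain Java, no Storm dependencies):

```java
import java.util.Random;

// Random.nextInt(bound) returns a value in [0, bound): the bound itself
// is never returned, so nextInt(3) yields only 0, 1, or 2.
public class NextIntDemo {
    public static void main(String[] args) {
        Random rand = new Random();
        boolean[] seen = new boolean[3];
        for (int i = 0; i < 10_000; i++) {
            int idx = rand.nextInt(3); // upper bound 3 is exclusive
            seen[idx] = true;          // would throw if idx could be 3
        }
        System.out.println(seen[0] + " " + seen[1] + " " + seen[2]);
        // prints: true true true
    }
}
```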

The main class:

public class MainClass {

    public static void main(String[] args) throws Exception {
        TridentTopology topology = new TridentTopology();

        topology.newStream("userSpout", new UserSpout())
                .partitionPersist(new JdbcStateFactoryMet(), new Fields("user_id", "create_date"), new JdbcUpdaterTest());

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("TEST", new Config(), topology.build());
    }

    public static class JdbcStateFactoryMet implements StateFactory {

        private static final Logger LOG = LoggerFactory.getLogger(JdbcStateFactoryMet.class);

        @Override
        public State makeState(Map map, IMetricsContext iMetricsContext, int partitionIndex, int numPartitions) {
            LOG.info("PEEK: JdbcStateFactoryMet - MAKESTATE");
            return new JdbcStateMet();
        }
    }

    public static class JdbcStateMet implements State {

        private static final Logger LOG = LoggerFactory.getLogger(JdbcStateMet.class);

        @Override
        public void beginCommit(Long aLong) {
            LOG.info("PEEK: JdbcStateMet - beginCommit");
        }

        @Override
        public void commit(Long aLong) {
            LOG.info("PEEK: JdbcStateMet - commit");
        }

        public void updateState(List<TridentTuple> tuples) {
            LOG.info("PEEK: JdbcStateMet - UPDATESTATE");
        }
    }

    public static class JdbcUpdaterTest extends BaseStateUpdater<JdbcStateMet> {

        private static final Logger LOG = LoggerFactory.getLogger(JdbcUpdaterTest.class);

        @Override
        public void updateState(JdbcStateMet jdbcState, List<TridentTuple> tuples, TridentCollector collector) {
            LOG.info("PEEK: JdbcUpdaterMet - UPDATESTATE");
            jdbcState.updateState(tuples);
        }
    }
}
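
For reference, Trident drives a State once per successful batch in the order beginCommit → StateUpdater.updateState → commit, which is the pattern the working local-cluster logs further down show. A plain-Java mock of that ordering (hypothetical names, no Storm classes; a sketch, not Trident's actual internals):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java mock (hypothetical names, no Storm classes) of the per-batch
// call order Trident uses against a State: beginCommit -> updateState -> commit.
public class BatchLifecycleDemo {
    static final List<String> calls = new ArrayList<>();

    static void beginCommit(long txid)           { calls.add("beginCommit"); }
    static void updateState(List<String> tuples) { calls.add("updateState"); }
    static void commit(long txid)                { calls.add("commit"); }

    public static void main(String[] args) {
        // One simulated batch with transaction id 1.
        beginCommit(1L);
        updateState(Arrays.asList("tuple-1", "tuple-2"));
        commit(1L);
        System.out.println(calls); // [beginCommit, updateState, commit]
    }
}
```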

The Maven POM:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>Test</groupId>
    <artifactId>test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>


    <name>${project.artifactId}</name>

    <repositories>
        <repository>
            <id>hortonworks</id>
            <url>http://repo.hortonworks.com/content/repositories/releases/</url>
        </repository>
        <repository>
            <id>hortonworks-Group</id>
            <url>http://repo.hortonworks.com/content/groups/public</url>
        </repository>
        <repository>
            <id>apache</id>
            <url>http://repository.apache.org/content/repositories/releases/</url>
        </repository>
        <repository>
            <id>clojars</id>
            <url>http://clojars.org/repo/</url>
        </repository>
    </repositories>

    <properties>
        <storm.version>1.1.0.2.6.1.0-129</storm.version>
        <log4j.version>1.2.17</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <scope>compile</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>log4j-over-slf4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-jdbc</artifactId>
            <version>${storm.version}</version>
            <scope>compile</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>    
</project>
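
One packaging note, since the goal is to run on a cluster later: storm-core is normally given provided scope when submitting with StormSubmitter (the worker supplies its own Storm jars), while compile scope is what makes LocalCluster runs work; mixing the two up is a common source of differences between local and cluster behaviour. A sketch of the cluster-submission form:

```xml
<!-- For StormSubmitter-based deployments; for LocalCluster testing,
     switch the scope back to "compile". -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
    <scope>provided</scope>
</dependency>
```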
1 Reply

New Contributor

I have been able to run it locally, but it still does not trigger the state updates in the cluster. Here is my latest test with its logs; I think it is clearer than the one above. In this case it consumes data from Kafka and uses partitionPersist.

public class Main {
    private static final Logger LOG = LoggerFactory.getLogger(Main.class);
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.registerSerialization(AvroPackage.class, AvroPackageSerializer.class);
        conf.registerSerialization(AvroRecord.class, AvroRecordSerializer.class);
        conf.setMessageTimeoutSecs(60);

        TridentTopology topology = new TridentTopology();

        topology.newStream("userSpout", new KafkaTridentSpoutOpaque<>(getConfiguration())).parallelismHint(1)
                .map(t -> {
                    LOG.info("PEEK: " + t.getFields() + " VALUES= " + t);
                    return new Values(1L);
                }, new Fields("FIELD1"))
                .partitionPersist(new JdbcStateFactoryMet(), new Fields("FIELD1"), new JdbcUpdaterMet());


// Uncomment for local cluster test
//        LocalCluster cluster = new LocalCluster();
//        cluster.submitTopology("processing-TOPIC1", conf, topology.build());
//        Thread.sleep(80000);
//        cluster.killTopology("processing-TOPIC1");
//        cluster.shutdown();
//        System.exit(0);

        StormSubmitter.submitTopology("processing-TOPIC1", conf, topology.build());
    }

    private static KafkaSpoutConfig<String, byte[]> getConfiguration() {
        String bootstrapServers = "***:6667,***:6667,***:6667";
        // UNCOMMITTED_LATEST is statically imported from
        // KafkaSpoutConfig.FirstPollOffsetStrategy.
        return new KafkaSpoutConfig.Builder<>(
                bootstrapServers, StringDeserializer.class, ByteArrayDeserializer.class, "TOPIC1")
                .setGroupId("storm")
                .setOffsetCommitPeriodMs(5_000)
                .setMaxUncommittedOffsets(1_000_000)
                .setFirstPollOffsetStrategy(UNCOMMITTED_LATEST)
                .build();
    }

    private static class JdbcUpdaterMet extends BaseStateUpdater<JdbcStateMet> {

        private static final Logger LOG = LoggerFactory.getLogger(JdbcUpdaterMet.class);

        @Override
        public void updateState(JdbcStateMet jdbcState, List<TridentTuple> tuples, TridentCollector collector) {
            LOG.info("PEEK JdbcUpdaterMet updateState");
            jdbcState.updateState(tuples, collector);
        }
    }

    public static class JdbcStateFactoryMet implements StateFactory {

        private static final Logger LOG = LoggerFactory.getLogger(JdbcStateFactoryMet.class);

        @Override
        public State makeState(Map map, IMetricsContext iMetricsContext, int partitionIndex, int numPartitions) {
            LOG.info("PEEK JdbcStateFactoryMet makeState");
            return new JdbcStateMet();
        }
    }

    public static class JdbcStateMet implements State {

        private static final Logger LOG = LoggerFactory.getLogger(JdbcStateMet.class);

        protected JdbcStateMet() {
            LOG.info("PEEK JdbcStateMet");
        }

        @Override
        public void beginCommit(Long aLong) {
            LOG.info("PEEK JdbcStateMet beginCommit");
        }

        @Override
        public void commit(Long aLong) {
            LOG.info("PEEK JdbcStateMet commit");
        }

        public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
            LOG.info("PEEK JdbcStateMet updateState");
        }
    }
}

On the cluster, the logs look like the following (note that the state-persistence calls are missing):

2017-09-01 17:25:31.435 o.a.s.s.o.a.c.f.s.ConnectionStateManager Thread-11-spout-userSpout-executor[6 6]-EventThread [INFO] State change: CONNECTED
2017-09-01 17:25:45.042 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 0, 19784, null, [B@20489cf1]
2017-09-01 17:25:45.044 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 1, 19309, null, [B@590db609]
2017-09-01 17:25:45.045 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 2, 19785, null, [B@3af3523f]
2017-09-01 17:25:45.706 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 0, 19785, null, [B@e9f8278]
2017-09-01 17:25:45.707 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 1, 19310, null, [B@6ad4284e]
2017-09-01 17:25:45.708 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 2, 19786, null, [B@5e5d66e3]
2017-09-01 17:25:46.704 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 1, 19311, null, [B@7e1fa5bd]
2017-09-01 17:25:46.705 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 2, 19787, null, [B@ce16444]
2017-09-01 17:25:46.707 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 0, 19786, null, [B@47bcabce]
2017-09-01 17:25:48.010 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 1, 19312, null, [B@7263a7a4]
2017-09-01 17:25:48.011 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 2, 19788, null, [B@71d2d618]
2017-09-01 17:25:48.014 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 0, 19787, null, [B@76164f22]
2017-09-01 17:25:48.698 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 1, 19313, null, [B@54d668f5]
2017-09-01 17:25:48.700 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 2, 19789, null, [B@3be7ab4c]
2017-09-01 17:25:48.700 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 0, 19788, null, [B@5ce2c83e]
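
When a topology runs on a real cluster, each executor's output goes to that worker's own log file, so it is worth grepping every worker's log for the PEEK marker before concluding the calls never happen. A sketch (the log path is an assumption based on Storm 1.x defaults; the excerpt below is a stand-in, not a real worker log):

```shell
# Hypothetical excerpt standing in for a worker log; on a Storm 1.x node the
# real file usually lives under
#   $STORM_LOG_DIR/workers-artifacts/<topology-id>/<port>/worker.log
cat > /tmp/worker.log <<'EOF'
2017-09-01 17:25:45.042 Main Thread-15-b-0-executor[5 5] [INFO] PEEK: [topic, partition, offset, key, value] ...
2017-09-01 17:25:45.050 Main$JdbcUpdaterMet Thread-15-b-0-executor[5 5] [INFO] PEEK JdbcUpdaterMet updateState
EOF

# Filter for the state-updater marker across the (sample) log.
grep "PEEK Jdbc" /tmp/worker.log
```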

... and in the local cluster, where it works as expected:

15023 [Thread-26-spout-userSpout-executor[5 5]-EventThread] INFO  o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED
18843 [Thread-28-b-0-executor[4 4]] INFO  Main - PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 2, 19790, null, [B@5aa9ce75]
18850 [Thread-28-b-0-executor[4 4]] INFO  Main$JdbcStateMet - PEEK JdbcStateMet beginCommit
18850 [Thread-28-b-0-executor[4 4]] INFO  Main$JdbcUpdaterMet - PEEK JdbcUpdaterMet updateState
18850 [Thread-28-b-0-executor[4 4]] INFO  Main$JdbcStateMet - PEEK JdbcStateMet updateState
18850 [Thread-28-b-0-executor[4 4]] INFO  Main$JdbcStateMet - PEEK JdbcStateMet commit
19516 [Thread-28-b-0-executor[4 4]] INFO  Main - PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 2, 19791, null, [B@56579121]
19523 [Thread-28-b-0-executor[4 4]] INFO  Main$JdbcStateMet - PEEK JdbcStateMet beginCommit
19523 [Thread-28-b-0-executor[4 4]] INFO  Main$JdbcUpdaterMet - PEEK JdbcUpdaterMet updateState
19523 [Thread-28-b-0-executor[4 4]] INFO  Main$JdbcStateMet - PEEK JdbcStateMet updateState
19523 [Thread-28-b-0-executor[4 4]] INFO  Main$JdbcStateMet - PEEK JdbcStateMet commit
20517 [Thread-28-b-0-executor[4 4]] INFO  Main - PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 2, 19792, null, [B@3dabd4e4]
20528 [Thread-28-b-0-executor[4 4]] INFO  Main$JdbcStateMet - PEEK JdbcStateMet beginCommit
20528 [Thread-28-b-0-executor[4 4]] INFO  Main$JdbcUpdaterMet - PEEK JdbcUpdaterMet updateState
20528 [Thread-28-b-0-executor[4 4]] INFO  Main$JdbcStateMet - PEEK JdbcStateMet updateState
20528 [Thread-28-b-0-executor[4 4]] INFO  Main$JdbcStateMet - PEEK JdbcStateMet commit
21513 [Thread-28-b-0-executor[4 4]] INFO  Main - PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 2, 19793, null, [B@2995ba54]
21520 [Thread-28-b-0-executor[4 4]] INFO  Main$JdbcStateMet - PEEK JdbcStateMet beginCommit
21520 [Thread-28-b-0-executor[4 4]] INFO  Main$JdbcUpdaterMet - PEEK JdbcUpdaterMet updateState
21520 [Thread-28-b-0-executor[4 4]] INFO  Main$JdbcStateMet - PEEK JdbcStateMet updateState
21520 [Thread-28-b-0-executor[4 4]] INFO  Main$JdbcStateMet - PEEK JdbcStateMet commit
22511 [Thread-28-b-0-executor[4 4]] INFO  Main - PEEK: [topic, partition, offset, key, value] VALUES= [TOPIC1, 2, 19794, null, [B@3d5b1375]
22518 [Thread-28-b-0-executor[4 4]] INFO  Main$JdbcStateMet - PEEK JdbcStateMet beginCommit
22518 [Thread-28-b-0-executor[4 4]] INFO  Main$JdbcUpdaterMet - PEEK JdbcUpdaterMet updateState
22518 [Thread-28-b-0-executor[4 4]] INFO  Main$JdbcStateMet - PEEK JdbcStateMet updateState
22518 [Thread-28-b-0-executor[4 4]] INFO  Main$JdbcStateMet - PEEK JdbcStateMet commit