Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

How to pass particular values from bolt to topology ?

How to pass particular values from bolt to topology ?

in bolt

public class ReadTopicBolt extends BaseBasicBolt {

OutputCollector _collector;

TweetBean tweetBean = new TweetBean();

public void execute(Tuple tuple, BasicOutputCollector collector) { // TODO Auto-generated method stub try { String topic = tuple.getString(0);

JSONParser jsonParser = new JSONParser();

JSONObject jsonObject = (JSONObject) jsonParser.parse(topic);

if (nullHandler.checkNull(jsonObject .get(StormTwitterProcessingPipelineConstants.CREATED_AT))) { String created_at = (String) jsonObject.get("created_at").toString();

tweetBean.setCreated_at(created_at);

SimpleDateFormat sdf = new SimpleDateFormat("EE MMM dd HH:mm:ss z yyyy", Locale.ENGLISH);

Date parsedDate = sdf.parse(created_at);

SimpleDateFormat print = new SimpleDateFormat("MMM d, yyyy HH:mm:ss"); logger.info(print.format(parsedDate));

int epoch = (int) (parsedDate.getTime() / 1000); tweetBean.setEpoch_time(epoch);

} collector.emit(new Values( jsonObject.get("created_at"))); }

public void declareOutputFields(OutputFieldsDeclarer declarer) {

// TODO Auto-generated method stub

declarer.declare(new Fields( "created_at"));}}

In above BOLT we are able to get epoch time at created_at attribute.

HOW TO PASS EPOCH TIME FROM BOLT IN TOPOLOGY?

we have tried this.

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("message-spout", kafkaSpout);

builder.setBolt("message-bolt", new ReadTopicBolt()).shuffleGrouping("message-spout");

builder.setBolt("message-bolt-epoch", new ReadTopicBolt()).shuffleGrouping("message-bolt"); builder.setBolt("message-bolt-epoch", new ReadTopicBolt()) .fieldsGrouping("message-bolt", new Fields("Epoch_time"));

LocalCluster cluster = new LocalCluster();

cluster.submitTopology("KafkaStorm", config, builder.createTopology());

Thread.sleep(10000);

cluster.shutdown(); } }

1 REPLY 1
Highlighted

Re: How to pass particular values from bolt to topology ?

Explorer

Output of bolts to go other bolts or other storages. For example HdfsBolt can write to HDFS. KafkaBolt can write to kafka and so on.

Don't have an account?
Coming from Hortonworks? Activate your account here