阅读 101

kafka-jstorm实时代码

package com.doctor.kafkajstrom;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;

import com.doctor.kafkajstrom.log.manager.LogManager;
import com.doctor.kafkajstrom.util.SpringUtil;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Local JStorm/Storm demo: a word-count topology with a "cache refresh" signal stream.
 *
 * <p>Topology wiring:
 * <ul>
 *   <li>{@code word-reader} spout emits random text lines;</li>
 *   <li>{@code signals-spout} periodically emits {@code refreshCache} on a dedicated
 *       {@code signals} stream;</li>
 *   <li>{@code word-normalizer} bolt splits and lowercases the lines;</li>
 *   <li>{@code word-counter} bolt counts words (fields-grouped by {@code word}) and clears its
 *       counters whenever a refresh signal arrives (all-grouped so every task gets it).</li>
 * </ul>
 *
 * <p>Runs on an in-process {@link LocalCluster}; intended for experimentation, not production.
 */
public class LocalJstormMain {

    public static void main(String[] args) {
        // Topology definition.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReaderSpoutCh03());
        builder.setSpout("signals-spout", new SignalsSpoutCh03(), 6);
        builder.setBolt("word-normalizer", new WordTransformBoltCh03(), 6)
                .shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounterBoltCh03(), 2)
                .fieldsGrouping("word-normalizer", new Fields("word"))
                // allGrouping: every counter task must see the refresh signal.
                .allGrouping("signals-spout", "signals");

        // Configuration.
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(true);
        // Throttle the spout: at most one tuple pending (un-acked) at a time.
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

        // Run the topology on an in-process local cluster.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Count-Word-Topology-With-Refresh-Cache", conf,
                builder.createTopology());

        // To run for a bounded time instead, sleep then kill + shutdown:
        // try {
        //     TimeUnit.MINUTES.sleep(2);
        // } catch (InterruptedException e) {
        //     Thread.currentThread().interrupt();
        // }
        // cluster.killTopology("Count-Word-Topology-With-Refresh-Cache");
        // cluster.shutdown();
    }

    /** Spout that endlessly emits 6-character random strings drawn from {@link #WORDS}. */
    public static class WordReaderSpoutCh03 extends BaseRichSpout {
        private static final Logger log = LoggerFactory.getLogger(WordReaderSpoutCh03.class);
        private static final long serialVersionUID = 1L;

        private TopologyContext context;
        private SpoutOutputCollector collector;

        // Alphabet for the random strings (ASCII letters mixed with CJK characters).
        private static final String WORDS = "sjdkfjksdjfdkjaaa中触及看对方的罚款多级";

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.context = context;
            this.collector = collector;
        }

        @Override
        public void ack(Object msgId) {
            log.info("{Ok:{}}", msgId);
        }

        @Override
        public void fail(Object msgId) {
            // BUG FIX: previously logged the same "{Ok:{}}" message as ack(), which made
            // failures indistinguishable from successes in the logs.
            log.info("{Fail:{}}", msgId);
        }

        @Override
        public void nextTuple() {
            // Emit one random 6-char "line" per call.
            this.collector.emit(new Values(RandomStringUtils.random(6, WORDS)));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }
    }

    /** Spout that emits a {@code refreshCache} action on the {@code signals} stream every ~2 ms. */
    public static class SignalsSpoutCh03 extends BaseRichSpout {
        private static final long serialVersionUID = 1L;

        private SpoutOutputCollector spoutOutputCollector;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.spoutOutputCollector = collector;
        }

        @Override
        public void nextTuple() {
            this.spoutOutputCollector.emit("signals", new Values("refreshCache"));
            try {
                TimeUnit.MILLISECONDS.sleep(2);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the worker can observe the interruption.
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Dedicated stream id "signals" carrying a single "action" field.
            declarer.declareStream("signals", new Fields("action"));
        }
    }

    /** Bolt that splits incoming lines on spaces and emits trimmed, lowercased words. */
    public static class WordTransformBoltCh03 extends BaseRichBolt {
        private static final long serialVersionUID = 1L;

        private OutputCollector collector;
        private int numCounterTasks = 0;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            // Recorded for reference; not used by execute() in this demo.
            this.numCounterTasks = context.getComponentTasks("word-counter").size();
        }

        @Override
        public void execute(Tuple input) {
            String[] tokens = input.getString(0).split(" ");
            for (String token : tokens) {
                String normalized = token.trim().toLowerCase();
                if (!normalized.isEmpty()) {
                    // BUG FIX: the original emitted the raw token, silently discarding the
                    // trim/lowercase normalization it had just computed.
                    this.collector.emit(new Values(normalized));
                }
            }
            this.collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    /**
     * Bolt that counts words per task and writes counts via {@link LogManager}; clears its
     * counters when a {@code refreshCache} signal arrives on the {@code signals} stream.
     */
    public static class WordCounterBoltCh03 extends BaseRichBolt {
        private static final Logger log = LoggerFactory.getLogger(WordCounterBoltCh03.class);
        private static final long serialVersionUID = 1L;

        private Map<String, Integer> countMap;
        private OutputCollector collector;
        private String name;
        private int id;

        // Spring context is created once per JVM (static) and shared by all bolt tasks
        // in the worker; LogManager is resolved from it.
        private static final ApplicationContext applicationContext;
        private static final LogManager logManager;

        static {
            applicationContext = SpringUtil.of("learningJstormConfig/spring-kafkabolt-context.xml");
            logManager = applicationContext.getBean(LogManager.class);
            log.info("--------------ApplicationContext initialized from learningJstormConfig/spring-kafkabolt-context.xml ");
        }

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.countMap = new HashMap<>();
            this.collector = collector;
            this.name = context.getThisComponentId();
            this.id = context.getThisTaskId();
            log.info("-----------------WordCounterBoltCh03 prepare");
        }

        @Override
        public void execute(Tuple input) {
            String word = null;
            try {
                word = input.getStringByField("word");
            } catch (Throwable ignored) {
                // Tuples from the "signals" stream carry no "word" field; a null word
                // below routes them to the signal-handling branch.
            }
            if (word != null) {
                if (!countMap.containsKey(word)) {
                    countMap.put(word, 1);
                } else {
                    // NOTE(review): only counts >= 2 are written to LogManager (first
                    // occurrence is not) — preserved from the original; confirm intent.
                    countMap.put(word, countMap.get(word) + 1);
                    logManager.write(word + ":" + countMap.get(word));
                }
            } else if ("signals".equals(input.getSourceStreamId())
                    && "refreshCache".equals(input.getStringByField("action"))) {
                // Dump current counts, then reset.
                cleanup();
                countMap.clear();
            }
            this.collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Terminal bolt: emits nothing downstream.
        }

        @Override
        public void cleanup() {
            log.info("{cleanup...............}");
            countMap.forEach((k, v) -> {
                log.info("{clean up................}");
                log.info("k : {} , v : {}", k, v);
            });
        }
    }
}

原文:https://www.cnblogs.com/ymcs/p/15186360.html

文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐