这两天公司要学习kafka,结合之前的storm,做了一个简单的集成,之前也参考了网上的例子一些例子,发现或多或少都有一些问题。所以自己做了一个。
成都创新互联公司于2013年成立,先为鱼台等服务建站,鱼台等地企业,进行企业商务咨询服务。为鱼台企业网站制作PC+手机+微官网三网同步一站式服务解决您的所有建站问题。
这个是网上其他人遇到的问题,给摘录一下,防止以后自己和大家出现:
基本场景是应用出现错误,发送日志到kafka的某个topic,storm订阅该topic,然后进行后续处理。场景非常简单,但是在学习过程中,遇到一个奇怪的异常情况:使用KafkaSpout读取topic数据时,没有向ZK写offset数据,致使每次都从头开始读取。纠结了两天,终于碰巧找到原因:应该使用BaseBasicBolt
作为bolt的父类,而不是BaseRichBolt
。
基本订阅 :
基本场景:订阅kafka的某个topic,然后在读取的消息前加上自定义的字符串,然后写回到kafka另外一个topic。 从Kafka读取数据的Spout使用storm.kafka.KafkaSpout,向Kafka写数据的Bolt使用storm.kafka.bolt.KafkaBolt。中间进行进行数据处理的Bolt定义为TopicMsgBolt。
import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.TopologyBuilder; import backtype.storm.utils.Utils; import storm.kafka.BrokerHosts; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.ZkHosts; import storm.kafka.bolt.KafkaBolt; import java.util.Properties; public class TopicMsgTopology { public static void main(String[] args) throws Exception { // 配置Zookeeper地址 BrokerHosts brokerHosts = new ZkHosts("localhost:2181"); // 配置Kafka订阅的Topic,以及zookeeper中数据节点目录和名字 SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "msgTopic1", "/topology/root1", "topicMsgTopology"); // 配置KafkaBolt中的kafka.broker.properties Config conf = new Config(); Properties props = new Properties(); // 配置Kafka broker地址 props.put("metadata.broker.list", "localhost:9092"); // serializer.class为消息的序列化类 props.put("serializer.class", "kafka.serializer.StringEncoder"); conf.put("kafka.broker.properties", props); // 配置KafkaBolt生成的topic conf.put("topic", "msgTopic2"); spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme()); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("msgKafkaSpout", new KafkaSpout(spoutConfig)); builder.setBolt("msgSentenceBolt", (IBasicBolt) new TopicMsgBolt()).shuffleGrouping("msgKafkaSpout"); builder.setBolt("msgKafkaBolt", new KafkaBolt()).shuffleGrouping("msgSentenceBolt"); if (args.length == 0) { String topologyName = "kafkaTopicTopology"; LocalCluster cluster = new LocalCluster(); cluster.submitTopology(topologyName, conf, builder.createTopology()); Utils.sleep(100000); cluster.killTopology(topologyName); cluster.shutdown(); } else { conf.setNumWorkers(1); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } } }
storm.kafka.ZkHosts构造方法的参数是zookeeper标准配置地址的形式
storm.kafka.SpoutConfig构造方法第一个参数为上述的storm.kafka.ZkHosts对象,第二个为待订阅的topic名称,第三个参数zkRoot为写读取topic时的偏移量offset数据的节点(zk node),第四个参数为该节点上的次级节点名(有个地方说这个是spout的id)。 backtype.storm.Config对象是配置storm的topology(拓扑)所需要的基础配置。 backtype.storm.spout.SchemeAsMultiScheme的构造方法输入的参数是订阅kafka数据的处理参数,这里的MessageScheme是自定义的,代码如下:
import backtype.storm.spout.Scheme; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.UnsupportedEncodingException; import java.util.List; public class MessageScheme implements Scheme { private static final Logger logger = LoggerFactory.getLogger(MessageScheme.class); @Override public List