189 8069 5689

IBatchSpoutAPI怎么使用

这篇文章主要介绍“IBatchSpout API怎么使用”,在日常操作中,相信很多人在IBatchSpout API怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”IBatchSpout API怎么使用”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

创新互联建站长期为1000+客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为鹿邑企业提供专业的成都网站设计、成都网站建设,鹿邑网站改版等技术服务。拥有十多年丰富建站经验和众多成功案例,为您定制开发。

IBatchSpout是storm trident推出的一种可以批量发射的Spout。非事务性,基本的spout

1:Map getComponentConfiguration();定义配置,可以用backtype.storm.Config。

2:void open(Map conf, TopologyContext context); Spout的初始化方法 ,参数conf即是getComponentConfiguration定义的配置

3:Fields getOutputFields(); 声明输出的fields

4:void emitBatch(long batchId, TridentCollector collector); 批量发射tuple,本次的批次号为batchId

5:void ack(long batchId);批次号为batchId的数据处理成功

6:  void close();

一个例子

package storm.projectA;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;
import backtype.storm.Config;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class MySpout implements IBatchSpout{
 /**
  * 
  */
 private static final long serialVersionUID = 1L;
 private long maxBatchSize;//每批次最大的数量
 private BufferedReader br;//源文件流
 HashMap>> batches = new HashMap>>();//保存发送过的所有数据,以便于重复发送
 /**
  * @param conf 配置
  * @param context 
  */
 @Override
 public void open(Map conf, TopologyContext context) {
  String filePath = (String)conf.get("filePath");
  maxBatchSize = (Long)conf.get("maxBatchSize");
  try {
   br = new BufferedReader(new FileReader(filePath));
  } catch (FileNotFoundException e) {
   e.printStackTrace();
  }
 }
 /*** spout的发送方法
  * @param batchId 批次id
  * @param collector 批量发射器
  */
 @Override
 public void emitBatch(long batchId, TridentCollector collector) {
  List> batch = batches.get(batchId);
  if (batch == null) {
   batch = new ArrayList>();
   for (int i = 0; i < maxBatchSize; i++) {
    try {
     String line = br.readLine();
     if(line == null){
      break;
     }
     batch.add(new Values(line));
    } catch (IOException e) {
     e.printStackTrace();
    }
   }
  }
  for(List list : batch){
            collector.emit(list);
        }
 }
 @Override
 public void ack(long batchId) {
  batches.remove(batchId);
 }
 /**
  * close 方法
  */
 @Override
 public void close() {
  if(br!=null){
   try {
    br.close();
   } catch (IOException e) {
    e.printStackTrace();
   }
  }
  
 }
 @Override
 public Map getComponentConfiguration() {
  Config conf = new Config();
  //最大并行度 本地模式设置为1
  conf.setMaxTaskParallelism(1);
  conf.put("filePath", "D:\\aaa.txt");
  conf.put("maxBatchSize", 2);
  return conf;
 }
 /**
  * 输出的fileds
  */
 @Override
 public Fields getOutputFields() {
  return new Fields("sentence");
 }
}

到此,关于“IBatchSpout API怎么使用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!


网页题目:IBatchSpoutAPI怎么使用
文章网址:http://cdxtjz.cn/article/pdiisd.html

联系我们

您好HELLO!
感谢您来到成都网站建设公司,若您有合作意向,请您为我们留言或使用以下方式联系我们, 我们将尽快给你回复,并为您提供真诚的设计服务,谢谢。
  • 电话:028- 86922220 18980695689
  • 商务合作邮箱:631063699@qq.com
  • 合作QQ: 532337155
  • 成都网站设计地址:成都市青羊区锣锅巷31号五金站写字楼6楼

小谭建站工作室

成都小谭网站建设公司拥有多年以上互联网从业经验的团队,始终保持务实的风格,以"帮助客户成功"为已任,专注于提供对客户有价值的服务。 我们已为众企业及上市公司提供专业的网站建设服务。我们不只是一家网站建设的网络公司;我们对营销、技术、管理都有自己独特见解,小谭建站采取“创意+综合+营销”一体化的方式为您提供更专业的服务!

小谭观点

相对传统的成都网站建设公司而言,小谭是互联网中的网站品牌策划,我们精于企业品牌与互联网相结合的整体战略服务。
我们始终认为,网站必须注入企业基因,真正使网站成为企业vi的一部分,让整个网站品牌策划体系变的深入而持久。