189 8069 5689

Flinkkafka定制技巧-创新互联

动态路由:
方案1: 定制一个特殊的KafkaDynamicSink,内嵌多个原生的FlinkKafkaProducer,每个对应一个下游的KAFKA队列
在OPEN方法中读取所有KAFKA渠道配置并构建FlinkKafkaProducer并构建一个Map: kafka channelId -> FlinkKafkaProducer

重载INVOKE方法
根据路由规则找到当前流数据对应所有的ChannelId (允许多个),再从MAP中获取对 FlinkKafkaProducer 并调用其INVOKE方法

核心代码:
public class DynamicKafkaSink extends RichSinkFunction {
   @Override
   public void open(Configuration parameters) throws Exception {
      List allChannels = channelRepository.getAll();
      for(ChannelModel nextChannel: allChannels) {
         FlinkKafkaProducer010 nextProducer = (FlinkKafkaProducer010)channelFactory.createChannelProducer(nextChannel,
         FlinkKafkaProducer010.class, Collections.emptyMap());
         nextProducer.setRuntimeContext(this.getRuntimeContext());
         nextProducer.open(parameters);
         producers.put(nextChannel.getChannelId(), nextProducer);
      }
   }

   @Override
   public void invoke(IN value) throws Exception {
      List channelIds = channelRouteStrategy.route(value);
      for (String nextChannelId: channelIds) {
         FlinkKafkaProducer010 nextProducer = producers.get(nextChannelId);
         nextProducer.invoke(converted);
      }
   }

}

注意:
Map不能在构造函数中初始化,而要在OPEN方法中初始化,FLINK分布式特性决定了构造函数和OPEN不在同一个JVM里执行
类级别的变量需要可序列化,否则需要声明为TRANSIENT

每个新构建的FlinkKafkaProducer需要先调用
setRuntimeContext(this.getRuntimeContext())
再调用open 方法才能被使用

优点:
可以路由到不同的BROKER上的TOPIC,在不同的BROKER上隔离性更好

缺陷:
所有的FlinkKafkaProducer只在OPEN的时候创建一次,后面如果添加了新的KAFKA队列无法被动态感知并路由
更改了FlinkKafkaProducer创建和初始化的过程,从MAIN函数中转到了KafkaDynamicSink的OPEN方法里,未经过全面测试,可能存在问题

方案2:方案1的升级版,利用FLINK SPLIT STREAM的特性,根据路由规则将原生数据流分成多个,每个子数据流对应一个下游KAFKA队列
在FLINK Main 函数中读取所有KAFKA渠道配置并构建FlinkKafkaProducer并构建一个Map: kafka channelId -> FlinkKafkaProducer
在输入流上构建一个SplitStream, OutputSelector 中根据路由逻辑返回一组ChannelId
遍历Map,对于Map中的每个Key (ChannelID) 调用 SplitStream 的 select方法获取对应的分支流数据,然后路由到对应的 FlinkKafkaProducer

核心代码:
public static void main(String[] args) {
   List allChannels = channelRepository.getAll();
   for(ChannelModel nextChannel: allChannels) {
      FlinkKafkaProducer010 nextProducer = (FlinkKafkaProducer010)channelFactory.createChannelProducer(nextChannel,
      FlinkKafkaProducer010.class, Collections.emptyMap());
      nextProducer.setRuntimeContext(this.getRuntimeContext());
      nextProducer.open(parameters);
      producers.put(nextChannel.getChannelId(), nextProducer);
   }

   DataStreamSource source = ....
   SplitStream splitStream = source.split(new OutputSelector() {

      @Override
      public Iterable select(String value) {
         List channelIds = channelRouteStrategy.route(value);
         return channeIds;
      }
   });

   for(String nextChannel: producers.keySet()) {
      FlinkKafkaProducer010 target = producers.get(nextChannel);
      splitStream.select(nextChannel).addSink(target);
   }
}

优点:
可以路由到不同的BROKER上的TOPIC,在不同的BROKER上隔离性更好
完全利用FLINK原生的特性,更加简洁优雅,解决了方案1的第二点不足

缺陷:
所有的FlinkKafkaProducer只在MAIN函数中创建一次,后面如果添加了新的KAFKA队列无法被动态感知并路由

方案3: 利用FLINK的 KeyedSerializationSchema中的getTargetTopic函数,KeyedSerializationSchema 除了将对象转化Kafka ProducerRecord
的键值对之外还可以动态指定Topic
在FLINK Main 函数中将输入流通过flatMap 转化为 Tuple2, 其中key 是目标所属的Topic, value 是原生数据
实现一个KeyedSerializationSchema作为构造函数传给FlinkKafkaProducer,重载getTargetTopic方法: 返回 tuple2.f0

核心代码:
class DynaRouteSerializationSchema implements KeyedSerializationSchema {

   String getTargetTopic(T element) {
      Tuple2 tuple = (Tuple2)element;
      return tuple.f0;
   }
}

public static void main(String[] args) {
   DataStreamSource source = ....
   DataStream> converted = source
   .flatMap(new RichFlatMapFunction>() {
      @Override
      public void flatMap(T value, Collector> out)
      throws Exception {
         List channelIds = channelRouteStrategy.route(value);
         for(String nextChannel: channelIds) {
            out.collect(Tuple2.valueOf(nextChannel, value));
         }
      }
   });

}

优点:
完全利用FLINK原生的特性,代码量非常少
新增加的TOPIC也可以被路由到,不需要启停流处理

缺陷:
无法像前两个方案实现Broker级别的路由,只能做到Topic级别的路由

断流功能:

创新互联建站沟通电话:13518219792,为您提供成都网站建设网页设计及定制高端网站建设服务,创新互联建站网页制作领域10多年,包括楼梯护栏等多个行业拥有丰富的网站推广经验,选择创新互联建站,为企业保驾护航。

有时系统升级或者其他组件不可用,需要暂时停止KAFKA PRODUCER
FLINK 原生机制:
被动反压:
Kafka09Fetcher 包含了一根独立的 KafkaConsumerThread,从KAFKA中读取数据,再交给HANDOVER
HANDOVER可以理解为一个大小为1的队列, Kafka09Fetcher 再从队列中获取并处理数据,一旦当处理速度变慢,KafkaConsumerThread
无法将数据写入HANDOVER, 线程就会被阻塞

另外KeyedDeserializationSchema定义了一个isEndOfStream方法,如果返回true, Kafka09Fetcher就会停止循环并退出,导致整个流处理结束

设计思路:

SignalService: 注册SignalListener, 利用Curator TreeCache 监听一个Zookeeper 路径获取起动/停止流处理的信号量

SignalListener: 接收ZOOKEEPER变更信息的回调接口

PausableKafkaFetcher: 继承Flink原生的KafkaFetcher, 监听到信号变化阻塞ConsumerThread的处理

PausableKafkaConsumer: 继承Flink原生的KafkaConsumer, 创建PausableKafkaFetcher

核心代码:

public class PausableKafkaFetcher extends Kafka010Fetcher implements SignalListener {

    private final ReentrantLock pauseLock = new ReentrantLock(true);

    private final Condition pauseCond = pauseLock.newCondition();

   private volatile boolean paused = false;

  public void onSignal(String path, String value) {

      try {

           pauseLock.lockInterruptibly();

      } catch(InterruptedException e) {

      }

      try {

          if (SIGNAL_PAUSE.equals(value)) {

              paused = true;

          } else if (SIGNAL_START.equals(value)) {

              paused = false;

      }

      pauseCond.signal();

      }

     finally {

          pauseLock.unlock();

    }

  }

  protected void emitRecord(T record, KafkaTopicPartitionState partition, long offset, ConsumerRecord consumerRecord) throws Exception {

     super.emitRecord(record, partition, offset, consumerRecord);

     pauseLock.lockInterruptibly();

     try {

        while (paused) {

            pauseCond.await();

        }

     } finally {

        pauseLock.unlock();

     }

 }

}

public class PausableKafkaConsumer extends FlinkKafkaConsumer010 {

    public void open(Configuration configuration) {

        signalService = ZKSignalService.getInstance();

       signalService.initialize(zkConfig);

    }

   public void cancel() {

        super.cancel();

        unregisterSignal();

   }

   public void close() {

      super.close();

       unregisterSignal();

    }

   private void unregisterSignal() {

       if (signalService != null) {

           String fullPath = WATCH_PREFIX + "/" + watchPath;

           signalService.unregisterSignalListener(fullPath);

        }

    }

    protected AbstractFetcher createFetcher(...) throws Exception {

       PausableKafkaFetcher fetcher = new PausableKafkaFetcher<> (...);

       if (signalService != null) {

           String fullPath = WATCH_PREFIX + "/" + watchPath;

           signalService.registerSignalListener(fullPath, fetcher);

       }

       return fetcher

    }

}

另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


标题名称:Flinkkafka定制技巧-创新互联
链接URL:http://cdxtjz.cn/article/epces.html

其他资讯