189 8069 5689

spring-kafka多线程顺序消费

业务场景

创新互联是一家专注于成都网站制作、做网站与策划设计,宾川网站建设哪家好?创新互联做网站,专注于网站建设10年,网设计领域的专业建站公司;建站业务涵盖:宾川等地区。宾川做网站价格咨询:028-86922220

我们公司是做共享充电宝的业务的。有一些比较大的代理商或者ka商户,他们需要了解到他们自己下面的商户的订单数据,这些订单数据需要由我们推送给他们。

大致架构为数据部门通过canal订阅订单表的数据,然后推送到kafka ,我们订阅数据部门kafka获取到代理商下商户的实时订单数据再推送给代理商。比如,代理商下商户产生了一笔订单,整个过程会产生,订单生成,订单已支付,充电宝已被取走,充电宝已归还等多种状态的订单消息,我们需要实时把这些订单消息推送给代理商。我们的业务场景需要消息的顺序推送和多线程并发消费以提高性能

kafka多线程消费方案

消费者程序启动多个线程,每个线程维护专属的KafkaConsumer实例,负责完整的消息获取、消息处理

流程。如下图所示:

消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以 是一个,也可以是多个,每个线程维护专属的KafkaConsumer实例,处理消息则交由特定的线程池来 做,从而实现消息获取与消息处理的真正解耦。具体架构如下图所示:

这两种方案孰优孰劣呢?应该说是各有千秋。这两种方案的优缺点,我们先来看看下面这张表格。

kafka怎么保证顺序消费

保证顺序消费,需要满足如下条件

保证相同订单编号的消息需要发送到同一个分区。

@Configuration

public class SenderConfig {

@Value("${kafka.bootstrap-servers}")

private String bootstrapServers;

@Bean

public Map producerConfigs() {

Map props = new HashMap<>();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

return props;

}

@Bean

public ProducerFactory producerFactory() {

return new DefaultKafkaProducerFactory<>(producerConfigs());

}

@Bean

public KafkaTemplate kafkaTemplate() {

return new KafkaTemplate<>(producerFactory());

}

@Bean

public Sender sender() {

return new Sender();

}

}

public class Sender {

@Autowired

private KafkaTemplate kafkaTemplate;

public void send(String topic, String data) {

kafkaTemplate.send(topic, data);

}

public void send(String topic, int partition, String data) {

kafkaTemplate.send(topic, partition, data);

}

}

@RunWith(SpringRunner.class)

@SpringBootTest

public class SpringKafkaApplicationTest {

private static String BATCH_TOPIC = "batch.t";

private static Integer PARTITIONS = 6;

/**

* 已支付

*/

private static Integer PAYED_STATUS = 2;

/**

* 已取走

*/

private static Integer SEND_BACK_STATUS = 3;

@Autowired

private Sender sender;

private static DelayQueue delayQueue = new DelayQueue();

@Test

public void testReceive() throws Exception {

for (int i = 1; i < 50; i++) {

Integer orderNum = 800010 + i;

Integer orderPrice = RandomUtil.randomInt(1, 20);

// 用户支付成功,订单状态为支付成功

OrderDTO order = new OrderDTO(orderNum, orderPrice, PAYED_STATUS);

// 发送支付成功订单消息到对应的kafka分区

Integer destinationPartition = orderNum % PARTITIONS;

sender.send(BATCH_TOPIC, destinationPartition, JSONUtil.toJsonStr(order));

// 创建任务放入延迟队列(模拟用户支付成功到取走充电宝花费的时间)

long delayTime = 200;

OrderTask orderTask = new OrderTask(delayTime, order);

delayQueue.offer(orderTask);

}

while (true) {

// 用户取走充电宝,订单状态更改为 已取走

OrderTask orderTask = (OrderTask) delayQueue.take();

OrderDTO orderDTO = orderTask.getOrderDTO();

Integer destinationPartition = orderDTO.getOrderNum() % PARTITIONS;

orderDTO.setOrderStatus(SEND_BACK_STATUS);

// 发送已取走订单消息到对应的kafka 分区

sender.send(BATCH_TOPIC, destinationPartition, JSONUtil.toJsonStr(orderDTO));

}

}

}

可以看出我们通过订单号对分区数进行取余,来确定该消息发送到哪一个分区,保证相同订单号的消息被发送到相同的分区。当然也可以对字符串这些进行hash ,获得hash值来对分区数取余

Integer destinationPartition=orderDTO.getOrderNum()%PARTITIONS;

保证同一个分区的消息由同一个线程来消费。

我们的业务场景需要采用多线程方案一来处理我们的业务

普通方式实现方案一

public class KafkaConsumerRunner implements Runnable {

private final AtomicBoolean closed = new AtomicBoolean(false);

private final KafkaConsumer consumer;

public KafkaConsumerRunner(KafkaConsumer consumer) {

this.consumer = consumer;

}

@Override

public void run() {

try {

consumer.subscribe(Arrays.asList("topic"));

while (!closed.get()) {

// 执行消息处理逻辑

ConsumerRecords records = consumer.poll(10000);

}

} catch (Exception e) {

// Ignore exception if closing

if (!closed.get()) {

throw e;

}

} finally {

consumer.close();

}

}

/**

* Shutdown hook which can be called from a separate thread

*/

public void shutdown() {

closed.set(true);

consumer.wakeup();

}

}

spring-kafka为我们做的封装

消费者相关配置:

这里我们需要注意的是factory.setConcurrency(4)。

这个是配置主要是设置KafkaConsumer的数量,最大为topic 的分区数。当然你如果设置的值超过topic 分区数,spring-kafka 还是只会为我们创建最大分区数的KafkaConsumer数量,也就是创建KafkaConsumer数量能少于分区数,但不会超过分区数。少于分区数的话,一个KafkaConsumer会消费多个分区的数据,保证所有的分区数据都有对应的KafkaConsumer来进行消费;但不会出现多个KafkaConsumer消费同一个分区的情况,因为如果是这样也就无法保证消息的顺序消费机制。

一般情况下如果数据量较大,我们需要把此值设置为topic分区数,这样一个KafkaConsumer消费一个分区的数据,提高数据的并发消费能力。

@Configuration

@EnableKafka

public class ReceiverConfig {

@Value("${kafka.bootstrap-servers}")

private String bootstrapServers;

@Bean

public Map consumerConfigs() {

Map props = new HashMap<>();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");

// maximum records per poll

props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

return props;

}

@Bean

public ConsumerFactory consumerFactory() {

return new DefaultKafkaConsumerFactory<>(consumerConfigs());

}

@Bean(name = "kafkaListenerContainerFactory")

public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory factory =

new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(consumerFactory());

// enable batch listening

factory.setBatchListener(true);

factory.setConcurrency(4);

return factory;

}

@Bean

public Receiver receiver() {

return new Receiver();

}

}

Receiver 代码

public class Receiver {

@Autowired

private PushOrderService pushOrderService;

private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

private static final String BATCH_TOPIC = "batch.t";

@KafkaListener(topics = BATCH_TOPIC, containerFactory = "kafkaListenerContainerFactory")

public void receivePartitions(List data,

@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List partitions,

@Header(KafkaHeaders.OFFSET) List offsets) {

for (int i = 0; i < data.size(); i++) {

Long threadId = Thread.currentThread().getId();

// 向第三方推送订单消息

String orderStr = data.get(i);

pushOrderService.pushOrderToPlatform(orderStr);

OrderDTO orderDTO = JSONUtil.toBean(orderStr, OrderDTO.class);

LOGGER.info("推送订单消息成功,订单号为:{},状态为:{},分区为{},处理线程为:{}", orderDTO.getOrderNum(), orderDTO.getOrderStatus(), partitions.get(i), threadId);

}

}

}

/**

* 模拟网络推送订单信息给第三方平台

*/

@Service

public class PushOrderService {

/**

* 已支付

*/

private static Integer PAYED_STATUS = 2;

public void pushOrderToPlatform(String orderString) {

// 模拟网络推送订单信息给第三方平台(同步推送)

OrderDTO orderDTO = JSONUtil.toBean(orderString, OrderDTO.class);

// 已支付 订单消息

if (orderDTO.getOrderStatus().equals(PAYED_STATUS)) {

ThreadUtil.sleep(500);

} else {

// 已取走 订单消息

ThreadUtil.sleep(200);

}

}

}

测试结果: 无锡做人流手术多少钱 http://www.ytsg029.com/

16:17:47.026 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800014,状态为:2,分区为4,处理线程为:67

16:17:47.026 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800012,状态为:2,分区为2,处理线程为:66

16:17:47.534 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800020,状态为:2,分区为4,处理线程为:67

16:17:47.534 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800018,状态为:2,分区为2,处理线程为:66

16:17:48.035 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800026,状态为:2,分区为4,处理线程为:67

16:17:48.036 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800024,状态为:2,分区为2,处理线程为:66

16:17:48.537 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800015,状态为:2,分区为5,处理线程为:67

16:17:48.539 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800016,状态为:2,分区为0,处理线程为:66

16:17:49.044 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800022,状态为:2,分区为0,处理线程为:66

16:17:49.045 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800021,状态为:2,分区为5,处理线程为:67

16:17:49.546 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800013,状态为:2,分区为3,处理线程为:67

16:17:49.547 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800028,状态为:2,分区为0,处理线程为:66

16:17:50.051 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800019,状态为:2,分区为3,处理线程为:67

16:17:50.051 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800011,状态为:2,分区为1,处理线程为:66

16:17:50.554 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800025,状态为:2,分区为3,处理线程为:67

16:17:50.554 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800017,状态为:2,分区为1,处理线程为:66

16:17:51.060 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800023,状态为:2,分区为1,处理线程为:66

16:17:51.576 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800034,状态为:2,分区为0,处理线程为:66

16:17:51.579 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800031,状态为:2,分区为3,处理线程为:67

16:17:51.583 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800032,状态为:2,分区为4,处理线程为:70

16:17:51.583 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800027,状态为:2,分区为5,处理线程为:72

16:17:52.079 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800040,状态为:2,分区为0,处理线程为:66

16:17:52.083 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800037,状态为:2,分区为3,处理线程为:67

16:17:52.088 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800038,状态为:2,分区为4,处理线程为:70

16:17:52.088 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800033,状态为:2,分区为5,处理线程为:72

16:17:52.583 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800046,状态为:2,分区为0,处理线程为:66

16:17:52.588 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800043,状态为:2,分区为3,处理线程为:67

16:17:52.589 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800044,状态为:2,分区为4,处理线程为:70

16:17:52.590 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800039,状态为:2,分区为5,处理线程为:72

16:17:53.089 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800052,状态为:2,分区为0,处理线程为:66

16:17:53.091 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800050,状态为:2,分区为4,处理线程为:70

16:17:53.091 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800049,状态为:2,分区为3,处理线程为:67

16:17:53.095 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800045,状态为:2,分区为5,处理线程为:72

16:17:53.591 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800058,状态为:2,分区为0,处理线程为:66

16:17:53.592 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800056,状态为:2,分区为4,处理线程为:70

16:17:53.593 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800055,状态为:2,分区为3,处理线程为:67

16:17:53.600 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800051,状态为:2,分区为5,处理线程为:72

16:17:53.795 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800016,状态为:3,分区为0,处理线程为:66

16:17:53.796 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800013,状态为:3,分区为3,处理线程为:67

16:17:53.796 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800014,状态为:3,分区为4,处理线程为:70

16:17:54.000 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800020,状态为:3,分区为4,处理线程为:70

16:17:54.000 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800019,状态为:3,分区为3,处理线程为:67

16:17:54.000 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800022,状态为:3,分区为0,处理线程为:66

16:17:54.101 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800057,状态为:2,分区为5,处理线程为:72

16:17:54.205 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800026,状态为:3,分区为4,处理线程为:70

16:17:54.206 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800025,状态为:3,分区为3,处理线程为:67

16:17:54.206 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800028,状态为:3,分区为0,处理线程为:66

16:17:54.306 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800015,状态为:3,分区为5,处理线程为:72

16:17:54.410 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800037,状态为:3,分区为3,处理线程为:67

16:17:54.410 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800034,状态为:3,分区为0,处理线程为:66

16:17:54.410 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800032,状态为:3,分区为4,处理线程为:70

16:17:54.510 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800021,状态为:3,分区为5,处理线程为:72

16:17:54.614 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800031,状态为:3,分区为3,处理线程为:67

16:17:54.615 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800046,状态为:3,分区为0,处理线程为:66

16:17:54.615 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800038,状态为:3,分区为4,处理线程为:70

16:17:54.711 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800027,状态为:3,分区为5,处理线程为:72

16:17:54.820 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800043,状态为:3,分区为3,处理线程为:67

16:17:54.820 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800040,状态为:3,分区为0,处理线程为:66

16:17:54.820 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800056,状态为:3,分区为4,处理线程为:70

16:17:54.914 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800033,状态为:3,分区为5,处理线程为:72

16:17:55.025 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800044,状态为:3,分区为4,处理线程为:70

16:17:55.025 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800052,状态为:3,分区为0,处理线程为:66

16:17:55.025 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800055,状态为:3,分区为3,处理线程为:67

16:17:55.118 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800039,状态为:3,分区为5,处理线程为:72

16:17:55.231 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800050,状态为:3,分区为4,处理线程为:70

16:17:55.231 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800049,状态为:3,分区为3,处理线程为:67

16:17:55.231 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800058,状态为:3,分区为0,处理线程为:66

16:17:55.321 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800057,状态为:3,分区为5,处理线程为:72

16:17:55.525 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800051,状态为:3,分区为5,处理线程为:72

16:17:55.728 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800045,状态为:3,分区为5,处理线程为:72

16:17:55.735 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800029,状态为:2,分区为1,处理线程为:66

16:17:55.737 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800030,状态为:2,分区为2,处理线程为:67

16:17:56.239 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800036,状态为:2,分区为2,处理线程为:67

16:17:56.239 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800035,状态为:2,分区为1,处理线程为:66

16:17:56.743 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800042,状态为:2,分区为2,处理线程为:67

16:17:56.743 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800041,状态为:2,分区为1,处理线程为:66

16:17:57.247 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800047,状态为:2,分区为1,处理线程为:66

16:17:57.247 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800048,状态为:2,分区为2,处理线程为:67

16:17:57.751 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800053,状态为:2,分区为1,处理线程为:66

16:17:57.751 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800054,状态为:2,分区为2,处理线程为:67

16:17:57.953 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800012,状态为:3,分区为2,处理线程为:67

16:17:58.159 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800018,状态为:3,分区为2,处理线程为:67

16:17:58.256 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800059,状态为:2,分区为1,处理线程为:66

16:17:58.361 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800024,状态为:3,分区为2,处理线程为:67

16:17:58.457 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800011,状态为:3,分区为1,处理线程为:66

16:17:58.566 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800048,状态为:3,分区为2,处理线程为:67

16:17:58.662 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800017,状态为:3,分区为1,处理线程为:66

16:17:58.771 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800042,状态为:3,分区为2,处理线程为:67

16:17:58.868 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800023,状态为:3,分区为1,处理线程为:66

16:17:58.975 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800030,状态为:3,分区为2,处理线程为:67

16:17:59.073 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800029,状态为:3,分区为1,处理线程为:66

16:17:59.177 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800036,状态为:3,分区为2,处理线程为:67

16:17:59.279 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800041,状态为:3,分区为1,处理线程为:66

16:17:59.383 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800054,状态为:3,分区为2,处理线程为:67

16:17:59.481 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800035,状态为:3,分区为1,处理线程为:66

16:17:59.685 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800053,状态为:3,分区为1,处理线程为:66

16:17:59.891 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800047,状态为:3,分区为1,处理线程为:66

16:18:00.092 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800059,状态为:3,分区为1,处理线程为:66

完整代码


网页标题:spring-kafka多线程顺序消费
网站地址:http://cdxtjz.cn/article/ggdpsg.html

其他资讯