消息中间件kafka之消费者相关操作

一、Consumer初始化配置

/** * producer 配置 * @return */
@Bean
public KafkaProducer<String, String> kafkaProducer() {
   
  Properties properties = new Properties();
  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.26:9092");
  properties.put(ProducerConfig.ACKS_CONFIG, "all");
  properties.put(ProducerConfig.RETRIES_CONFIG, "0");
  properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
  properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
  properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  return new KafkaProducer<>(properties);
}

二、测试简单的消费消息

这里为了不需要等待,我们异步的开启一个线程去消费消息。

/** * @author long */
@Slf4j
@RestController
@RequestMapping("/consumer")
public class ConsumerController {
   

    @Autowired
    private KafkaConsumer kafkaConsumer;


    /** * 简单消费者 */
    @GetMapping("/simple")
    public String simple(@RequestParam("topic_name") String topicName) {
   
        kafkaConsumer.subscribe(Arrays.asList(topicName));
        new Thread(() -> {
   
            while (true) {
   
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));
                for (ConsumerRecord<String, String> record : records) {
   
                    log.info("消费信息:patition:[{}] offset:[{}] key:[{}] value:[{}]", record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }, "获取消息线程 => ").start();
        return "success";

    }

}

启动项目之后,先访问localhost:8080/consumer/simple?topic_name=new_long_topic_34这个接口,让这个异步的线程启动起来。

这时运行发送消息的接口:localhost:8080/producer/send2?topicName=new_long_topic_34&num=100,发送一百条消息。

我们上面的这种消费方法,使用的是自动提交的方法。实际中使用不推荐使用。

三、手动提交

/** * 手动提交 * @param topicName * @return */
    @GetMapping("/commit")
    public String commit(@RequestParam("topic_name") String topicName) {
   
        kafkaConsumer.subscribe(Arrays.asList(topicName));
        new Thread(() -> {
   
            while (true) {
   
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));
                for (ConsumerRecord<String, String> record : records) {
   
                    // TODO 处理数据保存到数据库
                    log.info("消费信息入库:patition:[{}] offset:[{}] key:[{}] value:[{}]", record.partition(), record.offset(), record.key(), record.value());
                    // TODO 处理失败的话,则回滚,不要提交offset
                }
                // TODO 成功的话,手动提交offset
                kafkaConsumer.commitAsync();
            }
        }, "获取消息线程 => ").start();
        return "success";
    }

四、Consumer Group

这里面我们需要注意:

  1. 单个分区的消息只能由ConsumerGroup中某个Consumer消费

  2. ConsumerGroup中的一个Consumer可以对应多个分片

  3. 一个分片不可以被ConsumerGroup中的多个Consumer消费,多出的只能闲置

  4. ConsumerPartition中消费消息是顺序,默认从开头开始消费

  5. 单个ConsumerGroup会消费所有的Partition中的消息。

六、单个Partition提交

可以控制单个partition手动提交,方便使用多线程进行消息消费;并且可以对多个partition提交进行控制。

@GetMapping("/single")
public String singlePartition(@RequestParam("topic_name") String topicName) {
   
  kafkaConsumer.subscribe(Arrays.asList(topicName));
  new Thread(() -> {
   
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));
    for (TopicPartition partition : records.partitions()) {
   
      List<ConsumerRecord<String, String>> recordList = records.records(partition);
      for (ConsumerRecord<String, String> record : recordList) {
   
        log.info("消费信息入库:partition:[{}] offset:[{}] key:[{}] value:[{}]", record.partition(), record.offset(), record.key(), record.value());
      }
      long offset = recordList.get(recordList.size() - 1).offset();
      Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>(records.partitions().size());
      offsetMap.put(partition, new OffsetAndMetadata(offset + 1));
      kafkaConsumer.commitSync(offsetMap);
    }
  }, "获取消息线程<single> => ").start();
  return "success";
}

针对上面的代码,可以发现对于只包含一个partition的时候,上面的写法是有点啰嗦的,我们还有另一种写法:

@GetMapping("/single_2")
public String singlePartition2(@RequestParam("topic_name") String topicName) {
   
  TopicPartition partition = new TopicPartition(topicName, 0);
  // 订阅topic中的某一个partition
  kafkaConsumer.assign(Arrays.asList(partition));
  new Thread(() -> {
   
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));
    List<ConsumerRecord<String, String>> recordList = records.records(partition);
    for (ConsumerRecord<String, String> record : recordList) {
   
      log.info("消费信息入库:partition:[{}] offset:[{}] key:[{}] value:[{}]", record.partition(), record.offset(), record.key(), record.value());
    }
    long offset = recordList.get(recordList.size() - 1).offset();
    Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>(records.partitions().size());
    offsetMap.put(partition, new OffsetAndMetadata(offset + 1));
    kafkaConsumer.commitSync(offsetMap);
  }, "获取消息线程<single> => ").start();
  return "success";
}

七、Consumer多线程并发处理

关于多线程并发处理,常用的是这样两种线程模型:

第一种是:对于数据进行异步处理,适用于对于数据一致性要求不高,不是用于处理业务,这种情况就是为了快速消费,不管是不是成功,偏向于日志这种的;

第二种是:一个partition对应一个consumer,多用户处理业务使用。

在上面使用异步线程也是一种多线程使用方式,另一种大家也能猜到肯定是使用线程池(推荐)。

@Bean("longThreadPool")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
   
  ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
  // 核心线程数
  poolTaskExecutor.setCorePoolSize(10);
  // 最大线程数
  poolTaskExecutor.setMaxPoolSize(15);
  // 缓冲对列
  poolTaskExecutor.setQueueCapacity(100);
  // 允许线程空闲时间60s
  poolTaskExecutor.setKeepAliveSeconds(60);
  // 线程池前缀
  poolTaskExecutor.setThreadNamePrefix("consumer-pool");
  // 线程池对拒绝任务的处理策略
  poolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  // 关闭线程池的时候,是否等待当前任务执行完成
  poolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
  // 等待当前任务完成的超时时间60秒,否则会造成阻塞
  poolTaskExecutor.setAwaitTerminationSeconds(60);
  poolTaskExecutor.initialize();
  return poolTaskExecutor;
}

使用直接注入就可以:

@Autowired
private ThreadPoolTaskExecutor longThreadPool;

可以使用@Async的进行处理,不做演示了,下面演示下对第一种模型的处理:

处理consumer的消费消息

@GetMapping("/simple")
public String simple(@RequestParam("topic_name") String topicName) {
   
  kafkaConsumer.subscribe(Arrays.asList(topicName));
  new Thread(() -> {
   
    while (true) {
   
      ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));
      for (ConsumerRecord<String, String> record : records) {
   
        // 对提交的数据进行处理
        longThreadPool.execute(() -> {
   
          log.info("消费信息:partition:[{}] offset:[{}] key:[{}] value:[{}]", record.partition(), record.offset(), record.key(), record.value());
        });
      }
    }
  }, "获取消息线程 => ").start();
  return "success";
}

八、Consumer设置offset

consumer提供了一个seek的函数,可以设置我们开始的offset位置开始消费。设置offset:多用于回滚和重复消费。

我们本地的消费的最后offset399,测试的我们从350开始。

@GetMapping("/single_2")
public String singlePartition2(@RequestParam("topic_name") String topicName) {
   
  TopicPartition partition = new TopicPartition(topicName, 0);
  // 订阅topic中的某一个partition
  kafkaConsumer.assign(Arrays.asList(partition));
  new Thread(() -> {
   
    kafkaConsumer.seek(partition, 350);
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));
    List<ConsumerRecord<String, String>> recordList = records.records(partition);
    for (ConsumerRecord<String, String> record : recordList) {
   
      log.info("消费信息入库:partition:[{}] offset:[{}] key:[{}] value:[{}]", record.partition(), record.offset(), record.key(), record.value());
    }
    long offset = recordList.get(recordList.size() - 1).offset();
    Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>(records.partitions().size());
    offsetMap.put(partition, new OffsetAndMetadata(offset + 1));
    kafkaConsumer.commitSync(offsetMap);
  }, "获取消息线程<single> => ").start();
  return "success";
}

从这里我们也可以知道,kafka是不会丢弃消息的。

九、Consumer限流操作

一般情况,系统不会给kafka客户端,提供太多的资源,有时候会出现数据峰值,把kafka打死,所以这个时候限流就很重要了。

一般来说:当处理的数据量达到某个阈值时暂停消费,低于阈值时则恢复消费,这就可以让Consumer保持一定的速率去消费数据,从而避免流量剧增时将Consumer给压垮。

这里我们利用Guava的限流器对Consumer进行限流。

<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>29.0-jre</version>
</dependency>

代码实现:

@GetMapping("/rate_limiter")
public String rateLimiter(@RequestParam("topic_name") String topicName) {
   
  TopicPartition p0 = new TopicPartition(topicName, 0);
  TopicPartition p1 = new TopicPartition(topicName, 1);
  kafkaConsumer.assign(Arrays.asList(p0, p1));
  new Thread(() -> {
   
    while (true) {
   
      ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10));
      if (records.isEmpty()) {
   
        continue;
      }
      if (!rateLimiter.tryAcquire()) {
   
        log.warn("无法拿到令牌,开始限流...");
        kafkaConsumer.pause(Arrays.asList(p0, p1));
      } else {
   
        log.info("拿到令牌,开始消费...");
        kafkaConsumer.resume(Arrays.asList(p0, p1));
      }

      for (TopicPartition partition : records.partitions()) {
   
        List<ConsumerRecord<String, String>> recordList = records.records(partition);
        for (ConsumerRecord<String, String> record : recordList) {
   
          log.info("消费信息入库:partition:[{}] offset:[{}] key:[{}] value:[{}]", record.partition(), record.offset(), record.key(), record.value());
        }
        long offset = recordList.get(recordList.size() - 1).offset();
        Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>(records.partitions().size());
        offsetMap.put(partition, new OffsetAndMetadata(offset + 1));
        kafkaConsumer.commitSync(offsetMap);
      }

    }

  }, "限流器线程 => ").start();
  return "success";
}
张贴在2