最近在做Flink Streaming读取Kafka的数据,包括使用Flink Jar和Flink SQL去读取Kafka中的数据,熟悉碰到的第一个较为迷惑的地方就是offset的问题。

因为需要演示,确保业务的流程完整,所以在这样的背景下要求Flink程序每次启动的时候都是从Kafka最开始进行读取,在Kafka中可以通过auto.offset.reset进行offset的rese。

一共有三种模式earliestlatestnone,我设置的是earliest,按照我的理解,既然是earliest,那么每次启动的时候都应该是从头开始读取,但是实际却是只有第一次运行jar的时候会消费到数据。

后续再启动如果kafka没有新数据流入,那么便不会再有消费行为,这个跟我对earliest的理解不一致,于是我又去看了下官方的描述:

earliest: automatically reset the offset to the earliest offset

这个描述就比较宽泛了,不过也算能表达出意思,那就是earliest是把offset给重置到earliest offset而并不是头,所以这三种模式实际的意思是:

  • earliest :当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
  • latest :当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
  • none :topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

这里所有的offset都是基于某个group id的,kafka是基于了某个group id,如果每次创建全新的group id,那肯定没问题。

不过我又看到另外一个参数scan.startup.mode,我发现在使用flink sql的时候可以通过scan.startup.mode=earliest-offset来设置offset,可以有如下几种设置方式:

  • group-offsets: start from committed offsets in ZK / Kafka brokers of a specific consumer group.
  • earliest-offset: start from the earliest offset possible.
  • latest-offset: start from the latest offset.
  • timestamp: start from user-supplied timestamp for each partition.
  • specific-offsets: start from user-supplied specific offsets for each partition.

默认是group-offsets,这是在Flink SQL模式下的用法,效果与代码类似。

于是要达到我的目的,只有2种方式:每次切换group id,确保每次运行的group id是全新的,这样自然每次提交程序都是从头开始,这种方式简单粗暴,还有一种方式我想的就是既然某个group id已经有了offset,那么我能不能把这个offset删掉。

这里引申出的问题就是kafka对于某个group id的offset是怎么存储和处理的,在kafka0.8.1.1以前,offset保存在zk中,存放在/consumers节点下,由于频繁访问zk,zk需要一个一个节点更新offset,不能批量或分组更新,导致offset更新成了瓶颈。后续两个过渡版本增加了参数“offsets.storage”,该参数可配置为“zookeeper”或“kafka”分别表示offset的保持位置在zk或是broker,默认保留在zk。

在ZK中保存的结构为:

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value

从kafka-0.9版本及以后,kafka的消费者组和offset信息就不存zookeeper了,而是存到broker服务器上,所以,如果你为某个消费者指定了一个消费者组名称(group.id),那么,一旦这个消费者启动,这个消费者组名和它要消费的那个topic的offset信息就会被记录在broker服务器上。

Kafka版本0.10.1.1,已默认将消费的 offset 迁入到了 Kafka 一个名为 __consumer_offsets 的Topic中。

其实,早在 0.8.2.2 版本,已支持存入消费的 offset 到Topic中,只是那时候默认是将消费的 offset 存放在 Zookeeper 集群中。那现在,官方默认将消费的offset存储在 Kafka 的Topic中,同时,也保留了存储在 Zookeeper 的接口,通过 offsets.storage 属性来进行设置。

broker消息保存目录在配置文件server.properties中:

# A comma separated list of directories under which to store log files
log.dirs=/usr/local/var/lib/kafka-logs

该目录下默认包含50个以__consumer_offsets开头的目录,用于存放offset:

__consumer_offsets-0            __consumer_offsets-22           __consumer_offsets-36           __consumer_offsets-5
__consumer_offsets-1 __consumer_offsets-23 __consumer_offsets-37 __consumer_offsets-6

offset的存放位置决定于groupid的hash值,其获取方式:

Utils.abs(groupId.hashCode) % numPartitions

其中numPartitions由offsets.topic.num.partitions参数决定,默认值即50,以groupid “test-group”为例,计数其存储位置为:__consumer_offsets-12,当其消费全部10条数据后,使用命令查看该目录下消息记录:

kafka-console-consumer --bootstrap-server localhost:9092 --topic __consumer_offsets --partition 12 --from-beginning --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'

输出:

[test-group,test1,0]::OffsetAndMetadata(offset=6, leaderEpoch=Optional[0], metadata=, commitTimestamp=1605601180391, expireTimestamp=None)
[test-group,test1,2]::OffsetAndMetadata(offset=2, leaderEpoch=Optional[0], metadata=, commitTimestamp=1605601180391, expireTimestamp=None)
[test-group,test1,1]::OffsetAndMetadata(offset=2, leaderEpoch=Optional[0], metadata=, commitTimestamp=1605601180391, expireTimestamp=None)

该数据结构为以groupid-topic-partition作为key,value为OffsetAndMetadata,其中包含了offset信息。可以看到group“test-group”在topic“test1”的三个partition下offset值分别为6,2,2。同保存在zk数据一样,offset只记录groupid的消费情况,对于具体consumer是透明的。

那么offset具体被发送给哪个broker保存呢?offset的存储分区是通过groupid的hash值取得的,那么offset发送的broker就是该分区的leader broker,这也符合kafka普通消息的发生逻辑。
所以,每个group的offset都将发生到一个broker,broker中存在一个offset manager 实例负责接收处理offset提交请求,并返回提交操作结果。

自定义Kafka的Offset

在Kafka中,offset默认存储在broker的Topic,我们也可以自定义存储位置,为了保证消费和提交偏移量同时成功或失败,我们可以利用数据库事务来实现,下面是把offset存储在Mysql里的一个例子。

在重新均衡分组之前保存数,在重新均衡后读取数据。

在提交偏移量时保存数据。

注意 在重置偏移量时候,要比提交的大1,因此读取时候对偏移量值加1。

读取条件 分组+Topic+Partition。

建表语句:

CREATE TABLE `consumer_offset` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`consumer_group` varchar(255) COLLATE utf8_bin DEFAULT NULL,
`consumer_topic` varchar(255) COLLATE utf8_bin DEFAULT NULL,
`consumer_partition` int(255) DEFAULT NULL,
`consumer_offset` bigint(255) DEFAULT NULL,
`create_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

一个Spring集成Kafka使用定义的存储的例子:

import cn.spring.tech.dao.ConsumerOffsetDao;
import cn.spring.tech.model.ConsumerOffsetEntity;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.*;

@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaController {

@Autowired
private ConsumerOffsetDao consumerOffsetDao;

@PostMapping("/s/{msg}")
public String send(@PathVariable("msg") String msg){
String topic="t3";

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.211.134:9091");
properties.put(ProducerConfig.ACKS_CONFIG,"all");
properties.put(ProducerConfig.RETRIES_CONFIG,1);
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<Object, Object> producer = new KafkaProducer<>(properties);
producer.send(new ProducerRecord(topic,"key-"+msg.hashCode(),"val-"+msg));
producer.close();
log.info("oks");
return "oks";
}

@PostMapping("/c")
public String consumer(){
String group="f6";
String topic="t3";

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.211.134:9091");
props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
for(TopicPartition topicPartition:collection){
int partition = topicPartition.partition();
long offset = consumer.position(topicPartition);
commitOffset(group,topic,partition,offset);
}
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
for(TopicPartition topicPartition:collection){
int partition = topicPartition.partition();
long offset = getOffset(group, topic, partition);
consumer.seek(topicPartition,offset);
}
}
});
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
List<ConsumerOffsetEntity> list=new ArrayList<>();
for(ConsumerRecord<String,String> consumerRecord:records){
String topics=consumerRecord.topic();
int p=consumerRecord.partition();
log.info("{}消费到消息:partition="+topics+p+",key="+consumerRecord.key()+",val="+consumerRecord.value()+",offset="+consumerRecord.offset(),getThread());
ConsumerOffsetEntity offsetEntity = new ConsumerOffsetEntity();
offsetEntity.setConsumerGroup(group);
offsetEntity.setConsumerTopic(consumerRecord.topic());
offsetEntity.setConsumerPartition(consumerRecord.partition()+"");
offsetEntity.setConsumerOffset(consumerRecord.offset()+"");
offsetEntity.setCreateTime(new Date());
list.add(offsetEntity);
}
if(!CollectionUtils.isEmpty(list)){
ConsumerOffsetEntity consumerOffsetEntity = list.get(list.size() - 1);
consumerOffsetDao.insert(consumerOffsetEntity);
log.info("{}===偏移量提交成功==="+consumerOffsetEntity,getThread());
}

}
}

private long getOffset(String group, String topic, int partition) {
QueryWrapper<ConsumerOffsetEntity> wrapper = new QueryWrapper<>();
wrapper.eq("consumer_group",group);
wrapper.eq("consumer_topic",topic);
wrapper.eq("consumer_partition",partition);
wrapper.orderByDesc("create_time");
List<ConsumerOffsetEntity> list = consumerOffsetDao.selectList(wrapper);
if(CollectionUtils.isEmpty(list)){
log.info("{}>>>>>>>>>重新均衡分组<<<<<<<<<读取偏移量"+0,getThread());
return 0;
}
ConsumerOffsetEntity consumerOffsetEntity = list.stream().max((p1, p2) -> Integer.valueOf(p1.getConsumerOffset()) - Integer.valueOf(p2.getConsumerOffset())).get();
String offset = consumerOffsetEntity.getConsumerOffset();
log.info("{}>>>>>>>>>重新均衡分组<<<<<<<<<读取偏移量"+consumerOffsetEntity,getThread());
return Long.valueOf(offset)+1;
}

private void commitOffset(String group, String topic, int partition, long offset) {
ConsumerOffsetEntity consumerOffsetEntity = new ConsumerOffsetEntity();
consumerOffsetEntity.setConsumerGroup(group);
consumerOffsetEntity.setConsumerTopic(topic);
consumerOffsetEntity.setConsumerPartition(String.valueOf(partition));
consumerOffsetEntity.setConsumerOffset(String.valueOf(offset));
consumerOffsetEntity.setCreateTime(new Date());
consumerOffsetDao.insert(consumerOffsetEntity);
log.info("{}>>>>>>>>>重新均衡分组>>>>>>>>>提交偏移量",getThread());

}

private String getThread(){
return Thread.currentThread().getName();
}
}

扫码手机观看或分享: