Kafka consumer的offset提交机制有以下两种
手动提交
同步提交
consumer.commitSync()方式提交
异步提交
consumer.commitAsync(callback)方式提交
自动提交
props.put(“enable.auto.commit”, “true”);
props.put(“auto.commit.interval.ms”, “1000”);
通过上面启动自动提交以及设置自动提交间隔时间(默认为5s)
源码分析
同步提交源码分析
同步提交的核心代码在ConsumerCoordinator.commitOffsetSync中,核心代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, long timeoutMs) {
invokeCompletedOffsetCommitCallbacks(); do { RequestFuture<Void> future = sendOffsetCommitRequest(offsets); client.poll(future, remainingMs); if (future.succeeded()) { if (interceptors != null) interceptors.onCommit(offsets); return true; } if (future.failed() && !future.isRetriable()) throw future.exception(); time.sleep(retryBackoffMs);
now = time.milliseconds(); remainingMs = timeoutMs - (now - startMs); } while (remainingMs > 0);
return false;
}
|
通过常见的几个步骤就向GroupCoordinator发送了offset的同步信息,同时也完成了offset的confirm
- 组装OffsetCommitRequest,准备向GroupCoordinator发送请求
- 发送请求,底层仍然是NIO
- future返回后调用拦截器
- 如果响应还未返回则继续等待一段时间,默认是300ms
异步提交源码分析
异步提交的核心源码在ConsumerCoordinator.commitOffsetsAsync中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) { invokeCompletedOffsetCommitCallbacks();
RequestFuture<Void> future = sendOffsetCommitRequest(offsets); final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback; future.addListener(new RequestFutureListener<Void>() { @Override public void onSuccess(Void value) { if (interceptors != null) interceptors.onCommit(offsets);
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null)); }
@Override public void onFailure(RuntimeException e) { Exception commitException = e;
if (e instanceof RetriableException) commitException = new RetriableCommitFailedException(e);
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException)); } });
|
看到上面的三个步骤你是否会疑惑,怎么没有poll发送数据呢?其实异步提交offset时发送commitOffsetRequest是在consumer.poll()中完成的,这个时候会把Channel中堆积的数据发送出去。
同时同步发送和异步发送还有一个阻塞的区别,也就是上面同步发送未收到响应的时候会休眠一段时间直到收到响应为止。
Kafka 网络层源码分析
自动发送源码分析
那么自动发送offset又是在哪里完成的呢?同样的仍然是在consumer.poll()中完成的,之前分析的在poll之前需要加入Group,也就是和GroupCoordinator通信,在这个过程中就会处理offset的自动提交。
Kafka consumer加入group的分析可以查看这篇文章
源码在ConsumerCoordinator.poll()方法中,核心源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public boolean poll(final long timeoutMs) { final long startTime = time.milliseconds(); long currentTime = startTime; long elapsed = 0L;
invokeCompletedOffsetCommitCallbacks();
maybeAutoCommitOffsetsAsync(currentTime); return true; }
public void maybeAutoCommitOffsetsAsync(long now) { if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
this.nextAutoCommitDeadline = now + autoCommitIntervalMs; Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
commitOffsetsAsync(allConsumedOffsets, new OffsetCommitCallback() {}); } }
|
也就是经过下面几个步骤就实现了自动提交了
开启自动提交并且当前时间大于等于下一次自动提交的截止时间
this.nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;
nextAutoCommitDeadline截止时间等于new KafkaConsumer的系统时间加上自动提交间隔时间
更新截止时间为现在的时间加上自动提交间隔时间
获取所有已被消费的TopicPartiton的offset信息
异步提交offset
第三步走获取所有已被消费的partition的offset信息,这里不得不说到SubscriptionState这个类,我们来关注下它重要的成员变量
1 2 3 4 5 6 7 8 9 10
| public class SubscriptionState {
private final Set<String> subscription;
private final Set<String> groupSubscription;
private final Map<TopicPartition, TopicPartitionState> assignment;
|
所以通过SubscriptionState我们就可以获取已被消费的TopicPartition的offset信息,根据这个可以获取所有已消费的offset,然后发送请求commit offset.