Zhu.Yang

朱阳的个人博客(公众号:think123)

0%

consumer提供三种不同的分区策略,可以通过partition.assignment.strategy参数进行配置,默认使用的策略是org.apache.kafka.clients.consumer.RangeAssignor,还存在org.apache.kafka.clients.consumer.RoundRobinAssignororg.apache.kafka.clients.consumer.StickyAssignor这两种,它们的关系图如下所示。

partition分配机制

阅读全文 »

Kafka consumer的offset提交机制有以下两种

手动提交

同步提交

consumer.commitSync()方式提交

异步提交

consumer.commitAsync(callback)方式提交

自动提交

props.put(“enable.auto.commit”, “true”);
props.put(“auto.commit.interval.ms”, “1000”);

通过上面启动自动提交以及设置自动提交间隔时间(默认为5s)

阅读全文 »

上一篇讲了consumer如何加入consumer group的,现在加入组成功之后,就要准备开始消费,但是我们需要知道consumer从offset为多少的位置开始消费。

consumer中关于如何消费有2种策略:

1. 手动指定
调用consumer.seek(TopicPartition, offset),然后开始poll

2. 自动指定
poll之前给集群发送请求,让集群告知客户端,当前该TopicPartition的offset是多少,这也是我们此次分析的重点.

阅读全文 »

consumer比producer要复杂许多,producer没有组的概念,也不需要关注offset,而consumer不一样,它有组织(consumer group),有纪律(offset)。这些对consumer的要求就会很高,这篇文章就先从consumer如何加入consumer group说起。

GroupCoordinator是运行在服务器上的一个服务,负责consumer以及offset的管理。消费者客户端的ConsumerCoordinator负责与GroupCoordinator进行通信。Broker在启动的时候,都会启动一个GroupCoordinator服务。

如何找到对应的GroupCoordinator节点?

对于 consumer group 而言,是根据其 group.id 进行 hash 并通过一定的计算得到其具对应的 partition 值(计算方式如下),该 partition leader 所在 Broker 即为该 Group 所对应的 GroupCoordinator,GroupCoordinator 会存储与该 group 相关的所有的 Meta 信息。

__consumer_offsets 这个topic 是 Kafka 内部使用的一个 topic,专门用来存储 group 消费的情况,默认情况下有50个 partition,每个 partition 默认三个副本。
partition计算方式:abs(GroupId.hashCode()) % NumPartitions(其中,NumPartitions 是 __consumer_offsets 的 partition 数,默认是50个)。
比如,现在通过计算abs(GroupId.hashCode()) % NumPartitions的值为35,那么就找第35个partition的leader在哪个broker(假设在192.168.1.12),那么GroupCoordinator节点就在这个broker。

同时这个消费者所提交的消费位移信息也会发送给这个partition leader所对应的broker节点,因此这个节点不仅是GroupCoordinator而且还保存分区分配方案和组内消费者位移。

阅读全文 »

如何消费数据

我们已经知道了如何发送数据到Kafka,既然有数据发送,那么肯定就有数据消费,消费者也是Kafka整个体系中不可缺少的一环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class KafkaConsumerDemo {
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();

// 必须设置的属性
props.put("bootstrap.servers", "192.168.239.131:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", "group1");

// 可选设置属性
props.put("enable.auto.commit", "true");
// 自动提交offset,每1s提交一次
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset","earliest ");
props.put("client.id", "zy_client_id");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅test1 topic
consumer.subscribe(Collections.singletonList("test1"));

while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.printf("topic = %s ,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),
record.offset(), record.key(), record.value());
});
}
}
}
阅读全文 »

前几篇文章分析了Kafka的发送流程以及NIO的使用方式,但是还是留下了不少坑,这里就对剩下的问题做一个总结。

收到的数据为什么要缓存起来?

Kafka中Selector读取从远端回来的数据的时候会先把收到的数据缓存起来

1
2
3
4
5
6
7
8
9
10
11
12
private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {
//if channel is ready and has bytes to read from socket or buffer, and has no
//previous receive(s) already staged or otherwise in progress then read from it
if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
&& !explicitlyMutedChannels.contains(channel)) {
NetworkReceive networkReceive;
while ((networkReceive = channel.read()) != null) {
madeReadProgressLastPoll = true;
addToStagedReceives(channel, networkReceive);
}
}
}

在NetworkClient中,往下传的是一个完整的ClientRequest,进到Selector,暂存到channel中的,也是一个完整的Send对象(1个数据包)。但这个Send对象,交由底层的channel.write(Bytebuffer b)的时候,并不一定一次可以完全发送,可能要调用多次write,才能把一个Send对象完全发出去。这是因为write是非阻塞的,不是等到完全发出去,才会返回。

1
2
3
4
5
Send send = channel.write();
if (send != null) {
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
}

这里如果返回send==null就表示没有发送完毕,需要等到下一次Selector.poll再次进行发送。所以当下次发送的时候如果Channel里面的Send只发送了部分,那么此次这个node就不会处于ready状态,就不会从RecordAccumulator取出要往这个node发的数据,等到Send对象发送完毕之后,这个node才会处于ready状态,就又可以取出数据进行处理了。

同样,在接收的时候,channel.read(Bytebuffer b),一个response也可能要read多次,才能完全接收。所以就有了上面的while循环代码。

如何确定消息接收完成?

从上面知道,底层数据的通信,是在每一个channel上面,2个源源不断的byte流,一个send流,一个receive流。
send的时候,还好说,发送之前知道一个完整的消息的大小。
但是当我们接收消息response的时候,这个信息可能是不完整的(剩余的数据要晚些才能获得),也可能包含不止一条消息。那么我们是怎么判断消息发送完毕的呢?
对于消息的读取我们必须考虑消息结尾是如何表示的,标识消息结尾通常有以下几种方式:

  1. 固定的消息大小。
  2. 将消息的长度作为消息的前缀。
  3. 用一个特殊的符号来标识消息的结束。

很明显第一种和第三种方式不是很合适,因此Kafka采用了第二种方式来确定要发送消息的大小。在消息头部放入了4个字节来确定消息的大小。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
//接收消息,前4个字节表示消息的大小
public class NetworkReceive implements Receive {
private final String source;

//确定消息size
private final ByteBuffer size;

private final int maxSize;

//整个消息response的buffer
private ByteBuffer buffer;

public NetworkReceive(String source) {
this.source = source;

//分配4字节的头部
this.size = ByteBuffer.allocate(4);
this.buffer = null;
this.maxSize = UNLIMITED;
}
}

//消息发送,前4个字节表示消息大小
public class NetworkSend extends ByteBufferSend {

public NetworkSend(String destination, ByteBuffer buffer) {
super(destination, sizeDelimit(buffer));
}

private static ByteBuffer[] sizeDelimit(ByteBuffer buffer) {
return new ByteBuffer[] {sizeBuffer(buffer.remaining()), buffer};
}

private static ByteBuffer sizeBuffer(int size) {
//4个字节表示消息大小
ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
sizeBuffer.putInt(size);
sizeBuffer.rewind();
return sizeBuffer;
}

}

OP_WRITE何时就绪?

上一篇文章虽然讲了epoll的原理,但是我相信还是有人觉得很迷惘,这里换个简单的说法再说下OP_WRITE事件。
OP_WRITE事件的就绪条件并不是发生在调用channel的write方法之后,也不是发生在调用channel.register(selector,SelectionKey.OP_WRITE)后,而是在当底层缓冲区有空闲空间的情况下。因为写缓冲区在绝大部分时候都是有空闲空间的,所以如果你注册了写事件,这会使得写事件一直处于写就绪,选择处理现场就会一直占用着CPU资源。所以,只有当你确实有数据要写时再注册写操作,并在写完以后马上取消注册。

max.in.flight.requests.per.connection

这个参数指定了生产者在收到服务器响应之前可以发送多少个消息,找Kafka Producer中对应有一个类InFlightRequests,表示在天上飞的请求,也就是请求发出去了response还没有回来的请求数,这个参数也是判断节点是否ready的关键因素。只有ready的节点数据才能从Accumulator中取出来进行发送。

在谈NIO之前,简单回顾下内核态和用户态

内核空间是Linux内核运行的空间,而用户空间是用户程序的运行空间,为了保证内核安全,它们之间是隔离的,即使用户的程序崩溃了,内核也不受影响。
内核空间可以执行任意命令,调用系统的一切资源,用户空间只能执行简单运算,不能直接调用系统资源(I/O,进程资源,内存分配,外设,计时器,网络通信等),必须通过系统接口(又称 system call),才能向内核发出指令。

内核图

阅读全文 »

上一篇讲了Kafka Producer发送消息的主体流程,这一篇我们关注下Kafka的网络层是如何实现的。
对于发送消息而言,Producer是客户端,Broker是服务器端。
Kafka使用了JavaNIO向服务器发送消息,所以在这之前需要了解java nio的基本知识。这次网络层源码分析从metadata request切入。

开局一张图

阅读全文 »

故事还要从月月给她老公亮亮发了一条消息说起。

阅读全文 »

实参和形参

1
2
3
4
5
public int sum(int x,int y) {
return x+y;
}

sum(2,3);

上面的代码中sum()方法中的x,y就是形参,而调用方法sum(2,3)中的2与3就是实参。形参是在方法定义阶段,而实参实在方法调用阶段。

阅读全文 »