kafka 学习

windows 上的 kafka 学习和安装相关的问题

一. 下载 kafka

kafka 使用 java 实现并且官方提供了 windows 的支持,所以直接下载就完事了,将其解压到一个文件夹下,如我在 D:\kafka,这里版本是 3.20,其他版本可能会有点不同.整体路径如下,分别是

  • bin: 提供的一些已经写好了的 shell 命令文件和 windows 下面的 bat 文件
  • config: 一些已经配置好的文件,如 kafka server 的配置,zookeeper 的配置,consumer 和 producer 的配置
  • libs: jar 包和一些依赖
  • licenses: 开源协议证书

image-20220611234942477

二. 启动 kafka 单实例

激动人心的时刻来了,我们下载了文件,安装了 jdk 环境(一般都会有环境吧),然后设置 properties 文件,在这里我随便贴一下要注意的 properties 文件,定义了后面需要用的端口:

1
2
3
4
# file:config/server.properties
listeners=PLAINTEXT://127.0.0.1:9092 # 指定端口
log.dirs=E:\\kafka-logs-1 # 我觉得指定个文件夹比较好
zookeeper.connect=localhost:2181 # 指定zookeeper服务器
1
2
3
4
# file:config/zookeeper.properties
dataDir=E:\\zookeeper
# the port at which the clients will connect
clientPort=2181

1. 启动 zookeeper

1
2
3
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
# 如果使用wsl或者bash下面
.\bin\zookeeper-server-start.sh .\config\zookeeper.properties

image-20220612001053363

2. 启动 kafka

1
2
3
.\bin\windows\kafka-server-start.bat .\config\server.properties
# 如果使用wsl或者bash下面
.\bin\kafka-server-start.sh .\config\zookeeper.properties

image-20220612001424728

image-20220612001428305

这样子就算是启动成功了,并且可以看到启动的实例连接的 zookeeper 和 broker 提供的 ip.

3. 创建 topic

1
2
# 老版本使用zookeeper-server 确定对应的kafka集群,但是新版本使用bootstrap-server确定连接的集群
.\bin\windows\kafka-topics --create --bootstrap-server 127.0.0.1:9092 --topic test

我们查看现在的集群里的 topic 情况可以使用下面的命令:

1
.\bin\windows\kafka-topics --describe --bootstrap-server 127.0.0.1:9092

image-20220612001957165

可以看到本身实际上存在一个 top 叫做__consumer_offsets 去保存对应的 consumer 的 offset 数据.

4. 向 topic 写入数据和读取数据

1
.\bin\windows\kafka-console-producer --bootstrap-server 127.0.0.1:9092 --topic test1

输入日志数据,之后再读出来

1
.\bin\windows\kafka-console-consumer.bat --topic test1 --bootstrap-server 127.0.0.1:9092 # --from-beginning 可以看到现在还保存的消息

那么至此我们就完成了最基本的 kafka 的操作,创建主题\写入数据\读出数据

5. 多 broker 和多主题

对于 kafka 而言可以修改上面的server.properties可以实现一次启动多个 broker,也可以多次使用kafka-server-start.sh实现顺序启动多个 broker,我们可以看一下效果,同样的方式但是将listeners改成PLAINTEXT://127.0.0.1:9093我们看看效果:

image-20220612013910567

log 里可以看到新的 broker 将原来的 9092 的 broker 作为 controller,进而我们简单的实现了一个两个 broker 的 kafka 集群.

并且在新的 broker 上新建 test1 topic

1
.\bin\windows\kafka-topics --create --bootstrap-server 127.0.0.1:9093 --topic test1

image-20220612014647010

可以看到两个 bootstrap-server 上都有了对应的 topic 但是 leader 不一样,这是因为创建 topic 会以当时的 broker 为 leader,那么我们在不同的 broker 上消费和生产有什么区别呢?没有区别,因为实际上你访问任何一个都会把 leader 的信息发给你,你会获得对应的 broker 信息然后向对应的 broker 传输数据.

三. 编写自己的代码

1. 编写自己的 producer

根据 kafka 本身的教程,kafka-clients 本身提供了三个 send 模式,分别是阻塞和非阻塞以及实现好了的 future 回调.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package com.lixiande.kafkaLearn;

import java.util.Date;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Producer {
Logger logger = LoggerFactory.getLogger(Producer.class);
private Properties kafkaProducerProps;
private KafkaProducer kafkaProducer;

public static void main(String[] args) {
Producer producer = new Producer();
try {
while (true) {
producer.SendCallBack(
"test",
"what fuck about key value",
"this is nothing about kafka and send with Callback" +
new Date().toString()
);
producer.SendBlock(
"test",
"what fuck about key value",
"this is nothing about kafka and send by blocking" +
new Date().toString()
);
producer.SendAsync(
"test",
"what fuck about key value",
"this is nothing about kafka and send by async" +
new Date().toString()
);
}
} finally {
producer.kafkaProducer.close();
}
}

public void SendBlock(String topic, String key, String value) {
try {
System.out.println(
"block send :" +
kafkaProducer
.send(new ProducerRecord<String, String>(topic, key, value))
.get()
.toString()
);
} catch (Exception e) {
e.printStackTrace();
}
}

public Future SendAsync(String topic, String key, String value) {
return kafkaProducer.send(
new ProducerRecord<String, String>(topic, key, value)
);
}

public void SendCallBack(String topic, String key, String value) {
kafkaProducer.send(
new ProducerRecord<String, String>(topic, key, value),
new Callback() {

@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println(recordMetadata.toString());
if (e != null) System.out.println(e.toString());
}
}
);
}

public Producer() {
kafkaProducerProps = new Properties();
kafkaProducerProps.put(
"key.serializer",
org.apache.kafka.common.serialization.StringSerializer.class.getName()
);
kafkaProducerProps.put(
"value.serializer",
org.apache.kafka.common.serialization.StringSerializer.class.getName()
);
kafkaProducerProps.put("bootstrap.servers", "127.0.0.1:9092");
kafkaProducer = new KafkaProducer<String, String>(kafkaProducerProps);
}
}

2.编写自己的 consumer

同样的 consumer 也是可以有很多种方式,比如订阅 topic,订阅 topic 里面的某些 partition,以及订阅正则匹配的 topics

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package com.lixiande.kafkaLearn;

import java.util.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Consumer {
private static Logger logger = LoggerFactory.getLogger(Consumer.class);
private KafkaConsumer kafkaConsumer;
private Properties kafkaConsumerProps;

private Map<String, Integer> consumeMap;

public static void main(String[] args) {
Consumer consumer = new Consumer();
Thread mainThread = Thread.currentThread();
Runtime
.getRuntime()
.addShutdownHook(
new Thread(
() -> {
System.out.println("consumer starting exiting");
consumer.kafkaConsumer.wakeup();
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
)
);
consumer.listen();
}

public Consumer() {
kafkaConsumerProps = new Properties();
kafkaConsumerProps.put("bootstrap.servers", "127.0.0.1:9092");
kafkaConsumerProps.put(
"key.deserializer",
org.apache.kafka.common.serialization.StringDeserializer.class.getName()
);
kafkaConsumerProps.put(
"value.deserializer",
org.apache.kafka.common.serialization.StringDeserializer.class.getName()
);
kafkaConsumerProps.put("group.id", "loopConsumer");
kafkaConsumer = new KafkaConsumer<String, String>(kafkaConsumerProps);
kafkaConsumer.subscribe(Collections.singletonList("test"));
// kafkaConsumer.subscribe(Pattern.compile("test*")); // 也可以订阅所有的test*的主题
List<PartitionInfo> partitionInfoList = kafkaConsumer.partitionsFor("test");
// XXX:这里可以用于获取特定的topic的分区,从而实现不同的消费者手动分配,而不会走均衡
/*
// if (partitionInfoList != null){
// for (PartitionInfo info : partitionInfoList){
// partitions.add(new TopicPartition(info.topic(), info.partition()));
// }
// kafkaConsumer.assign(partitions);
// }
*/
consumeMap = new HashMap<>();
}

public void listen() {
try {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
logger.warn(record.toString());
int updatedCount = 1;
if (consumeMap.containsValue(record.value())) {
updatedCount = consumeMap.get(record.value()) + 1;
}
consumeMap.put(record.value(), updatedCount);
}
System.out.println(
"\n-------------------------------------------------\n"
);
System.out.println(consumeMap);
System.out.println(
"\n-------------------------------------------------\n"
);
consumeMap.clear();
kafkaConsumer.commitAsync();
}
} catch (WakeupException e) {} finally {
kafkaConsumer.close();
System.out.println("Closed Consumer and we are done");
}
}
}