kafka 学习 windows 上的 kafka 学习和安装相关的问题
一. 下载 kafka kafka 使用 java 实现并且官方提供了 windows 的支持,所以直接下载 就完事了,将其解压到一个文件夹下,如我在 D:\kafka,这里版本是 3.20,其他版本可能会有点不同.整体路径如下,分别是
bin: 提供的一些已经写好了的 shell 命令文件和 windows 下面的 bat 文件
config: 一些已经配置好的文件,如 kafka server 的配置,zookeeper 的配置,consumer 和 producer 的配置
libs: jar 包和一些依赖
licenses: 开源协议证书
二. 启动 kafka 单实例 激动人心的时刻来了,我们下载了文件,安装了 jdk 环境(一般都会有环境吧),然后设置 properties 文件,在这里我随便贴一下要注意的 properties 文件,定义了后面需要用的端口:
1 2 3 4 listeners =PLAINTEXT://127.0.0.1:9092 # 指定端口 log.dirs =E:\\kafka-logs-1 # 我觉得指定个文件夹比较好 zookeeper.connect =localhost:2181 # 指定zookeeper服务器
1 2 3 4 dataDir =E:\\zookeeper clientPort =2181
1. 启动 zookeeper 1 2 3 .\bin\windows\zookeeper-server-start .bat .\config\zookeeper.properties # 如果使用wsl或者bash下面 .\bin\zookeeper-server-start .sh .\config\zookeeper.properties
2. 启动 kafka 1 2 3 .\bin\windows\kafka-server-start .bat .\config\server.properties # 如果使用wsl或者bash下面 .\bin\kafka-server-start .sh .\config\zookeeper.properties
这样子就算是启动成功了,并且可以看到启动的实例连接的 zookeeper 和 broker 提供的 ip.
3. 创建 topic 1 2 # 老版本使用zookeeper-server 确定对应的kafka集群,但是新版本使用bootstrap-server确定连接的集群 .\bin\windows\kafka-topics --create --bootstrap-server 127 .0 .0 .1 :9092 --topic test
我们查看现在的集群里的 topic 情况可以使用下面的命令:
1 .\bin\windows\kafka-topics --describe --bootstrap-server 127 .0 .0 .1 :9092
可以看到本身实际上存在一个 top 叫做__consumer_offsets 去保存对应的 consumer 的 offset 数据.
4. 向 topic 写入数据和读取数据 1 .\bin\windows\kafka-console-producer --bootstrap-server 127 .0 .0 .1 :9092 --topic test1
输入日志数据,之后再读出来
1 .\bin\windows\kafka-console-consumer.bat --topic test1 --bootstrap-server 127 .0 .0 .1 :9092 # --from-beginning 可以看到现在还保存的消息
那么至此我们就完成了最基本的 kafka 的操作,创建主题\写入数据\读出数据
5. 多 broker 和多主题 对于 kafka 而言可以修改上面的server.properties
可以实现一次启动多个 broker,也可以多次使用kafka-server-start.sh
实现顺序启动多个 broker,我们可以看一下效果,同样的方式但是将listeners
改成PLAINTEXT://127.0.0.1:9093
我们看看效果:
log 里可以看到新的 broker 将原来的 9092 的 broker 作为 controller,进而我们简单的实现了一个两个 broker 的 kafka 集群.
并且在新的 broker 上新建 test1 topic
1 .\bin\windows\kafka-topics --create --bootstrap-server 127 .0 .0 .1 :9093 --topic test1
可以看到两个 bootstrap-server 上都有了对应的 topic 但是 leader 不一样,这是因为创建 topic 会以当时的 broker 为 leader,那么我们在不同的 broker 上消费和生产有什么区别呢?没有区别,因为实际上你访问任何一个都会把 leader 的信息发给你,你会获得对应的 broker 信息然后向对应的 broker 传输数据.
三. 编写自己的代码 1. 编写自己的 producer 根据 kafka 本身的教程,kafka-clients 本身提供了三个 send 模式,分别是阻塞和非阻塞以及实现好了的 future 回调.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 package com.lixiande.kafkaLearn;import java.util.Date;import java.util.Properties;import java.util.concurrent.Future;import org.apache.kafka.clients.producer.Callback;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class Producer { Logger logger = LoggerFactory.getLogger(Producer.class); private Properties kafkaProducerProps; private KafkaProducer kafkaProducer; public static void main (String[] args) { Producer producer = new Producer (); try { while (true ) { producer.SendCallBack( "test" , "what fuck about key value" , "this is nothing about kafka and send with Callback" + new Date ().toString() ); producer.SendBlock( "test" , "what fuck about key value" , "this is nothing about kafka and send by blocking" + new Date ().toString() ); producer.SendAsync( "test" , "what fuck about key value" , "this is nothing about kafka and send by async" + new Date ().toString() ); } } finally { producer.kafkaProducer.close(); } } public void SendBlock (String topic, String key, String value) { try { System.out.println( "block send :" + kafkaProducer .send(new ProducerRecord <String, String>(topic, key, value)) .get() .toString() ); } catch (Exception e) { e.printStackTrace(); } } public Future SendAsync (String topic, String key, String value) { return kafkaProducer.send( new ProducerRecord <String, String>(topic, key, value) ); } public void SendCallBack (String topic, String key, String value) { kafkaProducer.send( new ProducerRecord <String, String>(topic, key, value), new Callback () { @Override public void onCompletion (RecordMetadata recordMetadata, Exception e) { System.out.println(recordMetadata.toString()); if (e != null ) System.out.println(e.toString()); } } ); } public Producer () { kafkaProducerProps = new Properties (); kafkaProducerProps.put( "key.serializer" , org.apache.kafka.common.serialization.StringSerializer.class.getName() ); kafkaProducerProps.put( "value.serializer" , org.apache.kafka.common.serialization.StringSerializer.class.getName() ); kafkaProducerProps.put("bootstrap.servers" , "127.0.0.1:9092" ); kafkaProducer = new KafkaProducer <String, String>(kafkaProducerProps); } }
2.编写自己的 consumer 同样的 consumer 也是可以有很多种方式,比如订阅 topic,订阅 topic 里面的某些 partition,以及订阅正则匹配的 topics
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 package com.lixiande.kafkaLearn;import java.util.*;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.PartitionInfo;import org.apache.kafka.common.errors.WakeupException;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class Consumer { private static Logger logger = LoggerFactory.getLogger(Consumer.class); private KafkaConsumer kafkaConsumer; private Properties kafkaConsumerProps; private Map<String, Integer> consumeMap; public static void main (String[] args) { Consumer consumer = new Consumer (); Thread mainThread = Thread.currentThread(); Runtime .getRuntime() .addShutdownHook( new Thread ( () -> { System.out.println("consumer starting exiting" ); consumer.kafkaConsumer.wakeup(); try { mainThread.join(); } catch (InterruptedException e) { e.printStackTrace(); } } ) ); consumer.listen(); } public Consumer () { kafkaConsumerProps = new Properties (); kafkaConsumerProps.put("bootstrap.servers" , "127.0.0.1:9092" ); kafkaConsumerProps.put( "key.deserializer" , org.apache.kafka.common.serialization.StringDeserializer.class.getName() ); kafkaConsumerProps.put( "value.deserializer" , org.apache.kafka.common.serialization.StringDeserializer.class.getName() ); kafkaConsumerProps.put("group.id" , "loopConsumer" ); kafkaConsumer = new KafkaConsumer <String, String>(kafkaConsumerProps); kafkaConsumer.subscribe(Collections.singletonList("test" )); List<PartitionInfo> partitionInfoList = kafkaConsumer.partitionsFor("test" ); consumeMap = new HashMap <>(); } public void listen () { try { while (true ) { ConsumerRecords<String, String> records = kafkaConsumer.poll(100 ); for (ConsumerRecord<String, String> record : records) { logger.warn(record.toString()); int updatedCount = 1 ; if (consumeMap.containsValue(record.value())) { updatedCount = consumeMap.get(record.value()) + 1 ; } consumeMap.put(record.value(), updatedCount); } System.out.println( "\n-------------------------------------------------\n" ); System.out.println(consumeMap); System.out.println( "\n-------------------------------------------------\n" ); consumeMap.clear(); kafkaConsumer.commitAsync(); } } catch (WakeupException e) {} finally { kafkaConsumer.close(); System.out.println("Closed Consumer and we are done" ); } } }