Просмотр исходного кода

feat: 搭建MQ消息组件Kafka服务环境

seamew 2 лет назад
Родитель
Сommit
a3df3c36d4

+ 5 - 0
lottery-application/pom.xml

@@ -18,6 +18,11 @@
             <version>1.0-SNAPSHOT</version>
             <scope>compile</scope>
         </dependency>
+        <!-- kafka依赖 -->
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
     </dependencies>
 
 </project>

+ 33 - 0
lottery-application/src/main/java/com/seamew/lottery/application/mq/KafkaConsumer.java

@@ -0,0 +1,33 @@
+package com.seamew.lottery.application.mq;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.stereotype.Component;
+
+import java.util.Optional;
+
+/**
+ * @Author: seamew
+ * @Title: KafkaConsumer
+ * @CreateTime: 2023年02月27日 11:12:00
+ * @Description: 消息消费者
+ * @Version: 1.0
+ */
+@Component
+@Slf4j
+public class KafkaConsumer {
+    @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP)
+    public void topicTest(ConsumerRecord<?, ?> record, Acknowledgment ack) {
+        if (ObjectUtils.isNotEmpty(record.value())) {
+            log.info("topic_test 消费了:Topic:{},Message:{}", record.topic(), record.value());
+            ack.acknowledge();
+        } else {
+            log.error("message为空");
+        }
+    }
+}

+ 52 - 0
lottery-application/src/main/java/com/seamew/lottery/application/mq/KafkaProducer.java

@@ -0,0 +1,52 @@
+package com.seamew.lottery.application.mq;
+
+import com.alibaba.fastjson2.JSON;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.stereotype.Component;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+
+import javax.annotation.Resource;
+
+/**
+ * @Author: seamew
+ * @Title: KafkaProducer
+ * @CreateTime: 2023年02月27日 11:06:00
+ * @Description: 消息生产者
+ * @Version: 1.0
+ */
+@Component
+@Slf4j
+public class KafkaProducer {
+    @Resource
+    private KafkaTemplate<String, Object> kafkaTemplate;
+
+    public static final String TOPIC_TEST = "Hello-Kafka";
+
+    public static final String TOPIC_GROUP = "test-consumer-group";
+
+    public void send(Object obj) {
+        log.info("准备发送消息为:{}", JSON.toJSONString(obj));
+
+        // 发送消息
+        kafkaTemplate
+                .send(TOPIC_TEST, obj)
+                .addCallback(success -> {
+                    if (success != null) {
+                        // 消息发送到的topic
+                        String topic = success.getRecordMetadata().topic();
+                        // 消息发送到的分区
+                        int partition = success.getRecordMetadata().partition();
+                        // 消息在分区内的offset
+                        long offset = success.getRecordMetadata().offset();
+                        // 获取message
+                        Object message = success.getProducerRecord().value();
+                        log.info("{}-{}-{} - 生产者 发送消息成功:{}", topic, partition, offset, message);
+                    }
+                }, error -> {
+                    log.info(TOPIC_TEST + " - 生产者 发送消息失败:" + error.getMessage());
+                });
+    }
+}

+ 38 - 0
lottery-interfaces/src/main/resources/application.yaml

@@ -1,6 +1,44 @@
 server:
   port: 8080
 
+spring:
+  kafka:
+    bootstrap-servers: 180.76.231.231:9092
+    producer:
+      # 发生错误后,消息重发的次数。
+      retries: 1
+      # 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
+      batch-size: 16384
+      # 设置生产者内存缓冲区的大小。
+      buffer-memory: 33554432
+      # 键的序列化方式
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      # 值的序列化方式
+      value-serializer: org.apache.kafka.common.serialization.StringSerializer
+      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
+      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
+      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
+      acks: 1
+    consumer:
+      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
+      auto-commit-interval: 1S
+      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
+      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
+      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
+      auto-offset-reset: earliest
+      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
+      enable-auto-commit: false
+      # 键的反序列化方式
+      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      # 值的反序列化方式
+      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+    listener:
+      # 在侦听器容器中运行的线程数。
+      concurrency: 5
+      #listner负责ack,每调用一次,就立即commit
+      ack-mode: manual_immediate
+      missing-topics-fatal: false
+
 mini-db-router:
   jdbc:
     datasource:

+ 78 - 0
lottery-interfaces/src/test/java/com/seamew/lottery/test/application/KafkaProducerTest.java

@@ -0,0 +1,78 @@
+package com.seamew.lottery.test.application;
+
+import com.seamew.lottery.application.mq.KafkaProducer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import javax.annotation.Resource;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @Author: seamew
+ * @Title: KafkaProducerTest
+ * @CreateTime: 2023年02月27日 11:14:00
+ * @Description:
+ * @Version: 1.0
+ */
+@RunWith(SpringRunner.class)
+@SpringBootTest
+@Slf4j
+public class KafkaProducerTest {
+
+
+    @Resource
+    private KafkaProducer kafkaProducer;
+
+    @Test
+    public void test_send() throws InterruptedException {
+        // 循环发送消息
+        // for (int i = 0; i < 5; i++) {
+        //     kafkaProducer.send("你好,seamew 00" + i);
+        //     Thread.sleep(3500);
+        // }
+        kafkaProducer.send("你好,seamew 007");
+        Thread.sleep(4000);
+    }
+
+    @Test
+    public void context() {
+        Map<String, Object> configs = new HashMap<>();
+        // 设置连接Kafka的初始连接用到的服务器地址
+        // 如果是集群,则可以通过此初始连接发现集群中的其他broker
+        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://180.76.231.231:9092");
+        //KEY反序列化类
+        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
+        //value反序列化类
+        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
+        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        //设置手动提交
+        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
+        //创建消费者对象
+        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);
+
+        List<String> topics = new ArrayList<>();
+        topics.add("Hello-Kafka");
+        //消费者订阅主题
+        consumer.subscribe(topics);
+        while (true){
+            //批量拉取主题消息,每3秒拉取一次
+            ConsumerRecords<Integer, String> records = consumer.poll(3000);
+            //变量消息
+            for (ConsumerRecord<Integer, String> record : records) {
+                System.out.println(record.value());
+            }
+            consumer.commitAsync();
+        }
+    }
+}