瀏覽代碼

feat: 使用MQ解耦抽奖发货流程

seamew 2 年之前
父節點
當前提交
801a1afd20
共有 29 個文件被更改,包括 429 次插入154 次删除
  1. 0 33
      lottery-application/src/main/java/com/seamew/lottery/application/mq/KafkaConsumer.java
  2. 0 52
      lottery-application/src/main/java/com/seamew/lottery/application/mq/KafkaProducer.java
  3. 64 0
      lottery-application/src/main/java/com/seamew/lottery/application/mq/consumer/KafkaConsumer.java
  4. 41 0
      lottery-application/src/main/java/com/seamew/lottery/application/mq/producer/KafkaProducer.java
  5. 35 2
      lottery-application/src/main/java/com/seamew/lottery/application/process/impl/ActivityProcessImpl.java
  6. 26 0
      lottery-common/src/main/java/com/seamew/lottery/common/Constants.java
  7. 58 0
      lottery-domain/src/main/java/com/seamew/lottery/domain/activity/model/vo/InvoiceVO.java
  8. 9 0
      lottery-domain/src/main/java/com/seamew/lottery/domain/activity/repository/IUserTakeActivityRepository.java
  9. 8 0
      lottery-domain/src/main/java/com/seamew/lottery/domain/activity/service/partake/IActivityPartake.java
  10. 4 3
      lottery-domain/src/main/java/com/seamew/lottery/domain/activity/service/partake/impl/ActivityPartakeImpl.java
  11. 11 2
      lottery-domain/src/main/java/com/seamew/lottery/domain/award/model/req/GoodsReq.java
  12. 0 12
      lottery-domain/src/main/java/com/seamew/lottery/domain/award/repository/IAwardRepository.java
  13. 22 0
      lottery-domain/src/main/java/com/seamew/lottery/domain/award/repository/IOrderRepository.java
  14. 4 5
      lottery-domain/src/main/java/com/seamew/lottery/domain/award/service/goods/DistributionBase.java
  15. 1 1
      lottery-domain/src/main/java/com/seamew/lottery/domain/award/service/goods/impl/CouponGoods.java
  16. 1 1
      lottery-domain/src/main/java/com/seamew/lottery/domain/award/service/goods/impl/DescGoods.java
  17. 1 1
      lottery-domain/src/main/java/com/seamew/lottery/domain/award/service/goods/impl/PhysicalGoods.java
  18. 1 1
      lottery-domain/src/main/java/com/seamew/lottery/domain/award/service/goods/impl/RedeemCodeGoods.java
  19. 1 1
      lottery-domain/src/main/java/com/seamew/lottery/domain/strategy/service/draw/AbstractDrawBase.java
  20. 16 0
      lottery-infrastructure/src/main/java/com/seamew/lottery/infrastructure/dao/IUserStrategyExportDao.java
  21. 4 0
      lottery-infrastructure/src/main/java/com/seamew/lottery/infrastructure/po/UserStrategyExport.java
  22. 0 17
      lottery-infrastructure/src/main/java/com/seamew/lottery/infrastructure/repository/AwardRepository.java
  23. 34 0
      lottery-infrastructure/src/main/java/com/seamew/lottery/infrastructure/repository/OrderRepository.java
  24. 12 1
      lottery-infrastructure/src/main/java/com/seamew/lottery/infrastructure/repository/UserTakeActivityRepository.java
  25. 4 2
      lottery-interfaces/src/main/resources/application.yaml
  26. 23 2
      lottery-interfaces/src/main/resources/mybatis/mapper/UserStrategyExport_Mapper.xml
  27. 1 1
      lottery-interfaces/src/test/java/com/seamew/lottery/test/DrawAlgorithmTest.java
  28. 27 5
      lottery-interfaces/src/test/java/com/seamew/lottery/test/application/ActivityProcessTest.java
  29. 21 12
      lottery-interfaces/src/test/java/com/seamew/lottery/test/application/KafkaProducerTest.java

+ 0 - 33
lottery-application/src/main/java/com/seamew/lottery/application/mq/KafkaConsumer.java

@@ -1,33 +0,0 @@
-package com.seamew.lottery.application.mq;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.ObjectUtils;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.support.Acknowledgment;
-import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.messaging.handler.annotation.Header;
-import org.springframework.stereotype.Component;
-
-import java.util.Optional;
-
-/**
- * @Author: seamew
- * @Title: KafkaConsumer
- * @CreateTime: 2023年02月27日 11:12:00
- * @Description: 消息消费者
- * @Version: 1.0
- */
-@Component
-@Slf4j
-public class KafkaConsumer {
-    @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP)
-    public void topicTest(ConsumerRecord<?, ?> record, Acknowledgment ack) {
-        if (ObjectUtils.isNotEmpty(record.value())) {
-            log.info("topic_test 消费了:Topic:{},Message:{}", record.topic(), record.value());
-            ack.acknowledge();
-        } else {
-            log.error("message为空");
-        }
-    }
-}

+ 0 - 52
lottery-application/src/main/java/com/seamew/lottery/application/mq/KafkaProducer.java

@@ -1,52 +0,0 @@
-package com.seamew.lottery.application.mq;
-
-import com.alibaba.fastjson2.JSON;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.SendResult;
-import org.springframework.stereotype.Component;
-import org.springframework.util.concurrent.ListenableFuture;
-import org.springframework.util.concurrent.ListenableFutureCallback;
-
-import javax.annotation.Resource;
-
-/**
- * @Author: seamew
- * @Title: KafkaProducer
- * @CreateTime: 2023年02月27日 11:06:00
- * @Description: 消息生产者
- * @Version: 1.0
- */
-@Component
-@Slf4j
-public class KafkaProducer {
-    @Resource
-    private KafkaTemplate<String, Object> kafkaTemplate;
-
-    public static final String TOPIC_TEST = "Hello-Kafka";
-
-    public static final String TOPIC_GROUP = "test-consumer-group";
-
-    public void send(Object obj) {
-        log.info("准备发送消息为:{}", JSON.toJSONString(obj));
-
-        // 发送消息
-        kafkaTemplate
-                .send(TOPIC_TEST, obj)
-                .addCallback(success -> {
-                    if (success != null) {
-                        // 消息发送到的topic
-                        String topic = success.getRecordMetadata().topic();
-                        // 消息发送到的分区
-                        int partition = success.getRecordMetadata().partition();
-                        // 消息在分区内的offset
-                        long offset = success.getRecordMetadata().offset();
-                        // 获取message
-                        Object message = success.getProducerRecord().value();
-                        log.info("{}-{}-{} - 生产者 发送消息成功:{}", topic, partition, offset, message);
-                    }
-                }, error -> {
-                    log.info(TOPIC_TEST + " - 生产者 发送消息失败:" + error.getMessage());
-                });
-    }
-}

+ 64 - 0
lottery-application/src/main/java/com/seamew/lottery/application/mq/consumer/KafkaConsumer.java

@@ -0,0 +1,64 @@
+package com.seamew.lottery.application.mq.consumer;
+
+import com.alibaba.fastjson2.JSON;
+import com.seamew.lottery.application.mq.producer.KafkaProducer;
+import com.seamew.lottery.common.Constants;
+import com.seamew.lottery.domain.activity.model.vo.InvoiceVO;
+import com.seamew.lottery.domain.award.model.req.GoodsReq;
+import com.seamew.lottery.domain.award.model.res.DistributionRes;
+import com.seamew.lottery.domain.award.service.factory.DistributionGoodsFactory;
+import com.seamew.lottery.domain.award.service.goods.IDistributionGoods;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+
+/**
+ * @Author: seamew
+ * @Title: KafkaConsumer
+ * @CreateTime: 2023年02月27日 11:12:00
+ * @Description: 消息消费者
+ * @Version: 1.0
+ */
+@Component
+@Slf4j
+public class KafkaConsumer {
+
+    @Resource
+    private DistributionGoodsFactory distributionGoodsFactory;
+
+    @KafkaListener(topics = KafkaProducer.TOPIC_NAME, groupId = KafkaProducer.GROUP_NAME)
+    public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack) {
+        // 1. 判断消息是否存在
+        if (ObjectUtils.isEmpty(record.value())) {
+            return;
+        }
+        // 2. 处理 MQ 消息
+        try {
+            // 1. 转化对象
+            InvoiceVO invoiceVO = JSON.parseObject((String) record.value(), InvoiceVO.class);
+
+            // 2. 获取发送奖品工厂,执行发奖
+            IDistributionGoods distributionGoodsService = distributionGoodsFactory.getDistributionGoodsService(invoiceVO.getAwardType());
+            DistributionRes distributionRes = distributionGoodsService.doDistribution(new GoodsReq(invoiceVO.getUId(), invoiceVO.getOrderId(), invoiceVO.getAwardId(), invoiceVO.getAwardName(), invoiceVO.getAwardContent()));
+
+            if (!Constants.AwardState.SUCCESS.getCode().equals(distributionRes.getCode())) {
+                throw new RuntimeException("发奖失败");
+            }
+
+            // 3. 打印日志
+            log.info("消费MQ消息,完成 topic:{} bizId:{} 发奖结果:{}", record.topic(), invoiceVO.getUId(), JSON.toJSONString(distributionRes));
+
+            // 4. 消息消费完成
+            ack.acknowledge();
+        } catch (Exception e) {
+            // 发奖环节失败,消息重试。所有到环节,发货、更新库,都需要保证幂等。
+            log.error("消费MQ消息,失败 topic:{} message:{}", record.topic(), record.value());
+            throw e;
+        }
+    }
+}

+ 41 - 0
lottery-application/src/main/java/com/seamew/lottery/application/mq/producer/KafkaProducer.java

@@ -0,0 +1,41 @@
+package com.seamew.lottery.application.mq.producer;
+
+import com.alibaba.fastjson2.JSON;
+import com.seamew.lottery.domain.activity.model.vo.InvoiceVO;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.stereotype.Component;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+
+import javax.annotation.Resource;
+
+/**
+ * @Author: seamew
+ * @Title: KafkaProducer
+ * @CreateTime: 2023年02月27日 11:06:00
+ * @Description: 消息生产者
+ * @Version: 1.0
+ */
+@Component
+@Slf4j
+public class KafkaProducer {
+    @Resource
+    private KafkaTemplate<String, Object> kafkaTemplate;
+
+    public static final String TOPIC_NAME = "lottery_invoice";
+
+    public static final String GROUP_NAME = "lottery";
+
+    /**
+     * 发送中奖物品发货单消息
+     *
+     * @param invoice 发货单
+     */
+    public ListenableFuture<SendResult<String, Object>> sendLotteryInvoice(InvoiceVO invoice) {
+        String objJson = JSON.toJSONString(invoice);
+        log.info("发送MQ消息 topic:{} bizId:{} message:{}", TOPIC_NAME, invoice.getUId(), objJson);
+        return kafkaTemplate.send(TOPIC_NAME, objJson);
+    }
+}

+ 35 - 2
lottery-application/src/main/java/com/seamew/lottery/application/process/impl/ActivityProcessImpl.java

@@ -1,13 +1,16 @@
 package com.seamew.lottery.application.process.impl;
 
+import com.seamew.lottery.application.mq.producer.KafkaProducer;
 import com.seamew.lottery.application.process.IActivityProcess;
 import com.seamew.lottery.application.process.req.DrawProcessReq;
 import com.seamew.lottery.application.process.res.DrawProcessResult;
 import com.seamew.lottery.application.process.res.RuleQuantificationCrowdResult;
 import com.seamew.lottery.common.Constants;
+import com.seamew.lottery.common.Result;
 import com.seamew.lottery.domain.activity.model.req.PartakeReq;
 import com.seamew.lottery.domain.activity.model.res.PartakeResult;
 import com.seamew.lottery.domain.activity.model.vo.DrawOrderVO;
+import com.seamew.lottery.domain.activity.model.vo.InvoiceVO;
 import com.seamew.lottery.domain.activity.service.partake.IActivityPartake;
 import com.seamew.lottery.domain.rule.model.req.DecisionMatterReq;
 import com.seamew.lottery.domain.rule.model.res.EngineResult;
@@ -18,6 +21,7 @@ import com.seamew.lottery.domain.strategy.model.vo.DrawAwardVO;
 import com.seamew.lottery.domain.strategy.service.draw.IDrawExec;
 import com.seamew.lottery.domain.support.ids.IIdGenerator;
 import org.springframework.stereotype.Service;
+import org.springframework.util.concurrent.ListenableFutureCallback;
 
 import javax.annotation.Resource;
 import java.util.Map;
@@ -43,6 +47,9 @@ public class ActivityProcessImpl implements IActivityProcess {
     @Resource
     private Map<Constants.Ids, IIdGenerator> idGeneratorMap;
 
+    @Resource
+    private KafkaProducer kafkaProducer;
+
     @Override
     public DrawProcessResult doDrawProcess(DrawProcessReq req) {
         // 1. 领取活动
@@ -61,10 +68,23 @@ public class ActivityProcessImpl implements IActivityProcess {
         DrawAwardVO drawAwardVO = drawResult.getDrawAwardInfo();
 
         // 3. 结果落库
-        activityPartake.recordDrawOrder(buildDrawOrderVO(req, strategyId, takeId, drawAwardVO));
+        DrawOrderVO drawOrderVO = buildDrawOrderVO(req, strategyId, takeId, drawAwardVO);
+        Result recordResult = activityPartake.recordDrawOrder(drawOrderVO);
+        if (!Constants.ResponseCode.SUCCESS.getCode().equals(recordResult.getCode())) {
+            return new DrawProcessResult(recordResult.getCode(), recordResult.getInfo());
+        }
 
         // 4. 发送MQ,触发发奖流程
-
+        InvoiceVO invoiceVO = buildInvoiceVO(drawOrderVO);
+        kafkaProducer
+                .sendLotteryInvoice(invoiceVO)
+                .addCallback(success -> {
+                    // 4.1 MQ 消息发送完成,更新数据库表 user_strategy_export.mq_state = 1
+                    activityPartake.updateInvoiceMqState(invoiceVO.getUId(), invoiceVO.getOrderId(), Constants.MQState.COMPLETE.getCode());
+                }, error -> {
+                    // 4.2 MQ 消息发送失败,更新数据库表 user_strategy_export.mq_state = 2 【等待定时任务扫码补偿MQ消息】
+                    activityPartake.updateInvoiceMqState(invoiceVO.getUId(), invoiceVO.getOrderId(), Constants.MQState.FAIL.getCode());
+                });
         // 5. 返回结果
         return new DrawProcessResult(Constants.ResponseCode.SUCCESS.getCode(), Constants.ResponseCode.SUCCESS.getInfo(), drawAwardVO);
     }
@@ -104,4 +124,17 @@ public class ActivityProcessImpl implements IActivityProcess {
         drawOrderVO.setAwardContent(drawAwardVO.getAwardContent());
         return drawOrderVO;
     }
+
+    private InvoiceVO buildInvoiceVO(DrawOrderVO drawOrderVO) {
+        InvoiceVO invoiceVO = new InvoiceVO();
+        invoiceVO.setUId(drawOrderVO.getUId());
+        invoiceVO.setOrderId(drawOrderVO.getOrderId());
+        invoiceVO.setAwardId(drawOrderVO.getAwardId());
+        invoiceVO.setAwardType(drawOrderVO.getAwardType());
+        invoiceVO.setAwardName(drawOrderVO.getAwardName());
+        invoiceVO.setAwardContent(drawOrderVO.getAwardContent());
+        invoiceVO.setShippingAddress(null);
+        invoiceVO.setExtInfo(null);
+        return invoiceVO;
+    }
 }

+ 26 - 0
lottery-common/src/main/java/com/seamew/lottery/common/Constants.java

@@ -359,6 +359,32 @@ public class Constants {
         }
 
 
+        public String getInfo() {
+            return info;
+        }
+
+    }
+
+    /**
+     * 消息发送状态(0未发送、1发送成功、2发送失败)
+     */
+    public enum MQState {
+        INIT(0, "初始"),
+        COMPLETE(1, "完成"),
+        FAIL(2, "失败");
+
+        private final Integer code;
+        private final String info;
+
+        MQState(Integer code, String info) {
+            this.code = code;
+            this.info = info;
+        }
+
+        public Integer getCode() {
+            return code;
+        }
+
         public String getInfo() {
             return info;
         }

+ 58 - 0
lottery-domain/src/main/java/com/seamew/lottery/domain/activity/model/vo/InvoiceVO.java

@@ -0,0 +1,58 @@
+package com.seamew.lottery.domain.activity.model.vo;
+
+import com.seamew.lottery.domain.award.model.vo.ShippingAddress;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @Author: seamew
+ * @Title: InvoiceVO
+ * @CreateTime: 2023年02月28日 10:19:00
+ * @Description: 中奖物品发货单,用于发送MQ消息,异步触达发货奖品给用户
+ * @Version: 1.0
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class InvoiceVO {
+    /**
+     * 用户ID
+     */
+    private String uId;
+
+    /**
+     * 抽奖单号 ID
+     */
+    private Long orderId;
+
+    /**
+     * 奖品ID
+     */
+    private String awardId;
+
+    /**
+     * 奖品类型(1:文字描述、2:兑换码、3:优惠券、4:实物奖品)
+     */
+    private Integer awardType;
+
+    /**
+     * 奖品名称
+     */
+    private String awardName;
+
+    /**
+     * 奖品内容「描述、奖品码、sku」
+     */
+    private String awardContent;
+
+    /**
+     * 四级送货地址(只有实物类商品需要地址)
+     */
+    private ShippingAddress shippingAddress;
+
+    /**
+     * 扩展信息,用于一些个性商品发放所需要的透传字段内容
+     */
+    private String extInfo;
+}

+ 9 - 0
lottery-domain/src/main/java/com/seamew/lottery/domain/activity/repository/IUserTakeActivityRepository.java

@@ -67,4 +67,13 @@ public interface IUserTakeActivityRepository {
      * @return 领取单
      */
     UserTakeActivityVO queryNoConsumedTakeActivityOrder(Long activityId, String uId);
+
+    /**
+     * 更新发货单MQ状态
+     *
+     * @param uId     用户ID
+     * @param orderId 订单ID
+     * @param mqState MQ 发送状态
+     */
+    void updateInvoiceMqState(String uId, Long orderId, Integer mqState);
 }

+ 8 - 0
lottery-domain/src/main/java/com/seamew/lottery/domain/activity/service/partake/IActivityPartake.java

@@ -26,4 +26,12 @@ public interface IActivityPartake {
      * @return          保存结果
      */
     Result recordDrawOrder(DrawOrderVO drawOrder);
+
+    /**
+     * 更新发货单MQ状态
+     *  @param uId      用户ID
+     * @param orderId   订单ID
+     * @param mqState   MQ 发送状态
+     */
+    void updateInvoiceMqState(String uId, Long orderId, Integer mqState);
 }

+ 4 - 3
lottery-domain/src/main/java/com/seamew/lottery/domain/activity/service/partake/impl/ActivityPartakeImpl.java

@@ -32,9 +32,6 @@ public class ActivityPartakeImpl extends BaseActivityPartake {
     @Resource
     private IUserTakeActivityRepository userTakeActivityRepository;
 
-    @Resource
-    private Map<Constants.Ids, IIdGenerator> idGeneratorMap;
-
     @Resource
     private TransactionTemplate transactionTemplate;
 
@@ -139,6 +136,10 @@ public class ActivityPartakeImpl extends BaseActivityPartake {
         } finally {
             dbRouter.clear();
         }
+    }
 
+    @Override
+    public void updateInvoiceMqState(String uId, Long orderId, Integer mqState) {
+        userTakeActivityRepository.updateInvoiceMqState(uId, orderId, mqState);
     }
 }

+ 11 - 2
lottery-domain/src/main/java/com/seamew/lottery/domain/award/model/req/GoodsReq.java

@@ -25,7 +25,7 @@ public class GoodsReq {
     /**
      * 抽奖单号 ID
      */
-    private String orderId;
+    private Long orderId;
 
     /**
      * 奖品ID
@@ -52,7 +52,7 @@ public class GoodsReq {
      */
     private String extInfo;
 
-    public GoodsReq(String uId, String orderId, String awardId, String awardName, String awardContent) {
+    public GoodsReq(String uId, Long orderId, String awardId, String awardName, String awardContent) {
         this.uId = uId;
         this.orderId = orderId;
         this.awardId = awardId;
@@ -60,5 +60,14 @@ public class GoodsReq {
         this.awardContent = awardContent;
     }
 
+    public GoodsReq(String uId, Long orderId, String awardId, String awardName, String awardContent, ShippingAddress shippingAddress) {
+        this.uId = uId;
+        this.orderId = orderId;
+        this.awardId = awardId;
+        this.awardName = awardName;
+        this.awardContent = awardContent;
+        this.shippingAddress = shippingAddress;
+    }
+
 
 }

+ 0 - 12
lottery-domain/src/main/java/com/seamew/lottery/domain/award/repository/IAwardRepository.java

@@ -1,12 +0,0 @@
-package com.seamew.lottery.domain.award.repository;
-
-/**
- * @Author: seamew
- * @Title: IAwardRepository
- * @CreateTime: 2023年02月15日 10:39:00
- * @Description: 奖品表仓储服务接口
- * @Version: 1.0
- */
-public interface IAwardRepository {
-    // TODO 对分库分表中的用户中奖纪录操作
-}

+ 22 - 0
lottery-domain/src/main/java/com/seamew/lottery/domain/award/repository/IOrderRepository.java

@@ -0,0 +1,22 @@
+package com.seamew.lottery.domain.award.repository;
+
+/**
+ * @Author: seamew
+ * @Title: IOrderRepository
+ * @CreateTime: 2023年02月15日 10:39:00
+ * @Description: 奖品表仓储服务接口
+ * @Version: 1.0
+ */
+public interface IOrderRepository {
+
+    /**
+     * 更新奖品发放状态
+     *
+     * @param uId               用户ID
+     * @param orderId           订单ID
+     * @param awardId           奖品ID
+     * @param grantState        奖品状态
+     */
+    void updateUserAwardState(String uId, Long orderId, String awardId, Integer grantState);
+
+}

+ 4 - 5
lottery-domain/src/main/java/com/seamew/lottery/domain/award/service/goods/DistributionBase.java

@@ -1,6 +1,6 @@
 package com.seamew.lottery.domain.award.service.goods;
 
-import com.seamew.lottery.domain.award.repository.IAwardRepository;
+import com.seamew.lottery.domain.award.repository.IOrderRepository;
 import lombok.extern.slf4j.Slf4j;
 
 import javax.annotation.Resource;
@@ -15,10 +15,9 @@ import javax.annotation.Resource;
 @Slf4j
 public class DistributionBase {
     @Resource
-    private IAwardRepository awardRepository;
+    private IOrderRepository awardRepository;
 
-    protected void updateUserAwardState(String uId, String orderId, String awardId, Integer awardState, String awardStateInfo) {
-        // TODO 后期添加更新分库分表中,用户个人的抽奖记录表中奖品发奖状态
-        log.info("TODO 后期添加更新分库分表中,用户个人的抽奖记录表中奖品发奖状态 uId:{}", uId);
+    protected void updateUserAwardState(String uId, Long orderId, String awardId, Integer grantState) {
+        awardRepository.updateUserAwardState(uId, orderId, awardId, grantState);
     }
 }

+ 1 - 1
lottery-domain/src/main/java/com/seamew/lottery/domain/award/service/goods/impl/CouponGoods.java

@@ -27,7 +27,7 @@ public class CouponGoods extends DistributionBase implements IDistributionGoods
         log.info("模拟调用优惠券发放接口 uId:{} awardContent:{}", req.getUId(), req.getAwardContent());
 
         // 更新用户领奖结果
-        updateUserAwardState(req.getUId(), req.getOrderId(), req.getAwardId(), Constants.AwardState.SUCCESS.getCode(), Constants.AwardState.SUCCESS.getInfo());
+        updateUserAwardState(req.getUId(), req.getOrderId(), req.getAwardId(), Constants.GrantState.COMPLETE.getCode());
 
         return new DistributionRes(req.getUId(), Constants.AwardState.SUCCESS.getCode(), Constants.AwardState.SUCCESS.getInfo());
     }

+ 1 - 1
lottery-domain/src/main/java/com/seamew/lottery/domain/award/service/goods/impl/DescGoods.java

@@ -26,7 +26,7 @@ public class DescGoods extends DistributionBase implements IDistributionGoods {
         log.info("模拟描述类商品发奖 uId:{} awardContent:{}", req.getUId(), req.getAwardContent());
 
         // 更新用户领奖结果
-        updateUserAwardState(req.getUId(), req.getOrderId(), req.getAwardId(), Constants.AwardState.SUCCESS.getCode(), Constants.AwardState.SUCCESS.getInfo());
+        updateUserAwardState(req.getUId(), req.getOrderId(), req.getAwardId(), Constants.GrantState.COMPLETE.getCode());
 
         return new DistributionRes(req.getUId(), Constants.AwardState.SUCCESS.getCode(), Constants.AwardState.SUCCESS.getInfo());
     }

+ 1 - 1
lottery-domain/src/main/java/com/seamew/lottery/domain/award/service/goods/impl/PhysicalGoods.java

@@ -27,7 +27,7 @@ public class PhysicalGoods extends DistributionBase implements IDistributionGood
         log.info("模拟调用实物发奖 uId:{} awardContent:{}", req.getUId(), req.getAwardContent());
 
         // 更新用户领奖结果
-        updateUserAwardState(req.getUId(), req.getOrderId(), req.getAwardId(), Constants.AwardState.SUCCESS.getCode(), Constants.AwardState.SUCCESS.getInfo());
+        updateUserAwardState(req.getUId(), req.getOrderId(), req.getAwardId(), Constants.GrantState.COMPLETE.getCode());
 
         return new DistributionRes(req.getUId(), Constants.AwardState.SUCCESS.getCode(), Constants.AwardState.SUCCESS.getInfo());
     }

+ 1 - 1
lottery-domain/src/main/java/com/seamew/lottery/domain/award/service/goods/impl/RedeemCodeGoods.java

@@ -27,7 +27,7 @@ public class RedeemCodeGoods extends DistributionBase implements IDistributionGo
         log.info("模拟调用兑换码 uId:{} awardContent:{}", req.getUId(), req.getAwardContent());
 
         // 更新用户领奖结果
-        updateUserAwardState(req.getUId(), req.getOrderId(), req.getAwardId(), Constants.AwardState.SUCCESS.getCode(), Constants.AwardState.SUCCESS.getInfo());
+        updateUserAwardState(req.getUId(), req.getOrderId(), req.getAwardId(), Constants.GrantState.COMPLETE.getCode());
 
         return new DistributionRes(req.getUId(), Constants.AwardState.SUCCESS.getCode(), Constants.AwardState.SUCCESS.getInfo());
     }

+ 1 - 1
lottery-domain/src/main/java/com/seamew/lottery/domain/strategy/service/draw/AbstractDrawBase.java

@@ -98,7 +98,7 @@ public abstract class AbstractDrawBase extends DrawStrategySupport implements ID
             return new DrawResult(uId, strategyId, Constants.DrawState.FAIL.getCode());
         }
 
-        AwardBriefVO award = super.queryAwardInfoByAwardId(awardId);
+        AwardBriefVO award = queryAwardInfoByAwardId(awardId);
         DrawAwardVO drawAwardInfo = new DrawAwardVO(uId, award.getAwardId(), award.getAwardType(), award.getAwardName(), award.getAwardContent());
         drawAwardInfo.setStrategyMode(strategy.getStrategyMode());
         drawAwardInfo.setGrantType(strategy.getGrantType());

+ 16 - 0
lottery-infrastructure/src/main/java/com/seamew/lottery/infrastructure/dao/IUserStrategyExportDao.java

@@ -29,4 +29,20 @@ public interface IUserStrategyExportDao {
      */
     @DBRouter(splitTable = true)
     UserStrategyExport queryUserStrategyExportByUId(String uId);
+
+    /**
+     * 更新发奖状态
+     *
+     * @param userStrategyExport 发奖信息
+     */
+    @DBRouter(splitTable = true)
+    void updateUserAwardState(UserStrategyExport userStrategyExport);
+
+    /**
+     * 更新发送MQ状态
+     *
+     * @param userStrategyExport 发送消息
+     */
+    @DBRouter(splitTable = true)
+    void updateInvoiceMqState(UserStrategyExport userStrategyExport);
 }

+ 4 - 0
lottery-infrastructure/src/main/java/com/seamew/lottery/infrastructure/po/UserStrategyExport.java

@@ -73,6 +73,10 @@ public class UserStrategyExport {
      * 防重ID
      */
     private String uuid;
+    /**
+     * 消息发送状态(0未发送、1发送成功、2发送失败)
+     */
+    private Integer mqState;
     /**
      * 创建时间
      */

+ 0 - 17
lottery-infrastructure/src/main/java/com/seamew/lottery/infrastructure/repository/AwardRepository.java

@@ -1,17 +0,0 @@
-package com.seamew.lottery.infrastructure.repository;
-
-import com.seamew.lottery.domain.award.repository.IAwardRepository;
-import org.springframework.stereotype.Repository;
-
-/**
- * @Author: seamew
- * @Title: AwardRepository
- * @CreateTime: 2023年02月15日 10:39:00
- * @Description: 奖品表仓储服务
- * @Version: 1.0
- */
-@Repository
-public class AwardRepository implements IAwardRepository {
-
-}
-

+ 34 - 0
lottery-infrastructure/src/main/java/com/seamew/lottery/infrastructure/repository/OrderRepository.java

@@ -0,0 +1,34 @@
+package com.seamew.lottery.infrastructure.repository;
+
+import com.seamew.lottery.domain.award.repository.IOrderRepository;
+import com.seamew.lottery.infrastructure.dao.IUserStrategyExportDao;
+import com.seamew.lottery.infrastructure.po.UserStrategyExport;
+import org.springframework.stereotype.Repository;
+
+import javax.annotation.Resource;
+
+/**
+ * @Author: seamew
+ * @Title: AwardRepository
+ * @CreateTime: 2023年02月15日 10:39:00
+ * @Description: 奖品表仓储服务
+ * @Version: 1.0
+ */
+@Repository
+public class OrderRepository implements IOrderRepository {
+
+    @Resource
+    private IUserStrategyExportDao userStrategyExportDao;
+
+    @Override
+    public void updateUserAwardState(String uId, Long orderId, String awardId, Integer grantState) {
+        UserStrategyExport userStrategyExport = new UserStrategyExport();
+        userStrategyExport.setUId(uId);
+        userStrategyExport.setOrderId(orderId);
+        userStrategyExport.setAwardId(awardId);
+        userStrategyExport.setGrantState(grantState);
+        userStrategyExportDao.updateUserAwardState(userStrategyExport);
+    }
+}
+
+

+ 12 - 1
lottery-infrastructure/src/main/java/com/seamew/lottery/infrastructure/repository/UserTakeActivityRepository.java

@@ -98,7 +98,8 @@ public class UserTakeActivityRepository implements IUserTakeActivityRepository {
         userStrategyExport.setAwardType(drawOrder.getAwardType());
         userStrategyExport.setAwardName(drawOrder.getAwardName());
         userStrategyExport.setAwardContent(drawOrder.getAwardContent());
-        userStrategyExport.setUuid(String.valueOf(drawOrder.getOrderId()));
+        userStrategyExport.setUuid(String.valueOf(drawOrder.getTakeId()));
+        userStrategyExport.setMqState(Constants.MQState.INIT.getCode());
 
         userStrategyExportDao.insert(userStrategyExport);
     }
@@ -125,4 +126,14 @@ public class UserTakeActivityRepository implements IUserTakeActivityRepository {
         return userTakeActivityVO;
     }
 
+
+    @Override
+    public void updateInvoiceMqState(String uId, Long orderId, Integer mqState) {
+        UserStrategyExport userStrategyExport = new UserStrategyExport();
+        userStrategyExport.setUId(uId);
+        userStrategyExport.setOrderId(orderId);
+        userStrategyExport.setMqState(mqState);
+        userStrategyExportDao.updateInvoiceMqState(userStrategyExport);
+    }
+
 }

+ 4 - 2
lottery-interfaces/src/main/resources/application.yaml

@@ -21,7 +21,7 @@ spring:
       acks: 1
     consumer:
       # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
-      auto-commit-interval: 1S
+      # auto-commit-interval: 1S
       # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
       # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
       # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
@@ -32,6 +32,8 @@ spring:
       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
       # 值的反序列化方式
       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      properties:
+        session.timeout.ms: 60000
     listener:
       # 在侦听器容器中运行的线程数。
       concurrency: 5
@@ -66,7 +68,7 @@ mybatis:
   configuration:
     map-underscore-to-camel-case: true
     # 开启sql日志
-#    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
+    # log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
 
 dubbo:
   application:

+ 23 - 2
lottery-interfaces/src/main/resources/mybatis/mapper/UserStrategyExport_Mapper.xml

@@ -6,10 +6,10 @@
         INSERT INTO user_strategy_export
         (u_id, activity_id, order_id, strategy_id, strategy_mode,
          grant_type, grant_date, grant_state, award_id, award_type,
-         award_name, award_content, uuid, create_time, update_time)
+         award_name, award_content, uuid, mq_state, create_time, update_time)
         VALUES (#{uId}, #{activityId}, #{orderId}, #{strategyId}, #{strategyMode},
                 #{grantType}, #{grantDate}, #{grantState}, #{awardId}, #{awardType},
-                #{awardName}, #{awardContent}, #{uuid}, now(), now())
+                #{awardName}, #{awardContent}, #{uuid}, #{mqState}, now(), now())
     </insert>
 
     <select id="queryUserStrategyExportByUId" parameterType="java.lang.String"
@@ -28,10 +28,31 @@
                award_name,
                award_content,
                uuid,
+               mq_state,
                create_time,
                update_time
         FROM user_strategy_export
         WHERE u_id = #{uId}
     </select>
 
+
+    <update id="updateUserAwardState" parameterType="com.seamew.lottery.infrastructure.po.UserStrategyExport">
+        UPDATE user_strategy_export
+        SET grant_state = #{grantState},
+            grant_date  = now(),
+            update_time = now()
+        WHERE u_id = #{uId}
+          AND order_id = #{orderId}
+          AND award_id = #{awardId}
+    </update>
+
+
+    <update id="updateInvoiceMqState" parameterType="com.seamew.lottery.infrastructure.po.UserStrategyExport">
+        UPDATE user_strategy_export
+        SET mq_state    = #{mqState},
+            update_time = now()
+        WHERE u_id = #{uId}
+          AND order_id = #{orderId}
+    </update>
+
 </mapper>

+ 1 - 1
lottery-interfaces/src/test/java/com/seamew/lottery/test/DrawAlgorithmTest.java

@@ -55,7 +55,7 @@ public class DrawAlgorithmTest {
 
         // 封装发奖参数,orderId:2109313442431 为模拟ID,需要在用户参与领奖活动时生成
         DrawAwardVO drawAwardInfo = drawResult.getDrawAwardInfo();
-        GoodsReq goodsReq = new GoodsReq(drawResult.getUId(), "2109313442431", drawAwardInfo.getAwardId(), drawAwardInfo.getAwardName(), drawAwardInfo.getAwardContent());
+        GoodsReq goodsReq = new GoodsReq(drawResult.getUId(), 2109313442431L, drawAwardInfo.getAwardId(), drawAwardInfo.getAwardName(), drawAwardInfo.getAwardContent());
 
         // 根据 awardType 从抽奖工厂中获取对应的发奖服务
         IDistributionGoods distributionGoodsService = distributionGoodsFactory.getDistributionGoodsService(drawAwardInfo.getAwardType());

+ 27 - 5
lottery-interfaces/src/test/java/com/seamew/lottery/test/application/ActivityProcessTest.java

@@ -4,12 +4,15 @@ import com.alibaba.fastjson2.JSON;
 import com.seamew.lottery.application.process.IActivityProcess;
 import com.seamew.lottery.application.process.req.DrawProcessReq;
 import com.seamew.lottery.application.process.res.DrawProcessResult;
+import com.seamew.lottery.common.Constants;
+import com.seamew.lottery.domain.activity.service.partake.IActivityPartake;
 import com.seamew.middleware.db.router.strategy.IDBRouterStrategy;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.web.bind.annotation.ResponseStatus;
 
 import javax.annotation.Resource;
 
@@ -30,23 +33,42 @@ public class ActivityProcessTest {
     @Resource
     IDBRouterStrategy dbRouter;
 
+    @Resource
+    private IActivityPartake activityPartake;
+
     @Test
-    public void test_doDrawProcess() {
+    public void test_doDrawProcess() throws Exception {
         DrawProcessReq req = new DrawProcessReq();
-        req.setUId("fustack");
+        req.setUId("xiaofuge");
         req.setActivityId(100001L);
-
         DrawProcessResult drawProcessResult = activityProcess.doDrawProcess(req);
+
         log.info("请求入参:{}", JSON.toJSONString(req));
         log.info("测试结果:{}", JSON.toJSONString(drawProcessResult));
+
+        Thread.sleep(10000);
+
     }
 
     @Test
-    public void test_router() {
+    public void test_router() throws Exception {
         DrawProcessReq req = new DrawProcessReq();
         req.setUId("fustack");
         req.setActivityId(100001L);
-        dbRouter.doRouter(req.getUId());
+        DrawProcessResult drawProcessResult = activityProcess.doDrawProcess(req);
+
+        log.info("请求入参:{}", JSON.toJSONString(req));
+        log.info("测试结果:{}", JSON.toJSONString(drawProcessResult));
+
+        while (true) {
+            Thread.sleep(30000);
+        }
+    }
+
+    @Test
+    public void test_update() {
+        activityPartake.updateInvoiceMqState("fustack", 1630424938135584768L, Constants.MQState.COMPLETE.getCode());
+
     }
 
 }

+ 21 - 12
lottery-interfaces/src/test/java/com/seamew/lottery/test/application/KafkaProducerTest.java

@@ -1,6 +1,8 @@
 package com.seamew.lottery.test.application;
 
-import com.seamew.lottery.application.mq.KafkaProducer;
+import com.seamew.lottery.application.mq.producer.KafkaProducer;
+import com.seamew.lottery.common.Constants;
+import com.seamew.lottery.domain.activity.model.vo.InvoiceVO;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -35,13 +37,20 @@ public class KafkaProducerTest {
 
     @Test
     public void test_send() throws InterruptedException {
-        // 循环发送消息
-        // for (int i = 0; i < 5; i++) {
-        //     kafkaProducer.send("你好,seamew 00" + i);
-        //     Thread.sleep(3500);
-        // }
-        kafkaProducer.send("你好,seamew 007");
-        Thread.sleep(4000);
+        InvoiceVO invoice = new InvoiceVO();
+        invoice.setUId("fustack");
+        invoice.setOrderId(1444540456057864192L);
+        invoice.setAwardId("3");
+        invoice.setAwardType(Constants.AwardType.DESC.getCode());
+        invoice.setAwardName("Code");
+        invoice.setAwardContent("苹果电脑");
+        invoice.setShippingAddress(null);
+        invoice.setExtInfo(null);
+
+        kafkaProducer.sendLotteryInvoice(invoice);
+        while (true) {
+            Thread.sleep(10000);
+        }
     }
 
     @Test
@@ -54,18 +63,18 @@ public class KafkaProducerTest {
         configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
         //value反序列化类
         configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
+        configs.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaProducer.GROUP_NAME);
         configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         //设置手动提交
-        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
+        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
         //创建消费者对象
         KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);
 
         List<String> topics = new ArrayList<>();
-        topics.add("Hello-Kafka");
+        topics.add(KafkaProducer.TOPIC_NAME);
         //消费者订阅主题
         consumer.subscribe(topics);
-        while (true){
+        while (true) {
             //批量拉取主题消息,每3秒拉取一次
             ConsumerRecords<Integer, String> records = consumer.poll(3000);
             //变量消息