|
@@ -1,14 +1,22 @@
|
|
|
package com.seamew.lottery.application.worker;
|
|
|
|
|
|
import com.alibaba.fastjson2.JSON;
|
|
|
+import com.seamew.lottery.application.mq.producer.KafkaProducer;
|
|
|
import com.seamew.lottery.common.Constants;
|
|
|
import com.seamew.lottery.common.Result;
|
|
|
import com.seamew.lottery.domain.activity.model.vo.ActivityVO;
|
|
|
+import com.seamew.lottery.domain.activity.model.vo.InvoiceVO;
|
|
|
import com.seamew.lottery.domain.activity.service.deploy.IActivityDeploy;
|
|
|
+import com.seamew.lottery.domain.activity.service.partake.IActivityPartake;
|
|
|
import com.seamew.lottery.domain.activity.service.stateflow.IStateHandler;
|
|
|
+import com.seamew.middleware.db.router.strategy.IDBRouterStrategy;
|
|
|
+import com.xxl.job.core.context.XxlJobHelper;
|
|
|
import com.xxl.job.core.handler.annotation.XxlJob;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.kafka.support.SendResult;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
+import org.springframework.util.concurrent.ListenableFuture;
|
|
|
+import org.springframework.util.concurrent.ListenableFutureCallback;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
|
import java.util.Date;
|
|
@@ -27,9 +35,19 @@ public class LotteryXxlJob {
|
|
|
@Resource
|
|
|
private IActivityDeploy activityDeploy;
|
|
|
|
|
|
+ @Resource
|
|
|
+ private IActivityPartake activityPartake;
|
|
|
+
|
|
|
@Resource
|
|
|
private IStateHandler stateHandler;
|
|
|
|
|
|
+ @Resource
|
|
|
+ private IDBRouterStrategy dbRouter;
|
|
|
+
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private KafkaProducer kafkaProducer;
|
|
|
+
|
|
|
@XxlJob("lotteryActivityStateJobHandler")
|
|
|
public void lotteryActivityStateJobHandler() throws Exception {
|
|
|
log.info("扫描活动状态 Begin");
|
|
@@ -69,4 +87,63 @@ public class LotteryXxlJob {
|
|
|
log.info("扫描活动状态 End");
|
|
|
|
|
|
}
|
|
|
+
|
|
|
+ @XxlJob("lotteryOrderMQStateJobHandler")
|
|
|
+ public void lotteryOrderMQStateJobHandler() throws Exception {
|
|
|
+ // 验证参数
|
|
|
+ String jobParam = XxlJobHelper.getJobParam();
|
|
|
+ if (null == jobParam) {
|
|
|
+ log.info("扫描用户抽奖奖品发放MQ状态[Table = 2*4] 错误 params is null");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 获取分布式任务配置参数信息 参数配置格式:1,2,3 也可以是指定扫描一个,也可以配置多个库,按照部署的任务集群进行数量配置,均摊分别扫描效率更高
|
|
|
+ String[] params = jobParam.split(",");
|
|
|
+ log.info("扫描用户抽奖奖品发放MQ状态[Table = 2*4] 开始 params:{}", JSON.toJSONString(params));
|
|
|
+
|
|
|
+ if (params.length == 0) {
|
|
|
+ log.info("扫描用户抽奖奖品发放MQ状态[Table = 2*4] 结束 params is null");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 获取分库分表配置下的分表数
|
|
|
+ int tbCount = dbRouter.tbCount();
|
|
|
+
|
|
|
+ // 循环获取指定扫描库
|
|
|
+ for (String param : params) {
|
|
|
+ // 获取当前任务扫描的指定分库
|
|
|
+ int dbCount = Integer.parseInt(param);
|
|
|
+
|
|
|
+ // 判断配置指定扫描库数,是否存在
|
|
|
+ if (dbCount > dbRouter.dbCount()) {
|
|
|
+ log.info("扫描用户抽奖奖品发放MQ状态[Table = 2*4] 结束 dbCount not exist");
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 循环扫描对应表
|
|
|
+ for (int i = 0; i < tbCount; i++) {
|
|
|
+
|
|
|
+ // 扫描库表数据
|
|
|
+ List<InvoiceVO> invoiceVOList = activityPartake.scanInvoiceMqState(dbCount, i);
|
|
|
+ log.info("扫描用户抽奖奖品发放MQ状态[Table = 2*4] 扫描库:{} 扫描表:{} 扫描数:{}", dbCount, i, invoiceVOList.size());
|
|
|
+
|
|
|
+ // 补偿 MQ 消息
|
|
|
+ for (InvoiceVO invoiceVO : invoiceVOList) {
|
|
|
+ kafkaProducer
|
|
|
+ .sendLotteryInvoice(invoiceVO)
|
|
|
+ .addCallback(success -> {
|
|
|
+ // MQ 消息发送完成,更新数据库表 user_strategy_export.mq_state = 1
|
|
|
+ activityPartake.updateInvoiceMqState(invoiceVO.getUId(), invoiceVO.getOrderId(), Constants.MQState.COMPLETE.getCode());
|
|
|
+ }, error -> {
|
|
|
+ // MQ 消息发送失败,更新数据库表 user_strategy_export.mq_state = 2 【等待定时任务扫码补偿MQ消息】
|
|
|
+ activityPartake.updateInvoiceMqState(invoiceVO.getUId(), invoiceVO.getOrderId(), Constants.MQState.FAIL.getCode());
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("扫描用户抽奖奖品发放MQ状态[Table = 2*4] 完成 param:{}", JSON.toJSONString(params));
|
|
|
+
|
|
|
+ }
|
|
|
}
|