Browse Source

feat: 引入xxl job处理活动状态扫描

seamew 2 years ago
parent
commit
02e73ac2c2

+ 7 - 0
lottery-application/pom.xml

@@ -18,11 +18,18 @@
             <version>1.0-SNAPSHOT</version>
             <scope>compile</scope>
         </dependency>
+
         <!-- kafka依赖 -->
         <dependency>
             <groupId>org.springframework.kafka</groupId>
             <artifactId>spring-kafka</artifactId>
         </dependency>
+
+        <!-- xxl_job -->
+        <dependency>
+            <groupId>com.xuxueli</groupId>
+            <artifactId>xxl-job-core</artifactId>
+        </dependency>
     </dependencies>
 
 </project>

+ 72 - 0
lottery-application/src/main/java/com/seamew/lottery/application/worker/LotteryXxlJob.java

@@ -0,0 +1,72 @@
+package com.seamew.lottery.application.worker;
+
+import com.alibaba.fastjson2.JSON;
+import com.seamew.lottery.common.Constants;
+import com.seamew.lottery.common.Result;
+import com.seamew.lottery.domain.activity.model.vo.ActivityVO;
+import com.seamew.lottery.domain.activity.service.deploy.IActivityDeploy;
+import com.seamew.lottery.domain.activity.service.stateflow.IStateHandler;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * @Author: seamew
+ * @Title: LotteryXxlJob
+ * @CreateTime: 2023年03月01日 14:02:00
+ * @Description: 抽奖业务,任务配置
+ * @Version: 1.0
+ */
+@Component
+@Slf4j
+public class LotteryXxlJob {
+    @Resource
+    private IActivityDeploy activityDeploy;
+
+    @Resource
+    private IStateHandler stateHandler;
+
+    @XxlJob("lotteryActivityStateJobHandler")
+    public void lotteryActivityStateJobHandler() throws Exception {
+        log.info("扫描活动状态 Begin");
+
+        List<ActivityVO> activityVOList = activityDeploy.scanToDoActivityList(0L);
+        if (activityVOList.isEmpty()) {
+            log.info("扫描活动状态 End 暂无符合需要扫描的活动列表");
+            return;
+        }
+
+        while (!activityVOList.isEmpty()) {
+            for (ActivityVO activityVO : activityVOList) {
+                Integer state = activityVO.getState();
+                switch (state) {
+                    // 活动状态为审核通过,在临近活动开启时间前,审核活动为活动中。在使用活动的时候,需要依照活动状态核时间两个字段进行判断和使用。
+                    case 4:
+                        Result state4Result = stateHandler.doing(activityVO.getActivityId(), Constants.ActivityState.PASS);
+                        log.info("扫描活动状态为活动中 结果:{} activityId:{} activityName:{} creator:{}", JSON.toJSONString(state4Result), activityVO.getActivityId(), activityVO.getActivityName(), activityVO.getCreator());
+                        break;
+                    // 扫描时间已过期的活动,从活动中状态变更为关闭状态【这里也可以细化为2个任务来处理,也可以把时间判断放到数据库中操作】
+                    case 5:
+                        if (activityVO.getEndDateTime().before(new Date())) {
+                            Result state5Result = stateHandler.close(activityVO.getActivityId(), Constants.ActivityState.DOING);
+                            log.info("扫描活动状态为关闭 结果:{} activityId:{} activityName:{} creator:{}", JSON.toJSONString(state5Result), activityVO.getActivityId(), activityVO.getActivityName(), activityVO.getCreator());
+                        }
+                        break;
+                    default:
+                        break;
+                }
+            }
+
+            // 获取集合中最后一条记录,继续扫描后面10条记录
+            ActivityVO activityVO = activityVOList.get(activityVOList.size() - 1);
+            activityVOList = activityDeploy.scanToDoActivityList(activityVO.getId());
+        }
+
+        log.info("扫描活动状态 End");
+
+    }
+}

+ 60 - 0
lottery-application/src/main/java/com/seamew/lottery/application/worker/LotteryXxlJobConfig.java

@@ -0,0 +1,60 @@
+package com.seamew.lottery.application.worker;
+
+import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @Author: seamew
+ * @Title: LotteryXxlJobConfig
+ * @CreateTime: 2023年03月01日 11:29:00
+ * @Description: XXL-JOB 配置
+ * @Version: 1.0
+ */
+@Configuration
+@Slf4j
+public class LotteryXxlJobConfig {
+    @Value("${xxl.job.admin.addresses}")
+    private String adminAddresses;
+
+    @Value("${xxl.job.accessToken}")
+    private String accessToken;
+
+    @Value("${xxl.job.executor.appname}")
+    private String appname;
+
+    @Value("${xxl.job.executor.address}")
+    private String address;
+
+    @Value("${xxl.job.executor.ip}")
+    private String ip;
+
+    @Value("${xxl.job.executor.port}")
+    private int port;
+
+    @Value("${xxl.job.executor.logpath}")
+    private String logPath;
+
+    @Value("${xxl.job.executor.logretentiondays}")
+    private int logRetentionDays;
+
+    @Bean
+    public XxlJobSpringExecutor xxlJobExecutor() {
+        log.info(">>>>>>>>>>> xxl-job config init.");
+
+        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
+        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
+        xxlJobSpringExecutor.setAppname(appname);
+        xxlJobSpringExecutor.setAddress(address);
+        xxlJobSpringExecutor.setIp(ip);
+        xxlJobSpringExecutor.setPort(port);
+        xxlJobSpringExecutor.setAccessToken(accessToken);
+        xxlJobSpringExecutor.setLogPath(logPath);
+        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
+
+        return xxlJobSpringExecutor;
+    }
+
+}

+ 10 - 0
lottery-domain/src/main/java/com/seamew/lottery/domain/activity/model/vo/ActivityVO.java

@@ -17,6 +17,11 @@ import java.util.Date;
 @NoArgsConstructor
 @AllArgsConstructor
 public class ActivityVO {
+    /**
+     * 自增ID
+     */
+    private Long id;
+
     /**
      * 活动ID
      */
@@ -52,6 +57,11 @@ public class ActivityVO {
      */
     private Integer takeCount;
 
+    /**
+     * 策略ID
+     */
+    private Long strategyId;
+
     /**
      * 活动状态:编辑、提审、撤审、通过、运行、拒绝、关闭、开启
      */

+ 8 - 0
lottery-domain/src/main/java/com/seamew/lottery/domain/activity/repository/IActivityRepository.java

@@ -64,4 +64,12 @@ public interface IActivityRepository {
      * @return      扣减结果
      */
     int subtractionActivityStock(Long activityId);
+
+    /**
+     * 扫描待处理的活动列表,状态为:通过、活动中
+     *
+     * @param id ID
+     * @return 待处理的活动集合
+     */
+    List<ActivityVO> scanToDoActivityList(Long id);
 }

+ 14 - 0
lottery-domain/src/main/java/com/seamew/lottery/domain/activity/service/deploy/IActivityDeploy.java

@@ -1,6 +1,9 @@
 package com.seamew.lottery.domain.activity.service.deploy;
 
 import com.seamew.lottery.domain.activity.model.req.ActivityConfigReq;
+import com.seamew.lottery.domain.activity.model.vo.ActivityVO;
+
+import java.util.List;
 
 /**
  * @Author: seamew
@@ -23,4 +26,15 @@ public interface IActivityDeploy {
      * @param req 活动配置信息
      */
     void updateActivity(ActivityConfigReq req);
+
+    /**
+     * 扫描待处理的活动列表,状态为:通过、活动中
+     * <p>
+     * 通过 -> 时间符合时 -> 活动中
+     * 活动中 -> 时间到期时 -> 关闭
+     *
+     * @param id ID
+     * @return 待处理的活动集合
+     */
+    List<ActivityVO> scanToDoActivityList(Long id);
 }

+ 5 - 0
lottery-domain/src/main/java/com/seamew/lottery/domain/activity/service/deploy/impl/ActivityDeployImpl.java

@@ -64,4 +64,9 @@ public class ActivityDeployImpl implements IActivityDeploy {
     public void updateActivity(ActivityConfigReq req) {
         // TODO: 非核心功能后续补充
     }
+
+    @Override
+    public List<ActivityVO> scanToDoActivityList(Long id) {
+        return activityRepository.scanToDoActivityList(id);
+    }
 }

+ 10 - 0
lottery-infrastructure/src/main/java/com/seamew/lottery/infrastructure/dao/IActivityDao.java

@@ -4,6 +4,8 @@ import com.seamew.lottery.domain.activity.model.vo.AlterStateVO;
 import com.seamew.lottery.infrastructure.po.Activity;
 import org.apache.ibatis.annotations.Mapper;
 
+import java.util.List;
+
 /**
  * @Author: seamew
  * @Title: IActivityDao
@@ -45,4 +47,12 @@ public interface IActivityDao {
      */
     int subtractionActivityStock(Long activityId);
 
+    /**
+     * 扫描待处理的活动列表,状态为:通过、活动中
+     *
+     * @param id ID
+     * @return 待处理的活动集合
+     */
+    List<Activity> scanToDoActivityList(Long id);
+
 }

+ 17 - 0
lottery-infrastructure/src/main/java/com/seamew/lottery/infrastructure/repository/ActivityRepository.java

@@ -111,4 +111,21 @@ public class ActivityRepository implements IActivityRepository {
         return activityDao.subtractionActivityStock(activityId);
     }
 
+    @Override
+    public List<ActivityVO> scanToDoActivityList(Long id) {
+        List<Activity> activityList = activityDao.scanToDoActivityList(id);
+        List<ActivityVO> activityVOList = new ArrayList<>(activityList.size());
+        for (Activity activity : activityList) {
+            ActivityVO activityVO = new ActivityVO();
+            activityVO.setId(activity.getId());
+            activityVO.setActivityId(activity.getActivityId());
+            activityVO.setActivityName(activity.getActivityName());
+            activityVO.setBeginDateTime(activity.getBeginDateTime());
+            activityVO.setEndDateTime(activity.getEndDateTime());
+            activityVO.setState(activity.getState());
+            activityVOList.add(activityVO);
+        }
+        return activityVOList;
+    }
+
 }

+ 18 - 0
lottery-interfaces/src/main/resources/application.yaml

@@ -41,6 +41,24 @@ spring:
       ack-mode: manual_immediate
       missing-topics-fatal: false
 
+# xxl-job
+# 官网:https://github.com/xuxueli/xxl-job/
+# 地址:http://localhost:7397/xxl-job-admin 【需要先启动 xxl-job】
+# 账号:admin
+# 密码:123456
+xxl:
+  job:
+    admin:
+      addresses: http://127.0.0.1:7397/xxl-job-admin
+    executor:
+      address:
+      appname: lottery-job
+      ip:
+      port: 9998
+      logpath: ./xxl-job/jobhandler
+      logretentiondays: 50
+    accessToken: default_token
+
 mini-db-router:
   jdbc:
     datasource:

+ 10 - 3
lottery-interfaces/src/main/resources/mybatis/mapper/Activity_Mapper.xml

@@ -6,9 +6,8 @@
         INSERT INTO activity
         (activity_id, activity_name, activity_desc, begin_date_time, end_date_time,
          stock_count, stock_surplus_count, take_count, strategy_id, state, creator, create_time, update_time)
-        VALUES
-            (#{activityId}, #{activityName}, #{activityDesc},#{beginDateTime}, #{endDateTime},
-             #{stockCount}, #{stockSurplusCount}, #{takeCount}, #{strategyId}, #{state}, #{creator}, now(), now())
+        VALUES (#{activityId}, #{activityName}, #{activityDesc}, #{beginDateTime}, #{endDateTime},
+                #{stockCount}, #{stockSurplusCount}, #{takeCount}, #{strategyId}, #{state}, #{creator}, now(), now())
     </insert>
 
     <select id="queryActivityById" parameterType="java.lang.Long" resultType="com.seamew.lottery.infrastructure.po.Activity">
@@ -41,5 +40,13 @@
         WHERE activity_id = #{activityId} AND stock_surplus_count > 0
     </update>
 
+    <select id="scanToDoActivityList" parameterType="java.lang.Long" resultType="com.seamew.lottery.infrastructure.po.Activity">
+        SELECT activity_id, activity_name, begin_date_time, end_date_time, state, creator
+        FROM activity
+        WHERE id >= #{id}
+          AND state in (4, 5)
+        ORDER BY ID ASC LIMIT 10
+    </select>
+
 
 </mapper>

+ 5 - 0
pom.xml

@@ -92,6 +92,11 @@
                 <artifactId>mapstruct-processor</artifactId>
                 <version>1.4.2.Final</version>
             </dependency>
+            <dependency>
+                <groupId>com.xuxueli</groupId>
+                <artifactId>xxl-job-core</artifactId>
+                <version>2.3.1</version>
+            </dependency>
 
             <!-- 分库分表插件 -->
             <dependency>

+ 3 - 0
xxl-job/jobhandler/2023-03-01/5.log

@@ -0,0 +1,3 @@
+2023-03-01 14:30:51 [com.xxl.job.core.thread.JobThread#run]-[133]-[xxl-job, JobThread-2-1677652251468] <br>----------- xxl-job job execute start -----------<br>----------- Param:
+2023-03-01 14:32:58 [com.xxl.job.core.thread.JobThread#run]-[179]-[xxl-job, JobThread-2-1677652251468] <br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode=200, handleMsg = null
+2023-03-01 14:32:58 [com.xxl.job.core.thread.TriggerCallbackThread#callbackLog]-[197]-[xxl-job, executor TriggerCallbackThread] <br>----------- xxl-job job callback finish.

+ 10 - 0
xxl-job/jobhandler/2023-03-01/6.log

@@ -0,0 +1,10 @@
+2023-03-01 14:33:45 [com.xxl.job.core.thread.JobThread#run]-[133]-[xxl-job, JobThread-2-1677652251468] <br>----------- xxl-job job execute start -----------<br>----------- Param:
+2023-03-01 14:34:18 [com.xxl.job.core.thread.JobThread#run]-[179]-[xxl-job, JobThread-2-1677652251468] <br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode=200, handleMsg = null
+2023-03-01 14:34:18 [com.xxl.job.core.thread.TriggerCallbackThread#callbackLog]-[197]-[xxl-job, executor TriggerCallbackThread] <br>----------- xxl-job job callback finish.
+2023-03-01 14:35:05 [com.xxl.job.core.thread.JobThread#run]-[194]-[xxl-job, JobThread-2-1677652251468] <br>----------- JobThread toStop, stopReason:web container destroy and kill the job.
+2023-03-01 14:35:05 [com.xxl.job.core.thread.JobThread#run]-[204]-[xxl-job, JobThread-2-1677652251468] <br>----------- JobThread Exception:java.lang.InterruptedException
+	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
+	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
+	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
+	at com.xxl.job.core.thread.JobThread.run(JobThread.java:114)
+<br>----------- xxl-job job execute end(error) -----------

+ 10 - 0
xxl-job/jobhandler/2023-03-01/7.log

@@ -0,0 +1,10 @@
+2023-03-01 14:35:38 [com.xxl.job.core.thread.JobThread#run]-[133]-[xxl-job, JobThread-2-1677652538490] <br>----------- xxl-job job execute start -----------<br>----------- Param:
+2023-03-01 14:35:39 [com.xxl.job.core.thread.JobThread#run]-[179]-[xxl-job, JobThread-2-1677652538490] <br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode=200, handleMsg = null
+2023-03-01 14:35:39 [com.xxl.job.core.thread.TriggerCallbackThread#callbackLog]-[197]-[xxl-job, executor TriggerCallbackThread] <br>----------- xxl-job job callback finish.
+2023-03-01 14:36:27 [com.xxl.job.core.thread.JobThread#run]-[194]-[xxl-job, JobThread-2-1677652538490] <br>----------- JobThread toStop, stopReason:web container destroy and kill the job.
+2023-03-01 14:36:27 [com.xxl.job.core.thread.JobThread#run]-[204]-[xxl-job, JobThread-2-1677652538490] <br>----------- JobThread Exception:java.lang.InterruptedException
+	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
+	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
+	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
+	at com.xxl.job.core.thread.JobThread.run(JobThread.java:114)
+<br>----------- xxl-job job execute end(error) -----------