Browse Source

[bug] 修改对任务平台结束指令的重置问题

zishang 3 months ago
parent
commit
52ea60efcb

+ 0 - 1
src/main/java/io/renren/common/utils/DockerClientUtils.java

@@ -14,7 +14,6 @@ import io.renren.common.exception.RRException;
 import io.renren.modules.sys.entity.algs.AlgTrain;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
-import org.springframework.mock.web.MockMultipartFile;
 import org.springframework.web.multipart.MultipartFile;
 
 import java.io.*;

+ 11 - 7
src/main/java/io/renren/dds/DataReaderListenerImpl.java

@@ -27,6 +27,7 @@ public class DataReaderListenerImpl extends DDS._DataReaderListenerLocalBase {
     public int isRecall = 0;
     public int isTeam = 0;
     public int isRequest = 0;
+    public int isRunning = 0;
     public int versionID = 1;
     public String text = "";
     private static final int N_EXPECTED = 40;
@@ -105,13 +106,15 @@ public class DataReaderListenerImpl extends DDS._DataReaderListenerLocalBase {
                 if(mh.value.subject.equals("110")){
                     // 使用算法文件消息
                     isRequest = 1;
-                    String message = mh.value.text;
-                    String[] messageList = message.split("\n");
-                    for(int i=0;i<messageList.length;i++){
-                        String[] s = messageList[i].split(" ");
-                        String platformID = s[0];
-                        if (s[1].equals("无")){continue;}
-                        algMap.put(platformID,s[1]);
+                    if (isRunning == 0) {
+                        String message = mh.value.text;
+                        String[] messageList = message.split("\n");
+                        for(int i=0;i<messageList.length;i++){
+                            String[] s = messageList[i].split(" ");
+                            String platformID = s[0];
+                            if (s[1].equals("无")){continue;}
+                            algMap.put(platformID,s[1]);
+                        }
                     }
                 }
 
@@ -182,6 +185,7 @@ public class DataReaderListenerImpl extends DDS._DataReaderListenerLocalBase {
         this.isRecall = 0;
         this.isTeam = 0;
         this.isRequest = 0;
+        this.isRunning = 0;
         this.algMap.clear();
     }
 

+ 12 - 12
src/main/java/io/renren/dds/TestPublisher.java

@@ -39,18 +39,18 @@ public class TestPublisher {
     private final ExecutorService executor = Executors.newSingleThreadExecutor();
     private volatile boolean running = true;
     public TestPublisher() throws Exception {
-//        String[] s = new String[6];
-        String[] s = new String[4];
-//        s[0] = "-DCPSBit";               // 启用Bit通信(保持原有配置)
-//        s[1] = "0";                      // Bit值
-//        s[2] = "-DCPSDefaultDiscovery";  // 指定发现机制为InfoRepo
-        s[0] = "-DCPSBit";
-        s[1] = "0";
-        s[2] = "-DCPSConfigFile";
-        s[3] = "C:/software/OpenDDS-3.16/java/tests/messenger/tcp.ini";
-//        s[3] = "DEFAULT_REPO";           // 使用集中式InfoRepo
-//        s[4] = "-DCPSInfoRepo";          // InfoRepo的corbaloc地址
-//        s[5] = "corbaloc:iiop:10.195.84.22:12345/DCPSInfoRepo"; // 替换为实际IP和端口
+        String[] s = new String[6];
+//        String[] s = new String[4];
+        s[0] = "-DCPSBit";               // 启用Bit通信(保持原有配置)
+        s[1] = "0";                      // Bit值
+        s[2] = "-DCPSDefaultDiscovery";  // 指定发现机制为InfoRepo
+//        s[0] = "-DCPSBit";
+//        s[1] = "0";
+//        s[2] = "-DCPSConfigFile";
+//        s[3] = "C:/software/OpenDDS-3.16/java/tests/messenger/tcp.ini";
+        s[3] = "DEFAULT_REPO";           // 使用集中式InfoRepo
+        s[4] = "-DCPSInfoRepo";          // InfoRepo的corbaloc地址
+        s[5] = "corbaloc:iiop:192.168.5.22:12345/DCPSInfoRepo"; // 替换为实际IP和端口
         executor.submit(() -> initialize(s)); // 初始化放在后台线程
     }
     public static boolean checkReliable(String[] args) {

+ 12 - 12
src/main/java/io/renren/dds/TestSubscriber.java

@@ -42,18 +42,18 @@ public class TestSubscriber {
         return false;
     }
     public TestSubscriber() throws Exception {
-//        String[] s = new String[6];
-        String[] s = new String[4];
-//        s[0] = "-DCPSBit";               // 启用Bit通信(保持原有配置)
-//        s[1] = "0";                      // Bit值
-//        s[2] = "-DCPSDefaultDiscovery";  // 指定发现机制为InfoRepo
-        s[0] = "-DCPSBit";
-        s[1] = "0";
-        s[2] = "-DCPSConfigFile";
-        s[3] = "C:/software/OpenDDS-3.16/java/tests/messenger/tcp.ini";
-//        s[3] = "DEFAULT_REPO";           // 使用集中式InfoRepo
-//        s[4] = "-DCPSInfoRepo";          // InfoRepo的corbaloc地址
-//        s[5] = "corbaloc:iiop:10.195.84.22:12345/DCPSInfoRepo"; // 替换为实际IP和端口
+        String[] s = new String[6];
+//        String[] s = new String[4];
+        s[0] = "-DCPSBit";               // 启用Bit通信(保持原有配置)
+        s[1] = "0";                      // Bit值
+        s[2] = "-DCPSDefaultDiscovery";  // 指定发现机制为InfoRepo
+//        s[0] = "-DCPSBit";
+//        s[1] = "0";
+//        s[2] = "-DCPSConfigFile";
+//        s[3] = "C:/software/OpenDDS-3.16/java/tests/messenger/tcp.ini";
+        s[3] = "DEFAULT_REPO";           // 使用集中式InfoRepo
+        s[4] = "-DCPSInfoRepo";          // InfoRepo的corbaloc地址
+        s[5] = "corbaloc:iiop:192.168.5.22:12345/DCPSInfoRepo"; // 替换为实际IP和端口
         executor.submit(() -> initialize(s)); // 初始化放在后台线程
     }
     public String getText(){

+ 15 - 9
src/main/java/io/renren/modules/sys/controller/algs/algTrainController.java

@@ -53,6 +53,7 @@ import static io.renren.common.utils.ShiroUtils.getUserId;
  * @Date: 2021/6/11 16:28
  */
 @Slf4j
+@Transactional
 @RestController
 @RequestMapping("/algstrain")
 public class algTrainController {
@@ -348,7 +349,10 @@ public class algTrainController {
                 "这是心跳信息"
         );
 
-        if(subscriber.listener != null && subscriber.listener.isRequest == 1 && !subscriber.listener.algMap.isEmpty()) {
+        if(subscriber.listener != null
+                && subscriber.listener.isRequest == 1
+                && !subscriber.listener.algMap.isEmpty()) {
+            subscriber.listener.isRunning = 1; //设置正在运行,其余的消息丢弃
             for(Map.Entry<String, String> map : subscriber.listener.algMap.entrySet()){
                 String key = map.getKey();
                 String missName = map.getValue();
@@ -357,17 +361,19 @@ public class algTrainController {
                     continue;
                 }
                 Long algorithmTrainingId = algTrain.getAlgorithmTrainingId();
-                executorService.submit(() -> {
-                    try {
-                        changeMissStatus(String.valueOf(algorithmTrainingId));
-                        startTraining(String.valueOf(algorithmTrainingId));
-                    } catch (Exception e) {
-                        throw new RRException(e.getMessage());
-                    }
-                });
+                changeMissStatus(String.valueOf(algorithmTrainingId));
+                startTraining(String.valueOf(algorithmTrainingId));
+//                executorService.submit(() -> {
+//                    try {
+//
+//                    } catch (Exception e) {
+//                        throw new RRException(e.getMessage());
+//                    }
+//                });
 
             }
             subscriber.listener.algMap.clear();
+            subscriber.listener.isRunning = 0;
         }
     }