Browse Source

[bug] 修改dds消息发布订阅

zishang 3 months ago
parent
commit
ebc810f67e

+ 89 - 10
src/main/java/io/renren/common/utils/DockerClientUtils.java

@@ -1,5 +1,7 @@
 package io.renren.common.utils;
 
+import cn.hutool.core.date.DateUtil;
+import com.jcraft.jsch.ChannelSftp;
 import com.spotify.docker.client.DefaultDockerClient;
 import com.spotify.docker.client.DockerCertificates;
 import com.spotify.docker.client.DockerClient;
@@ -7,18 +9,34 @@ import com.spotify.docker.client.LogStream;
 import com.spotify.docker.client.exceptions.DockerCertificateException;
 import com.spotify.docker.client.exceptions.DockerException;
 import com.spotify.docker.client.messages.*;
+import io.minio.errors.*;
+import io.renren.common.exception.RRException;
+import io.renren.modules.sys.entity.algs.AlgTrain;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.springframework.mock.web.MockMultipartFile;
+import org.springframework.web.multipart.MultipartFile;
 
+import java.io.*;
 import java.net.URI;
 import java.nio.file.Paths;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
 import java.text.SimpleDateFormat;
 import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
+@Slf4j
 public class DockerClientUtils {
     private static DockerClient docker;
     private static String dockerUrl;
     private static String docker_ca;
     private static String basePath;
+    static ExecutorService executorService = Executors.newSingleThreadExecutor(); // 创建单线程执行器
+
 
     public DockerClientUtils(String dockerUrl, String docker_ca,String basePath) {
         DockerClientUtils.dockerUrl=dockerUrl;
@@ -41,6 +59,10 @@ public class DockerClientUtils {
         }
     }
 
+    public static void stopContainer(String containerId) throws DockerException, InterruptedException {
+        docker.stopContainer(containerId, 0);
+    }
+
     /**
      * Description:获取所有镜像列表
      * @return
@@ -116,6 +138,11 @@ public class DockerClientUtils {
         return container.id();
     }
 
+    public static void startContainer(String containerId) throws DockerException, InterruptedException {
+
+        docker.startContainer(containerId);
+    }
+
     /**
      * Description 在docker容器中安装whl格式的依赖包
      * @param containerId
@@ -142,7 +169,7 @@ public class DockerClientUtils {
      * @throws DockerException
      * @throws InterruptedException
      */
-    public static String execPython(String containerId,String filePath,String fileName) throws DockerException, InterruptedException {
+    public static String execPython(String containerId,String filePath,String fileName, String algorithmTrainingId) throws DockerException, InterruptedException, IOException, InvalidBucketNameException, InsufficientDataException, ErrorResponseException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException, RegionConflictException {
 
         // 获取当前时间戳
         long timestamp = System.currentTimeMillis();
@@ -161,19 +188,71 @@ public class DockerClientUtils {
                 DockerClient.ExecCreateParam.attachStdin());
 
         //读取所执行命令的输出
-        LogStream output=docker.execStart(execCreation.id());
-
+        LogStream output = docker.execStart(execCreation.id());
 
         //执行tensorboard命令
-        String execTensorboard=docker.execCreate(containerId,new String[]{"tensorboard","--logdir=/opt/testTensorboard"}).id();
-        try (LogStream stream2=docker.execStart(execTensorboard)){
-            stream2.readFully();
+//        String execTensorboard=docker.execCreate(containerId,new String[]{"tensorboard","--logdir=/opt/testTensorboard"}).id();
+//        try (LogStream stream2=docker.execStart(execTensorboard)){
+//            stream2.readFully();
+//        }
+
+        // 处理日志文件
+        // 检查 MinIO 中的桶是否存在,如果不存在则创建
+        if (!MinIoUtils.isBucketExists("algorithm-train-task")) {
+            MinIoUtils.createBucket("algorithm-train-task");
         }
+        final String[] minioPath = {""};
 
-        //打印输出
-        String execPythonOutput=output.readFully();
-        System.out.println(execPythonOutput);
-        return returnFileName;
+        Future<String> future = executorService.submit(() -> {
+            try {
+                // 复制文件到容器
+                DockerClientUtils.copyFile(containerId, "/" + returnFileName, "/opt/algTrain" + algorithmTrainingId);
+
+                // 显示文件列表
+                Vector<ChannelSftp.LsEntry> vector = FTPUtils.showFiles("algTrain" + algorithmTrainingId);
+                for (ChannelSftp.LsEntry entry : vector) {
+                    System.out.println(entry);
+                }
+
+                // 获取最后一个文件
+                ChannelSftp.LsEntry entry = vector.lastElement();
+                String logFile = entry.getFilename();
+                Long fileSize = entry.getAttrs().getSize();
+
+                // 设置 MinIO 路径
+                minioPath[0] = "algorithm" + algorithmTrainingId + "/" + logFile;
+
+                // 下载文件
+                InputStream inputStream = FTPUtils.downloadFile("/opt/uploadFile/algTrain" + algorithmTrainingId + "/" + logFile);
+
+                // 上传文件到 MinIO
+                MinIoUtils.uploadFileByInputStream(inputStream, fileSize, "algorithm-train-task", minioPath[0]);
+
+                // 返回 MinIO 路径
+                return minioPath[0];
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        // 获取结果
+        try {
+            String resultPath = future.get(); // 阻塞直到任务完成
+            System.out.println("Uploaded file path: " + resultPath);
+        } catch (InterruptedException | ExecutionException e) {
+            e.printStackTrace();
+        }
+
+        return minioPath[0];
+    }
+
+    public static boolean isContainerRunning(String containerId) throws InterruptedException {
+        try {
+            ContainerInfo container = docker.inspectContainer(containerId);
+            return container.state().running();
+        } catch (DockerException e) {
+            return false; // 如果发生异常,认为容器没有在运行
+        }
     }
 
     public static String getOriginFileName(String fileName) {

+ 3 - 1
src/main/java/io/renren/common/utils/FTPUtils.java

@@ -71,6 +71,8 @@ public class FTPUtils {
 
             sftp = (ChannelSftp) channel;
             ftp.enterLocalPassiveMode();
+            ftp.setConnectTimeout(30000); // 30秒连接超时
+            ftp.setDataTimeout(30000); // 30秒数据超时
         } catch (JSchException e) {
             e.printStackTrace();
         }
@@ -145,7 +147,7 @@ public class FTPUtils {
             return inputStream;
         } catch (Exception e) {
 //            e.printStackTrace();
-            throw new RRException("文件不存在");
+            throw new RRException("结果不存在");
         }
 //        return null;
     }

+ 0 - 146
src/main/java/io/renren/dds/DataReaderListenerImpl.java

@@ -9,22 +9,9 @@ import DDS.*;
 import Messenger.*;
 import OpenDDS.DCPS.*;
 import OpenDDS.DCPS.transport.*;
-import com.spotify.docker.client.exceptions.DockerException;
-import io.minio.errors.*;
-import io.renren.modules.sys.controller.algs.algTrainController;
-import io.renren.modules.sys.entity.algs.AlgTrain;
-import io.renren.modules.sys.entity.algs.Algorithm;
-import io.renren.modules.sys.service.AlgTrainService;
-import io.renren.modules.sys.service.AlgsService;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
-//import com.example.backend.model.MessageStatus;
-//import com.example.backend.model.Position;
 
-import javax.annotation.Resource;
-import java.io.IOException;
 import java.security.InvalidKeyException;
-import java.security.NoSuchAlgorithmException;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
@@ -44,17 +31,8 @@ public class DataReaderListenerImpl extends DDS._DataReaderListenerLocalBase {
     public String text = "";
     private static final int N_EXPECTED = 40;
     private ArrayList<Boolean> counts = new ArrayList<Boolean>(N_EXPECTED);
-//    public Map<String, Position> positionMap = new ConcurrentHashMap<>();
-//    public Map<String, Position> radarPositionMap = new ConcurrentHashMap<>();
-//    public Map<String, List<String>> coopTeamMap = new ConcurrentHashMap<>();
     public final Map<String, List<io.renren.modules.sys.entity.Message>> messageList = new ConcurrentHashMap<>();
     public Map<String, String> algMap = new ConcurrentHashMap<>();
-//    public Map<String, MessageStatus> messageStatusMap = new ConcurrentHashMap<>();
-
-    @Resource
-    AlgTrainService algTrainService;
-    @Autowired
-    private io.renren.modules.sys.controller.algs.algTrainController algTrainController;
 
     private void initialize_counts() {
         if (counts.size() > 0) {
@@ -126,33 +104,6 @@ public class DataReaderListenerImpl extends DDS._DataReaderListenerLocalBase {
                     }
                     //收到初始化消息 完成初始化
                     isInit = 1;
-                    String message = mh.value.text;
-                    String[] messageList = message.split("\n");
-                    String[] countList = messageList[0].split(" ");
-                    int count = Integer.parseInt(countList[0]);
-                    int radarCount = Integer.parseInt(countList[1]);
-                    for(int i=1;i<=count;i++){
-                        String[] s = messageList[i].split(" ");
-//                        Position p = new Position();
-//                        p.coopID = s[1];
-//                        p.jaming = s[2];
-//                        p.isReal = s[3];
-//                        p.x = Double.parseDouble(s[4]);
-//                        p.y = Double.parseDouble(s[5]);
-//                        p.z = Double.parseDouble(s[6]);
-//                        positionMap.put(s[0],p);
-                    }
-                    for(int i=count+1;i<=radarCount+count;i++){
-                        String[] s = messageList[i].split(" ");
-//                        Position p = new Position();
-//                        p.coopID = "未知";
-//                        p.jaming = "未知";
-//                        p.isReal = "未知";
-//                        p.x = Double.parseDouble(s[4]);
-//                        p.y = Double.parseDouble(s[5]);
-//                        p.z = Double.parseDouble(s[6]);
-//                        radarPositionMap.put(s[0],p);
-                    }
                     //回复初始化应答
                     isRecall = 1;
                 }
@@ -162,119 +113,26 @@ public class DataReaderListenerImpl extends DDS._DataReaderListenerLocalBase {
                         System.out.println("未完成初始化,丢弃");
                         return;
                     }
-                    String message = mh.value.text;
-                    String[] messageList = message.split("\n");
-                    for(int i=1;i<messageList.length;i++){
-                        String[] s = messageList[i].split(" ");
-//                        Position p = new Position();
-//                        p.coopID = positionMap.get(s[0]).coopID;
-//                        p.jaming = positionMap.get(s[0]).jaming;
-//                        p.isReal = positionMap.get(s[0]).isReal;
-//                        p.x = Double.parseDouble(s[1]);
-//                        p.y = Double.parseDouble(s[2]);
-//                        p.z = Double.parseDouble(s[3]);
-//                        positionMap.put(s[0],p);
-                    }
                 }
                 if(mh.value.subject.equals("2")){
                     if(mh.value.text.equals("4")){
                         resetAll();
                     }
                 }
-                if(mh.value.subject.equals("5")){
-                    //收到雷达脉冲消息
-                    String[] s = mh.value.from.split(" ");
-                    String recivier = s[2];
-                    String sender = s[0];
-//                    List<com.example.backend.model.Message> list = messageList.get(recivier)==null ? new ArrayList<>() : messageList.get(recivier);
-//                    list.add(new com.example.backend.model.Message(formattedDateTime,sender,"雷达脉冲",mh.value.text,"0"));
-//                    messageList.put(recivier,list);
-
-                }
-                if(mh.value.subject.equals("6")){
-                    //收到电子干扰消息
-                    String[] s = mh.value.from.split(" ");
-                    String recivier = s[2];
-                    String sender = s[0];
-//                    List<com.example.backend.model.Message> list = messageList.get(recivier)==null ? new ArrayList<>() : messageList.get(recivier);
-//                    list.add(new com.example.backend.model.Message(formattedDateTime,sender,"电子干扰",mh.value.text,"0"));
-//                    messageList.put(recivier,list);
-
-                }
-                if(mh.value.subject.equals("7")){
-                    //收到综合情报消息
-                    String[] s = mh.value.from.split(" ");
-                    String recivier = s[2];
-                    String sender = s[0];
-//                    List<com.example.backend.model.Message> list = messageList.get(recivier)==null ? new ArrayList<>() : messageList.get(recivier);
-//                    list.add(new com.example.backend.model.Message(formattedDateTime,sender,"综合情报",mh.value.text,"0"));
-//                    messageList.put(recivier,list);
-
-                }
                 if(mh.value.subject.equals("4")){
                     //收到网络分组信息
-                    String message = mh.value.text;
-                    String[] messageList = message.split("\n");
-                    String[] countList = messageList[0].split(" ");
-                    if(Integer.parseInt(countList[2])<=versionID)return;//旧版本或重复
-                    for(int i=1;i<=Integer.parseInt(countList[0]);i++){
-                        String[] s = messageList[i].split(" ");
-                        String coopID = s[0];
-                        List<String> list = new ArrayList<>();
-                        for(int j=2;j<s.length;j++){
-                            list.add(s[j]);
-                        }
-//                        coopTeamMap.put(coopID,list);
-                    }
                     isTeam = 1;
                 }
-                if(mh.value.subject.equals("9")){
-                    //收到分组状态参数控制
-                    String message = mh.value.text;
-                    String[] s = message.split(" ");
-//                    MessageStatus m = new MessageStatus();
-//                    m.startUp = s[1];
-//                    m.quite = s[2];
-//                    m.mode = s[3];
-//                    messageStatusMap.put(s[0],m);
-                }
-                if(mh.value.subject.equals("10")){
-                    //收到分组状态参数初始化
-                    String message = mh.value.text;
-                    String[] messageList = message.split("\n");
-                    String[] countList = messageList[0].split(" ");
-                    for(int i=1;i<=Integer.parseInt(countList[0]);i++){
-                        String[] s = messageList[i].split(" ");
-                        String platformID = s[0];
-//                        MessageStatus m = new MessageStatus();
-//                        m.startUp = s[1];
-//                        m.quite = s[2];
-//                        m.mode = s[3];
-//                        messageStatusMap.put(platformID,m);
-                    }
-                }
                 if(mh.value.subject.equals("110")){
                     // 使用算法文件消息
                     isRequest = 1;
                     String message = mh.value.text;
                     String[] messageList = message.split("\n");
-                    String[] countList = messageList[0].split(" ");
                     for(int i=0;i<messageList.length;i++){
                         String[] s = messageList[i].split(" ");
                         String platformID = s[0];
                         if (s[1].equals("无")){continue;}
-//                        AlgTrain algTrain1 = new AlgTrain();
-//                        algTrain1.setMissName(s[1]);
-//                        AlgTrain algTrain = algTrainService.selectByMissName(s[1]);
                         algMap.put(platformID,s[1]);
-//                        if (algTrain != null){
-//                            try {
-//                                algTrainController.startTraining(String.valueOf(algTrain.getAlgorithmTrainingId()));
-//                            } catch (Exception e) {
-//                                throw new RuntimeException(e);
-//                            }
-//
-//                        }
                     }
                 }
 
@@ -340,11 +198,7 @@ public class DataReaderListenerImpl extends DDS._DataReaderListenerLocalBase {
     }
 
     public void resetAll(){
-//        this.positionMap.clear();
-//        this.radarPositionMap.clear();
-//        this.coopTeamMap.clear();
         this.messageList.clear();
-//        this.messageStatusMap.clear();
         this.isInit = 0;
         this.isRecall = 0;
         this.isTeam = 0;

+ 12 - 7
src/main/java/io/renren/dds/TestPublisher.java

@@ -39,13 +39,18 @@ public class TestPublisher {
     private final ExecutorService executor = Executors.newSingleThreadExecutor();
     private volatile boolean running = true;
     public TestPublisher() throws Exception {
-        String[] s = new String[6];
-        s[0] = "-DCPSBit";               // 启用Bit通信(保持原有配置)
-        s[1] = "0";                      // Bit值
-        s[2] = "-DCPSDefaultDiscovery";  // 指定发现机制为InfoRepo
-        s[3] = "DEFAULT_REPO";           // 使用集中式InfoRepo
-        s[4] = "-DCPSInfoRepo";          // InfoRepo的corbaloc地址
-        s[5] = "corbaloc:iiop:10.195.84.22:12345/DCPSInfoRepo"; // 替换为实际IP和端口
+//        String[] s = new String[6];
+        String[] s = new String[4];
+//        s[0] = "-DCPSBit";               // 启用Bit通信(保持原有配置)
+//        s[1] = "0";                      // Bit值
+//        s[2] = "-DCPSDefaultDiscovery";  // 指定发现机制为InfoRepo
+        s[0] = "-DCPSBit";
+        s[1] = "0";
+        s[2] = "-DCPSConfigFile";
+        s[3] = "C:/software/OpenDDS-3.16/java/tests/messenger/tcp.ini";
+//        s[3] = "DEFAULT_REPO";           // 使用集中式InfoRepo
+//        s[4] = "-DCPSInfoRepo";          // InfoRepo的corbaloc地址
+//        s[5] = "corbaloc:iiop:10.195.84.22:12345/DCPSInfoRepo"; // 替换为实际IP和端口
         executor.submit(() -> initialize(s)); // 初始化放在后台线程
     }
     public static boolean checkReliable(String[] args) {

+ 12 - 7
src/main/java/io/renren/dds/TestSubscriber.java

@@ -42,13 +42,18 @@ public class TestSubscriber {
         return false;
     }
     public TestSubscriber() throws Exception {
-        String[] s = new String[6];
-        s[0] = "-DCPSBit";               // 启用Bit通信(保持原有配置)
-        s[1] = "0";                      // Bit值
-        s[2] = "-DCPSDefaultDiscovery";  // 指定发现机制为InfoRepo
-        s[3] = "DEFAULT_REPO";           // 使用集中式InfoRepo
-        s[4] = "-DCPSInfoRepo";          // InfoRepo的corbaloc地址
-        s[5] = "corbaloc:iiop:10.195.84.22:12345/DCPSInfoRepo"; // 替换为实际IP和端口
+//        String[] s = new String[6];
+        String[] s = new String[4];
+//        s[0] = "-DCPSBit";               // 启用Bit通信(保持原有配置)
+//        s[1] = "0";                      // Bit值
+//        s[2] = "-DCPSDefaultDiscovery";  // 指定发现机制为InfoRepo
+        s[0] = "-DCPSBit";
+        s[1] = "0";
+        s[2] = "-DCPSConfigFile";
+        s[3] = "C:/software/OpenDDS-3.16/java/tests/messenger/tcp.ini";
+//        s[3] = "DEFAULT_REPO";           // 使用集中式InfoRepo
+//        s[4] = "-DCPSInfoRepo";          // InfoRepo的corbaloc地址
+//        s[5] = "corbaloc:iiop:10.195.84.22:12345/DCPSInfoRepo"; // 替换为实际IP和端口
         executor.submit(() -> initialize(s)); // 初始化放在后台线程
     }
     public String getText(){

+ 116 - 44
src/main/java/io/renren/modules/sys/controller/algs/algTrainController.java

@@ -7,6 +7,7 @@ import com.spotify.docker.client.exceptions.DockerException;
 import com.spotify.docker.client.messages.Image;
 import io.minio.errors.*;
 import io.renren.common.annotation.SysLog;
+import io.renren.common.exception.RRException;
 import io.renren.common.utils.*;
 import io.renren.common.validator.ValidatorUtils;
 import io.renren.common.validator.group.AddGroup;
@@ -21,6 +22,7 @@ import io.renren.modules.sys.service.impl.AlgTrainServiceImpl;
 import io.renren.modules.sys.service.impl.AlgsModelsServiceImpl;
 import io.renren.modules.sys.service.impl.AlgsServiceImpl;
 import io.swagger.models.auth.In;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.shiro.authz.annotation.RequiresPermissions;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -39,6 +41,9 @@ import java.security.NoSuchAlgorithmException;
 import java.text.SimpleDateFormat;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
 import static io.renren.common.utils.ShiroUtils.getUserId;
@@ -47,6 +52,7 @@ import static io.renren.common.utils.ShiroUtils.getUserId;
  * @Author: Ivan Q
  * @Date: 2021/6/11 16:28
  */
+@Slf4j
 @RestController
 @RequestMapping("/algstrain")
 public class algTrainController {
@@ -77,6 +83,8 @@ public class algTrainController {
 
     public final TestSubscriber subscriber = new TestSubscriber();
     public final TestPublisher publisher = new TestPublisher();
+    ExecutorService executorService = Executors.newSingleThreadExecutor(); // 创建单线程执行器
+
 
     private static String returnFileName;
 
@@ -192,7 +200,8 @@ public class algTrainController {
         //以下部分是创建容器部分
 
         //首先获取内存大小限制,并转换为以字节为单位
-        Long memoryMB=Long.parseLong(map.get("memory"));
+//        Long memoryMB=Long.parseLong(map.get("memory"));
+        Long memoryMB = 512L;
         Long memoryByte=memoryMB*1024*1024;
 
         //选择一个未使用的端口进行映射,并标记已使用
@@ -208,7 +217,7 @@ public class algTrainController {
         }*/
         //获取该算法在被创建时所选的算法框架
         String baseImageName=baseImageService.getById(alg.getFrameId()).getBaseImageName();
-        String containerId=DockerClientUtils.createContainer("algTrain"+algTrain.getAlgorithmTrainingId(),String.valueOf(portInt),memoryByte,baseImageName,map.get("setCpus"));
+        String containerId=DockerClientUtils.createContainer("algTrain"+algTrain.getAlgorithmTrainingId(),String.valueOf(portInt),memoryByte,baseImageName,"1");
 
         //保存该训练任务所在容器id
         algTrain.setContainerId(containerId);
@@ -246,29 +255,39 @@ public class algTrainController {
      * @return
      */
     @GetMapping("/startTraining")
-    public R startTraining(String algorithmTrainingId) throws DockerException, InterruptedException, IOException, InvalidResponseException, InvalidKeyException, NoSuchAlgorithmException, ErrorResponseException, XmlParserException, InvalidBucketNameException, InsufficientDataException, InternalException {
+    public  R startTraining(String algorithmTrainingId) throws DockerException, InterruptedException, RegionConflictException, InvalidBucketNameException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException {
         AlgTrain algTrain=algTrainService.selectByPrimaryKey(Long.parseLong(algorithmTrainingId));
-        returnFileName = DockerClientUtils.execPython(algTrain.getContainerId(),"algTrain"+algTrain.getAlgorithmTrainingId(),algTrain.getRunfileName());
-
-        //完成训练后,将任务状态改为已结束,并保存结束时间
-        algTrain.setMissStatus((byte) 3);
-        //将对应版本训练状态改为已训练
-        algTrain.setHasRun(1);
+        try {
+//            if(!DockerClientUtils.isContainerRunning(algTrain.getContainerId())){
+//                DockerClientUtils.startContainer(algTrain.getContainerId());
+//                Thread.sleep(7000);
+//            }
+            String minioPath = DockerClientUtils.execPython(algTrain.getContainerId(), "algTrain" + algTrain.getAlgorithmTrainingId(), algTrain.getRunfileName(), algorithmTrainingId);
+
+            //完成训练后,将任务状态改为已结束,并保存结束时间
+            algTrain.setMissStatus((byte) 3);
+            //将对应版本训练状态改为已训练
+            algTrain.setHasRun(1);
         /*Version version=versionService.getById(algTrain.getVersionId());
         version.setVersionStatus((byte) 1);
         versionService.update(version);*/
 
-        Date date = new Date();
-        //SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        algTrain.setMissStopTime(date);
-        algTrainService.update(algTrain);
-
-        //保存日志到数据库
-        String algorithmTrainingLogContent= (String) getOutput(algorithmTrainingId).get("output");
-        AlgTrainLog algTrainLog=new AlgTrainLog();
-        algTrainLog.setAlgorithmTrainingId(Long.parseLong(algorithmTrainingId));
-        algTrainLog.setAlgorithmTrainingLogContent(algorithmTrainingLogContent);
-        algTrainLogService.save(algTrainLog);
+            Date date = new Date();
+            //SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+            algTrain.setMissStopTime(date);
+            algTrainService.update(algTrain);
+
+            //保存日志到数据库
+            String algorithmTrainingLogContent = (String) getOutput(algorithmTrainingId, minioPath).get("output");
+            AlgTrainLog algTrainLog = new AlgTrainLog();
+            algTrainLog.setAlgorithmTrainingId(Long.parseLong(algorithmTrainingId));
+            algTrainLog.setAlgorithmTrainingLogContent(algorithmTrainingLogContent);
+            algTrainLog.setAlgorithmTrainingLogMinioPath(minioPath);
+            algTrainLogService.save(algTrainLog);
+            //运行结束之后停掉docker
+        } finally {
+//            DockerClientUtils.stopContainer(algTrain.getContainerId());
+        }
 
         // 产生数据
 //        publisher.publishMessage(
@@ -296,41 +315,74 @@ public class algTrainController {
 
     @Async
     @Scheduled(fixedRate = 6000)//每6秒执行一次,获取消息
-    public void checkAlgRequest() throws DockerException, InvalidBucketNameException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InterruptedException, InvalidResponseException, XmlParserException, InternalException {
+    public void checkAlgRequest() throws DockerException, InvalidBucketNameException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InterruptedException, InvalidResponseException, XmlParserException, InternalException, RegionConflictException {
         if (subscriber.listener != null && subscriber.listener.isRequest == 0) {
             publisher.publishMessage(
                     "4",
                     "111",
                     "算法文件使用请求"
             );
+
         }
+        // 创建 CountDownLatch,初始化为1
+        CountDownLatch latch = new CountDownLatch(1);
+        // 启动一个线程检查 publisher 初始化状态
+        new Thread(() -> {
+            while (!publisher.isInitialized) {
+                try {
+                    // 等待一段时间后再次检查
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt(); // 恢复中断状态
+                }
+            }
+            latch.countDown(); // 初始化成功,释放锁
+        }).start();
+        // 等待 publisher 初始化完成
+        System.out.println("Waiting for publisher to initialize...");
+        latch.await();
         publisher.publishMessage(
                 "4",
                 "8",
                 "这是心跳信息"
         );
 
-        if(subscriber.listener != null && subscriber.listener.isRequest == 1) {
+        if(subscriber.listener != null && subscriber.listener.isRequest == 1 && !subscriber.listener.algMap.isEmpty()) {
             for(Map.Entry<String, String> map : subscriber.listener.algMap.entrySet()){
                 String key = map.getKey();
                 String missName = map.getValue();
                 AlgTrain algTrain = algTrainService.selectByMissName(missName);
+                if (algTrain == null) {
+                    continue;
+                }
                 Long algorithmTrainingId = algTrain.getAlgorithmTrainingId();
-                startTraining(String.valueOf(algorithmTrainingId));
+                executorService.submit(() -> {
+                    try {
+                        startTraining(String.valueOf(algorithmTrainingId));
+                    } catch (Exception e) {
+                        throw new RRException(e.getMessage());
+                    }
+                });
+
             }
+            subscriber.listener.algMap.clear();
         }
     }
 
-    @PostConstruct
-    public void init() throws DockerException, InvalidBucketNameException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InterruptedException, InvalidResponseException, XmlParserException, InternalException {
-        // 初始化时运行
-        if (subscriber.listener.isRequest == 1) {
-            algRun(subscriber.listener.algMap);
-        }
-        subscriber.listener.isRequest = 0;
-    }
+//    @PostConstruct
+//    public void init() throws DockerException, InvalidBucketNameException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InterruptedException, InvalidResponseException, XmlParserException, InternalException, RegionConflictException {
+//        // 初始化时运行
+//        int flag = 0; //标志位,判断是否初始化成功
+//        if (subscriber.listener != null && subscriber.listener.isRequest == 1) {
+////            algRun(subscriber.listener.algMap);
+//            flag = 1;
+//        }
+//        if (flag == 1) {
+//            subscriber.listener.isRequest = 0;
+//        }
+//    }
 
-    private void algRun(Map<String, String> algMap) throws DockerException, InvalidBucketNameException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InterruptedException, InvalidResponseException, XmlParserException, InternalException {
+    private void algRun(Map<String, String> algMap) throws DockerException, InvalidBucketNameException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InterruptedException, InvalidResponseException, XmlParserException, InternalException, RegionConflictException {
         for(Map.Entry<String, String> map : algMap.entrySet()){
             String key = map.getKey();
             String missName = map.getValue();
@@ -414,19 +466,39 @@ public class algTrainController {
      * @return
      */
     @GetMapping("/getOutput")
-    public R getOutput(String algorithmTrainingId) throws DockerException, InterruptedException, IOException, InvalidResponseException, InvalidKeyException, NoSuchAlgorithmException, ErrorResponseException, XmlParserException, InvalidBucketNameException, InsufficientDataException, InternalException {
-        AlgTrain algTrain = algTrainService.selectByPrimaryKey(Long.parseLong(algorithmTrainingId));
-        DockerClientUtils.copyFile(algTrain.getContainerId(),"/" + returnFileName,"/opt/algTrain" + algorithmTrainingId);
-        InputStream inputStream = FTPUtils.downloadFile("/opt/uploadFile/algTrain" + algorithmTrainingId + "/" + returnFileName);
-        if (inputStream == null) {
-            return R.error("文件不存在");
+    public R getOutput(String algorithmTrainingId, String minioPath) throws DockerException, InterruptedException, IOException, InvalidResponseException, InvalidKeyException, NoSuchAlgorithmException, ErrorResponseException, XmlParserException, InvalidBucketNameException, InsufficientDataException, InternalException {
+//        AlgTrain algTrain = algTrainService.selectByPrimaryKey(Long.parseLong(algorithmTrainingId));
+//        DockerClientUtils.copyFile(algTrain.getContainerId(),"/" + returnFileName,"/opt/algTrain" + algorithmTrainingId);
+//        InputStream inputStream = FTPUtils.downloadFile("/opt/uploadFile/algTrain" + algorithmTrainingId + "/" + returnFileName);
+//        if (inputStream == null) {
+//            return R.error("结果不存在");
+//        }
+//        String result = new BufferedReader(new InputStreamReader(inputStream))
+//                .lines().collect(Collectors.joining("\n"));
+//
+//        inputStream.close();
+////        System.out.println(result);
+//
+//        return R.ok().put("output",result);
+        String objectName = "";
+        if (minioPath == null) {
+            AlgTrainLog algTrainLog = algTrainLogService.selectByAlgTrainId(Long.parseLong(algorithmTrainingId));
+            objectName = algTrainLog.getAlgorithmTrainingLogMinioPath();
+        } else {
+            objectName = minioPath;
+        }
+        log.info("获取文件从minio的objectName {}", objectName);
+        try (InputStream fileInputStream = MinIoUtils.getFileInputStream("algorithm-train-task", objectName)) {
+            // 继续处理文件流
+            String result = new BufferedReader(new InputStreamReader(fileInputStream))
+                    .lines().collect(Collectors.joining("\n"));
+            log.info("result {}", result);
+            return R.ok().put("output", result);
+        } catch (ErrorResponseException e){
+            if (e.getMessage().equals("The specified key does not exist."))
+                return R.error("文件不存在,请确认文件位置或者重新上传");
+            else return R.error(e.getMessage());
         }
-        String result = new BufferedReader(new InputStreamReader(inputStream))
-                .lines().collect(Collectors.joining("\n"));
-        inputStream.close();
-        System.out.println(result);
-
-        return R.ok().put("output",result);
     }
 
     /**

+ 4 - 1
src/main/java/io/renren/modules/sys/entity/algs/AlgTrainLog.java

@@ -1,5 +1,6 @@
 package io.renren.modules.sys.entity.algs;
 
+import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
 import lombok.Data;
@@ -7,10 +8,12 @@ import lombok.Data;
 @Data
 @TableName("algorithm_training_log")
 public class AlgTrainLog {
-    @TableId
+    @TableId(type = IdType.AUTO)
     private Long algorithmTrainingLogId;
 
     private Long algorithmTrainingId;
 
     private String algorithmTrainingLogContent;
+
+    private String algorithmTrainingLogMinioPath;
 }

+ 12 - 3
src/main/resources/mapper/sys/algs/AlgTrainLogMapper.xml

@@ -7,9 +7,10 @@
         <id column="algorithm_training_log_id" jdbcType="BIGINT" property="algorithmTrainingLogId" />
         <result column="algorithm_training_id" jdbcType="BIGINT" property="algorithmTrainingId" />
         <result column="algorithm_training_log_content" jdbcType="LONGVARCHAR" property="algorithmTrainingLogContent" />
+        <result column="algorithm_training_log_minio_path" jdbcType="LONGVARCHAR" property="algorithmTrainingLogMinioPath" />
     </resultMap>
     <sql id="Base_Column_List">
-      algorithm_training_log_id,algorithm_training_id,algorithm_training_log_content
+      algorithm_training_log_id,algorithm_training_id,algorithm_training_log_content,algorithm_training_log_minio_path
     </sql>
     <select id="selectByPrimaryKey" parameterType="java.lang.Long" resultMap="BaseResultMap">
         select
@@ -22,14 +23,16 @@
         <include refid="Base_Column_List" />
         from algorithm_training_log
         where algorithm_training_id = #{algorithmTrainingId,jdbcType=BIGINT}
+        order by algorithm_training_log_id desc
+        limit 1
     </select>
     <delete id="deleteByPrimaryKey" parameterType="java.lang.Long">
       delete from algorithm_training_log
       where algorithm_training_log_id = #{algorithmTrainingLogId,jdbcType=BIGINT}
     </delete>
     <insert id="insert" parameterType="io.renren.modules.sys.entity.algs.AlgTrainLog">
-        insert into algorithm_training_log(algorithm_training_log_id,algorithm_training_id,algorithm_training_log_content)
-        values (#{algorithmTrainingLogId,jdbcType=BIGINT},#{algorithmTrainingId,jdbcType=BIGINT},#{algorithmTrainingLogContent,jdbcType=LONGVARCHAR})
+        insert into algorithm_training_log(algorithm_training_id,algorithm_training_log_content, algorithm_training_log_minio_path)
+        values (#{algorithmTrainingId,jdbcType=BIGINT},#{algorithmTrainingLogContent,jdbcType=LONGVARCHAR}, #{algorithmTrainingLogMinioPath,jdbcType=LONGVARCHAR})
     </insert>
     <insert id="insertSelective" parameterType="io.renren.modules.sys.entity.algs.AlgTrainLog">
         insert into algorithm_training_log
@@ -43,6 +46,9 @@
             <if test="algorithmTrainingLogContent != null">
                 algorithm_training_log_content,
             </if>
+            <if test="algorithmTrainingLogMinioPath != null">
+                algorithm_training_log_minio_path,
+            </if>
         </trim>
         <trim prefix="values (" suffix=")" suffixOverrides=",">
             <if test="algorithmTrainingLogId != null">
@@ -54,6 +60,9 @@
             <if test="algorithmTrainingLogContent != null">
                 #{algorithmTrainingLogContent,jdbcType=LONGVARCHAR},
             </if>
+            <if test="algorithmTrainingLogMinioPath != null">
+                #{algorithmTrainingLogMinioPath,jdbcType=LONGVARCHAR},
+            </if>
         </trim>
     </insert>
 </mapper>