Browse Source

[bug] 修改dds消息发布订阅

zishang 3 months ago
parent
commit
06145e6fd5

+ 4 - 49
src/main/java/io/renren/common/utils/DockerClientUtils.java

@@ -175,7 +175,7 @@ public class DockerClientUtils {
         long timestamp = System.currentTimeMillis();
         String timestampString = String.valueOf(timestamp);
         String outputFilePath = "/opt/" + filePath + "/algTrain" + timestampString + ".out";
-        String returnFileName = "algTrain" + timestampString + ".out";
+        String logFile = "algTrain" + timestampString + ".out";
 
         //创建要执行的命令
 //        String[] execPython1={"nohup","python","-u","/opt/" + filePath + "/"+getOriginFileName(fileName),">/opt" + filePath + "nohup.out", "2>&1","&"};
@@ -189,6 +189,8 @@ public class DockerClientUtils {
 
         //读取所执行命令的输出
         LogStream output = docker.execStart(execCreation.id());
+        String s = output.readFully();
+        System.out.println(s);
 
         //执行tensorboard命令
 //        String execTensorboard=docker.execCreate(containerId,new String[]{"tensorboard","--logdir=/opt/testTensorboard"}).id();
@@ -196,54 +198,7 @@ public class DockerClientUtils {
 //            stream2.readFully();
 //        }
 
-        // 处理日志文件
-        // 检查 MinIO 中的桶是否存在,如果不存在则创建
-        if (!MinIoUtils.isBucketExists("algorithm-train-task")) {
-            MinIoUtils.createBucket("algorithm-train-task");
-        }
-        final String[] minioPath = {""};
-
-        Future<String> future = executorService.submit(() -> {
-            try {
-                // 复制文件到容器
-                DockerClientUtils.copyFile(containerId, "/" + returnFileName, "/opt/algTrain" + algorithmTrainingId);
-
-                // 显示文件列表
-                Vector<ChannelSftp.LsEntry> vector = FTPUtils.showFiles("algTrain" + algorithmTrainingId);
-                for (ChannelSftp.LsEntry entry : vector) {
-                    System.out.println(entry);
-                }
-
-                // 获取最后一个文件
-                ChannelSftp.LsEntry entry = vector.lastElement();
-                String logFile = entry.getFilename();
-                Long fileSize = entry.getAttrs().getSize();
-
-                // 设置 MinIO 路径
-                minioPath[0] = "algorithm" + algorithmTrainingId + "/" + logFile;
-
-                // 下载文件
-                InputStream inputStream = FTPUtils.downloadFile("/opt/uploadFile/algTrain" + algorithmTrainingId + "/" + logFile);
-
-                // 上传文件到 MinIO
-                MinIoUtils.uploadFileByInputStream(inputStream, fileSize, "algorithm-train-task", minioPath[0]);
-
-                // 返回 MinIO 路径
-                return minioPath[0];
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        });
-
-        // 获取结果
-        try {
-            String resultPath = future.get(); // 阻塞直到任务完成
-            System.out.println("Uploaded file path: " + resultPath);
-        } catch (InterruptedException | ExecutionException e) {
-            e.printStackTrace();
-        }
-
-        return minioPath[0];
+        return logFile;
     }
 
     public static boolean isContainerRunning(String containerId) throws InterruptedException {

+ 1 - 21
src/main/java/io/renren/dds/DataReaderListenerImpl.java

@@ -97,32 +97,11 @@ public class DataReaderListenerImpl extends DDS._DataReaderListenerLocalBase {
 
                 // 格式化时间
                 String formattedDateTime = now.format(formatter);
-                if(mh.value.subject.equals("1")){
-                    if(isInit==1){
-                        System.out.println("已完成初始化,丢弃");
-                        return;
-                    }
-                    //收到初始化消息 完成初始化
-                    isInit = 1;
-                    //回复初始化应答
-                    isRecall = 1;
-                }
-                if(mh.value.subject.equals("3")){
-                    //收到平台位置信息
-                    if(isInit==0){
-                        System.out.println("未完成初始化,丢弃");
-                        return;
-                    }
-                }
                 if(mh.value.subject.equals("2")){
                     if(mh.value.text.equals("4")){
                         resetAll();
                     }
                 }
-                if(mh.value.subject.equals("4")){
-                    //收到网络分组信息
-                    isTeam = 1;
-                }
                 if(mh.value.subject.equals("110")){
                     // 使用算法文件消息
                     isRequest = 1;
@@ -203,6 +182,7 @@ public class DataReaderListenerImpl extends DDS._DataReaderListenerLocalBase {
         this.isRecall = 0;
         this.isTeam = 0;
         this.isRequest = 0;
+        this.algMap.clear();
     }
 
     public void report_validity() {

+ 32 - 52
src/main/java/io/renren/modules/sys/controller/algs/algTrainController.java

@@ -257,12 +257,12 @@ public class algTrainController {
     @GetMapping("/startTraining")
     public  R startTraining(String algorithmTrainingId) throws DockerException, InterruptedException, RegionConflictException, InvalidBucketNameException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException {
         AlgTrain algTrain=algTrainService.selectByPrimaryKey(Long.parseLong(algorithmTrainingId));
-        try {
+//        try {
 //            if(!DockerClientUtils.isContainerRunning(algTrain.getContainerId())){
 //                DockerClientUtils.startContainer(algTrain.getContainerId());
 //                Thread.sleep(7000);
 //            }
-            String minioPath = DockerClientUtils.execPython(algTrain.getContainerId(), "algTrain" + algTrain.getAlgorithmTrainingId(), algTrain.getRunfileName(), algorithmTrainingId);
+            String logFile = DockerClientUtils.execPython(algTrain.getContainerId(), "algTrain" + algTrain.getAlgorithmTrainingId(), algTrain.getRunfileName(), algorithmTrainingId);
 
             //完成训练后,将任务状态改为已结束,并保存结束时间
             algTrain.setMissStatus((byte) 3);
@@ -275,19 +275,20 @@ public class algTrainController {
             Date date = new Date();
             //SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
             algTrain.setMissStopTime(date);
+            algTrain.setLogFile(logFile);
             algTrainService.update(algTrain);
 
             //保存日志到数据库
-            String algorithmTrainingLogContent = (String) getOutput(algorithmTrainingId, minioPath).get("output");
+            String algorithmTrainingLogContent = (String) getOutput(algorithmTrainingId, logFile).get("output");
             AlgTrainLog algTrainLog = new AlgTrainLog();
             algTrainLog.setAlgorithmTrainingId(Long.parseLong(algorithmTrainingId));
             algTrainLog.setAlgorithmTrainingLogContent(algorithmTrainingLogContent);
-            algTrainLog.setAlgorithmTrainingLogMinioPath(minioPath);
+            algTrainLog.setAlgorithmTrainingLogMinioPath(logFile);
             algTrainLogService.save(algTrainLog);
             //运行结束之后停掉docker
-        } finally {
+//        } finally {
 //            DockerClientUtils.stopContainer(algTrain.getContainerId());
-        }
+//        }
 
         // 产生数据
 //        publisher.publishMessage(
@@ -310,7 +311,7 @@ public class algTrainController {
 //        }catch (Exception e){
 //            throw e;
 //        }
-        return R.ok();
+        return R.ok().put("logFile", logFile);
     }
 
     @Async
@@ -358,6 +359,7 @@ public class algTrainController {
                 Long algorithmTrainingId = algTrain.getAlgorithmTrainingId();
                 executorService.submit(() -> {
                     try {
+                        changeMissStatus(String.valueOf(algorithmTrainingId));
                         startTraining(String.valueOf(algorithmTrainingId));
                     } catch (Exception e) {
                         throw new RRException(e.getMessage());
@@ -374,7 +376,7 @@ public class algTrainController {
 //        // 初始化时运行
 //        int flag = 0; //标志位,判断是否初始化成功
 //        if (subscriber.listener != null && subscriber.listener.isRequest == 1) {
-////            algRun(subscriber.listener.algMap);
+//            algRun(subscriber.listener.algMap);
 //            flag = 1;
 //        }
 //        if (flag == 1) {
@@ -382,19 +384,16 @@ public class algTrainController {
 //        }
 //    }
 
-    private void algRun(Map<String, String> algMap) throws DockerException, InvalidBucketNameException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InterruptedException, InvalidResponseException, XmlParserException, InternalException, RegionConflictException {
-        for(Map.Entry<String, String> map : algMap.entrySet()){
-            String key = map.getKey();
-            String missName = map.getValue();
-            AlgTrain algTrain = algTrainService.selectByMissName(missName);
-            Long algorithmTrainingId = algTrain.getAlgorithmTrainingId();
-            startTraining(String.valueOf(algorithmTrainingId));
-        }
-    }
-//
-//    private void resolverDDS(){
-//        subscriber.listener.algMap.get()
+//    private void algRun(Map<String, String> algMap) throws DockerException, InvalidBucketNameException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InterruptedException, InvalidResponseException, XmlParserException, InternalException, RegionConflictException {
+//        for(Map.Entry<String, String> map : algMap.entrySet()){
+//            String key = map.getKey();
+//            String missName = map.getValue();
+//            AlgTrain algTrain = algTrainService.selectByMissName(missName);
+//            Long algorithmTrainingId = algTrain.getAlgorithmTrainingId();
+//            startTraining(String.valueOf(algorithmTrainingId));
+//        }
 //    }
+//
     /**
      * Description:根据algorithmTrainingId,开始训练,并附带参数
      * @param algorithmTrainingId
@@ -466,39 +465,20 @@ public class algTrainController {
      * @return
      */
     @GetMapping("/getOutput")
-    public R getOutput(String algorithmTrainingId, String minioPath) throws DockerException, InterruptedException, IOException, InvalidResponseException, InvalidKeyException, NoSuchAlgorithmException, ErrorResponseException, XmlParserException, InvalidBucketNameException, InsufficientDataException, InternalException {
-//        AlgTrain algTrain = algTrainService.selectByPrimaryKey(Long.parseLong(algorithmTrainingId));
-//        DockerClientUtils.copyFile(algTrain.getContainerId(),"/" + returnFileName,"/opt/algTrain" + algorithmTrainingId);
-//        InputStream inputStream = FTPUtils.downloadFile("/opt/uploadFile/algTrain" + algorithmTrainingId + "/" + returnFileName);
-//        if (inputStream == null) {
-//            return R.error("结果不存在");
-//        }
-//        String result = new BufferedReader(new InputStreamReader(inputStream))
-//                .lines().collect(Collectors.joining("\n"));
-//
-//        inputStream.close();
-////        System.out.println(result);
-//
-//        return R.ok().put("output",result);
-        String objectName = "";
-        if (minioPath == null) {
-            AlgTrainLog algTrainLog = algTrainLogService.selectByAlgTrainId(Long.parseLong(algorithmTrainingId));
-            objectName = algTrainLog.getAlgorithmTrainingLogMinioPath();
-        } else {
-            objectName = minioPath;
-        }
-        log.info("获取文件从minio的objectName {}", objectName);
-        try (InputStream fileInputStream = MinIoUtils.getFileInputStream("algorithm-train-task", objectName)) {
-            // 继续处理文件流
-            String result = new BufferedReader(new InputStreamReader(fileInputStream))
-                    .lines().collect(Collectors.joining("\n"));
-            log.info("result {}", result);
-            return R.ok().put("output", result);
-        } catch (ErrorResponseException e){
-            if (e.getMessage().equals("The specified key does not exist."))
-                return R.error("文件不存在,请确认文件位置或者重新上传");
-            else return R.error(e.getMessage());
+    public R getOutput(String algorithmTrainingId, String logFile) throws DockerException, InterruptedException, IOException, InvalidResponseException, InvalidKeyException, NoSuchAlgorithmException, ErrorResponseException, XmlParserException, InvalidBucketNameException, InsufficientDataException, InternalException {
+        AlgTrain algTrain = algTrainService.selectByPrimaryKey(Long.parseLong(algorithmTrainingId));
+        DockerClientUtils.copyFile(algTrain.getContainerId(),"/" + logFile,"/opt/algTrain" + algorithmTrainingId);
+        InputStream inputStream = FTPUtils.downloadFile("/opt/uploadFile/algTrain" + algorithmTrainingId + "/" + logFile);
+        if (inputStream == null) {
+            return R.error("结果不存在");
         }
+        String result = new BufferedReader(new InputStreamReader(inputStream))
+                .lines().collect(Collectors.joining("\n"));
+
+        inputStream.close();
+//        System.out.println(result);
+
+        return R.ok().put("output",result);
     }
 
     /**

+ 2 - 0
src/main/java/io/renren/modules/sys/entity/algs/AlgTrain.java

@@ -42,4 +42,6 @@ public class AlgTrain {
     private String ipAddress;
 
     private Integer hasRun;
+
+    private String logFile;
 }

+ 2 - 0
src/main/java/io/renren/modules/sys/entity/algs/AlgTrain_Vo.java

@@ -43,4 +43,6 @@ public class AlgTrain_Vo {
     private String ipAddress;
 
     private Integer hasRun;
+
+    private String logFile;
 }

+ 15 - 4
src/main/resources/mapper/sys/algs/AlgTrainMapper.xml

@@ -17,10 +17,11 @@
     <result column="runfile_name" jdbcType="VARCHAR" property="runfileName" />
     <result column="ip_address" jdbcType="VARCHAR" property="ipAddress" />
     <result column="has_run" jdbcType="INTEGER" property="hasRun" />
+    <result column="log_file" jdbcType="VARCHAR" property="logFile" />
   </resultMap>
   <sql id="Base_Column_List">
     algorithm_training_id, miss_name, alg_frame_id, category_id, miss_status, miss_creation_time, miss_stop_time, remark,
-    uid, algorithm_id, version_id, container_id, runfile_name, ip_address, has_run
+    uid, algorithm_id, version_id, container_id, runfile_name, ip_address, has_run, log_file
   </sql>
   <select id="selectByPrimaryKey" parameterType="java.lang.Long" resultMap="BaseResultMap">
     select 
@@ -42,11 +43,11 @@
     insert into algorithm_training (algorithm_training_id, miss_name, alg_frame_id, category_id,
       miss_status, miss_creation_time, miss_stop_time, remark,
       uid, algorithm_id, version_id, 
-      container_id,runfile_name,ip_address, has_run)
+      container_id,runfile_name,ip_address, has_run, log_file)
     values (#{algorithmTrainingId,jdbcType=BIGINT}, #{missName,jdbcType=VARCHAR}, #{algFrameId,jdbcType=BIGINT}, #{categoryId,jdbcType=BIGINT},
       #{missStatus,jdbcType=TINYINT}, #{missCreationTime,jdbcType=TIMESTAMP}, #{missStopTime,jdbcType=TIMESTAMP}, #{remark,jdbcType=VARCHAR},
       #{uid,jdbcType=BIGINT}, #{algorithmId,jdbcType=BIGINT}, #{versionId,jdbcType=BIGINT}, 
-      #{containerId,jdbcType=VARCHAR},#{runfileName,jdbcType=VARCHAR},#{ipAddress,jdbcType=VARCHAR}, #{hasRun,jdbcType=INTEGER})
+      #{containerId,jdbcType=VARCHAR},#{runfileName,jdbcType=VARCHAR},#{ipAddress,jdbcType=VARCHAR}, #{hasRun,jdbcType=INTEGER},#{logFile,jdbcType=VARCHAR})
   </insert>
   <insert id="insertSelective" parameterType="io.renren.modules.sys.entity.algs.AlgTrain">
     insert into algorithm_training
@@ -96,6 +97,9 @@
       <if test="hasRun != null">
         has_run,
       </if>
+      <if test="logFile != null">
+        log_file,
+      </if>
     </trim>
     <trim prefix="values (" suffix=")" suffixOverrides=",">
       <if test="algorithmTrainingId != null">
@@ -143,6 +147,9 @@
       <if test="hasRun != null">
         #{hasRun,jdbcType=INTEGER},
       </if>
+      <if test="logFile != null"> <!-- log_file is mapped as VARCHAR in BaseResultMap; bind it as VARCHAR, not INTEGER -->
+        #{logFile,jdbcType=VARCHAR},
+      </if>
     </trim>
   </insert>
   <update id="updateByPrimaryKeySelective" parameterType="io.renren.modules.sys.entity.algs.AlgTrain">
@@ -190,6 +197,9 @@
       <if test="hasRun !=null">
         has_run = #{hasRun,jdbcType=INTEGER},
       </if>
+      <if test="logFile !=null"> <!-- log_file is mapped as VARCHAR in BaseResultMap; bind it as VARCHAR, not INTEGER -->
+        log_file = #{logFile,jdbcType=VARCHAR},
+      </if>
     </set>
     where algorithm_training_id = #{algorithmTrainingId,jdbcType=BIGINT}
   </update>
@@ -208,7 +218,8 @@
       container_id = #{containerId,jdbcType=VARCHAR},
       runfile_name=#{runfileName,jdbcType=VARCHAR},
       ip_address = #{ipAddress,jdbcType=VARCHAR},
-      has_run = #{hasRun,jdbcType=INTEGER}
+      has_run = #{hasRun,jdbcType=INTEGER},
+      log_file = #{logFile,jdbcType=VARCHAR}
     where algorithm_training_id = #{algorithmTrainingId,jdbcType=BIGINT}
   </update>
 </mapper>