Browse Source

[新增] 新增dds消息发布订阅

zishang 3 months ago
parent
commit
369284e103

+ 7 - 7
pom.xml

@@ -296,13 +296,13 @@
 			<artifactId>dom4j</artifactId>
 			<version>2.1.4</version>
 		</dependency>
-		<!--<dependency>-->
-			<!--<groupId>jdk.tools</groupId>-->
-			<!--<artifactId>jdk.tools</artifactId>-->
-			<!--<version>1.8</version>-->
-			<!--<scope>system</scope>-->
-			<!--<systemPath>D:/JAVA8/lib/tools.jar</systemPath>-->
-		<!--</dependency>-->
+		<dependency>
+			<groupId>jdk.tools</groupId>
+			<artifactId>jdk.tools</artifactId>
+			<version>1.8</version>
+			<scope>system</scope>
+			<systemPath>C:/software/Java/jdk1.8.0_271/lib/tools.jar</systemPath>
+		</dependency>
 <!--		<dependency>-->
 <!--			<groupId>javax.websocket</groupId>-->
 <!--			<artifactId>javax.websocket-api</artifactId>-->

+ 4 - 0
src/main/java/io/renren/RenrenApplication.java

@@ -12,9 +12,13 @@ import lgh.springboot.starter.hbase.template.HBaseTemplate;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.annotation.EnableScheduling;
 
 
 @SpringBootApplication
+@EnableScheduling// 启用定时任务
+@EnableAsync
 public class RenrenApplication {
 
 	public static void main(String[] args) {

+ 4 - 1
src/main/java/io/renren/common/exception/RRExceptionHandler.java

@@ -8,6 +8,7 @@
 
 package io.renren.common.exception;
 
+import com.jcraft.jsch.SftpException;
 import io.renren.common.utils.R;
 import org.apache.shiro.authz.AuthorizationException;
 import org.slf4j.Logger;
@@ -17,6 +18,8 @@ import org.springframework.web.bind.annotation.ExceptionHandler;
 import org.springframework.web.bind.annotation.RestControllerAdvice;
 import org.springframework.web.servlet.NoHandlerFoundException;
 
+import java.io.FileNotFoundException;
+
 /**
  * 异常处理器
  *
@@ -35,7 +38,7 @@ public class RRExceptionHandler {
 		r.put("code", e.getCode());
 		r.put("msg", e.getMessage());
 
-		return r;
+		return R.error(e.getCode(), e.getMessage());
 	}
 
 	@ExceptionHandler(NoHandlerFoundException.class)

+ 18 - 10
src/main/java/io/renren/common/utils/DockerClientUtils.java

@@ -11,10 +11,8 @@ import org.apache.commons.lang3.StringUtils;
 
 import java.net.URI;
 import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.text.SimpleDateFormat;
+import java.util.*;
 
 public class DockerClientUtils {
     private static DockerClient docker;
@@ -146,11 +144,21 @@ public class DockerClientUtils {
      */
     public static String execPython(String containerId,String filePath,String fileName) throws DockerException, InterruptedException {
 
+        // 获取当前时间戳
+        long timestamp = System.currentTimeMillis();
+        String timestampString = String.valueOf(timestamp);
+        String outputFilePath = "/opt/" + filePath + "/algTrain" + timestampString + ".out";
+        String returnFileName = "algTrain" + timestampString + ".out";
+
         //创建要执行的命令
-        String[] execPython1={"nohup","python","-u","/opt/"+filePath+"/"+getOriginFileName(fileName),">output.log","2>&1","&"};
-        ExecCreation execCreation=docker.execCreate(containerId,execPython1,DockerClient.ExecCreateParam.attachStdout(),
-                DockerClient.ExecCreateParam.attachStderr(), DockerClient.ExecCreateParam.attachStdin(),
-                DockerClient.ExecCreateParam.tty(), DockerClient.ExecCreateParam.detach());
+//        String[] execPython1={"nohup","python","-u","/opt/" + filePath + "/"+getOriginFileName(fileName),">/opt" + filePath + "nohup.out", "2>&1","&"};
+        String pythonCommand = String.format("python -u \"/opt/%s/%s\" > \"%s\" 2>&1",
+                filePath, getOriginFileName(fileName), outputFilePath);
+        String[] execPython1={ "/bin/sh", "-c", pythonCommand};
+        ExecCreation execCreation=docker.execCreate(containerId,execPython1,
+                DockerClient.ExecCreateParam.attachStdout(),
+                DockerClient.ExecCreateParam.attachStderr(),
+                DockerClient.ExecCreateParam.attachStdin());
 
         //读取所执行命令的输出
         LogStream output=docker.execStart(execCreation.id());
@@ -165,10 +173,10 @@ public class DockerClientUtils {
         //打印输出
         String execPythonOutput=output.readFully();
         System.out.println(execPythonOutput);
-        return execPythonOutput;
+        return returnFileName;
     }
 
-    private static String getOriginFileName(String fileName) {
+    public static String getOriginFileName(String fileName) {
         String[] names = fileName.split("/");
         return names[names.length - 1];
     }

+ 8 - 4
src/main/java/io/renren/common/utils/FTPUtils.java

@@ -1,8 +1,11 @@
 package io.renren.common.utils;
 
 import com.jcraft.jsch.*;
+import io.renren.common.exception.RRException;
+import io.renren.common.exception.RRExceptionHandler;
 import org.apache.commons.net.ftp.FTPClient;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -135,15 +138,16 @@ public class FTPUtils {
      * @param filePath FTP服务器文件存放路径。文件的路径为basePath+filePath
      * @return 成功返回true,否则false
      */
-    public static InputStream downloadFile(String filePath) {
+    public static InputStream downloadFile(String filePath) throws FileNotFoundException {
         checkIsConnect();
         try {
             InputStream inputStream = sftp.get(filePath);
             return inputStream;
-        } catch (SftpException e) {
-            e.printStackTrace();
+        } catch (Exception e) {
+//            e.printStackTrace();
+            throw new RRException("文件不存在");
         }
-        return null;
+//        return null;
     }
 
     /**

+ 365 - 0
src/main/java/io/renren/dds/DataReaderListenerImpl.java

@@ -0,0 +1,365 @@
+package io.renren.dds;/*
+ *
+ *
+ * Distributed under the OpenDDS License.
+ * See: http://www.opendds.org/license.html
+ */
+
+import DDS.*;
+import Messenger.*;
+import OpenDDS.DCPS.*;
+import OpenDDS.DCPS.transport.*;
+import com.spotify.docker.client.exceptions.DockerException;
+import io.minio.errors.*;
+import io.renren.modules.sys.controller.algs.algTrainController;
+import io.renren.modules.sys.entity.algs.AlgTrain;
+import io.renren.modules.sys.entity.algs.Algorithm;
+import io.renren.modules.sys.service.AlgTrainService;
+import io.renren.modules.sys.service.AlgsService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+//import com.example.backend.model.MessageStatus;
+//import com.example.backend.model.Position;
+
+import javax.annotation.Resource;
+import java.io.IOException;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Component
+public class DataReaderListenerImpl extends DDS._DataReaderListenerLocalBase {
+
+    private int num_msgs = 0;
+    public int isInit = 0;
+    public int isRecall = 0;
+    public int isTeam = 0;
+    public int isRequest = 0;
+    public int versionID = 1;
+    public String text = "";
+    private static final int N_EXPECTED = 40;
+    private ArrayList<Boolean> counts = new ArrayList<Boolean>(N_EXPECTED);
+//    public Map<String, Position> positionMap = new ConcurrentHashMap<>();
+//    public Map<String, Position> radarPositionMap = new ConcurrentHashMap<>();
+//    public Map<String, List<String>> coopTeamMap = new ConcurrentHashMap<>();
+    public final Map<String, List<io.renren.modules.sys.entity.Message>> messageList = new ConcurrentHashMap<>();
+    public Map<String, AlgTrain> algMap = new ConcurrentHashMap<>();
+//    public Map<String, MessageStatus> messageStatusMap = new ConcurrentHashMap<>();
+
+    @Resource
+    AlgTrainService algTrainService;
+    @Autowired
+    private io.renren.modules.sys.controller.algs.algTrainController algTrainController;
+
+    private void initialize_counts() {
+        if (counts.size() > 0) {
+          return;
+        }
+
+        for (int i = 0; i < N_EXPECTED; ++i) {
+            counts.add(false);
+        }
+    }
+
+    public synchronized void on_data_available(DDS.DataReader reader) {
+
+        initialize_counts();
+
+        MessageDataReader mdr = MessageDataReaderHelper.narrow(reader);
+        if (mdr == null) {
+            System.err.println("ERROR: read: narrow failed.");
+            return;
+        }
+
+        MessageHolder mh = new MessageHolder(new Message());
+        SampleInfoHolder sih = new SampleInfoHolder(new SampleInfo(0, 0, 0,
+            new DDS.Time_t(), 0, 0, 0, 0, 0, 0, 0, false, 0));
+        int status = mdr.take_next_sample(mh, sih);
+
+        if (status == RETCODE_OK.value) {
+
+            System.out.println("SampleInfo.sample_rank = "
+                                + sih.value.sample_rank);
+            System.out.println("SampleInfo.instance_state = "
+                                + sih.value.instance_state);
+
+            if (sih.value.valid_data) {
+
+                String prefix = "";
+                boolean invalid_count = false;
+                if (mh.value.count < 0 || mh.value.count >= counts.size()) {
+                    invalid_count = true;
+                }
+                else {
+                    if (counts.get(mh.value.count) == false){
+                        counts.set(mh.value.count, true);
+                    }
+                    else {
+                        prefix = "ERROR: Repeat ";
+                    }
+                }
+                System.out.println(prefix + "Message: subject    = " + mh.value.subject);
+                System.out.println("         subject_id = "
+                                   + mh.value.subject_id);
+                System.out.println("         from       = " + mh.value.from);
+                System.out.println("         count      = " + mh.value.count);
+                System.out.println("         text       = " + mh.value.text);
+                System.out.println("SampleInfo.sample_rank = "
+                                   + sih.value.sample_rank);
+                text = mh.value.text;
+                LocalDateTime now = LocalDateTime.now();
+
+                // 定义格式(例如:yyyy-MM-dd HH:mm:ss)
+                DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+                // 格式化时间
+                String formattedDateTime = now.format(formatter);
+                if(mh.value.subject.equals("1")){
+                    if(isInit==1){
+                        System.out.println("已完成初始化,丢弃");
+                        return;
+                    }
+                    //收到初始化消息 完成初始化
+                    isInit = 1;
+                    String message = mh.value.text;
+                    String[] messageList = message.split("\n");
+                    String[] countList = messageList[0].split(" ");
+                    int count = Integer.parseInt(countList[0]);
+                    int radarCount = Integer.parseInt(countList[1]);
+                    for(int i=1;i<=count;i++){
+                        String[] s = messageList[i].split(" ");
+//                        Position p = new Position();
+//                        p.coopID = s[1];
+//                        p.jaming = s[2];
+//                        p.isReal = s[3];
+//                        p.x = Double.parseDouble(s[4]);
+//                        p.y = Double.parseDouble(s[5]);
+//                        p.z = Double.parseDouble(s[6]);
+//                        positionMap.put(s[0],p);
+                    }
+                    for(int i=count+1;i<=radarCount+count;i++){
+                        String[] s = messageList[i].split(" ");
+//                        Position p = new Position();
+//                        p.coopID = "未知";
+//                        p.jaming = "未知";
+//                        p.isReal = "未知";
+//                        p.x = Double.parseDouble(s[4]);
+//                        p.y = Double.parseDouble(s[5]);
+//                        p.z = Double.parseDouble(s[6]);
+//                        radarPositionMap.put(s[0],p);
+                    }
+                    //回复初始化应答
+                    isRecall = 1;
+                }
+                if(mh.value.subject.equals("3")){
+                    //收到平台位置信息
+                    if(isInit==0){
+                        System.out.println("未完成初始化,丢弃");
+                        return;
+                    }
+                    String message = mh.value.text;
+                    String[] messageList = message.split("\n");
+                    for(int i=1;i<messageList.length;i++){
+                        String[] s = messageList[i].split(" ");
+//                        Position p = new Position();
+//                        p.coopID = positionMap.get(s[0]).coopID;
+//                        p.jaming = positionMap.get(s[0]).jaming;
+//                        p.isReal = positionMap.get(s[0]).isReal;
+//                        p.x = Double.parseDouble(s[1]);
+//                        p.y = Double.parseDouble(s[2]);
+//                        p.z = Double.parseDouble(s[3]);
+//                        positionMap.put(s[0],p);
+                    }
+                }
+                if(mh.value.subject.equals("2")){
+                    if(mh.value.text.equals("4")){
+                        resetAll();
+                    }
+                }
+                if(mh.value.subject.equals("5")){
+                    //收到雷达脉冲消息
+                    String[] s = mh.value.from.split(" ");
+                    String recivier = s[2];
+                    String sender = s[0];
+//                    List<com.example.backend.model.Message> list = messageList.get(recivier)==null ? new ArrayList<>() : messageList.get(recivier);
+//                    list.add(new com.example.backend.model.Message(formattedDateTime,sender,"雷达脉冲",mh.value.text,"0"));
+//                    messageList.put(recivier,list);
+
+                }
+                if(mh.value.subject.equals("6")){
+                    //收到电子干扰消息
+                    String[] s = mh.value.from.split(" ");
+                    String recivier = s[2];
+                    String sender = s[0];
+//                    List<com.example.backend.model.Message> list = messageList.get(recivier)==null ? new ArrayList<>() : messageList.get(recivier);
+//                    list.add(new com.example.backend.model.Message(formattedDateTime,sender,"电子干扰",mh.value.text,"0"));
+//                    messageList.put(recivier,list);
+
+                }
+                if(mh.value.subject.equals("7")){
+                    //收到综合情报消息
+                    String[] s = mh.value.from.split(" ");
+                    String recivier = s[2];
+                    String sender = s[0];
+//                    List<com.example.backend.model.Message> list = messageList.get(recivier)==null ? new ArrayList<>() : messageList.get(recivier);
+//                    list.add(new com.example.backend.model.Message(formattedDateTime,sender,"综合情报",mh.value.text,"0"));
+//                    messageList.put(recivier,list);
+
+                }
+                if(mh.value.subject.equals("4")){
+                    //收到网络分组信息
+                    String message = mh.value.text;
+                    String[] messageList = message.split("\n");
+                    String[] countList = messageList[0].split(" ");
+                    if(Integer.parseInt(countList[2])<=versionID)return;//旧版本或重复
+                    for(int i=1;i<=Integer.parseInt(countList[0]);i++){
+                        String[] s = messageList[i].split(" ");
+                        String coopID = s[0];
+                        List<String> list = new ArrayList<>();
+                        for(int j=2;j<s.length;j++){
+                            list.add(s[j]);
+                        }
+//                        coopTeamMap.put(coopID,list);
+                    }
+                    isTeam = 1;
+                }
+                if(mh.value.subject.equals("9")){
+                    //收到分组状态参数控制
+                    String message = mh.value.text;
+                    String[] s = message.split(" ");
+//                    MessageStatus m = new MessageStatus();
+//                    m.startUp = s[1];
+//                    m.quite = s[2];
+//                    m.mode = s[3];
+//                    messageStatusMap.put(s[0],m);
+                }
+                if(mh.value.subject.equals("10")){
+                    //收到分组状态参数初始化
+                    String message = mh.value.text;
+                    String[] messageList = message.split("\n");
+                    String[] countList = messageList[0].split(" ");
+                    for(int i=1;i<=Integer.parseInt(countList[0]);i++){
+                        String[] s = messageList[i].split(" ");
+                        String platformID = s[0];
+//                        MessageStatus m = new MessageStatus();
+//                        m.startUp = s[1];
+//                        m.quite = s[2];
+//                        m.mode = s[3];
+//                        messageStatusMap.put(platformID,m);
+                    }
+                }
+                if(mh.value.subject.equals("11")){
+                    // 使用算法文件消息
+                    String message = mh.value.text;
+                    String[] messageList = message.split("\n");
+                    String[] countList = messageList[0].split(" ");
+                    for(int i=1;i<=Integer.parseInt(countList[0]);i++){
+                        String[] s = messageList[i].split(" ");
+                        String platformID = s[0];
+                        AlgTrain algTrain = algTrainService.selectByMissName(s[1]);
+                        algMap.put(platformID,algTrain);
+//                        if (algTrain != null){
+//                            try {
+//                                algTrainController.startTraining(String.valueOf(algTrain.getAlgorithmTrainingId()));
+//                            } catch (Exception e) {
+//                                throw new RuntimeException(e);
+//                            }
+//
+//                        }
+                    }
+                }
+                if(mh.value.subject.equals("111")){
+                    //收到算法请求信息
+                    isRequest = 1;
+                }
+
+                if (invalid_count == true) {
+                    System.out.println("ERROR: Invalid message.count (" + mh.value.count + ")");
+                }
+                if (!mh.value.from.equals("Comic Book Guy") && !mh.value.from.equals("OpenDDS-Java")) {
+                    System.out.println("ERROR: Invalid message.from (" + mh.value.from + ")");
+                }
+                if (!mh.value.subject.equals("Review")) {
+                    System.out.println("ERROR: Invalid message.subject (" + mh.value.subject + ")");
+                }
+                if (!mh.value.text.equals("Worst. Movie. Ever.")) {
+                    System.out.println("ERROR: Invalid message.text (" + mh.value.text + ")");
+                }
+                if (mh.value.subject_id != 99) {
+                    System.out.println("ERROR: Invalid message.subject_id (" + mh.value.subject_id + ")");
+                }
+            }
+            else if (sih.value.instance_state ==
+                     NOT_ALIVE_DISPOSED_INSTANCE_STATE.value) {
+                System.out.println("instance is disposed");
+            }
+            else if (sih.value.instance_state ==
+                     NOT_ALIVE_NO_WRITERS_INSTANCE_STATE.value) {
+                System.out.println("instance is unregistered");
+            }
+            else {
+                System.out.println("DataReaderListenerImpl::on_data_available: "
+                                   + "ERROR: received unknown instance state "
+                                   + sih.value.instance_state);
+            }
+
+        } else if (status == RETCODE_NO_DATA.value) {
+            System.err.println("ERROR: reader received DDS::RETCODE_NO_DATA!");
+        } else {
+            System.err.println("ERROR: read Message: Error: " + status);
+        }
+    }
+
+    public void on_requested_deadline_missed(DDS.DataReader reader, DDS.RequestedDeadlineMissedStatus status) {
+        System.err.println("DataReaderListenerImpl.on_requested_deadline_missed");
+    }
+
+    public void on_requested_incompatible_qos(DDS.DataReader reader, DDS.RequestedIncompatibleQosStatus status) {
+        System.err.println("DataReaderListenerImpl.on_requested_incompatible_qos");
+    }
+
+    public void on_sample_rejected(DDS.DataReader reader, DDS.SampleRejectedStatus status) {
+        System.err.println("DataReaderListenerImpl.on_sample_rejected");
+    }
+
+    public void on_liveliness_changed(DDS.DataReader reader, DDS.LivelinessChangedStatus status) {
+        System.err.println("DataReaderListenerImpl.on_liveliness_changed");
+    }
+
+    public void on_subscription_matched(DDS.DataReader reader, DDS.SubscriptionMatchedStatus status) {
+        System.err.println("DataReaderListenerImpl.on_subscription_matched");
+    }
+
+    public void on_sample_lost(DDS.DataReader reader, DDS.SampleLostStatus status) {
+        System.err.println("DataReaderListenerImpl.on_sample_lost");
+    }
+
+    public void resetAll(){
+//        this.positionMap.clear();
+//        this.radarPositionMap.clear();
+//        this.coopTeamMap.clear();
+        this.messageList.clear();
+//        this.messageStatusMap.clear();
+        this.isInit = 0;
+        this.isRecall = 0;
+        this.isTeam = 0;
+        this.isRequest = 0;
+    }
+
+    public void report_validity() {
+        int count = 0;
+        int missed_counts = 0;
+        for (Boolean val : counts) {
+            if (val == false)
+                ++missed_counts;
+        }
+        if (missed_counts > 0) {
+            System.out.println("ERROR: Missing " + missed_counts + " messages");
+        }
+    }
+}

+ 241 - 0
src/main/java/io/renren/dds/TestPublisher.java

@@ -0,0 +1,241 @@
+package io.renren.dds;/*
+ *
+ *
+ * Distributed under the OpenDDS License.
+ * See: http://www.opendds.org/license.html
+ */
+
+import DDS.*;
+import Messenger.*;
+import OpenDDS.DCPS.*;
+import org.omg.CORBA.StringSeqHolder;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.*;
+
+@Component
+public class TestPublisher {
+
+    private static final int N_MSGS = 20;
+    private int count = 0;
+    private DomainParticipantFactory dpf;
+    private DomainParticipant dp;
+    private MessageTypeSupportImpl servant;
+    private Topic top;
+    private Publisher pub;
+    private DataWriterQos dw_qos;
+    private DataWriterQosHolder qosh;
+    private DataReaderListenerImpl listener;
+    private DataWriter dw ;
+    private StatusCondition sc;
+    private WaitSet ws;
+    private PublicationMatchedStatusHolder matched;
+    private Duration_t timeout;
+    private MessageDataWriter mdw;
+    private int instanceHandle;
+    private BlockingQueue<Message> messageQueue = new LinkedBlockingQueue<>();
+    public volatile boolean isInitialized = false;
+    private final ExecutorService executor = Executors.newSingleThreadExecutor();
+    private volatile boolean running = true;
+    public TestPublisher() throws Exception {
+        String[] s = new String[4];
+        s[0] = "-DCPSBit";
+        s[1] = "0";
+        s[2] = "-DCPSConfigFile";
+        s[3] = "C:/software/OpenDDS-3.16/java/tests/messenger/tcp.ini";
+        executor.submit(() -> initialize(s)); // 初始化放在后台线程
+    }
+    public static boolean checkReliable(String[] args) {
+        for (int i = 0; i < args.length; ++i) {
+            if (args[i].equals("-r")) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public static boolean checkWaitForAcks(String[] args) {
+        for (int i = 0; i < args.length; ++i) {
+            if (args[i].equals("-w")) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private void initialize(String[] args) {
+        try {
+            // ... 原有初始化代码到创建DataWriter为止...
+            System.out.println("Start Publisher");
+            boolean reliable = checkReliable(args);
+            boolean waitForAcks = checkWaitForAcks(args);
+
+            dpf = TheParticipantFactory.WithArgs(new StringSeqHolder(args));
+            if (dpf == null) {
+                System.err.println("ERROR: Domain Participant Factory not found");
+                return;
+            }
+            dp = dpf.create_participant(4,
+                    PARTICIPANT_QOS_DEFAULT.get(), null, DEFAULT_STATUS_MASK.value);
+            if (dp == null) {
+                System.err.println("ERROR: Domain Participant creation failed");
+                return;
+            }
+
+            servant = new MessageTypeSupportImpl();
+            if (servant.register_type(dp, "") != RETCODE_OK.value) {
+                System.err.println("ERROR: register_type failed");
+                return;
+            }
+
+            top = dp.create_topic("Movie Discussion List",
+                    servant.get_type_name(),
+                    TOPIC_QOS_DEFAULT.get(),
+                    null,
+                    DEFAULT_STATUS_MASK.value);
+            if (top == null) {
+                System.err.println("ERROR: Topic creation failed");
+                return;
+            }
+
+            pub = dp.create_publisher(PUBLISHER_QOS_DEFAULT.get(), null,
+                    DEFAULT_STATUS_MASK.value);
+            if (pub == null) {
+                System.err.println("ERROR: Publisher creation failed");
+                return;
+            }
+
+            // Use the default transport configuration (do nothing)
+
+            dw_qos = new DataWriterQos();
+            dw_qos.durability = new DurabilityQosPolicy();
+            dw_qos.durability.kind = DurabilityQosPolicyKind.from_int(0);
+            dw_qos.durability_service = new DurabilityServiceQosPolicy();
+            dw_qos.durability_service.history_kind = HistoryQosPolicyKind.from_int(0);
+            dw_qos.durability_service.service_cleanup_delay = new Duration_t();
+            dw_qos.deadline = new DeadlineQosPolicy();
+            dw_qos.deadline.period = new Duration_t();
+            dw_qos.latency_budget = new LatencyBudgetQosPolicy();
+            dw_qos.latency_budget.duration = new Duration_t();
+            dw_qos.liveliness = new LivelinessQosPolicy();
+            dw_qos.liveliness.kind = LivelinessQosPolicyKind.from_int(0);
+            dw_qos.liveliness.lease_duration = new Duration_t();
+            dw_qos.reliability = new ReliabilityQosPolicy();
+            dw_qos.reliability.kind = ReliabilityQosPolicyKind.from_int(0);
+            dw_qos.reliability.max_blocking_time = new Duration_t();
+            dw_qos.destination_order = new DestinationOrderQosPolicy();
+            dw_qos.destination_order.kind = DestinationOrderQosPolicyKind.from_int(0);
+            dw_qos.history = new HistoryQosPolicy();
+            dw_qos.history.kind = HistoryQosPolicyKind.from_int(0);
+            dw_qos.resource_limits = new ResourceLimitsQosPolicy();
+            dw_qos.transport_priority = new TransportPriorityQosPolicy();
+            dw_qos.lifespan = new LifespanQosPolicy();
+            dw_qos.lifespan.duration = new Duration_t();
+            dw_qos.user_data = new UserDataQosPolicy();
+            dw_qos.user_data.value = new byte[0];
+            dw_qos.ownership = new OwnershipQosPolicy();
+            dw_qos.ownership.kind = OwnershipQosPolicyKind.from_int(0);
+            dw_qos.ownership_strength = new OwnershipStrengthQosPolicy();
+            dw_qos.writer_data_lifecycle = new WriterDataLifecycleQosPolicy();
+            dw_qos.representation = new DataRepresentationQosPolicy();
+            dw_qos.representation.value = new short[0];
+
+            qosh = new DataWriterQosHolder(dw_qos);
+            pub.get_default_datawriter_qos(qosh);
+            qosh.value.history.kind = HistoryQosPolicyKind.KEEP_ALL_HISTORY_QOS;
+            if (reliable) {
+                qosh.value.reliability.kind =
+                        ReliabilityQosPolicyKind.RELIABLE_RELIABILITY_QOS;
+            }
+            dw = pub.create_datawriter(top,
+                    qosh.value,
+                    null,
+                    DEFAULT_STATUS_MASK.value);
+            if (dw == null) {
+                System.err.println("ERROR: DataWriter creation failed");
+                return;
+            }
+            System.out.println("Publisher Created DataWriter");
+
+            sc = dw.get_statuscondition();
+            sc.set_enabled_statuses(PUBLICATION_MATCHED_STATUS.value);
+            ws = new WaitSet();
+            ws.attach_condition(sc);
+            matched = new PublicationMatchedStatusHolder(new PublicationMatchedStatus());
+            timeout = new Duration_t(DURATION_INFINITE_SEC.value,
+                    DURATION_INFINITE_NSEC.value);
+
+
+
+            ws.detach_condition(sc);
+
+            mdw = MessageDataWriterHelper.narrow(dw);
+            // 等待匹配的逻辑保持原样
+            while (running) {
+                System.out.println("等待match中");
+                final int result = dw.get_publication_matched_status(matched);
+                if (result != RETCODE_OK.value) {
+                    System.err.println("ERROR: get_publication_matched_status()" +
+                            "failed.");
+                    return;
+                }
+                // 原有匹配等待逻辑...
+                if (matched.value.current_count >= 1) {
+                    System.out.println("Publisher Matched");
+                    isInitialized = true;
+                    break;
+                }
+                // 修改等待时间为有限等待
+                Duration_t timeout = new Duration_t(1, 0);
+                ConditionSeqHolder cond = new ConditionSeqHolder(new Condition[]{});
+                ws.wait(cond, timeout);
+            }
+
+            // 创建消息模板
+            Message template = new Message();
+            template.subject_id = 99;
+            instanceHandle = mdw.register_instance(template);
+
+            // 启动消息处理线程
+            executor.submit(this::processMessageQueue);
+
+        } catch (Exception e) {
+            // 异常处理...
+        }
+    }
+    public void publishMessage(String from, String subject, String text) {
+        if (!isInitialized) {
+            throw new IllegalStateException("Publisher not initialized");
+        }
+
+        Message msg = new Message();
+        msg.subject_id = 99;
+        msg.from = from;
+        msg.subject = subject;
+        msg.text = text;
+        msg.count = count++; // 根据需求调整
+
+        messageQueue.offer(msg); // 非阻塞式添加
+    }
+
+    private void processMessageQueue() {
+        try {
+            while (running) {
+                Message msg = messageQueue.poll(1, TimeUnit.SECONDS); // 带超时的获取
+                if (msg != null) {
+                    int ret;
+                    do {
+                        ret = mdw.write(msg, instanceHandle);
+                        if (ret != RETCODE_OK.value) {
+                            Thread.sleep(100); // 失败时短暂等待
+                        }
+                    } while (running && ret == RETCODE_TIMEOUT.value);
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+
+}

+ 188 - 0
src/main/java/io/renren/dds/TestSubscriber.java

@@ -0,0 +1,188 @@
+package io.renren.dds;/*
+ *
+ *
+ * Distributed under the OpenDDS License.
+ * See: http://www.opendds.org/license.html
+ */
+
+import DDS.*;
+import Messenger.*;
+import OpenDDS.DCPS.*;
+import org.omg.CORBA.StringSeqHolder;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+@Component
+public class TestSubscriber {
+    private DomainParticipantFactory dpf;
+    private DomainParticipant dp;
+    private MessageTypeSupportImpl servant;
+    private Topic top;
+    private Subscriber sub;
+    private DataReaderQos dr_qos;
+    private DataReaderQosHolder qosh;
+    public DataReaderListenerImpl listener;
+    private DataReader dr;
+    private StatusCondition sc;
+    private WaitSet ws;
+    private SubscriptionMatchedStatusHolder matched;
+    private Duration_t timeout;
+    // 添加线程控制相关成员
+    private final ExecutorService executor = Executors.newSingleThreadExecutor();
+    private volatile boolean running = true;
+
+    public static boolean checkReliable(String[] args) {
+        for (int i = 0; i < args.length; ++i) {
+            if (args[i].equals("-r")) {
+                return true;
+            }
+        }
+        return false;
+    }
+    public TestSubscriber() throws Exception {
+        String[] s = new String[4];
+        s[0] = "-DCPSBit";
+        s[1] = "0";
+        s[2] = "-DCPSConfigFile";
+        s[3] = "C:/software/OpenDDS-3.16/java/tests/messenger/tcp.ini";
+        executor.submit(() -> initialize(s));
+    }
+    public String getText(){
+        return listener.text;
+    }
+    private void initialize(String[] args) {
+        try {
+            // ... 保持原有初始化代码到创建DataReader为止...
+
+            System.out.println("Start Subscriber");
+            boolean reliable = checkReliable(args);
+
+            dpf = TheParticipantFactory.WithArgs(new StringSeqHolder(args));
+            if (dpf == null) {
+                System.err.println("ERROR: Domain Participant Factory not found");
+                return;
+            }
+            dp = dpf.create_participant(4,
+                    PARTICIPANT_QOS_DEFAULT.get(), null, DEFAULT_STATUS_MASK.value);
+            if (dp == null) {
+                System.err.println("ERROR: Domain Participant creation failed");
+                return;
+            }
+
+            servant = new MessageTypeSupportImpl();
+            if (servant.register_type(dp, "") != RETCODE_OK.value) {
+                System.err.println("ERROR: register_type failed");
+                return;
+            }
+            top = dp.create_topic("Movie Discussion List",
+                    servant.get_type_name(),
+                    TOPIC_QOS_DEFAULT.get(),
+                    null,
+                    DEFAULT_STATUS_MASK.value);
+            if (top == null) {
+                System.err.println("ERROR: Topic creation failed");
+                return;
+            }
+
+            sub = dp.create_subscriber(SUBSCRIBER_QOS_DEFAULT.get(),
+                    null, DEFAULT_STATUS_MASK.value);
+            if (sub == null) {
+                System.err.println("ERROR: Subscriber creation failed");
+                return;
+            }
+
+            // Use the default transport (do nothing)
+
+            dr_qos = new DataReaderQos();
+            dr_qos.durability = new DurabilityQosPolicy();
+            dr_qos.durability.kind = DurabilityQosPolicyKind.from_int(0);
+            dr_qos.deadline = new DeadlineQosPolicy();
+            dr_qos.deadline.period = new Duration_t();
+            dr_qos.latency_budget = new LatencyBudgetQosPolicy();
+            dr_qos.latency_budget.duration = new Duration_t();
+            dr_qos.liveliness = new LivelinessQosPolicy();
+            dr_qos.liveliness.kind = LivelinessQosPolicyKind.from_int(0);
+            dr_qos.liveliness.lease_duration = new Duration_t();
+            dr_qos.reliability = new ReliabilityQosPolicy();
+            dr_qos.reliability.kind = ReliabilityQosPolicyKind.from_int(0);
+            dr_qos.reliability.max_blocking_time = new Duration_t();
+            dr_qos.destination_order = new DestinationOrderQosPolicy();
+            dr_qos.destination_order.kind = DestinationOrderQosPolicyKind.from_int(0);
+            dr_qos.history = new HistoryQosPolicy();
+            dr_qos.history.kind = HistoryQosPolicyKind.from_int(0);
+            dr_qos.resource_limits = new ResourceLimitsQosPolicy();
+            dr_qos.user_data = new UserDataQosPolicy();
+            dr_qos.user_data.value = new byte[0];
+            dr_qos.ownership = new OwnershipQosPolicy();
+            dr_qos.ownership.kind = OwnershipQosPolicyKind.from_int(0);
+            dr_qos.time_based_filter = new TimeBasedFilterQosPolicy();
+            dr_qos.time_based_filter.minimum_separation = new Duration_t();
+            dr_qos.reader_data_lifecycle = new ReaderDataLifecycleQosPolicy();
+            dr_qos.reader_data_lifecycle.autopurge_nowriter_samples_delay = new Duration_t();
+            dr_qos.reader_data_lifecycle.autopurge_disposed_samples_delay = new Duration_t();
+            dr_qos.representation = new DataRepresentationQosPolicy();
+            dr_qos.representation.value = new short[0];
+
+            qosh = new DataReaderQosHolder(dr_qos);
+            sub.get_default_datareader_qos(qosh);
+            if (reliable) {
+                qosh.value.reliability.kind =
+                        ReliabilityQosPolicyKind.RELIABLE_RELIABILITY_QOS;
+            }
+            qosh.value.history.kind = HistoryQosPolicyKind.KEEP_ALL_HISTORY_QOS;
+
+            listener = new DataReaderListenerImpl();
+            dr = sub.create_datareader(top,
+                    qosh.value,
+                    listener,
+                    DEFAULT_STATUS_MASK.value);
+            if (dr == null) {
+                System.err.println("ERROR: DataReader creation failed");
+                return;
+            }
+
+            sc = dr.get_statuscondition();
+            sc.set_enabled_statuses(SUBSCRIPTION_MATCHED_STATUS.value);
+            ws = new WaitSet();
+            ws.attach_condition(sc);
+            matched = new SubscriptionMatchedStatusHolder(new SubscriptionMatchedStatus());
+
+//        timeout = new Duration_t(DURATION_INFINITE_SEC.value,
+//                DURATION_INFINITE_NSEC.value);
+
+            boolean matched_pub = false;
+
+
+             timeout = new Duration_t(1, 0); // 1秒超时
+
+            while (running) {
+                final int result = dr.get_subscription_matched_status(matched);
+                if (result != RETCODE_OK.value) {
+                    System.err.println("ERROR: get_subscription_matched_status() failed");
+                    break;
+                }
+
+                if (matched.value.current_count > 0) {
+                    System.out.println("Subscriber Matched");
+                    break;
+                }
+
+                ConditionSeqHolder cond = new ConditionSeqHolder(new Condition[]{});
+                if (ws.wait(cond, timeout) != RETCODE_OK.value && running) {
+                    System.err.println("ERROR: wait() failed");
+                    break;
+                }
+            }
+
+            System.out.println("Subscriber initialization completed");
+
+        } catch (Exception e) {
+            if (running) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+}

+ 71 - 5
src/main/java/io/renren/modules/sys/controller/algs/algTrainController.java

@@ -1,5 +1,6 @@
 package io.renren.modules.sys.controller.algs;
 
+import ch.qos.logback.core.BasicStatusManager;
 import com.jcraft.jsch.ChannelSftp;
 import com.spotify.docker.client.LogStream;
 import com.spotify.docker.client.exceptions.DockerException;
@@ -10,6 +11,8 @@ import io.renren.common.utils.*;
 import io.renren.common.validator.ValidatorUtils;
 import io.renren.common.validator.group.AddGroup;
 import io.renren.common.validator.group.UpdateGroup;
+import io.renren.dds.TestPublisher;
+import io.renren.dds.TestSubscriber;
 import io.renren.modules.sys.entity.algs.*;
 import io.renren.modules.sys.entity.serverport.ServerPort;
 import io.renren.modules.sys.service.*;
@@ -22,16 +25,20 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.shiro.authz.annotation.RequiresPermissions;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.multipart.MultipartHttpServletRequest;
 
+import javax.annotation.PostConstruct;
 import javax.validation.constraints.Min;
 import java.io.*;
 import java.security.InvalidKeyException;
 import java.security.NoSuchAlgorithmException;
 import java.text.SimpleDateFormat;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 import static io.renren.common.utils.ShiroUtils.getUserId;
@@ -67,6 +74,15 @@ public class algTrainController {
     VersionService versionService;
     @Autowired
     ServerPortService serverPortService;
+
+    public final TestSubscriber subscriber = new TestSubscriber();
+    public final TestPublisher publisher = new TestPublisher();
+
+    private static String returnFileName;
+
+    public algTrainController() throws Exception {
+    }
+
     /**
      * 所有列表
      */
@@ -232,7 +248,7 @@ public class algTrainController {
     @GetMapping("/startTraining")
     public R startTraining(String algorithmTrainingId) throws DockerException, InterruptedException, IOException, InvalidResponseException, InvalidKeyException, NoSuchAlgorithmException, ErrorResponseException, XmlParserException, InvalidBucketNameException, InsufficientDataException, InternalException {
         AlgTrain algTrain=algTrainService.selectByPrimaryKey(Long.parseLong(algorithmTrainingId));
-        String execOutput=DockerClientUtils.execPython(algTrain.getContainerId(),"algTrain"+algTrain.getAlgorithmTrainingId(),algTrain.getRunfileName());
+        returnFileName = DockerClientUtils.execPython(algTrain.getContainerId(),"algTrain"+algTrain.getAlgorithmTrainingId(),algTrain.getRunfileName());
 
         //完成训练后,将任务状态改为已结束,并保存结束时间
         algTrain.setMissStatus((byte) 3);
@@ -254,7 +270,14 @@ public class algTrainController {
         algTrainLog.setAlgorithmTrainingLogContent(algorithmTrainingLogContent);
         algTrainLogService.save(algTrainLog);
 
-        System.out.println(algTrainLogService.selectByAlgTrainId(Long.parseLong(algorithmTrainingId)).getAlgorithmTrainingLogContent());
+        // 产生数据
+        publisher.publishMessage(
+                "1",
+                "11",
+                "算法结果" + algorithmTrainingLogContent
+        );
+
+//        System.out.println(algTrainLogService.selectByAlgTrainId(Long.parseLong(algorithmTrainingId)).getAlgorithmTrainingLogContent());
 
         //若有tensorboard文件 则保存至minio
         /*Vector<ChannelSftp.LsEntry> vector=FTPUtils.showFiles("algTrain"+algorithmTrainingId);
@@ -270,6 +293,45 @@ public class algTrainController {
 //        }
         return R.ok();
     }
+
+    @Async
+    @Scheduled(fixedRate = 6000)//每6秒执行一次,获取消息
+    public void checkAlgRequest() throws DockerException, InvalidBucketNameException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InterruptedException, InvalidResponseException, XmlParserException, InternalException {
+        if(!publisher.isInitialized){
+            return;
+        }
+        if (subscriber.listener != null && subscriber.listener.isRequest == 0) {
+            publisher.publishMessage(
+                    "4",
+                    "111",
+                    "算法文件使用请求"
+            );
+        }
+        publisher.publishMessage(
+                "2",
+                "8",
+                ""
+        );
+        if(subscriber.listener.algMap != null) {
+            for(Map.Entry<String, AlgTrain> map : subscriber.listener.algMap.entrySet()){
+                String key = map.getKey();
+                AlgTrain algTrain = map.getValue();
+                startTraining(String.valueOf(algTrain.getAlgorithmTrainingId()));
+            }
+        }
+    }
+
+//    @PostConstruct
+//    public void init() {
+//        // 初始化 publisher 和 subscriber
+//        publisher.initialize();
+//        subscriber.initialize(); // 假设有初始化方法
+//        initialized = true;
+//    }
+//
+//    private void resolverDDS(){
+//        subscriber.listener.algMap.get()
+//    }
     /**
      * Description:根据algorithmTrainingId,开始训练,并附带参数
      * @param algorithmTrainingId
@@ -342,9 +404,12 @@ public class algTrainController {
      */
     @GetMapping("/getOutput")
     public R getOutput(String algorithmTrainingId) throws DockerException, InterruptedException, IOException, InvalidResponseException, InvalidKeyException, NoSuchAlgorithmException, ErrorResponseException, XmlParserException, InvalidBucketNameException, InsufficientDataException, InternalException {
-        AlgTrain algTrain=algTrainService.selectByPrimaryKey(Long.parseLong(algorithmTrainingId));
-        DockerClientUtils.copyFile(algTrain.getContainerId(),"/nohup.out","/opt/algTrain"+algTrain.getAlgorithmTrainingId());
-        InputStream inputStream=FTPUtils.downloadFile("/opt/uploadFile/algTrain"+algorithmTrainingId+"/nohup.out");
+        AlgTrain algTrain = algTrainService.selectByPrimaryKey(Long.parseLong(algorithmTrainingId));
+        DockerClientUtils.copyFile(algTrain.getContainerId(),"/" + returnFileName,"/opt/algTrain" + algorithmTrainingId);
+        InputStream inputStream = FTPUtils.downloadFile("/opt/uploadFile/algTrain" + algorithmTrainingId + "/" + returnFileName);
+        if (inputStream == null) {
+            return R.error("文件不存在");
+        }
         String result = new BufferedReader(new InputStreamReader(inputStream))
                 .lines().collect(Collectors.joining("\n"));
         inputStream.close();
@@ -404,6 +469,7 @@ public class algTrainController {
         Date date = new Date();
         //SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
         algTrain.setMissCreationTime(date);
+        algTrain.setMissStopTime(null);
         algTrainService.update(algTrain);
         return R.ok();
     }

+ 13 - 0
src/main/java/io/renren/modules/sys/entity/Message.java

@@ -0,0 +1,13 @@
+package io.renren.modules.sys.entity;
+
+import lombok.Data;
+
+@Data
+public class Message {
+    String Time;
+    String send;
+    String topic;
+    String text;
+    String receiver;
+    String isSend = "0";
+}

+ 3 - 0
src/main/java/io/renren/modules/sys/entity/algs/AlgTrain.java

@@ -1,5 +1,7 @@
 package io.renren.modules.sys.entity.algs;
 
+import com.baomidou.mybatisplus.annotation.FieldStrategy;
+import com.baomidou.mybatisplus.annotation.TableField;
 import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
 import lombok.Data;
@@ -22,6 +24,7 @@ public class AlgTrain {
 
     private Date missCreationTime;
 
+    @TableField(updateStrategy = FieldStrategy.IGNORED)
     private Date missStopTime;
 
     private String remark;

+ 8 - 5
src/main/resources/application-dev.yml

@@ -6,9 +6,10 @@ spring:
             driver-class-name: com.mysql.cj.jdbc.Driver
             #43.143.221.128:6033/renren_fast
 #            url: jdbc:mysql://10.170.57.219:3306/renren_fast?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false&allowPublicKeyRetrieval=true
-            url: jdbc:mysql://10.170.57.219:3306/renren_fast?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
+            url: jdbc:mysql://localhost:3306/renren_fast?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
             username: root
-            password: XDUxdu2023
+#            password: XDUxdu2023
+            password: 123456
             initial-size: 10
             max-active: 100
             min-idle: 10
@@ -51,9 +52,10 @@ dynamic:
         #            driver-class-name: com.mysql.jdbc.Driver # 3.2.0开始支持SPI可省略此配置
         heBing:
 #            url: jdbc:mysql://10.170.57.219:3306/hebing_menu?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false&allowPublicKeyRetrieval=true
-            url: jdbc:mysql://10.170.57.219:3306/hebing_menu?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
+            url: jdbc:mysql://localhost:3306/hebing_menu?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
             username: root
-            password: XDUxdu2023
+#            password: XDUxdu2023
+            password: 123456
             driver-class-name: com.mysql.cj.jdbc.Driver
 
 server:
@@ -63,7 +65,8 @@ server:
 
 hbase:
     config:
-        hbase.zookeeper.quorum: 10.170.7.125
+#        hbase.zookeeper.quorum: 10.204.8.22
+        hbase.zookeeper.quorum: 192.168.179.130
         hbase.zookeeper.property.clientPort: 2181
         zookeeper.znode.parent: /hbase
         hbase.client.keyvalue.maxsize: 1048576000

+ 15 - 13
src/main/resources/application.yml

@@ -9,39 +9,41 @@ server:
     context-path: /renren-fast
 # minio
 minio:
-  endpoint: http://10.170.7.125:9000
+#  endpoint: http://10.204.8.22:9000
+  endpoint: http://192.168.179.130:9000
   bucket: test0706
   access-key: minio
   secret-key: minio123
 
 # docker远程连接
 docker:
-  url: https://10.170.7.125:2376
-remote-docker: 10.170.7.125
+  url: https://192.168.179.130:2376
+remote-docker: 192.168.179.130
 # 上传文件到远程服务器地址,用户名及密码,上传文件的基础路径
-host: 10.170.7.125
+host: 192.168.179.130
 host-username: root
-host-password: kylin123
+host-password: 123456
+#host-password: 123456
 # host-basepath: D:/aiplat-mkcloud/aiplat/uploadFile
 host-basepath: /opt/uploadFile
 host-port: 22
 # 连接远程服务器是否需要秘钥
 key-needed: false
 # 如果需要秘钥,则将秘钥地址填写在此,这是ftp服务器的密钥不是ca的密钥
-key-location: D:/aiplat-mkcloud/aiplat/docker_ca
+key-location: C:/aiplat-mkcloud/aiplat/docker_ca/
 
 # docker证书存放路径
-docker_ca: D:/aiplat-mkcloud/aiplat/docker_ca
+docker_ca: C:/aiplat-mkcloud/aiplat/docker_ca/
 
 
 # 算法结果入库模板ip
-resultToDB: 10.170.7.125
+resultToDB: 10.204.8.22
 
 # 数据预处理数据集存放文件夹
-datasetLocation: D:/aiplat-mkcloud/aiplat/demoCSV
+datasetLocation: C:/aiplat-mkcloud/aiplat/demoCSV
 
 # 数据预处理python文件存放文件夹
-dataPreProcessLocation: D:/aiplat-mkcloud/aiplat/dataPreProcess
+dataPreProcessLocation: C:/aiplat-mkcloud/aiplat/dataPreProcess
 
 spring:
   # 环境 dev|test|prod
@@ -114,8 +116,8 @@ renren:
 
 # Argo
 argo:
-  basepath: https://10.170.7.125:32742
-  minioendpoint: http://10.170.7.125:32626
+  basepath: https://192.168.179.130:32742
+  minioendpoint: http://192.168.179.130:32626
   minioAccess-key: admin
   minioSecret-key: password
   sparkMasterRest: spark://150.158.138.99:16066
@@ -126,7 +128,7 @@ argo:
 # kubenetes  Argo部分获取pod日志
 Akubenetes:
   basepath: https://43.143.237.223:30880
-  kubeconfig-path: D:/aiplat-mkcloud/aiplat/akubenetes/config
+  kubeconfig-path: C:/aiplat-mkcloud/aiplat/akubenetes/config
 
 # griffin数据探查服务
 griffin: http://43.143.224.212:8091