Browse Source

[bug] 修改发布信息已经订阅消息部分的bug

zishang 3 months ago
parent
commit
bb3c40f91d

+ 1 - 0
src/main/java/io/renren/common/exception/RRExceptionHandler.java

@@ -37,6 +37,7 @@ public class RRExceptionHandler {
 		R r = new R();
 		r.put("code", e.getCode());
 		r.put("msg", e.getMessage());
+		logger.error(e.getMessage(), e);
 
 		return R.error(e.getCode(), e.getMessage());
 	}

+ 9 - 9
src/main/java/io/renren/dds/DataReaderListenerImpl.java

@@ -48,7 +48,7 @@ public class DataReaderListenerImpl extends DDS._DataReaderListenerLocalBase {
 //    public Map<String, Position> radarPositionMap = new ConcurrentHashMap<>();
 //    public Map<String, List<String>> coopTeamMap = new ConcurrentHashMap<>();
     public final Map<String, List<io.renren.modules.sys.entity.Message>> messageList = new ConcurrentHashMap<>();
-    public Map<String, AlgTrain> algMap = new ConcurrentHashMap<>();
+    public Map<String, String> algMap = new ConcurrentHashMap<>();
 //    public Map<String, MessageStatus> messageStatusMap = new ConcurrentHashMap<>();
 
     @Resource
@@ -253,16 +253,20 @@ public class DataReaderListenerImpl extends DDS._DataReaderListenerLocalBase {
 //                        messageStatusMap.put(platformID,m);
                     }
                 }
-                if(mh.value.subject.equals("11")){
+                if(mh.value.subject.equals("110")){
                     // 使用算法文件消息
+                    isRequest = 1;
                     String message = mh.value.text;
                     String[] messageList = message.split("\n");
                     String[] countList = messageList[0].split(" ");
-                    for(int i=1;i<=Integer.parseInt(countList[0]);i++){
+                    for(int i=0;i<messageList.length;i++){
                         String[] s = messageList[i].split(" ");
                         String platformID = s[0];
-                        AlgTrain algTrain = algTrainService.selectByMissName(s[1]);
-                        algMap.put(platformID,algTrain);
+                        if (s[1].equals("无")){continue;}
+//                        AlgTrain algTrain1 = new AlgTrain();
+//                        algTrain1.setMissName(s[1]);
+//                        AlgTrain algTrain = algTrainService.selectByMissName(s[1]);
+                        algMap.put(platformID,s[1]);
 //                        if (algTrain != null){
 //                            try {
 //                                algTrainController.startTraining(String.valueOf(algTrain.getAlgorithmTrainingId()));
@@ -273,10 +277,6 @@ public class DataReaderListenerImpl extends DDS._DataReaderListenerLocalBase {
 //                        }
                     }
                 }
-                if(mh.value.subject.equals("111")){
-                    //收到算法请求信息
-                    isRequest = 1;
-                }
 
                 if (invalid_count == true) {
                     System.out.println("ERROR: Invalid message.count (" + mh.value.count + ")");

+ 9 - 6
src/main/java/io/renren/dds/TestPublisher.java

@@ -8,6 +8,7 @@ package io.renren.dds;/*
 import DDS.*;
 import Messenger.*;
 import OpenDDS.DCPS.*;
+import io.renren.common.exception.RRException;
 import org.omg.CORBA.StringSeqHolder;
 import org.springframework.stereotype.Component;
 
@@ -38,11 +39,13 @@ public class TestPublisher {
     private final ExecutorService executor = Executors.newSingleThreadExecutor();
     private volatile boolean running = true;
     public TestPublisher() throws Exception {
-        String[] s = new String[4];
-        s[0] = "-DCPSBit";
-        s[1] = "0";
-        s[2] = "-DCPSConfigFile";
-        s[3] = "C:/software/OpenDDS-3.16/java/tests/messenger/tcp.ini";
+        String[] s = new String[6];
+        s[0] = "-DCPSBit";               // 启用Bit通信(保持原有配置)
+        s[1] = "0";                      // Bit值
+        s[2] = "-DCPSDefaultDiscovery";  // 指定发现机制为InfoRepo
+        s[3] = "DEFAULT_REPO";           // 使用集中式InfoRepo
+        s[4] = "-DCPSInfoRepo";          // InfoRepo的corbaloc地址
+        s[5] = "corbaloc:iiop:10.195.84.22:12345/DCPSInfoRepo"; // 替换为实际IP和端口
         executor.submit(() -> initialize(s)); // 初始化放在后台线程
     }
     public static boolean checkReliable(String[] args) {
@@ -205,7 +208,7 @@ public class TestPublisher {
     }
     public void publishMessage(String from, String subject, String text) {
         if (!isInitialized) {
-            throw new IllegalStateException("Publisher not initialized");
+            throw new RRException("Publisher not initialized");
         }
 
         Message msg = new Message();

+ 8 - 6
src/main/java/io/renren/dds/TestSubscriber.java

@@ -42,12 +42,14 @@ public class TestSubscriber {
         return false;
     }
     public TestSubscriber() throws Exception {
-        String[] s = new String[4];
-        s[0] = "-DCPSBit";
-        s[1] = "0";
-        s[2] = "-DCPSConfigFile";
-        s[3] = "C:/software/OpenDDS-3.16/java/tests/messenger/tcp.ini";
-        executor.submit(() -> initialize(s));
+        String[] s = new String[6];
+        s[0] = "-DCPSBit";               // 启用Bit通信(保持原有配置)
+        s[1] = "0";                      // Bit值
+        s[2] = "-DCPSDefaultDiscovery";  // 指定发现机制为InfoRepo
+        s[3] = "DEFAULT_REPO";           // 使用集中式InfoRepo
+        s[4] = "-DCPSInfoRepo";          // InfoRepo的corbaloc地址
+        s[5] = "corbaloc:iiop:10.195.84.22:12345/DCPSInfoRepo"; // 替换为实际IP和端口
+        executor.submit(() -> initialize(s)); // 初始化放在后台线程
     }
     public String getText(){
         return listener.text;

+ 0 - 2
src/main/java/io/renren/modules/dataSet/controller/DynamicSystemController.java

@@ -13,8 +13,6 @@ import java.util.List;
 import java.util.Map;
 
 @RestController
-
-
 @RequestMapping("/dataset/dynamic")
 public class DynamicSystemController {
 

+ 32 - 21
src/main/java/io/renren/modules/sys/controller/algs/algTrainController.java

@@ -271,11 +271,11 @@ public class algTrainController {
         algTrainLogService.save(algTrainLog);
 
         // 产生数据
-        publisher.publishMessage(
-                "1",
-                "11",
-                "算法结果" + algorithmTrainingLogContent
-        );
+//        publisher.publishMessage(
+//                "1",
+//                "11",
+//                "算法结果" + algorithmTrainingLogContent
+//        );
 
 //        System.out.println(algTrainLogService.selectByAlgTrainId(Long.parseLong(algorithmTrainingId)).getAlgorithmTrainingLogContent());
 
@@ -297,9 +297,6 @@ public class algTrainController {
     @Async
     @Scheduled(fixedRate = 6000)//每6秒执行一次,获取消息
     public void checkAlgRequest() throws DockerException, InvalidBucketNameException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InterruptedException, InvalidResponseException, XmlParserException, InternalException {
-        if(!publisher.isInitialized){
-            return;
-        }
         if (subscriber.listener != null && subscriber.listener.isRequest == 0) {
             publisher.publishMessage(
                     "4",
@@ -308,26 +305,40 @@ public class algTrainController {
             );
         }
         publisher.publishMessage(
-                "2",
+                "4",
                 "8",
-                ""
+                "这是心跳信息"
         );
-        if(subscriber.listener.algMap != null) {
-            for(Map.Entry<String, AlgTrain> map : subscriber.listener.algMap.entrySet()){
+
+        if(subscriber.listener != null && subscriber.listener.isRequest == 1) {
+            for(Map.Entry<String, String> map : subscriber.listener.algMap.entrySet()){
                 String key = map.getKey();
-                AlgTrain algTrain = map.getValue();
-                startTraining(String.valueOf(algTrain.getAlgorithmTrainingId()));
+                String missName = map.getValue();
+                AlgTrain algTrain = algTrainService.selectByMissName(missName);
+                Long algorithmTrainingId = algTrain.getAlgorithmTrainingId();
+                startTraining(String.valueOf(algorithmTrainingId));
             }
         }
     }
 
-//    @PostConstruct
-//    public void init() {
-//        // 初始化 publisher 和 subscriber
-//        publisher.initialize();
-//        subscriber.initialize(); // 假设有初始化方法
-//        initialized = true;
-//    }
+    @PostConstruct
+    public void init() throws DockerException, InvalidBucketNameException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InterruptedException, InvalidResponseException, XmlParserException, InternalException {
+        // 初始化时运行
+        if (subscriber.listener.isRequest == 1) {
+            algRun(subscriber.listener.algMap);
+        }
+        subscriber.listener.isRequest = 0;
+    }
+
+    private void algRun(Map<String, String> algMap) throws DockerException, InvalidBucketNameException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InterruptedException, InvalidResponseException, XmlParserException, InternalException {
+        for(Map.Entry<String, String> map : algMap.entrySet()){
+            String key = map.getKey();
+            String missName = map.getValue();
+            AlgTrain algTrain = algTrainService.selectByMissName(missName);
+            Long algorithmTrainingId = algTrain.getAlgorithmTrainingId();
+            startTraining(String.valueOf(algorithmTrainingId));
+        }
+    }
 //
 //    private void resolverDDS(){
 //        subscriber.listener.algMap.get()