瀏覽代碼

测试bug修改,完善断开连接

zlSun 3 月之前
父節點
當前提交
d4d3e81935

+ 21 - 5
src/main/java/com/dc/datachange/core/connection/DDSPublisher.java

@@ -33,6 +33,10 @@ public class DDSPublisher {
     private final ExecutorService matchService = Executors.newSingleThreadExecutor();
     private static final int MAX_RETRIES = 5;  // 最大重试次数
     private int retryCount = 0;                // 当前重试计数
+    private DataWriter dw;
+    private Publisher pub;
+    private DomainParticipant dp;
+    private DomainParticipantFactory dpf;
 
     @PostConstruct
     private void initialize(){
@@ -62,12 +66,12 @@ public class DDSPublisher {
     private void doInitialize() {
         log.info("Start Publisher");
 
-        DomainParticipantFactory dpf = TheParticipantFactory.WithArgs(new StringSeqHolder(params.toArgs()));
+        dpf = TheParticipantFactory.WithArgs(new StringSeqHolder(params.toArgs()));
         if (dpf == null) {
             throw new InitialFailedException("ERROR: Domain Participant Factory not found");
         }
 
-        DomainParticipant dp = dpf.create_participant(4,
+        dp = dpf.create_participant(4,
                 PARTICIPANT_QOS_DEFAULT.get(), null, DEFAULT_STATUS_MASK.value);
         if (dp == null) {
             throw new InitialFailedException("ERROR: Domain Participant creation failed");
@@ -87,7 +91,7 @@ public class DDSPublisher {
             throw new InitialFailedException("ERROR: Topic creation failed");
         }
 
-        Publisher pub = dp.create_publisher(PUBLISHER_QOS_DEFAULT.get(), null,
+        pub = dp.create_publisher(PUBLISHER_QOS_DEFAULT.get(), null,
                 DEFAULT_STATUS_MASK.value);
         if (pub == null) {
             throw new InitialFailedException("ERROR: Publisher creation failed");
@@ -135,7 +139,7 @@ public class DDSPublisher {
             qosh.value.reliability.kind =
                     ReliabilityQosPolicyKind.RELIABLE_RELIABILITY_QOS;
         }
-        DataWriter dw = pub.create_datawriter(top,
+        dw = pub.create_datawriter(top,
                 qosh.value,
                 null,
                 DEFAULT_STATUS_MASK.value);
@@ -198,10 +202,22 @@ public class DDSPublisher {
         isInitialized=false;
         running=false;
         log.info("终止publisher运行");
-//        disconnection();
+        disconnection();
         log.info("关闭publisher连接完成");
     }
 
+    private void disconnection() {
+        if (dw != null) {
+            pub.delete_datawriter(dw);
+        }
+        if (pub != null) {
+            dp.delete_publisher(pub);
+        }
+        if (dp != null) {
+            dpf.delete_participant(dp);
+        }
+    }
+
     @Bean
     @Async
     public void processMessageQueue() {

+ 21 - 4
src/main/java/com/dc/datachange/core/connection/DDSSubscriber.java

@@ -24,6 +24,10 @@ public class DDSSubscriber {
     @Autowired
     private DDSListener listener;
     private volatile boolean running = true;
+    private DomainParticipantFactory dpf;
+    private DomainParticipant dp;
+    private Subscriber sub;
+    private DataReader dr;
 
     @PostConstruct
     public void initialized(){
@@ -32,12 +36,12 @@ public class DDSSubscriber {
 
             log.info("Start Subscriber");
 
-            DomainParticipantFactory dpf = TheParticipantFactory.WithArgs(new StringSeqHolder(params.toArgs()));
+            dpf = TheParticipantFactory.WithArgs(new StringSeqHolder(params.toArgs()));
             if (dpf == null) {
                 System.err.println("ERROR: Domain Participant Factory not found");
                 return;
             }
-            DomainParticipant dp = dpf.create_participant(4,
+            dp = dpf.create_participant(4,
                     PARTICIPANT_QOS_DEFAULT.get(), null, DEFAULT_STATUS_MASK.value);
             if (dp == null) {
                 System.err.println("ERROR: Domain Participant creation failed");
@@ -59,7 +63,7 @@ public class DDSSubscriber {
                 return;
             }
 
-            Subscriber sub = dp.create_subscriber(SUBSCRIBER_QOS_DEFAULT.get(),
+            sub = dp.create_subscriber(SUBSCRIBER_QOS_DEFAULT.get(),
                     null, DEFAULT_STATUS_MASK.value);
             if (sub == null) {
                 System.err.println("ERROR: Subscriber creation failed");
@@ -106,7 +110,7 @@ public class DDSSubscriber {
             }
             qosh.value.history.kind = HistoryQosPolicyKind.KEEP_ALL_HISTORY_QOS;
 
-            DataReader dr = sub.create_datareader(top,
+            dr = sub.create_datareader(top,
                     qosh.value,
                     listener,
                     DEFAULT_STATUS_MASK.value);
@@ -155,6 +159,19 @@ public class DDSSubscriber {
     public void preDestroy(){
         running=false;
         log.info("终止subscriber运行");
+        disconnection();
         log.info("关闭subscriber连接完成");
     }
+
+    private void disconnection() {
+        if (dr != null) {
+            sub.delete_datareader(dr);
+        }
+        if (sub != null) {
+            dp.delete_subscriber(sub);
+        }
+        if (dp != null) {
+            dpf.delete_participant(dp);
+        }
+    }
 }

+ 3 - 0
src/main/java/com/dc/datachange/core/entity/message/sendMessage/NetGroupMsg.java

@@ -48,6 +48,9 @@ public class NetGroupMsg extends SendMessage implements ReactiveMsg, SelfMsg {
         List<NetworkGroup> networkGroups = dataManager.getAllData(NetworkGroup.class);
         List<NetworkGroup> interferGroups=networkGroups.stream().filter(NetworkGroup::isInterfer).collect(Collectors.toList());
         List<NetworkGroup> radarGroups=networkGroups.stream().filter(NetworkGroup::isRadar).collect(Collectors.toList());
+        if(interferGroups.size()==0 && radarGroups.size()==0){
+            return null;
+        }
         //第一行
         stringBuilder.append(interferGroups.size()).append(" ")
                 .append(radarGroups.size()).append(" ")

+ 2 - 2
src/main/java/com/dc/datachange/core/exchange/DDSListener.java

@@ -37,11 +37,11 @@ public class DDSListener extends DDS._DataReaderListenerLocalBase{
 
         if (status == RETCODE_OK.value) {
             if (sih.value.valid_data) {
-                //核心处理逻辑
+                //根据配置的类型进行输出。但是或许应该写成反向过滤,而不是正向?
                 if(printConfig.getPrintMsgType().contains(mh.value.subject)){
                     log.info(printMessage(mh.value));
                 }
-
+                //核心处理逻辑
                 ReceivedMessage DDSMessage = executor.getSpecMsg(mh.value);
                 if(!(DDSMessage instanceof ErrorMessage)){
                     executor.invoke(DDSMessage);

+ 5 - 1
src/main/java/com/dc/datachange/core/exchange/SendExecutor.java

@@ -1,5 +1,6 @@
 package com.dc.datachange.core.exchange;
 
+import Messenger.Message;
 import com.dc.datachange.core.connection.DDSPublisher;
 import com.dc.datachange.core.entity.message.ReceivedMessage;
 import com.dc.datachange.core.entity.message.sendMessage.*;
@@ -28,7 +29,10 @@ public class SendExecutor {
         }
     }
     public void sendDDS(ReceivedMessage message){
-        ddsPublisher.sendMessage(msgMap.get(message.getMsgType()).toMessage(message));
+        Message DDSMessage = msgMap.get(message.getMsgType()).toMessage(message);
+        if(DDSMessage != null){
+            ddsPublisher.sendMessage(DDSMessage);
+        }
     }
     public void sendDDS(SelfMsg message){
         ddsPublisher.sendMessage(message.toMessage());

+ 2 - 1
src/main/java/com/dc/datachange/core/exchange/strategy/GroupParamsReqStrategy.java

@@ -1,6 +1,7 @@
 package com.dc.datachange.core.exchange.strategy;
 
 import Messenger.Message;
+import com.dc.datachange.core.common.MsgConstants;
 import com.dc.datachange.core.connection.DDSStatusManger;
 import com.dc.datachange.core.entity.message.ReceivedMessage;
 import com.dc.datachange.core.entity.message.receivedMessage.GroupParamsReq;
@@ -34,6 +35,6 @@ public class GroupParamsReqStrategy implements MessageStrategy{
 
     @Override
     public String getMsgType() {
-        return null;
+        return MsgConstants.GROUP_PARAMS_REQ;
     }
 }

+ 2 - 2
src/main/java/com/dc/datachange/utils/MessageUtils.java

@@ -46,8 +46,8 @@ public class MessageUtils {
 
     public static String printMessage(Message message){
         return "\n"
-                +"Message: subject    = " + MsgConstants.MSG_MAP.get(message.subject)+" "+message.subject+"\n"
-                +"         from       = " + PlatConstants.PLAT_MAP.get(message.from) + " "+ message.from+"\n"
+                +"Message: subject    = " + MsgConstants.MSG_MAP.get(message.subject)+"|"+message.subject+"\n"
+                +"         from       = " + PlatConstants.PLAT_MAP.get(message.from) + "|"+ message.from+"\n"
                 +"         text       = " + message.text+"\n";
     }
 }

+ 2 - 2
src/main/resources/application.yml

@@ -18,7 +18,7 @@ dds:
   #  不知道
     DCPSBit: 0
   #  DDS实例位置
-    local: true
+    local: false
     DCPSConfigFile: D:\rocket\OpenDDS-3.16\java\tests\messenger\tcp.ini
     DCPSDefaultDiscovery: DEFAULT_REPO
     DCPSInfoRepo: corbaloc:iiop:10.195.84.22:12345/DCPSInfoRepo
@@ -38,7 +38,7 @@ dds:
       - "5" #雷达脉冲信息
       - "6" #电子干扰信息
       - "7" #综合情报信息
-      - "8" #心跳
+#      - "8" #心跳
       - "9" #平台参数单条数据
       - "10" #平台参数集合数据
       - "101" #平台参数集合请求