|
@@ -8,6 +8,7 @@ import Messenger.MessageTypeSupportImpl;
|
|
|
import OpenDDS.DCPS.DEFAULT_STATUS_MASK;
|
|
|
import OpenDDS.DCPS.TheParticipantFactory;
|
|
|
import com.dc.datachange.core.common.ConnectionParams;
|
|
|
+import com.dc.datachange.exception.InitialFailedException;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.omg.CORBA.StringSeqHolder;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
@@ -30,122 +31,136 @@ public class DDSPublisher {
|
|
|
private int instanceHandle;
|
|
|
private MessageDataWriter mdw;
|
|
|
private final ExecutorService matchService = Executors.newSingleThreadExecutor();
|
|
|
+ private static final int MAX_RETRIES = 5; // 最大重试次数
|
|
|
+ private int retryCount = 0; // 当前重试计数
|
|
|
|
|
|
@PostConstruct
|
|
|
private void initialize(){
|
|
|
- try {
|
|
|
- // ... 原有初始化代码到创建DataWriter为止...
|
|
|
- log.info("Start Publisher");
|
|
|
-
|
|
|
- DomainParticipantFactory dpf = TheParticipantFactory.WithArgs(new StringSeqHolder(params.toArgs()));
|
|
|
- if (dpf == null) {
|
|
|
- log.error("ERROR: Domain Participant Factory not found");
|
|
|
- return;
|
|
|
- }
|
|
|
- DomainParticipant dp = dpf.create_participant(4,
|
|
|
- PARTICIPANT_QOS_DEFAULT.get(), null, DEFAULT_STATUS_MASK.value);
|
|
|
- if (dp == null) {
|
|
|
- log.error("ERROR: Domain Participant creation failed");
|
|
|
+ while (retryCount <= MAX_RETRIES) {
|
|
|
+ try {
|
|
|
+ // 核心初始化逻辑封装
|
|
|
+ doInitialize();
|
|
|
+ isInitialized = true;
|
|
|
+ log.info("Publisher initialization succeeded after {} retries", retryCount);
|
|
|
return;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error(e.getMessage());
|
|
|
+ retryCount++;
|
|
|
+ if (retryCount > MAX_RETRIES) {
|
|
|
+ log.error("Max retries {} exceeded, initialization failed", MAX_RETRIES);
|
|
|
+ // 抛出Bean初始化异常
|
|
|
+ throw new IllegalStateException("DDS Publisher initialization failed after " + MAX_RETRIES + " retries", e);
|
|
|
+ }
|
|
|
+ log.warn("Initialization failed, retrying ({}/{})", retryCount, MAX_RETRIES);
|
|
|
+ // 可添加重试间隔(如指数退避)
|
|
|
+ try { Thread.sleep(1000L * retryCount); }
|
|
|
+ catch (InterruptedException ex) { Thread.currentThread().interrupt(); }
|
|
|
}
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- MessageTypeSupportImpl servant = new MessageTypeSupportImpl();
|
|
|
- if (servant.register_type(dp, "") != RETCODE_OK.value) {
|
|
|
- log.error("ERROR: register_type failed");
|
|
|
- return;
|
|
|
- }
|
|
|
+ private void doInitialize() {
|
|
|
+ log.info("Start Publisher");
|
|
|
|
|
|
- Topic top = dp.create_topic("Movie Discussion List",
|
|
|
- servant.get_type_name(),
|
|
|
- TOPIC_QOS_DEFAULT.get(),
|
|
|
- null,
|
|
|
- DEFAULT_STATUS_MASK.value);
|
|
|
- if (top == null) {
|
|
|
- log.error("ERROR: Topic creation failed");
|
|
|
- return;
|
|
|
- }
|
|
|
+ DomainParticipantFactory dpf = TheParticipantFactory.WithArgs(new StringSeqHolder(params.toArgs()));
|
|
|
+ if (dpf == null) {
|
|
|
+ throw new InitialFailedException("ERROR: Domain Participant Factory not found");
|
|
|
+ }
|
|
|
|
|
|
- Publisher pub = dp.create_publisher(PUBLISHER_QOS_DEFAULT.get(), null,
|
|
|
- DEFAULT_STATUS_MASK.value);
|
|
|
- if (pub == null) {
|
|
|
- log.error("ERROR: Publisher creation failed");
|
|
|
- return;
|
|
|
- }
|
|
|
+ DomainParticipant dp = dpf.create_participant(4,
|
|
|
+ PARTICIPANT_QOS_DEFAULT.get(), null, DEFAULT_STATUS_MASK.value);
|
|
|
+ if (dp == null) {
|
|
|
+ throw new InitialFailedException("ERROR: Domain Participant creation failed");
|
|
|
+ }
|
|
|
|
|
|
- // Use the default transport configuration (do nothing)
|
|
|
-
|
|
|
- DataWriterQos dw_qos = new DataWriterQos();
|
|
|
- dw_qos.durability = new DurabilityQosPolicy();
|
|
|
- dw_qos.durability.kind = DurabilityQosPolicyKind.from_int(0);
|
|
|
- dw_qos.durability_service = new DurabilityServiceQosPolicy();
|
|
|
- dw_qos.durability_service.history_kind = HistoryQosPolicyKind.from_int(0);
|
|
|
- dw_qos.durability_service.service_cleanup_delay = new Duration_t();
|
|
|
- dw_qos.deadline = new DeadlineQosPolicy();
|
|
|
- dw_qos.deadline.period = new Duration_t();
|
|
|
- dw_qos.latency_budget = new LatencyBudgetQosPolicy();
|
|
|
- dw_qos.latency_budget.duration = new Duration_t();
|
|
|
- dw_qos.liveliness = new LivelinessQosPolicy();
|
|
|
- dw_qos.liveliness.kind = LivelinessQosPolicyKind.from_int(0);
|
|
|
- dw_qos.liveliness.lease_duration = new Duration_t();
|
|
|
- dw_qos.reliability = new ReliabilityQosPolicy();
|
|
|
- dw_qos.reliability.kind = ReliabilityQosPolicyKind.from_int(0);
|
|
|
- dw_qos.reliability.max_blocking_time = new Duration_t();
|
|
|
- dw_qos.destination_order = new DestinationOrderQosPolicy();
|
|
|
- dw_qos.destination_order.kind = DestinationOrderQosPolicyKind.from_int(0);
|
|
|
- dw_qos.history = new HistoryQosPolicy();
|
|
|
- dw_qos.history.kind = HistoryQosPolicyKind.from_int(0);
|
|
|
- dw_qos.resource_limits = new ResourceLimitsQosPolicy();
|
|
|
- dw_qos.transport_priority = new TransportPriorityQosPolicy();
|
|
|
- dw_qos.lifespan = new LifespanQosPolicy();
|
|
|
- dw_qos.lifespan.duration = new Duration_t();
|
|
|
- dw_qos.user_data = new UserDataQosPolicy();
|
|
|
- dw_qos.user_data.value = new byte[0];
|
|
|
- dw_qos.ownership = new OwnershipQosPolicy();
|
|
|
- dw_qos.ownership.kind = OwnershipQosPolicyKind.from_int(0);
|
|
|
- dw_qos.ownership_strength = new OwnershipStrengthQosPolicy();
|
|
|
- dw_qos.writer_data_lifecycle = new WriterDataLifecycleQosPolicy();
|
|
|
- dw_qos.representation = new DataRepresentationQosPolicy();
|
|
|
- dw_qos.representation.value = new short[0];
|
|
|
-
|
|
|
- DataWriterQosHolder qosh = new DataWriterQosHolder(dw_qos);
|
|
|
- pub.get_default_datawriter_qos(qosh);
|
|
|
- qosh.value.history.kind = HistoryQosPolicyKind.KEEP_ALL_HISTORY_QOS;
|
|
|
- if (params.isReliable()) {
|
|
|
- qosh.value.reliability.kind =
|
|
|
- ReliabilityQosPolicyKind.RELIABLE_RELIABILITY_QOS;
|
|
|
- }
|
|
|
- DataWriter dw = pub.create_datawriter(top,
|
|
|
- qosh.value,
|
|
|
- null,
|
|
|
- DEFAULT_STATUS_MASK.value);
|
|
|
- if (dw == null) {
|
|
|
- log.error("ERROR: DataWriter creation failed");
|
|
|
- return;
|
|
|
- }
|
|
|
- log.info("Publisher Created DataWriter");
|
|
|
+ MessageTypeSupportImpl servant = new MessageTypeSupportImpl();
|
|
|
+ if (servant.register_type(dp, "") != RETCODE_OK.value) {
|
|
|
+ throw new InitialFailedException("ERROR: register_type failed");
|
|
|
+ }
|
|
|
+
|
|
|
+ Topic top = dp.create_topic("Movie Discussion List",
|
|
|
+ servant.get_type_name(),
|
|
|
+ TOPIC_QOS_DEFAULT.get(),
|
|
|
+ null,
|
|
|
+ DEFAULT_STATUS_MASK.value);
|
|
|
+ if (top == null) {
|
|
|
+ throw new InitialFailedException("ERROR: Topic creation failed");
|
|
|
+ }
|
|
|
|
|
|
- StatusCondition sc = dw.get_statuscondition();
|
|
|
- sc.set_enabled_statuses(PUBLICATION_MATCHED_STATUS.value);
|
|
|
- WaitSet ws = new WaitSet();
|
|
|
- ws.attach_condition(sc);
|
|
|
+ Publisher pub = dp.create_publisher(PUBLISHER_QOS_DEFAULT.get(), null,
|
|
|
+ DEFAULT_STATUS_MASK.value);
|
|
|
+ if (pub == null) {
|
|
|
+ throw new InitialFailedException("ERROR: Publisher creation failed");
|
|
|
+ }
|
|
|
|
|
|
- PublicationMatchedStatusHolder matched =
|
|
|
- new PublicationMatchedStatusHolder(new PublicationMatchedStatus());
|
|
|
- ws.detach_condition(sc);
|
|
|
- mdw = MessageDataWriterHelper.narrow(dw);
|
|
|
+ // Use the default transport configuration (do nothing)
|
|
|
+
|
|
|
+ DataWriterQos dw_qos = new DataWriterQos();
|
|
|
+ dw_qos.durability = new DurabilityQosPolicy();
|
|
|
+ dw_qos.durability.kind = DurabilityQosPolicyKind.from_int(0);
|
|
|
+ dw_qos.durability_service = new DurabilityServiceQosPolicy();
|
|
|
+ dw_qos.durability_service.history_kind = HistoryQosPolicyKind.from_int(0);
|
|
|
+ dw_qos.durability_service.service_cleanup_delay = new Duration_t();
|
|
|
+ dw_qos.deadline = new DeadlineQosPolicy();
|
|
|
+ dw_qos.deadline.period = new Duration_t();
|
|
|
+ dw_qos.latency_budget = new LatencyBudgetQosPolicy();
|
|
|
+ dw_qos.latency_budget.duration = new Duration_t();
|
|
|
+ dw_qos.liveliness = new LivelinessQosPolicy();
|
|
|
+ dw_qos.liveliness.kind = LivelinessQosPolicyKind.from_int(0);
|
|
|
+ dw_qos.liveliness.lease_duration = new Duration_t();
|
|
|
+ dw_qos.reliability = new ReliabilityQosPolicy();
|
|
|
+ dw_qos.reliability.kind = ReliabilityQosPolicyKind.from_int(0);
|
|
|
+ dw_qos.reliability.max_blocking_time = new Duration_t();
|
|
|
+ dw_qos.destination_order = new DestinationOrderQosPolicy();
|
|
|
+ dw_qos.destination_order.kind = DestinationOrderQosPolicyKind.from_int(0);
|
|
|
+ dw_qos.history = new HistoryQosPolicy();
|
|
|
+ dw_qos.history.kind = HistoryQosPolicyKind.from_int(0);
|
|
|
+ dw_qos.resource_limits = new ResourceLimitsQosPolicy();
|
|
|
+ dw_qos.transport_priority = new TransportPriorityQosPolicy();
|
|
|
+ dw_qos.lifespan = new LifespanQosPolicy();
|
|
|
+ dw_qos.lifespan.duration = new Duration_t();
|
|
|
+ dw_qos.user_data = new UserDataQosPolicy();
|
|
|
+ dw_qos.user_data.value = new byte[0];
|
|
|
+ dw_qos.ownership = new OwnershipQosPolicy();
|
|
|
+ dw_qos.ownership.kind = OwnershipQosPolicyKind.from_int(0);
|
|
|
+ dw_qos.ownership_strength = new OwnershipStrengthQosPolicy();
|
|
|
+ dw_qos.writer_data_lifecycle = new WriterDataLifecycleQosPolicy();
|
|
|
+ dw_qos.representation = new DataRepresentationQosPolicy();
|
|
|
+ dw_qos.representation.value = new short[0];
|
|
|
+
|
|
|
+ DataWriterQosHolder qosh = new DataWriterQosHolder(dw_qos);
|
|
|
+ pub.get_default_datawriter_qos(qosh);
|
|
|
+ qosh.value.history.kind = HistoryQosPolicyKind.KEEP_ALL_HISTORY_QOS;
|
|
|
+ if (params.isReliable()) {
|
|
|
+ qosh.value.reliability.kind =
|
|
|
+ ReliabilityQosPolicyKind.RELIABLE_RELIABILITY_QOS;
|
|
|
+ }
|
|
|
+ DataWriter dw = pub.create_datawriter(top,
|
|
|
+ qosh.value,
|
|
|
+ null,
|
|
|
+ DEFAULT_STATUS_MASK.value);
|
|
|
+ if (dw == null) {
|
|
|
+ throw new InitialFailedException("ERROR: DataWriter creation failed");
|
|
|
+ }
|
|
|
+ log.info("Publisher Created DataWriter");
|
|
|
|
|
|
- matchService.submit(()->waitMatch(dw,ws,matched));
|
|
|
+ StatusCondition sc = dw.get_statuscondition();
|
|
|
+ sc.set_enabled_statuses(PUBLICATION_MATCHED_STATUS.value);
|
|
|
+ WaitSet ws = new WaitSet();
|
|
|
+ ws.attach_condition(sc);
|
|
|
|
|
|
- Message template = new Message();
|
|
|
- template.subject_id = 99;
|
|
|
- instanceHandle = mdw.register_instance(template);
|
|
|
-// executorService.submit(this::processMessageQueue);
|
|
|
- isInitialized = true;
|
|
|
+ PublicationMatchedStatusHolder matched =
|
|
|
+ new PublicationMatchedStatusHolder(new PublicationMatchedStatus());
|
|
|
+ ws.detach_condition(sc);
|
|
|
+ mdw = MessageDataWriterHelper.narrow(dw);
|
|
|
|
|
|
- } catch (Exception e) {
|
|
|
- log.error(e.getMessage());
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
+ matchService.submit(() -> waitMatch(dw, ws, matched));
|
|
|
+
|
|
|
+ Message template = new Message();
|
|
|
+ template.subject_id = 99;
|
|
|
+ instanceHandle = mdw.register_instance(template);
|
|
|
+// executorService.submit(this::processMessageQueue);
|
|
|
+ isInitialized = true;
|
|
|
}
|
|
|
|
|
|
private void waitMatch(DataWriter dw, WaitSet ws, PublicationMatchedStatusHolder matched) {
|