|
@@ -0,0 +1,248 @@
|
|
|
+package com.orangeforms.webadmin.dds;/*
|
|
|
+ *
|
|
|
+ *
|
|
|
+ * Distributed under the OpenDDS License.
|
|
|
+ * See: http://www.opendds.org/license.html
|
|
|
+ */
|
|
|
+
|
|
|
+import DDS.*;
|
|
|
+import OpenDDS.DCPS.*;
|
|
|
+import org.omg.CORBA.StringSeqHolder;
|
|
|
+import Messenger.*;
|
|
|
+
|
|
|
+import java.util.concurrent.*;
|
|
|
+
|
|
|
+public class TestPublisher {
|
|
|
+
|
|
|
+ private static final int N_MSGS = 20;
|
|
|
+ private int count = 0;
|
|
|
+ private DomainParticipantFactory dpf;
|
|
|
+ private DomainParticipant dp;
|
|
|
+ private MessageTypeSupportImpl servant;
|
|
|
+ private Topic top;
|
|
|
+ private Publisher pub;
|
|
|
+ private DataWriterQos dw_qos;
|
|
|
+ private DataWriterQosHolder qosh;
|
|
|
+ private DataReaderListenerImpl listener;
|
|
|
+ private DataWriter dw ;
|
|
|
+ private StatusCondition sc;
|
|
|
+ private WaitSet ws;
|
|
|
+ private PublicationMatchedStatusHolder matched;
|
|
|
+ private Duration_t timeout;
|
|
|
+ private MessageDataWriter mdw;
|
|
|
+ private int instanceHandle;
|
|
|
+ private BlockingQueue<Message> messageQueue = new LinkedBlockingQueue<>();
|
|
|
+ private volatile boolean isInitialized = false;
|
|
|
+ private final ExecutorService executor = Executors.newSingleThreadExecutor();
|
|
|
+ private volatile boolean running = true;
|
|
|
+ public TestPublisher() throws Exception {
|
|
|
+// String[] s = new String[4];
|
|
|
+// s[0] = "-DCPSBit";
|
|
|
+// s[1] = "0";
|
|
|
+// s[2] = "-DCPSConfigFile";
|
|
|
+// s[3] = "D:/dds/open-dds_3.16/OpenDDS-3.16/OpenDDS-3.16/java/tests/messenger/tcp.ini";
|
|
|
+ String[] s = new String[6];
|
|
|
+ s[0] = "-DCPSBit"; // 启用Bit通信(保持原有配置)
|
|
|
+ s[1] = "0"; // Bit值
|
|
|
+ s[2] = "-DCPSDefaultDiscovery"; // 指定发现机制为InfoRepo
|
|
|
+ s[3] = "DEFAULT_REPO"; // 使用集中式InfoRepo
|
|
|
+ s[4] = "-DCPSInfoRepo"; // InfoRepo的corbaloc地址
|
|
|
+ s[5] = "corbaloc:iiop:192.168.5.81:12345/DCPSInfoRepo"; // 替换为实际IP和端口
|
|
|
+ executor.submit(() -> initialize(s)); // 初始化放在后台线程
|
|
|
+ }
|
|
|
+ public static boolean checkReliable(String[] args) {
|
|
|
+ for (int i = 0; i < args.length; ++i) {
|
|
|
+ if (args[i].equals("-r")) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ public static boolean checkWaitForAcks(String[] args) {
|
|
|
+ for (int i = 0; i < args.length; ++i) {
|
|
|
+ if (args[i].equals("-w")) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void initialize(String[] args) {
|
|
|
+ try {
|
|
|
+ // ... 原有初始化代码到创建DataWriter为止...
|
|
|
+ System.out.println("Start Publisher");
|
|
|
+ boolean reliable = checkReliable(args);
|
|
|
+ boolean waitForAcks = checkWaitForAcks(args);
|
|
|
+
|
|
|
+ dpf =
|
|
|
+ TheParticipantFactory.WithArgs(new StringSeqHolder(args));
|
|
|
+ if (dpf == null) {
|
|
|
+ System.err.println("ERROR: Domain Participant Factory not found");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ dp = dpf.create_participant(4,
|
|
|
+ PARTICIPANT_QOS_DEFAULT.get(), null, DEFAULT_STATUS_MASK.value);
|
|
|
+ if (dp == null) {
|
|
|
+ System.err.println("ERROR: Domain Participant creation failed");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ servant = new MessageTypeSupportImpl();
|
|
|
+ if (servant.register_type(dp, "") != RETCODE_OK.value) {
|
|
|
+ System.err.println("ERROR: register_type failed");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ top = dp.create_topic("Movie Discussion List",
|
|
|
+ servant.get_type_name(),
|
|
|
+ TOPIC_QOS_DEFAULT.get(),
|
|
|
+ null,
|
|
|
+ DEFAULT_STATUS_MASK.value);
|
|
|
+ if (top == null) {
|
|
|
+ System.err.println("ERROR: Topic creation failed");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ pub = dp.create_publisher(PUBLISHER_QOS_DEFAULT.get(), null,
|
|
|
+ DEFAULT_STATUS_MASK.value);
|
|
|
+ if (pub == null) {
|
|
|
+ System.err.println("ERROR: Publisher creation failed");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Use the default transport configuration (do nothing)
|
|
|
+
|
|
|
+ dw_qos = new DataWriterQos();
|
|
|
+ dw_qos.durability = new DurabilityQosPolicy();
|
|
|
+ dw_qos.durability.kind = DurabilityQosPolicyKind.from_int(0);
|
|
|
+ dw_qos.durability_service = new DurabilityServiceQosPolicy();
|
|
|
+ dw_qos.durability_service.history_kind = HistoryQosPolicyKind.from_int(0);
|
|
|
+ dw_qos.durability_service.service_cleanup_delay = new Duration_t();
|
|
|
+ dw_qos.deadline = new DeadlineQosPolicy();
|
|
|
+ dw_qos.deadline.period = new Duration_t();
|
|
|
+ dw_qos.latency_budget = new LatencyBudgetQosPolicy();
|
|
|
+ dw_qos.latency_budget.duration = new Duration_t();
|
|
|
+ dw_qos.liveliness = new LivelinessQosPolicy();
|
|
|
+ dw_qos.liveliness.kind = LivelinessQosPolicyKind.from_int(0);
|
|
|
+ dw_qos.liveliness.lease_duration = new Duration_t();
|
|
|
+ dw_qos.reliability = new ReliabilityQosPolicy();
|
|
|
+ dw_qos.reliability.kind = ReliabilityQosPolicyKind.from_int(0);
|
|
|
+ dw_qos.reliability.max_blocking_time = new Duration_t();
|
|
|
+ dw_qos.destination_order = new DestinationOrderQosPolicy();
|
|
|
+ dw_qos.destination_order.kind = DestinationOrderQosPolicyKind.from_int(0);
|
|
|
+ dw_qos.history = new HistoryQosPolicy();
|
|
|
+ dw_qos.history.kind = HistoryQosPolicyKind.from_int(0);
|
|
|
+ dw_qos.resource_limits = new ResourceLimitsQosPolicy();
|
|
|
+ dw_qos.transport_priority = new TransportPriorityQosPolicy();
|
|
|
+ dw_qos.lifespan = new LifespanQosPolicy();
|
|
|
+ dw_qos.lifespan.duration = new Duration_t();
|
|
|
+ dw_qos.user_data = new UserDataQosPolicy();
|
|
|
+ dw_qos.user_data.value = new byte[0];
|
|
|
+ dw_qos.ownership = new OwnershipQosPolicy();
|
|
|
+ dw_qos.ownership.kind = OwnershipQosPolicyKind.from_int(0);
|
|
|
+ dw_qos.ownership_strength = new OwnershipStrengthQosPolicy();
|
|
|
+ dw_qos.writer_data_lifecycle = new WriterDataLifecycleQosPolicy();
|
|
|
+ dw_qos.representation = new DataRepresentationQosPolicy();
|
|
|
+ dw_qos.representation.value = new short[0];
|
|
|
+
|
|
|
+ qosh = new DataWriterQosHolder(dw_qos);
|
|
|
+ pub.get_default_datawriter_qos(qosh);
|
|
|
+ qosh.value.history.kind = HistoryQosPolicyKind.KEEP_ALL_HISTORY_QOS;
|
|
|
+ if (reliable) {
|
|
|
+ qosh.value.reliability.kind =
|
|
|
+ ReliabilityQosPolicyKind.RELIABLE_RELIABILITY_QOS;
|
|
|
+ }
|
|
|
+ dw = pub.create_datawriter(top,
|
|
|
+ qosh.value,
|
|
|
+ null,
|
|
|
+ DEFAULT_STATUS_MASK.value);
|
|
|
+ if (dw == null) {
|
|
|
+ System.err.println("ERROR: DataWriter creation failed");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ System.out.println("Publisher Created DataWriter");
|
|
|
+
|
|
|
+ sc = dw.get_statuscondition();
|
|
|
+ sc.set_enabled_statuses(PUBLICATION_MATCHED_STATUS.value);
|
|
|
+ ws = new WaitSet();
|
|
|
+ ws.attach_condition(sc);
|
|
|
+ matched =
|
|
|
+ new PublicationMatchedStatusHolder(new PublicationMatchedStatus());
|
|
|
+ timeout = new Duration_t(DURATION_INFINITE_SEC.value,
|
|
|
+ DURATION_INFINITE_NSEC.value);
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ ws.detach_condition(sc);
|
|
|
+
|
|
|
+ mdw = MessageDataWriterHelper.narrow(dw);
|
|
|
+ // 等待匹配的逻辑保持原样
|
|
|
+ while (running) {
|
|
|
+ System.out.println("等待match中");
|
|
|
+ final int result = dw.get_publication_matched_status(matched);
|
|
|
+ if (result != RETCODE_OK.value) {
|
|
|
+ System.err.println("ERROR: get_publication_matched_status()" +
|
|
|
+ "failed.");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ // 原有匹配等待逻辑...
|
|
|
+ if (matched.value.current_count >= 1) {
|
|
|
+ System.out.println("Publisher Matched");
|
|
|
+ isInitialized = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ // 修改等待时间为有限等待
|
|
|
+ Duration_t timeout = new Duration_t(1, 0);
|
|
|
+ ConditionSeqHolder cond = new ConditionSeqHolder(new Condition[]{});
|
|
|
+ ws.wait(cond, timeout);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 创建消息模板
|
|
|
+ Message template = new Message();
|
|
|
+ template.subject_id = 99;
|
|
|
+ instanceHandle = mdw.register_instance(template);
|
|
|
+
|
|
|
+ // 启动消息处理线程
|
|
|
+ executor.submit(this::processMessageQueue);
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ // 异常处理...
|
|
|
+ }
|
|
|
+ }
|
|
|
+ public void publishMessage(String from, String subject, String text) {
|
|
|
+ if (!isInitialized) {
|
|
|
+ throw new IllegalStateException("Publisher not initialized");
|
|
|
+ }
|
|
|
+
|
|
|
+ Message msg = new Message();
|
|
|
+ msg.subject_id = 99;
|
|
|
+ msg.from = from;
|
|
|
+ msg.subject = subject;
|
|
|
+ msg.text = text;
|
|
|
+ msg.count = count++; // 根据需求调整
|
|
|
+
|
|
|
+ messageQueue.offer(msg); // 非阻塞式添加
|
|
|
+ }
|
|
|
+
|
|
|
+ private void processMessageQueue() {
|
|
|
+ try {
|
|
|
+ while (running) {
|
|
|
+ Message msg = messageQueue.poll(1, TimeUnit.SECONDS); // 带超时的获取
|
|
|
+ if (msg != null) {
|
|
|
+ int ret;
|
|
|
+ do {
|
|
|
+ ret = mdw.write(msg, instanceHandle);
|
|
|
+ if (ret != RETCODE_OK.value) {
|
|
|
+ Thread.sleep(100); // 失败时短暂等待
|
|
|
+ }
|
|
|
+ } while (running && ret == RETCODE_TIMEOUT.value);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+}
|