service_20250421113900.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. import json
  2. import uhd
  3. import numpy as np
  4. import threading
  5. from dto.response_dto import ResponseDTO
  6. from model.surveillance_radar import SurveillanceRadar
  7. from model.jammer_radar import JammerRadar
  8. from algo.jamming_signal_algo import JammingSignalAlgo
  9. # 定义一组通道常量
  10. CHANNEL_1 = 0
  11. CHANNEL_2 = 1
  12. CHANNEL_3 = 2
  13. CHANNEL_4 = 3
  14. # 定义干扰、抗干扰策略集合,确保集合中的元素为合法的字符串常量
  15. JAMMING_POLICY = {
  16. "噪声调频", "噪声调幅", "噪声直放", "速度多假目标", "距离多假目标"
  17. }
  18. ANTI_JAMMING_POLICY = {
  19. "频率捷变", "波形捷变", "自适应极化滤波"
  20. }
  21. class Service:
  22. usrp = None # 静态类变量
  23. status = 0 # 静态类变量
  24. surveillance_radar = None # 侦查雷达实例
  25. jammer_radar = None # 干扰雷达实例
  26. _rlock = threading.RLock() # 可重入锁
  27. @staticmethod
  28. def initialize_usrp():
  29. with Service._rlock:
  30. Service.get_sdr_status()
  31. if Service.status == 1:
  32. print('USRP设备已经初始化')
  33. return ResponseDTO.SUCCESS({"status": Service.status}).to_json()
  34. try:
  35. Service.usrp = uhd.usrp.MultiUSRP()
  36. print('------SDR Devices initialize success!------')
  37. Service.surveillance_radar = SurveillanceRadar(Service.usrp, rx=CHANNEL_1, tx=CHANNEL_1)
  38. Service.jammer_radar = JammerRadar(Service.usrp, rx=CHANNEL_2, tx=CHANNEL_2)
  39. Service.status = 1
  40. return ResponseDTO.SUCCESS({"status": Service.status}).to_json()
  41. except Exception as e:
  42. print('SDR设备异常', e)
  43. Service.status = 0 # 初始化失败,状态置为0
  44. return ResponseDTO.ERROR_MS_DATA('SDR设备异常', {"status": Service.status, "Error": str(e)}).to_json()
  45. @staticmethod
  46. def get_sdr_status():
  47. with Service._rlock:
  48. if Service.status == 0:
  49. return ResponseDTO.ERROR_MS_DATA('SDR设备异常',{"status": Service.status, "Error": "SDR设备未初始化"}).to_json()
  50. try:
  51. samples = Service.usrp.recv_num_samps(1, 100e6, 1e6, [0], 50)
  52. if samples is None:
  53. Service.status = 0 # 获取状态失败,状态置为0
  54. else:
  55. Service.status = 1
  56. return ResponseDTO.SUCCESS({"status": Service.status}).to_json()
  57. except Exception as e:
  58. print('SDR设备异常', e)
  59. Service.status = 0 # 获取状态失败,状态置为0
  60. Service.usrp = None # 重置USRP对象
  61. return ResponseDTO.ERROR_MS_DATA('SDR设备异常', {"status": Service.status, "Error": str(e)}).to_json()
  62. @staticmethod
  63. def data(payload):
  64. # 使用已初始化的雷达实例
  65. surveillance_radar = Service.surveillance_radar
  66. jammer_radar = Service.jammer_radar
  67. jamming_policy = payload['jamming_policy']
  68. anti_jamming_policy = payload['anti_jamming_policy']
  69. # 判断策略是否合法
  70. if jamming_policy not in JAMMING_POLICY or anti_jamming_policy not in ANTI_JAMMING_POLICY:
  71. return ResponseDTO.ERROR_MS_DATA('策略不合法', {"status": Service.status, "Error": "策略不合法"}).to_json()
  72. # 打印对应策略
  73. print(' jamming_policy:', jamming_policy)
  74. print(' anti_jamming_policy:', anti_jamming_policy)
  75. # 根据策略选择算法
  76. jam_algorithm = Service._get_algorithm_mapping(jamming_policy)
  77. anti_jam_algorithm = Service._get_algorithm_mapping(anti_jamming_policy)
  78. # 生成实际干扰信号和抗干扰处理
  79. surveillance_signal = surveillance_radar.execute_jamming(
  80. algorithm=jam_algorithm,
  81. bandwidth=100e6,
  82. duration=0.1,
  83. sample_rate=1e6
  84. ) if jam_algorithm else np.random.randn(100)
  85. processed_signal = jammer_radar.execute_countermeasures(
  86. rx_signal=surveillance_signal,
  87. algorithm=anti_jam_algorithm,
  88. sample_rate=1e6
  89. ) if anti_jam_algorithm else surveillance_signal
  90. # 返回结果
  91. return ResponseDTO.SUCCESS({"jamming_signal": surveillance_signal.tolist(),
  92. "anti_jamming_signal": processed_signal.tolist(),
  93. "status": Service.status}).to_json()
  94. @staticmethod
  95. def send():
  96. try:
  97. # 设置中心频率、采样率和增益
  98. center_freq = 100e6 # 2.4 GHz
  99. sample_rate = 1e6 # 1 MS/s
  100. duration = 10 # 以秒为单位
  101. gain = 20 # [dB] 建议一开始设置小一点,按照实际情况调整
  102. # 生成发送信号
  103. num_samples = 100
  104. tx_signal = np.random.randn(num_samples) + 0.1j * np.random.randn(num_samples) # 修复部分
  105. # 发送信号
  106. Service.usrp.send_waveform(tx_signal, duration, center_freq, sample_rate, [0], gain)
  107. # 接收信号
  108. rx_signal = Service.usrp.recv_num_samps(num_samples, center_freq, sample_rate)
  109. print('信号已发送:')
  110. print(rx_signal)
  111. print('信号已接收')
  112. except Exception as e:
  113. print('发送或接收信号异常', e)
  114. Service.status = 0 # 发送或接收信号失败,状态置为0
  115. Service.usrp = None # 重置USRP对象
  116. @staticmethod
  117. def _get_algorithm_mapping(policy: str) -> callable:
  118. # 策略与算法映射关系
  119. algorithm_map = {
  120. "噪声调频": JammingSignalAlgo.generate_noise_jam,
  121. "噪声调幅": JammingSignalAlgo.generate_amplitude_noise_jam,
  122. "噪声直放": JammingSignalAlgo.generate_noise_jam,
  123. "速度多假目标": JammingSignalAlgo.generate_velocity_deceptive_jam,
  124. "距离多假目标": JammingSignalAlgo.generate_deceptive_jam,
  125. "频率捷变": AntiJammingSignalAlgo.time_frequency_filter,
  126. "波形捷变": AntiJammingSignalAlgo.adaptive_filter,
  127. "自适应极化滤波": AntiJammingSignalAlgo.polarization_filter
  128. }
  129. return algorithm_map.get(policy)
  130. # main方法
  131. if __name__ == '__main__':
  132. Service.initialize_usrp()
  133. Service.data({"jamming_policy": "噪声调频", "anti_jamming_policy": "频率捷变"})