| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 | 
							- import json
 
- import uhd
 
- import numpy as np
 
- import threading
 
- from dto.response_dto import ResponseDTO
 
- from model.surveillance_radar import SurveillanceRadar
 
- from model.jammer_radar import JammerRadar
 
- from algo.jamming_signal_algo import JammingSignalAlgo
 
- from algo.anti_jamming_signal_algo import AntiJammingSignalAlgo
 
- # 定义一组通道常量
 
- CHANNEL_1 = 0
 
- CHANNEL_2 = 1
 
- CHANNEL_3 = 2
 
- CHANNEL_4 = 3
 
- # 定义干扰、抗干扰策略集合,确保集合中的元素为合法的字符串常量
 
- JAMMING_POLICY = {
 
-     "噪声调频", "噪声调幅", "噪声直放", "速度多假目标", "距离多假目标"
 
- }
 
- ANTI_JAMMING_POLICY = {
 
-     "频率捷变", "波形捷变", # 作用于发射端
 
-      "自适应极化滤波" # 作用于接收端
 
- }
 
- class Service:
 
-     usrp = None  # 静态类变量
 
-     status = 0  # 静态类变量
 
-     surveillance_radar = None  # 侦查雷达实例
 
-     jammer_radar = None  # 干扰雷达实例
 
-     _rlock = threading.RLock()  # 可重入锁
 
-     @staticmethod
 
-     def initialize_usrp():
 
-         with Service._rlock:
 
-             Service.get_sdr_status()
 
-             if Service.status == 1:
 
-                 print('USRP设备已经初始化')
 
-                 return ResponseDTO.SUCCESS({"status": Service.status}).to_json()
 
-             try:
 
-                 Service.usrp = uhd.usrp.MultiUSRP()
 
-                 print('------SDR Devices initialize success!------')
 
-                 Service.surveillance_radar = SurveillanceRadar(Service.usrp, rx=CHANNEL_1, tx=CHANNEL_1)
 
-                 Service.jammer_radar = JammerRadar(Service.usrp, rx=CHANNEL_2, tx=CHANNEL_2)
 
-                 Service.status = 1
 
-                 return ResponseDTO.SUCCESS({"status": Service.status}).to_json()
 
-             except Exception as e:
 
-                 print('SDR设备异常', e)
 
-                 Service.status = 0  # 初始化失败,状态置为0
 
-                 return ResponseDTO.ERROR_MS_DATA('SDR设备异常', {"status": Service.status, "Error": str(e)}).to_json()
 
-     @staticmethod
 
-     def get_sdr_status():
 
-         with Service._rlock:
 
-             if Service.status == 0:
 
-                 return ResponseDTO.ERROR_MS_DATA('SDR设备异常',{"status": Service.status, "Error": "SDR设备未初始化"}).to_json()
 
-             try:
 
-                 samples = Service.usrp.recv_num_samps(1, 100e6, 1e6, [0], 50)
 
-                 if samples is None:
 
-                     Service.status = 0  # 获取状态失败,状态置为0
 
-                 else:
 
-                     Service.status = 1
 
-                 return ResponseDTO.SUCCESS({"status": Service.status}).to_json()
 
-             except Exception as e:
 
-                 print('SDR设备异常', e)
 
-                 Service.status = 0  # 获取状态失败,状态置为0
 
-                 Service.usrp = None  # 重置USRP对象
 
-                 return ResponseDTO.ERROR_MS_DATA('SDR设备异常', {"status": Service.status, "Error": str(e)}).to_json()
 
-     @staticmethod
 
-     def data(payload):
 
-         # 使用已初始化的雷达实例
 
-         surveillance_radar = Service.surveillance_radar
 
-         jammer_radar = Service.jammer_radar
 
-         
 
-         jamming_policy = payload.get('jamming_policy')
 
-         anti_jamming_policy = payload.get('anti_jamming_policy')
 
-         # 判断策略是否合法
 
-         if jamming_policy not in JAMMING_POLICY or anti_jamming_policy not in ANTI_JAMMING_POLICY:
 
-             return ResponseDTO.ERROR_MS_DATA('策略不合法', {"status": Service.status, "Error": "策略不合法"}).to_json()
 
-         # 打印对应策略
 
-         print(' jamming_policy:', jamming_policy)
 
-         print(' anti_jamming_policy:', anti_jamming_policy)
 
-         # 根据策略选择算法
 
-         jam_algorithm = Service._get_jamming_algorithm_method(jamming_policy)
 
-         anti_jam_algorithm = Service._get_anti_jamming_algorithm(anti_jamming_policy, 'rx')
 
-         # 1. 应用发射端抗干扰算法(波形/频率捷变)
 
-         tx_original = np.random.randn(100)
 
-         if anti_jamming_policy in {"波形捷变", "频率捷变"}:
 
-             tx_processed = surveillance_radar.process_transmit_signal(
 
-                 tx_signal=tx_original,
 
-                 algorithm=Service._get_tx_algorithm_method(anti_jamming_policy),
 
-                 sample_rate=1e6
 
-             )
 
-         else:
 
-             tx_processed = tx_original
 
-         
 
-         # 2. 发送经过处理的发射信号
 
-         tx_signal = surveillance_radar.send_signal(
 
-             tx_signal=tx_processed,
 
-             duration=0.1,
 
-             center_freq=100e6,
 
-             sample_rate=1e6,
 
-             gain=20
 
-         )
 
-         
 
-         # 3. 独立接收原始信号
 
-         rx_original = surveillance_radar.recv_signal(
 
-             num_samples=100,
 
-             sample_rate=1e6,
 
-             center_freq=100e6
 
-         )
 
-         
 
-         # 4. 干扰雷达生成干扰
 
-         jammed_signal = jammer_radar.execute_jamming(
 
-             algorithm=jam_algorithm,
 
-             bandwidth=100e6,
 
-             sample_rate=1e6,
 
-             duration=0.1,
 
-             gain=20
 
-         ) if jam_algorithm else tx_signal
 
-         
 
-         # 接收端抗干扰处理(仅限接收端策略)
 
-         processed_signal = rx_original
 
-         if anti_jam_algorithm:
 
-             processed_signal = surveillance_radar.execute_anti_jamming(
 
-                 rx_signal=rx_original,
 
-                 algorithm=anti_jam_algorithm,
 
-                 sample_rate=1e6,
 
-                 polarization_params={'angle': 45, 'ellipticity': 0.5}
 
-             )
 
-         
 
-         # 类型检查并转换复数信号为实数
 
-         if np.iscomplexobj(processed_signal):
 
-             processed_signal = np.real(processed_signal)
 
-         
 
-         # 返回结果
 
-         return ResponseDTO.SUCCESS({"jamming_signal": np.real(jammed_signal).tolist(),
 
-                                     "anti_jamming_signal": np.real(processed_signal).tolist(),
 
-                                     "status": Service.status}).to_json()
 
-     @staticmethod
 
-     def send():
 
-         try:
 
-             # 设置中心频率、采样率和增益
 
-             center_freq = 100e6  # 2.4 GHz
 
-             sample_rate = 1e6  # 1 MS/s
 
-             duration = 10  # 以秒为单位
 
-             gain = 20  # [dB] 建议一开始设置小一点,按照实际情况调整
 
-             # 生成发送信号
 
-             num_samples = 100
 
-             tx_signal = np.random.randn(num_samples) + 0.1j * np.random.randn(num_samples)  # 修复部分
 
-             # 发送信号
 
-             Service.usrp.send_waveform(tx_signal, duration, center_freq, sample_rate, [0], gain)
 
-             # 接收信号
 
-             rx_signal = Service.usrp.recv_num_samps(num_samples, center_freq, sample_rate)
 
-             print('信号已发送:')
 
-             print(rx_signal)
 
-             print('信号已接收')
 
-         except Exception as e:
 
-             print('发送或接收信号异常', e)
 
-             Service.status = 0  # 发送或接收信号失败,状态置为0
 
-             Service.usrp = None  # 重置USRP对象
 
-     @staticmethod
 
-     def _get_jamming_algorithm_method(policy: str) -> callable:
 
-         # 干扰算法映射
 
-         jam_algorithm_map = {
 
-             "噪声调频": JammingSignalAlgo.frequency_modulation,
 
-             "噪声调幅": JammingSignalAlgo.amplitude_modulation,
 
-             "噪声直放": JammingSignalAlgo.repeater_jamming,
 
-             "速度多假目标": JammingSignalAlgo.velocity_deception,
 
-             "距离多假目标": JammingSignalAlgo.range_deception
 
-         }
 
-         return jam_algorithm_map.get(policy)
 
-     @staticmethod
 
-     def _get_anti_jamming_algorithm(policy: str, algorithm_type: str) -> callable:
 
-         """
 
-         统一抗干扰策略路由方法
 
-         :param policy: 策略名称
 
-         :param algorithm_type: 算法类型['tx'发射端/'rx'接收端]
 
-         """
 
-         tx_algorithms = {
 
-             "波形捷变": AntiJammingSignalAlgo.adaptive_filter,
 
-             "频率捷变": AntiJammingSignalAlgo.time_frequency_filter
 
-         }
 
-         rx_algorithms = {
 
-             "自适应极化滤波": AntiJammingSignalAlgo.polarization_filter
 
-         }
 
-         
 
-         if algorithm_type == 'tx':
 
-             return tx_algorithms.get(policy)
 
-         elif algorithm_type == 'rx':
 
-             return None if policy in {"波形捷变", "频率捷变"} else rx_algorithms.get(policy)
 
- # main方法
 
- if __name__ == '__main__':
 
-     Service.initialize_usrp()
 
-     Service.data({"jamming_policy": "噪声调频", "anti_jamming_policy": "频率捷变"})
 
 
  |