import json
import threading
from typing import Callable, Optional

import uhd
import numpy as np

from dto.response_dto import ResponseDTO
from model.surveillance_radar import SurveillanceRadar
from model.jammer_radar import JammerRadar
from algo.jamming_signal_algo import JammingSignalAlgo
from algo.anti_jamming_signal_algo import AntiJammingSignalAlgo
# Jamming / anti-jamming strategy enums; used to validate incoming policy values.
from enums.strategy_enum import JammingPolicy, AntiJammingPolicy

# Channel index constants.
CHANNEL_1 = 0
CHANNEL_2 = 1
CHANNEL_3 = 2
CHANNEL_4 = 3

# RF parameters shared by the status probe, the data pipeline and send().
_CENTER_FREQ_HZ = 100e6  # 100 MHz
_SAMPLE_RATE_HZ = 1e6  # 1 MS/s
_TX_GAIN_DB = 20
_NUM_SAMPLES = 100


class Service:
    """Process-wide facade over one USRP device and the two radar models built on it.

    All state lives in class attributes and every method is static, so the
    class behaves as a singleton.
    """

    usrp = None  # uhd.usrp.MultiUSRP handle; None until initialized or after a failure
    status = 0  # 1 = device considered usable, 0 = uninitialized / failed
    surveillance_radar = None  # SurveillanceRadar instance (created on CHANNEL_1)
    jammer_radar = None  # JammerRadar instance (created on CHANNEL_2)
    # Re-entrant because initialize_usrp() calls get_sdr_status() while holding it.
    _rlock = threading.RLock()

    @staticmethod
    def initialize_usrp():
        """Open the USRP and build both radar models, unless already usable.

        Returns:
            The ``to_json()`` output of a ``ResponseDTO``: SUCCESS carrying the
            current status, or ERROR_MS_DATA carrying the exception text when
            the device cannot be opened.
        """
        with Service._rlock:
            # Refresh the cached status first; the return value is not needed.
            Service.get_sdr_status()
            if Service.status == 1:
                print('USRP设备已经初始化')
                return ResponseDTO.SUCCESS({"status": Service.status}).to_json()
            try:
                Service.usrp = uhd.usrp.MultiUSRP()
                print('------SDR Devices initialize success!------')
                Service.surveillance_radar = SurveillanceRadar(Service.usrp, rx=CHANNEL_1, tx=CHANNEL_1)
                Service.jammer_radar = JammerRadar(Service.usrp, rx=CHANNEL_2, tx=CHANNEL_2)
                Service.status = 1
                return ResponseDTO.SUCCESS({"status": Service.status}).to_json()
            except Exception as e:
                print('SDR设备异常', e)
                Service.status = 0  # initialization failed
                return ResponseDTO.ERROR_MS_DATA(
                    'SDR设备异常', {"status": Service.status, "Error": str(e)}
                ).to_json()

    @staticmethod
    def get_sdr_status():
        """Probe the device with a one-sample receive and update ``Service.status``.

        Returns:
            The ``to_json()`` output of a ``ResponseDTO``. An error DTO is
            returned when the device was never initialized or the probe raises;
            otherwise a SUCCESS DTO carrying the refreshed status.
        """
        with Service._rlock:
            if Service.status == 0:
                return ResponseDTO.ERROR_MS_DATA(
                    'SDR设备异常', {"status": Service.status, "Error": "SDR设备未初始化"}
                ).to_json()
            try:
                # Trailing positional args are presumably the channel list and
                # RX gain per UHD's recv_num_samps signature -- TODO confirm.
                samples = Service.usrp.recv_num_samps(1, _CENTER_FREQ_HZ, _SAMPLE_RATE_HZ, [0], 50)
                Service.status = 0 if samples is None else 1
                return ResponseDTO.SUCCESS({"status": Service.status}).to_json()
            except Exception as e:
                print('SDR设备异常', e)
                Service.status = 0  # probe failed
                Service.usrp = None  # drop the handle so it is not reused
                return ResponseDTO.ERROR_MS_DATA(
                    'SDR设备异常', {"status": Service.status, "Error": str(e)}
                ).to_json()

    @staticmethod
    def data(payload):
        """Run one jamming / anti-jamming round and return both signals.

        Args:
            payload: Mapping with the keys ``'jamming_policy'`` and
                ``'anti_jamming_policy'``.

        Returns:
            The ``to_json()`` output of a ``ResponseDTO``. On success the data
            holds ``jamming_signal`` and ``anti_jamming_signal`` as lists of
            real values. An error DTO is returned when a policy is invalid or
            the radars have not been initialized.
        """
        jamming_policy = payload.get('jamming_policy')
        anti_jamming_policy = payload.get('anti_jamming_policy')

        # Reject unknown policies before touching the hardware.
        if not JammingPolicy.is_valid(jamming_policy) or not AntiJammingPolicy.is_valid(anti_jamming_policy):
            return ResponseDTO.ERROR_MS_DATA(
                '策略不合法', {"status": Service.status, "Error": "策略不合法"}
            ).to_json()

        # The radar instances only exist after a successful initialize_usrp();
        # report that instead of failing with AttributeError on None.
        surveillance_radar = Service.surveillance_radar
        jammer_radar = Service.jammer_radar
        if surveillance_radar is None or jammer_radar is None:
            return ResponseDTO.ERROR_MS_DATA(
                'SDR设备异常', {"status": Service.status, "Error": "SDR设备未初始化"}
            ).to_json()

        print(' jamming_policy:', jamming_policy)
        print(' anti_jamming_policy:', anti_jamming_policy)

        # Resolve the algorithm for each stage. Each lookup yields None when
        # the selected policy does not apply to that stage.
        jam_algorithm = Service._get_jamming_algorithm_method(jamming_policy)
        tx_algorithm = Service._get_anti_jamming_algorithm(anti_jamming_policy, 'tx')
        anti_jam_algorithm = Service._get_anti_jamming_algorithm(anti_jamming_policy, 'rx')

        # 1. Transmit-side anti-jamming (waveform / frequency agility).
        tx_original = np.random.randn(_NUM_SAMPLES)
        if tx_algorithm is not None:
            tx_processed = surveillance_radar.process_transmit_signal(
                tx_signal=tx_original,
                algorithm=tx_algorithm,
                sample_rate=_SAMPLE_RATE_HZ,
            )
        else:
            tx_processed = tx_original

        # 2. Send the (possibly processed) transmit signal.
        tx_signal = surveillance_radar.send_signal(
            tx_signal=tx_processed,
            duration=0.1,
            center_freq=_CENTER_FREQ_HZ,
            sample_rate=_SAMPLE_RATE_HZ,
            gain=_TX_GAIN_DB,
        )

        # 3. Jammer radar generates interference; without a jamming algorithm
        #    the transmitted signal is reported in its place.
        if jam_algorithm:
            jammed_signal = jammer_radar.execute_jamming(
                algorithm=jam_algorithm,
                bandwidth=100e6,
                sample_rate=_SAMPLE_RATE_HZ,
                duration=0.1,
                gain=_TX_GAIN_DB,
            )
        else:
            jammed_signal = tx_signal

        # 4. Receive independently of the transmit path.
        rx_original = surveillance_radar.recv_signal(
            num_samples=_NUM_SAMPLES,
            sample_rate=_SAMPLE_RATE_HZ,
            center_freq=_CENTER_FREQ_HZ,
        )

        # 5. Receive-side anti-jamming (only for receive-side policies).
        processed_signal = rx_original
        if anti_jam_algorithm:
            processed_signal = surveillance_radar.execute_anti_jamming(
                rx_signal=rx_original,
                algorithm=anti_jam_algorithm,
                sample_rate=_SAMPLE_RATE_HZ,
                polarization_params={'angle': 45, 'ellipticity': 0.5},
            )

        # Complex samples are not JSON-serializable; keep only the real part.
        return ResponseDTO.SUCCESS({
            "jamming_signal": np.real(jammed_signal).tolist(),
            "anti_jamming_signal": np.real(processed_signal).tolist(),
        }).to_json()

    @staticmethod
    def send():
        """Transmit a random complex test waveform, then receive and print samples.

        Any exception is caught and printed, the status is set to 0 and the
        USRP handle is dropped. Nothing is returned.
        """
        try:
            center_freq = _CENTER_FREQ_HZ  # 100 MHz
            sample_rate = _SAMPLE_RATE_HZ  # 1 MS/s
            duration = 10  # seconds
            gain = _TX_GAIN_DB  # [dB] start low and adjust to the actual setup

            # Build the waveform to transmit.
            num_samples = _NUM_SAMPLES
            tx_signal = np.random.randn(num_samples) + 0.1j * np.random.randn(num_samples)

            Service.usrp.send_waveform(tx_signal, duration, center_freq, sample_rate, [0], gain)
            rx_signal = Service.usrp.recv_num_samps(num_samples, center_freq, sample_rate)
            print('信号已发送:')
            print(rx_signal)
            print('信号已接收')
        except Exception as e:
            print('发送或接收信号异常', e)
            Service.status = 0  # send/receive failed
            Service.usrp = None  # drop the handle so it is not reused

    @staticmethod
    def _get_jamming_algorithm_method(policy: str) -> Optional[Callable]:
        """Return the jamming algorithm mapped to ``policy``, or None if unmapped."""
        jam_algorithm_map = {
            JammingPolicy.FREQUENCY_MODULATION: JammingSignalAlgo.frequency_modulation,
            JammingPolicy.AMPLITUDE_MODULATION: JammingSignalAlgo.amplitude_modulation,
            JammingPolicy.REPEATER: JammingSignalAlgo.repeater_jamming,
            JammingPolicy.VELOCITY_DECEPTION: JammingSignalAlgo.velocity_deception,
            JammingPolicy.RANGE_DECEPTION: JammingSignalAlgo.range_deception,
        }
        return jam_algorithm_map.get(policy)

    @staticmethod
    def _get_anti_jamming_algorithm(policy: AntiJammingPolicy, algorithm_type: str) -> Optional[Callable]:
        """Route an anti-jamming policy to its transmit- or receive-side algorithm.

        Args:
            policy: The selected anti-jamming policy.
            algorithm_type: ``'tx'`` for transmit-side or ``'rx'`` for
                receive-side algorithms.

        Returns:
            The mapped algorithm, or None when the policy has no algorithm for
            the requested side or ``algorithm_type`` is neither ``'tx'`` nor
            ``'rx'``.
        """
        tx_algorithms = {
            AntiJammingPolicy.WAVEFORM_AGILITY: AntiJammingSignalAlgo.adaptive_filter,
            AntiJammingPolicy.FREQUENCY_AGILITY: AntiJammingSignalAlgo.time_frequency_filter,
        }
        rx_algorithms = {
            AntiJammingPolicy.POLARIZATION_FILTER: AntiJammingSignalAlgo.polarization_filter,
        }

        if algorithm_type == 'tx':
            return tx_algorithms.get(policy)
        if algorithm_type == 'rx':
            # The agility policies are transmit-only and are absent from
            # rx_algorithms, so they resolve to None here.
            return rx_algorithms.get(policy)
        return None


# Script entry point.
if __name__ == '__main__':
    Service.initialize_usrp()
    Service.data({"jamming_policy": "噪声调频", "anti_jamming_policy": "频率捷变"})