import json from math import e from re import S from matplotlib.colors import CenteredNorm import uhd import numpy as np import threading from dto.response_dto import ResponseDTO from model.surveillance_radar import SurveillanceRadar from model.jammer_radar import JammerRadar from algo.jamming_signal_algo import JammingSignalAlgo from algo.anti_jamming_signal_algo import AntiJammingSignalAlgo # 定义一组通道常量 CHANNEL_1 = 0 CHANNEL_2 = 1 CHANNEL_3 = 2 CHANNEL_4 = 3 SAMPLING_RATE = 20e6 CENTER_FREQ = 6e9 GAIN = 50 # 定义干扰、抗干扰策略集合,确保集合中的元素为合法的字符串常量 from common.strategy_enum import JammingPolicy, AntiJammingPolicy class Service: usrp = None # 静态类变量 status = 0 # 静态类变量 surveillance_radar = None # 侦查雷达实例 jammer_radar = None # 干扰雷达实例 _rlock = threading.RLock() # 可重入锁 @staticmethod def initialize_usrp(): with Service._rlock: Service.get_sdr_status() if Service.status == 1: print('USRP设备已经初始化') return ResponseDTO.SUCCESS({"status": Service.status}).to_json() try: Service.usrp = uhd.usrp.MultiUSRP() print('------SDR Devices initialize success!------') #侦查雷达初始化 Service.surveillance_radar = SurveillanceRadar(Service.usrp, rx_channel=CHANNEL_1, tx_channel=CHANNEL_1) #侦察类雷达启动 Service.surveillance_radar.run() # 干扰雷达初始化 Service.jammer_radar = JammerRadar(Service.usrp, rx=CHANNEL_2, tx=CHANNEL_2) Service.status = 1 return ResponseDTO.SUCCESS({"status": Service.status}).to_json() except Exception as e: print('SDR设备异常', e) Service.usrp = uhd.usrp.MultiUSRP("type=x4xx,reset") Service.status = 0 # 初始化失败,状态置为0 return ResponseDTO.ERROR_MS_DATA('SDR设备异常', {"status": Service.status, "Error": str(e)}).to_json() # @staticmethod # def get_sdr_status(): # with Service._rlock: # if Service.status == 0: # return ResponseDTO.ERROR_MS_DATA('SDR设备异常',{"status": Service.status, "Error": "SDR设备未初始化"}).to_json() # try: # rx_buffer = np.zeros((500000,), dtype=np.complex64) # Service.surveillance_radar.rx_stream.recv(rx_buffer, uhd.types.RXMetadata()) # Service.status = 1 # return ResponseDTO.SUCCESS({"status": Service.status}).to_json() # except Exception as e: # print('SDR设备异常', e) # Service.status = 0 # 获取状态失败,状态置为0 # Service.usrp = None # 重置USRP对象 # return ResponseDTO.ERROR_MS_DATA('SDR设备异常', {"status": Service.status, "Error": str(e)}).to_json() @staticmethod def get_sdr_status(): with Service._rlock: return ResponseDTO.SUCCESS({"status": Service.status}).to_json() @staticmethod def data(payload): with Service._rlock: if Service.status == 0: return ResponseDTO.ERROR_MS_DATA('SDR设备异常', {"status": Service.status, "Error": "SDR设备未初始化"}).to_json() # try: # 使用已初始化的雷达实例 surveillance_radar = Service.surveillance_radar jammer_radar = Service.jammer_radar jamming_policy = payload.get('jamming_policy') anti_jamming_policy = payload.get('anti_jamming_policy') # 判断策略是否合法 if not JammingPolicy.is_valid(jamming_policy) or not AntiJammingPolicy.is_valid(anti_jamming_policy): return ResponseDTO.ERROR_MS_DATA('策略不合法', {"status": Service.status, "Error": "策略不合法"}).to_json() # 打印对应策略 print(' jamming_policy:', jamming_policy) print(' anti_jamming_policy:', anti_jamming_policy) # 根据策略选择算法 jam_algorithm = Service._get_jamming_algorithm_method(jamming_policy) anti_jam_algorithm = Service._get_anti_jamming_algorithm(anti_jamming_policy) print(' jamming_algorithm:', jam_algorithm) print(' anti_jamming_algorithm:', anti_jam_algorithm) # 生成干扰信号 jammed_signal, lower_bound, upper_bound = jammer_radar.execute_jamming( algorithm=jam_algorithm, bandwidth=CENTER_FREQ, sample_rate=SAMPLING_RATE, duration=0.1, gain=GAIN ) # 抗干扰算法 、 干扰信号注入到侦察雷达 surveillance_radar.inject(anti_jam_algorithm,jammed_signal) # 返回结果 return ResponseDTO.SUCCESS({ "jamming_signal": np.real(jammed_signal).tolist(), # "anti_jamming_signal": np.real(surveillance_radar.processed_signal).flatten().tolist(), "anti_jamming_signal": surveillance_radar.processed_signal.tolist(), "jamming_range": { "upper": upper_bound, "lower": lower_bound } }).to_json() # except Exception as e: # print('数据处理异常', e) # return ResponseDTO.ERROR_MS_DATA('数据处理异常', {"status": Service.status, "Error": str(e)}).to_json() @staticmethod def _get_jamming_algorithm_method(policy: str) -> callable: # 干扰算法映射 jam_algorithm_map = { JammingPolicy.FREQUENCY_MODULATION.value: JammingSignalAlgo.frequency_modulation, JammingPolicy.AMPLITUDE_MODULATION.value: JammingSignalAlgo.amplitude_modulation, JammingPolicy.REPEATER.value: JammingSignalAlgo.repeater_jamming, JammingPolicy.VELOCITY_DECEPTION.value: JammingSignalAlgo.velocity_deception, JammingPolicy.RANGE_DECEPTION.value: JammingSignalAlgo.range_deception } return jam_algorithm_map.get(policy) @staticmethod def _get_anti_jamming_algorithm(policy: str) -> callable: """ 统一抗干扰策略路由方法 :param policy: 策略名称 """ anti_jam_algorithm_map = { AntiJammingPolicy.WAVEFORM_AGILITY.value: AntiJammingSignalAlgo.waveform_agility, AntiJammingPolicy.FREQUENCY_AGILITY.value: AntiJammingSignalAlgo.frequency_agility, AntiJammingPolicy.POLARIZATION_FILTER.value: AntiJammingSignalAlgo.polarization_filter } return anti_jam_algorithm_map.get(policy)