import json import uhd import numpy as np import threading from dto.response_dto import ResponseDTO from model.surveillance_radar import SurveillanceRadar from model.jammer_radar import JammerRadar # 定义一组通道常量 CHANNEL_1 = 0 CHANNEL_2 = 1 CHANNEL_3 = 2 CHANNEL_4 = 3 # 定义干扰策略集合,确保集合中的元素为合法的字符串常量 JAMMING_POLICY = { "噪声调频", "噪声调幅", "噪声直放", "速度多假目标", "距离多假目标" } ANTI_JAMMING_POLICY = { "频率捷变", "波形捷变", "自适应极化滤波" } class Service: usrp = None # 静态类变量 status = 0 # 静态类变量,用于存储设备状态,初始化为0 _rlock = threading.RLock() # 可重入锁,用于线程锁 @staticmethod def initialize_usrp(): with Service._rlock: Service.get_sdr_status() if Service.status == 1: print('USRP设备已经初始化') return ResponseDTO.SUCCESS({"status": Service.status}).to_json() try: Service.usrp = uhd.usrp.MultiUSRP() print('------SDR Devices initialize success!------') Service.status = 1 # 初始化成功,状态置为1 return ResponseDTO.SUCCESS({"status": Service.status}).to_json() except Exception as e: print('SDR设备异常', e) Service.status = 0 # 初始化失败,状态置为0 return ResponseDTO.ERROR_MS_DATA('SDR设备异常', {"status": Service.status, "Error": str(e)}).to_json() @staticmethod def get_sdr_status(): with Service._rlock: if Service.status == 0: return ResponseDTO.ERROR_MS_DATA('SDR设备异常',{"status": Service.status, "Error": "SDR设备未初始化"}).to_json() try: samples = Service.usrp.recv_num_samps(1, 100e6, 1e6, [0], 50) if samples is None: Service.status = 0 # 获取状态失败,状态置为0 else: Service.status = 1 return ResponseDTO.SUCCESS({"status": Service.status}).to_json() except Exception as e: print('SDR设备异常', e) Service.status = 0 # 获取状态失败,状态置为0 Service.usrp = None # 重置USRP对象 return ResponseDTO.ERROR_MS_DATA('SDR设备异常', {"status": Service.status, "Error": str(e)}).to_json() @staticmethod def data(payload): jamming_policy = payload['jamming_policy'] anti_jamming_policy = payload['anti_jamming_policy'] # 判断策略是否合法 if jamming_policy not in JAMMING_POLICY or anti_jamming_policy not in ANTI_JAMMING_POLICY: return ResponseDTO.ERROR_MS_DATA('策略不合法', {"status": Service.status, "Error": "策略不合法"}).to_json() # 打印对应策略 print(' jamming_policy:', jamming_policy) print(' anti_jamming_policy:', anti_jamming_policy) Service.send() # 生成两个长度为一百的浮点数组,代表侦察信号和返回的信号,数组元素随机 surveillance_signal = np.random.randn(100) surveillance_signal_back = np.random.randn(100) # 返回结果 return ResponseDTO.SUCCESS({"jamming_signal": surveillance_signal.tolist(), "anti_jamming_signal": surveillance_signal_back.tolist(), "status": Service.status}).to_json() @staticmethod def send(): try: # 设置中心频率、采样率和增益 center_freq = 100e6 # 2.4 GHz sample_rate = 1e6 # 1 MS/s duration = 10 # 以秒为单位 gain = 20 # [dB] 建议一开始设置小一点,按照实际情况调整 # 生成发送信号 num_samples = 100 tx_signal = np.random.randn(num_samples) + 0.1j * np.random.randn(num_samples) # 修复部分 # 发送信号 Service.usrp.send_waveform(tx_signal, duration, center_freq, sample_rate, [0], gain) # 接收信号 rx_signal = Service.usrp.recv_num_samps(num_samples, center_freq, sample_rate) print('信号已发送:') print(rx_signal) print('信号已接收') except Exception as e: print('发送或接收信号异常', e) Service.status = 0 # 发送或接收信号失败,状态置为0 Service.usrp = None # 重置USRP对象 @staticmethod def demo(): surveillance_radar = SurveillanceRadar(Service.usrp, rx=CHANNEL_1, tx=CHANNEL_1) # main方法 if __name__ == '__main__': Service.initialize_usrp()