from math import e from pickletools import read_string1 from socket import timeout import numpy as np import numpy as np from scipy.signal import find_peaks from algo import anti_jamming_signal_algo from algo.anti_jamming_signal_algo import AntiJammingSignalAlgo from threading import Thread, Event import queue import threading import time import uhd # ==================== 侦查雷达类 ==================== class SurveillanceRadar: SAMPLING_RATE = 10e6 CENTER_FREQ = 6e9 GAIN = 50 def __init__(self, usrp, rx_channel=0, tx_channel=0): self.usrp = usrp self.stop_event = Event() self.tx_buffer = queue.Queue(maxsize=300) self.rx_buffer = queue.Queue(maxsize=300) # 流配置参数(与demo.py保持一致) tx_stream_args = uhd.usrp.StreamArgs('fc32', 'sc16') tx_stream_args.channels = [tx_channel] self.tx_stream = self.usrp.get_tx_stream(tx_stream_args) rx_stream_args = uhd.usrp.StreamArgs('fc32','sc16') rx_stream_args.channels = [rx_channel] self.rx_stream = self.usrp.get_rx_stream(rx_stream_args) self.samps_per_packet = 500000 # self.stream_timeout = 3.0 # 配置设备参数(与demo.py同步) self.usrp.set_tx_rate(self.SAMPLING_RATE) self.usrp.set_tx_freq(uhd.types.TuneRequest(self.CENTER_FREQ),tx_channel) self.usrp.set_tx_gain(self.GAIN) self.usrp.set_tx_antenna("TX/RX", tx_channel) self.usrp.set_rx_rate(self.SAMPLING_RATE) self.usrp.set_rx_freq(uhd.types.TuneRequest(self.CENTER_FREQ),rx_channel) self.usrp.set_rx_gain(self.GAIN) self.usrp.set_rx_antenna("RX2", rx_channel) self.anti_jamming_algorithm = None # 存储抗干扰算法 self.tx_signal = None # 存储干扰信号 self.rx_signal = None # 接收到的信号 self.jammed_signal = np.zeros((self.samps_per_packet,), dtype=np.complex64) # 初始化干扰信号缓冲区 self.processed_signal = None # 存储处理后的信号 self.tx_buffer = queue.Queue(maxsize=100) self.rx_buffer = queue.Queue(maxsize=100) self.processing_thread = None def generate_signal(self): """生成具有随机强度的测试信号""" t = np.arange(int(SurveillanceRadar.SAMPLING_RATE * 0.1)) / SurveillanceRadar.SAMPLING_RATE # 0.1秒的数据 # 基础幅度为0.5,添加随机波动 (±0.3) amplitude = 0.5 + 0.3 * np.random.randn() amplitude = max(0.1, min(0.9, amplitude)) # 限制在合理范围内 # 添加随机相位偏移 phase = np.random.uniform(0, 2 * np.pi) # 添加随机频率偏移 (±50kHz) freq_offset = np.random.uniform(-50e3, 50e3) signal = amplitude * np.exp(2j * np.pi * (1e6 + freq_offset) * t + 1j * phase) return signal.astype(np.complex64), amplitude def tx_worker(self): """发送线程函数""" metadata = uhd.types.TXMetadata() metadata.start_of_burst = True metadata.end_of_burst = False metadata.has_time_spec = False print("发送线程已启动") while not self.stop_event.is_set(): try: # 生成随机信号 signal, amplitude = self.generate_signal() self.tx_signal = signal # 应用抗干扰算法 samples = self.process_transmit_signal(self.tx_signal, self.anti_jamming_algorithm, self.SAMPLING_RATE) # print("samples:", samples) # 发送信号 self.tx_stream.send(signal, metadata) print("数据已发送") # 将100个信号依次队列 #如果队列已满,则去对头元素 if self.tx_buffer.full(): self.tx_buffer.get() self.tx_buffer.put(samples.copy()) # 控制发送频率 time.sleep(0.05) except Exception as e: # self.stop_event.set() # metadata.end_of_burst = True # self.tx_stream.send(np.zeros((1,), dtype=np.complex64), metadata) # raise Exception(f"发送线程出错: {e}") print(f"发送线程出错: {e}") def rx_worker(self): # 启动接收流 stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont) stream_cmd.stream_now = True self.rx_stream.issue_stream_cmd(stream_cmd) metadata = uhd.types.RXMetadata() buff = np.zeros((self.samps_per_packet,), dtype=np.complex64) while not self.stop_event.is_set(): try: num_rx = self.rx_stream.recv(buff, metadata) if metadata.error_code != uhd.types.RXMetadataErrorCode.none: print(f"接收错误: {metadata.error_code}") continue # 将100个信号依次队列 if num_rx > 0: #如果队列已满,则去对头元素 if self.rx_buffer.full(): self.rx_buffer.get() self.rx_buffer.put(buff.copy()[:num_rx]) except Exception as e: # stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont) # self.usrp.issue_stream_cmd(stream_cmd) # raise Exception(f"接收线程出错: {e}") print(f"接收线程出错: {e}") def process_transmit_signal(self, tx_signal: np.ndarray, algorithm: callable, sample_rate: float) -> np.ndarray: """ 发射端信号处理 :param tx_signal: 发射信号 :param algorithm: 发射端抗干扰算法 :param sample_rate: 采样率 """ #如果为频率捷变算法 if algorithm == AntiJammingSignalAlgo.frequency_agility: hop_sequence = [(10e6, 0.001), (20e6, 0.001)] # 跳频序列,频率点和驻留时间 process_signal = algorithm(tx_signal=tx_signal, sample_rate=sample_rate, hop_sequence=hop_sequence) elif algorithm == AntiJammingSignalAlgo.waveform_agility: waveform_params = {'type': 'LFM', 'bandwidth': 100e6, 'duration': 0.1} # 波形参数 process_signal = algorithm(tx_signal=tx_signal, sample_rate=sample_rate, waveform_params=waveform_params) else: process_signal = tx_signal # print("processed_signal:", processed_signal) return process_signal.astype(np.complex64) def apply_anti_jamming_processing(self, rx_signal: np.ndarray, algorithm: callable, sample_rate: float, **kwargs) -> np.ndarray: """ 应用抗干扰信号处理 :param rx_signal: 接收信号 :param algorithm: 接收端抗干扰算法 :param sample_rate: 采样率 """ # 极化参数配置 polar_params = { 'angle': 45, # 极化角(单位:度) 'ellipticity': 0.8 # 椭圆率(0~1之间) } if algorithm == AntiJammingSignalAlgo.polarization_filter: process_signal = algorithm(rx_signal=rx_signal, sample_rate=sample_rate, polarization_params=polar_params) else: process_signal = rx_signal return process_signal.astype(np.complex64) def anti_jamming_worker(self): # try: while not self.stop_event.is_set(): if not self.rx_buffer.qsize() > 150: list = np.array([]) for i in range(100): buff = self.rx_buffer.get() combined_signal = buff # processed_signal = self.apply_anti_jamming_processing( # combined_signal, # self.anti_jamming_algorithm, # self.SAMPLING_RATE # ) # 计算接收信号的强度 (RMS) signal_rms = np.sqrt(np.mean(np.abs(buff) ** 2)) print(f"\r接收信号强度: {signal_rms:.3f}") list = np.append(list,signal_rms) print("list:",list.tolist()) self.processed_signal = list time.sleep(0.05) # except Exception as e: # self.stop_event.set() # raise Exception(f"信号处理线程出错: {e}") def run(self): """启动雷达""" print("雷达启动") tx_thread = Thread(target=self.tx_worker) rx_thread = Thread(target=self.rx_worker) tx_thread.start() rx_thread.start() self.processing_thread = threading.Thread(target=self.anti_jamming_worker) self.processing_thread.start() print("雷达运行完毕") def inject(self,anti_jam_algorithm,jammed_signal): self.jammed_signal = jammed_signal self.anti_jamming_algorithm = anti_jam_algorithm