import numpy as np from scipy.signal import find_peaks from algo.anti_jamming_signal_algo import AntiJammingSignalAlgo # ==================== 侦查雷达类 ==================== class SurveillanceRadar: #初始化函数,传入usrp,输入的通道,输出的通道 def __init__(self, usrp, rx, tx): self.usrp = usrp self.rx = rx self.tx = tx # 封装一个发送信号的函数 def send_signal(self, tx_signal, duration, center_freq, sample_rate, gain): # 发送信号并返回发射信号 self.usrp.send_waveform(tx_signal, duration, center_freq, sample_rate, [self.tx], gain) print('侦查雷达已发送信号') return tx_signal # 封装一个接收信号的函数 def recv_signal(self, num_samples, sample_rate, center_freq): rx_signal = self.usrp.recv_num_samps(num_samples, center_freq, sample_rate, [self.rx]) print('侦查雷达已接收信号') return rx_signal def process_transmit_signal(self, tx_signal: np.ndarray, algorithm: callable, sample_rate: float) -> np.ndarray: """ 发射端信号处理 :param tx_signal: 发射信号 :param algorithm: 发射端抗干扰算法 :param sample_rate: 采样率 """ #如果为频率捷变算法 if algorithm == AntiJammingSignalAlgo.frequency_agility: hop_sequence = [(100e6, 0.001), (200e6, 0.001)] # 跳频序列,频率点和驻留时间 processed_signal = algorithm(tx_signal=tx_signal, sample_rate=sample_rate, hop_sequence=hop_sequence) if algorithm == AntiJammingSignalAlgo.waveform_agility: waveform_params = {'type': 'LFM', 'bandwidth': 100e6, 'duration': 0.1} # 波形参数 processed_signal = algorithm(tx_signal=tx_signal, sample_rate=sample_rate, waveform_params=waveform_params) print("processed_signal:", processed_signal) return processed_signal.astype(np.complex64) def apply_anti_jamming_processing(self, rx_signal: np.ndarray, algorithm: callable, sample_rate: float, **kwargs) -> np.ndarray: """ 应用抗干扰信号处理 :param rx_signal: 接收信号 :param algorithm: 接收端抗干扰算法 :param sample_rate: 采样率 """ processed_signal = algorithm(rx_signal=rx_signal, sample_rate=sample_rate, **kwargs) return processed_signal.astype(np.complex64) # 分析信号,获取目标距离 def analyze_signal(self, rx_signal: np.ndarray, sample_rate: float, cfar_threshold: float = 20.0) -> list: """ 分析接收信号并返回目标距离列表 :param rx_signal: 接收信号(复数形式) :param sample_rate: 采样率(Hz) :param cfar_threshold: CFAR检测阈值(dB) :return: 目标距离列表(米) """ # 1. 去斜处理(Dechirping) mixed = rx_signal * np.conj(self.tx_signal[:len(rx_signal)]) # 2. 加窗处理(Hamming窗) window = np.hamming(len(mixed)) windowed = mixed * window # 3. 距离FFT range_fft = np.fft.fft(windowed) spectrum_db = 20 * np.log10(np.abs(range_fft) + 1e-10) # 转换为dB # 4. 计算距离轴 freq_bins = np.fft.fftfreq(len(spectrum_db), 1/sample_rate) ranges = (self.c * self.T * freq_bins) / (2 * self.B) # 5. CFAR目标检测(简化版) noise_floor = np.median(spectrum_db) peaks, _ = find_peaks(spectrum_db, height=noise_floor + cfar_threshold) # 6. 提取目标距离(取前向部分) valid_peaks = peaks[peaks <= len(ranges)//2] print(f"检测到目标距离:{[abs(ranges[p]) for p in valid_peaks]} 米") return [abs(ranges[p]) for p in valid_peaks]