import numpy as np from scipy import signal, fft, linalg from typing import Tuple, List class AntiJammingSignalAlgo: """ 雷达抗干扰算法库 支持对抗噪声干扰、欺骗干扰、DRFM干扰等 """ @staticmethod def adaptive_filter( rx_signal: np.ndarray, reference_signal: np.ndarray, filter_length: int = 32, mu: float = 0.01 ) -> np.ndarray: """ LMS自适应滤波(对抗噪声干扰) :param rx_signal: 含干扰的接收信号 :param reference_signal: 参考信号(干扰样本) :param filter_length: 滤波器阶数 :param mu: 收敛因子 (0 < mu < 1) :return: 滤波后信号 """ n = len(rx_signal) w = np.zeros(filter_length, dtype=np.complex64) output = np.zeros(n, dtype=np.complex64) for k in range(filter_length, n): x = reference_signal[k - filter_length:k][::-1] output[k] = rx_signal[k] - np.dot(w, x) w += mu * output[k] * np.conj(x) return output @staticmethod def pca_denoise( signal_matrix: np.ndarray, n_components: int = 3 ) -> np.ndarray: """ 基于PCA的信号增强(对抗宽带噪声) :param signal_matrix: 多通道信号矩阵 [n_channels, n_samples] :param n_components: 保留的主成分数量 :return: 降噪后信号 """ cov_matrix = np.cov(signal_matrix) eig_vals, eig_vecs = linalg.eigh(cov_matrix) idx = eig_vals.argsort()[::-1] principal_components = eig_vecs[:, idx[:n_components]] return np.dot(principal_components.T, signal_matrix) @staticmethod def cfar_detection( spectrum: np.ndarray, guard_cells: int = 2, training_cells: int = 10, threshold_factor: float = 3.0 ) -> np.ndarray: """ CFAR目标检测(对抗欺骗干扰) :param spectrum: 距离/多普勒谱 :param guard_cells: 保护单元数 :param training_cells: 训练单元数 :param threshold_factor: 阈值系数 :return: 目标掩码(True表示目标) """ mask = np.zeros_like(spectrum, dtype=bool) n = len(spectrum) for i in range(n): left = max(0, i - guard_cells - training_cells) right = min(n, i + guard_cells + training_cells) noise_est = np.concatenate([ spectrum[left:i - guard_cells], spectrum[i + guard_cells:right] ]) threshold = threshold_factor * np.mean(noise_est) mask[i] = spectrum[i] > threshold return mask @staticmethod def time_frequency_filter( rx_signal: np.ndarray, sample_rate: float, jamming_freq: float, bandwidth: float = 1e6 ) -> np.ndarray: """ 时频联合滤波(对抗点频干扰) :param rx_signal: 接收信号 :param sample_rate: 采样率 :param jamming_freq: 干扰中心频率 :param bandwidth: 抑制带宽 :return: 滤波后信号 """ # STFT参数 nperseg = 256 noverlap = nperseg // 2 # 计算STFT f, t, Zxx = signal.stft(rx_signal, fs=sample_rate, nperseg=nperseg, noverlap=noverlap) # 构建干扰掩码 jamming_mask = (np.abs(f - jamming_freq) < bandwidth / 2) # 干扰置零 Zxx_clean = Zxx.copy() Zxx_clean[jamming_mask, :] = 0 # 逆STFT _, clean_signal = signal.istft(Zxx_clean, fs=sample_rate, nperseg=nperseg, noverlap=noverlap) return clean_signal[:len(rx_signal)] @staticmethod def mti_filter( pulse_matrix: np.ndarray, clutter_rejection: float = 30.0 ) -> np.ndarray: """ 动目标显示(MTI)滤波(对抗地物杂波) :param pulse_matrix: 多脉冲信号矩阵 [n_pulses, n_samples] :param clutter_rejection: 杂波抑制深度(dB) :return: 滤波后信号矩阵 """ # 构造对消器 (3脉冲对消器) canceler = np.array([1, -2, 1]) output = np.apply_along_axis( lambda x: np.convolve(x, canceler, mode='valid'), 0, pulse_matrix ) return output * (10 ** (clutter_rejection / 20)) @staticmethod def spoofing_detection( pulse_train: List[np.ndarray], prf: float, max_velocity: float = 300.0 ) -> np.ndarray: """ 欺骗干扰检测(基于多普勒连续性) :param pulse_train: 脉冲序列 :param prf: 脉冲重复频率 :param max_velocity: 最大合理速度(m/s) :return: 干扰标记数组 """ n_pulses = len(pulse_train) lambd = 3e8 / 10e9 # 假设雷达波长(10GHz) max_doppler = 2 * max_velocity / lambd # 计算多普勒谱 doppler_spectra = [] for pulse in pulse_train: spec = np.abs(fft.fftshift(fft.fft(pulse))) doppler_spectra.append(spec) # 检测异常多普勒跳变 is_spoofed = np.zeros(n_pulses, dtype=bool) for i in range(1, n_pulses): corr = np.correlate(doppler_spectra[i - 1], doppler_spectra[i], 'same') peak_shift = np.argmax(corr) - len(corr) // 2 if abs(peak_shift) > max_doppler / prf * len(corr): is_spoofed[i] = True return is_spoofed # ==================== 使用示例 ==================== if __name__ == "__main__": # 模拟含干扰信号 fs = 100e6 t = np.arange(0, 1e-3, 1 / fs) true_signal = np.exp(1j * 2 * np.pi * 1e6 * t) noise_jam = 0.5 * (np.random.normal(0, 1, len(t)) + 1j * np.random.normal(0, 1, len(t))) spoof_jam = 0.3 * np.exp(1j * 2 * np.pi * 3e6 * t) rx_signal = true_signal + noise_jam + spoof_jam # 示例1:自适应滤波 filtered = AntiJammingSignalAlgo.adaptive_filter( rx_signal, reference_signal=noise_jam, filter_length=16, mu=0.05 ) print(f"自适应滤波后SNR提升: {10 * np.log10(np.var(filtered) / np.var(rx_signal - filtered)):.1f} dB") # 示例2:CFAR检测 spectrum = np.abs(fft.fft(rx_signal)) targets = AntiJammingSignalAlgo.cfar_detection(spectrum) print(f"CFAR检测到目标位置: {np.where(targets)[0]}") # 示例3:欺骗干扰检测 pulse_train = [rx_signal[i * 100:(i + 1) * 100] for i in range(10)] spoof_flags = AntiJammingSignalAlgo.spoofing_detection(pulse_train, prf=1e3) print(f"受干扰脉冲索引: {np.where(spoof_flags)[0]}")