|
@@ -0,0 +1,200 @@
|
|
|
+import numpy as np
|
|
|
+from scipy import signal, fft, linalg
|
|
|
+from typing import Tuple, List
|
|
|
+
|
|
|
+
|
|
|
+class AntiJammingSignalAlgo:
|
|
|
+ """
|
|
|
+ 雷达抗干扰算法库
|
|
|
+ 支持对抗噪声干扰、欺骗干扰、DRFM干扰等
|
|
|
+ """
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def adaptive_filter(
|
|
|
+ rx_signal: np.ndarray,
|
|
|
+ reference_signal: np.ndarray,
|
|
|
+ filter_length: int = 32,
|
|
|
+ mu: float = 0.01
|
|
|
+ ) -> np.ndarray:
|
|
|
+ """
|
|
|
+ LMS自适应滤波(对抗噪声干扰)
|
|
|
+ :param rx_signal: 含干扰的接收信号
|
|
|
+ :param reference_signal: 参考信号(干扰样本)
|
|
|
+ :param filter_length: 滤波器阶数
|
|
|
+ :param mu: 收敛因子 (0 < mu < 1)
|
|
|
+ :return: 滤波后信号
|
|
|
+ """
|
|
|
+ n = len(rx_signal)
|
|
|
+ w = np.zeros(filter_length, dtype=np.complex64)
|
|
|
+ output = np.zeros(n, dtype=np.complex64)
|
|
|
+
|
|
|
+ for k in range(filter_length, n):
|
|
|
+ x = reference_signal[k - filter_length:k][::-1]
|
|
|
+ output[k] = rx_signal[k] - np.dot(w, x)
|
|
|
+ w += mu * output[k] * np.conj(x)
|
|
|
+
|
|
|
+ return output
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def pca_denoise(
|
|
|
+ signal_matrix: np.ndarray,
|
|
|
+ n_components: int = 3
|
|
|
+ ) -> np.ndarray:
|
|
|
+ """
|
|
|
+ 基于PCA的信号增强(对抗宽带噪声)
|
|
|
+ :param signal_matrix: 多通道信号矩阵 [n_channels, n_samples]
|
|
|
+ :param n_components: 保留的主成分数量
|
|
|
+ :return: 降噪后信号
|
|
|
+ """
|
|
|
+ cov_matrix = np.cov(signal_matrix)
|
|
|
+ eig_vals, eig_vecs = linalg.eigh(cov_matrix)
|
|
|
+ idx = eig_vals.argsort()[::-1]
|
|
|
+ principal_components = eig_vecs[:, idx[:n_components]]
|
|
|
+ return np.dot(principal_components.T, signal_matrix)
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def cfar_detection(
|
|
|
+ spectrum: np.ndarray,
|
|
|
+ guard_cells: int = 2,
|
|
|
+ training_cells: int = 10,
|
|
|
+ threshold_factor: float = 3.0
|
|
|
+ ) -> np.ndarray:
|
|
|
+ """
|
|
|
+ CFAR目标检测(对抗欺骗干扰)
|
|
|
+ :param spectrum: 距离/多普勒谱
|
|
|
+ :param guard_cells: 保护单元数
|
|
|
+ :param training_cells: 训练单元数
|
|
|
+ :param threshold_factor: 阈值系数
|
|
|
+ :return: 目标掩码(True表示目标)
|
|
|
+ """
|
|
|
+ mask = np.zeros_like(spectrum, dtype=bool)
|
|
|
+ n = len(spectrum)
|
|
|
+
|
|
|
+ for i in range(n):
|
|
|
+ left = max(0, i - guard_cells - training_cells)
|
|
|
+ right = min(n, i + guard_cells + training_cells)
|
|
|
+ noise_est = np.concatenate([
|
|
|
+ spectrum[left:i - guard_cells],
|
|
|
+ spectrum[i + guard_cells:right]
|
|
|
+ ])
|
|
|
+ threshold = threshold_factor * np.mean(noise_est)
|
|
|
+ mask[i] = spectrum[i] > threshold
|
|
|
+
|
|
|
+ return mask
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def time_frequency_filter(
|
|
|
+ rx_signal: np.ndarray,
|
|
|
+ sample_rate: float,
|
|
|
+ jamming_freq: float,
|
|
|
+ bandwidth: float = 1e6
|
|
|
+ ) -> np.ndarray:
|
|
|
+ """
|
|
|
+ 时频联合滤波(对抗点频干扰)
|
|
|
+ :param rx_signal: 接收信号
|
|
|
+ :param sample_rate: 采样率
|
|
|
+ :param jamming_freq: 干扰中心频率
|
|
|
+ :param bandwidth: 抑制带宽
|
|
|
+ :return: 滤波后信号
|
|
|
+ """
|
|
|
+ # STFT参数
|
|
|
+ nperseg = 256
|
|
|
+ noverlap = nperseg // 2
|
|
|
+
|
|
|
+ # 计算STFT
|
|
|
+ f, t, Zxx = signal.stft(rx_signal, fs=sample_rate,
|
|
|
+ nperseg=nperseg, noverlap=noverlap)
|
|
|
+
|
|
|
+ # 构建干扰掩码
|
|
|
+ jamming_mask = (np.abs(f - jamming_freq) < bandwidth / 2)
|
|
|
+
|
|
|
+ # 干扰置零
|
|
|
+ Zxx_clean = Zxx.copy()
|
|
|
+ Zxx_clean[jamming_mask, :] = 0
|
|
|
+
|
|
|
+ # 逆STFT
|
|
|
+ _, clean_signal = signal.istft(Zxx_clean, fs=sample_rate,
|
|
|
+ nperseg=nperseg, noverlap=noverlap)
|
|
|
+ return clean_signal[:len(rx_signal)]
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def mti_filter(
|
|
|
+ pulse_matrix: np.ndarray,
|
|
|
+ clutter_rejection: float = 30.0
|
|
|
+ ) -> np.ndarray:
|
|
|
+ """
|
|
|
+ 动目标显示(MTI)滤波(对抗地物杂波)
|
|
|
+ :param pulse_matrix: 多脉冲信号矩阵 [n_pulses, n_samples]
|
|
|
+ :param clutter_rejection: 杂波抑制深度(dB)
|
|
|
+ :return: 滤波后信号矩阵
|
|
|
+ """
|
|
|
+ # 构造对消器 (3脉冲对消器)
|
|
|
+ canceler = np.array([1, -2, 1])
|
|
|
+ output = np.apply_along_axis(
|
|
|
+ lambda x: np.convolve(x, canceler, mode='valid'),
|
|
|
+ 0, pulse_matrix
|
|
|
+ )
|
|
|
+ return output * (10 ** (clutter_rejection / 20))
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def spoofing_detection(
|
|
|
+ pulse_train: List[np.ndarray],
|
|
|
+ prf: float,
|
|
|
+ max_velocity: float = 300.0
|
|
|
+ ) -> np.ndarray:
|
|
|
+ """
|
|
|
+ 欺骗干扰检测(基于多普勒连续性)
|
|
|
+ :param pulse_train: 脉冲序列
|
|
|
+ :param prf: 脉冲重复频率
|
|
|
+ :param max_velocity: 最大合理速度(m/s)
|
|
|
+ :return: 干扰标记数组
|
|
|
+ """
|
|
|
+ n_pulses = len(pulse_train)
|
|
|
+ lambd = 3e8 / 10e9 # 假设雷达波长(10GHz)
|
|
|
+ max_doppler = 2 * max_velocity / lambd
|
|
|
+
|
|
|
+ # 计算多普勒谱
|
|
|
+ doppler_spectra = []
|
|
|
+ for pulse in pulse_train:
|
|
|
+ spec = np.abs(fft.fftshift(fft.fft(pulse)))
|
|
|
+ doppler_spectra.append(spec)
|
|
|
+
|
|
|
+ # 检测异常多普勒跳变
|
|
|
+ is_spoofed = np.zeros(n_pulses, dtype=bool)
|
|
|
+ for i in range(1, n_pulses):
|
|
|
+ corr = np.correlate(doppler_spectra[i - 1], doppler_spectra[i], 'same')
|
|
|
+ peak_shift = np.argmax(corr) - len(corr) // 2
|
|
|
+ if abs(peak_shift) > max_doppler / prf * len(corr):
|
|
|
+ is_spoofed[i] = True
|
|
|
+
|
|
|
+ return is_spoofed
|
|
|
+
|
|
|
+
|
|
|
+# ==================== 使用示例 ====================
|
|
|
+if __name__ == "__main__":
|
|
|
+ # 模拟含干扰信号
|
|
|
+ fs = 100e6
|
|
|
+ t = np.arange(0, 1e-3, 1 / fs)
|
|
|
+ true_signal = np.exp(1j * 2 * np.pi * 1e6 * t)
|
|
|
+ noise_jam = 0.5 * (np.random.normal(0, 1, len(t)) + 1j * np.random.normal(0, 1, len(t)))
|
|
|
+ spoof_jam = 0.3 * np.exp(1j * 2 * np.pi * 3e6 * t)
|
|
|
+ rx_signal = true_signal + noise_jam + spoof_jam
|
|
|
+
|
|
|
+ # 示例1:自适应滤波
|
|
|
+ filtered = AntiJammingSignalAlgo.adaptive_filter(
|
|
|
+ rx_signal,
|
|
|
+ reference_signal=noise_jam,
|
|
|
+ filter_length=16,
|
|
|
+ mu=0.05
|
|
|
+ )
|
|
|
+ print(f"自适应滤波后SNR提升: {10 * np.log10(np.var(filtered) / np.var(rx_signal - filtered)):.1f} dB")
|
|
|
+
|
|
|
+ # 示例2:CFAR检测
|
|
|
+ spectrum = np.abs(fft.fft(rx_signal))
|
|
|
+ targets = AntiJammingSignalAlgo.cfar_detection(spectrum)
|
|
|
+ print(f"CFAR检测到目标位置: {np.where(targets)[0]}")
|
|
|
+
|
|
|
+ # 示例3:欺骗干扰检测
|
|
|
+ pulse_train = [rx_signal[i * 100:(i + 1) * 100] for i in range(10)]
|
|
|
+ spoof_flags = AntiJammingSignalAlgo.spoofing_detection(pulse_train, prf=1e3)
|
|
|
+ print(f"受干扰脉冲索引: {np.where(spoof_flags)[0]}")
|