123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- import numpy as np
- from scipy import signal, fft, linalg
- from typing import Tuple, List
- class AntiJammingSignalAlgo:
- """
- 雷达抗干扰算法库
- 支持对抗噪声干扰、欺骗干扰、DRFM干扰等
- """
- @staticmethod
- def adaptive_filter(
- rx_signal: np.ndarray,
- reference_signal: np.ndarray,
- filter_length: int = 32,
- mu: float = 0.01
- ) -> np.ndarray:
- """
- LMS自适应滤波(对抗噪声干扰)
- :param rx_signal: 含干扰的接收信号
- :param reference_signal: 参考信号(干扰样本)
- :param filter_length: 滤波器阶数
- :param mu: 收敛因子 (0 < mu < 1)
- :return: 滤波后信号
- """
- n = len(rx_signal)
- w = np.zeros(filter_length, dtype=np.complex64)
- output = np.zeros(n, dtype=np.complex64)
- for k in range(filter_length, n):
- x = reference_signal[k - filter_length:k][::-1]
- output[k] = rx_signal[k] - np.dot(w, x)
- w += mu * output[k] * np.conj(x)
- return output
- @staticmethod
- def pca_denoise(
- signal_matrix: np.ndarray,
- n_components: int = 3
- ) -> np.ndarray:
- """
- 基于PCA的信号增强(对抗宽带噪声)
- :param signal_matrix: 多通道信号矩阵 [n_channels, n_samples]
- :param n_components: 保留的主成分数量
- :return: 降噪后信号
- """
- cov_matrix = np.cov(signal_matrix)
- eig_vals, eig_vecs = linalg.eigh(cov_matrix)
- idx = eig_vals.argsort()[::-1]
- principal_components = eig_vecs[:, idx[:n_components]]
- return np.dot(principal_components.T, signal_matrix)
- @staticmethod
- def cfar_detection(
- spectrum: np.ndarray,
- guard_cells: int = 2,
- training_cells: int = 10,
- threshold_factor: float = 3.0
- ) -> np.ndarray:
- """
- CFAR目标检测(对抗欺骗干扰)
- :param spectrum: 距离/多普勒谱
- :param guard_cells: 保护单元数
- :param training_cells: 训练单元数
- :param threshold_factor: 阈值系数
- :return: 目标掩码(True表示目标)
- """
- mask = np.zeros_like(spectrum, dtype=bool)
- n = len(spectrum)
- for i in range(n):
- left = max(0, i - guard_cells - training_cells)
- right = min(n, i + guard_cells + training_cells)
- noise_est = np.concatenate([
- spectrum[left:i - guard_cells],
- spectrum[i + guard_cells:right]
- ])
- threshold = threshold_factor * np.mean(noise_est)
- mask[i] = spectrum[i] > threshold
- return mask
- @staticmethod
- def time_frequency_filter(
- rx_signal: np.ndarray,
- sample_rate: float,
- jamming_freq: float,
- bandwidth: float = 1e6
- ) -> np.ndarray:
- """
- 时频联合滤波(对抗点频干扰)
- :param rx_signal: 接收信号
- :param sample_rate: 采样率
- :param jamming_freq: 干扰中心频率
- :param bandwidth: 抑制带宽
- :return: 滤波后信号
- """
- # STFT参数
- nperseg = 256
- noverlap = nperseg // 2
- # 计算STFT
- f, t, Zxx = signal.stft(rx_signal, fs=sample_rate,
- nperseg=nperseg, noverlap=noverlap)
- # 构建干扰掩码
- jamming_mask = (np.abs(f - jamming_freq) < bandwidth / 2)
- # 干扰置零
- Zxx_clean = Zxx.copy()
- Zxx_clean[jamming_mask, :] = 0
- # 逆STFT
- _, clean_signal = signal.istft(Zxx_clean, fs=sample_rate,
- nperseg=nperseg, noverlap=noverlap)
- return clean_signal[:len(rx_signal)]
- @staticmethod
- def mti_filter(
- pulse_matrix: np.ndarray,
- clutter_rejection: float = 30.0
- ) -> np.ndarray:
- """
- 动目标显示(MTI)滤波(对抗地物杂波)
- :param pulse_matrix: 多脉冲信号矩阵 [n_pulses, n_samples]
- :param clutter_rejection: 杂波抑制深度(dB)
- :return: 滤波后信号矩阵
- """
- # 构造对消器 (3脉冲对消器)
- canceler = np.array([1, -2, 1])
- output = np.apply_along_axis(
- lambda x: np.convolve(x, canceler, mode='valid'),
- 0, pulse_matrix
- )
- return output * (10 ** (clutter_rejection / 20))
- @staticmethod
- def spoofing_detection(
- pulse_train: List[np.ndarray],
- prf: float,
- max_velocity: float = 300.0
- ) -> np.ndarray:
- """
- 欺骗干扰检测(基于多普勒连续性)
- :param pulse_train: 脉冲序列
- :param prf: 脉冲重复频率
- :param max_velocity: 最大合理速度(m/s)
- :return: 干扰标记数组
- """
- n_pulses = len(pulse_train)
- lambd = 3e8 / 10e9 # 假设雷达波长(10GHz)
- max_doppler = 2 * max_velocity / lambd
- # 计算多普勒谱
- doppler_spectra = []
- for pulse in pulse_train:
- spec = np.abs(fft.fftshift(fft.fft(pulse)))
- doppler_spectra.append(spec)
- # 检测异常多普勒跳变
- is_spoofed = np.zeros(n_pulses, dtype=bool)
- for i in range(1, n_pulses):
- corr = np.correlate(doppler_spectra[i - 1], doppler_spectra[i], 'same')
- peak_shift = np.argmax(corr) - len(corr) // 2
- if abs(peak_shift) > max_doppler / prf * len(corr):
- is_spoofed[i] = True
- return is_spoofed
- # ==================== 使用示例 ====================
- if __name__ == "__main__":
- # 模拟含干扰信号
- fs = 100e6
- t = np.arange(0, 1e-3, 1 / fs)
- true_signal = np.exp(1j * 2 * np.pi * 1e6 * t)
- noise_jam = 0.5 * (np.random.normal(0, 1, len(t)) + 1j * np.random.normal(0, 1, len(t)))
- spoof_jam = 0.3 * np.exp(1j * 2 * np.pi * 3e6 * t)
- rx_signal = true_signal + noise_jam + spoof_jam
- # 示例1:自适应滤波
- filtered = AntiJammingSignalAlgo.adaptive_filter(
- rx_signal,
- reference_signal=noise_jam,
- filter_length=16,
- mu=0.05
- )
- print(f"自适应滤波后SNR提升: {10 * np.log10(np.var(filtered) / np.var(rx_signal - filtered)):.1f} dB")
- # 示例2:CFAR检测
- spectrum = np.abs(fft.fft(rx_signal))
- targets = AntiJammingSignalAlgo.cfar_detection(spectrum)
- print(f"CFAR检测到目标位置: {np.where(targets)[0]}")
- # 示例3:欺骗干扰检测
- pulse_train = [rx_signal[i * 100:(i + 1) * 100] for i in range(10)]
- spoof_flags = AntiJammingSignalAlgo.spoofing_detection(pulse_train, prf=1e3)
- print(f"受干扰脉冲索引: {np.where(spoof_flags)[0]}")
|