anti_jamming_singal_algo_20250411163209.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. import numpy as np
  2. from scipy import signal, fft, linalg
  3. from typing import Tuple, List
  4. class AntiJammingSignalAlgo:
  5. """
  6. 雷达抗干扰算法库
  7. 支持对抗噪声干扰、欺骗干扰、DRFM干扰等
  8. """
  9. @staticmethod
  10. def adaptive_filter(
  11. rx_signal: np.ndarray,
  12. reference_signal: np.ndarray,
  13. filter_length: int = 32,
  14. mu: float = 0.01
  15. ) -> np.ndarray:
  16. """
  17. LMS自适应滤波(对抗噪声干扰)
  18. :param rx_signal: 含干扰的接收信号
  19. :param reference_signal: 参考信号(干扰样本)
  20. :param filter_length: 滤波器阶数
  21. :param mu: 收敛因子 (0 < mu < 1)
  22. :return: 滤波后信号
  23. """
  24. n = len(rx_signal)
  25. w = np.zeros(filter_length, dtype=np.complex64)
  26. output = np.zeros(n, dtype=np.complex64)
  27. for k in range(filter_length, n):
  28. x = reference_signal[k - filter_length:k][::-1]
  29. output[k] = rx_signal[k] - np.dot(w, x)
  30. w += mu * output[k] * np.conj(x)
  31. return output
  32. @staticmethod
  33. def pca_denoise(
  34. signal_matrix: np.ndarray,
  35. n_components: int = 3
  36. ) -> np.ndarray:
  37. """
  38. 基于PCA的信号增强(对抗宽带噪声)
  39. :param signal_matrix: 多通道信号矩阵 [n_channels, n_samples]
  40. :param n_components: 保留的主成分数量
  41. :return: 降噪后信号
  42. """
  43. cov_matrix = np.cov(signal_matrix)
  44. eig_vals, eig_vecs = linalg.eigh(cov_matrix)
  45. idx = eig_vals.argsort()[::-1]
  46. principal_components = eig_vecs[:, idx[:n_components]]
  47. return np.dot(principal_components.T, signal_matrix)
  48. @staticmethod
  49. def cfar_detection(
  50. spectrum: np.ndarray,
  51. guard_cells: int = 2,
  52. training_cells: int = 10,
  53. threshold_factor: float = 3.0
  54. ) -> np.ndarray:
  55. """
  56. CFAR目标检测(对抗欺骗干扰)
  57. :param spectrum: 距离/多普勒谱
  58. :param guard_cells: 保护单元数
  59. :param training_cells: 训练单元数
  60. :param threshold_factor: 阈值系数
  61. :return: 目标掩码(True表示目标)
  62. """
  63. mask = np.zeros_like(spectrum, dtype=bool)
  64. n = len(spectrum)
  65. for i in range(n):
  66. left = max(0, i - guard_cells - training_cells)
  67. right = min(n, i + guard_cells + training_cells)
  68. noise_est = np.concatenate([
  69. spectrum[left:i - guard_cells],
  70. spectrum[i + guard_cells:right]
  71. ])
  72. threshold = threshold_factor * np.mean(noise_est)
  73. mask[i] = spectrum[i] > threshold
  74. return mask
  75. @staticmethod
  76. def time_frequency_filter(
  77. rx_signal: np.ndarray,
  78. sample_rate: float,
  79. jamming_freq: float,
  80. bandwidth: float = 1e6
  81. ) -> np.ndarray:
  82. """
  83. 时频联合滤波(对抗点频干扰)
  84. :param rx_signal: 接收信号
  85. :param sample_rate: 采样率
  86. :param jamming_freq: 干扰中心频率
  87. :param bandwidth: 抑制带宽
  88. :return: 滤波后信号
  89. """
  90. # STFT参数
  91. nperseg = 256
  92. noverlap = nperseg // 2
  93. # 计算STFT
  94. f, t, Zxx = signal.stft(rx_signal, fs=sample_rate,
  95. nperseg=nperseg, noverlap=noverlap)
  96. # 构建干扰掩码
  97. jamming_mask = (np.abs(f - jamming_freq) < bandwidth / 2)
  98. # 干扰置零
  99. Zxx_clean = Zxx.copy()
  100. Zxx_clean[jamming_mask, :] = 0
  101. # 逆STFT
  102. _, clean_signal = signal.istft(Zxx_clean, fs=sample_rate,
  103. nperseg=nperseg, noverlap=noverlap)
  104. return clean_signal[:len(rx_signal)]
  105. @staticmethod
  106. def mti_filter(
  107. pulse_matrix: np.ndarray,
  108. clutter_rejection: float = 30.0
  109. ) -> np.ndarray:
  110. """
  111. 动目标显示(MTI)滤波(对抗地物杂波)
  112. :param pulse_matrix: 多脉冲信号矩阵 [n_pulses, n_samples]
  113. :param clutter_rejection: 杂波抑制深度(dB)
  114. :return: 滤波后信号矩阵
  115. """
  116. # 构造对消器 (3脉冲对消器)
  117. canceler = np.array([1, -2, 1])
  118. output = np.apply_along_axis(
  119. lambda x: np.convolve(x, canceler, mode='valid'),
  120. 0, pulse_matrix
  121. )
  122. return output * (10 ** (clutter_rejection / 20))
  123. @staticmethod
  124. def spoofing_detection(
  125. pulse_train: List[np.ndarray],
  126. prf: float,
  127. max_velocity: float = 300.0
  128. ) -> np.ndarray:
  129. """
  130. 欺骗干扰检测(基于多普勒连续性)
  131. :param pulse_train: 脉冲序列
  132. :param prf: 脉冲重复频率
  133. :param max_velocity: 最大合理速度(m/s)
  134. :return: 干扰标记数组
  135. """
  136. n_pulses = len(pulse_train)
  137. lambd = 3e8 / 10e9 # 假设雷达波长(10GHz)
  138. max_doppler = 2 * max_velocity / lambd
  139. # 计算多普勒谱
  140. doppler_spectra = []
  141. for pulse in pulse_train:
  142. spec = np.abs(fft.fftshift(fft.fft(pulse)))
  143. doppler_spectra.append(spec)
  144. # 检测异常多普勒跳变
  145. is_spoofed = np.zeros(n_pulses, dtype=bool)
  146. for i in range(1, n_pulses):
  147. corr = np.correlate(doppler_spectra[i - 1], doppler_spectra[i], 'same')
  148. peak_shift = np.argmax(corr) - len(corr) // 2
  149. if abs(peak_shift) > max_doppler / prf * len(corr):
  150. is_spoofed[i] = True
  151. return is_spoofed
  152. # ==================== 使用示例 ====================
  153. if __name__ == "__main__":
  154. # 模拟含干扰信号
  155. fs = 100e6
  156. t = np.arange(0, 1e-3, 1 / fs)
  157. true_signal = np.exp(1j * 2 * np.pi * 1e6 * t)
  158. noise_jam = 0.5 * (np.random.normal(0, 1, len(t)) + 1j * np.random.normal(0, 1, len(t)))
  159. spoof_jam = 0.3 * np.exp(1j * 2 * np.pi * 3e6 * t)
  160. rx_signal = true_signal + noise_jam + spoof_jam
  161. # 示例1:自适应滤波
  162. filtered = AntiJammingSignalAlgo.adaptive_filter(
  163. rx_signal,
  164. reference_signal=noise_jam,
  165. filter_length=16,
  166. mu=0.05
  167. )
  168. print(f"自适应滤波后SNR提升: {10 * np.log10(np.var(filtered) / np.var(rx_signal - filtered)):.1f} dB")
  169. # 示例2:CFAR检测
  170. spectrum = np.abs(fft.fft(rx_signal))
  171. targets = AntiJammingSignalAlgo.cfar_detection(spectrum)
  172. print(f"CFAR检测到目标位置: {np.where(targets)[0]}")
  173. # 示例3:欺骗干扰检测
  174. pulse_train = [rx_signal[i * 100:(i + 1) * 100] for i in range(10)]
  175. spoof_flags = AntiJammingSignalAlgo.spoofing_detection(pulse_train, prf=1e3)
  176. print(f"受干扰脉冲索引: {np.where(spoof_flags)[0]}")