surveillance_radar.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. import numpy as np
  2. from scipy.signal import find_peaks
  3. from algo.anti_jamming_signal_algo import AntiJammingSignalAlgo
  4. # ==================== 侦查雷达类 ====================
  5. class SurveillanceRadar:
  6. #初始化函数,传入usrp,输入的通道,输出的通道
  7. def __init__(self, usrp, rx, tx):
  8. self.usrp = usrp
  9. self.rx = rx
  10. self.tx = tx
  11. # 封装一个发送信号的函数
  12. def send_signal(self, tx_signal, duration, center_freq, sample_rate, gain):
  13. # 发送信号并返回发射信号
  14. self.usrp.send_waveform(tx_signal, duration, center_freq, sample_rate, [self.tx], gain)
  15. print('侦查雷达已发送信号')
  16. return tx_signal
  17. # 封装一个接收信号的函数
  18. def recv_signal(self, num_samples, sample_rate, center_freq):
  19. rx_signal = self.usrp.recv_num_samps(num_samples, center_freq, sample_rate, [self.rx])
  20. print('侦查雷达已接收信号')
  21. return rx_signal
  22. def process_transmit_signal(self, tx_signal: np.ndarray, algorithm: callable, sample_rate: float) -> np.ndarray:
  23. """
  24. 发射端信号处理
  25. :param tx_signal: 发射信号
  26. :param algorithm: 发射端抗干扰算法
  27. :param sample_rate: 采样率
  28. """
  29. #如果为频率捷变算法
  30. if algorithm == AntiJammingSignalAlgo.frequency_agility:
  31. hop_sequence = [(100e6, 0.001), (200e6, 0.001)] # 跳频序列,频率点和驻留时间
  32. processed_signal = algorithm(tx_signal=tx_signal, sample_rate=sample_rate, hop_sequence=hop_sequence)
  33. if algorithm == AntiJammingSignalAlgo.waveform_agility:
  34. waveform_params = {'type': 'LFM', 'bandwidth': 100e6, 'duration': 0.1} # 波形参数
  35. processed_signal = algorithm(tx_signal=tx_signal, sample_rate=sample_rate, waveform_params=waveform_params)
  36. print("processed_signal:", processed_signal)
  37. return processed_signal.astype(np.complex64)
  38. def apply_anti_jamming_processing(self, rx_signal: np.ndarray, algorithm: callable, sample_rate: float, **kwargs) -> np.ndarray:
  39. """
  40. 应用抗干扰信号处理
  41. :param rx_signal: 接收信号
  42. :param algorithm: 接收端抗干扰算法
  43. :param sample_rate: 采样率
  44. """
  45. processed_signal = algorithm(rx_signal=rx_signal, sample_rate=sample_rate, **kwargs)
  46. return processed_signal.astype(np.complex64)
  47. # 分析信号,获取目标距离
  48. def analyze_signal(self, rx_signal: np.ndarray, sample_rate: float,
  49. cfar_threshold: float = 20.0) -> list:
  50. """
  51. 分析接收信号并返回目标距离列表
  52. :param rx_signal: 接收信号(复数形式)
  53. :param sample_rate: 采样率(Hz)
  54. :param cfar_threshold: CFAR检测阈值(dB)
  55. :return: 目标距离列表(米)
  56. """
  57. # 1. 去斜处理(Dechirping)
  58. mixed = rx_signal * np.conj(self.tx_signal[:len(rx_signal)])
  59. # 2. 加窗处理(Hamming窗)
  60. window = np.hamming(len(mixed))
  61. windowed = mixed * window
  62. # 3. 距离FFT
  63. range_fft = np.fft.fft(windowed)
  64. spectrum_db = 20 * np.log10(np.abs(range_fft) + 1e-10) # 转换为dB
  65. # 4. 计算距离轴
  66. freq_bins = np.fft.fftfreq(len(spectrum_db), 1/sample_rate)
  67. ranges = (self.c * self.T * freq_bins) / (2 * self.B)
  68. # 5. CFAR目标检测(简化版)
  69. noise_floor = np.median(spectrum_db)
  70. peaks, _ = find_peaks(spectrum_db, height=noise_floor + cfar_threshold)
  71. # 6. 提取目标距离(取前向部分)
  72. valid_peaks = peaks[peaks <= len(ranges)//2]
  73. print(f"检测到目标距离:{[abs(ranges[p]) for p in valid_peaks]} 米")
  74. return [abs(ranges[p]) for p in valid_peaks]