123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- from pickletools import read_string1
- import numpy as np
- import numpy as np
- from scipy.signal import find_peaks
- from algo import anti_jamming_signal_algo
- from algo.anti_jamming_signal_algo import AntiJammingSignalAlgo
- from threading import Thread, Event
- import queue
- import threading
- import time
- import uhd
- # ==================== 侦查雷达类 ====================
class SurveillanceRadar:
    """Surveillance radar front-end driven by a UHD (USRP) device.

    The radar runs three worker threads (started by :meth:`run`):

    * :meth:`tx_worker` generates random packets, optionally shapes them with
      a transmit-side anti-jamming algorithm, and sends them on the TX stream.
    * :meth:`rx_worker` receives packets from the RX stream into ``rx_buffer``.
    * :meth:`anti_jamming_worker` adds the injected jamming signal to each
      received packet and applies the receive-side anti-jamming algorithm.

    All workers exit once ``stop_event`` is set (see :meth:`stop`).
    """

    # Values handed to the UHD setters in __init__ (rate in samples/s,
    # frequency in Hz and gain in dB according to the UHD API).
    SAMPLING_RATE = 10e6
    CENTER_FREQ = 6e9
    GAIN = 50

    # Capacity of the TX/RX history queues; the oldest packet is dropped
    # when a queue is full.
    BUFFER_DEPTH = 100
    # Seconds the processing worker blocks on an empty RX queue before it
    # re-checks ``stop_event``.
    POLL_TIMEOUT = 0.1

    def __init__(self, usrp, rx_channel=0, tx_channel=0):
        """Configure the USRP and create the streams, buffers and state.

        :param usrp: UHD multi-USRP device object; only the methods called
            below are required of it.
        :param rx_channel: index of the receive channel to stream from.
        :param tx_channel: index of the transmit channel to stream to.
        """
        self.usrp = usrp
        self.stop_event = Event()

        # Stream configuration (kept consistent with demo.py).
        tx_stream_args = uhd.usrp.StreamArgs('fc32', 'sc16')
        tx_stream_args.channels = [tx_channel]
        self.tx_stream = self.usrp.get_tx_stream(tx_stream_args)
        rx_stream_args = uhd.usrp.StreamArgs('fc32', 'sc16')
        rx_stream_args.channels = [rx_channel]
        self.rx_stream = self.usrp.get_rx_stream(rx_stream_args)
        self.samps_per_packet = 100

        # Device configuration (kept in sync with demo.py).  The gain is set
        # on the selected channel explicitly so that a non-zero channel index
        # is configured consistently with its frequency and antenna.
        self.usrp.set_tx_rate(self.SAMPLING_RATE)
        self.usrp.set_tx_freq(uhd.types.TuneRequest(self.CENTER_FREQ), tx_channel)
        self.usrp.set_tx_gain(self.GAIN, tx_channel)
        self.usrp.set_tx_antenna("TX/RX", tx_channel)
        self.usrp.set_rx_rate(self.SAMPLING_RATE)
        self.usrp.set_rx_freq(uhd.types.TuneRequest(self.CENTER_FREQ), rx_channel)
        self.usrp.set_rx_gain(self.GAIN, rx_channel)
        self.usrp.set_rx_antenna("RX2", rx_channel)

        self.anti_jamming_algorithm = None  # anti-jamming algorithm in use
        self.tx_signal = None  # most recent raw transmit packet
        self.rx_signal = None  # received signal (not written by this class)
        # Jamming signal added to every received packet; starts as silence.
        self.jammed_signal = np.zeros((self.samps_per_packet,), dtype=np.complex64)
        self.processed_signal = None  # most recent processed packet

        # Bounded histories of sent / received packets.
        self.tx_buffer = queue.Queue(maxsize=self.BUFFER_DEPTH)
        self.rx_buffer = queue.Queue(maxsize=self.BUFFER_DEPTH)

        # Worker thread handles, populated by run().
        self.tx_thread = None
        self.rx_thread = None
        self.processing_thread = None

    @staticmethod
    def _push_latest(buffer, item):
        """Append ``item`` to ``buffer``, dropping the oldest entries if full.

        Uses only non-blocking queue calls, so the producer can never stall
        even if a consumer drains the queue between the two operations.
        """
        while True:
            try:
                buffer.put_nowait(item)
                return
            except queue.Full:
                try:
                    buffer.get_nowait()
                except queue.Empty:
                    # A consumer emptied the queue in the meantime; retry.
                    pass

    def tx_worker(self):
        """Transmit thread body: send processed random packets until stopped.

        Each iteration draws ``samps_per_packet`` random samples, passes them
        through :meth:`process_transmit_signal`, sends them on the TX stream
        and records a copy in ``tx_buffer``.  An end-of-burst packet is sent
        when the loop ends, whether it stops normally or because of an error.

        :raises RuntimeError: if anything in the loop fails; ``stop_event`` is
            set first so the other workers shut down as well.
        """
        metadata = uhd.types.TXMetadata()
        metadata.start_of_burst = True
        metadata.end_of_burst = False
        metadata.has_time_spec = False
        try:
            print("发送线程已启动")
            while not self.stop_event.is_set():
                # Generate a random signal.
                self.tx_signal = np.random.randn(self.samps_per_packet)
                # Apply the transmit-side anti-jamming algorithm.
                samples = self.process_transmit_signal(
                    self.tx_signal, self.anti_jamming_algorithm, self.SAMPLING_RATE
                )
                self.tx_stream.send(samples, metadata)
                # Only the first packet marks the start of the burst.
                metadata.start_of_burst = False
                self._push_latest(self.tx_buffer, samples.copy())
                # Pace the transmission; wakes up early when stop is requested.
                self.stop_event.wait(0.05)
        except Exception as e:
            self.stop_event.set()
            raise RuntimeError(f"发送线程出错: {e}") from e
        finally:
            # Always close the burst so the device is not left mid-burst.
            metadata.start_of_burst = False
            metadata.end_of_burst = True
            self.tx_stream.send(np.zeros((1,), dtype=np.complex64), metadata)

    def rx_worker(self):
        """Receive thread body: stream packets into ``rx_buffer`` until stopped.

        Starts continuous streaming, stores every successfully received packet
        (zero-padded to ``samps_per_packet`` on a short read) and stops the
        stream again when the loop exits for any reason.
        """
        start_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont)
        start_cmd.stream_now = True
        self.rx_stream.issue_stream_cmd(start_cmd)
        metadata = uhd.types.RXMetadata()
        buff = np.zeros((self.samps_per_packet,), dtype=np.complex64)
        try:
            while not self.stop_event.is_set():
                num_rx = self.rx_stream.recv(buff, metadata)
                if metadata.error_code != uhd.types.RXMetadataErrorCode.none:
                    print(f"接收错误: {metadata.error_code}")
                    continue
                if num_rx > 0:
                    packet = buff.copy()
                    # Clear samples left over from an earlier, longer read so
                    # stale data is never processed as fresh input.
                    packet[num_rx:] = 0
                    self._push_latest(self.rx_buffer, packet)
        finally:
            # Continuous streaming must be stopped explicitly.
            stop_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont)
            self.rx_stream.issue_stream_cmd(stop_cmd)

    def process_transmit_signal(self, tx_signal: np.ndarray, algorithm: callable, sample_rate: float) -> np.ndarray:
        """Apply the transmit-side anti-jamming algorithm to a packet.

        :param tx_signal: signal to transmit
        :param algorithm: transmit-side anti-jamming algorithm; anything other
            than ``AntiJammingSignalAlgo.frequency_agility`` or
            ``AntiJammingSignalAlgo.waveform_agility`` (including ``None``)
            leaves the signal unchanged
        :param sample_rate: sampling rate forwarded to the algorithm
        :return: the processed signal converted to ``complex64``
        """
        if algorithm is None:
            # No algorithm selected: pass the signal through unchanged.
            process_signal = tx_signal
        elif algorithm == AntiJammingSignalAlgo.frequency_agility:
            # Hop sequence: (frequency point, dwell time) pairs.
            hop_sequence = [(10e6, 0.001), (20e6, 0.001)]
            process_signal = algorithm(
                tx_signal=tx_signal, sample_rate=sample_rate, hop_sequence=hop_sequence
            )
        elif algorithm == AntiJammingSignalAlgo.waveform_agility:
            # Waveform parameters.
            waveform_params = {'type': 'LFM', 'bandwidth': 100e6, 'duration': 0.1}
            process_signal = algorithm(
                tx_signal=tx_signal, sample_rate=sample_rate, waveform_params=waveform_params
            )
        else:
            process_signal = tx_signal
        return process_signal.astype(np.complex64)

    def apply_anti_jamming_processing(self, rx_signal: np.ndarray, algorithm: callable, sample_rate: float, **kwargs) -> np.ndarray:
        """Apply the receive-side anti-jamming algorithm to a packet.

        :param rx_signal: received signal
        :param algorithm: receive-side anti-jamming algorithm; anything other
            than ``AntiJammingSignalAlgo.polarization_filter`` (including
            ``None``) leaves the signal unchanged
        :param sample_rate: sampling rate forwarded to the algorithm
        :param kwargs: extra keyword arguments forwarded to the algorithm
        :return: the processed signal converted to ``complex64``
        """
        if algorithm is not None and algorithm == AntiJammingSignalAlgo.polarization_filter:
            process_signal = algorithm(rx_signal=rx_signal, sample_rate=sample_rate, **kwargs)
        else:
            process_signal = rx_signal
        return process_signal.astype(np.complex64)

    def anti_jamming_worker(self):
        """Processing thread body: de-jam received packets until stopped.

        Takes packets from ``rx_buffer``, adds the currently injected
        ``jammed_signal`` and stores the result of
        :meth:`apply_anti_jamming_processing` in ``processed_signal``.
        """
        while not self.stop_event.is_set():
            try:
                # Block briefly instead of spinning on an empty queue; the
                # timeout bounds how long a stop request can go unnoticed.
                rx_signal = self.rx_buffer.get(timeout=self.POLL_TIMEOUT)
            except queue.Empty:
                continue
            combined_signal = rx_signal + self.jammed_signal
            self.processed_signal = self.apply_anti_jamming_processing(
                combined_signal,
                self.anti_jamming_algorithm,
                self.SAMPLING_RATE,
            )

    def run(self):
        """Start the transmit, receive and processing threads.

        Returns as soon as the threads are started; call :meth:`stop` to shut
        them down.
        """
        print("雷达启动")
        self.tx_thread = Thread(target=self.tx_worker)
        self.rx_thread = Thread(target=self.rx_worker)
        self.tx_thread.start()
        self.rx_thread.start()
        self.processing_thread = Thread(target=self.anti_jamming_worker)
        self.processing_thread.start()
        print("雷达运行完毕")

    def stop(self, timeout=None):
        """Request all workers to stop and wait for them to finish.

        :param timeout: maximum number of seconds to wait for each thread, or
            ``None`` to wait until it exits.
        """
        self.stop_event.set()
        for worker in (self.tx_thread, self.rx_thread, self.processing_thread):
            if worker is not None and worker.is_alive():
                worker.join(timeout)

    def inject(self, anti_jam_algorithm, jammed_signal):
        """Set the jamming signal and the anti-jamming algorithm to use.

        :param anti_jam_algorithm: algorithm used by the transmit and
            processing workers from their next packet on.
        :param jammed_signal: signal added to every received packet by
            :meth:`anti_jamming_worker`; it must be addable to a received
            packet of ``samps_per_packet`` samples.
        """
        self.jammed_signal = jammed_signal
        self.anti_jamming_algorithm = anti_jam_algorithm
|