123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- from math import e
- from pickletools import read_string1
- from socket import timeout
- import numpy as np
- import numpy as np
- from scipy.signal import find_peaks
- from algo import anti_jamming_signal_algo
- from algo.anti_jamming_signal_algo import AntiJammingSignalAlgo
- from threading import Thread, Event
- import queue
- import threading
- import time
- import uhd
- # ==================== 侦查雷达类 ====================
class SurveillanceRadar:
    """Surveillance radar that transmits and receives through a UHD USRP.

    Three worker threads cooperate once :meth:`run` is called:

    * ``tx_worker`` generates a randomised test tone, passes it through the
      currently injected transmit-side anti-jamming algorithm, sends it and
      records it in ``tx_buffer``.
    * ``rx_worker`` streams received sample blocks into ``rx_buffer``.
    * ``anti_jamming_worker`` drains ``rx_buffer`` in batches and publishes
      the per-block RMS strengths in ``processed_signal``.

    All workers poll ``stop_event``; call :meth:`stop` to shut them down.
    """

    SAMPLING_RATE = 10e6  # sample rate applied to both TX and RX
    CENTER_FREQ = 6e9  # tune frequency applied to both TX and RX
    GAIN = 50  # value handed to set_tx_gain / set_rx_gain
    BUFFER_SIZE = 100  # capacity of tx_buffer and rx_buffer, in blocks
    BATCH_SIZE = 100  # received blocks summarised per processing pass
    QUEUE_POLL_S = 0.1  # how long a worker waits on a queue before re-checking stop_event

    def __init__(self, usrp, rx_channel=0, tx_channel=0):
        """Create the streamers and configure the device.

        :param usrp: device object providing ``get_tx_stream``,
            ``get_rx_stream`` and the ``set_*`` configuration calls used below
        :param rx_channel: channel index used for reception
        :param tx_channel: channel index used for transmission
        """
        self.usrp = usrp
        self.stop_event = threading.Event()

        # Stream configuration (kept consistent with demo.py).
        tx_stream_args = uhd.usrp.StreamArgs('fc32', 'sc16')
        tx_stream_args.channels = [tx_channel]
        self.tx_stream = self.usrp.get_tx_stream(tx_stream_args)
        rx_stream_args = uhd.usrp.StreamArgs('fc32', 'sc16')
        rx_stream_args.channels = [rx_channel]
        self.rx_stream = self.usrp.get_rx_stream(rx_stream_args)
        self.samps_per_packet = 500000

        # Device configuration (kept in sync with demo.py).  The gain calls
        # receive the channel explicitly; without it the gain would be applied
        # to channel 0 even when another channel was requested.
        self.usrp.set_tx_rate(self.SAMPLING_RATE)
        self.usrp.set_tx_freq(uhd.types.TuneRequest(self.CENTER_FREQ), tx_channel)
        self.usrp.set_tx_gain(self.GAIN, tx_channel)
        self.usrp.set_tx_antenna("TX/RX", tx_channel)
        self.usrp.set_rx_rate(self.SAMPLING_RATE)
        self.usrp.set_rx_freq(uhd.types.TuneRequest(self.CENTER_FREQ), rx_channel)
        self.usrp.set_rx_gain(self.GAIN, rx_channel)
        self.usrp.set_rx_antenna("RX2", rx_channel)

        self.anti_jamming_algorithm = None  # currently injected anti-jamming algorithm
        self.tx_signal = None  # most recently generated transmit signal
        self.rx_signal = None  # received signal
        # Jamming-signal buffer, one packet long, initially all zeros.
        self.jammed_signal = np.zeros((self.samps_per_packet,), dtype=np.complex64)
        self.processed_signal = None  # latest batch of RMS strengths

        # Bounded histories; the oldest block is dropped when they are full.
        self.tx_buffer = queue.Queue(maxsize=self.BUFFER_SIZE)
        self.rx_buffer = queue.Queue(maxsize=self.BUFFER_SIZE)

        # Worker thread handles, populated by run().
        self.tx_thread = None
        self.rx_thread = None
        self.processing_thread = None

    @staticmethod
    def _put_dropping_oldest(buffer, item):
        """Append *item* to the bounded queue *buffer*, evicting the head if full.

        Only non-blocking queue calls are used, so a consumer draining the
        queue concurrently can never leave the producer stuck in ``get()``.
        """
        while True:
            try:
                buffer.put_nowait(item)
                return
            except queue.Full:
                try:
                    buffer.get_nowait()
                except queue.Empty:
                    # A consumer emptied the queue in the meantime; just retry.
                    pass

    def generate_signal(self, duration=0.1):
        """Generate a test tone with random amplitude, phase and frequency.

        The amplitude is drawn from a normal distribution (mean 0.5, standard
        deviation 0.3) and clamped to [0.1, 0.9]; the phase is uniform in
        [0, 2*pi); the tone sits at 1 MHz plus a uniform offset of +/-50 kHz.

        :param duration: length of the signal in seconds (default 0.1)
        :return: tuple ``(signal, amplitude)`` where ``signal`` is a
            ``complex64`` array and ``amplitude`` is the clamped amplitude
        """
        sample_count = int(self.SAMPLING_RATE * duration)
        t = np.arange(sample_count) / self.SAMPLING_RATE
        amplitude = float(np.clip(0.5 + 0.3 * np.random.randn(), 0.1, 0.9))
        phase = np.random.uniform(0, 2 * np.pi)
        freq_offset = np.random.uniform(-50e3, 50e3)
        signal = amplitude * np.exp(2j * np.pi * (1e6 + freq_offset) * t + 1j * phase)
        return signal.astype(np.complex64), amplitude

    def tx_worker(self):
        """Transmit thread body: generate, process and send until stopped."""
        metadata = uhd.types.TXMetadata()
        metadata.start_of_burst = True
        metadata.end_of_burst = False
        metadata.has_time_spec = False
        print("发送线程已启动")
        try:
            while not self.stop_event.is_set():
                try:
                    signal, _amplitude = self.generate_signal()
                    self.tx_signal = signal
                    # Apply the transmit-side anti-jamming algorithm.
                    samples = self.process_transmit_signal(
                        self.tx_signal, self.anti_jamming_algorithm, self.SAMPLING_RATE
                    )
                    # Send the processed waveform, so that the injected
                    # algorithm actually affects what goes on the air.
                    self.tx_stream.send(samples, metadata)
                    print("数据已发送")
                    self._put_dropping_oldest(self.tx_buffer, samples.copy())
                except Exception as exc:
                    # Best effort: report the failure and keep transmitting.
                    print(f"发送线程出错: {exc}")
                # Pace the loop; also throttles retries after an error and
                # wakes up immediately when a stop is requested.
                self.stop_event.wait(0.05)
        finally:
            # Close the burst so the device is not left waiting for samples.
            try:
                metadata.start_of_burst = False
                metadata.end_of_burst = True
                self.tx_stream.send(np.zeros((1,), dtype=np.complex64), metadata)
            except Exception as exc:
                print(f"发送线程出错: {exc}")

    def rx_worker(self):
        """Receive thread body: stream sample blocks into ``rx_buffer``."""
        # Start continuous streaming.
        start_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont)
        start_cmd.stream_now = True
        self.rx_stream.issue_stream_cmd(start_cmd)
        metadata = uhd.types.RXMetadata()
        buff = np.zeros((self.samps_per_packet,), dtype=np.complex64)
        try:
            while not self.stop_event.is_set():
                try:
                    num_rx = self.rx_stream.recv(buff, metadata)
                    if metadata.error_code != uhd.types.RXMetadataErrorCode.none:
                        print(f"接收错误: {metadata.error_code}")
                        continue
                    if num_rx > 0:
                        # Copy only the valid samples; ``buff`` is reused.
                        self._put_dropping_oldest(self.rx_buffer, buff[:num_rx].copy())
                except Exception as exc:
                    # Best effort: report the failure and keep receiving.
                    print(f"接收线程出错: {exc}")
                    self.stop_event.wait(0.05)  # avoid spinning on a persistent error
        finally:
            # Stop continuous streaming that was started above.
            try:
                stop_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont)
                self.rx_stream.issue_stream_cmd(stop_cmd)
            except Exception as exc:
                print(f"接收线程出错: {exc}")

    def process_transmit_signal(self, tx_signal: np.ndarray, algorithm: callable, sample_rate: float) -> np.ndarray:
        """Apply the transmit-side anti-jamming algorithm.

        :param tx_signal: signal to transmit
        :param algorithm: transmit-side anti-jamming algorithm; ``None`` or an
            unrecognised algorithm leaves the signal unchanged
        :param sample_rate: sample rate passed on to the algorithm
        :return: a new ``complex64`` array with the processed signal
        """
        if algorithm is None:
            processed = tx_signal
        elif algorithm == AntiJammingSignalAlgo.frequency_agility:
            # Hop sequence as (frequency, dwell time) pairs.
            hop_sequence = [(10e6, 0.001), (20e6, 0.001)]
            processed = algorithm(tx_signal=tx_signal, sample_rate=sample_rate, hop_sequence=hop_sequence)
        elif algorithm == AntiJammingSignalAlgo.waveform_agility:
            waveform_params = {'type': 'LFM', 'bandwidth': 100e6, 'duration': 0.1}
            processed = algorithm(tx_signal=tx_signal, sample_rate=sample_rate, waveform_params=waveform_params)
        else:
            processed = tx_signal
        return np.asarray(processed).astype(np.complex64)

    def apply_anti_jamming_processing(self, rx_signal: np.ndarray, algorithm: callable, sample_rate: float, **kwargs) -> np.ndarray:
        """Apply the receive-side anti-jamming algorithm.

        :param rx_signal: received signal
        :param algorithm: receive-side anti-jamming algorithm; ``None`` or an
            unrecognised algorithm leaves the signal unchanged
        :param sample_rate: sample rate passed on to the algorithm
        :param kwargs: optional ``polarization_params`` dict overriding the
            default polarization settings
        :return: a new ``complex64`` array with the processed signal
        """
        if algorithm is None:
            processed = rx_signal
        elif algorithm == AntiJammingSignalAlgo.polarization_filter:
            # Default polarization settings: angle in degrees, ellipticity in 0..1.
            polar_params = kwargs.get(
                'polarization_params',
                {'angle': 45, 'ellipticity': 0.8},
            )
            processed = algorithm(rx_signal=rx_signal, sample_rate=sample_rate, polarization_params=polar_params)
        else:
            processed = rx_signal
        return np.asarray(processed).astype(np.complex64)

    def _collect_rms_batch(self):
        """Take ``BATCH_SIZE`` blocks from ``rx_buffer`` and return their RMS.

        Uses timed queue reads so a stop request is noticed even when no data
        is arriving.

        :return: float array with one RMS value per block, or ``None`` if a
            stop was requested before the batch was complete
        """
        batch_size = self.BATCH_SIZE
        strengths = np.empty(batch_size, dtype=np.float64)
        filled = 0
        while filled < batch_size:
            if self.stop_event.is_set():
                return None
            try:
                block = self.rx_buffer.get(timeout=self.QUEUE_POLL_S)
            except queue.Empty:
                continue
            # Signal strength of the block (RMS of the magnitudes).
            signal_rms = np.sqrt(np.mean(np.abs(block) ** 2))
            print(f"\r接收信号强度: {signal_rms:.3f}")
            strengths[filled] = signal_rms
            filled += 1
        return strengths

    def anti_jamming_worker(self):
        """Processing thread body: publish RMS strengths batch by batch."""
        while not self.stop_event.is_set():
            strengths = self._collect_rms_batch()
            if strengths is not None:
                print("list:", strengths.tolist())
                self.processed_signal = strengths
            self.stop_event.wait(0.05)

    def run(self):
        """Start the transmit, receive and processing threads.

        Returns as soon as the threads are started; it does not wait for them.
        """
        print("雷达启动")
        self.tx_thread = threading.Thread(target=self.tx_worker)
        self.rx_thread = threading.Thread(target=self.rx_worker)
        self.processing_thread = threading.Thread(target=self.anti_jamming_worker)
        self.tx_thread.start()
        self.rx_thread.start()
        self.processing_thread.start()
        print("雷达运行完毕")

    def stop(self, timeout=None):
        """Request all workers to stop and wait for them to finish.

        :param timeout: maximum time in seconds to wait for each thread;
            ``None`` waits without limit
        """
        self.stop_event.set()
        current = threading.current_thread()
        for worker in (self.tx_thread, self.rx_thread, self.processing_thread):
            # Skip threads that were never started and never join ourselves.
            if worker is not None and worker is not current and worker.is_alive():
                worker.join(timeout)

    def inject(self, anti_jam_algorithm, jammed_signal):
        """Install a new anti-jamming algorithm and jamming signal.

        :param anti_jam_algorithm: algorithm used by the workers from now on
        :param jammed_signal: jamming signal to store on the radar
        """
        self.jammed_signal = jammed_signal
        self.anti_jamming_algorithm = anti_jam_algorithm
|