import uhd import numpy as np import matplotlib.pyplot as plt from matplotlib.animation import FuncAnimation from queue import Queue, Full from threading import Thread, Event import argparse import time # 确保中文显示正常 class USRPTransceiver: def __init__(self, args): self.args = args self.usrp = None self.tx_streamer = None self.rx_streamer = None self.stop_event = Event() # 固定长度缓冲队列 (300个信号) self.rx_signal_queue = Queue(maxsize=300) self.tx_signal_queue = Queue(maxsize=300) self.total_tx_signals = 0 # 总发送信号数 self.total_rx_signals = 0 # 总接收信号数 # 可视化相关 self.fig, (self.ax_tx, self.ax_rx) = plt.subplots(2, 1, figsize=(10, 8)) self.tx_line, = self.ax_tx.plot([], [], 'bo-', lw=1, label='发送信号强度') self.rx_line, = self.ax_rx.plot([], [], 'ro-', lw=1, label='接收信号强度') # 设置图表 self.ax_tx.set_title('发送信号强度 (中间100个)') self.ax_rx.set_title('接收信号强度 (中间100个)') self.ax_tx.set_xlabel('信号序号') self.ax_rx.set_xlabel('信号序号') self.ax_tx.set_ylabel('信号强度') self.ax_rx.set_ylabel('信号强度') self.ax_tx.grid(True) self.ax_rx.grid(True) self.ax_tx.legend() self.ax_rx.legend() self.fig.tight_layout() def connect(self): """连接并配置USRP设备""" try: device_args = f"addr={self.args.ip},type=x4xx" self.usrp = uhd.usrp.MultiUSRP(device_args) print(f"成功连接到设备: {self.usrp.get_pp_string()}") # 配置发送端 self.usrp.set_tx_rate(self.args.rate, self.args.channel) self.usrp.set_tx_freq(uhd.types.TuneRequest(self.args.freq), self.args.channel) self.usrp.set_tx_gain(self.args.gain, self.args.channel) self.usrp.set_tx_antenna("TX/RX", self.args.channel) # 配置接收端 self.usrp.set_rx_rate(self.args.rate, self.args.channel) self.usrp.set_rx_freq(uhd.types.TuneRequest(self.args.freq), self.args.channel) self.usrp.set_rx_gain(self.args.gain, self.args.channel) self.usrp.set_rx_antenna("RX2", self.args.channel) # 打印实际配置 print(f"实际采样率: TX = {self.usrp.get_tx_rate(self.args.channel) / 1e6} MHz, " f"RX = {self.usrp.get_rx_rate(self.args.channel) / 1e6} MHz") print(f"实际中心频率: TX = {self.usrp.get_tx_freq(self.args.channel) / 1e9} GHz, " f"RX = {self.usrp.get_rx_freq(self.args.channel) / 1e9} GHz") print(f"实际增益: TX = {self.usrp.get_tx_gain(self.args.channel)} dB, " f"RX = {self.usrp.get_rx_gain(self.args.channel)} dB") except Exception as e: print(f"设备连接错误: {e}") raise def setup_streams(self): """设置发送和接收流""" # 设置发送流 tx_stream_args = uhd.usrp.StreamArgs("fc32", "sc16") tx_stream_args.channels = [self.args.channel] self.tx_streamer = self.usrp.get_tx_stream(tx_stream_args) # 设置接收流 rx_stream_args = uhd.usrp.StreamArgs("fc32", "sc16") rx_stream_args.channels = [self.args.channel] self.rx_streamer = self.usrp.get_rx_stream(rx_stream_args) def generate_signal(self, signal_id): """生成具有随机强度的测试信号""" t = np.arange(int(self.args.rate * 0.1)) / self.args.rate # 0.1秒的数据 # 基础幅度为0.5,添加随机波动 (±0.3) amplitude = 0.5 + 0.3 * np.random.randn() amplitude = max(0.1, min(0.9, amplitude)) # 限制在合理范围内 # 添加随机相位偏移 phase = np.random.uniform(0, 2 * np.pi) # 添加随机频率偏移 (±50kHz) freq_offset = np.random.uniform(-50e3, 50e3) signal = amplitude * np.exp(2j * np.pi * (1e6 + freq_offset) * t + 1j * phase) return signal.astype(np.complex64), amplitude def tx_worker(self): """发送线程函数""" try: tx_metadata = uhd.types.TXMetadata() tx_metadata.start_of_burst = True tx_metadata.end_of_burst = False tx_metadata.has_time_spec = False print("发送线程已启动") while not self.stop_event.is_set(): # 生成具有随机强度的信号 signal_id = self.total_tx_signals + 1 signal, amplitude = self.generate_signal(signal_id) # 发送信号 self.tx_streamer.send(signal, tx_metadata) # 将信号强度加入队列 (ID, 强度) try: self.tx_signal_queue.put((signal_id, amplitude), block=False) except Full: # 队列已满,移除最早的元素 self.tx_signal_queue.get() self.tx_signal_queue.put((signal_id, amplitude)) self.total_tx_signals += 1 print(f"\r已发送信号: {self.total_tx_signals}", end="") # 控制发送频率 time.sleep(0.05) # 发送结束标记 tx_metadata.end_of_burst = True self.tx_streamer.send(np.zeros((1,), dtype=np.complex64), tx_metadata) print("\n发送线程已停止") except Exception as e: print(f"发送线程错误: {e}") def rx_worker(self): """接收线程函数""" try: # 启动接收流 stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont) stream_cmd.stream_now = True self.rx_streamer.issue_stream_cmd(stream_cmd) buffer_size = 500000 rx_buffer = np.zeros((buffer_size,), dtype=np.complex64) rx_metadata = uhd.types.RXMetadata() print("接收线程已启动") overflow_count = 0 while not self.stop_event.is_set(): # 接收数据 num_recv = self.rx_streamer.recv(rx_buffer, rx_metadata) if rx_metadata.error_code != uhd.types.RXMetadataErrorCode.none: if rx_metadata.error_code == uhd.types.RXMetadataErrorCode.overflow: overflow_count += 1 print(f"\r溢出错误计数: {overflow_count}", end="") else: print(f"接收错误: {rx_metadata.strerror()}") print(f"接收错误: {rx_metadata.error_code}") continue if num_recv > 0: print(f"接收到的信号: 形状={rx_buffer.shape}, 类型={rx_buffer.dtype}") # 计算接收信号的强度 (RMS) signal_rms = np.sqrt(np.mean(np.abs(rx_buffer[:num_recv]) ** 2)) signal_id = self.total_rx_signals + 1 # 将信号强度加入队列 (ID, 强度) try: self.rx_signal_queue.put((signal_id, signal_rms), block=False) except Full: # 队列已满,移除最早的元素 self.rx_signal_queue.get() self.rx_signal_queue.put((signal_id, signal_rms)) self.total_rx_signals += 1 # 停止接收流 stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont) self.rx_streamer.issue_stream_cmd(stream_cmd) print(f"\n接收线程已停止 - 溢出错误总数: {overflow_count}") except Exception as e: print(f"接收线程错误: {e}") def init_animation(self): """初始化动画""" self.ax_tx.set_xlim(0, 100) self.ax_rx.set_xlim(0, 100) self.ax_tx.set_ylim(0, 1.0) # 强度范围 self.ax_rx.set_ylim(0, 1.0) # 强度范围 return self.tx_line, self.rx_line def get_middle_signals(self, queue): """从队列中提取中间100个信号""" if queue.qsize() < 100: return [] # 转换为列表 items = list(queue.queue) # 计算中间100个元素的索引范围 start_idx = max(0, len(items) // 2 - 50) end_idx = min(len(items), start_idx + 100) return items[start_idx:end_idx] def update_plot(self, frame): """更新绘图""" try: # 获取中间100个发送信号 tx_signals = self.get_middle_signals(self.tx_signal_queue) if tx_signals: # 调整X轴为相对序号 (1-100) x = list(range(1, len(tx_signals) + 1)) y = [item[1] for item in tx_signals] self.tx_line.set_data(x, y) self.ax_tx.set_xlim(1, len(tx_signals)) self.ax_tx.set_ylim(max(0, min(y) - 0.05), max(y) + 0.05) # 获取中间100个接收信号 rx_signals = self.get_middle_signals(self.rx_signal_queue) if rx_signals: # 调整X轴为相对序号 (1-100) x = list(range(1, len(rx_signals) + 1)) y = [item[1] for item in rx_signals] self.rx_line.set_data(x, y) self.ax_rx.set_xlim(1, len(rx_signals)) self.ax_rx.set_ylim(max(0, min(y) - 0.05), max(y) + 0.05) # 更新标题 self.fig.suptitle(f"USRP信号强度监控 - 总发送: {self.total_tx_signals}, 总接收: {self.total_rx_signals}") except Exception as e: print(f"绘图更新错误: {e}") return self.tx_line, self.rx_line def run(self): """运行收发器""" try: self.connect() self.setup_streams() # 创建并启动线程 tx_thread = Thread(target=self.tx_worker) rx_thread = Thread(target=self.rx_worker) tx_thread.start() rx_thread.start() # 启动实时绘图 ani = FuncAnimation(self.fig, self.update_plot, init_func=self.init_animation, interval=200, blit=True) plt.show() # 等待线程结束 self.stop_event.set() tx_thread.join() rx_thread.join() print(f"完成收发 - 发送: {self.total_tx_signals}, 接收: {self.total_rx_signals}") except KeyboardInterrupt: print("用户中断,正在停止...") self.stop_event.set() finally: # 清理资源 if self.usrp: self.usrp = None print("资源已清理") def main(): parser = argparse.ArgumentParser(description='USRP多线程信号收发器') parser.add_argument('--ip', default='192.168.10.2', help='USRP IP地址') parser.add_argument('--rate', type=float, default=10e6, help='采样率 (Hz)') parser.add_argument('--freq', type=float, default=6e9, help='中心频率 (Hz)') parser.add_argument('--gain', type=float, default=50, help='增益 (dB)') parser.add_argument('--channel', type=int, default=0, help='通道号') args = parser.parse_args() transceiver = USRPTransceiver(args) transceiver.run() if __name__ == "__main__": main()