123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306 |
- import uhd
- import numpy as np
- import matplotlib.pyplot as plt
- from matplotlib.animation import FuncAnimation
- from queue import Queue, Full
- from threading import Thread, Event
- import argparse
- import time
- # 确保中文显示正常
- class USRPTransceiver:
- def __init__(self, args):
- self.args = args
- self.usrp = None
- self.tx_streamer = None
- self.rx_streamer = None
- self.stop_event = Event()
- # 固定长度缓冲队列 (300个信号)
- self.rx_signal_queue = Queue(maxsize=300)
- self.tx_signal_queue = Queue(maxsize=300)
- self.total_tx_signals = 0 # 总发送信号数
- self.total_rx_signals = 0 # 总接收信号数
- # 可视化相关
- self.fig, (self.ax_tx, self.ax_rx) = plt.subplots(2, 1, figsize=(10, 8))
- self.tx_line, = self.ax_tx.plot([], [], 'bo-', lw=1, label='发送信号强度')
- self.rx_line, = self.ax_rx.plot([], [], 'ro-', lw=1, label='接收信号强度')
- # 设置图表
- self.ax_tx.set_title('发送信号强度 (中间100个)')
- self.ax_rx.set_title('接收信号强度 (中间100个)')
- self.ax_tx.set_xlabel('信号序号')
- self.ax_rx.set_xlabel('信号序号')
- self.ax_tx.set_ylabel('信号强度')
- self.ax_rx.set_ylabel('信号强度')
- self.ax_tx.grid(True)
- self.ax_rx.grid(True)
- self.ax_tx.legend()
- self.ax_rx.legend()
- self.fig.tight_layout()
- def connect(self):
- """连接并配置USRP设备"""
- try:
- device_args = f"addr={self.args.ip},type=x4xx"
- self.usrp = uhd.usrp.MultiUSRP(device_args)
- print(f"成功连接到设备: {self.usrp.get_pp_string()}")
- # 配置发送端
- self.usrp.set_tx_rate(self.args.rate, self.args.channel)
- self.usrp.set_tx_freq(uhd.types.TuneRequest(self.args.freq), self.args.channel)
- self.usrp.set_tx_gain(self.args.gain, self.args.channel)
- self.usrp.set_tx_antenna("TX/RX", self.args.channel)
- # 配置接收端
- self.usrp.set_rx_rate(self.args.rate, self.args.channel)
- self.usrp.set_rx_freq(uhd.types.TuneRequest(self.args.freq), self.args.channel)
- self.usrp.set_rx_gain(self.args.gain, self.args.channel)
- self.usrp.set_rx_antenna("RX2", self.args.channel)
- # 打印实际配置
- print(f"实际采样率: TX = {self.usrp.get_tx_rate(self.args.channel) / 1e6} MHz, "
- f"RX = {self.usrp.get_rx_rate(self.args.channel) / 1e6} MHz")
- print(f"实际中心频率: TX = {self.usrp.get_tx_freq(self.args.channel) / 1e9} GHz, "
- f"RX = {self.usrp.get_rx_freq(self.args.channel) / 1e9} GHz")
- print(f"实际增益: TX = {self.usrp.get_tx_gain(self.args.channel)} dB, "
- f"RX = {self.usrp.get_rx_gain(self.args.channel)} dB")
- except Exception as e:
- print(f"设备连接错误: {e}")
- raise
- def setup_streams(self):
- """设置发送和接收流"""
- # 设置发送流
- tx_stream_args = uhd.usrp.StreamArgs("fc32", "sc16")
- tx_stream_args.channels = [self.args.channel]
- self.tx_streamer = self.usrp.get_tx_stream(tx_stream_args)
- # 设置接收流
- rx_stream_args = uhd.usrp.StreamArgs("fc32", "sc16")
- rx_stream_args.channels = [self.args.channel]
- self.rx_streamer = self.usrp.get_rx_stream(rx_stream_args)
- def generate_signal(self, signal_id):
- """生成具有随机强度的测试信号"""
- t = np.arange(int(self.args.rate * 0.1)) / self.args.rate # 0.1秒的数据
- # 基础幅度为0.5,添加随机波动 (±0.3)
- amplitude = 0.5 + 0.3 * np.random.randn()
- amplitude = max(0.1, min(0.9, amplitude)) # 限制在合理范围内
- # 添加随机相位偏移
- phase = np.random.uniform(0, 2 * np.pi)
- # 添加随机频率偏移 (±50kHz)
- freq_offset = np.random.uniform(-50e3, 50e3)
- signal = amplitude * np.exp(2j * np.pi * (1e6 + freq_offset) * t + 1j * phase)
- return signal.astype(np.complex64), amplitude
- def tx_worker(self):
- """发送线程函数"""
- try:
- tx_metadata = uhd.types.TXMetadata()
- tx_metadata.start_of_burst = True
- tx_metadata.end_of_burst = False
- tx_metadata.has_time_spec = False
- print("发送线程已启动")
- while not self.stop_event.is_set():
- # 生成具有随机强度的信号
- signal_id = self.total_tx_signals + 1
- signal, amplitude = self.generate_signal(signal_id)
- # 发送信号
- self.tx_streamer.send(signal, tx_metadata)
- # 将信号强度加入队列 (ID, 强度)
- try:
- self.tx_signal_queue.put((signal_id, amplitude), block=False)
- except Full:
- # 队列已满,移除最早的元素
- self.tx_signal_queue.get()
- self.tx_signal_queue.put((signal_id, amplitude))
- self.total_tx_signals += 1
- print(f"\r已发送信号: {self.total_tx_signals}", end="")
- # 控制发送频率
- time.sleep(0.05)
- # 发送结束标记
- tx_metadata.end_of_burst = True
- self.tx_streamer.send(np.zeros((1,), dtype=np.complex64), tx_metadata)
- print("\n发送线程已停止")
- except Exception as e:
- print(f"发送线程错误: {e}")
- def rx_worker(self):
- """接收线程函数"""
- try:
- # 启动接收流
- stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont)
- stream_cmd.stream_now = True
- self.rx_streamer.issue_stream_cmd(stream_cmd)
- buffer_size = 500000
- rx_buffer = np.zeros((buffer_size,), dtype=np.complex64)
- rx_metadata = uhd.types.RXMetadata()
- print("接收线程已启动")
- overflow_count = 0
- while not self.stop_event.is_set():
- # 接收数据
- num_recv = self.rx_streamer.recv(rx_buffer, rx_metadata)
- if rx_metadata.error_code != uhd.types.RXMetadataErrorCode.none:
- if rx_metadata.error_code == uhd.types.RXMetadataErrorCode.overflow:
- overflow_count += 1
- print(f"\r溢出错误计数: {overflow_count}", end="")
- else:
- print(f"接收错误: {rx_metadata.strerror()}")
- print(f"接收错误: {rx_metadata.error_code}")
- continue
- if num_recv > 0:
- print(f"接收到的信号: 形状={rx_buffer.shape}, 类型={rx_buffer.dtype}")
- # 计算接收信号的强度 (RMS)
- signal_rms = np.sqrt(np.mean(np.abs(rx_buffer[:num_recv]) ** 2))
- print(f"\r接收信号强度: {signal_rms:.3f}")
- signal_id = self.total_rx_signals + 1
- # 将信号强度加入队列 (ID, 强度)
- try:
- self.rx_signal_queue.put((signal_id, signal_rms), block=False)
- except Full:
- # 队列已满,移除最早的元素
- self.rx_signal_queue.get()
- self.rx_signal_queue.put((signal_id, signal_rms))
- self.total_rx_signals += 1
- # 停止接收流
- stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont)
- self.rx_streamer.issue_stream_cmd(stream_cmd)
- print(f"\n接收线程已停止 - 溢出错误总数: {overflow_count}")
- except Exception as e:
- print(f"接收线程错误: {e}")
- def init_animation(self):
- """初始化动画"""
- self.ax_tx.set_xlim(0, 100)
- self.ax_rx.set_xlim(0, 100)
- self.ax_tx.set_ylim(0, 1.0) # 强度范围
- self.ax_rx.set_ylim(0, 1.0) # 强度范围
- return self.tx_line, self.rx_line
- def get_middle_signals(self, queue):
- """从队列中提取中间100个信号"""
- if queue.qsize() < 100:
- return []
- # 转换为列表
- items = list(queue.queue)
- # 计算中间100个元素的索引范围
- start_idx = max(0, len(items) // 2 - 50)
- end_idx = min(len(items), start_idx + 100)
- return items[start_idx:end_idx]
- def update_plot(self, frame):
- """更新绘图"""
- try:
- # 获取中间100个发送信号
- tx_signals = self.get_middle_signals(self.tx_signal_queue)
- if tx_signals:
- # 调整X轴为相对序号 (1-100)
- x = list(range(1, len(tx_signals) + 1))
- y = [item[1] for item in tx_signals]
- self.tx_line.set_data(x, y)
- self.ax_tx.set_xlim(1, len(tx_signals))
- self.ax_tx.set_ylim(max(0, min(y) - 0.05), max(y) + 0.05)
- # 获取中间100个接收信号
- rx_signals = self.get_middle_signals(self.rx_signal_queue)
- if rx_signals:
- # 调整X轴为相对序号 (1-100)
- x = list(range(1, len(rx_signals) + 1))
- y = [item[1] for item in rx_signals]
- self.rx_line.set_data(x, y)
- self.ax_rx.set_xlim(1, len(rx_signals))
- self.ax_rx.set_ylim(max(0, min(y) - 0.05), max(y) + 0.05)
- # 更新标题
- self.fig.suptitle(f"USRP信号强度监控 - 总发送: {self.total_tx_signals}, 总接收: {self.total_rx_signals}")
- except Exception as e:
- print(f"绘图更新错误: {e}")
- return self.tx_line, self.rx_line
- def run(self):
- """运行收发器"""
- try:
- self.connect()
- self.setup_streams()
- # 创建并启动线程
- tx_thread = Thread(target=self.tx_worker)
- rx_thread = Thread(target=self.rx_worker)
- tx_thread.start()
- rx_thread.start()
- # 启动实时绘图
- ani = FuncAnimation(self.fig, self.update_plot, init_func=self.init_animation,
- interval=200, blit=True)
- plt.show()
- # 等待线程结束
- self.stop_event.set()
- tx_thread.join()
- rx_thread.join()
- print(f"完成收发 - 发送: {self.total_tx_signals}, 接收: {self.total_rx_signals}")
- except KeyboardInterrupt:
- print("用户中断,正在停止...")
- self.stop_event.set()
- finally:
- # 清理资源
- if self.usrp:
- self.usrp = None
- print("资源已清理")
- def main():
- parser = argparse.ArgumentParser(description='USRP多线程信号收发器')
- parser.add_argument('--ip', default='192.168.10.2', help='USRP IP地址')
- parser.add_argument('--rate', type=float, default=5e6, help='采样率 (Hz)')
- parser.add_argument('--freq', type=float, default=6e9, help='中心频率 (Hz)')
- parser.add_argument('--gain', type=float, default=50, help='增益 (dB)')
- parser.add_argument('--channel', type=int, default=0, help='通道号')
- args = parser.parse_args()
- transceiver = USRPTransceiver(args)
- transceiver.run()
- if __name__ == "__main__":
- main()
|