demo.py 11 KB


  1. import uhd
  2. import numpy as np
  3. import matplotlib.pyplot as plt
  4. from matplotlib.animation import FuncAnimation
  5. from queue import Queue, Full
  6. from threading import Thread, Event
  7. import argparse
  8. import time
  9. # 确保中文显示正常
  10. class USRPTransceiver:
  11. def __init__(self, args):
  12. self.args = args
  13. self.usrp = None
  14. self.tx_streamer = None
  15. self.rx_streamer = None
  16. self.stop_event = Event()
  17. # 固定长度缓冲队列 (300个信号)
  18. self.rx_signal_queue = Queue(maxsize=300)
  19. self.tx_signal_queue = Queue(maxsize=300)
  20. self.total_tx_signals = 0 # 总发送信号数
  21. self.total_rx_signals = 0 # 总接收信号数
  22. # 可视化相关
  23. self.fig, (self.ax_tx, self.ax_rx) = plt.subplots(2, 1, figsize=(10, 8))
  24. self.tx_line, = self.ax_tx.plot([], [], 'bo-', lw=1, label='发送信号强度')
  25. self.rx_line, = self.ax_rx.plot([], [], 'ro-', lw=1, label='接收信号强度')
  26. # 设置图表
  27. self.ax_tx.set_title('发送信号强度 (中间100个)')
  28. self.ax_rx.set_title('接收信号强度 (中间100个)')
  29. self.ax_tx.set_xlabel('信号序号')
  30. self.ax_rx.set_xlabel('信号序号')
  31. self.ax_tx.set_ylabel('信号强度')
  32. self.ax_rx.set_ylabel('信号强度')
  33. self.ax_tx.grid(True)
  34. self.ax_rx.grid(True)
  35. self.ax_tx.legend()
  36. self.ax_rx.legend()
  37. self.fig.tight_layout()
  38. def connect(self):
  39. """连接并配置USRP设备"""
  40. try:
  41. device_args = f"addr={self.args.ip},type=x4xx"
  42. self.usrp = uhd.usrp.MultiUSRP(device_args)
  43. print(f"成功连接到设备: {self.usrp.get_pp_string()}")
  44. # 配置发送端
  45. self.usrp.set_tx_rate(self.args.rate, self.args.channel)
  46. self.usrp.set_tx_freq(uhd.types.TuneRequest(self.args.freq), self.args.channel)
  47. self.usrp.set_tx_gain(self.args.gain, self.args.channel)
  48. self.usrp.set_tx_antenna("TX/RX", self.args.channel)
  49. # 配置接收端
  50. self.usrp.set_rx_rate(self.args.rate, self.args.channel)
  51. self.usrp.set_rx_freq(uhd.types.TuneRequest(self.args.freq), self.args.channel)
  52. self.usrp.set_rx_gain(self.args.gain, self.args.channel)
  53. self.usrp.set_rx_antenna("RX2", self.args.channel)
  54. # 打印实际配置
  55. print(f"实际采样率: TX = {self.usrp.get_tx_rate(self.args.channel) / 1e6} MHz, "
  56. f"RX = {self.usrp.get_rx_rate(self.args.channel) / 1e6} MHz")
  57. print(f"实际中心频率: TX = {self.usrp.get_tx_freq(self.args.channel) / 1e9} GHz, "
  58. f"RX = {self.usrp.get_rx_freq(self.args.channel) / 1e9} GHz")
  59. print(f"实际增益: TX = {self.usrp.get_tx_gain(self.args.channel)} dB, "
  60. f"RX = {self.usrp.get_rx_gain(self.args.channel)} dB")
  61. except Exception as e:
  62. print(f"设备连接错误: {e}")
  63. raise
  64. def setup_streams(self):
  65. """设置发送和接收流"""
  66. # 设置发送流
  67. tx_stream_args = uhd.usrp.StreamArgs("fc32", "sc16")
  68. tx_stream_args.channels = [self.args.channel]
  69. self.tx_streamer = self.usrp.get_tx_stream(tx_stream_args)
  70. # 设置接收流
  71. rx_stream_args = uhd.usrp.StreamArgs("fc32", "sc16")
  72. rx_stream_args.channels = [self.args.channel]
  73. self.rx_streamer = self.usrp.get_rx_stream(rx_stream_args)
  74. def generate_signal(self, signal_id):
  75. """生成具有随机强度的测试信号"""
  76. t = np.arange(int(self.args.rate * 0.1)) / self.args.rate # 0.1秒的数据
  77. # 基础幅度为0.5,添加随机波动 (±0.3)
  78. amplitude = 0.5 + 0.3 * np.random.randn()
  79. amplitude = max(0.1, min(0.9, amplitude)) # 限制在合理范围内
  80. # 添加随机相位偏移
  81. phase = np.random.uniform(0, 2 * np.pi)
  82. # 添加随机频率偏移 (±50kHz)
  83. freq_offset = np.random.uniform(-50e3, 50e3)
  84. signal = amplitude * np.exp(2j * np.pi * (1e6 + freq_offset) * t + 1j * phase)
  85. return signal.astype(np.complex64), amplitude
  86. def tx_worker(self):
  87. """发送线程函数"""
  88. try:
  89. tx_metadata = uhd.types.TXMetadata()
  90. tx_metadata.start_of_burst = True
  91. tx_metadata.end_of_burst = False
  92. tx_metadata.has_time_spec = False
  93. print("发送线程已启动")
  94. while not self.stop_event.is_set():
  95. # 生成具有随机强度的信号
  96. signal_id = self.total_tx_signals + 1
  97. signal, amplitude = self.generate_signal(signal_id)
  98. # 发送信号
  99. self.tx_streamer.send(signal, tx_metadata)
  100. # 将信号强度加入队列 (ID, 强度)
  101. try:
  102. self.tx_signal_queue.put((signal_id, amplitude), block=False)
  103. except Full:
  104. # 队列已满,移除最早的元素
  105. self.tx_signal_queue.get()
  106. self.tx_signal_queue.put((signal_id, amplitude))
  107. self.total_tx_signals += 1
  108. print(f"\r已发送信号: {self.total_tx_signals}", end="")
  109. # 控制发送频率
  110. time.sleep(0.05)
  111. # 发送结束标记
  112. tx_metadata.end_of_burst = True
  113. self.tx_streamer.send(np.zeros((1,), dtype=np.complex64), tx_metadata)
  114. print("\n发送线程已停止")
  115. except Exception as e:
  116. print(f"发送线程错误: {e}")
  117. def rx_worker(self):
  118. """接收线程函数"""
  119. try:
  120. # 启动接收流
  121. stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont)
  122. stream_cmd.stream_now = True
  123. self.rx_streamer.issue_stream_cmd(stream_cmd)
  124. buffer_size = 500000
  125. rx_buffer = np.zeros((buffer_size,), dtype=np.complex64)
  126. rx_metadata = uhd.types.RXMetadata()
  127. print("接收线程已启动")
  128. overflow_count = 0
  129. while not self.stop_event.is_set():
  130. # 接收数据
  131. num_recv = self.rx_streamer.recv(rx_buffer, rx_metadata)
  132. if rx_metadata.error_code != uhd.types.RXMetadataErrorCode.none:
  133. if rx_metadata.error_code == uhd.types.RXMetadataErrorCode.overflow:
  134. overflow_count += 1
  135. print(f"\r溢出错误计数: {overflow_count}", end="")
  136. else:
  137. print(f"接收错误: {rx_metadata.strerror()}")
  138. print(f"接收错误: {rx_metadata.error_code}")
  139. continue
  140. if num_recv > 0:
  141. print(f"接收到的信号: 形状={rx_buffer.shape}, 类型={rx_buffer.dtype}")
  142. # 计算接收信号的强度 (RMS)
  143. signal_rms = np.sqrt(np.mean(np.abs(rx_buffer[:num_recv]) ** 2))
  144. print(f"\r接收信号强度: {signal_rms:.3f}")
  145. signal_id = self.total_rx_signals + 1
  146. # 将信号强度加入队列 (ID, 强度)
  147. try:
  148. self.rx_signal_queue.put((signal_id, signal_rms), block=False)
  149. except Full:
  150. # 队列已满,移除最早的元素
  151. self.rx_signal_queue.get()
  152. self.rx_signal_queue.put((signal_id, signal_rms))
  153. self.total_rx_signals += 1
  154. # 停止接收流
  155. stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont)
  156. self.rx_streamer.issue_stream_cmd(stream_cmd)
  157. print(f"\n接收线程已停止 - 溢出错误总数: {overflow_count}")
  158. except Exception as e:
  159. print(f"接收线程错误: {e}")
  160. def init_animation(self):
  161. """初始化动画"""
  162. self.ax_tx.set_xlim(0, 100)
  163. self.ax_rx.set_xlim(0, 100)
  164. self.ax_tx.set_ylim(0, 1.0) # 强度范围
  165. self.ax_rx.set_ylim(0, 1.0) # 强度范围
  166. return self.tx_line, self.rx_line
  167. def get_middle_signals(self, queue):
  168. """从队列中提取中间100个信号"""
  169. if queue.qsize() < 100:
  170. return []
  171. # 转换为列表
  172. items = list(queue.queue)
  173. # 计算中间100个元素的索引范围
  174. start_idx = max(0, len(items) // 2 - 50)
  175. end_idx = min(len(items), start_idx + 100)
  176. return items[start_idx:end_idx]
  177. def update_plot(self, frame):
  178. """更新绘图"""
  179. try:
  180. # 获取中间100个发送信号
  181. tx_signals = self.get_middle_signals(self.tx_signal_queue)
  182. if tx_signals:
  183. # 调整X轴为相对序号 (1-100)
  184. x = list(range(1, len(tx_signals) + 1))
  185. y = [item[1] for item in tx_signals]
  186. self.tx_line.set_data(x, y)
  187. self.ax_tx.set_xlim(1, len(tx_signals))
  188. self.ax_tx.set_ylim(max(0, min(y) - 0.05), max(y) + 0.05)
  189. # 获取中间100个接收信号
  190. rx_signals = self.get_middle_signals(self.rx_signal_queue)
  191. if rx_signals:
  192. # 调整X轴为相对序号 (1-100)
  193. x = list(range(1, len(rx_signals) + 1))
  194. y = [item[1] for item in rx_signals]
  195. self.rx_line.set_data(x, y)
  196. self.ax_rx.set_xlim(1, len(rx_signals))
  197. self.ax_rx.set_ylim(max(0, min(y) - 0.05), max(y) + 0.05)
  198. # 更新标题
  199. self.fig.suptitle(f"USRP信号强度监控 - 总发送: {self.total_tx_signals}, 总接收: {self.total_rx_signals}")
  200. except Exception as e:
  201. print(f"绘图更新错误: {e}")
  202. return self.tx_line, self.rx_line
  203. def run(self):
  204. """运行收发器"""
  205. try:
  206. self.connect()
  207. self.setup_streams()
  208. # 创建并启动线程
  209. tx_thread = Thread(target=self.tx_worker)
  210. rx_thread = Thread(target=self.rx_worker)
  211. tx_thread.start()
  212. rx_thread.start()
  213. # 启动实时绘图
  214. ani = FuncAnimation(self.fig, self.update_plot, init_func=self.init_animation,
  215. interval=200, blit=True)
  216. plt.show()
  217. # 等待线程结束
  218. self.stop_event.set()
  219. tx_thread.join()
  220. rx_thread.join()
  221. print(f"完成收发 - 发送: {self.total_tx_signals}, 接收: {self.total_rx_signals}")
  222. except KeyboardInterrupt:
  223. print("用户中断,正在停止...")
  224. self.stop_event.set()
  225. finally:
  226. # 清理资源
  227. if self.usrp:
  228. self.usrp = None
  229. print("资源已清理")
  230. def main():
  231. parser = argparse.ArgumentParser(description='USRP多线程信号收发器')
  232. parser.add_argument('--ip', default='192.168.10.2', help='USRP IP地址')
  233. parser.add_argument('--rate', type=float, default=5e6, help='采样率 (Hz)')
  234. parser.add_argument('--freq', type=float, default=6e9, help='中心频率 (Hz)')
  235. parser.add_argument('--gain', type=float, default=50, help='增益 (dB)')
  236. parser.add_argument('--channel', type=int, default=0, help='通道号')
  237. args = parser.parse_args()
  238. transceiver = USRPTransceiver(args)
  239. transceiver.run()
  240. if __name__ == "__main__":
  241. main()