123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- import json
- import uhd
- import numpy as np
- import threading
- from dto.response_dto import ResponseDTO
- from model.surveillance_radar import SurveillanceRadar
- from model.jammer_radar import JammerRadar
- from algo.jamming_signal_algo import JammingSignalAlgo
- from algo.anti_jamming_signal_algo import AntiJammingSignalAlgo
- # 定义一组通道常量
- CHANNEL_1 = 0
- CHANNEL_2 = 1
- CHANNEL_3 = 2
- CHANNEL_4 = 3
- # 定义干扰、抗干扰策略集合,确保集合中的元素为合法的字符串常量
- JAMMING_POLICY = {
- "噪声调频", "噪声调幅", "噪声直放", "速度多假目标", "距离多假目标"
- }
- ANTI_JAMMING_POLICY = {
- "频率捷变", "波形捷变", # 作用于发射端
- "自适应极化滤波" # 作用于接收端
- }
- class Service:
- usrp = None # 静态类变量
- status = 0 # 静态类变量
- surveillance_radar = None # 侦查雷达实例
- jammer_radar = None # 干扰雷达实例
- _rlock = threading.RLock() # 可重入锁
- @staticmethod
- def initialize_usrp():
- with Service._rlock:
- Service.get_sdr_status()
- if Service.status == 1:
- print('USRP设备已经初始化')
- return ResponseDTO.SUCCESS({"status": Service.status}).to_json()
- try:
- Service.usrp = uhd.usrp.MultiUSRP()
- print('------SDR Devices initialize success!------')
- Service.surveillance_radar = SurveillanceRadar(Service.usrp, rx=CHANNEL_1, tx=CHANNEL_1)
- Service.jammer_radar = JammerRadar(Service.usrp, rx=CHANNEL_2, tx=CHANNEL_2)
- Service.status = 1
- return ResponseDTO.SUCCESS({"status": Service.status}).to_json()
- except Exception as e:
- print('SDR设备异常', e)
- Service.status = 0 # 初始化失败,状态置为0
- return ResponseDTO.ERROR_MS_DATA('SDR设备异常', {"status": Service.status, "Error": str(e)}).to_json()
- @staticmethod
- def get_sdr_status():
- with Service._rlock:
- if Service.status == 0:
- return ResponseDTO.ERROR_MS_DATA('SDR设备异常',{"status": Service.status, "Error": "SDR设备未初始化"}).to_json()
- try:
- samples = Service.usrp.recv_num_samps(1, 100e6, 1e6, [0], 50)
- if samples is None:
- Service.status = 0 # 获取状态失败,状态置为0
- else:
- Service.status = 1
- return ResponseDTO.SUCCESS({"status": Service.status}).to_json()
- except Exception as e:
- print('SDR设备异常', e)
- Service.status = 0 # 获取状态失败,状态置为0
- Service.usrp = None # 重置USRP对象
- return ResponseDTO.ERROR_MS_DATA('SDR设备异常', {"status": Service.status, "Error": str(e)}).to_json()
- @staticmethod
- def data(payload):
- # 使用已初始化的雷达实例
- surveillance_radar = Service.surveillance_radar
- jammer_radar = Service.jammer_radar
-
- jamming_policy = payload.get('jamming_policy')
- anti_jamming_policy = payload.get('anti_jamming_policy')
- # 判断策略是否合法
- if jamming_policy not in JAMMING_POLICY or anti_jamming_policy not in ANTI_JAMMING_POLICY:
- return ResponseDTO.ERROR_MS_DATA('策略不合法', {"status": Service.status, "Error": "策略不合法"}).to_json()
- # 打印对应策略
- print(' jamming_policy:', jamming_policy)
- print(' anti_jamming_policy:', anti_jamming_policy)
- # 根据策略选择算法
- jam_algorithm = Service._get_algorithm_mapping(jamming_policy)
- anti_jam_algorithm = Service._get_algorithm_mapping(anti_jamming_policy)
- # 1. 应用发射端抗干扰算法(波形/频率捷变)
- tx_original = np.random.randn(100)
- if anti_jamming_policy in {"波形捷变", "频率捷变"}:
- tx_processed = surveillance_radar.process_transmit_signal(
- tx_signal=tx_original,
- algorithm=Service._get_tx_algorithm(anti_jamming_policy),
- sample_rate=1e6
- )
- else:
- tx_processed = tx_original
-
- # 2. 发送经过处理的发射信号
- tx_signal = surveillance_radar.send_signal(
- tx_signal=tx_processed,
- duration=0.1,
- center_freq=100e6,
- sample_rate=1e6,
- gain=20
- )
-
- # 3. 干扰雷达接收信号并生成干扰
- jammed_signal = jammer_radar.execute_jamming(
- rx_signal=tx_signal,
- algorithm=jam_algorithm,
- bandwidth=100e6,
- sample_rate=1e6
- ) if jam_algorithm else tx_signal
-
- # 接收端抗干扰(频率捷变等)
- processed_signal = surveillance_radar.execute_anti_jamming(
- rx_signal=jammed_signal,
- algorithm=anti_jam_algorithm,
- sample_rate=1e6
- ) if anti_jam_algorithm else jammed_signal
-
- # 类型检查并转换复数信号为实数
- if np.iscomplexobj(processed_signal):
- processed_signal = np.real(processed_signal)
-
- # 返回结果
- return ResponseDTO.SUCCESS({"jamming_signal": np.real(jammed_signal).tolist(),
- "anti_jamming_signal": np.real(processed_signal).tolist(),
- "status": Service.status}).to_json()
- @staticmethod
- def send():
- try:
- # 设置中心频率、采样率和增益
- center_freq = 100e6 # 2.4 GHz
- sample_rate = 1e6 # 1 MS/s
- duration = 10 # 以秒为单位
- gain = 20 # [dB] 建议一开始设置小一点,按照实际情况调整
- # 生成发送信号
- num_samples = 100
- tx_signal = np.random.randn(num_samples) + 0.1j * np.random.randn(num_samples) # 修复部分
- # 发送信号
- Service.usrp.send_waveform(tx_signal, duration, center_freq, sample_rate, [0], gain)
- # 接收信号
- rx_signal = Service.usrp.recv_num_samps(num_samples, center_freq, sample_rate)
- print('信号已发送:')
- print(rx_signal)
- print('信号已接收')
- except Exception as e:
- print('发送或接收信号异常', e)
- Service.status = 0 # 发送或接收信号失败,状态置为0
- Service.usrp = None # 重置USRP对象
- @staticmethod
- def _get_algorithm_mapping(policy: str) -> callable:
- # 接收端抗干扰算法映射(自动处理非发射端策略)
- rx_algorithm_map = {
- "自适应极化滤波": AntiJammingSignalAlgo.polarization_filter
- }
- # 排除已在发射端处理的策略
- tx_policies = {"波形捷变", "频率捷变"}
- return None if policy in tx_policies else rx_algorithm_map.get(policy)
- @staticmethod
- def _get_tx_algorithm(policy: str) -> callable:
- # 发射端抗干扰算法映射
- tx_algorithm_map = {
- "波形捷变": AntiJammingSignalAlgo.adaptive_filter,
- "频率捷变": AntiJammingSignalAlgo.time_frequency_filter
- }
- return tx_algorithm_map.get(policy)
- # main方法
- if __name__ == '__main__':
- Service.initialize_usrp()
- Service.data({"jamming_policy": "噪声调频", "anti_jamming_policy": "频率捷变"})
|