|
@@ -0,0 +1,199 @@
|
|
|
+import json
|
|
|
+import uhd
|
|
|
+import numpy as np
|
|
|
+import threading
|
|
|
+
|
|
|
+from dto.response_dto import ResponseDTO
|
|
|
+from model.surveillance_radar import SurveillanceRadar
|
|
|
+from model.jammer_radar import JammerRadar
|
|
|
+from algo.jamming_signal_algo import JammingSignalAlgo
|
|
|
+from algo.anti_jamming_signal_algo import AntiJammingSignalAlgo
|
|
|
+
|
|
|
+# 定义一组通道常量
|
|
|
+CHANNEL_1 = 0
|
|
|
+CHANNEL_2 = 1
|
|
|
+CHANNEL_3 = 2
|
|
|
+CHANNEL_4 = 3
|
|
|
+
|
|
|
+# 定义干扰、抗干扰策略集合,确保集合中的元素为合法的字符串常量
|
|
|
+JAMMING_POLICY = {
|
|
|
+ "噪声调频", "噪声调幅", "噪声直放", "速度多假目标", "距离多假目标"
|
|
|
+}
|
|
|
+ANTI_JAMMING_POLICY = {
|
|
|
+ "频率捷变", "波形捷变", # 作用于发射端
|
|
|
+ "自适应极化滤波" # 作用于接收端
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+class Service:
|
|
|
+ usrp = None # 静态类变量
|
|
|
+ status = 0 # 静态类变量
|
|
|
+ surveillance_radar = None # 侦查雷达实例
|
|
|
+ jammer_radar = None # 干扰雷达实例
|
|
|
+ _rlock = threading.RLock() # 可重入锁
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def initialize_usrp():
|
|
|
+ with Service._rlock:
|
|
|
+ Service.get_sdr_status()
|
|
|
+ if Service.status == 1:
|
|
|
+ print('USRP设备已经初始化')
|
|
|
+ return ResponseDTO.SUCCESS({"status": Service.status}).to_json()
|
|
|
+ try:
|
|
|
+ Service.usrp = uhd.usrp.MultiUSRP()
|
|
|
+ print('------SDR Devices initialize success!------')
|
|
|
+ Service.surveillance_radar = SurveillanceRadar(Service.usrp, rx=CHANNEL_1, tx=CHANNEL_1)
|
|
|
+ Service.jammer_radar = JammerRadar(Service.usrp, rx=CHANNEL_2, tx=CHANNEL_2)
|
|
|
+ Service.status = 1
|
|
|
+ return ResponseDTO.SUCCESS({"status": Service.status}).to_json()
|
|
|
+ except Exception as e:
|
|
|
+ print('SDR设备异常', e)
|
|
|
+ Service.status = 0 # 初始化失败,状态置为0
|
|
|
+ return ResponseDTO.ERROR_MS_DATA('SDR设备异常', {"status": Service.status, "Error": str(e)}).to_json()
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def get_sdr_status():
|
|
|
+ with Service._rlock:
|
|
|
+ if Service.status == 0:
|
|
|
+ return ResponseDTO.ERROR_MS_DATA('SDR设备异常',{"status": Service.status, "Error": "SDR设备未初始化"}).to_json()
|
|
|
+ try:
|
|
|
+ samples = Service.usrp.recv_num_samps(1, 100e6, 1e6, [0], 50)
|
|
|
+ if samples is None:
|
|
|
+ Service.status = 0 # 获取状态失败,状态置为0
|
|
|
+ else:
|
|
|
+ Service.status = 1
|
|
|
+ return ResponseDTO.SUCCESS({"status": Service.status}).to_json()
|
|
|
+ except Exception as e:
|
|
|
+ print('SDR设备异常', e)
|
|
|
+ Service.status = 0 # 获取状态失败,状态置为0
|
|
|
+ Service.usrp = None # 重置USRP对象
|
|
|
+ return ResponseDTO.ERROR_MS_DATA('SDR设备异常', {"status": Service.status, "Error": str(e)}).to_json()
|
|
|
+
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def data(payload):
|
|
|
+ # 使用已初始化的雷达实例
|
|
|
+ surveillance_radar = Service.surveillance_radar
|
|
|
+ jammer_radar = Service.jammer_radar
|
|
|
+
|
|
|
+ jamming_policy = payload.get('jamming_policy')
|
|
|
+ anti_jamming_policy = payload.get('anti_jamming_policy')
|
|
|
+ # 判断策略是否合法
|
|
|
+ if jamming_policy not in JAMMING_POLICY or anti_jamming_policy not in ANTI_JAMMING_POLICY:
|
|
|
+ return ResponseDTO.ERROR_MS_DATA('策略不合法', {"status": Service.status, "Error": "策略不合法"}).to_json()
|
|
|
+ # 打印对应策略
|
|
|
+ print(' jamming_policy:', jamming_policy)
|
|
|
+ print(' anti_jamming_policy:', anti_jamming_policy)
|
|
|
+ # 根据策略选择算法
|
|
|
+ jam_algorithm = Service._get_jamming_algorithm_method(jamming_policy)
|
|
|
+ anti_jam_algorithm = Service._get_anti_jamming_algorithm(anti_jamming_policy, 'rx')
|
|
|
+
|
|
|
+ # 1. 应用发射端抗干扰算法(波形/频率捷变)
|
|
|
+ tx_original = np.random.randn(100)
|
|
|
+ if anti_jamming_policy in {"波形捷变", "频率捷变"}:
|
|
|
+ tx_processed = surveillance_radar.process_transmit_signal(
|
|
|
+ tx_signal=tx_original,
|
|
|
+ algorithm=Service._get_tx_algorithm_method(anti_jamming_policy),
|
|
|
+ sample_rate=1e6
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ tx_processed = tx_original
|
|
|
+
|
|
|
+ # 2. 发送经过处理的发射信号
|
|
|
+ tx_signal = surveillance_radar.send_signal(
|
|
|
+ tx_signal=tx_processed,
|
|
|
+ duration=0.1,
|
|
|
+ center_freq=100e6,
|
|
|
+ sample_rate=1e6,
|
|
|
+ gain=20
|
|
|
+ )
|
|
|
+
|
|
|
+ # 3. 干扰雷达接收信号并生成干扰
|
|
|
+ jammed_signal = jammer_radar.execute_jamming(
|
|
|
+ rx_signal=tx_signal,
|
|
|
+ algorithm=jam_algorithm,
|
|
|
+ bandwidth=100e6,
|
|
|
+ sample_rate=1e6
|
|
|
+ ) if jam_algorithm else tx_signal
|
|
|
+
|
|
|
+ # 接收端抗干扰处理(仅限接收端策略)
|
|
|
+ processed_signal = jammed_signal
|
|
|
+ if anti_jam_algorithm:
|
|
|
+ processed_signal = surveillance_radar.execute_anti_jamming(
|
|
|
+ rx_signal=jammed_signal,
|
|
|
+ algorithm=anti_jam_algorithm,
|
|
|
+ sample_rate=1e6,
|
|
|
+ polarization_params={'angle': 45, 'ellipticity': 0.5}
|
|
|
+ )
|
|
|
+
|
|
|
+ # 类型检查并转换复数信号为实数
|
|
|
+ if np.iscomplexobj(processed_signal):
|
|
|
+ processed_signal = np.real(processed_signal)
|
|
|
+
|
|
|
+ # 返回结果
|
|
|
+ return ResponseDTO.SUCCESS({"jamming_signal": np.real(jammed_signal).tolist(),
|
|
|
+ "anti_jamming_signal": np.real(processed_signal).tolist(),
|
|
|
+ "status": Service.status}).to_json()
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def send():
|
|
|
+ try:
|
|
|
+ # 设置中心频率、采样率和增益
|
|
|
+ center_freq = 100e6 # 2.4 GHz
|
|
|
+ sample_rate = 1e6 # 1 MS/s
|
|
|
+ duration = 10 # 以秒为单位
|
|
|
+ gain = 20 # [dB] 建议一开始设置小一点,按照实际情况调整
|
|
|
+ # 生成发送信号
|
|
|
+ num_samples = 100
|
|
|
+ tx_signal = np.random.randn(num_samples) + 0.1j * np.random.randn(num_samples) # 修复部分
|
|
|
+
|
|
|
+ # 发送信号
|
|
|
+ Service.usrp.send_waveform(tx_signal, duration, center_freq, sample_rate, [0], gain)
|
|
|
+
|
|
|
+ # 接收信号
|
|
|
+ rx_signal = Service.usrp.recv_num_samps(num_samples, center_freq, sample_rate)
|
|
|
+ print('信号已发送:')
|
|
|
+ print(rx_signal)
|
|
|
+ print('信号已接收')
|
|
|
+ except Exception as e:
|
|
|
+ print('发送或接收信号异常', e)
|
|
|
+ Service.status = 0 # 发送或接收信号失败,状态置为0
|
|
|
+ Service.usrp = None # 重置USRP对象
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _get_jamming_algorithm_method(policy: str) -> callable:
|
|
|
+ # 干扰算法映射
|
|
|
+ jam_algorithm_map = {
|
|
|
+ "噪声调频": JammingSignalAlgo.frequency_modulation,
|
|
|
+ "噪声调幅": JammingSignalAlgo.amplitude_modulation,
|
|
|
+ "噪声直放": JammingSignalAlgo.repeater_jamming,
|
|
|
+ "速度多假目标": JammingSignalAlgo.velocity_deception,
|
|
|
+ "距离多假目标": JammingSignalAlgo.range_deception
|
|
|
+ }
|
|
|
+ return jam_algorithm_map.get(policy)
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _get_anti_jamming_algorithm(policy: str, algorithm_type: str) -> callable:
|
|
|
+ """
|
|
|
+ 统一抗干扰策略路由方法
|
|
|
+ :param policy: 策略名称
|
|
|
+ :param algorithm_type: 算法类型['tx'发射端/'rx'接收端]
|
|
|
+ """
|
|
|
+ tx_algorithms = {
|
|
|
+ "波形捷变": AntiJammingSignalAlgo.adaptive_filter,
|
|
|
+ "频率捷变": AntiJammingSignalAlgo.time_frequency_filter
|
|
|
+ }
|
|
|
+ rx_algorithms = {
|
|
|
+ "自适应极化滤波": AntiJammingSignalAlgo.polarization_filter
|
|
|
+ }
|
|
|
+
|
|
|
+ if algorithm_type == 'tx':
|
|
|
+ return tx_algorithms.get(policy)
|
|
|
+ elif algorithm_type == 'rx':
|
|
|
+ return None if policy in {"波形捷变", "频率捷变"} else rx_algorithms.get(policy)
|
|
|
+
|
|
|
+
|
|
|
+# main方法
|
|
|
+if __name__ == '__main__':
|
|
|
+ Service.initialize_usrp()
|
|
|
+ Service.data({"jamming_policy": "噪声调频", "anti_jamming_policy": "频率捷变"})
|