123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160 |
- import json
- from math import e
- from re import S
- from matplotlib.colors import CenteredNorm
- import uhd
- import numpy as np
- import threading
- from dto.response_dto import ResponseDTO
- from model.surveillance_radar import SurveillanceRadar
- from model.jammer_radar import JammerRadar
- from algo.jamming_signal_algo import JammingSignalAlgo
- from algo.anti_jamming_signal_algo import AntiJammingSignalAlgo
- # 定义一组通道常量
- CHANNEL_1 = 0
- CHANNEL_2 = 1
- CHANNEL_3 = 2
- CHANNEL_4 = 3
- SAMPLING_RATE = 20e6
- CENTER_FREQ = 6e9
- GAIN = 50
- # 定义干扰、抗干扰策略集合,确保集合中的元素为合法的字符串常量
- from common.strategy_enum import JammingPolicy, AntiJammingPolicy
- class Service:
- usrp = None # 静态类变量
- status = 0 # 静态类变量
- surveillance_radar = None # 侦查雷达实例
- jammer_radar = None # 干扰雷达实例
- _rlock = threading.RLock() # 可重入锁
- @staticmethod
- def initialize_usrp():
- with Service._rlock:
- Service.get_sdr_status()
- if Service.status == 1:
- print('USRP设备已经初始化')
- return ResponseDTO.SUCCESS({"status": Service.status}).to_json()
- try:
- Service.usrp = uhd.usrp.MultiUSRP()
- print('------SDR Devices initialize success!------')
- #侦查雷达初始化
- Service.surveillance_radar = SurveillanceRadar(Service.usrp, rx_channel=CHANNEL_1, tx_channel=CHANNEL_1)
- #侦察类雷达启动
- Service.surveillance_radar.run()
- # 干扰雷达初始化
- Service.jammer_radar = JammerRadar(Service.usrp, rx=CHANNEL_2, tx=CHANNEL_2)
- Service.status = 1
- return ResponseDTO.SUCCESS({"status": Service.status}).to_json()
- except Exception as e:
- print('SDR设备异常', e)
- Service.usrp = uhd.usrp.MultiUSRP("type=x4xx,reset")
- Service.status = 0 # 初始化失败,状态置为0
- return ResponseDTO.ERROR_MS_DATA('SDR设备异常', {"status": Service.status, "Error": str(e)}).to_json()
-
- # @staticmethod
- # def get_sdr_status():
- # with Service._rlock:
- # if Service.status == 0:
- # return ResponseDTO.ERROR_MS_DATA('SDR设备异常',{"status": Service.status, "Error": "SDR设备未初始化"}).to_json()
- # try:
- # rx_buffer = np.zeros((500000,), dtype=np.complex64)
- # Service.surveillance_radar.rx_stream.recv(rx_buffer, uhd.types.RXMetadata())
- # Service.status = 1
- # return ResponseDTO.SUCCESS({"status": Service.status}).to_json()
- # except Exception as e:
- # print('SDR设备异常', e)
- # Service.status = 0 # 获取状态失败,状态置为0
- # Service.usrp = None # 重置USRP对象
- # return ResponseDTO.ERROR_MS_DATA('SDR设备异常', {"status": Service.status, "Error": str(e)}).to_json()
- @staticmethod
- def get_sdr_status():
- with Service._rlock:
- return ResponseDTO.SUCCESS({"status": Service.status}).to_json()
- @staticmethod
- def data(payload):
- with Service._rlock:
- if Service.status == 0:
- return ResponseDTO.ERROR_MS_DATA('SDR设备异常',
- {"status": Service.status, "Error": "SDR设备未初始化"}).to_json()
- # try:
- # 使用已初始化的雷达实例
- surveillance_radar = Service.surveillance_radar
- jammer_radar = Service.jammer_radar
- jamming_policy = payload.get('jamming_policy')
- anti_jamming_policy = payload.get('anti_jamming_policy')
- # 判断策略是否合法
- if not JammingPolicy.is_valid(jamming_policy) or not AntiJammingPolicy.is_valid(anti_jamming_policy):
- return ResponseDTO.ERROR_MS_DATA('策略不合法', {"status": Service.status, "Error": "策略不合法"}).to_json()
- # 打印对应策略
- print(' jamming_policy:', jamming_policy)
- print(' anti_jamming_policy:', anti_jamming_policy)
- # 根据策略选择算法
- jam_algorithm = Service._get_jamming_algorithm_method(jamming_policy)
- anti_jam_algorithm = Service._get_anti_jamming_algorithm(anti_jamming_policy)
- print(' jamming_algorithm:', jam_algorithm)
- print(' anti_jamming_algorithm:', anti_jam_algorithm)
- # 生成干扰信号
-
- jammed_signal, lower_bound, upper_bound = jammer_radar.execute_jamming(
- algorithm=jam_algorithm,
- bandwidth=CENTER_FREQ,
- sample_rate=SAMPLING_RATE,
- duration=0.1,
- gain=GAIN
- )
-
- # 抗干扰算法 、 干扰信号注入到侦察雷达
- surveillance_radar.inject(anti_jam_algorithm,jammed_signal)
- # 返回结果
- return ResponseDTO.SUCCESS({
- "jamming_signal": np.real(jammed_signal).tolist(),
- # "anti_jamming_signal": np.real(surveillance_radar.processed_signal).flatten().tolist(),
- "anti_jamming_signal": surveillance_radar.processed_signal.tolist(),
- "jamming_range": {
- "upper": upper_bound,
- "lower": lower_bound
- }
- }).to_json()
- # except Exception as e:
- # print('数据处理异常', e)
- # return ResponseDTO.ERROR_MS_DATA('数据处理异常', {"status": Service.status, "Error": str(e)}).to_json()
- @staticmethod
- def _get_jamming_algorithm_method(policy: str) -> callable:
- # 干扰算法映射
- jam_algorithm_map = {
- JammingPolicy.FREQUENCY_MODULATION.value: JammingSignalAlgo.frequency_modulation,
- JammingPolicy.AMPLITUDE_MODULATION.value: JammingSignalAlgo.amplitude_modulation,
- JammingPolicy.REPEATER.value: JammingSignalAlgo.repeater_jamming,
- JammingPolicy.VELOCITY_DECEPTION.value: JammingSignalAlgo.velocity_deception,
- JammingPolicy.RANGE_DECEPTION.value: JammingSignalAlgo.range_deception
- }
- return jam_algorithm_map.get(policy)
- @staticmethod
- def _get_anti_jamming_algorithm(policy: str) -> callable:
- """
- 统一抗干扰策略路由方法
- :param policy: 策略名称
- """
- anti_jam_algorithm_map = {
- AntiJammingPolicy.WAVEFORM_AGILITY.value: AntiJammingSignalAlgo.waveform_agility,
- AntiJammingPolicy.FREQUENCY_AGILITY.value: AntiJammingSignalAlgo.frequency_agility,
- AntiJammingPolicy.POLARIZATION_FILTER.value: AntiJammingSignalAlgo.polarization_filter
- }
-
- return anti_jam_algorithm_map.get(policy)
|