nacos_20250421151740.py 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. import threading
  2. import nacos
  3. from v2.nacos import NacosNamingService, ClientConfigBuilder, GRPCConfig, Instance, SubscribeServiceParam, \
  4. RegisterInstanceParam, DeregisterInstanceParam, BatchRegisterInstanceParam, GetServiceParam, ListServiceParam, \
  5. ListInstanceParam, NacosConfigService, ConfigParam
  6. import asyncio
  7. NACOS_NAMESPACE_ID = 'fire'
  8. NACOS_USERNAME = 'nacos'
  9. NACOS_PASSWORD = 'nacos'
  10. NACOS_SERVER_ADDR = '47.92.123.216:8848'
  11. SERVICE_NAME = 'SDR'
  12. SERVICE_IP = '10.198.104.187'
  13. SERVICE_PORT = 5000
  14. class NacosConfig:
  15. def __init__(self):
  16. asyncio.run(self.init_nacos_config())
  17. async def init_nacos_config(self):
  18. self.client_config = self.create_config()
  19. self.naming_client = await self.create_naming_client(self.client_config)
  20. await self.register_instance(self.naming_client)
  21. # await self.list_instance(self.naming_client)
  22. self.client = nacos.NacosClient(NACOS_SERVER_ADDR, namespace=NACOS_NAMESPACE_ID)
  23. thread = threading.Thread(target=self.send_heartbeat_in_thread, args=(self.client, SERVICE_NAME, SERVICE_IP, SERVICE_PORT))
  24. thread.start()
  25. # 客户端配置
  26. def create_config(self):
  27. client_config = (ClientConfigBuilder()
  28. # .username(NACOS_USERNAME)
  29. # .password(NACOS_PASSWORD)
  30. .namespace_id(NACOS_NAMESPACE_ID)
  31. .server_address(NACOS_SERVER_ADDR)
  32. .log_level('DEBUG')
  33. .grpc_config(GRPCConfig(grpc_timeout=5000))
  34. .build())
  35. return client_config
  36. # 创建命名客户端
  37. async def create_naming_client(self, client_config):
  38. naming_client = await NacosNamingService.create_naming_service(client_config)
  39. return naming_client
  40. # 注册服务
  41. async def register_instance(self, client):
  42. response = await client.register_instance(
  43. request=RegisterInstanceParam(service_name= SERVICE_NAME, group_name='DEFAULT_GROUP', ip=SERVICE_IP,
  44. port=SERVICE_PORT,
  45. enabled=True,
  46. healthy=True, ephemeral=False))
  47. if response:
  48. print('------nacos register instance success!------')
  49. else:
  50. print('------nacos register instance fail!------')
  51. async def list_instance(self, client):
  52. service_list = await client.list_services(ListServiceParam())
  53. for service in service_list.services:
  54. print(service)
  55. # 异步发送心跳
  56. def send_heartbeat_in_thread(self,client, service_name, ip, port):
  57. # loop = asyncio.new_event_loop()
  58. # asyncio.set_event_loop(loop)
  59. # loop.run_until_complete(self.send_heartbeat(client, service_name, ip, port))
  60. def run():
  61. try:
  62. asyncio.run(self.send_heartbeat(client, service_name, ip, port))
  63. except KeyboardInterrupt:
  64. print("Heartbeat task interrupted")
  65. async def send_heartbeat(self,client, service_name, ip, port):
  66. while True:
  67. try:
  68. response = await asyncio.to_thread(lambda: client.send_heartbeat(service_name, ip, port))
  69. print(response)
  70. except Exception as e:
  71. print(f"Failed to send heartbeat: {str(e)}")
  72. await asyncio.sleep(10)