nacos.py 3.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. import threading
  2. from configparser import ConfigParser
  3. import nacos
  4. from v2.nacos import NacosNamingService, ClientConfigBuilder, GRPCConfig, Instance, SubscribeServiceParam, \
  5. RegisterInstanceParam, DeregisterInstanceParam, BatchRegisterInstanceParam, GetServiceParam, ListServiceParam, \
  6. ListInstanceParam, NacosConfigService, ConfigParam
  7. import asyncio
  8. config = ConfigParser()
  9. config.read('config.ini')
  10. NACOS_NAMESPACE_ID = 'fire'
  11. NACOS_USERNAME = 'nacos'
  12. NACOS_PASSWORD = 'nacos'
  13. NACOS_SERVER_ADDR = config.get('NACOS', 'NACOS_SERVER_ADDR')
  14. print("NACOS_SERVER_ADDR:", NACOS_SERVER_ADDR)
  15. SERVICE_NAME = 'SDR'
  16. SERVICE_IP = config.get('NACOS', 'SERVICE_IP')
  17. print("IP:", SERVICE_IP)
  18. SERVICE_PORT = 5000
  19. class NacosConfig:
  20. def __init__(self):
  21. asyncio.run(self.init_nacos_config())
  22. async def init_nacos_config(self):
  23. self.client_config = self.create_config()
  24. self.naming_client = await self.create_naming_client(self.client_config)
  25. await self.register_instance(self.naming_client)
  26. # await self.list_instance(self.naming_client)
  27. self.client = nacos.NacosClient(NACOS_SERVER_ADDR, namespace=NACOS_NAMESPACE_ID)
  28. thread = threading.Thread(target=self.send_heartbeat_in_thread, args=(self.client, SERVICE_NAME, SERVICE_IP, SERVICE_PORT))
  29. thread.start()
  30. # 客户端配置
  31. def create_config(self):
  32. client_config = (ClientConfigBuilder()
  33. # .username(NACOS_USERNAME)
  34. # .password(NACOS_PASSWORD)
  35. .namespace_id(NACOS_NAMESPACE_ID)
  36. .server_address(NACOS_SERVER_ADDR)
  37. .log_level('DEBUG')
  38. .grpc_config(GRPCConfig(grpc_timeout=5000))
  39. .build())
  40. return client_config
  41. # 创建命名客户端
  42. async def create_naming_client(self, client_config):
  43. naming_client = await NacosNamingService.create_naming_service(client_config)
  44. return naming_client
  45. # 注册服务
  46. async def register_instance(self, client):
  47. response = await client.register_instance(
  48. request=RegisterInstanceParam(service_name= SERVICE_NAME, group_name='DEFAULT_GROUP', ip=SERVICE_IP,
  49. port=SERVICE_PORT,
  50. enabled=True,
  51. healthy=True, ephemeral=False))
  52. if response:
  53. print('------nacos register instance success!------')
  54. else:
  55. print('------nacos register instance fail!------')
  56. async def list_instance(self, client):
  57. service_list = await client.list_services(ListServiceParam())
  58. for service in service_list.services:
  59. print(service)
  60. # 异步发送心跳
  61. def send_heartbeat_in_thread(self,client, service_name, ip, port):
  62. loop = asyncio.new_event_loop()
  63. asyncio.set_event_loop(loop)
  64. loop.run_until_complete(self.send_heartbeat(client, service_name, ip, port))
  65. # def run():
  66. # try:
  67. # asyncio.run(self.send_heartbeat(client, service_name, ip, port))
  68. # except KeyboardInterrupt:
  69. # print("Heartbeat task interrupted")
  70. async def send_heartbeat(self,client, service_name, ip, port):
  71. while True:
  72. try:
  73. response = await asyncio.to_thread(lambda: client.send_heartbeat(service_name, ip, port))
  74. # print(response)
  75. except Exception as e:
  76. print(f"Failed to send heartbeat: {str(e)}")
  77. await asyncio.sleep(10)