12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- import threading
- from configparser import ConfigParser
- import nacos
- from v2.nacos import NacosNamingService, ClientConfigBuilder, GRPCConfig, Instance, SubscribeServiceParam, \
- RegisterInstanceParam, DeregisterInstanceParam, BatchRegisterInstanceParam, GetServiceParam, ListServiceParam, \
- ListInstanceParam, NacosConfigService, ConfigParam
- import asyncio
- config = ConfigParser()
- config.read('config.ini')
- NACOS_NAMESPACE_ID = 'fire'
- NACOS_USERNAME = 'nacos'
- NACOS_PASSWORD = 'nacos'
- NACOS_SERVER_ADDR = config.get('NACOS', 'NACOS_SERVER_ADDR')
- print("NACOS_SERVER_ADDR:", NACOS_SERVER_ADDR)
- SERVICE_NAME = 'SDR'
- SERVICE_IP = config.get('NACOS', 'SERVICE_IP')
- print("IP:", SERVICE_IP)
- SERVICE_PORT = 5000
- class NacosConfig:
- def __init__(self):
- asyncio.run(self.init_nacos_config())
- async def init_nacos_config(self):
- self.client_config = self.create_config()
- self.naming_client = await self.create_naming_client(self.client_config)
- await self.register_instance(self.naming_client)
- # await self.list_instance(self.naming_client)
- self.client = nacos.NacosClient(NACOS_SERVER_ADDR, namespace=NACOS_NAMESPACE_ID)
- thread = threading.Thread(target=self.send_heartbeat_in_thread, args=(self.client, SERVICE_NAME, SERVICE_IP, SERVICE_PORT))
- thread.start()
- # 客户端配置
- def create_config(self):
- client_config = (ClientConfigBuilder()
- # .username(NACOS_USERNAME)
- # .password(NACOS_PASSWORD)
- .namespace_id(NACOS_NAMESPACE_ID)
- .server_address(NACOS_SERVER_ADDR)
- .log_level('DEBUG')
- .grpc_config(GRPCConfig(grpc_timeout=5000))
- .build())
- return client_config
- # 创建命名客户端
- async def create_naming_client(self, client_config):
- naming_client = await NacosNamingService.create_naming_service(client_config)
- return naming_client
- # 注册服务
- async def register_instance(self, client):
- response = await client.register_instance(
- request=RegisterInstanceParam(service_name= SERVICE_NAME, group_name='DEFAULT_GROUP', ip=SERVICE_IP,
- port=SERVICE_PORT,
- enabled=True,
- healthy=True, ephemeral=False))
- if response:
- print('------nacos register instance success!------')
- else:
- print('------nacos register instance fail!------')
- async def list_instance(self, client):
- service_list = await client.list_services(ListServiceParam())
- for service in service_list.services:
- print(service)
- # 异步发送心跳
- def send_heartbeat_in_thread(self,client, service_name, ip, port):
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
- loop.run_until_complete(self.send_heartbeat(client, service_name, ip, port))
- # def run():
- # try:
- # asyncio.run(self.send_heartbeat(client, service_name, ip, port))
- # except KeyboardInterrupt:
- # print("Heartbeat task interrupted")
- async def send_heartbeat(self,client, service_name, ip, port):
- while True:
- try:
- response = await asyncio.to_thread(lambda: client.send_heartbeat(service_name, ip, port))
- print(response)
- except Exception as e:
- print(f"Failed to send heartbeat: {str(e)}")
- await asyncio.sleep(10)
|