1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- import threading
- import nacos
- from v2.nacos import NacosNamingService, ClientConfigBuilder, GRPCConfig, Instance, SubscribeServiceParam, \
- RegisterInstanceParam, DeregisterInstanceParam, BatchRegisterInstanceParam, GetServiceParam, ListServiceParam, \
- ListInstanceParam, NacosConfigService, ConfigParam
- import asyncio
# --- Nacos server connection -------------------------------------------------
# "host:port" of the Nacos server and the namespace this service lives in.
NACOS_SERVER_ADDR = "47.92.123.216:8848"
NACOS_NAMESPACE_ID = "fire"

# Credentials for the Nacos server.
# NOTE(review): no active code in the visible part of this file reads these.
NACOS_USERNAME = "nacos"
NACOS_PASSWORD = "nacos"

# --- Instance advertised to the registry --------------------------------------
# Name, IP address and port under which this service is registered.
SERVICE_NAME = "SDR"
SERVICE_IP = "10.198.104.187"
SERVICE_PORT = 5000
class NacosConfig:
    """Register this service with Nacos and keep a heartbeat running.

    Constructing an instance performs the whole start-up sequence: it builds
    the client configuration, creates the naming client, registers the
    service instance, and starts a background thread that periodically sends
    heartbeats through a ``nacos.NacosClient``.

    Attributes set during start-up:
        client_config: Configuration object returned by ``create_config``.
        naming_client: Naming service created from ``client_config``.
        client: ``nacos.NacosClient`` used by the heartbeat loop.
        heartbeat_thread: Daemon thread running ``send_heartbeat_in_thread``.
    """

    # Seconds to wait between two heartbeat attempts. Override on a subclass
    # or an instance to change the cadence.
    HEARTBEAT_INTERVAL_SECONDS = 10

    def __init__(self):
        """Run the asynchronous start-up sequence to completion.

        Raises:
            RuntimeError: If called from a thread that already has a running
                event loop, because ``asyncio.run`` cannot be nested.
        """
        asyncio.run(self.init_nacos_config())

    async def init_nacos_config(self):
        """Create the clients, register the instance and start heartbeats."""
        self.client_config = self.create_config()
        self.naming_client = await self.create_naming_client(self.client_config)
        await self.register_instance(self.naming_client)

        self.client = nacos.NacosClient(NACOS_SERVER_ADDR, namespace=NACOS_NAMESPACE_ID)
        # The heartbeat loop never returns, so the thread is a daemon: a
        # non-daemon thread would keep the interpreter alive forever.
        self.heartbeat_thread = threading.Thread(
            target=self.send_heartbeat_in_thread,
            args=(self.client, SERVICE_NAME, SERVICE_IP, SERVICE_PORT),
            name="nacos-heartbeat",
            daemon=True,
        )
        self.heartbeat_thread.start()

    def create_config(self):
        """Build and return the client configuration.

        The configuration targets ``NACOS_SERVER_ADDR`` within
        ``NACOS_NAMESPACE_ID``, logs at ``DEBUG`` level and passes
        ``grpc_timeout=5000`` to ``GRPCConfig``.
        """
        # NOTE: authentication is not configured here; NACOS_USERNAME and
        # NACOS_PASSWORD are not passed to the builder.
        builder = ClientConfigBuilder()
        builder = builder.namespace_id(NACOS_NAMESPACE_ID)
        builder = builder.server_address(NACOS_SERVER_ADDR)
        builder = builder.log_level('DEBUG')
        builder = builder.grpc_config(GRPCConfig(grpc_timeout=5000))
        return builder.build()

    async def create_naming_client(self, client_config):
        """Create and return a naming service for ``client_config``."""
        return await NacosNamingService.create_naming_service(client_config)

    async def register_instance(self, client):
        """Register this service instance and print the outcome.

        The instance is registered under ``SERVICE_NAME`` in
        ``DEFAULT_GROUP`` at ``SERVICE_IP:SERVICE_PORT`` with
        ``enabled=True``, ``healthy=True`` and ``ephemeral=False``.

        Args:
            client: Naming client exposing an awaitable ``register_instance``.
        """
        request = RegisterInstanceParam(
            service_name=SERVICE_NAME,
            group_name='DEFAULT_GROUP',
            ip=SERVICE_IP,
            port=SERVICE_PORT,
            enabled=True,
            healthy=True,
            ephemeral=False,
        )
        response = await client.register_instance(request=request)
        if response:
            print('------nacos register instance success!------')
        else:
            print('------nacos register instance fail!------')

    async def list_instance(self, client):
        """Print every service returned by ``client.list_services``.

        Despite its name, this method lists services, not instances.

        Args:
            client: Naming client exposing an awaitable ``list_services``.
        """
        service_list = await client.list_services(ListServiceParam())
        for service in service_list.services:
            print(service)

    def send_heartbeat_in_thread(self, client, service_name, ip, port):
        """Run the heartbeat loop on a private event loop.

        Intended as a ``threading.Thread`` target: the call blocks for as
        long as the heartbeat loop runs, which is indefinitely unless an
        exception escapes it.

        Args:
            client: Object exposing ``send_heartbeat(service_name, ip, port)``.
            service_name: Name of the service to send heartbeats for.
            ip: IP address of the registered instance.
            port: Port of the registered instance.
        """
        # The loop must actually be started here; merely defining a helper
        # without calling it would let the thread exit without ever sending
        # a heartbeat.
        try:
            asyncio.run(self.send_heartbeat(client, service_name, ip, port))
        except KeyboardInterrupt:
            print("Heartbeat task interrupted")

    async def send_heartbeat(self, client, service_name, ip, port):
        """Send a heartbeat every ``HEARTBEAT_INTERVAL_SECONDS``, forever.

        Failures are printed and the loop carries on, so a temporarily
        unreachable server does not stop later heartbeats.

        Args:
            client: Object exposing ``send_heartbeat(service_name, ip, port)``.
            service_name: Name of the service to send heartbeats for.
            ip: IP address of the registered instance.
            port: Port of the registered instance.
        """
        while True:
            try:
                # The client call is synchronous; running it in a worker
                # thread keeps this event loop free while it is in flight.
                response = await asyncio.to_thread(
                    client.send_heartbeat, service_name, ip, port
                )
                print(response)
            except Exception as e:  # best-effort loop: report and retry
                print(f"Failed to send heartbeat: {e}")
            await asyncio.sleep(self.HEARTBEAT_INTERVAL_SECONDS)
|