fastapi微服务系列(4)-之GRPC和consul的服务注册与发现
前言
没有服务注册与发现就没有微服务,既然我们的说的是微服务离不开肯定是需要做相关的服务注册和发现。
nacos竟然也支持做服务发现了!(Nacos支持DNS-Based和RPC-Based(Dubbo、gRPC)模式的服务发现。Nacos也提供实时健康检查,以防止将请求发往不健康的主机或服务实例。借助Nacos,您可以更容易地为您的服务实现断路器)
但是本节主要是先实践一下consul,后续使用到nacos作为配置中心的时候可以顺便实践一下它的服务注册和发现。
回到我们的consul之上,其实之前我在用go-micro做相关的小练习的时候,也是用到过。
这里为有助于我自己回忆哈哈!顺便继续唠叨多几句:
关于注册中心
一些概念理解:
是微服务架构中的通信中心
是服务和服务之间获取沟通信息的桥梁(主要获取的是服务的状态、地址、端口、等信息)
主要角色:
服务提供者
服务消费者
注册中心
服务注册的流程:
服务提供者启动---》将自身的网络地址等信息---》注册到注册中心---》注册中心记录
服务消费者获取---》注册中心把服务提供者的地址---》交给消费者进行使用
服务消费者获取---》服务者的信息---》进行请求服务提供接口信息---》进行消费
另外:各个服务与注册中心使用一定机制通信,做相关的健康检测
当服务需要下线或退出的时候,需要机进行服务注销
背景
微服务或分布式环境场景下,各个服务的独立部署和分布,组合层一个分布式应用,服务和服务之间相互关联调用,如何的而管理这些服务,服务注册和服务的发现就应用而生了!
理解
参考来源:www.iteye.com/blog/yangya…
segmentfault.com/a/119000000…
youzhixueyuan.com/registratio…
1)服务注册
######意思:将服务元信息(IP,端口号等信息)服务自动将信息上传至服务注册表,并通过心跳进行同步。
注册方式:1:客户端自注册 2:第三方注册
客户端自注册 :服务注册和注销原信息绑定再我们的服务逻辑代码上,服务启动时候提交相关的信息到注册中心,当服务下线时候,也告知服务中心,进行服务注销。
缺点:(1)需要代码逻辑配合,存在入侵性(2)服务运行期间需要和注册中心保持心跳互通。
第三方注册
由独立的服务 Registrar 负责注册与注销。服务启动后以某种方式通知独立的服务 Registrar ,Registrar 再提交到注册中心。
2)服务注册表
各个服务集群,维护了一个数据库,数据库存储的是可用服务的元信息、提供给服务发现和注销。
3)服务发现
######意思:当需要使用服务时,通过读取服务注册表获取可用的服务元信息,客户端可以通过此信息连接服务器。
服务发现的方式包括:客户端服务发现和服务端服务发现。
客户端发现
客户端负责向注册中心获取相应的 ip 与 port ,多种语言需要实现同一套逻辑,有点冗余的感觉。
服务端发现
由 API gateway 实现服务发现的功能。 #####(4)联系
服务注册机制将启动服务的信息上传至服务注册表,服务发现机制通过服务注册表实时获取可用服务的信息
常见的第三方服务注册中心
zookeeper
zookeeper 起源于 Hadoop ,它非常成熟、稳定,有比较多的大公司在使用一个高性能、分布式应用程序协调服务,用于名称服务、分布式锁定、共享资源同步和分布式配置管理。
etcd
etcd 是一个采用 HTTP 协议的健/值对存储系统,它是一个分布式和功能层次配置系统,可用于构建服务发现系统。其很容易部署、安装和使用,提供了可靠的数据持久化特性。它是安全的并且文档也十分齐全。它需要搭配一些第三方工具才可以提供服务发现功能。
consul
Consul 是强一致性的数据存储,使用 gossip 形成动态集群。它提供分级键/值存储方式,不仅可以存储数据,而且可以用于注册器件事各种任务,从发送数据改变通知到运行健康检查和自定义命令,具体如何取决于它们的输出。consul web 界面,用户可以查看所有的服务和节点、监控健康检查状态以及通过切换数据中心读取设置键/值对数据。
对于服务的注册和发现,其实我们的可选的挺多:
etcd
consul
zoomkeper
nacos
Eureka
Consul说明
1:为啥要选consul
有图形化
之前使用过
实践起来方便
2:Consul介绍
来自官网的介绍
Consul是一套开源的分布式服务发现和配置管理系统
Consul提供了微服务系统中的服务治理、配置中心、控制总线等功能
作用:基于GO语言开发,用于实现分布式系统的服务发现与配置的等管理。
特性:
Raft 算法 分布式一致性协议的算法方式。所谓的CP的特性。
服务发现: Consul提供了通过DNS或者HTTP接口的方式来注册服务和发现服务。一些外部的服务通过Consul很容易的找到它所依赖的服务。
健康检测: Consul的Client提供了健康检查的机制,可以通过用来避免流量被转发到有故障的服务上。
Key/Value存储: 应用程序可以根据自己的需要使用Consul提供的Key/Value存储。 Consul提供了简单易用的HTTP接口,结合其他工具可以实现动态配置、功能标记、领袖选举等等功能。,可以用于配置中心等。
多数据中心: Consul支持开箱即用的多数据中心. 这意味着用户不需要担心需要建立额外的抽象层让业务扩展到多个区域。
WEB UI 服务管理
Consul 的一些特性总结:
支持服务发现,提供 HTTP 和 DNS 两种发现方式
支持服务的健康检查,支持多种方式,HTTP、TCP、Docker、Shell脚本定制化,检查对象可以是service或者是节点
支持Mtls双向加密,为服务创建和分发证书
支持KV形式的键值数据库数据存贮:Key、Value的存储方式类似redis
支持暴露xDS协议,使用内置proxy或外部proxy实现流量控制
支持多数据中心
可视化界面
3:Consul 角色
DEV 启动模式(单节点的形式安装部署-开发模式)
用于本地开发环境下的方便的进行测试。如果是线上的环境的一般我们的是用集群的模式。Consul启动时候就是一个服务注册中心.
开发模式的下,一般我们的都是基于客户端的自注册的模式进行,意思就是服务启动的时候,把服务的信息都提交到的我们的注册中心上。
线上启动模式
-- client 客户端模式 无状态,作用是把外部请求过来的HTTP或DNS的接口请求转发到内部server服务端的集群。主要起到的作用是一个代理。-- server 服务端,保存配置信息,线上环境一般肯定是需要配置成高可用形式。官网的建议是每个数据中心的server数量推荐为3 或 5个 奇数个服务。
关于server 因为基于CP下的强一致性的问题,如果server过多的也增加server之间数据同步的时间。所以也不是越多越好。
当我们的集群有一半的挂了基本整个集群就不可用了!
4:Consul 工作模式
server、client server 模式:
server之间采取了一致性协议,推荐三或五个节点
server负责持久化数据存储
一个datacent的server之间选举出一个leader节点,leader会负责处理所有查询及事务,非leader节点接收到rpc请求会转给leader进行处理
agent(client) 模式:
负责对service的健康检查
通过gossip协议发现集群内的服务
作为消息层传递重要消息
5:consul 工作流
服务发现以及注册:
当服务的生产者producer 启动的时候,把自身的服务的元数据信息提交到Consul,Consul接受到注册信息后,会每隔10秒(默认值)想注册的服务Prodcucer进行健康检查。
服务调用
当我们的Consumerl消费者请求Prodcuer的是,会先从Consul获取到存贮Producter的数据(地址IP 和端口等)的临时表,从这个临时表里面任选一个Producr是的IP和Port,进行服务的请求
关于临时表,只会包含通过健康的检查的的服务,且会根据 默认的间隔时间进行更新同步。
6:结合Grpc的实践
6.1 consul安装
docker安装:
docker run -d -p 8500:8500 consul consul agent -data-dir=/consul/data -config-dir=/consul/config -dev -client=0.0.0.0 -bind=0.0.0.0复制代码
非docker方式:
6.1.1 环境
基于window 10 下的单节的安装。
因为consul本身的是基于GO开发的,具有比较好的跨平台的支持,容易部署。
6.1.2 下载
地址:www.consul.io/
6.1.3 单节点DEV模式启动
因为只是为了实践学习使用,生产环境下,一般部署的是主从模式的高可用!比较繁琐!我这里就不那么麻烦了!
启动控制台:
cd 到 我们的下载好的 文件的目录下
consul agent -dev -client=0.0.0.0复制代码
-dev 表示的是的启动模式
-client 表示的是谁可以向我们的注册中心注册或访问等
访问我们的Consul 是webUI 管理中心:http://localhost:8500
PS:;垃圾QQ浏览器!不要使用它了吧!ε=(´ο`*)))唉
界面上看到的consul服务就是他们自己的服务。
6.2 consul 使用
这里我区别之前我使用的GO-MICRO,这里我主要是对PY的实践和使用,所以借来的都是PY主题为主
6.2.1 依赖库的安装python-consul
pip install python-consul -i https://pypi.tuna.tsinghua.edu.cn/simple复制代码
6.2.2 注册和注销GRPC服务到cousul
完整示例代码:
#!/usr/bin/evn python # -*- coding: utf-8 -*- import sys from concurrent import futures import time import grpc import hello_pb2 import hello_pb2_grpc import signal from typing import Any, Callable # 实现 proto文件中定义的 GreeterServicer的接口 class Greeter(hello_pb2_grpc.GreeterServicer): # 实现 proto 文件中定义的 rpc 调用 def SayHello(self, request, context): # 返回是我们的定义的响应体的对象 return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name)) def SayHelloAgain(self, request, context): # 返回是我们的定义的响应体的对象 # # 设置异常状态码 # context.set_code(grpc.StatusCode.PERMISSION_DENIED) # context.set_details("你没有这个访问的权限") # raise context # 接收请求头的信息 print("接收到的请求头元数据信息", context.invocation_metadata()) # 设置响应报文头信息 context.set_trailing_metadata((('name', '223232'), ('sex', '23232'))) # 三种的压缩机制处理 # NoCompression = _compression.NoCompression # Deflate = _compression.Deflate # Gzip = _compression.Gzip # 局部的数据进行压缩 context.set_compression(grpc.Compression.Gzip) return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name)) from grpc_interceptor import ServerInterceptor from grpc_interceptor.exceptions import GrpcException from grpc_interceptor.exceptions import NotFound class MyUnaryServerInterceptor1(ServerInterceptor): def intercept( self, method: Callable, request: Any, context: grpc.ServicerContext, method_name: str, ) -> Any: rsep = None try: print("唉是滴哈是:开始----1", request) print("我是拦截器1号:开始----1-request.name", request.name) print("我是拦截器1号:开始----1-method_name", method_name) rsep = method(request, context) except GrpcException as e: context.set_code(e.status_code) context.set_details(e.details) raise finally: print("我是拦截器1号:结束----2", rsep) return rsep class MyUnaryServerInterceptor2(grpc.ServerInterceptor): def intercept_service(self, continuation, handler_call_details): # 从里面获取 handler_call_details 包含有相关的请求头的信息 print("handler call continuation: ", type(continuation)) print("handler call invocation_metadata: ", type(handler_call_details)) print("handler call details: ", handler_call_details.invocation_metadata) print("我是拦截器11111111111111号:开始----1") respn = continuation(handler_call_details) print("我是拦截器1222222222222222号:结束----2", respn) return respn import consul def register(server_name, ip, port): c = consul.Consul(host='127.0.0.1', port=8500) # 连接consul 服务器,默认是127.0.0.1,可用host参数指定host print(f"GRPC开始注册服务{server_name}") check = consul.Check.tcp(ip, port, "10s") # 健康检查的ip,端口,检查时间 if c.agent.service.register(name=server_name, service_id="小钟同学",address=ip, port=port, check=check): # 注册服务部分 print(f"GRPC注册服务{server_name}成功") else: print(f"GRPC注册服务{server_name}失败") def unregister(service_id): c = consul.Consul() print(f"开始退出服务{service_id}") if c.agent.service.deregister(service_id=service_id): print(f"GRPC注销服务{service_id}成功") else: print(f"GRPC注销服务{service_id}失败") def serve(): compression = grpc.Compression.Gzip server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), compression=compression, interceptors=[MyUnaryServerInterceptor1(), MyUnaryServerInterceptor2()]) # 添加我们服务 hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) # 配置启动的端口 server.add_insecure_port('[::]:50051') # 开始启动的服务 register("cesifuwu", '127.0.0.1', 50051) server.start() def stop_serve(signum, frame): print("进程结束了!!!!") # sys.exit(0) unregister(service_id='小钟同学') raise KeyboardInterrupt # 注销相关的信号 # SIGINT 对应windos下的 ctrl+c的命令 # SIGTERM 对应的linux下的kill命令 signal.signal(signal.SIGINT, stop_serve) # signal.signal(signal.SIGTERM, stop_serve) # wait_for_termination --主要是为了目标启动后主进程直接的结束!需要一个循环的方式进行进行进程运行 server.wait_for_termination() if __name__ == '__main__': serve()复制代码
主要是使用第三方库进行注册:
此时我们的启动我们的服务观察我们的consul:
已经被注册成功。
观察我们服务ID信息,里面包括健康检测等。
此时我们的结束我们的服务进程,观察情况,因为我们的对我们的服务进程进行了信号事件的处理,再接收到事件信号时候,调用服务注销的方法:
def unregister(service_id): c = consul.Consul() print(f"开始退出服务{service_id}") if c.agent.service.deregister(service_id=service_id): print(f"GRPC注销服务{service_id}成功") else: print(f"GRPC注销服务{service_id}失败")复制代码
服务自动消息了:
6.2.3 GRPC随机端口分配注册
上面的示例中我们的端口是固定写死的方式来进行,通常线上的话,一般是我们的随机生成或通过命令行参数动态传入配置的方式。
目前我们的暂时处理随机端口生成的方式。命令行参数的接收的后续我们的可以是cickl
def get_open_port(): import socket s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind(("",0)) s.listen(1) port = s.getsockname()[1] s.close() return port def serve(): compression = grpc.Compression.Gzip server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), compression=compression, interceptors=[MyUnaryServerInterceptor1(), MyUnaryServerInterceptor2()]) # 添加我们服务 hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) # 配置启动的端口 # 获取随机的端口: port = get_open_port() print("启动端口!!!!",port) server.add_insecure_port(f'[::]:{port}') # 开始启动的服务 register("cesifuwu", '127.0.0.1', port) server.start() def stop_serve(signum, frame): print("进程结束了!!!!") # sys.exit(0) unregister(service_id='小钟同学') raise KeyboardInterrupt # 注销相关的信号 # SIGINT 对应windos下的 ctrl+c的命令 # SIGTERM 对应的linux下的kill命令 signal.signal(signal.SIGINT, stop_serve) # signal.signal(signal.SIGTERM, stop_serve) # wait_for_termination --主要是为了目标启动后主进程直接的结束!需要一个循环的方式进行进行进程运行 server.wait_for_termination() if __name__ == '__main__': serve()复制代码
关注点图示:
6.2.4 服务发现之工具安装
首先我们的服务端启动多个服务并启动:
然后尝试从cousul获取服务信息:
import consul _consul = consul.Consul(host='127.0.0.1', port=8500) # 获取服务名列表 services_name =_consul.catalog.services()[1].keys() print('获取服务名列表',services_name) def get_address_port(): _, nodes = _consul.health.service(service='cesifuwu', passing=True) if len(nodes) == 0: raise Exception('service is empty.') for node in nodes: service = node.get('Service') print(service['Address'],service['Port']) get_address_port()复制代码
输出:
获取服务名列表 dict_keys(['cesifuwu', 'consul']) 127.0.0.1 63930 127.0.0.1 63953复制代码
从上面可以看得到我们的可以采取一种随机的方式获取到一个IP和端口就可以了! 但是!!!这种随机的算法是一种客户端随机的方式,客户端负载方式如果每次都是这样的随机的话,可能会有问题?
通常目前的我们线上环境一般都是采取客户端负载均衡的方式来处理分发,不会采取类似nginx又做一个中间件的方式做负载均衡,而是直接的由客户端进行获取分配就可以了
另外通常在服务发现中:为了保持连接的活动性、健康性和可用性,其实我们的gRPC使用了许多组件,其中最重要有:
名称解析器
负载平衡器
另外我们的还需要做相关的域名解析处理,可以使用工具dig,Dig 工具全称为域名信息搜索器(Domain Information Groper):
它能够显示详细的DNS查询过程,
对DNS故障进行诊断分析工具。
下载地址:
https://www.isc.org/bind/复制代码
下载图示位置:但是看你运气了!运气好能下!!!
辉哥友情的提供下载地址,也可以从这里下载对应的版本信息:
ftp://ftp.isc.org/isc/复制代码
另外还可以所以可以使用这位大佬提供的:
链接:https://pan.baidu.com/s/1t7ZG5TsF_Mx0HffzmXdKcg \ 提取码:itej复制代码
使用安装:
下载后解压到任意文件夹,然后以管理员权限运行“BINDInstall.exe”,进行安装
此处我们只勾选“tools only”,然后选择“install”安装
提示错误:
重来来,使用管理员的身份进行安装即可
使用dig打开cmd,输入如下命令:
简单的使用示例:
# 不指定dns服务器地址 dig www.baidu.com # 指定dns服务器地址 dig @114.114.114.114 www.baidu.com复制代码
6.2.5 服务发现使用
PS: consul开启的端口默认8600 是用于DNS的解析 8500是用于HTTP的服务
结合我们的consul的使用:
我们的上面的grpc启动启动了一个服务的名称是:
# 开始启动的服务 register("cesifuwu", '127.0.0.1', 50051)复制代码
那我们的执行解析的时候则需要把我们的服务
名称.service.consul 复制代码
进行解析.
解析我们的默认的服务: 如完整的示例:
dig @127.0.0.1 -p 8600 consul.service.consul SRV dig @127.0.0.1 -p 8600 cesifuwu.service.consul SRV复制代码
把我们的自己定义的服务进行解析完成后:
如何的使用类似的名称解析器来做客户端的负载均衡呐?这里我们使用名称先使用名称解释器处理下,首先需要先安装一个依赖包:
步骤1:安装依赖包
pip install dnspython -i https://pypi.tuna.tsinghua.edu.cn/simple复制代码
我这里安装的是最新的版本,部分的API名称有所变动:如查询的时候的:query--->resolve
步骤2:启动多服务并注册到我们的consul
步骤3:查询获取对应的IP和端口信息
from dns import resolver from dns.exception import DNSException # 创建一个consul dns查询的 resolver consul_resolver = resolver.Resolver() consul_resolver.port = 8600 consul_resolver.nameservers = ['127.0.0.1'] def get_ip_port(server_name): '''查询出可用的一个ip,和端口''' try: dnsanswer = consul_resolver.resolve(f'{server_name}.service.consul', "A") dnsanswer_srv = consul_resolver.resolve(f"{server_name}.service.consul", "SRV") except DNSException: return None, None return dnsanswer[0].address, dnsanswer_srv[0].port for i in range(10): print(get_ip_port('cesifuwu'))复制代码
输出:
步骤4:集成到我们的客户端去调用
#!/usr/bin/evn python # -*- coding: utf-8 -*- import grpc import hello_pb2 import hello_pb2_grpc # 连接consul服务,作为dns服务器 class ClientServerInterceptor1(grpc.UnaryUnaryClientInterceptor): def intercept_unary_unary(self, continuation, client_call_details, request): print("客户端的拦截器1:---开始1") resp = continuation(client_call_details, request) print("客户端的拦截器1:---结束2", resp) return resp class ClientServerInterceptor2(grpc.UnaryUnaryClientInterceptor): def intercept_unary_unary(self, continuation, client_call_details, request): print("客户端的拦截器2:---开始1") resp = continuation(client_call_details, request) print("客户端的拦截器2:---结束2", resp) return resp from dns import resolver from dns.exception import DNSException # 创建一个consul dns查询的 resolver consul_resolver = resolver.Resolver() consul_resolver.port = 8600 consul_resolver.nameservers = ['127.0.0.1'] def get_ip_port(server_name): '''查询出可用的一个ip,和端口''' try: dnsanswer = consul_resolver.resolve(f'{server_name}.service.consul', "A") dnsanswer_srv = consul_resolver.resolve(f"{server_name}.service.consul", "SRV") except DNSException: return None, None return dnsanswer[0].address, dnsanswer_srv[0].port def run(): cesifuwu = get_ip_port('cesifuwu') with grpc.insecure_channel(target=f'{cesifuwu[0]}:{cesifuwu[1]}',) as channel: # 通过通道服务一个服务intercept_channel interceptor_channel = grpc.intercept_channel(channel, ClientServerInterceptor1(),ClientServerInterceptor2()) stub = hello_pb2_grpc.GreeterStub(interceptor_channel) # 生成请求我们的服务的函数的时候,需要传递的参数体,它放在hello_pb2里面-请求体为:hello_pb2.HelloRequest对象 try: reest_header = ( ('mesasge', '1010'), ('error', 'No Error') ) response, callbask = stub.SayHelloAgain.with_call(request=hello_pb2.HelloRequest(name='欢迎下次光临'), # 设置请求的超时处理 timeout=5, # 设置请求的头的信息 metadata=reest_header, ) print("SayHelloAgain函数调用结果的返回: " + response.message) print("SayHelloAgain函数调用结果的返回---响应报文头信息: ", callbask.trailing_metadata()) except grpc._channel._InactiveRpcError as e: print(e.code()) print(e.details()) if __name__ == '__main__': run()复制代码
步骤5:启动客户端调用服务端,查看情况:
可以分布到不同的服务商进行执行
但是遗留的问提是?DNS怎么做权重的负载均衡呢?还有如果做相关的其他负载均衡算法的呐?类似GO的那种!!!有哪位大佬可以指点迷津吗?出了这个DNS的方案之外,还有哪种可以又保持客户端的连接的可用性,又可以自定义其他负载均衡算法的吗? 还可以同时监听相关服务的注册变化的吗?
至此暂时初步完成了关于注册服务和发现的初步实践。关于遗留的问题,如有大佬知晓,可以告知我的吗?万分感激!
作者:小钟同学
链接:https://juejin.cn/post/7031761412309549064