gRPC阅读(3)—— 服务发现

服务发现概述 平时用浏览器上过网都知道,输入一个网址比如google.com就能访问内容,背后是DNS帮我们将google.com解析成IP地址,最终浏览器才能基于TCP协议,从本地连接到这个服务提供商的IP地址。所以DNS属于服务发现的其中一种方式。 所以服务发现提供的就是通过自动化的方式帮助服务在网络中找到彼此,无需手动配置。 一个好的服务发现需要: 服务地址动态变化:服务的 IP 或端口可能因为容器化或自动扩展而频繁改变。 高可用:需要在服务实例宕机时快速感知并移除不健康的实例。 负载均衡:服务发现需要为调用方提供负载均衡能力,选择最佳的服务实例。 服务发现通常与负载均衡同时实现,分为两种方式: 客户端服务发现(如eureka、consul):在客户端做负载均衡,选择一个实例进行调用,优点是避免集中式LB可能存在的瓶颈,性能较好,但是每个客户端需要维护服务端列表,服务端这部分的负载可能变高。并且更新LB或其他相关组件的策略时需要所有客户端都一起更新,管理不方便。并且需要多语言支持 代理服务发现(如k8s+coreDNS、nginx+consul):客户端将请求发送到负载均衡器(如 API 网关),由负载均衡器查询服务注册中心并将请求转发给目标服务实例。 独立LB进程:LB与消费者在同一个主机中,但分别作为不同的进程,避免了需要多语言支持,以及LB的更新不需要调用方改代码。 服务发现的核心组件有:注册中心、服务提供者、客户端(服务消费者) 服务发现的关键功能有:服务注册、服务查询、健康检查、动态更新 gRPC服务发现 gRPC使用客户端服务发现,gRPC中称为名称解析(Name Resolution),默认情况下使用DNS-resolver。通过服务发现解析出IP列表后就通过LB组件进行负载均衡并建立连接。 下面基于target=localhost:50052这个服务端地址来进行分析,并且是默认的DNS作为resolver(不用官方例子的50051端口是因为被mac的launchd进程占用了)。 首先gRPC在创建cc(ClientConn)的时候,使用initParsedTargetAndResolverBuilder创建resolver.Builder。这一步决定的是采用什么服务发现机制,默认是DNS。 func (cc *ClientConn) initParsedTargetAndResolverBuilder() error { logger.Infof("original dial target is: %q", cc.target) // 尝试直接解析target并获取相应的resolver.Builder var rb resolver.Builder parsedTarget, err := parseTarget(cc.target) if err == nil { rb = cc.getResolver(parsedTarget.URL.Scheme) if rb != nil { cc.parsedTarget = parsedTarget cc.resolverBuilder = rb return nil } } // target没有指定schema(比如我们的localhost:50052是没有指定schema的)或者无法匹配schema对应的resolver.Builder // 那么使用默认的schema,即dns defScheme := cc.dopts.defaultScheme if internal.UserSetDefaultScheme { defScheme = resolver.GetDefaultScheme() } // 此处canonicalTarget为dns:///localhost:50052 // "//"与第三个"/"之间的是authority canonicalTarget := defScheme + ":///" + cc.target // 再次尝试target并获取相应的resolver.Builder,此处会拿到dns.dnsBuilder parsedTarget, err = parseTarget(canonicalTarget) if err != nil { return err } rb = cc.getResolver(parsedTarget.URL.Scheme) if rb == nil { return fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.URL.Scheme) } // 保存parsedTarget和resolverBuilder cc.parsedTarget = parsedTarget cc.resolverBuilder = rb return nil } 那么resolverBuilder在什么时候会Build一个resolver出来呢?在ide的帮助下,可以直接定位到这个函数中: ...

十一月 18, 2024 · by NOSAE

gRPC阅读(2)—— 客户端

启动客户端 客户端的启动也是三部曲: 初始化grpc.ClientConn 创建service对应的Client(比如codegen生成的GreeterClient) 发起rpc调用 第二步比较简单,只是把ClientConn作为GreeterClient的成员变量,重点分析建立连接和RPC调用 初始化ClientConn 初始化ClientConn做了很多准备工作,包括但不限于: 应用选项(DialOption) 构建拦截器调用链(Interceptor) 决定使用什么resolver(resolver.Builder) 检查传输层凭证,比如TLS(TransportCredentials) 解析自定义服务端配置(ServerConfig) … 但还有一些配置是在真正发起RPC调用的时候才会被设置和触发,比如重试限流器、RPC配置选择器、RPC负载均衡器等。 func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error) { cc := &ClientConn{ target: target, conns: make(map[*addrConn]struct{}), dopts: defaultDialOptions(), } // 重试限流器 cc.retryThrottler.Store((*retryThrottler)(nil)) // 配置选择器,动态选择每个RPC的调用配置 cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil}) cc.ctx, cc.cancel = context.WithCancel(context.Background()) // options ... // 确定使用哪个resolver(默认为dns) if err := cc.initParsedTargetAndResolverBuilder(); err != nil { return nil, err } // 内部使用的全局perTarget options for _, opt := range globalPerTargetDialOptions { opt.DialOptionForTarget(cc.parsedTarget.URL).apply(&cc.dopts) } // 初始化拦截器调用链 chainUnaryClientInterceptors(cc) chainStreamClientInterceptors(cc) // 验证安全传输,如TLS if err := cc.validateTransportCredentials(); err != nil { return nil, err } // 解析以json格式指定的配置 // 如负载均衡配置、per-RPC方法超时等 if cc.dopts.defaultServiceConfigRawJSON != nil { scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON, cc.dopts.maxCallAttempts) if scpr.Err != nil { return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err) } cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig) } // keepalive对服务端探活 cc.mkp = cc.dopts.copts.KeepaliveParams // 获取authority,作为请求头中的:authority字段 if err = cc.initAuthority(); err != nil { return nil, err } // 注册channelz,用于监测grpc的运行 // 可通过http协议访问/grpc/channelz/v1查看grpc的状态 cc.channelzRegistration(target) channelz.Infof(logger, cc.channelz, "parsed dial target is: %#v", cc.parsedTarget) channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority) // 连接状态管理器 cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz) // 负载均衡器,动态选择每个RPC的子通道 cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers) // stats cc.metricsRecorderList = stats.NewMetricsRecorderList(cc.dopts.copts.StatsHandlers) cc.initIdleStateLocked() // Safe to call without the lock, since nothing else has a reference to cc. // idle状态管理 cc.idlenessMgr = idle.NewManager((*idler)(cc), cc.dopts.idleTimeout) return cc, nil } 这么一套下来可以看到,初始化ClientConn的时候并没有建立连接,所以猜测是在第一次发起RPC调用的时候才去尝试建立连接。还有一种验证方法是,把服务端关闭,尝试NewClient,是不会返回错误的。 ...

十一月 15, 2024 · by NOSAE

gRPC阅读(1)—— 服务端

gRPC介绍 gRPC 是一种由 Google 开发的高性能远程过程调用(RPC)框架,适用于分布式系统间的通信。它基于 HTTP/2 进行传输,使用 Protocol Buffers 进行序列化,提供跨平台的兼容性。gRPC 的核心理念是让客户端像调用本地函数一样调用远程服务,简化服务间的调用流程。 通过编写与具体编程语言无关的 IDL (默认是 protobuf) 来定义 RPC 方法,gRPC 框架就会生成语言相关的客户端/服务端代码。 HTTP/2介绍 相比http1,具有更高的传输效率(多路复用:在同一个链连接上同时处理多个请求),更低的延迟(服务端推送,减少请求数量、简化header大小)、带宽利用率更高(头部压缩、数据流优先)、更安全(基于tls)。 http2具体特性有: 帧、消息、流:帧是小通信数据单元;消息由一个或多个帧组成。例如请求的消息和响应的消息;一个连接中包含多个流,每个流包含多个帧。帧通过流id进行标识属于哪个流 二进制分帧:每个消息由若干个帧组成,帧是最小传输单位,并且原来基于文本编码变成基于二进制,进一步减小帧大小 压缩header 多路复用:即在同一连接中的多个stream的传输互不影响 服务端推送 流量控制和资源优先级:流量控制以有效利用多路复用机制,确保只有接收者使用的数据会被传输。优先级机制可以确保重要的资源被优先传输。 启动服务端 通过官方的 helloworld 例子可以看到,服务端的启动分为三步: 创建gRPC的Server 将业务handler注册到Server 调用Server.Serve在端口上进行监听 第一步没什么好说的,注意下第二步注册进去的东西: // 注册进去的ServiceDesc var Greeter_ServiceDesc = grpc.ServiceDesc{ ServiceName: "helloworld.Greeter", HandlerType: (*GreeterServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "SayHello", Handler: _Greeter_SayHello_Handler, }, }, Streams: []grpc.StreamDesc{}, Metadata: "helloworld/helloworld.proto", } // Method对应的handler func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(HelloRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { return srv.(GreeterServer).SayHello(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: Greeter_SayHello_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest)) } return interceptor(ctx, in, info, handler) } 前两步比较简单,再来看第三步的如何建立连接并进行处理。类似标准库http的ListenAndServe,本质上就是创建一个死循环等待有新的连接到来,然后开新的goroutine去处理这个连接上的读写事件: ...

十一月 14, 2024 · by NOSAE

golang sync包源码阅读

前言 sync包提供了常见的并发编程工具,比如最常见的Mutex、WaitGroup等。这些工具都非常简洁,几乎0学习成本。本篇将从源码角度简单看看这些工具的实现原理,以在未来有需求的时候,理解甚至是手动实现功能更强大的,更复杂的并发编程工具。 sync.Mutex sync.Mutex是golang中的互斥锁,但是注意它仅仅具有互斥访问的功能,没有其他功能,比如不支持可重入、不可自定义公平/非公平。 公平性 对于公平性,Mutex采取了综合两者的做法: normal mode(非公平模式,利于高效率运行):锁释放时,优先让同时新来尝试获取锁的线程获取到锁,而不是等待队列中的线程,运行成本低,只需数次CAS就能获取到锁。这是默认的模式 starvation mode(公平模式,避免高并发下线程饿死):锁释放时,优先让等待队列的线程获取到锁,而不是新来的线程。当等待队列队头线程等待超过1ms进入公平模式 如果当前为公平模式,那么当等待队列唯一的队头线程获取到锁,或者队头线程等待时间不足1ms,又会自动回到非公平模式。 可重入性 在开始源码之前,关于为什么golang的官方互斥锁不考虑支持可重入性我想简单讨论下。Russ Cox在讨论里核心观点在于:互斥锁的目的是保护程序的不变性(即invariant,关于什么是程序的不变性可以参考这篇)。因此当线程获取到互斥锁以及释放锁的那一刻,程序都应该是invariant的,在持有锁的期间,程序可以随便破坏invariant,只要保证释放锁的那一刻恢复了invariant即可。从这个观点来说,如果锁是可重入的,就会有这样的情况发生: func G() { mu.Lock() // 破坏invariant ... F() // 恢复invariant ... mu.Unlock() } func F() { mu.Lock() // 此时持有锁,程序应该是invariant的 // 继续执行下去可能会导致bug,因为F认为持有锁的那一刻程序是invariant的 // 但F不知道invariant已经被G破坏 ... mu.Unlock() } 也就是说,Russ Cox给互斥锁功能上的定义是保持程序的invariant,因此可重入锁的想法就是错的。但也有别的观点认为他对互斥锁的定义是错的,互斥锁本身就是为了避免多线程访问修改变量,invariant是开发者的责任,与你用不用互斥锁无关,互斥锁只是帮助你实现invariant的,并且出于编程上的方便,可重入锁可以make your life easier!况且很多语言其实都支持可重入锁。 另外关于invariant,本人也不认为是互斥锁的责任,比如在单线程的程序中,你需要维护(a==b)==true这个invariant,而且由于单线程你根本不需要锁,那么只要你会改变a或者改变b,就肯定会有某些时刻会出现invariant被破坏的情况,但这些情况一般是函数内部的瞬时发生的,而函数执行前后都是保持invariant的就没问题,可以看到与锁无关。因此锁只是个实现invariant的工具之一,它只需要关注底层并发的事情,不需要给他下“保持程序的invariant”这样的高层抽象定义。 后面我们会利用sync包现有的工具,尝试实现一个可重入锁。 源码 Mutex结构体: const ( mutexLocked = 1 << iota // 锁是否被线程持有 mutexWoken // 是否有被唤醒的线程正在等待获取锁 mutexStarving // 是否处于饥饿模式 ) type Mutex struct { state int32 // 低三位分别对应上述三个状态,高位记录等待队列的线程数 sema uint32 // 给底层同步原语使用的信号量 } Lock方法: func (m *Mutex) Lock() { // Fast path: 使用CAS快速加锁 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } // Slow path: CAS加锁失败,说明锁被其他线程占有,当前应该被阻塞 m.lockSlow() } lockSlow方法: ...

十月 31, 2024 · by NOSAE

字节RPC框架kitex源码阅读(二)

Note 基于kitex@v0.11.3 开篇 在上篇字节RPC框架kitex源码阅读(一)中,简单过了一遍从创建服务、监听端口、建立连接&派发、退出清理的流程,对于代码生成的回调如何在kitex内部得到调用也有了初步的认知。 这篇是(一)的续篇,深入分析remote.Server如何基于与客户端建立的连接做交互,包括传输、解码、编码等。 remote.ServerTransHandler 我们知道server.Server主要构建调用链、调用用户定义的回调。与远程传输有关的remote.Server提供了简单的几个接口方法给server.Server使用,相当于server.Server只需要关心调用链要怎么消费封装好的数据,不用管传输如何建立、数据如何封装: ...

十月 23, 2024 · by NOSAE