负载均衡算法

常见的负载均衡算法如下:

  1. RoundRobin(轮询)
  2. Weight-RoundRobin(加权轮询)
    • 不同的后端服务器可能机器的配置和当前系统的负载并不相同,因此它们的抗压能力也不相同。给配置高、负载低的机器配置更高的权重,而配置低、负载高的机器,给其分配较低的权重,降低其系统负载,加权轮询能很好地处理这一问题,并将请求顺序且按照权重分配到后端。
  3. Random(随机)
  4. Weight-Random(加权随机)
    • 通过系统的随机算法,根据后端服务器的列表随机选取其中的一台服务器进行访问
  5. 源地址哈希法
    • 源地址哈希的思想是根据获取客户端的 IP 地址,通过哈希函数计算得到的一个数值,用该数值对服务器列表的大小进行取模运算,得到的结果便是客服端要访问服务器的序号。采用源地址哈希法进行负载均衡,同一 IP 地址的客户端,当后端服务器列表不变时,它每次都会映射到同一台后端服务器进行访问
  6. 最小连接数法
    • 最小连接数算法比较灵活和智能,由于后端服务器的配置不尽相同,对于请求的处理有快有慢,它是根据后端服务器当前的连接情况,动态地选取其中当前积压连接数最少的一台服务器来处理当前的请求,尽可能地提高后端服务的利用效率,将负责合理地分流到每一台服务器
  7. Consistent-Hash(一致性哈希算法)
    • 常见的是 Ketama 算法(虚拟节点),该算法是用来解决 cache 失效导致的缓存穿透的问题的,当然也可以适用于 gRPC 长连接的场景
  8. 自适应算法(P2C:多选二,二选一):即从可用节点列表中随机选择两个节点,计算它们的负载率,选择负载率较低的进行请求
    • 基于最小负载策略:该策略是 linkerd 的默认负载均衡器。当确定发送请求的位置时,linkerd 随机从负载均衡器池中选择两个副本,并选择两者中最少负载的副本。负载由每个副本的未完成请求数决定。该算法为单个副本提供了可管理的负载上限,与具有相似性能的其他算法相比,开销较少
    • 峰值 EWMA(预测)策略:该算法是上述的变体,同样在发送请求时仍然在两个副本之间进行选择。不一样的是,该算法需要保持观察到的延迟的动态平均值,并且使用它来对每个副本的未完成请求的数量进行加权。这种方法对延迟波动更敏感,并通过向较慢的后端发送更少的请求来允许他们恢复时间(可以通过调参来改变对请求延时的敏感度)

gRPC负载均衡

接着上篇,通过DNS进行服务发现获取服务器IP list之后,调用ccResolverWrapper.UpdateState更新状态:

// UpdateState is called by resolver implementations to report new state to gRPC
// which includes addresses and service config.
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
	...
	return ccr.cc.updateResolverStateAndUnlock(s, nil)
}

func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error) error {
	...

	var ret error
    // 应用最新的从dns发现的ServiceConfig
	if cc.dopts.disableServiceConfig {
		channelz.Infof(logger, cc.channelz, "ignoring service config from resolver (%v) and applying the default because service config is disabled", s.ServiceConfig)
		cc.maybeApplyDefaultServiceConfig()
	} else if s.ServiceConfig == nil {
		cc.maybeApplyDefaultServiceConfig()
	} else {
		if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
			configSelector := iresolver.GetConfigSelector(s)
			if configSelector != nil {
				if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
					channelz.Infof(logger, cc.channelz, "method configs in service config will be ignored due to presence of config selector")
				}
			} else {
				configSelector = &defaultConfigSelector{sc}
			}
			cc.applyServiceConfigAndBalancer(sc, configSelector)
		} else {
			...
		}
	}

    // ServiceConfig的负载均衡配置
	var balCfg serviceconfig.LoadBalancingConfig
	if cc.sc != nil && cc.sc.lbConfig != nil {
		balCfg = cc.sc.lbConfig
	}
    // 负载均衡器
	bw := cc.balancerWrapper
	cc.mu.Unlock()

    // 应用服务发现结果
	uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
	if ret == nil {
		ret = uccsErr // prefer ErrBadResolver state since any other error is
		// currently meaningless to the caller.
	}
	return ret
}

// ccBalancerWrapper解耦ClientConn和Balancer,在updateClientConnState调用时才会创建Balancer
// 并且ccBalancerWrapper使用gracefulswitch.Balancer支持Balancer的优雅切换
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
	errCh := make(chan error)
	uccs := func(ctx context.Context) {
		defer close(errCh)
		if ctx.Err() != nil || ccb.balancer == nil {
			return
		}
		name := gracefulswitch.ChildName(ccs.BalancerConfig)
		if ccb.curBalancerName != name {
			ccb.curBalancerName = name
			channelz.Infof(logger, ccb.cc.channelz, "Channel switches to new LB policy %q", name)
		}
        // 通知gracefulBalancer状态更新
		err := ccb.balancer.UpdateClientConnState(*ccs)
		if logger.V(2) && err != nil {
			logger.Infof("error from balancer.UpdateClientConnState: %v", err)
		}
		errCh <- err
	}
	onFailure := func() { close(errCh) }
	ccb.serializer.ScheduleOr(uccs, onFailure)
	return <-errCh
}

// gsb更新状态
func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
    // 获取最新的balancer
	balToUpdate := gsb.latestBalancer()
	gsbCfg, ok := state.BalancerConfig.(*lbConfig)
	if ok {
		// Switch to the child in the config unless it is already active.
		if balToUpdate == nil || gsbCfg.childBuilder.Name() != balToUpdate.builder.Name() {
			var err error
            // 切换到新的balancer
			balToUpdate, err = gsb.switchTo(gsbCfg.childBuilder)
			if err != nil {
				return fmt.Errorf("could not switch to new child balancer: %w", err)
			}
		}
		// Unwrap the child balancer's config.
		state.BalancerConfig = gsbCfg.childConfig
	}

	if balToUpdate == nil {
		return errBalancerClosed
	}

    // 通知真正的Balancer状态更新
	return balToUpdate.UpdateClientConnState(state)
}

// gsb优雅切换balancer
func (gsb *Balancer) switchTo(builder balancer.Builder) (*balancerWrapper, error) {
	gsb.mu.Lock()
	if gsb.closed {
		gsb.mu.Unlock()
		return nil, errBalancerClosed
	}
	bw := &balancerWrapper{
		builder: builder,
		gsb:     gsb,
		lastState: balancer.State{
			ConnectivityState: connectivity.Connecting,
			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
		},
		subconns: make(map[balancer.SubConn]bool),
	}
	balToClose := gsb.balancerPending // nil if there is no pending balancer
	if gsb.balancerCurrent == nil {
		gsb.balancerCurrent = bw
	} else {
		gsb.balancerPending = bw
	}
	gsb.mu.Unlock()
    // 关闭旧的balancer
	balToClose.Close()
	// 创建新的balancer
	newBalancer := builder.Build(bw, gsb.bOpts)
	if newBalancer == nil {
		gsb.mu.Lock()
		if gsb.balancerPending != nil {
			gsb.balancerPending = nil
		} else {
			gsb.balancerCurrent = nil
		}
		gsb.mu.Unlock()
		return nil, balancer.ErrBadResolverState
	}

	bw.Balancer = newBalancer
	return bw, nil
}

// pick_first balancer状态更新
func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
	...
    
	cfg, ok := state.BalancerConfig.(pfConfig)
    
	...

	// 展开endpoints/addrs
	var addrs []resolver.Address
    ...

    // balancer之前维护了子连接
    // 如果连接的地址在addrs中,则保持连接
    // 否则断开并用addrs重连
	if b.subConn != nil {
		b.cc.UpdateAddresses(b.subConn, addrs)
		return nil
	}

    // balancer之前没有维护子连接
    // 创建新的子连接
	var subConn balancer.SubConn
	subConn, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{
		StateListener: func(state balancer.SubConnState) {
            // 子连接状态更新,回调通知balancer
			b.updateSubConnState(subConn, state)
		},
	})
    ...
    
	b.subConn = subConn
	b.state = connectivity.Idle
    // 通知cc现在是Connecting状态
	b.cc.UpdateState(balancer.State{
		ConnectivityState: connectivity.Connecting,
		Picker:            &picker{err: balancer.ErrNoSubConnAvailable},
	})
	b.subConn.Connect()
	return nil
}

可以看到在pick_first策略下,会将所有地址都试一遍,连上其中一个后,后续picker返回的都是那个连接,相当于不做负载均衡:

func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
	return p.result, p.err
}

再对比下round_robin策略的负载均衡:

func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
	...
    
    // 对每个地址都建立一个子连接
	for _, a := range s.ResolverState.Addresses {
		addrsSet.Set(a, nil)
		if _, ok := b.subConns.Get(a); !ok {
            ...
			sc, err := b.cc.NewSubConn([]resolver.Address{a}, opts)
			if err != nil {
				logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
				continue
			}
			b.subConns.Set(a, sc)
			b.scStates[sc] = connectivity.Idle
			b.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Idle)
			sc.Connect()
		}
	}
    // 关闭已经失效的子连接
	for _, a := range b.subConns.Keys() {
		sci, _ := b.subConns.Get(a)
		sc := sci.(balancer.SubConn)
		if _, ok := addrsSet.Get(a); !ok {
			sc.Shutdown()
			b.subConns.Delete(a)
		}
	}
    
    ...
	b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
	return nil
}

round_robin的picker:

type rrPicker struct {
	subConns []balancer.SubConn
    // 下一个子连接的下标
	next     uint32
}

func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
	subConnsLen := uint32(len(p.subConns))
	nextIndex := atomic.AddUint32(&p.next, 1)

    // 选择下一个子连接返回
	sc := p.subConns[nextIndex%subConnsLen]
	return balancer.PickResult{SubConn: sc}, nil
}

总结

结合gRPC阅读(2)和gRPC阅读(3)这两篇,我们可以理出一个大致的闭环:

  1. 初始化ClientConn时,根据配置初始化相应的resolver builder和balancer builder
  2. 由空闲状态第一次发起rpc调用时,创建resolver进行服务发现
  3. 服务发现结果通知给balancer优雅切换器进行balancer切换(如果需要切换的话),创建balancer
  4. 通知balancer服务发现结果的更新,由balancer创建新的连接并维护这些连接
  5. 客户端创建连接(transport)时,通过picker.Pick获取连接,以进一步地创建stream

图示如下:

image-20241121162839190