深入理解Istio核心组件Pilot
深入理解Istio核心组件之Pilot - 姚灯灯! - 博客园 (cnblogs.com)
Istio Pilot代码深度解析-赵化冰的博客 | Zhaohuabing Blog
Istio作为当前服务网格(Service mesh)领域的事实标准,流量治理Traffic Management是最基础也最为重要的功能。本文将结合源码对Istio流量治理的实现主体——组件Pilot进行深入地分析。(本文参考的代码为位于Istio repo的master分支,commit为b8e30e0
)
架构分析
在应用从单体架构向微服务架构演进的过程中,微服务之间的服务发现、负载均衡、熔断、限流等流量治理需求是无法回避的问题。在Service Mesh出现之前,通常的做法是将此类公共的基础功能以SDK的形式嵌入业务代码中。这虽然不失为解决问题的一种方式,但这种强耦合的方案无疑会增加业务开发的难度,代码维护的成本,同时如果存在跨语言应用间的交互,对于多语言SDK的支持造成的臃肿低效也令人很难接受。
而Service Mesh的本质则是将此类通用功能沉淀至Proxy中,由Proxy接管服务的流量并对其进行治理,从而将服务于服务间的流量治转变为proxy与proxy之间的流量治理。Service Mesh对代码的零入侵使得开发人员能够更为专注于业务代码开发而无需再对底层流量治理功能做过多关注。
如果仅仅只是将应用域TCP、IP网络层之间的流量治理功能进行封装,那么Envoy和Linkerd1.0为代表的纯proxy已经足够了。而Istio所做的是在这一基础上,增加控制平面,从而允许用户在更高的抽象维度,以更灵活的方式对服务间的流量进行管理。同时Istio对于服务模型的抽象所带来的高度扩展性,也让其对于Kubernetes等多种平台的支持变的更为简单。
如上图所示,Pilot是Istio进行流量治理的核心组件,可以看到,其架构与Istio的设计理念是一致的。Pilot支持从Kubernetes、Consul等多种平台获取服务发现功能。同时支持用户通过VirtualService,DestinationRule等API制定服务间的流量治理规则。最后,Pilot将发现的服务一用户定义的服务间的调用规则进行融合并与底层Proxy的API进行适配后将规则进行下发。(底层的Proxy一般为Envoy并且将其抽象为Service Mesh控制平面与数据平面的标准接口:xDS,理论上任何实现了xDS协议的Proxy都能无缝接入Istio)Proxy负责对后端服务发出的流量进行劫持并依据Pilot下发的规则对流量进行处理。
首先我们来看一下Pilot在Istio中的功能定位,Pilot将服务信息和配置信息转换为xDS接口的标准数据,通过GRPC下发到数据面的Envoy。如果把Pilot看成一个处理数据的黑盒,则其有俩个输入,一个输出:
目前Pilot的输入包括俩部分数据来源
- 服务数据:来源与各个服务注册表(Service Register),例如Kubernetes中注册的Service,Consul Catalog中的服务等。
- 配置规则:各种配置规则,包括路由规则及流量管理规则等,通过Kubernetes CRD(Custom resource definition)形式定义并存储在Kubernetes中
pilot的输出为符合xDS接口的数据面配置数据,并通过GRPC Streaming接口将配置数据推送到数据面的Envoy中。
Pilot-Discovery架构
pilot-Discovery工作原理
Pilot-Discovery的入口函数为:pilot/cmd/pilot-discovery/main.go
中的main方法。main方法中创建了Pilot Server,Pilot Server中主要包含三部分逻辑:
- ConfigController:管理各种配置数据,包括用户创建的流量规则和策略
- ServiceController:获取Service Registry中的服务发现数据
DiscoveryService:主要包含以下逻辑
- 启动GRPCserver并接收来自Envoy端的连接请求
- 接受Envoy端的xDS请求,从ConfigController和ServiceController中获取配置和服务信息,生成响应消息发送给Envoy
- 监听来自ConfigController的配置变化消息和ServiceController的服务变化消息,并将配置和服务变化内容通过xDS接口推送到Envoy
ConfigController
ConfigController用于管理各种配置数据,包括用户创建的流量规则策略。Istio目前支持三种类型的Config Controller
- Kubernetes:使用Kubernetes来作为配置数据的存储,该方式直接依附于Kubernetes强大的CRD机制来存储配置数据,简单方便,是Istio最开始使用的配置存储方案。
- MCP(Mesh Configuration Protocol:网格配置协议):使用了Kubernetes来存储配置数据导致了Istio和Kubernetes的耦合,限制了Istio在非Kubernetes环境下的运用。为了解耦合,Istio社区提出了MCP,MCP定义了一个像Istio控制面下发配置数据的标准协议,Istio Polit作为MCP协议向Pilot下发配置,从而解除了Istio和Kubernetes的耦合
- Memory:一个在内存中的Config Controller实现,主要用于测试。
目前Istio的配置包括:
- Virtual Service: 定义流量路由规则。
- Destination Rule: 定义和一个服务或者subset相关的流量处理规则,包括负载均衡策略,连接池大小,断路器设置,subset定义等等。
- Gateway: 定义入口网关上对外暴露的服务。
- Service Entry: 通过定义一个Service Entry可以将一个外部服务手动添加到服务网格中。
- Envoy Filter: 通过Pilot在Envoy的配置中添加一个自定义的Filter。
Service Controller
Service Controller用于管理各种Service Registry,提出服务发现数据,目前Istio支持的Service Registry包括:
- Kubernetes:对接Kubernetes Registry,可以将Kubernetes中定义的Service和Instance采集到Istio中。
- Consul: 对接Consul Catalog,将Consul中定义的Service采集到Istio中。
- MCP: 和MCP config controller类似,从MCP Server中获取Service和Service Instance。
- Memory: 一个内存中的Service Controller实现,主要用于测试。
Discovery Service
Disconvery Service 中主要包含下述逻辑
- 启动GRPCServer并接收来自Envoy端的连接请求。
- 接受Envoy端的xDS请求,从ConfigController和Service Controller中获取配置和服务信息,生成相应消息发送给Envory。
- 监听到来自ConfigController的配置变化消息和来自Service Controller的服务变化消息,并将配置和服务变化内容通过xDS接口推送到Envoy。(备注:目前Pilot未实现增量变化推送,每次变化推送的是全量配置,在网格中服务较多的情况下可能会有性能问题)
Pliot-Discovery业务流程
初始化主要组件
Pilot-Discovery命令的入口为pilot/cmd/pilot-discovery/main.go中的main方法,在该方法中创建Pilot Server,Server代码位于文件pilot/pkg/bootstrap/server.go中。Server主要做了下面一些初始化工作:
- 创建并初始化Config Controller
- 创建并初始化Service Controller
- 创建并初始化Discovery Server,Pilot中创建了基于Envoy V1 API的HTTP Discovery Server和基于Envoy V2 API的GPRC Discovery Server。由于V1已经被废弃,本文将主要分析V2 API的GRPC Discovery Server。
- 将Discovery Server注册为Config Controller和Service Controller的Event Handler,监听配置和服务变化消息。
创建GRPC Server 并接收Envoy的连接请求
Pilot Server创建了一个GRPC Server,用于监听和接受来自Envoy的xDS请求。pilot/pkg/proxy/envoy/v2/ads.go 中的 DiscoveryServer.StreamAggregatedResources方法被注册为GRPC Server的服务处理方法。(后期修改到了pilot/pkg/xds/asd.go)
当GRPC Server收到来自Envoy的连接时,会调用DiscoveryServer.StreamAggregatedResources方法,在该方法中创建一个XdsConnection对象,并开启一个goroutine,从该Connection中接受客户端的xDS请求并进行处理;如果控制面的资源发生变化,Pilot也会通过connection把配置变化推送到Envoy端。
配置变化后向Envoy推送更新
这是Pilot中最复杂的业务流程,主要是因为代码中采用了多个channel和queue对消息进行合并和转发。该业务流程如下
- Config Controller或者ServiceController在配置或服务发生变化时通过回调方法通知Discovery Server,Discovery Server将变化消息放入到PushChannel中。
- Discovery Server通过一个goroutine从push channel中接收变化消息,将一段时间内连续发生的变化消息进行合并。如果超过指定时间没有心的变化消息,则将合并后的消息假如到一个队列Push Queue中
- 另一个DiscoveryServer.StreamAggregatedResources方法中从PushChannel中取出XdsEvent,然后根据上下文生成符合xDS接口规范的DiscoveryResponse,通过GRPC推送给Envoy端。(GRPC会为每个client链接单独分配一个goroutine来进行处理,因此不同客户端连接StreamAggregatedResources处理方法是在不同的goroutine中处理的)
响应Envoy主动发起的xDS请求
pilot和Envoy之间建立的是一个双向的Streaming GRPC服务调用,因此Pilot可以再配置变化时向Envoy推送,Envoy也可以主动发起xDS调用请求获取配置。Envoy主动发起的xDS请求流程如下。
- Envoy通过创建好的GRPC连接发送一个DiscoveryRequest
- Discovery Server通过一个goroutine从XdsConnection中接收来自Envoy的DiscoveryRequest,并将请求发送到ReqChannel中
- Discovery Server的另一个goroutine从ReqChannel中接收DiscoveryRequest,根据上下文生成符合xDS接口规范的DiscoveryResponse,然后返回给Envoy。
DiscoveryServer业务处理关键代码
该部分关键代码位于 istio.io/istio/pilot/pkg/proxy/envoy/v2/discovery.go
文件中(后istio.io/istio/pilot/pkg/proxy/xds/discovery.go
)
用于监听服务和配置变化消息,并将变化消息合并后通过channel发送到前面提到的StreamAggregatedResources方法进行处理。
ConfigUpdate是处理服务和配置变化的回调函数,Servicecontroller和ConfigController在发生变化时会调用该方法通知Discovery Server
// ConfigUpdate implements ConfigUpdater interface, used to request pushes.
func (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest) {
if len(model.ConfigsOfKind(req.ConfigsUpdated, kind.Address)) > 0 {
// This is a bit like clearing EDS cache on EndpointShard update. Because Address
// types are fetched dynamically, they are not part of the same protections, so we need to clear
// the cache.
s.Cache.ClearAll()
}
inboundConfigUpdates.Increment()
s.InboundUpdates.Inc()
//服务或配置变化后,将一个pushRequest发送到pushChannel中
s.pushChannel <- req
}
在debounce方法中将连续发生的PushRequest进行合并,如果一段时间内没有收到新的PushRequest,再发起推送;以避免由于服务和配置频繁变化带来较大压力
// The debounce helper function is implemented to enable mocking
func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, pushFn func(req *model.PushRequest)) {
......
pushWorker := func() {
eventDelay := time.Since(startDebounce)
quietTime := time.Since(lastConfigUpdateTime)
// it has been too long or quiet enough
//一段时间内没有收到新的PushRequest,再发起推送
if eventDelay >= DebounceMax || quietTime >= DebounceAfter {
if req != nil {
pushCounter++
adsLog.Infof("Push debounce stable[%d] %d: %v since last change, %v since last push, full=%v",
pushCounter, debouncedEvents,
quietTime, eventDelay, req.Full)
free = false
go push(req)
req = nil
debouncedEvents = 0
}
} else {
timeChan = time.After(DebounceAfter - quietTime)
}
}
for {
select {
......
case r := <-ch:
lastConfigUpdateTime = time.Now()
if debouncedEvents == 0 {
timeChan = time.After(DebounceAfter)
startDebounce = lastConfigUpdateTime
}
debouncedEvents++
//合并连续发生的多个PushRequest
req = req.Merge(r)
case <-timeChan:
if free {
pushWorker()
}
case <-stopCh:
return
}
}
}