作者: 哗啦啦 mesh团队,热衷于kubernetes、devops、apollo、istio、linkerd、openstack、calico 等领域技术。
Linkerd由 控制平面
和 数据平面
组成:
控制平面
是在所属的 Kubernetes命名空间
(linkerd默认情况下)中运行的一组服务,这些服务可以完成 汇聚遥测数据
,提供面向用户的API,并向 数据平面
代理 提供控制数据
等,它们 共同驱动
数据平面。
数据平面
用Rust编写的轻量级代理,该代理安装在服务的 每个pod
中,并成为数据平面的一部分,它接收Pod的 所有接入
流量,并通过 initContainer
配置 iptables
正确转发流量的拦截所有传出流量,因为它是附加工具,并且拦截服务的所有 传入和传出
流量,所以不需要更改代码,甚至可以将其添加到 正在运行
的服务中。
借用官方的图:
proxy由rust开发完成,其内部的异步运行时采用了 Tokio 框架,服务组件用到了 tower 。
本文主要关注proxy与destination组件交互相关的整体逻辑,分析proxy内部的运行逻辑。
proxy启动后:
app::init
初始化配置 app::Main::new
创建主逻辑 main
, main.run_until
内新加一任务 ProxyParts::build_proxy_task
。 在 ProxyParts::build_proxy_task
中会进行一系列的初始化工作,此处只关注 dst_svc
,其创建代码为:
dst_svc = svc::stack(connect::svc(keepalive)) .push(tls::client::layer(local_identity.clone())) .push_timeout(config.control_connect_timeout) .push(control::client::layer()) .push(control::resolve::layer(dns_resolver.clone())) .push(reconnect::layer({ let backoff = config.control_backoff.clone(); move |_| Ok(backoff.stream()) })) .push(http_metrics::layer::<_, classify::Response>( ctl_http_metrics.clone(), )) .push(proxy::grpc::req_body_as_payload::layer().per_make()) .push(control::add_origin::layer()) .push_buffer_pending( config.destination_buffer_capacity, config.control_dispatch_timeout, ) .into_inner() .make(config.destination_addr.clone())
dst_svc
一共有2处引用,一是 crate::resolve::Resolver
的创建会涉及;另一个就是 ProfilesClient
的创建。
Resolver
api_resolve::Resolve::new(dst_svc.clone())
创建 resolver
对象 outbound::resolve
创建 map_endpoint::Resolve
类型对象,并当做参数 resolve
传入 outbound::spawn
函数开启出口线程 在 outbound::spawn
中, resolve
被用于创建负载均衡控制层,并用于后续路由控制:
let balancer_layer = svc::layers() .push_spawn_ready() .push(discover::Layer::new( DISCOVER_UPDATE_BUFFER_CAPACITY, resolve, )) .push(balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY));
在 discover::Layer::layer
中:
let from_resolve = FromResolve::new(self.resolve.clone()); let make_discover = MakeEndpoint::new(make_endpoint, from_resolve); Buffer::new(self.capacity, make_discover)
Profiles
ProfilesClient::new
中调用 api::client::Destination::new(dst_svc)
创建grpc的client端并存于成员变量 service
profiles_client
对象会被用于 inbound
和 outbound
的创建(省略无关代码): let dst_stack = svc::stack(...)... .push(profiles::router::layer( profile_suffixes, profiles_client, dst_route_stack, )) ...
其中 profiles::router::layer
会创建一个 Layer
对象,并将 profiles_client
赋予 get_routes
成员。然后在 service
方法中,会调到 Layer::layer
方法,里面会创建一个 MakeSvc
对象,其 get_routes
成员的值即为 profiles_client
。
新的连接过来时,从 listen
拿到连接对象后,会交给 linkerd_proxy::transport::tls::accept::AcceptTls
的 call
,然后是 linkerd2_proxy::proxy::server::Server
的 call
,并最终分别调用 linkerd2_proxy_http::balance::MakeSvc::call
和 linkerd2_proxy_http::profiles::router::MakeSvc::call
方法。
balance
在 linkerd2_proxy_http::balance::MakeSvc::call
中:
inner.call(target)
,此处的 inner
即是前面 Buffer::new
的结果。 linkerd2_proxy_http::balance::MakeSvc
对象,当做 Future
返回 先看 inner.call
。它内部经过层层调用,依次触发 Buffer
、 MakeEndpoint
、 FromResolve
等结构的 call
方法,最终会触发最开始创建的 resolve.resolve(target)
,其内部调用 api_resolve::Resolve::call
。
在 api_resolve::Resolve::call
中:
fn call(&mut self, target: T) -> Self::Future { let path = target.to_string(); trace!("resolve {:?}", path); self.service // GRPC请求,获取k8s的endpoint .get(grpc::Request::new(api::GetDestination { path, scheme: self.scheme.clone(), context_token: self.context_token.clone(), })) .map(|rsp| { debug!(metadata = ?rsp.metadata()); // 拿到结果stream Resolution { inner: rsp.into_inner(), } }) }
将返回的 Resolution
再次放入 MakeSvc
中,然后看其poll:
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { // 这个poll会依次调用: // linkerd2_proxy_api_resolve::resolve::Resolution::poll // linkerd2_proxy_discover::from_resolve::DiscoverFuture::poll // linkerd2_proxy_discover::make_endpoint::DiscoverFuture::poll // 最终获得Poll<Change<SocketAddr, Endpoint>> let discover = try_ready!(self.inner.poll()); let instrument = PendingUntilFirstData::default(); let loaded = PeakEwmaDiscover::new(discover, self.default_rtt, self.decay, instrument); let balance = Balance::new(loaded, self.rng.clone()); Ok(Async::Ready(balance)) }
最终返回service Balance
。
当具体请求过来后,先会判断 Balance::poll_ready
:
fn poll_ready(&mut self) -> Poll<(), Self::Error> { // 获取Update<Endpoint> // 将Remove的从self.ready_services中删掉 // 将Insert的构造UnreadyService结构加到self.unready_services self.poll_discover()?; // 对UnreadyService,调用其poll,内部会调用到svc的poll_ready判断endpoint是否可用 // 可用时,将其加入self.ready_services self.poll_unready(); loop { if let Some(index) = self.next_ready_index { // 找到对应的endpoint,可用则返回 if let Ok(Async::Ready(())) = self.poll_ready_index_or_evict(index) { return Ok(Async::Ready(())); } } // 选择负载比较低的endpoint self.next_ready_index = self.p2c_next_ready_index(); if self.next_ready_index.is_none() { // return Ok(Async::NotReady); } } }
就绪后,对请求 req
调用 call
:
fn call(&mut self, request: Req) -> Self::Future { // 找到下一个可用的svc,并将其从ready_services中删除 let index = self.next_ready_index.take().expect("not ready"); let (key, mut svc) = self .ready_services .swap_remove_index(index) .expect("invalid ready index"); // 将请求转过去 let fut = svc.call(request); // 加到unready self.push_unready(key, svc); fut.map_err(Into::into) }
profiles
在 linkerd2_proxy_http::profiles::router::MakeSvc::call
中:
// Initiate a stream to get route and dst_override updates for this // destination. let route_stream = match target.get_destination() { Some(ref dst) => { if self.suffixes.iter().any(|s| s.contains(dst.name())) { debug!("fetching routes for {:?}", dst); self.get_routes.get_routes(&dst) } else { debug!("skipping route discovery for dst={:?}", dst); None } } None => { debug!("no destination for routes"); None } };
经过若干判断后,会调用 ProfilesClient::get_routes
并将结果存于 route_stream
。
进入 get_routes
:
fn get_routes(&self, dst: &NameAddr) -> Option<Self::Stream> { // 创建通道 let (tx, rx) = mpsc::channel(1); // This oneshot allows the daemon to be notified when the Self::Stream // is dropped. let (hangup_tx, hangup_rx) = oneshot::channel(); // 创建Daemon对象(Future任务) let daemon = Daemon { tx, hangup: hangup_rx, dst: format!("{}", dst), state: State::Disconnected, service: self.service.clone(), backoff: self.backoff, context_token: self.context_token.clone(), }; // 调用Daemon::poll let spawn = DefaultExecutor::current().spawn(Box::new(daemon.map_err(|_| ()))); // 将通道接收端传出 spawn.ok().map(|_| Rx { rx, _hangup: hangup_tx, }) }
接着看 Daemon::poll
:
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { loop { // 遍历state成员状态 self.state = match self.state { // 未连接时 State::Disconnected => { match self.service.poll_ready() { Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(())) => {} Err(err) => { error!( "profile service unexpected error (dst = {}): {:?}", self.dst, err, ); return Ok(Async::Ready(())); } }; // 构造grpc请求 let req = api::GetDestination { scheme: "k8s".to_owned(), path: self.dst.clone(), context_token: self.context_token.clone(), }; debug!("getting profile: {:?}", req); // 获取请求任务 let rspf = self.service.get_profile(grpc::Request::new(req)); State::Waiting(rspf) } // 正在请求时,从请求中获取回复 State::Waiting(ref mut f) => match f.poll() { Ok(Async::NotReady) => return Ok(Async::NotReady), // 正常回复 Ok(Async::Ready(rsp)) => { trace!("response received"); // 流式回复 State::Streaming(rsp.into_inner()) } Err(e) => { warn!("error fetching profile for {}: {:?}", self.dst, e); State::Backoff(Delay::new(clock::now() + self.backoff)) } }, // 接收回复 State::Streaming(ref mut s) => { // 处理回复流 // 注意此处,参数1是get_profile请求的回复流, // 参数2是之前创建的通道发送端 match Self::proxy_stream(s, &mut self.tx, &mut self.hangup) { Async::NotReady => return Ok(Async::NotReady), Async::Ready(StreamState::SendLost) => return Ok(().into()), Async::Ready(StreamState::RecvDone) => { State::Backoff(Delay::new(clock::now() + self.backoff)) } } } // 异常,结束请求 State::Backoff(ref mut f) => match f.poll() { Ok(Async::NotReady) => return Ok(Async::NotReady), Err(_) | Ok(Async::Ready(())) => State::Disconnected, }, }; } }
接着 proxy_stream
:
fn proxy_stream( rx: &mut grpc::Streaming<api::DestinationProfile, T::ResponseBody>, tx: &mut mpsc::Sender<profiles::Routes>, hangup: &mut oneshot::Receiver<Never>, ) -> Async<StreamState> { loop { // 发送端是否就绪 match tx.poll_ready() { Ok(Async::NotReady) => return Async::NotReady, Ok(Async::Ready(())) => {} Err(_) => return StreamState::SendLost.into(), } // 从grpc stream中取得一条数据 match rx.poll() { Ok(Async::NotReady) => match hangup.poll() { Ok(Async::Ready(never)) => match never {}, // unreachable! Ok(Async::NotReady) => { // We are now scheduled to be notified if the hangup tx // is dropped. return Async::NotReady; } Err(_) => { // Hangup tx has been dropped. debug!("profile stream cancelled"); return StreamState::SendLost.into(); } }, Ok(Async::Ready(None)) => return StreamState::RecvDone.into(), // 正确取得profile结构 Ok(Async::Ready(Some(profile))) => { debug!("profile received: {:?}", profile); // 解析数据 let retry_budget = profile.retry_budget.and_then(convert_retry_budget); let routes = profile .routes .into_iter() .filter_map(move |orig| convert_route(orig, retry_budget.as_ref())) .collect(); let dst_overrides = profile .dst_overrides .into_iter() .filter_map(convert_dst_override) .collect(); // 构造profiles::Routes结构并推到发送端 match tx.start_send(profiles::Routes { routes, dst_overrides, }) { Ok(AsyncSink::Ready) => {} // continue Ok(AsyncSink::NotReady(_)) => { info!("dropping profile update due to a full buffer"); // This must have been because another task stole // our tx slot? It seems pretty unlikely, but possible? return Async::NotReady; } Err(_) => { return StreamState::SendLost.into(); } } } Err(e) => { warn!("profile stream failed: {:?}", e); return StreamState::RecvDone.into(); } } } }
回到 MakeSvc::call
方法,前面创建的 route_stream
会被用于创建一个 linkerd2_proxy::proxy::http::profiles::router::Service
任务对象,并在其 poll_ready
方法中通过 poll_route_stream
从 route_steam
获取 profiles::Routes
并调用 update_routes
创建具体可用的路由规则 linkerd2_router::Router
,至此,路由规则已建好,就等具体的请求过来然后在 call
中调用 linkerd2_router::call
进行对请求的路由判断。
proxy采用的tower框架,每个处理逻辑都是其中的一个layer,开发时只需层层堆叠即可。不过,也正因如此,各层之间的接口都极其相似,须得小心不可调错。 对于destination这部分逻辑,linkerd2的destination组件收到来自proxy的grpc请求后,每当endpoint或service profile有任何变动,都会立即通过stream发送过去,proxy收到后根据endpoint调整负载均衡策略,根据service profile调整路由,然后通过它们来处理用户服务的实际请求。