Mastering Knative: Take the Reins of Knative Autoscaling
Knative provides a flexible pod autoscaling mechanism. This article walks through the implementation of KPA autoscaling along "three horizontal and two vertical" dimensions, so that you can drive Knative autoscaling with ease.
Note: the analysis in this article is based on the Knative v0.11.0 code base, the latest release at the time of writing.
KPA implementation flow
In Knative, creating a Revision creates a corresponding PodAutoscaler resource. KPA scales the pods of that Revision up and down by operating on the PodAutoscaler resource.
Below we analyze the implementation along the "three horizontal and two vertical" dimensions.
Three horizontals
The KPA controller
A PodAutoscaler is created from the Revision. The KPA controller mainly involves two resources (Decider and Metric) and one operation (Scale). The key code is as follows:
func (c *Reconciler) reconcile(ctx context.Context, pa *pav1alpha1.PodAutoscaler) error {
    ......
    decider, err := c.reconcileDecider(ctx, pa, pa.Status.MetricsServiceName)
    if err != nil {
        return fmt.Errorf("error reconciling Decider: %w", err)
    }
    if err := c.ReconcileMetric(ctx, pa, pa.Status.MetricsServiceName); err != nil {
        return fmt.Errorf("error reconciling Metric: %w", err)
    }
    // Metrics services are no longer needed as we use the private services now.
    if err := c.DeleteMetricsServices(ctx, pa); err != nil {
        return err
    }
    // Get the appropriate current scale from the metric, and right size
    // the scaleTargetRef based on it.
    want, err := c.scaler.Scale(ctx, pa, sks, decider.Status.DesiredScale)
    if err != nil {
        return fmt.Errorf("error scaling target: %w", err)
    }
    ......
}
Let's first introduce the two resources. The Decider carries the scaling decision: creating it kicks off the periodic scale estimation described in the next section, and its Status.DesiredScale is what the Scale operation applies. The Metric drives metric collection for the Revision: creating it kicks off the pull-based scraping of the queue-proxy described in the metric collection section.
Now let's look at the Scale operation. The Scale method determines the final number of pod instances from the desired pod count and the configured minimum and maximum instance counts, and then updates the deployment's Replicas value, which ultimately scales the pods up or down. The implementation is as follows:
// Scale attempts to scale the given PA's target reference to the desired scale.
func (ks *scaler) Scale(ctx context.Context, pa *pav1alpha1.PodAutoscaler, sks *nv1a1.ServerlessService, desiredScale int32) (int32, error) {
    ......
    min, max := pa.ScaleBounds()
    if newScale := applyBounds(min, max, desiredScale); newScale != desiredScale {
        logger.Debugf("Adjusting desiredScale to meet the min and max bounds before applying: %d -> %d", desiredScale, newScale)
        desiredScale = newScale
    }
    desiredScale, shouldApplyScale := ks.handleScaleToZero(ctx, pa, sks, desiredScale)
    if !shouldApplyScale {
        return desiredScale, nil
    }
    ps, err := resources.GetScaleResource(pa.Namespace, pa.Spec.ScaleTargetRef, ks.psInformerFactory)
    if err != nil {
        return desiredScale, fmt.Errorf("failed to get scale target %v: %w", pa.Spec.ScaleTargetRef, err)
    }
    currentScale := int32(1)
    if ps.Spec.Replicas != nil {
        currentScale = *ps.Spec.Replicas
    }
    if desiredScale == currentScale {
        return desiredScale, nil
    }
    logger.Infof("Scaling from %d to %d", currentScale, desiredScale)
    return ks.applyScale(ctx, pa, desiredScale, ps)
}
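To make the bounds handling concrete, here is a minimal standalone sketch of the clamping step. The function name clampScale and the rule that a max of 0 means "no upper bound" are assumptions made for illustration; the sketch only mirrors what applyBounds does with the values returned by pa.ScaleBounds(), it is not Knative's actual implementation.

package main

import "fmt"

// clampScale is a hypothetical helper that mimics the bounds handling above:
// desired is capped at max (a max of 0 is treated as "no upper bound" here)
// and raised to at least min.
func clampScale(min, max, desired int32) int32 {
    if max != 0 && desired > max {
        desired = max
    }
    if desired < min {
        desired = min
    }
    return desired
}

func main() {
    // minScale=1, maxScale=5, the autoscaler asked for 10 pods -> apply 5.
    fmt.Println(clampScale(1, 5, 10))
    // No upper bound set (0) and the autoscaler asked for 0 pods, minScale=1 -> apply 1.
    fmt.Println(clampScale(1, 0, 0))
}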
Estimating the pod count from metrics on a timer
This part is the Decider's story. When a Decider is created, a ticker is created along with it. By default the ticker fires every 2 seconds (configurable via the TickInterval parameter) and calls the Scale method, which is implemented as follows:
func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (desiredPodCount int32, excessBC int32, validScale bool) {
    ......
    metricName := spec.ScalingMetric
    var observedStableValue, observedPanicValue float64
    switch spec.ScalingMetric {
    case autoscaling.RPS:
        observedStableValue, observedPanicValue, err = a.metricClient.StableAndPanicRPS(metricKey, now)
        a.reporter.ReportStableRPS(observedStableValue)
        a.reporter.ReportPanicRPS(observedPanicValue)
        a.reporter.ReportTargetRPS(spec.TargetValue)
    default:
        metricName = autoscaling.Concurrency // concurrency is used by default
        observedStableValue, observedPanicValue, err = a.metricClient.StableAndPanicConcurrency(metricKey, now)
        a.reporter.ReportStableRequestConcurrency(observedStableValue)
        a.reporter.ReportPanicRequestConcurrency(observedPanicValue)
        a.reporter.ReportTargetRequestConcurrency(spec.TargetValue)
    }
    // Put the scaling metric to logs.
    logger = logger.With(zap.String("metric", metricName))
    if err != nil {
        if err == ErrNoData {
            logger.Debug("No data to scale on yet")
        } else {
            logger.Errorw("Failed to obtain metrics", zap.Error(err))
        }
        return 0, 0, false
    }
    // Make sure we don't get stuck with the same number of pods, if the scale up rate
    // is too conservative and MaxScaleUp*RPC==RPC, so this permits us to grow at least by a single
    // pod if we need to scale up.
    // E.g. MSUR=1.1, OCC=3, RPC=2, TV=1 => OCC/TV=3, MSU=2.2 => DSPC=2, while we definitely, need
    // 3 pods. See the unit test for this scenario in action.
    maxScaleUp := math.Ceil(spec.MaxScaleUpRate * readyPodsCount)
    // Same logic, opposite math applies here.
    maxScaleDown := math.Floor(readyPodsCount / spec.MaxScaleDownRate)
    dspc := math.Ceil(observedStableValue / spec.TargetValue)
    dppc := math.Ceil(observedPanicValue / spec.TargetValue)
    logger.Debugf("DesiredStablePodCount = %0.3f, DesiredPanicPodCount = %0.3f, MaxScaleUp = %0.3f, MaxScaleDown = %0.3f",
        dspc, dppc, maxScaleUp, maxScaleDown)
    // We want to keep desired pod count in the [maxScaleDown, maxScaleUp] range.
    desiredStablePodCount := int32(math.Min(math.Max(dspc, maxScaleDown), maxScaleUp))
    desiredPanicPodCount := int32(math.Min(math.Max(dppc, maxScaleDown), maxScaleUp))
    ......
    return desiredPodCount, excessBC, true
}
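The numbers in the comment above can be plugged in directly to see why the Ceil on maxScaleUp matters. The snippet below is a self-contained re-run of that arithmetic (the variable names are local to this example, not the autoscaler's API, and MaxScaleDownRate=2.0 is an assumed value): with MaxScaleUpRate=1.1, 2 ready pods, an observed stable concurrency of 3 and a target of 1 per pod, the scale-up ceiling becomes ceil(2.2)=3, so the revision can grow to the 3 pods it needs.

package main

import (
    "fmt"
    "math"
)

func main() {
    maxScaleUpRate := 1.1      // MSUR in the comment above
    maxScaleDownRate := 2.0    // assumed value, only used for the lower bound here
    readyPodsCount := 2.0      // RPC
    observedStableValue := 3.0 // OCC: observed stable concurrency
    targetValue := 1.0         // TV: target concurrency per pod

    maxScaleUp := math.Ceil(maxScaleUpRate * readyPodsCount)      // ceil(2.2) = 3
    maxScaleDown := math.Floor(readyPodsCount / maxScaleDownRate) // floor(1.0) = 1
    dspc := math.Ceil(observedStableValue / targetValue)          // ceil(3.0) = 3

    // Keep the desired pod count within [maxScaleDown, maxScaleUp].
    desired := int32(math.Min(math.Max(dspc, maxScaleDown), maxScaleUp))
    fmt.Println(desired) // 3: without the Ceil the ceiling would be 2.2 and we would stop at 2 pods
}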
This method fetches metric data from the MetricCollector, estimates the number of pods to scale to, and stores the result in the Decider. In addition, when the desired pod count in the Decider changes, the PodAutoscaler is triggered to reconcile again. The key code is:
......
if runner.updateLatestScale(desiredScale, excessBC) {
    m.Inform(metricKey)
}
......
The corresponding watch for reconciliation is registered in the KPA controller:
......
// Have the Deciders enqueue the PAs whose decisions have changed.
deciders.Watch(impl.EnqueueKey)
......
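The Watch/Inform pair is essentially callback registration. Below is a toy sketch of that pattern (the types are invented for illustration and are not Knative's): the controller registers impl.EnqueueKey as the watcher, and when a Decider's desired scale changes, Inform calls the watcher with the PodAutoscaler's key so the PA is re-queued for reconciliation.

package main

import "fmt"

// deciders is a stand-in for the component that owns all Deciders.
type deciders struct {
    watcher func(key string)
}

// Watch registers the callback to invoke when a decision changes.
func (d *deciders) Watch(fn func(key string)) { d.watcher = fn }

// Inform notifies the watcher that the decision for key has changed.
func (d *deciders) Inform(key string) {
    if d.watcher != nil {
        d.watcher(key)
    }
}

func main() {
    d := &deciders{}
    // In the KPA controller this is deciders.Watch(impl.EnqueueKey).
    d.Watch(func(key string) { fmt.Println("re-enqueue PodAutoscaler", key) })
    // Called from updateLatestScale when the desired scale changes.
    d.Inform("default/helloworld-go-00001")
}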
Metric collection
Pod metrics are collected in two ways: push and pull.
The push path is straightforward: main.go exposes a stats server, and each metric it receives is pushed into the MetricCollector:
// Set up a statserver.
statsServer := statserver.New(statsServerAddr, statsCh, logger)
....
go func() {
    for sm := range statsCh {
        collector.Record(sm.Key, sm.Stat)
        multiScaler.Poke(sm.Key, sm.Stat)
    }
}()
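Conceptually the push path is a channel hand-off: the stats server writes every stat it receives onto statsCh, and a single goroutine drains the channel into the collector. The toy program below reproduces just that shape with invented types; it is not the autoscaler's actual StatMessage or collector.

package main

import "fmt"

// statMessage is a simplified stand-in for the stat pushed by the queue-proxy.
type statMessage struct {
    Key         string  // namespace/revision the stat belongs to
    Concurrency float64 // observed concurrency
}

// collector is a simplified stand-in for the MetricCollector.
type collector struct{ latest map[string]float64 }

func (c *collector) Record(key string, v float64) { c.latest[key] = v }

func main() {
    statsCh := make(chan statMessage, 16)
    col := &collector{latest: map[string]float64{}}
    done := make(chan struct{})

    // Equivalent of the goroutine above that drains statsCh for the process lifetime.
    go func() {
        for sm := range statsCh {
            col.Record(sm.Key, sm.Concurrency)
        }
        close(done)
    }()

    // The stats server would push here for every report it receives.
    statsCh <- statMessage{Key: "default/helloworld-go-00001", Concurrency: 3}
    close(statsCh)
    <-done
    fmt.Println(col.latest)
}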
How does the pull path collect metrics? Recall the Metric resource mentioned earlier: when a Metric resource arrives, another ticker is created, which scrapes metrics from the queue-proxy on port 9090 every second. The key code is:
// newCollection creates a new collection, which uses the given scraper to
// collect stats every scrapeTickInterval.
func newCollection(metric *av1alpha1.Metric, scraper StatsScraper, logger *zap.SugaredLogger) *collection {
    c := &collection{
        metric:             metric,
        concurrencyBuckets: aggregation.NewTimedFloat64Buckets(BucketSize),
        rpsBuckets:         aggregation.NewTimedFloat64Buckets(BucketSize),
        scraper:            scraper,
        stopCh:             make(chan struct{}),
    }
    logger = logger.Named("collector").With(
        zap.String(logkey.Key, fmt.Sprintf("%s/%s", metric.Namespace, metric.Name)))
    c.grp.Add(1)
    go func() {
        defer c.grp.Done()
        scrapeTicker := time.NewTicker(scrapeTickInterval)
        for {
            select {
            case
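A stripped-down version of that pull loop is sketched below. It assumes the metrics are served over plain HTTP on the queue-proxy's port 9090 and uses a hard-coded one-second interval in place of scrapeTickInterval; the real collector goes through the StatsScraper interface and feeds the results into the timed buckets created above rather than printing them.

package main

import (
    "fmt"
    "io"
    "net/http"
    "time"
)

// scrapeLoop polls the given URL once per second until stopCh is closed.
func scrapeLoop(url string, stopCh <-chan struct{}) {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-stopCh:
            return
        case <-ticker.C:
            resp, err := http.Get(url)
            if err != nil {
                fmt.Println("scrape failed:", err)
                continue
            }
            body, _ := io.ReadAll(resp.Body)
            resp.Body.Close()
            fmt.Printf("scraped %d bytes of metrics\n", len(body))
        }
    }
}

func main() {
    stop := make(chan struct{})
    // The endpoint is an assumption for illustration; the scraper targets the
    // queue-proxy's metrics port 9090 mentioned above.
    go scrapeLoop("http://127.0.0.1:9090/metrics", stop)
    time.Sleep(3 * time.Second)
    close(stop)
}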