Mastering Knative: Riding 'Across and Down' Through Knative Autoscaling


Knative provides a flexible autoscaling mechanism. This article digs into how KPA autoscaling is implemented from the perspective of "three horizontals and two verticals", so that you can drive Knative autoscaling with ease.

Note: this article analyzes the code of Knative v0.11.0, the latest release at the time of writing.

KPA Implementation Flow

(Figure: KPA implementation flow diagram)

In Knative, creating a Revision creates a corresponding PodAutoscaler resource. KPA then scales the Pods of that Revision up and down by operating on the PodAutoscaler resource.

We will analyze how this flow is implemented along three horizontal and two vertical dimensions.

The Three Horizontals

KPA Controller

A PodAutoscaler is created from the Revision. The KPA controller's reconciliation mainly involves two resources (Decider and Metric) and one operation (Scale). The key code looks like this:

func (c *Reconciler) reconcile(ctx context.Context, pa *pav1alpha1.PodAutoscaler) error {
	......
	decider, err := c.reconcileDecider(ctx, pa, pa.Status.MetricsServiceName)
	if err != nil {
		return fmt.Errorf("error reconciling Decider: %w", err)
	}

	if err := c.ReconcileMetric(ctx, pa, pa.Status.MetricsServiceName); err != nil {
		return fmt.Errorf("error reconciling Metric: %w", err)
	}

	// Metrics services are no longer needed as we use the private services now.
	if err := c.DeleteMetricsServices(ctx, pa); err != nil {
		return err
	}

	// Get the appropriate current scale from the metric, and right size
	// the scaleTargetRef based on it.
	want, err := c.scaler.Scale(ctx, pa, sks, decider.Status.DesiredScale)
	if err != nil {
		return fmt.Errorf("error scaling target: %w", err)
	}
	......
}

Let's first look at the two resources (a rough sketch of their shape follows):

Decider: records the scaling decision. The autoscaler writes the desired pod count it computes into the Decider's status, and the KPA controller feeds that value into the Scale operation.

Metric: drives metric collection. For each Metric resource a collection is created that periodically scrapes metrics from the Revision's Pods (see the metric collection section below).
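Before moving on, here is a rough sketch of what the two resources carry. These are simplified stand-ins, not the exact Knative type definitions; only fields that show up in the code quoted in this article are listed.

package sketch

import "time"

// Decider is a simplified stand-in for the autoscaler's decision record;
// ObjectMeta and other fields are omitted.
type Decider struct {
	Spec   DeciderSpec   // scaling parameters
	Status DeciderStatus // the latest decision made by the autoscaler
}

type DeciderSpec struct {
	TickInterval     time.Duration // how often the decision is recomputed (2s by default)
	ScalingMetric    string        // "concurrency" (default) or "rps"
	TargetValue      float64       // target metric value per pod
	MaxScaleUpRate   float64       // limits how fast we may scale up in one step
	MaxScaleDownRate float64       // limits how fast we may scale down in one step
}

type DeciderStatus struct {
	DesiredScale        int32 // desired pod count, fed into the KPA controller's Scale call
	ExcessBurstCapacity int32 // spare burst capacity, also reported by the autoscaler
}

// Metric marks a Revision for metric collection; the collector attaches a
// scraping ticker to each Metric (see the "Metric Collection" section below).
type Metric struct {
	Namespace string
	Name      string
}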

Now let's look at the Scale operation. The Scale method takes the desired pod count together with the configured minimum and maximum instance counts, works out the final number of Pods to scale to, and then updates the Deployment's Replicas value to actually scale the Pods. The implementation is as follows:

// Scale attempts to scale the given PA's target reference to the desired scale.
func (ks *scaler) Scale(ctx context.Context, pa *pav1alpha1.PodAutoscaler, sks *nv1a1.ServerlessService, desiredScale int32) (int32, error) {
	......
	min, max := pa.ScaleBounds()
	if newScale := applyBounds(min, max, desiredScale); newScale != desiredScale {
		logger.Debugf("Adjusting desiredScale to meet the min and max bounds before applying: %d -> %d", desiredScale, newScale)
		desiredScale = newScale
	}

	desiredScale, shouldApplyScale := ks.handleScaleToZero(ctx, pa, sks, desiredScale)
	if !shouldApplyScale {
		return desiredScale, nil
	}

	ps, err := resources.GetScaleResource(pa.Namespace, pa.Spec.ScaleTargetRef, ks.psInformerFactory)
	if err != nil {
		return desiredScale, fmt.Errorf("failed to get scale target %v: %w", pa.Spec.ScaleTargetRef, err)
	}

	currentScale := int32(1)
	if ps.Spec.Replicas != nil {
		currentScale = *ps.Spec.Replicas
	}
	if desiredScale == currentScale {
		return desiredScale, nil
	}

	logger.Infof("Scaling from %d to %d", currentScale, desiredScale)
	return ks.applyScale(ctx, pa, desiredScale, ps)
}
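The applyBounds helper called above is not shown in the excerpt. A minimal sketch of the clamping it performs, assuming that a max of 0 means "no upper bound", would look like this:

package sketch

// applyBounds clamps the desired scale into [min, max]; this is a sketch of the
// behaviour, assuming max == 0 means "unbounded", not the exact upstream code.
func applyBounds(min, max, desired int32) int32 {
	if desired < min {
		return min
	}
	if max != 0 && desired > max {
		return max
	}
	return desired
}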

Periodically Estimating the Pod Count from Metrics

This part is the Decider's story. When a Decider is created, a ticker is created along with it. By default the ticker fires every 2 seconds (configurable through the TickInterval parameter) and calls the Scale method, which is implemented as follows:

func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (desiredPodCount int32, excessBC int32, validScale bool) {
	......
	metricName := spec.ScalingMetric
	var observedStableValue, observedPanicValue float64
	switch spec.ScalingMetric {
	case autoscaling.RPS:
		observedStableValue, observedPanicValue, err = a.metricClient.StableAndPanicRPS(metricKey, now)
		a.reporter.ReportStableRPS(observedStableValue)
		a.reporter.ReportPanicRPS(observedPanicValue)
		a.reporter.ReportTargetRPS(spec.TargetValue)
	default:
		metricName = autoscaling.Concurrency // concurrency is used by default
		observedStableValue, observedPanicValue, err = a.metricClient.StableAndPanicConcurrency(metricKey, now)
		a.reporter.ReportStableRequestConcurrency(observedStableValue)
		a.reporter.ReportPanicRequestConcurrency(observedPanicValue)
		a.reporter.ReportTargetRequestConcurrency(spec.TargetValue)
	}

	// Put the scaling metric to logs.
	logger = logger.With(zap.String("metric", metricName))

	if err != nil {
		if err == ErrNoData {
			logger.Debug("No data to scale on yet")
		} else {
			logger.Errorw("Failed to obtain metrics", zap.Error(err))
		}
		return 0, 0, false
	}

	// Make sure we don't get stuck with the same number of pods, if the scale up rate
	// is too conservative and MaxScaleUp*RPC==RPC, so this permits us to grow at least by a single
	// pod if we need to scale up.
	// E.g. MSUR=1.1, OCC=3, RPC=2, TV=1 => OCC/TV=3, MSU=2.2 => DSPC=2, while we definitely, need
	// 3 pods. See the unit test for this scenario in action.
	maxScaleUp := math.Ceil(spec.MaxScaleUpRate * readyPodsCount)
	// Same logic, opposite math applies here.
	maxScaleDown := math.Floor(readyPodsCount / spec.MaxScaleDownRate)

	dspc := math.Ceil(observedStableValue / spec.TargetValue)
	dppc := math.Ceil(observedPanicValue / spec.TargetValue)
	logger.Debugf("DesiredStablePodCount = %0.3f, DesiredPanicPodCount = %0.3f, MaxScaleUp = %0.3f, MaxScaleDown = %0.3f",
		dspc, dppc, maxScaleUp, maxScaleDown)

	// We want to keep desired pod count in the [maxScaleDown, maxScaleUp] range.
	desiredStablePodCount := int32(math.Min(math.Max(dspc, maxScaleDown), maxScaleUp))
	desiredPanicPodCount := int32(math.Min(math.Max(dppc, maxScaleDown), maxScaleUp))
	......
	return desiredPodCount, excessBC, true
}
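To make the math concrete, here is a small, self-contained sketch that replays the scenario from the comment in the code above (MaxScaleUpRate = 1.1, observed stable concurrency = 3, 2 ready pods, target value = 1; the MaxScaleDownRate of 2 is an assumed value just for the example):

package main

import (
	"fmt"
	"math"
)

func main() {
	// Values from the comment in Autoscaler.Scale above (illustrative only).
	maxScaleUpRate := 1.1   // spec.MaxScaleUpRate
	maxScaleDownRate := 2.0 // spec.MaxScaleDownRate (assumed for this example)
	targetValue := 1.0      // spec.TargetValue
	readyPodsCount := 2.0   // current number of ready pods
	observedStable := 3.0   // observed stable concurrency

	// Without the Ceil, maxScaleUp would be 1.1*2 = 2.2 and we would be capped
	// at 2 pods; Ceil lets us grow to 3, which the observed load actually needs.
	maxScaleUp := math.Ceil(maxScaleUpRate * readyPodsCount)      // 3
	maxScaleDown := math.Floor(readyPodsCount / maxScaleDownRate) // 1

	dspc := math.Ceil(observedStable / targetValue) // 3

	// Clamp the desired count into [maxScaleDown, maxScaleUp].
	desired := int32(math.Min(math.Max(dspc, maxScaleDown), maxScaleUp))
	fmt.Println(desired) // prints 3
}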

This method pulls metric data from the MetricCollector, estimates how many Pods are needed based on it, and stores the result in the Decider. Whenever the desired pod count in the Decider changes, a re-reconcile of the PodAutoscaler is triggered. The key code:

......
if runner.updateLatestScale(desiredScale, excessBC) {
	m.Inform(metricKey)
}
......

The corresponding watch is registered in the KPA controller:

......
// Have the Deciders enqueue the PAs whose decisions have changed.
deciders.Watch(impl.EnqueueKey)
......
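The Watch/Inform pair above is a plain observer hookup: the controller registers a callback, and the autoscaler invokes it with the Decider's key whenever the decision changes, which puts the PodAutoscaler back onto the work queue. A minimal, self-contained sketch of that pattern (not the actual Knative types) looks like this:

package main

import "fmt"

// deciders holds the callback registered by the controller.
type deciders struct {
	watcher func(key string)
}

// Watch registers the callback (in KPA this is impl.EnqueueKey).
func (d *deciders) Watch(fn func(string)) { d.watcher = fn }

// Inform is called when a decision changes; it notifies the controller.
func (d *deciders) Inform(key string) {
	if d.watcher != nil {
		d.watcher(key)
	}
}

func main() {
	d := &deciders{}
	d.Watch(func(key string) { fmt.Println("enqueue", key) }) // controller side
	d.Inform("default/my-revision")                           // autoscaler side, on change
}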

Metric Collection

Pod metrics are collected in two ways:

PUSH: the autoscaler exposes a stats endpoint, and metrics pushed to it are forwarded into the MetricCollector.

PULL: the collector periodically scrapes metrics from the Revision's Pods via the queue-proxy.

The PUSH path is straightforward: main.go exposes a stats server, and every metric it receives is pushed into the MetricCollector:

// Set up a statserver.
statsServer := statserver.New(statsServerAddr, statsCh, logger)
......
go func() {
	for sm := range statsCh {
		collector.Record(sm.Key, sm.Stat)
		multiScaler.Poke(sm.Key, sm.Stat)
	}
}()
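What flows over statsCh is a key identifying the Revision plus one pod's measurement. A simplified sketch of that message, with field names chosen for illustration rather than copied from the upstream type, might be:

package sketch

// StatMessage is a simplified stand-in for the message consumed from statsCh
// above: sm.Key identifies the Revision, sm.Stat carries one pod's measurements.
type StatMessage struct {
	Key  string // e.g. "<namespace>/<revision>"
	Stat Stat
}

type Stat struct {
	PodName                   string  // pod that reported the measurement
	AverageConcurrentRequests float64 // concurrency averaged over the reporting period
	RequestCount              float64 // requests seen during the reporting period
}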

How does the PULL path collect metrics? Remember the Metric resource mentioned earlier? When a Metric resource is received, another ticker is created, and every second it hits the queue-proxy's port 9090 to collect metric data. The key code:

// newCollection creates a new collection, which uses the given scraper to
// collect stats every scrapeTickInterval.
func newCollection(metric *av1alpha1.Metric, scraper StatsScraper, logger *zap.SugaredLogger) *collection {
	c := &collection{
		metric:             metric,
		concurrencyBuckets: aggregation.NewTimedFloat64Buckets(BucketSize),
		rpsBuckets:         aggregation.NewTimedFloat64Buckets(BucketSize),
		scraper:            scraper,

		stopCh: make(chan struct{}),
	}

	logger = logger.Named("collector").With(
		zap.String(logkey.Key, fmt.Sprintf("%s/%s", metric.Namespace, metric.Name)))

	c.grp.Add(1)
	go func() {
		defer c.grp.Done()

		scrapeTicker := time.NewTicker(scrapeTickInterval)
		for {
			select {
			case <-c.stopCh:
				scrapeTicker.Stop()
				return
			case <-scrapeTicker.C:
				// Scrape the pods behind this Metric and record the result.
				......
			}
		}
	}()

	return c
}
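Each tick of the scraper boils down to fetching the metrics that queue-proxy exposes on port 9090 and folding the result into the concurrency/RPS buckets. As a rough illustration only (the endpoint path and the host here are assumptions for the example; the real scraper goes through the Revision's private service, as the comment in the reconcile code above notes, and parses a Prometheus-format response), a single scrape is essentially an HTTP GET:

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"time"
)

// scrapeOnce fetches the raw metrics text from a single queue-proxy.
// The URL is an assumption for illustration, not the real Knative scrape target.
func scrapeOnce(host string) (string, error) {
	client := &http.Client{Timeout: time.Second}
	resp, err := client.Get(fmt.Sprintf("http://%s:9090/metrics", host))
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	return string(body), nil
}

func main() {
	if out, err := scrapeOnce("my-revision-pod-ip"); err == nil {
		fmt.Println(out)
	}
}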
