6 changes: 4 additions & 2 deletions internal/common/metrics/constants.go
@@ -115,8 +115,10 @@ const (
MemoryUsedStack = CadenceMetricsPrefix + "memory-used-stack"
NumGoRoutines = CadenceMetricsPrefix + "num-go-routines"

- EstimatedHistorySize = CadenceMetricsPrefix + "estimated-history-size"
- ServerSideHistorySize = CadenceMetricsPrefix + "server-side-history-size"
+ EstimatedHistorySize = CadenceMetricsPrefix + "estimated-history-size"
+ ServerSideHistorySize = CadenceMetricsPrefix + "server-side-history-size"
+ ConcurrentTaskQuota = CadenceMetricsPrefix + "concurrent-task-quota"
+ PollerRequestBufferUsage = CadenceMetricsPrefix + "poller-request-buffer-usage"

WorkerUsageCollectorPanic = CadenceMetricsPrefix + "worker-metrics-collector-panic"
)
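
The two added names are gauge keys for the worker-usage metrics this PR introduces. As a rough sketch of how such gauges are published through tally, the metrics library this client uses (the scope wiring and the literal "cadence-" prefix are assumptions here, not the PR's actual collector code):

```go
package main

import "github.com/uber-go/tally"

// Names mirror the two constants added above; the "cadence-" prefix is an
// assumption about CadenceMetricsPrefix's actual value.
const (
	concurrentTaskQuota      = "cadence-concurrent-task-quota"
	pollerRequestBufferUsage = "cadence-poller-request-buffer-usage"
)

// reportWorkerGauges sketches how a collector loop might publish these
// gauges; quota and buffered stand in for options.maxConcurrentTask and
// len(bw.pollerRequestCh).
func reportWorkerGauges(scope tally.Scope, quota, buffered int) {
	scope.Gauge(concurrentTaskQuota).Update(float64(quota))
	scope.Gauge(pollerRequestBufferUsage).Update(float64(buffered))
}

func main() {
	reportWorkerGauges(tally.NoopScope, 100, 42)
}
```

Judging by the names, the first gauge likely tracks the configured concurrency ceiling and the second how much of the poller request buffer is in use.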
9 changes: 6 additions & 3 deletions internal/internal_worker.go
@@ -281,7 +281,8 @@ func newWorkflowTaskWorkerInternal(
taskWorker: poller,
identity: params.Identity,
workerType: "DecisionWorker",
- shutdownTimeout: params.WorkerStopTimeout},
+ shutdownTimeout: params.WorkerStopTimeout,
+ sync: &params.Sync},
params.Logger,
params.MetricsScope,
nil,
@@ -304,7 +305,8 @@ func newWorkflowTaskWorkerInternal(
taskWorker: localActivityTaskPoller,
identity: params.Identity,
workerType: "LocalActivityWorker",
- shutdownTimeout: params.WorkerStopTimeout},
+ shutdownTimeout: params.WorkerStopTimeout,
+ sync: &params.Sync},
params.Logger,
params.MetricsScope,
nil,
@@ -482,7 +484,8 @@ func newActivityTaskWorker(
identity: workerParams.Identity,
workerType: workerType,
shutdownTimeout: workerParams.WorkerStopTimeout,
- userContextCancel: workerParams.UserContextCancel},
+ userContextCancel: workerParams.UserContextCancel,
+ sync: &workerParams.Sync},
workerParams.Logger,
workerParams.MetricsScope,
sessionTokenBucket,
118 changes: 47 additions & 71 deletions internal/internal_worker_base.go
@@ -28,12 +28,10 @@ import (
"errors"
"fmt"
"os"
"runtime"
"sync"
"syscall"
"time"

"github.com/shirou/gopsutil/cpu"
"github.com/uber-go/tally"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@@ -57,7 +55,7 @@ var (

var errShutdown = errors.New("worker shutting down")

- var collectHardwareUsageOnce sync.Once
+ var emitOnce sync.Once

type (
// resultHandler that returns result
@@ -124,6 +122,7 @@ type (
shutdownTimeout time.Duration
userContextCancel context.CancelFunc
host string
+ sync *oncePerHost
}

// baseWorker that wraps worker activities.
@@ -140,15 +139,20 @@ type (
logger *zap.Logger
metricsScope tally.Scope

- pollerRequestCh chan struct{}
- pollerAutoScaler *pollerAutoScaler
- taskQueueCh chan interface{}
- sessionTokenBucket *sessionTokenBucket
+ pollerRequestCh chan struct{}
+ pollerAutoScaler *pollerAutoScaler
+ workerUsageCollector *workerUsageCollector
+ taskQueueCh chan interface{}
+ sessionTokenBucket *sessionTokenBucket
}

polledTask struct {
task interface{}
}

+ oncePerHost interface {
+ Do(func())
+ }
)

func createPollRetryPolicy() backoff.RetryPolicy {
@@ -174,16 +178,36 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t
)
}

+ var once oncePerHost
+ if options.sync == nil {
+ once = &emitOnce
+ } else {
+ once = *options.sync
+ }

+ // the collector is enabled by default for now
+ workerUC := newWorkerUsageCollector(
+ workerUsageCollectorOptions{
+ Enabled: true,
+ Cooldown: 30 * time.Second,
+ MetricsScope: metricsScope,
+ WorkerType: options.workerType,
+ EmitOnce: once,
+ },
+ logger,
+ )

bw := &baseWorker{
- options: options,
- shutdownCh: make(chan struct{}),
- taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
- retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
- logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
- metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
- pollerRequestCh: make(chan struct{}, options.maxConcurrentTask),
- pollerAutoScaler: pollerAS,
- taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.
+ options: options,
+ shutdownCh: make(chan struct{}),
+ taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
+ retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
+ logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
+ metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
+ pollerRequestCh: make(chan struct{}, options.maxConcurrentTask),
+ pollerAutoScaler: pollerAS,
+ workerUsageCollector: workerUC,
+ taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.

limiterContext: ctx,
limiterContextCancel: cancel,
@@ -207,6 +231,10 @@ func (bw *baseWorker) Start() {
bw.pollerAutoScaler.Start()
}

+ if bw.workerUsageCollector != nil {
+ bw.workerUsageCollector.Start()
+ }

for i := 0; i < bw.options.pollerCount; i++ {
bw.shutdownWG.Add(1)
go bw.runPoller()
@@ -215,11 +243,6 @@ func (bw *baseWorker) Start() {
bw.shutdownWG.Add(1)
go bw.runTaskDispatcher()

- // We want the emit function run once per host instead of run once per worker
- // since the emit function is host level metric.
- bw.shutdownWG.Add(1)
- go bw.emitHardwareUsage()

bw.isWorkerStarted = true
traceLog(func() {
bw.logger.Info("Started Worker",
@@ -403,6 +426,9 @@ func (bw *baseWorker) Stop() {
if bw.pollerAutoScaler != nil {
bw.pollerAutoScaler.Stop()
}
+ if bw.workerUsageCollector != nil {
+ bw.workerUsageCollector.Stop()
+ }

if success := util.AwaitWaitGroup(&bw.shutdownWG, bw.options.shutdownTimeout); !success {
traceLog(func() {
@@ -416,53 +442,3 @@
}
return
}

- func (bw *baseWorker) emitHardwareUsage() {
- defer func() {
- if p := recover(); p != nil {
- bw.metricsScope.Counter(metrics.WorkerPanicCounter).Inc(1)
- topLine := fmt.Sprintf("base worker for %s [panic]:", bw.options.workerType)
- st := getStackTraceRaw(topLine, 7, 0)
- bw.logger.Error("Unhandled panic in hardware emitting.",
- zap.String(tagPanicError, fmt.Sprintf("%v", p)),
- zap.String(tagPanicStack, st))
- }
- }()
- defer bw.shutdownWG.Done()
- collectHardwareUsageOnce.Do(
- func() {
- ticker := time.NewTicker(hardwareMetricsCollectInterval)
- for {
- select {
- case <-bw.shutdownCh:
- ticker.Stop()
- return
- case <-ticker.C:
- host := bw.options.host
- scope := bw.metricsScope.Tagged(map[string]string{clientHostTag: host})
-
- cpuPercent, err := cpu.Percent(0, false)
- if err != nil {
- bw.logger.Warn("Failed to get cpu percent", zap.Error(err))
- return
- }
- cpuCores, err := cpu.Counts(false)
- if err != nil {
- bw.logger.Warn("Failed to get number of cpu cores", zap.Error(err))
- return
- }
- scope.Gauge(metrics.NumCPUCores).Update(float64(cpuCores))
- scope.Gauge(metrics.CPUPercentage).Update(cpuPercent[0])
-
- var memStats runtime.MemStats
- runtime.ReadMemStats(&memStats)
-
- scope.Gauge(metrics.NumGoRoutines).Update(float64(runtime.NumGoroutine()))
- scope.Gauge(metrics.TotalMemory).Update(float64(memStats.Sys))
- scope.Gauge(metrics.MemoryUsedHeap).Update(float64(memStats.HeapInuse))
- scope.Gauge(metrics.MemoryUsedStack).Update(float64(memStats.StackInuse))
- }
- }
- })
-
- }
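
Taken together, the changes in this file replace the per-worker hardware-usage goroutine with a workerUsageCollector gated by the new oncePerHost interface: host-level metrics should be emitted once per host rather than once per worker, yet tests need to substitute their own gate. Because *sync.Once has exactly the Do(func()) method, the shared emitOnce satisfies the interface in production. A minimal self-contained sketch of the pattern (names reuse the diff's; the collector body is a stand-in):

```go
package main

import (
	"fmt"
	"sync"
)

// oncePerHost matches the interface added in this diff: anything with a
// Do(func()) method can gate a host-level side effect.
type oncePerHost interface {
	Do(func())
}

// startCollector stands in for the baseWorker wiring: every worker calls Do,
// but the guarded function runs at most once per process when the shared
// *sync.Once is supplied.
func startCollector(once oncePerHost, workerType string) {
	once.Do(func() {
		fmt.Println("starting host-level usage collection, triggered by", workerType)
	})
}

func main() {
	var emitOnce sync.Once // *sync.Once satisfies oncePerHost
	for _, wt := range []string{"DecisionWorker", "ActivityWorker", "LocalActivityWorker"} {
		startCollector(&emitOnce, wt)
	}
	// Only the first worker triggers the collection message.
}
```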
18 changes: 15 additions & 3 deletions internal/internal_worker_interfaces_test.go
@@ -56,8 +56,19 @@ type (
mockCtrl *gomock.Controller
service *workflowservicetest.MockClient
}

+ // fakeSyncOnce is a fake implementation of the oncePerHost interface
+ // that DOES NOT ensure the function runs only once per host
+ fakeSyncOnce struct {
+ }
)

+ var fakeSyncOnceValue fakeSyncOnce
+
+ func (m *fakeSyncOnce) Do(f func()) {
+ f()
+ }

func helloWorldWorkflowFunc(ctx Context, input []byte) error {
queryResult := startingQueryValue
SetQueryHandler(ctx, queryType, func() (string, error) {
@@ -179,12 +190,12 @@ func (s *InterfacesTestSuite) TestInterface() {
domain := "testDomain"
// Workflow execution parameters.
workflowExecutionParameters := workerExecutionParameters{
- TaskList: "testTaskList",
WorkerOptions: WorkerOptions{
MaxConcurrentActivityTaskPollers: 4,
MaxConcurrentDecisionTaskPollers: 4,
Logger: zaptest.NewLogger(s.T()),
- Tracer: opentracing.NoopTracer{}},
+ Tracer: opentracing.NoopTracer{},
+ Sync: &fakeSyncOnce{}},
}

domainStatus := m.DomainStatusRegistered
@@ -216,7 +227,8 @@ func (s *InterfacesTestSuite) TestInterface() {
MaxConcurrentActivityTaskPollers: 10,
MaxConcurrentDecisionTaskPollers: 10,
Logger: zaptest.NewLogger(s.T()),
- Tracer: opentracing.NoopTracer{}},
+ Tracer: opentracing.NoopTracer{},
+ Sync: &fakeSyncOnce{}},
}

// Register activity instances and launch the worker.
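
The fake inverts the production guarantee: its Do always invokes the function, so each worker constructed in a test exercises the collector path instead of being suppressed by process-wide state left behind by an earlier test. A small sketch contrasting the two behaviors (a stand-alone illustration, not test code from the PR):

```go
package main

import (
	"fmt"
	"sync"
)

// oncePerHost matches the interface from internal_worker_base.go.
type oncePerHost interface{ Do(func()) }

// fakeSyncOnce mirrors the test double above: it satisfies oncePerHost but
// deliberately runs the function on every call.
type fakeSyncOnce struct{}

func (m *fakeSyncOnce) Do(f func()) { f() }

func main() {
	calls := 0
	run := func() { calls++ }

	var real sync.Once
	var gate oncePerHost = &real
	for i := 0; i < 3; i++ {
		gate.Do(run)
	}
	fmt.Println("sync.Once ran:", calls) // 1

	calls = 0
	gate = &fakeSyncOnce{}
	for i := 0; i < 3; i++ {
		gate.Do(run)
	}
	fmt.Println("fakeSyncOnce ran:", calls) // 3
}
```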
10 changes: 6 additions & 4 deletions internal/internal_worker_test.go
@@ -367,6 +367,7 @@ func createShadowWorker(
return createWorkerWithThrottle(t, service, 0, WorkerOptions{
EnableShadowWorker: true,
ShadowOptions: *shadowOptions,
+ Sync: &fakeSyncOnce{},
})
}

@@ -409,6 +410,7 @@ func createWorkerWithThrottle(
workerOptions.TaskListActivitiesPerSecond = activitiesPerSecond
workerOptions.Logger = zaptest.NewLogger(t)
workerOptions.EnableSessionWorker = true
+ workerOptions.Sync = &fakeSyncOnce{}

// Start Worker.
worker := NewWorker(
@@ -423,14 +425,14 @@ func createWorkerWithDataConverter(
t *testing.T,
service *workflowservicetest.MockClient,
) *aggregatedWorker {
- return createWorkerWithThrottle(t, service, 0, WorkerOptions{DataConverter: newTestDataConverter()})
+ return createWorkerWithThrottle(t, service, 0, WorkerOptions{DataConverter: newTestDataConverter(), Sync: &fakeSyncOnce{}})
}

func createWorkerWithAutoscaler(
t *testing.T,
service *workflowservicetest.MockClient,
) *aggregatedWorker {
- return createWorkerWithThrottle(t, service, 0, WorkerOptions{FeatureFlags: FeatureFlags{PollerAutoScalerEnabled: true}})
+ return createWorkerWithThrottle(t, service, 0, WorkerOptions{FeatureFlags: FeatureFlags{PollerAutoScalerEnabled: true}, Sync: &fakeSyncOnce{}})
}

func createWorkerWithStrictNonDeterminismDisabled(
@@ -444,7 +446,7 @@ func createWorkerWithHost(
t *testing.T,
service *workflowservicetest.MockClient,
) *aggregatedWorker {
- return createWorkerWithThrottle(t, service, 0, WorkerOptions{Host: "test_host"})
+ return createWorkerWithThrottle(t, service, 0, WorkerOptions{Host: "test_host", Sync: &fakeSyncOnce{}})
}

func (s *internalWorkerTestSuite) testCompleteActivityHelper(opt *ClientOptions) {
@@ -1031,7 +1033,7 @@ func TestActivityNilArgs(t *testing.T) {
func TestWorkerOptionDefaults(t *testing.T) {
domain := "worker-options-test"
taskList := "worker-options-tl"
- aggWorker := newAggregatedWorker(nil, domain, taskList, WorkerOptions{})
+ aggWorker := newAggregatedWorker(nil, domain, taskList, WorkerOptions{Sync: &fakeSyncOnce{}})
decisionWorker := aggWorker.workflowWorker
require.True(t, decisionWorker.executionParameters.Identity != "")
require.NotNil(t, decisionWorker.executionParameters.Logger)