Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into feature/more-resi…
Browse files Browse the repository at this point in the history
…liency-metrics
  • Loading branch information
jake-engelberg committed Oct 28, 2024
2 parents 90ee517 + 92efe94 commit cd965f5
Show file tree
Hide file tree
Showing 99 changed files with 4,785 additions and 745 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,10 @@ spec:
tolerations:
{{ toYaml .Values.global.tolerations | indent 8 }}
{{- end }}
{{- if .Values.global.priorityClassName }}
priorityClassName:
{{ toYaml .Values.global.priorityClassName | indent 8 }}
{{- end }}
{{- if or (eq .Values.global.ha.enabled true) (eq .Values.ha true) }}
{{- if eq .Values.cluster.forceInMemoryLog false }}
volumeClaimTemplates:
Expand All @@ -271,8 +275,4 @@ spec:
{{- end }}
{{- end }}
{{- end }}
{{- if .Values.global.priorityClassName }}
priorityClassName:
{{ toYaml .Values.global.priorityClassName | indent 8 }}
{{- end }}
{{- end }}
7 changes: 5 additions & 2 deletions charts/dapr/charts/dapr_rbac/templates/scheduler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ metadata:
{{- range $key, $value := .Values.global.k8sLabels }}
{{ $key }}: {{ tpl $value $ }}
{{- end }}
rules: []
rules:
- apiGroups: [""]
resources: ["namespaces"]
verbs: ["get", "list", "watch"]
---
{{- if eq .Values.global.rbac.namespaced true }}
kind: RoleBinding
Expand All @@ -47,4 +50,4 @@ roleRef:
kind: ClusterRole
{{- end }}
name: dapr-scheduler
{{- end }}
{{- end }}
1 change: 1 addition & 0 deletions cmd/scheduler/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func Run() {
DataDir: opts.EtcdDataDir,
ReplicaCount: opts.ReplicaCount,
ReplicaID: opts.ReplicaID,
KubeConfig: opts.KubeConfig,
EtcdID: opts.ID,
EtcdInitialPeers: opts.EtcdInitialPeers,
EtcdClientPorts: opts.EtcdClientPorts,
Expand Down
14 changes: 14 additions & 0 deletions cmd/scheduler/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package options

import (
"errors"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -42,6 +43,7 @@ type Options struct {
SentryAddress string
PlacementAddress string
Mode string
KubeConfig *string

ID string
ReplicaID uint32
Expand All @@ -61,6 +63,7 @@ type Options struct {
Metrics *metrics.FlagOptions

taFile string
kubeconfig string
etcdSpaceQuota string
}

Expand All @@ -83,6 +86,10 @@ func New(origArgs []string) (*Options, error) {
fs.StringVar(&opts.taFile, "trust-anchors-file", securityConsts.ControlPlaneDefaultTrustAnchorsPath, "Filepath to the trust anchors for the Dapr control plane")
fs.StringVar(&opts.SentryAddress, "sentry-address", fmt.Sprintf("dapr-sentry.%s.svc:443", security.CurrentNamespace()), "Address of the Sentry service")
fs.StringVar(&opts.Mode, "mode", string(modes.StandaloneMode), "Runtime mode for Dapr Scheduler")
fs.StringVar(&opts.kubeconfig, "kubeconfig", "", "Kubernetes mode only. Absolute path to the kubeconfig file.")
if err := fs.MarkHidden("kubeconfig"); err != nil {
panic(err)
}

fs.StringVar(&opts.ID, "id", "dapr-scheduler-server-0", "Scheduler server ID")
fs.Uint32Var(&opts.ReplicaCount, "replica-count", 1, "The total number of scheduler replicas in the cluster")
Expand Down Expand Up @@ -134,5 +141,12 @@ func New(origArgs []string) (*Options, error) {
log.Warnf("--etcd-space-quota of %s may be too low for production use. Consider increasing the value to 16Gi or larger.", etcdSpaceQuota.String())
}

if fs.Changed("kubeconfig") {
if opts.Mode != string(modes.KubernetesMode) {
return nil, errors.New("kubeconfig flag is only valid in --mode=kubernetes")
}
opts.KubeConfig = &opts.kubeconfig
}

return &opts, nil
}
57 changes: 57 additions & 0 deletions cmd/scheduler/options/options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package options

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestNew(t *testing.T) {
t.Run("don't error if only using defaults", func(t *testing.T) {
_, err := New([]string{})
require.NoError(t, err)
})

t.Run("error when kubeconfig is set and using defaults", func(t *testing.T) {
_, err := New([]string{
"--kubeconfig=" + t.TempDir(),
})
require.Error(t, err)
})

t.Run("don't error when mode set to kubernetes", func(t *testing.T) {
_, err := New([]string{
"--mode=kubernetes",
})
require.NoError(t, err)
})

t.Run("error when mode set to standalone and kubeconfig is set", func(t *testing.T) {
_, err := New([]string{
"--mode=standalone",
"--kubeconfig=" + t.TempDir(),
})
require.Error(t, err)
})

t.Run("don't error when mode set to kubernetes and kubeconfig is set", func(t *testing.T) {
_, err := New([]string{
"--mode=kubernetes",
"--kubeconfig=" + t.TempDir(),
})
require.NoError(t, err)
})
}
5 changes: 4 additions & 1 deletion dapr/proto/scheduler/v1/scheduler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,11 @@ message NamedJob {
// name is the name of the job.
string name = 1;

// The metadata associated with the job.
JobMetadata metadata = 2;

// The job scheduled.
Job job = 2;
Job job = 3;
}

// ListJobsRequest is the message used by the daprd sidecar to list all jobs.
Expand Down
23 changes: 16 additions & 7 deletions pkg/actors/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/dapr/dapr/pkg/config"
diag "github.com/dapr/dapr/pkg/diagnostics"
diagUtils "github.com/dapr/dapr/pkg/diagnostics/utils"
"github.com/dapr/dapr/pkg/healthz"
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
"github.com/dapr/dapr/pkg/modes"
commonv1pb "github.com/dapr/dapr/pkg/proto/common/v1"
Expand Down Expand Up @@ -144,6 +145,7 @@ type actorsRuntime struct {
closed atomic.Bool
closeCh chan struct{}
apiLevel atomic.Uint32
htarget healthz.Target

lock sync.Mutex
internalReminderInProgress map[string]struct{}
Expand All @@ -165,6 +167,7 @@ type ActorsOpts struct {
Security security.Handler
SchedulerClients *clients.Clients
SchedulerReminders bool
Healthz healthz.Healthz

// TODO: @joshvanl Remove in Dapr 1.12 when ActorStateTTL is finalized.
StateTTLEnabled bool
Expand Down Expand Up @@ -194,6 +197,7 @@ func newActorsWithClock(opts ActorsOpts, clock clock.WithTicker) (ActorRuntime,
internalActors: haxmap.New[string, InternalActor](32),
compStore: opts.CompStore,
sec: opts.Security,
htarget: opts.Healthz.AddTarget(),

internalReminderInProgress: map[string]struct{}{},
schedulerReminderFeatureEnabled: opts.SchedulerReminders,
Expand Down Expand Up @@ -242,22 +246,25 @@ func newActorsWithClock(opts ActorsOpts, clock clock.WithTicker) (ActorRuntime,
}
log.Debug("Using Scheduler service for reminders.")
a.actorsReminders = reminders.NewScheduler(reminders.SchedulerOptions{
Clients: opts.Config.SchedulerClients,
Namespace: opts.Config.Namespace,
AppID: opts.Config.AppID,
Clients: opts.Config.SchedulerClients,
Namespace: opts.Config.Namespace,
AppID: opts.Config.AppID,
ProviderOpts: providerOpts,
ListActorTypesFn: a.Entities,
Healthz: opts.Healthz,
})
} else {
factory, err := opts.Config.GetRemindersProvider(a.placement)
if err != nil {
return nil, fmt.Errorf("failed to initialize reminders provider: %w", err)
}
a.actorsReminders = factory(providerOpts)

a.actorsReminders.SetExecuteReminderFn(a.executeReminder)
a.actorsReminders.SetStateStoreProviderFn(a.stateStore)
a.actorsReminders.SetLookupActorFn(a.isActorLocallyHosted)
}

a.actorsReminders.SetExecuteReminderFn(a.executeReminder)
a.actorsReminders.SetStateStoreProviderFn(a.stateStore)
a.actorsReminders.SetLookupActorFn(a.isActorLocallyHosted)

a.idleActorProcessor = eventqueue.NewProcessor[string, *actor](a.idleProcessorExecuteFn).WithClock(clock)
return a, nil
}
Expand Down Expand Up @@ -294,6 +301,8 @@ func (a *actorsRuntime) Init(ctx context.Context) (err error) {
return errors.New("actors runtime has already been closed")
}

defer a.htarget.Ready()

if len(a.actorsConfig.ActorsService) == 0 {
return errors.New("actors: couldn't connect to actors service: address is empty")
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/actors/actors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/dapr/dapr/pkg/apis/resiliency/v1alpha1"
"github.com/dapr/dapr/pkg/channel"
"github.com/dapr/dapr/pkg/config"
"github.com/dapr/dapr/pkg/healthz"
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
"github.com/dapr/dapr/pkg/modes"
commonv1pb "github.com/dapr/dapr/pkg/proto/common/v1"
Expand Down Expand Up @@ -207,6 +208,7 @@ func (b *runtimeBuilder) buildActorRuntime(t *testing.T) *actorsRuntime {
TracingSpec: config.TracingSpec{SamplingRate: "1"},
Resiliency: resiliency.FromConfigurations(log, testResiliency),
StateStoreName: storeName,
Healthz: healthz.New(),
}, clock)
require.NoError(t, err)

Expand Down Expand Up @@ -235,6 +237,7 @@ func newTestActorsRuntimeWithMock(t *testing.T, appChannel channel.AppChannel) *
TracingSpec: config.TracingSpec{SamplingRate: "1"},
Resiliency: resiliency.New(log),
StateStoreName: "actorStore",
Healthz: healthz.New(),
MockPlacement: NewMockPlacement(TestAppID),
}, clock)
require.NoError(t, err)
Expand All @@ -258,6 +261,7 @@ func newTestActorsRuntimeWithMockWithoutPlacement(t *testing.T, appChannel chann
TracingSpec: config.TracingSpec{SamplingRate: "1"},
Resiliency: resiliency.New(log),
StateStoreName: "actorStore",
Healthz: healthz.New(),
}, clock)
require.NoError(t, err)

Expand All @@ -280,6 +284,7 @@ func newTestActorsRuntimeWithMockAndNoStore(t *testing.T, appChannel channel.App
TracingSpec: config.TracingSpec{SamplingRate: "1"},
Resiliency: resiliency.New(log),
StateStoreName: "actorStore",
Healthz: healthz.New(),
}, clock)
require.NoError(t, err)

Expand Down
Loading

0 comments on commit cd965f5

Please sign in to comment.