Skip to content

Commit 6191ec6

Browse files
m25nwarshawd
andcommitted
use informer pattern in ConfigMapSource
Co-authored-by: Devon Warshaw <warshawd@vmware.com> Signed-off-by: Matthew Conger-Eldeen <matthewco@vmware.com>
1 parent d9d8786 commit 6191ec6

File tree

5 files changed

+160
-181
lines changed

5 files changed

+160
-181
lines changed

src/vizier/services/query_broker/script_runner/BUILD.bazel

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,13 @@ go_library(
4848
"@in_gopkg_yaml_v2//:yaml_v2",
4949
"@io_k8s_api//core/v1:core",
5050
"@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
51-
"@io_k8s_apimachinery//pkg/watch",
52-
"@io_k8s_client_go//kubernetes/typed/core/v1:core",
51+
"@io_k8s_apimachinery//pkg/labels",
52+
"@io_k8s_client_go//informers",
53+
"@io_k8s_client_go//informers/core/v1:core",
54+
"@io_k8s_client_go//kubernetes",
55+
"@io_k8s_client_go//listers/core/v1:core",
5356
"@io_k8s_client_go//rest",
57+
"@io_k8s_client_go//tools/cache",
5458
"@org_golang_google_grpc//codes",
5559
"@org_golang_google_grpc//metadata",
5660
"@org_golang_google_grpc//status",
@@ -84,11 +88,7 @@ pl_go_test(
8488
"@com_github_stretchr_testify//require",
8589
"@io_k8s_api//core/v1:core",
8690
"@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
87-
"@io_k8s_apimachinery//pkg/labels",
88-
"@io_k8s_apimachinery//pkg/runtime",
89-
"@io_k8s_apimachinery//pkg/watch",
9091
"@io_k8s_client_go//kubernetes/fake",
91-
"@io_k8s_client_go//testing",
9292
"@org_golang_google_grpc//:go_default_library",
9393
"@org_golang_google_grpc//codes",
9494
"@org_golang_google_grpc//status",

src/vizier/services/query_broker/script_runner/config_map_source.go

Lines changed: 108 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -20,111 +20,161 @@ package scriptrunner
2020

2121
import (
2222
"context"
23+
"reflect"
24+
"runtime"
25+
"sync/atomic"
2326
"time"
2427

2528
log "github.com/sirupsen/logrus"
2629
"gopkg.in/yaml.v2"
27-
v1 "k8s.io/api/core/v1"
30+
corev1 "k8s.io/api/core/v1"
2831
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29-
"k8s.io/apimachinery/pkg/watch"
30-
clientv1 "k8s.io/client-go/kubernetes/typed/core/v1"
32+
"k8s.io/apimachinery/pkg/labels"
33+
"k8s.io/client-go/informers"
34+
informercorev1 "k8s.io/client-go/informers/core/v1"
35+
"k8s.io/client-go/kubernetes"
36+
listercorev1 "k8s.io/client-go/listers/core/v1"
37+
"k8s.io/client-go/tools/cache"
3138

3239
"px.dev/pixie/src/shared/cvmsgspb"
3340
"px.dev/pixie/src/utils"
3441
)
3542

3643
// ConfigMapSource pulls cron scripts from config maps.
3744
type ConfigMapSource struct {
38-
stop func()
39-
client clientv1.ConfigMapInterface
45+
stop func()
46+
informer informercorev1.ConfigMapInformer
4047
}
4148

4249
// NewConfigMapSource constructs a [Source] that extracts cron scripts from config maps with the label "purpose=cron-script".
4350
// Each config map must contain
4451
// - a script.pxl with the pixel script
4552
// - a configs.yaml which will be stored in the Configs field of [cvmsgspb.CronScript]
4653
// - a cron.yaml that contains a "frequency_s" key
47-
func NewConfigMapSource(client clientv1.ConfigMapInterface) *ConfigMapSource {
48-
return &ConfigMapSource{client: client}
54+
func NewConfigMapSource(client kubernetes.Interface, namespace string) *ConfigMapSource {
55+
return &ConfigMapSource{
56+
informer: informers.NewSharedInformerFactoryWithOptions(
57+
client,
58+
12*time.Hour,
59+
informers.WithNamespace(namespace),
60+
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
61+
options.LabelSelector = "purpose=cron-script"
62+
}),
63+
).Core().V1().ConfigMaps(),
64+
}
4965
}
5066

5167
// Start watches for updates to matching configmaps and sends resulting updates on updatesCh.
52-
func (source *ConfigMapSource) Start(baseCtx context.Context, updatesCh chan<- *cvmsgspb.CronScriptUpdate) (map[string]*cvmsgspb.CronScript, error) {
53-
options := metav1.ListOptions{LabelSelector: "purpose=cron-script"}
54-
watcher, err := source.client.Watch(baseCtx, options)
68+
func (source *ConfigMapSource) Start(_ context.Context, updatesCh chan<- *cvmsgspb.CronScriptUpdate) (map[string]*cvmsgspb.CronScript, error) {
69+
stopCh := make(chan struct{})
70+
isInitialized := &atomic.Bool{}
71+
_, err := source.informer.Informer().AddEventHandler(configMapEventHandlers(isInitialized, updatesCh))
72+
if err != nil {
73+
return nil, err
74+
}
75+
go source.informer.Informer().Run(stopCh)
76+
waitUntilSynced(source.informer.Informer())
77+
initialScripts, err := getInitialScripts(source.informer.Lister())
5578
if err != nil {
79+
close(stopCh)
5680
return nil, err
5781
}
58-
go configMapUpdater(watcher, updatesCh)
59-
configmaps, err := source.client.List(baseCtx, options)
82+
isInitialized.Store(true)
83+
source.stop = func() { close(stopCh) }
84+
return initialScripts, nil
85+
}
86+
87+
func getInitialScripts(lister listercorev1.ConfigMapLister) (map[string]*cvmsgspb.CronScript, error) {
88+
configMaps, err := lister.List(labels.Everything())
6089
if err != nil {
61-
watcher.Stop()
6290
return nil, err
6391
}
6492
scripts := map[string]*cvmsgspb.CronScript{}
65-
for _, configmap := range configmaps.Items {
66-
id, cronScript, err := configmapToCronScript(&configmap)
93+
for _, configMap := range configMaps {
94+
id, cronScript, err := configmapToCronScript(configMap)
6795
if err != nil {
6896
logCronScriptParseError(err)
6997
continue
7098
}
7199
scripts[id] = cronScript
72100
}
73-
source.stop = watcher.Stop
74101
return scripts, nil
75102
}
76103

77-
// Stop stops further updates from being sent.
78-
func (source *ConfigMapSource) Stop() {
79-
source.stop()
80-
}
81-
82-
func configMapUpdater(watcher watch.Interface, updatesCh chan<- *cvmsgspb.CronScriptUpdate) {
83-
for event := range watcher.ResultChan() {
84-
switch event.Type {
85-
case watch.Modified, watch.Added:
86-
configmap := event.Object.(*v1.ConfigMap)
87-
id, script, err := configmapToCronScript(configmap)
88-
if err != nil {
89-
logCronScriptParseError(err)
90-
continue
91-
}
92-
cronScriptUpdate := &cvmsgspb.CronScriptUpdate{
93-
Msg: &cvmsgspb.CronScriptUpdate_UpsertReq{
94-
UpsertReq: &cvmsgspb.RegisterOrUpdateCronScriptRequest{
95-
Script: script,
96-
},
97-
},
98-
RequestID: id,
99-
Timestamp: time.Now().Unix(),
104+
func configMapEventHandlers(isInitialized *atomic.Bool, updatesCh chan<- *cvmsgspb.CronScriptUpdate) cache.ResourceEventHandlerFuncs {
105+
return cache.ResourceEventHandlerFuncs{
106+
AddFunc: func(obj interface{}) {
107+
configmap := obj.(*corev1.ConfigMap)
108+
if !isInitialized.Load() {
109+
return
100110
}
101-
updatesCh <- cronScriptUpdate
102-
case watch.Deleted:
103-
configmap := event.Object.(*v1.ConfigMap)
104-
id, script, err := configmapToCronScript(configmap)
105-
if err != nil {
106-
logCronScriptParseError(err)
107-
continue
111+
updatesCh <- makeUpdate(configmap)
112+
},
113+
UpdateFunc: func(oldObj, newObj interface{}) {
114+
if reflect.DeepEqual(oldObj, newObj) {
115+
return
108116
}
109-
cronScriptUpdate := &cvmsgspb.CronScriptUpdate{
110-
Msg: &cvmsgspb.CronScriptUpdate_DeleteReq{
111-
DeleteReq: &cvmsgspb.DeleteCronScriptRequest{
112-
ScriptID: script.ID,
113-
},
114-
},
115-
RequestID: id,
116-
Timestamp: time.Now().Unix(),
117-
}
118-
updatesCh <- cronScriptUpdate
119-
}
117+
configmap := newObj.(*corev1.ConfigMap)
118+
updatesCh <- makeUpdate(configmap)
119+
},
120+
DeleteFunc: func(obj interface{}) {
121+
configmap := obj.(*corev1.ConfigMap)
122+
updatesCh <- makeDelete(configmap)
123+
},
124+
}
125+
}
126+
127+
func makeDelete(configmap *corev1.ConfigMap) *cvmsgspb.CronScriptUpdate {
128+
_, script, err := configmapToCronScript(configmap)
129+
if err != nil {
130+
logCronScriptParseError(err)
131+
return nil
132+
}
133+
return &cvmsgspb.CronScriptUpdate{
134+
Msg: &cvmsgspb.CronScriptUpdate_DeleteReq{
135+
DeleteReq: &cvmsgspb.DeleteCronScriptRequest{
136+
ScriptID: script.ID,
137+
},
138+
},
139+
Timestamp: time.Now().Unix(),
140+
}
141+
}
142+
143+
func makeUpdate(configmap *corev1.ConfigMap) *cvmsgspb.CronScriptUpdate {
144+
_, script, err := configmapToCronScript(configmap)
145+
if err != nil {
146+
logCronScriptParseError(err)
147+
return nil
148+
}
149+
return &cvmsgspb.CronScriptUpdate{
150+
Msg: &cvmsgspb.CronScriptUpdate_UpsertReq{
151+
UpsertReq: &cvmsgspb.RegisterOrUpdateCronScriptRequest{
152+
Script: script,
153+
},
154+
},
155+
Timestamp: time.Now().Unix(),
120156
}
121157
}
122158

159+
// waitUntilSynced busy waits until the informer has completed its initial LIST call to the K8s API.
160+
// There is no other way to synchronize with the informer's first LIST call. Yielding to the go scheduler will ensure
161+
// that we don't lock up the underlying OS thread while we're busy waiting.
162+
func waitUntilSynced(informer cache.SharedInformer) {
163+
for !informer.HasSynced() {
164+
runtime.Gosched()
165+
}
166+
}
167+
168+
// Stop stops further updates from being sent.
169+
func (source *ConfigMapSource) Stop() {
170+
source.stop()
171+
}
172+
123173
func logCronScriptParseError(err error) {
124174
log.WithError(err).Error("Failed to parse cron.yaml from configmap cron script")
125175
}
126176

127-
func configmapToCronScript(configmap *v1.ConfigMap) (string, *cvmsgspb.CronScript, error) {
177+
func configmapToCronScript(configmap *corev1.ConfigMap) (string, *cvmsgspb.CronScript, error) {
128178
id := string(configmap.UID)
129179
cronScript := &cvmsgspb.CronScript{
130180
ID: utils.ProtoFromUUIDStrOrNil(id),

0 commit comments

Comments
 (0)