Skip to content

Commit aaf287c

Browse files
committed
Cache HelmRepository index files
If implemented, will provide users with a way to cache index files. This addresses issues where the index file is loaded and unmarshalled in concurrent reconciliation resulting in a heavy memory footprint. The caching strategy used is cache aside, and the cache is a k/v store with expiration. The cache number of entries and ttl for entries are configurable. The cache is optional and is disabled by default Signed-off-by: Soule BA <soule@weave.works>
1 parent e4d0b53 commit aaf287c

File tree

7 files changed

+366
-4
lines changed

7 files changed

+366
-4
lines changed

api/v1beta2/condition_types.go

+3
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,7 @@ const (
8080

8181
// SymlinkUpdateFailedReason signals a failure in updating a symlink.
8282
SymlinkUpdateFailedReason string = "SymlinkUpdateFailed"
83+
84+
// CacheOperationFailedReason signals a failure in cache operation.
85+
CacheOperationFailedReason string = "CacheOperationFailed"
8386
)

controllers/helmchart_controller.go

+34
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030

3131
securejoin "github.com/cyphar/filepath-securejoin"
3232
helmgetter "helm.sh/helm/v3/pkg/getter"
33+
helmrepo "helm.sh/helm/v3/pkg/repo"
3334
corev1 "k8s.io/api/core/v1"
3435
apierrs "k8s.io/apimachinery/pkg/api/errors"
3536
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -55,6 +56,7 @@ import (
5556
"github.com/fluxcd/pkg/untar"
5657

5758
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
59+
"github.com/fluxcd/source-controller/internal/cache"
5860
serror "github.com/fluxcd/source-controller/internal/error"
5961
"github.com/fluxcd/source-controller/internal/helm/chart"
6062
"github.com/fluxcd/source-controller/internal/helm/getter"
@@ -109,6 +111,9 @@ type HelmChartReconciler struct {
109111
Storage *Storage
110112
Getters helmgetter.Providers
111113
ControllerName string
114+
115+
Cache *cache.Cache
116+
TTL time.Duration
112117
}
113118

114119
func (r *HelmChartReconciler) SetupWithManager(mgr ctrl.Manager) error {
@@ -451,6 +456,15 @@ func (r *HelmChartReconciler) buildFromHelmRepository(ctx context.Context, obj *
451456
}
452457
}
453458

459+
// Try to retrieve the repository index from the cache
460+
if r.Cache != nil {
461+
if index, found := r.Cache.Get(r.Storage.LocalPath(*repo.GetArtifact())); err == nil {
462+
if found {
463+
chartRepo.Index = index.(*helmrepo.IndexFile)
464+
}
465+
}
466+
}
467+
454468
// Construct the chart builder with scoped configuration
455469
cb := chart.NewRemoteBuilder(chartRepo)
456470
opts := chart.BuildOptions{
@@ -474,6 +488,26 @@ func (r *HelmChartReconciler) buildFromHelmRepository(ctx context.Context, obj *
474488
return sreconcile.ResultEmpty, err
475489
}
476490

491+
defer func() {
492+
// Cache the index if it was successfully retrieved
493+
// and the chart was successfully built
494+
if r.Cache != nil && chartRepo.Index != nil {
495+
// The cache key have to be safe in multi-tenancy environments,
496+
// as otherwise it could be used as a vector to bypass the helm repository's authentication.
497+
// Using r.Storage.LocalPath(*repo.GetArtifact() is safe as the path is in the format /<helm-repository-name>/<chart-name>/<filename>.
498+
err := r.Cache.Set(r.Storage.LocalPath(*repo.GetArtifact()), chartRepo.Index, r.TTL)
499+
if err != nil {
500+
r.eventLogf(ctx, obj, events.EventTypeTrace, sourcev1.CacheOperationFailedReason, "failed to cache index: %v", err)
501+
}
502+
503+
}
504+
505+
// Delete the index reference
506+
if chartRepo.Index != nil {
507+
chartRepo.Unload()
508+
}
509+
}()
510+
477511
*b = *build
478512
return sreconcile.ResultSuccess, nil
479513
}

controllers/suite_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/fluxcd/pkg/testserver"
3636

3737
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
38+
"github.com/fluxcd/source-controller/internal/cache"
3839
// +kubebuilder:scaffold:imports
3940
)
4041

@@ -126,12 +127,15 @@ func TestMain(m *testing.M) {
126127
panic(fmt.Sprintf("Failed to start HelmRepositoryReconciler: %v", err))
127128
}
128129

130+
cache := cache.New(5, 1*time.Second)
129131
if err := (&HelmChartReconciler{
130132
Client: testEnv,
131133
EventRecorder: record.NewFakeRecorder(32),
132134
Metrics: testMetricsH,
133135
Getters: testGetters,
134136
Storage: testStorage,
137+
Cache: cache,
138+
TTL: 1 * time.Second,
135139
}).SetupWithManager(testEnv); err != nil {
136140
panic(fmt.Sprintf("Failed to start HelmRepositoryReconciler: %v", err))
137141
}

internal/cache/cache.go

+218
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
package cache
2+
3+
import (
4+
"fmt"
5+
"runtime"
6+
"sync"
7+
"time"
8+
)
9+
10+
// NOTE: this is heavily based on patrickmn/go-cache:
11+
// https://github.com/patrickmn/go-cache
12+
13+
// Cache is a thread-safe in-memory key/value store.
14+
type Cache struct {
15+
*cache
16+
}
17+
18+
// Item is an item stored in the cache.
19+
type Item struct {
20+
Object interface{}
21+
Expiration int64
22+
}
23+
24+
type cache struct {
25+
// Items holds the elements in the cache.
26+
Items map[string]Item
27+
// Maximum number of items the cache can hold.
28+
MaxItems int
29+
mu sync.RWMutex
30+
janitor *janitor
31+
}
32+
33+
// ItemCount returns the number of items in the cache.
34+
// This may include items that have expired, but have not yet been cleaned up.
35+
func (c *cache) ItemCount() int {
36+
c.mu.RLock()
37+
n := len(c.Items)
38+
c.mu.RUnlock()
39+
return n
40+
}
41+
42+
func (c *cache) set(key string, value interface{}, expiration time.Duration) {
43+
var e int64
44+
if expiration > 0 {
45+
e = time.Now().Add(expiration).UnixNano()
46+
}
47+
48+
c.Items[key] = Item{
49+
Object: value,
50+
Expiration: e,
51+
}
52+
}
53+
54+
// Set adds an item to the cache, replacing any existing item.
55+
// If expiration is zero, the item never expires.
56+
// If the cache is full, Set will return an error.
57+
func (c *cache) Set(key string, value interface{}, expiration time.Duration) error {
58+
c.mu.Lock()
59+
_, found := c.Items[key]
60+
if found {
61+
c.set(key, value, expiration)
62+
c.mu.Unlock()
63+
return nil
64+
}
65+
66+
if c.MaxItems > 0 && len(c.Items) < c.MaxItems {
67+
c.set(key, value, expiration)
68+
c.mu.Unlock()
69+
return nil
70+
}
71+
72+
c.mu.Unlock()
73+
return fmt.Errorf("Cache is full")
74+
}
75+
76+
func (c *cache) Add(key string, value interface{}, expiration time.Duration) error {
77+
c.mu.Lock()
78+
_, found := c.Items[key]
79+
if found {
80+
c.mu.Unlock()
81+
return fmt.Errorf("Item %s already exists", key)
82+
}
83+
84+
if c.MaxItems > 0 && len(c.Items) < c.MaxItems {
85+
c.set(key, value, expiration)
86+
c.mu.Unlock()
87+
return nil
88+
}
89+
90+
c.mu.Unlock()
91+
return fmt.Errorf("Cache is full")
92+
}
93+
94+
func (c *cache) Get(key string) (interface{}, bool) {
95+
c.mu.RLock()
96+
item, found := c.Items[key]
97+
if !found {
98+
c.mu.RUnlock()
99+
return nil, false
100+
}
101+
if item.Expiration > 0 {
102+
if item.Expiration < time.Now().UnixNano() {
103+
c.mu.RUnlock()
104+
return nil, false
105+
}
106+
}
107+
c.mu.RUnlock()
108+
return item.Object, true
109+
}
110+
111+
func (c *cache) Delete(key string) {
112+
c.mu.Lock()
113+
delete(c.Items, key)
114+
c.mu.Unlock()
115+
}
116+
117+
func (c *cache) Clear() {
118+
c.mu.Lock()
119+
c.Items = make(map[string]Item)
120+
c.mu.Unlock()
121+
}
122+
123+
func (c *cache) HasExpired(key string) bool {
124+
c.mu.RLock()
125+
item, ok := c.Items[key]
126+
if !ok {
127+
c.mu.RUnlock()
128+
return true
129+
}
130+
if item.Expiration > 0 {
131+
if item.Expiration < time.Now().UnixNano() {
132+
c.mu.RUnlock()
133+
return true
134+
}
135+
}
136+
c.mu.RUnlock()
137+
return false
138+
}
139+
140+
func (c *cache) SetExpiration(key string, expiration time.Duration) {
141+
c.mu.Lock()
142+
item, ok := c.Items[key]
143+
if !ok {
144+
c.mu.Unlock()
145+
return
146+
}
147+
item.Expiration = time.Now().Add(expiration).UnixNano()
148+
c.mu.Unlock()
149+
}
150+
151+
func (c *cache) GetExpiration(key string) time.Duration {
152+
c.mu.RLock()
153+
item, ok := c.Items[key]
154+
if !ok {
155+
c.mu.RUnlock()
156+
return 0
157+
}
158+
if item.Expiration > 0 {
159+
if item.Expiration < time.Now().UnixNano() {
160+
c.mu.RUnlock()
161+
return 0
162+
}
163+
}
164+
c.mu.RUnlock()
165+
return time.Duration(item.Expiration - time.Now().UnixNano())
166+
}
167+
168+
func (c *cache) DeleteExpired() {
169+
c.mu.Lock()
170+
for k, v := range c.Items {
171+
if v.Expiration > 0 && v.Expiration < time.Now().UnixNano() {
172+
delete(c.Items, k)
173+
}
174+
}
175+
c.mu.Unlock()
176+
}
177+
178+
type janitor struct {
179+
Interval time.Duration
180+
stop chan bool
181+
}
182+
183+
func (j *janitor) Run(c *cache) {
184+
ticker := time.NewTicker(j.Interval)
185+
for {
186+
select {
187+
case <-ticker.C:
188+
c.DeleteExpired()
189+
case <-j.stop:
190+
ticker.Stop()
191+
return
192+
}
193+
}
194+
}
195+
196+
func stopJanitor(c *Cache) {
197+
c.janitor.stop <- true
198+
}
199+
200+
func New(maxItems int, interval time.Duration) *Cache {
201+
c := &cache{
202+
Items: make(map[string]Item),
203+
MaxItems: maxItems,
204+
janitor: &janitor{
205+
Interval: interval,
206+
stop: make(chan bool),
207+
},
208+
}
209+
210+
C := &Cache{c}
211+
212+
if interval > 0 {
213+
go c.janitor.Run(c)
214+
runtime.SetFinalizer(C, stopJanitor)
215+
}
216+
217+
return C
218+
}

internal/cache/cache_test.go

+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package cache
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
. "github.com/onsi/gomega"
8+
)
9+
10+
func TestCache(t *testing.T) {
11+
g := NewWithT(t)
12+
// create a cache that can hold 2 items and have no cleanup
13+
cache := New(2, 0)
14+
15+
// Get an Item from the cache
16+
if _, found := cache.Get("key1"); found {
17+
t.Error("Item should not be found")
18+
}
19+
20+
// Add an item to the cache
21+
err := cache.Add("key1", "value1", 0)
22+
g.Expect(err).ToNot(HaveOccurred())
23+
24+
// Get the item from the cache
25+
item, found := cache.Get("key1")
26+
g.Expect(found).To(BeTrue())
27+
g.Expect(item).To(Equal("value1"))
28+
29+
// Add another item to the cache
30+
err = cache.Add("key2", "value2", 0)
31+
g.Expect(err).ToNot(HaveOccurred())
32+
g.Expect(cache.ItemCount()).To(Equal(2))
33+
34+
// Get the item from the cache
35+
item, found = cache.Get("key2")
36+
g.Expect(found).To(BeTrue())
37+
g.Expect(item).To(Equal("value2"))
38+
39+
//Add an item to the cache
40+
err = cache.Add("key3", "value3", 0)
41+
g.Expect(err).To(HaveOccurred())
42+
43+
// Replace an item in the cache
44+
err = cache.Set("key2", "value3", 0)
45+
g.Expect(err).ToNot(HaveOccurred())
46+
47+
// Get the item from the cache
48+
item, found = cache.Get("key2")
49+
g.Expect(found).To(BeTrue())
50+
g.Expect(item).To(Equal("value3"))
51+
52+
// new cache with a cleanup interval of 1 second
53+
cache = New(2, 1*time.Second)
54+
55+
// Add an item to the cache
56+
err = cache.Add("key1", "value1", 2*time.Second)
57+
g.Expect(err).ToNot(HaveOccurred())
58+
59+
// Get the item from the cache
60+
item, found = cache.Get("key1")
61+
g.Expect(found).To(BeTrue())
62+
g.Expect(item).To(Equal("value1"))
63+
64+
// wait for the item to expire
65+
time.Sleep(3 * time.Second)
66+
67+
// Get the item from the cache
68+
item, found = cache.Get("key1")
69+
g.Expect(found).To(BeFalse())
70+
g.Expect(item).To(BeNil())
71+
}

0 commit comments

Comments
 (0)