author	Luke Shumaker <lukeshu@lukeshu.com>	2023-03-24 21:49:26 -0600
committer	Luke Shumaker <lukeshu@lukeshu.com>	2023-03-26 11:06:57 -0600
commit	e2fb576c636f4c88dbb2741d1ad72469d6e6b43d (patch)
tree	e5f752d2d3df3634d4f8dd75cee01e251556c868
parent	bf5eed5af5c34b8cf9dc2985a7c4475602929bb1 (diff)
wip: Rethink cache libraries
-rw-r--r--	lib/caching/cache.go	64
-rw-r--r--	lib/caching/linkedlist.go	127
-rw-r--r--	lib/caching/lrucache.go	179
-rw-r--r--	lib/caching/lrucache_test.go	106
4 files changed, 476 insertions, 0 deletions
diff --git a/lib/caching/cache.go b/lib/caching/cache.go
new file mode 100644
index 0000000..002a7ea
--- /dev/null
+++ b/lib/caching/cache.go
@@ -0,0 +1,64 @@
+// Copyright (C) 2023 Luke Shumaker <lukeshu@lukeshu.com>
+//
+// SPDX-License-Identifier: GPL-2.0-or-later
+
+package caching
+
+import (
+ "context"
+)
+
+// A Source is something that a Cache sits in front of.
+type Source[K comparable, V any] interface {
+ // Load sets a 'V' (which is reused across the lifetime of
+ // the cache, and so may or may not start zeroed) to the
+ // value for the 'K'.
+ Load(context.Context, K, *V)
+
+ // Flush does whatever it needs to do to ensure that if the
+ // program exited right now, no one would be upset. Flush
+ // being called does not mean that the entry is being evicted
+ // from the cache.
+ Flush(context.Context, *V)
+}
+
+type Cache[K comparable, V any] interface {
+ // Acquire loads the value for `k` (possibly from the cache),
+ // records that value in to the cache, and increments the
+ // cache entry's in-use counter preventing it from being
+ // evicted.
+ //
+ // If the cache is at capacity and all entries are in-use,
+ // then Acquire blocks until an entry becomes available (via
+ // `Release`).
+ Acquire(context.Context, K) *V
+
+ // Release decrements the in-use counter for the cache entry
+ // for `k`. If the in-use counter drops to 0, then that entry
+ // may be evicted.
+ //
+ // It is invalid (runtime-panic) to call Release for an entry
+ // that does not have a positive in-use counter.
+ Release(K)
+
+ // Delete invalidates/removes an entry from the cache. It
+ // blocks until the in-use counter drops to 0.
+ //
+ // It is valid to call Delete on an entry that does not exist
+ // in the cache.
+ Delete(K)
+
+ // Flush does whatever it needs to do to ensure that if the
+ // program exited right now, no one would be upset. Flush
+ // does not empty the cache.
+ Flush(context.Context)
+}
+
+// FuncSource implements Source. Load calls the function, and Flush
+// is a no-op.
+type FuncSource[K comparable, V any] func(context.Context, K, *V)
+
+var _ Source[int, string] = FuncSource[int, string](nil)
+
+func (fn FuncSource[K, V]) Load(ctx context.Context, k K, v *V) { fn(ctx, k, v) }
+func (fn FuncSource[K, V]) Flush(context.Context, *V) {}
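
[Editor's note] The Acquire/Release contract is easiest to see in use. Below is a minimal usage sketch, not part of the commit; the module import path and the key/value types are assumptions for illustration.

// Illustrative sketch (not part of this commit): using the Cache API.
// The import path is an assumption.
package main

import (
	"context"
	"fmt"

	"git.lukeshu.com/btrfs-progs-ng/lib/caching" // assumed path
)

func main() {
	ctx := context.Background()

	// A FuncSource computes values on demand; its Flush is a no-op.
	src := caching.FuncSource[int, string](func(_ context.Context, k int, v *string) {
		*v = fmt.Sprintf("value-for-%d", k)
	})

	cache := caching.NewLRUCache[int, string](4, src)

	v := cache.Acquire(ctx, 42) // loads and pins the entry
	fmt.Println(*v)             // "value-for-42"
	cache.Release(42)           // unpins; the entry may now be evicted
}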
diff --git a/lib/caching/linkedlist.go b/lib/caching/linkedlist.go
new file mode 100644
index 0000000..a13a107
--- /dev/null
+++ b/lib/caching/linkedlist.go
@@ -0,0 +1,127 @@
+// Copyright (C) 2023 Luke Shumaker <lukeshu@lukeshu.com>
+//
+// SPDX-License-Identifier: GPL-2.0-or-later
+
+package caching
+
+import (
+ "fmt"
+)
+
+// LinkedListEntry[T] is an entry in a LinkedList[T].
+type LinkedListEntry[T any] struct {
+ list *LinkedList[T]
+ older, newer *LinkedListEntry[T]
+ Value T
+}
+
+func (entry *LinkedListEntry[T]) Older() *LinkedListEntry[T] { return entry.older }
+func (entry *LinkedListEntry[T]) Newer() *LinkedListEntry[T] { return entry.newer }
+
+// LinkedList is a doubly-linked list.
+//
+// Rather than "head/tail", "front/back", or "next/prev", it has
+// "oldest" and "newest". This is to make code that uses it clearer:
+// the motivation for LinkedList is as an implementation detail in
+// LRU caches and FIFO queues, where this temporal naming is
+// meaningful. Similarly, it does not implement many common features
+// of a linked list, because these applications do not need such
+// features.
+//
+// Compared to the standard library's `container/list.List`,
+// LinkedList has the disadvantage that it has fewer safety checks
+// and fewer features in general.
+type LinkedList[T any] struct {
+ oldest, newest *LinkedListEntry[T]
+}
+
+// IsEmpty returns whether the list is empty or not.
+func (l *LinkedList[T]) IsEmpty() bool {
+ return l.oldest == nil
+}
+
+// Delete removes an entry from the list. The entry is invalid once
+// Delete returns, and should not be reused or have its .Value
+// accessed.
+//
+// It is invalid (runtime-panic) to call Delete on a nil entry.
+//
+// It is invalid (runtime-panic) to call Delete on an entry that
+// isn't in the list.
+func (l *LinkedList[T]) Delete(entry *LinkedListEntry[T]) {
+ if entry.list != l {
+ panic(fmt.Errorf("LinkedList.Delete: entry %p not in list", entry))
+ }
+ if entry.newer == nil {
+ l.newest = entry.older
+ } else {
+ entry.newer.older = entry.older
+ }
+ if entry.older == nil {
+ l.oldest = entry.newer
+ } else {
+ entry.older.newer = entry.newer
+ }
+
+ // no memory leaks
+ entry.list = nil
+ entry.older = nil
+ entry.newer = nil
+}
+
+// Store appends an entry to the "newest" end of the list, linking
+// it into the list.
+//
+// It is invalid (runtime-panic) to call Store on a nil entry.
+//
+// It is invalid (runtime-panic) to call Store on an entry that is
+// already in a list.
+func (l *LinkedList[T]) Store(entry *LinkedListEntry[T]) {
+ if entry.list != nil {
+ panic(fmt.Errorf("LinkedList.Store: entry %p is already in a list", entry))
+ }
+ entry.list = l
+ entry.older = l.newest
+ l.newest = entry
+ if entry.older == nil {
+ l.oldest = entry
+ } else {
+ entry.older.newer = entry
+ }
+}
+
+// MoveToNewest moves an entry from any position in the list to the
+// "newest" end of the list. If the entry is already in the "newest"
+// position, then MoveToNewest is a no-op.
+//
+// It is invalid (runtime-panic) to call MoveToNewest on a nil entry.
+//
+// It is invalid (runtime-panic) to call MoveToNewest on an entry that
+// isn't in the list.
+func (l *LinkedList[T]) MoveToNewest(entry *LinkedListEntry[T]) {
+ if entry.list != l {
+ panic(fmt.Errorf("LinkedList.MoveToNewest: entry %p not in list", entry))
+ }
+ if entry.newer == nil {
+ // Already newest.
+ return
+ }
+ entry.newer.older = entry.older
+ if entry.older == nil {
+ l.oldest = entry.newer
+ } else {
+ entry.older.newer = entry.newer
+ }
+
+ entry.older = l.newest
+ l.newest.newer = entry
+
+ entry.newer = nil
+ l.newest = entry
+}
+
+// Oldest returns the entry at the "oldest" end of the list, or nil if
+// the list is empty.
+func (l *LinkedList[T]) Oldest() *LinkedListEntry[T] {
+ return l.oldest
+}
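
[Editor's note] A short sketch (again illustrative, not part of the commit; the import path is an assumption) shows how the oldest/newest naming reads in FIFO-style code:

// Illustrative sketch: LinkedList as a FIFO with LRU-style promotion.
package main

import (
	"fmt"

	"git.lukeshu.com/btrfs-progs-ng/lib/caching" // assumed path
)

func main() {
	var list caching.LinkedList[string]

	a := &caching.LinkedListEntry[string]{Value: "a"}
	b := &caching.LinkedListEntry[string]{Value: "b"}
	list.Store(a)        // a is both oldest and newest
	list.Store(b)        // b is now newest
	list.MoveToNewest(a) // "touch" a: order is now b (oldest), a (newest)

	for !list.IsEmpty() {
		entry := list.Oldest()
		fmt.Println(entry.Value) // read before Delete; prints "b", then "a"
		list.Delete(entry)       // entry is invalid past this point
	}
}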
diff --git a/lib/caching/lrucache.go b/lib/caching/lrucache.go
new file mode 100644
index 0000000..c41da09
--- /dev/null
+++ b/lib/caching/lrucache.go
@@ -0,0 +1,179 @@
+// Copyright (C) 2023 Luke Shumaker <lukeshu@lukeshu.com>
+//
+// SPDX-License-Identifier: GPL-2.0-or-later
+
+package caching
+
+import (
+ "context"
+ "fmt"
+ "sync"
+)
+
+type lruEntry[K comparable, V any] struct {
+ key K
+ val V
+ refs int
+ del chan struct{}
+}
+
+type lruCache[K comparable, V any] struct {
+ cap int
+ src Source[K, V]
+
+ mu sync.Mutex
+
+ len int
+
+ unused LinkedList[lruEntry[K, V]]
+ evictable LinkedList[lruEntry[K, V]] // only entries with .refs==0
+ byName map[K]*LinkedListEntry[lruEntry[K, V]]
+
+ waiters LinkedList[chan *LinkedListEntry[lruEntry[K, V]]]
+}
+
+// NewLRUCache returns a new Cache with a simple Least-Recently-Used eviction
+// policy.
+//
+// It is invalid (runtime-panic) to call NewLRUCache with a non-positive
+// capacity or a nil source.
+func NewLRUCache[K comparable, V any](cap int, src Source[K, V]) Cache[K, V] {
+ if cap <= 0 {
+ panic(fmt.Errorf("caching.NewLRUCache: invalid capacity: %v", cap))
+ }
+ if src == nil {
+ panic(fmt.Errorf("caching.NewLRUCache: nil source"))
+ }
+ return &lruCache[K, V]{
+ cap: cap,
+ src: src,
+ }
+}
+
+// Acquire implements Cache.
+func (c *lruCache[K, V]) Acquire(ctx context.Context, k K) *V {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if c.byName == nil {
+ c.byName = make(map[K]*LinkedListEntry[lruEntry[K, V]], c.cap)
+ }
+
+ entry, ok := c.byName[k]
+ if ok {
+ if entry.Value.refs == 0 {
+ c.evictable.Delete(entry)
+ }
+ entry.Value.refs++
+ } else {
+ switch {
+ case !c.unused.IsEmpty():
+ entry = c.unused.Oldest()
+ c.unused.Delete(entry)
+ case c.len < c.cap:
+ entry = new(LinkedListEntry[lruEntry[K, V]])
+ c.len++
+ case !c.evictable.IsEmpty():
+ entry = c.evictable.Oldest()
+ c.evictable.Delete(entry)
+ delete(c.byName, entry.Value.key)
+ default:
+ ch := make(chan *LinkedListEntry[lruEntry[K, V]])
+ c.waiters.Store(&LinkedListEntry[chan *LinkedListEntry[lruEntry[K, V]]]{Value: ch})
+ c.mu.Unlock()
+ entry = <-ch
+ c.mu.Lock()
+ }
+
+ entry.Value.key = k
+ c.src.Load(ctx, k, &entry.Value.val)
+ entry.Value.refs = 1
+
+ c.byName[k] = entry
+ }
+
+ return &entry.Value.val
+}
+
+// Release implements Cache.
+func (c *lruCache[K, V]) Release(k K) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ entry, ok := c.byName[k]
+ if !ok || entry.Value.refs <= 0 {
+ panic(fmt.Errorf("caching.lruCache.Release called on key that is not held: %v", k))
+ }
+ entry.Value.refs--
+ if entry.Value.refs == 0 {
+ del := entry.Value.del != nil
+ if del {
+ close(entry.Value.del)
+ entry.Value.del = nil
+ }
+ if c.waiters.IsEmpty() {
+ // Add it to the free-list.
+ if del {
+ delete(c.byName, k)
+ c.unused.Store(entry)
+ } else {
+ c.evictable.Store(entry)
+ }
+ } else {
+ // Someone's waiting to pop something off of the
+ // free-list; bypass the free-list and hand it directly
+ // to them.
+
+ // Make sure that no one acquires this entry between us
+ // writing it to the channel and the waiter calling
+ // c.mu.Lock().
+ delete(c.byName, k)
+
+ // Pass it to the waiter.
+ waiter := c.waiters.Oldest()
+ c.waiters.Delete(waiter)
+ waiter.Value <- entry
+ }
+ }
+}
+
+// Delete implements Cache.
+func (c *lruCache[K, V]) Delete(k K) {
+ c.mu.Lock()
+
+ entry, ok := c.byName[k]
+ if !ok {
+ return
+ }
+ if entry.Value.refs == 0 {
+ delete(c.byName, k)
+ if c.waiters.IsEmpty() {
+ c.unused.Store(entry)
+ } else {
+ waiter := c.waiters.Oldest()
+ c.waiters.Delete(waiter)
+ waiter.Value <- entry
+ }
+ c.mu.Unlock()
+ } else {
+ if entry.Value.del == nil {
+ entry.Value.del = make(chan struct{})
+ }
+ ch := entry.Value.del
+ c.mu.Unlock()
+ <-ch
+ }
+}
+
+// Flush implements Cache.
+func (c *lruCache[K, V]) Flush(ctx context.Context) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ for _, entry := range c.byName {
+ c.src.Flush(ctx, &entry.Value.val)
+ }
+ for entry := c.unused.Oldest(); entry != nil; entry = entry.Newer() {
+ c.src.Flush(ctx, &entry.Value.val)
+ }
+}
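
[Editor's note] Two behaviors of this implementation are worth calling out: Acquire blocks when the cache is full of pinned entries (exercised by TestLRUBlocking below), and Delete blocks while the target entry is pinned. A sketch of the latter, illustrative only; the import path and timings are assumptions:

// Illustrative sketch: Delete blocks until the entry's in-use
// counter drops to zero.
package main

import (
	"context"
	"time"

	"git.lukeshu.com/btrfs-progs-ng/lib/caching" // assumed path
)

func main() {
	ctx := context.Background()
	cache := caching.NewLRUCache[int, int](1,
		caching.FuncSource[int, int](func(_ context.Context, k int, v *int) { *v = k }))

	_ = cache.Acquire(ctx, 7) // pin key 7

	done := make(chan struct{})
	go func() {
		cache.Delete(7) // blocks: key 7 still has a positive in-use counter
		close(done)
	}()

	time.Sleep(10 * time.Millisecond) // give Delete time to block
	cache.Release(7)                  // counter drops to 0 ...
	<-done                            // ... and the pending Delete returns
}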
diff --git a/lib/caching/lrucache_test.go b/lib/caching/lrucache_test.go
new file mode 100644
index 0000000..a9d4fe0
--- /dev/null
+++ b/lib/caching/lrucache_test.go
@@ -0,0 +1,106 @@
+// Copyright (C) 2023 Luke Shumaker <lukeshu@lukeshu.com>
+//
+// SPDX-License-Identifier: GPL-2.0-or-later
+
+package caching
+
+import (
+ "context"
+ "runtime/debug"
+ "testing"
+ "time"
+
+ "github.com/datawire/dlib/dlog"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestLRUBlocking(t *testing.T) {
+ t.Parallel()
+ const tick = time.Second / 2
+
+ ctx := dlog.NewTestContext(t, false)
+
+ cache := NewLRUCache[int, int](4,
+ FuncSource[int, int](func(_ context.Context, k int, v *int) { *v = k * k }))
+
+ assert.Equal(t, 1, *cache.Acquire(ctx, 1))
+ assert.Equal(t, 4, *cache.Acquire(ctx, 2))
+ assert.Equal(t, 9, *cache.Acquire(ctx, 3))
+ assert.Equal(t, 16, *cache.Acquire(ctx, 4))
+
+ ch := make(chan int)
+ start := time.Now()
+ go func() {
+ ch <- *cache.Acquire(ctx, 5)
+ }()
+ go func() {
+ time.Sleep(tick)
+ cache.Release(3)
+ }()
+ result := <-ch
+ dur := time.Since(start)
+ assert.Equal(t, 25, result)
+ assert.Greater(t, dur, tick)
+}
+
+//nolint:paralleltest // Can't be parallel because we test testing.AllocsPerRun.
+func TestLRUAllocs(t *testing.T) {
+ const (
+ cacheLen = 8
+ bigNumber = 128
+ )
+
+ ctx := dlog.NewTestContext(t, false)
+
+ evictions := 0
+ cache := NewLRUCache[int, int](cacheLen, FuncSource[int, int](func(_ context.Context, k int, v *int) {
+ if *v > 0 {
+ evictions++
+ }
+ *v = k
+ }))
+
+ i := 1
+ store := func() {
+ cache.Acquire(ctx, i)
+ cache.Release(i)
+ i++
+ }
+
+ // Disable the GC temporarily to keep it from running in the
+ // middle of an AllocsPerRun and causing spurious allocations
+ // to be counted.
+ percent := debug.SetGCPercent(-1)
+ defer debug.SetGCPercent(percent)
+
+ // 1 alloc each as we fill the cache
+ assert.Equal(t, float64(1), testing.AllocsPerRun(cacheLen-1, store))
+ assert.Equal(t, 0, evictions)
+ // after that, it should be alloc-free (AllocsPerRun does a warm-up run, hence 2 evictions)
+ assert.Equal(t, float64(0), testing.AllocsPerRun(1, store))
+ assert.Equal(t, 2, evictions)
+ assert.Equal(t, float64(0), testing.AllocsPerRun(bigNumber, store))
+ assert.Equal(t, 3+bigNumber, evictions)
+ // check the len
+ assert.Equal(t, cacheLen, len(cache.(*lruCache[int, int]).byName))
+ cnt := 0
+ for entry := cache.(*lruCache[int, int]).evictable.Oldest(); entry != nil; entry = entry.Newer() {
+ cnt++
+ }
+ assert.Equal(t, cacheLen, cnt)
+ // check contents
+ cnt = 0
+ for j := i - 1; j > 0; j-- {
+ entry, ok := cache.(*lruCache[int, int]).byName[j]
+ if cnt < cacheLen {
+ if assert.True(t, ok, j) {
+ val := entry.Value.val
+ assert.Equal(t, j, val, j)
+ }
+ cnt++
+ } else {
+ assert.False(t, ok, j)
+ }
+ }
+}