From e2fb576c636f4c88dbb2741d1ad72469d6e6b43d Mon Sep 17 00:00:00 2001
From: Luke Shumaker
Date: Fri, 24 Mar 2023 21:49:26 -0600
Subject: wip: Rethink cache libraries

---
 lib/caching/cache.go         |  64 ++++++++++++++++
 lib/caching/linkedlist.go    | 127 ++++++++++++++++++++++++++++++
 lib/caching/lrucache.go      | 178 +++++++++++++++++++++++++++++++++++++++++++
 lib/caching/lrucache_test.go | 106 ++++++++++++++++++++++++++
 4 files changed, 475 insertions(+)
 create mode 100644 lib/caching/cache.go
 create mode 100644 lib/caching/linkedlist.go
 create mode 100644 lib/caching/lrucache.go
 create mode 100644 lib/caching/lrucache_test.go

diff --git a/lib/caching/cache.go b/lib/caching/cache.go
new file mode 100644
index 0000000..002a7ea
--- /dev/null
+++ b/lib/caching/cache.go
@@ -0,0 +1,64 @@
// Copyright (C) 2023 Luke Shumaker
//
// SPDX-License-Identifier: GPL-2.0-or-later

package caching

import (
	"context"
)

// A Source is something that a Cache sits in front of.
type Source[K comparable, V any] interface {
	// Load updates a 'V' (which is reused across the lifetime of
	// the cache, and may or may not be zero) to be set to the
	// value for the 'K'.
	Load(context.Context, K, *V)

	// Flush does whatever it needs to to ensure that if the
	// program exited right now, no one would be upset.  Flush
	// being called does not mean that the entry is being evicted
	// from the cache.
	Flush(context.Context, *V)
}

// A Cache is a bounded key/value store sitting in front of a Source,
// handing out use-counted pointers to cached values.
type Cache[K comparable, V any] interface {
	// Acquire loads the value for `k` (possibly from the cache),
	// records that value in to the cache, and increments the
	// cache entry's in-use counter preventing it from being
	// evicted.
	//
	// If the cache is at capacity and all entries are in-use,
	// then Acquire blocks until an entry becomes available (via
	// `Release`).
	Acquire(context.Context, K) *V

	// Release decrements the in-use counter for the cache entry
	// for `k`.  If the in-use counter drops to 0, then that entry
	// may be evicted.
	//
	// It is invalid (runtime-panic) to call Release for an entry
	// that does not have a positive in-use counter.
	Release(K)

	// Delete invalidates/removes an entry from the cache.  Blocks
	// until the in-use counter drops to 0.
	//
	// It is valid to call Delete on an entry that does not exist
	// in the cache.
	Delete(K)

	// Flush does whatever it needs to to ensure that if the
	// program exited right now, no one would be upset.  Flush
	// does not empty the cache.
	Flush(context.Context)
}

// FuncSource implements Source.  Load calls the function, and Flush
// is a no-op.
type FuncSource[K comparable, V any] func(context.Context, K, *V)

var _ Source[int, string] = FuncSource[int, string](nil)

// Load implements Source by calling the wrapped function.
func (fn FuncSource[K, V]) Load(ctx context.Context, k K, v *V) { fn(ctx, k, v) }

// Flush implements Source; it is a no-op.
func (fn FuncSource[K, V]) Flush(context.Context, *V) {}

diff --git a/lib/caching/linkedlist.go b/lib/caching/linkedlist.go
new file mode 100644
index 0000000..a13a107
--- /dev/null
+++ b/lib/caching/linkedlist.go
@@ -0,0 +1,127 @@
// Copyright (C) 2023 Luke Shumaker
//
// SPDX-License-Identifier: GPL-2.0-or-later

package caching

import (
	"fmt"
)

// LinkedListEntry[T] is an entry in a LinkedList[T].
type LinkedListEntry[T any] struct {
	// list is the list this entry currently belongs to, or nil
	// if the entry is not in a list.
	list *LinkedList[T]
	// older/newer point at the neighbors toward the "oldest" and
	// "newest" ends of the list, respectively.
	older, newer *LinkedListEntry[T]
	// Value is the caller's payload.
	Value T
}

// Older returns the neighboring entry toward the "oldest" end of the
// list, or nil if this entry is the oldest.
func (entry *LinkedListEntry[T]) Older() *LinkedListEntry[T] { return entry.older }

// Newer returns the neighboring entry toward the "newest" end of the
// list, or nil if this entry is the newest.
func (entry *LinkedListEntry[T]) Newer() *LinkedListEntry[T] { return entry.newer }

// LinkedList is a doubly-linked list.
//
// Rather than "head/tail", "front/back", or "next/prev", it has
// "oldest" and "newest".  This is to make code using it clearer; as
// the motivation for the LinkedList is as an implementation detail
// in LRU caches and FIFO queues, where this temporal naming is
// meaningful.
// Similarly, it does not implement many common features of a
// linked-list, because these applications do not need such features.
//
// Compared to `container/list.List`, LinkedList has the
// disadvantages that it has fewer safety checks and fewer features
// in general.
type LinkedList[T any] struct {
	oldest, newest *LinkedListEntry[T]
}

// IsEmpty returns whether the list is empty or not.
func (l *LinkedList[T]) IsEmpty() bool {
	return l.oldest == nil
}

// Delete removes an entry from the list.  The entry is invalid once
// Delete returns, and should not be reused or have its .Value
// accessed.
//
// NOTE(review): lrucache.go in this package does re-Store entries
// after Delete (Delete resets .list to nil, which is exactly what
// Store requires); the "should not be reused" caveat appears to be
// aimed at external callers — confirm.
//
// It is invalid (runtime-panic) to call Delete on a nil entry.
//
// It is invalid (runtime-panic) to call Delete on an entry that
// isn't in the list.
func (l *LinkedList[T]) Delete(entry *LinkedListEntry[T]) {
	if entry.list != l {
		panic(fmt.Errorf("LinkedList.Delete: entry %p not in list", entry))
	}
	// Unlink on the "newest" side.
	if entry.newer == nil {
		l.newest = entry.older
	} else {
		entry.newer.older = entry.older
	}
	// Unlink on the "oldest" side.
	if entry.older == nil {
		l.oldest = entry.newer
	} else {
		entry.older.newer = entry.newer
	}

	// no memory leaks
	entry.list = nil
	entry.older = nil
	entry.newer = nil
}

// Store appends an entry to the "newest" end of the list.
// (Note: despite older wording, Store does not return anything.)
//
// It is invalid (runtime-panic) to call Store on a nil entry.
//
// It is invalid (runtime-panic) to call Store on an entry that is
// already in a list.
func (l *LinkedList[T]) Store(entry *LinkedListEntry[T]) {
	if entry.list != nil {
		panic(fmt.Errorf("LinkedList.Store: entry %p is already in a list", entry))
	}
	entry.list = l
	entry.older = l.newest
	l.newest = entry
	if entry.older == nil {
		// The list was empty; the entry is also the oldest.
		l.oldest = entry
	} else {
		entry.older.newer = entry
	}
}

// MoveToNewest moves an entry from any position in the list to the
// "newest" end of the list.  If the entry is already in the "newest"
// position, then MoveToNewest is a no-op.
//
// It is invalid (runtime-panic) to call MoveToNewest on a nil entry.
//
// It is invalid (runtime-panic) to call MoveToNewest on an entry that
// isn't in the list.
func (l *LinkedList[T]) MoveToNewest(entry *LinkedListEntry[T]) {
	if entry.list != l {
		panic(fmt.Errorf("LinkedList.MoveToNewest: entry %p not in list", entry))
	}
	if entry.newer == nil {
		// Already newest.
		return
	}
	// Unlink the entry from its current position...
	entry.newer.older = entry.older
	if entry.older == nil {
		l.oldest = entry.newer
	} else {
		entry.older.newer = entry.newer
	}

	// ...and re-link it at the "newest" end.
	entry.older = l.newest
	l.newest.newer = entry

	entry.newer = nil
	l.newest = entry
}

// Oldest returns the entry at the "oldest" end of the list, or nil if
// the list is empty.
func (l *LinkedList[T]) Oldest() *LinkedListEntry[T] {
	return l.oldest
}

diff --git a/lib/caching/lrucache.go b/lib/caching/lrucache.go
new file mode 100644
index 0000000..c41da09
--- /dev/null
+++ b/lib/caching/lrucache.go
@@ -0,0 +1,178 @@
// Copyright (C) 2023 Luke Shumaker
//
// SPDX-License-Identifier: GPL-2.0-or-later

package caching

import (
	"context"
	"fmt"
	"sync"
)

// lruEntry is the payload stored in each of an lruCache's
// LinkedListEntries.
type lruEntry[K comparable, V any] struct {
	key K
	val V
	// refs is the in-use counter; the entry may only be evicted
	// or handed out when refs==0.
	refs int
	// del is non-nil iff a Delete call is blocked waiting for
	// refs to drop to 0; Release closes it when that happens.
	del chan struct{}
}

// lruCache implements Cache with a Least-Recently-Used eviction
// policy.  All fields below mu are guarded by mu.
type lruCache[K comparable, V any] struct {
	cap int
	src Source[K, V]

	mu sync.Mutex

	// len is the total number of LinkedListEntries that have been
	// allocated (in-use + evictable + unused); it never exceeds cap.
	len int

	// unused holds entry storage that is not associated with any
	// key (and so is not in byName).
	unused LinkedList[lruEntry[K, V]]
	// evictable holds entries that are in byName but not in use,
	// ordered least- to most-recently used.
	evictable LinkedList[lruEntry[K, V]] // only entries with .refs==0
	byName    map[K]*LinkedListEntry[lruEntry[K, V]]

	// waiters is the FIFO queue of Acquire calls blocked waiting
	// for an entry to become available.
	waiters LinkedList[chan *LinkedListEntry[lruEntry[K, V]]]
}

// NewLRUCache returns a new Cache with a simple Least-Recently-Used
// eviction policy.
//
// It is invalid (runtime-panic) to call NewLRUCache with a
// non-positive capacity or a nil source.
+func NewLRUCache[K comparable, V any](cap int, src Source[K, V]) Cache[K, V] { + if cap <= 0 { + panic(fmt.Errorf("caching.NewLRUCache: invalid capacity: %v", cap)) + } + if src == nil { + panic(fmt.Errorf("caching.NewLRUCache: nil source")) + } + return &lruCache[K, V]{ + cap: cap, + src: src, + } +} + +// Acquire implements Cache. +func (c *lruCache[K, V]) Acquire(ctx context.Context, k K) *V { + c.mu.Lock() + defer c.mu.Unlock() + if c.byName == nil { + c.byName = make(map[K]*LinkedListEntry[lruEntry[K, V]], c.cap) + } + + entry, ok := c.byName[k] + if ok { + if entry.Value.refs == 0 { + c.evictable.Delete(entry) + } + entry.Value.refs++ + } else { + switch { + case !c.unused.IsEmpty(): + entry = c.unused.Oldest() + c.unused.Delete(entry) + case c.len < c.cap: + entry = new(LinkedListEntry[lruEntry[K, V]]) + c.len++ + case !c.evictable.IsEmpty(): + entry = c.evictable.Oldest() + c.evictable.Delete(entry) + delete(c.byName, entry.Value.key) + default: + ch := make(chan *LinkedListEntry[lruEntry[K, V]]) + c.waiters.Store(&LinkedListEntry[chan *LinkedListEntry[lruEntry[K, V]]]{Value: ch}) + c.mu.Unlock() + entry = <-ch + c.mu.Lock() + } + + entry.Value.key = k + c.src.Load(ctx, k, &entry.Value.val) + entry.Value.refs = 1 + + c.byName[k] = entry + } + + return &entry.Value.val +} + +// Release implements Cache. +func (c *lruCache[K, V]) Release(k K) { + c.mu.Lock() + defer c.mu.Unlock() + + entry, ok := c.byName[k] + if !ok || entry.Value.refs <= 0 { + panic(fmt.Errorf("caching.lruCache.Release called on key that is not held: %v", k)) + } + entry.Value.refs-- + if entry.Value.refs == 0 { + del := entry.Value.del != nil + if del { + close(entry.Value.del) + entry.Value.del = nil + } + if c.waiters.IsEmpty() { + // Add it to the free-list. 
+ if del { + delete(c.byName, k) + c.unused.Store(entry) + } else { + c.evictable.Store(entry) + } + } else { + // Someone's waiting to pop something off of the + // free-list; bypass the free-list and hand it directly + // to them. + + // Make sure that no one aquires this entry between us + // writing it to the channel and the waiter calling + // c.mu.Lock(). + delete(c.byName, k) + + // Pass it to the waiter. + waiter := c.waiters.Oldest() + c.waiters.Delete(waiter) + waiter.Value <- entry + } + } +} + +// Delete implements Cache. +func (c *lruCache[K, V]) Delete(k K) { + c.mu.Lock() + + entry, ok := c.byName[k] + if !ok { + return + } + if entry.Value.refs == 0 { + delete(c.byName, k) + if c.waiters.IsEmpty() { + c.unused.Store(entry) + } else { + waiter := c.waiters.Oldest() + c.waiters.Delete(waiter) + waiter.Value <- entry + } + c.mu.Unlock() + } else { + if entry.Value.del == nil { + entry.Value.del = make(chan struct{}) + } + ch := entry.Value.del + c.mu.Unlock() + <-ch + } +} + +// Flush implements Cache. 
// Flush implements Cache.
func (c *lruCache[K, V]) Flush(ctx context.Context) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Flush every entry associated with a key (both in-use and
	// evictable entries are in byName)...
	for _, entry := range c.byName {
		c.src.Flush(ctx, &entry.Value.val)
	}
	// ...and also the storage on the unused-list, which may still
	// hold values from since-deleted keys.
	for entry := c.unused.Oldest(); entry != nil; entry = entry.Newer() {
		c.src.Flush(ctx, &entry.Value.val)
	}
}

diff --git a/lib/caching/lrucache_test.go b/lib/caching/lrucache_test.go
new file mode 100644
index 0000000..a9d4fe0
--- /dev/null
+++ b/lib/caching/lrucache_test.go
@@ -0,0 +1,106 @@
// Copyright (C) 2023 Luke Shumaker
//
// SPDX-License-Identifier: GPL-2.0-or-later

package caching

import (
	"context"
	"runtime/debug"
	"testing"
	"time"

	"github.com/datawire/dlib/dlog"

	"github.com/stretchr/testify/assert"
)

// TestLRUBlocking checks that when the cache is at capacity and
// every entry is held, Acquire blocks until a Release frees one.
func TestLRUBlocking(t *testing.T) {
	t.Parallel()
	const tick = time.Second / 2

	ctx := dlog.NewTestContext(t, false)

	cache := NewLRUCache[int, int](4,
		FuncSource[int, int](func(_ context.Context, k int, v *int) { *v = k * k }))

	// Fill the cache to capacity, holding all 4 entries.
	assert.Equal(t, 1, *cache.Acquire(ctx, 1))
	assert.Equal(t, 4, *cache.Acquire(ctx, 2))
	assert.Equal(t, 9, *cache.Acquire(ctx, 3))
	assert.Equal(t, 16, *cache.Acquire(ctx, 4))

	ch := make(chan int)
	start := time.Now()
	go func() {
		// This Acquire must block until the Release below.
		ch <- *cache.Acquire(ctx, 5)
	}()
	go func() {
		time.Sleep(tick)
		cache.Release(3)
	}()
	result := <-ch
	dur := time.Since(start)
	assert.Equal(t, 25, result)
	// The Acquire must have taken at least as long as the sleep
	// that preceded the Release.
	assert.Greater(t, dur, tick)
}

//nolint:paralleltest // Can't be parallel because we test testing.AllocsPerRun.
func TestLRUAllocs(t *testing.T) {
	const (
		cacheLen  = 8
		bigNumber = 128
	)

	ctx := dlog.NewTestContext(t, false)

	evictions := 0
	cache := NewLRUCache[int, int](cacheLen, FuncSource[int, int](func(_ context.Context, k int, v *int) {
		// A non-zero *v means we were handed storage that
		// previously held another key's value, i.e. the
		// storage was recycled after an eviction/deletion.
		if *v > 0 {
			evictions++
		}
		*v = k
	}))

	i := 1
	store := func() {
		cache.Acquire(ctx, i)
		cache.Release(i)
		i++
	}

	// Disable the GC temporarily to prevent cache-internal
	// storage from being cleaned in the middle of an
	// AllocsPerRun and causing spurious allocations.
	// NOTE(review): this comment originally referenced a
	// `cache.byAge.pool` field that does not exist in this
	// implementation — confirm intent.
	percent := debug.SetGCPercent(-1)
	defer debug.SetGCPercent(percent)

	// 1 alloc each as we fill the cache
	assert.Equal(t, float64(1), testing.AllocsPerRun(cacheLen-1, store))
	assert.Equal(t, 0, evictions)
	// after that, it should be alloc-free
	assert.Equal(t, float64(0), testing.AllocsPerRun(1, store))
	assert.Equal(t, 2, evictions)
	assert.Equal(t, float64(0), testing.AllocsPerRun(bigNumber, store))
	assert.Equal(t, 3+bigNumber, evictions)
	// check the len
	assert.Equal(t, cacheLen, len(cache.(*lruCache[int, int]).byName))
	cnt := 0
	for entry := cache.(*lruCache[int, int]).evictable.Oldest(); entry != nil; entry = entry.Newer() {
		cnt++
	}
	assert.Equal(t, cacheLen, cnt)
	// check contents: the newest cacheLen keys are present; all
	// older keys have been evicted
	cnt = 0
	for j := i - 1; j > 0; j-- {
		entry, ok := cache.(*lruCache[int, int]).byName[j]
		if cnt < cacheLen {
			if assert.True(t, ok, j) {
				val := entry.Value.val
				assert.Equal(t, j, val, j)
			}
			cnt++
		} else {
			assert.False(t, ok, j)
		}
	}
}
-- 
cgit v1.2.3