diff options
Diffstat (limited to 'lib/caching/lrucache.go')
-rw-r--r-- | lib/caching/lrucache.go | 217 |
1 files changed, 125 insertions, 92 deletions
diff --git a/lib/caching/lrucache.go b/lib/caching/lrucache.go index 095a78c..4d7c6b5 100644 --- a/lib/caching/lrucache.go +++ b/lib/caching/lrucache.go @@ -10,9 +10,32 @@ import ( "sync" ) +// NewLRUCache returns a new thread-safe Cache with a simple +// Least-Recently-Used eviction policy. +// +// It is invalid (runtime-panic) to call NewLRUCache with a +// non-positive capacity or a nil source. +func NewLRUCache[K comparable, V any](cap int, src Source[K, V]) Cache[K, V] { + if cap <= 0 { + panic(fmt.Errorf("caching.NewLRUCache: invalid capacity: %v", cap)) + } + if src == nil { + panic(fmt.Errorf("caching.NewLRUCache: nil source")) + } + ret := &lruCache[K, V]{ + cap: cap, + src: src, + } + for i := 0; i < cap; i++ { + ret.unused.Store(new(LinkedListEntry[lruEntry[K, V]])) + } + return ret +} + type lruEntry[K comparable, V any] struct { - key K - val V + key K + val V + refs int del chan struct{} // non-nil if a delete is waiting on .refs to drop to zero } @@ -23,36 +46,84 @@ type lruCache[K comparable, V any] struct { mu sync.Mutex + // Pinned entries are in .byName, but not in any LinkedList. unused LinkedList[lruEntry[K, V]] evictable LinkedList[lruEntry[K, V]] // only entries with .refs==0 byName map[K]*LinkedListEntry[lruEntry[K, V]] - waiters LinkedList[chan *LinkedListEntry[lruEntry[K, V]]] + waiters LinkedList[chan struct{}] } -// NewLRUCache returns a new thread-safe Cache with a simple Least-Recently-Used -// eviction policy. -// -// It is invalid (runtime-panic) to call NewLRUCache with a non-positive -// capacity or a nil source. -func NewLRUCache[K comparable, V any](cap int, src Source[K, V]) Cache[K, V] { - if cap <= 0 { - panic(fmt.Errorf("caching.NewLRUCache: invalid capacity: %v", cap)) +// Blocking primitives ///////////////////////////////////////////////////////// + +// Because of pinning, there might not actually be an available entry +// for us to use/evict. 
If we need an entry to use or evict, we'll
+// call waitForAvail to block until there is an entry that is either
+// unused or evictable. We'll give waiters FIFO priority.
+func (c *lruCache[K, V]) waitForAvail() {
+	if !(c.unused.IsEmpty() && c.evictable.IsEmpty()) {
+		return
 	}
-	if src == nil {
-		panic(fmt.Errorf("caching.NewLRUCache: nil source"))
+	ch := make(chan struct{})
+	c.waiters.Store(&LinkedListEntry[chan struct{}]{Value: ch})
+	c.mu.Unlock()
+	<-ch
+	c.mu.Lock()
+}
+
+// notifyAvail is called when an entry becomes unused or evictable,
+// and wakes up the highest-priority .waitForAvail() waiter (if there
+// is one).
+func (c *lruCache[K, V]) notifyAvail(entry *LinkedListEntry[lruEntry[K, V]]) {
+	waiter := c.waiters.Oldest
+	if waiter == nil {
+		return
 	}
-	ret := &lruCache[K, V]{
-		cap: cap,
-		src: src,
+	c.waiters.Delete(waiter)
+	close(waiter.Value)
+}
+
+// Calling .Delete(k) on an entry that is pinned needs to block until
+// the entry is no longer pinned.
+func (c *lruCache[K, V]) unlockAndWaitForDel(entry *LinkedListEntry[lruEntry[K, V]]) {
+	if entry.Value.del == nil {
+		entry.Value.del = make(chan struct{})
 	}
-	for i := 0; i < cap; i++ {
-		c.unused.Store(new(LinkedListEntry[lruEntry[K, V]]))
+	ch := entry.Value.del
+	c.mu.Unlock()
+	<-ch
+}
+
+// notifyOfDel unblocks any calls to .Delete(k), notifying them that
+// the entry has been deleted and they can now return.
+func (c *lruCache[K, V]) notifyOfDel(entry *LinkedListEntry[lruEntry[K, V]]) {
+	if entry.Value.del != nil {
+		close(entry.Value.del)
+		entry.Value.del = nil
 	}
-	return ret
 }
 
-// Acquire implements Cache.
+// Main implementation /////////////////////////////////////////////////////////
+
+// lruReplace is the LRU(c) replacement policy.  It returns an entry
+// that is not in any list.
+func (c *lruCache[K, V]) lruReplace() *LinkedListEntry[lruEntry[K, V]] {
+	c.waitForAvail()
+
+	// If the cache isn't full, no need to do an eviction. 
+ if entry := c.unused.Oldest; entry != nil { + c.unused.Delete(entry) + return entry + } + + // Replace the oldest entry. + entry := c.evictable.Oldest + c.evictable.Delete(entry) + delete(c.byName, entry.Value.key) + return entry +} + +// Acquire implements the 'Cache' interface. func (c *lruCache[K, V]) Acquire(ctx context.Context, k K) *V { c.mu.Lock() defer c.mu.Unlock() @@ -60,28 +131,14 @@ func (c *lruCache[K, V]) Acquire(ctx context.Context, k K) *V { c.byName = make(map[K]*LinkedListEntry[lruEntry[K, V]], c.cap) } - entry, ok := c.byName[k] - if ok { + entry := c.byName[k] + if entry != nil { if entry.Value.refs == 0 { c.evictable.Delete(entry) } entry.Value.refs++ } else { - switch { - case !c.unused.IsEmpty(): - entry = c.unused.Oldest() - c.unused.Delete(entry) - case !c.evictable.IsEmpty(): - entry = c.evictable.Oldest() - c.evictable.Delete(entry) - delete(c.byName, entry.Value.key) - default: - ch := make(chan *LinkedListEntry[lruEntry[K, V]]) - c.waiters.Store(&LinkedListEntry[chan *LinkedListEntry[lruEntry[K, V]]]{Value: ch}) - c.mu.Unlock() - entry = <-ch - c.mu.Lock() - } + entry = c.lruReplace() entry.Value.key = k c.src.Load(ctx, k, &entry.Value.val) @@ -93,77 +150,53 @@ func (c *lruCache[K, V]) Acquire(ctx context.Context, k K) *V { return &entry.Value.val } -// Release implements Cache. -func (c *lruCache[K, V]) Release(k K) { +// Delete implements the 'Cache' interface. +func (c *lruCache[K, V]) Delete(k K) { c.mu.Lock() - defer c.mu.Unlock() - entry, ok := c.byName[k] - if !ok || entry.Value.refs <= 0 { - panic(fmt.Errorf("caching.lruCache.Release called on key that is not held: %v", k)) + entry := c.byName[k] + if entry == nil { + return } - entry.Value.refs-- - if entry.Value.refs == 0 { - del := entry.Value.del != nil - if del { - close(entry.Value.del) - entry.Value.del = nil - } - if c.waiters.IsEmpty() { - // Add it to the free-list. 
-			if del {
-				delete(c.byName, k)
-				c.unused.Store(entry)
-			} else {
-				c.evictable.Store(entry)
-			}
-		} else {
-			// Someone's waiting to pop something off of the
-			// free-list; bypass the free-list and hand it directly
-			// to them.
+	if entry.Value.refs > 0 {
+		// Let .Release(k) do the deletion when the
+		// refcount drops to 0.
+		c.unlockAndWaitForDel(entry)
+		return
+	}
+	delete(c.byName, k)
+	c.evictable.Delete(entry)
+	c.unused.Store(entry)
 
-			// Make sure that no one aquires this entry between us
-			// writing it to the channel and the waiter calling
-			// c.mu.Lock().
-			delete(c.byName, k)
+	// No need to call c.notifyAvail(); if we were able to delete
+	// it, it was already available.
 
-			// Pass it to the waiter.
-			waiter := c.waiters.Oldest()
-			c.waiters.Delete(waiter)
-			waiter.Value <- entry
-		}
-	}
+	c.mu.Unlock()
 }
 
-// Delete implements Cache.
-func (c *lruCache[K, V]) Delete(k K) {
+// Release implements the 'Cache' interface.
+func (c *lruCache[K, V]) Release(k K) {
 	c.mu.Lock()
+	defer c.mu.Unlock()
 
-	entry, ok := c.byName[k]
-	if !ok {
-		return
+	entry := c.byName[k]
+	if entry == nil || entry.Value.refs <= 0 {
+		panic(fmt.Errorf("caching.lruCache.Release called on key that is not held: %v", k))
 	}
+
+	entry.Value.refs--
 	if entry.Value.refs == 0 {
-		delete(c.byName, k)
-		if c.waiters.IsEmpty() {
+		if entry.Value.del != nil {
+			delete(c.byName, k)
 			c.unused.Store(entry)
+			c.notifyOfDel(entry)
 		} else {
-			waiter := c.waiters.Oldest()
-			c.waiters.Delete(waiter)
-			waiter.Value <- entry
-		}
-		c.mu.Unlock()
-	} else {
-		if entry.Value.del == nil {
-			entry.Value.del = make(chan struct{})
+			c.evictable.Store(entry)
 		}
-		ch := entry.Value.del
-		c.mu.Unlock()
-		<-ch
+		c.notifyAvail(entry)
 	}
 }
 
-// Flush implements Cache.
+// Flush implements the 'Cache' interface. 
func (c *lruCache[K, V]) Flush(ctx context.Context) { c.mu.Lock() defer c.mu.Unlock() @@ -171,7 +204,7 @@ func (c *lruCache[K, V]) Flush(ctx context.Context) { for _, entry := range c.byName { c.src.Flush(ctx, &entry.Value.val) } - for entry := c.unused.Oldest(); entry != nil; entry = entry.Newer() { + for entry := c.unused.Oldest; entry != nil; entry = entry.Newer { c.src.Flush(ctx, &entry.Value.val) } } |