From f6f0a251ed962374f69e9fd7722dcd5c44aa58ad Mon Sep 17 00:00:00 2001
From: Luke Shumaker
Date: Fri, 24 Mar 2023 21:49:26 -0600
Subject: containers: Rethink the caching libraries

---
 lib/btrfs/io4_fs.go                   | 238 +++++-----
 lib/btrfsutil/open.go                 |   1 +
 lib/btrfsutil/rebuilt_forrest.go      |  43 +-
 lib/btrfsutil/rebuilt_readitem.go     |  20 +-
 lib/btrfsutil/rebuilt_tree.go         | 136 +++---
 lib/containers/arcache.go             | 831 +++++++++++++++++++++++-----------
 lib/containers/arcache_string_test.go |  34 +-
 lib/containers/arcache_test.go        | 296 +++++++++---
 lib/containers/cache.go               |  64 +++
 lib/containers/linkedlist.go          | 101 +++--
 lib/containers/lrucache.go            | 251 ++++++----
 lib/containers/lrucache_test.go       |  86 ++--
 lib/diskio/file_blockbuf.go           |  97 ++--
 lib/diskio/file_state_test.go         |   5 +-
 14 files changed, 1453 insertions(+), 750 deletions(-)
 create mode 100644 lib/containers/cache.go

diff --git a/lib/btrfs/io4_fs.go b/lib/btrfs/io4_fs.go
index 4a68695..a611f35 100644
--- a/lib/btrfs/io4_fs.go
+++ b/lib/btrfs/io4_fs.go
@@ -75,10 +75,10 @@ type Subvolume struct {
 	rootInfo btrfstree.TreeRoot
 	rootErr  error
 
-	bareInodeCache containers.ARCache[btrfsprim.ObjID, *BareInode]
-	fullInodeCache containers.ARCache[btrfsprim.ObjID, *FullInode]
-	dirCache       containers.ARCache[btrfsprim.ObjID, *Dir]
-	fileCache      containers.ARCache[btrfsprim.ObjID, *File]
+	bareInodeCache containers.Cache[btrfsprim.ObjID, BareInode]
+	fullInodeCache containers.Cache[btrfsprim.ObjID, FullInode]
+	dirCache       containers.Cache[btrfsprim.ObjID, Dir]
+	fileCache      containers.Cache[btrfsprim.ObjID, File]
 }
 
 func NewSubvolume(
@@ -109,10 +109,14 @@ func NewSubvolume(
 	}
 	sv.rootInfo = *rootInfo
 
-	sv.bareInodeCache.MaxLen = textui.Tunable(128)
-	sv.fullInodeCache.MaxLen = textui.Tunable(128)
-	sv.dirCache.MaxLen = textui.Tunable(128)
-	sv.fileCache.MaxLen = textui.Tunable(128)
+	sv.bareInodeCache = containers.NewARCache[btrfsprim.ObjID, BareInode](textui.Tunable(128),
+		containers.SourceFunc[btrfsprim.ObjID, BareInode](sv.loadBareInode))
+	sv.fullInodeCache = containers.NewARCache[btrfsprim.ObjID, FullInode](textui.Tunable(128),
+		containers.SourceFunc[btrfsprim.ObjID, FullInode](sv.loadFullInode))
+	sv.dirCache = containers.NewARCache[btrfsprim.ObjID, Dir](textui.Tunable(128),
+		containers.SourceFunc[btrfsprim.ObjID, Dir](sv.loadDir))
+	sv.fileCache = containers.NewARCache[btrfsprim.ObjID, File](textui.Tunable(128),
+		containers.SourceFunc[btrfsprim.ObjID, File](sv.loadFile))
 
 	return sv
 }
@@ -125,113 +129,128 @@ func (sv *Subvolume) GetRootInode() (btrfsprim.ObjID, error) {
 	return sv.rootInfo.RootInode, sv.rootErr
 }
 
-func (sv *Subvolume) LoadBareInode(inode btrfsprim.ObjID) (*BareInode, error) {
-	val := containers.LoadOrElse[btrfsprim.ObjID, *BareInode](&sv.bareInodeCache, inode, func(inode btrfsprim.ObjID) (val *BareInode) {
-		val = &BareInode{
-			Inode: inode,
-		}
-		item, err := sv.fs.TreeLookup(sv.TreeID, btrfsprim.Key{
-			ObjectID: inode,
-			ItemType: btrfsitem.INODE_ITEM_KEY,
-			Offset:   0,
-		})
-		if err != nil {
-			val.Errs = append(val.Errs, err)
-			return val
-		}
+func (sv *Subvolume) AcquireBareInode(inode btrfsprim.ObjID) (*BareInode, error) {
+	val := sv.bareInodeCache.Acquire(sv.ctx, inode)
+	if val.InodeItem == nil {
+		sv.bareInodeCache.Release(inode)
+		return nil, val.Errs
+	}
+	return val, nil
+}
 
-		switch itemBody := item.Body.(type) {
-		case *btrfsitem.Inode:
-			bodyCopy := itemBody.Clone()
-			val.InodeItem = &bodyCopy
-		case *btrfsitem.Error:
-			val.Errs = append(val.Errs, fmt.Errorf("malformed inode: %w", itemBody.Err))
-		default:
-			panic(fmt.Errorf("should not happen: INODE_ITEM has unexpected item type: %T", itemBody))
-		}
+func (sv *Subvolume) ReleaseBareInode(inode btrfsprim.ObjID) {
+	sv.bareInodeCache.Release(inode)
+}
 
-		return val
+func (sv *Subvolume) loadBareInode(_ context.Context, inode btrfsprim.ObjID, val *BareInode) {
+	*val = BareInode{
+		Inode: inode,
+	}
+	item, err := sv.fs.TreeLookup(sv.TreeID, btrfsprim.Key{
+		ObjectID: inode,
+		ItemType: btrfsitem.INODE_ITEM_KEY,
+		Offset:   0,
 	})
-	if val.InodeItem == nil {
+	if err != nil {
+		val.Errs = append(val.Errs, err)
+		return
+	}
+
+	switch itemBody := item.Body.(type) {
+	case *btrfsitem.Inode:
+		bodyCopy := itemBody.Clone()
+		val.InodeItem = &bodyCopy
+	case *btrfsitem.Error:
+		val.Errs = append(val.Errs, fmt.Errorf("malformed inode: %w", itemBody.Err))
+	default:
+		panic(fmt.Errorf("should not happen: INODE_ITEM has unexpected item type: %T", itemBody))
+	}
+}
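Callers now follow an acquire/release discipline: every Acquire* is paired with a Release*, and the pointer returned by Acquire* is only valid until that Release*. A hypothetical caller, to illustrate the shape (the function and its use of the inode item's Size field are invented for illustration, not part of this commit):

	func inodeSize(sv *btrfs.Subvolume, inode btrfsprim.ObjID) (int64, error) {
		bareInode, err := sv.AcquireBareInode(inode)
		if err != nil {
			return 0, err
		}
		// The entry stays pinned in the cache until the Release;
		// don't keep the pointer around after that.
		defer sv.ReleaseBareInode(inode)
		return bareInode.InodeItem.Size, nil
	}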
 
+func (sv *Subvolume) AcquireFullInode(inode btrfsprim.ObjID) (*FullInode, error) {
+	val := sv.fullInodeCache.Acquire(sv.ctx, inode)
+	if val.InodeItem == nil && val.OtherItems == nil {
+		sv.fullInodeCache.Release(inode)
 		return nil, val.Errs
 	}
 	return val, nil
 }
 
-func (sv *Subvolume) LoadFullInode(inode btrfsprim.ObjID) (*FullInode, error) {
-	val := containers.LoadOrElse[btrfsprim.ObjID, *FullInode](&sv.fullInodeCache, inode, func(indoe btrfsprim.ObjID) (val *FullInode) {
-		val = &FullInode{
-			BareInode: BareInode{
-				Inode: inode,
-			},
-			XAttrs: make(map[string]string),
-		}
-		items, err := sv.fs.TreeSearchAll(sv.TreeID, btrfstree.SearchObject(inode))
-		if err != nil {
-			val.Errs = append(val.Errs, err)
-			if len(items) == 0 {
-				return val
-			}
+func (sv *Subvolume) ReleaseFullInode(inode btrfsprim.ObjID) {
+	sv.fullInodeCache.Release(inode)
+}
+
+func (sv *Subvolume) loadFullInode(_ context.Context, inode btrfsprim.ObjID, val *FullInode) {
+	*val = FullInode{
+		BareInode: BareInode{
+			Inode: inode,
+		},
+		XAttrs: make(map[string]string),
+	}
+	items, err := sv.fs.TreeSearchAll(sv.TreeID, btrfstree.SearchObject(inode))
+	if err != nil {
+		val.Errs = append(val.Errs, err)
+		if len(items) == 0 {
+			return
 		}
-		for _, item := range items {
-			switch item.Key.ItemType {
-			case btrfsitem.INODE_ITEM_KEY:
-				switch itemBody := item.Body.(type) {
-				case *btrfsitem.Inode:
-					if val.InodeItem != nil {
-						if !reflect.DeepEqual(itemBody, *val.InodeItem) {
-							val.Errs = append(val.Errs, fmt.Errorf("multiple inodes"))
-						}
-						continue
+	}
+	for _, item := range items {
+		switch item.Key.ItemType {
+		case btrfsitem.INODE_ITEM_KEY:
+			switch itemBody := item.Body.(type) {
+			case *btrfsitem.Inode:
+				if val.InodeItem != nil {
+					if !reflect.DeepEqual(itemBody, *val.InodeItem) {
+						val.Errs = append(val.Errs, fmt.Errorf("multiple inodes"))
 					}
-					bodyCopy := itemBody.Clone()
-					val.InodeItem = &bodyCopy
-				case *btrfsitem.Error:
-					val.Errs = append(val.Errs, fmt.Errorf("malformed INODE_ITEM: %w", itemBody.Err))
-				default:
-					panic(fmt.Errorf("should not happen: INODE_ITEM has unexpected item type: %T", itemBody))
-				}
-			case btrfsitem.XATTR_ITEM_KEY:
-				switch itemBody := item.Body.(type) {
-				case *btrfsitem.DirEntry:
-					val.XAttrs[string(itemBody.Name)] = string(itemBody.Data)
-				case *btrfsitem.Error:
-					val.Errs = append(val.Errs, fmt.Errorf("malformed XATTR_ITEM: %w", itemBody.Err))
-				default:
-					panic(fmt.Errorf("should not happen: XATTR_ITEM has unexpected item type: %T", itemBody))
+					continue
 				}
+				bodyCopy := itemBody.Clone()
+				val.InodeItem = &bodyCopy
+			case *btrfsitem.Error:
+				val.Errs = append(val.Errs, fmt.Errorf("malformed INODE_ITEM: %w", itemBody.Err))
 			default:
-				val.OtherItems = append(val.OtherItems, item)
+				panic(fmt.Errorf("should not happen: INODE_ITEM has unexpected item type: %T", itemBody))
+			}
+		case btrfsitem.XATTR_ITEM_KEY:
+			switch itemBody := item.Body.(type) {
+			case *btrfsitem.DirEntry:
+				val.XAttrs[string(itemBody.Name)] = string(itemBody.Data)
+			case *btrfsitem.Error:
+				val.Errs = append(val.Errs, fmt.Errorf("malformed XATTR_ITEM: %w", itemBody.Err))
+			default:
+				panic(fmt.Errorf("should not happen: XATTR_ITEM has unexpected item type: %T", itemBody))
 			}
+		default:
+			val.OtherItems = append(val.OtherItems, item)
 		}
-		return val
-	})
-	if val.InodeItem == nil && val.OtherItems == nil {
-		return nil, val.Errs
 	}
-	return val, nil
 }
 
-func (sv *Subvolume) LoadDir(inode btrfsprim.ObjID) (*Dir, error) {
-	val := containers.LoadOrElse[btrfsprim.ObjID, *Dir](&sv.dirCache, inode, func(inode btrfsprim.ObjID) (val *Dir) {
-		val = new(Dir)
-		fullInode, err := sv.LoadFullInode(inode)
-		if err != nil {
-			val.Errs = append(val.Errs, err)
-			return val
-		}
-		val.FullInode = *fullInode
-		val.SV = sv
-		val.populate()
-		return val
-	})
+func (sv *Subvolume) AcquireDir(inode btrfsprim.ObjID) (*Dir, error) {
+	val := sv.dirCache.Acquire(sv.ctx, inode)
 	if val.Inode == 0 {
+		sv.dirCache.Release(inode)
 		return nil, val.Errs
 	}
 	return val, nil
 }
 
-func (dir *Dir) populate() {
+func (sv *Subvolume) ReleaseDir(inode btrfsprim.ObjID) {
+	sv.dirCache.Release(inode)
+}
+
+func (sv *Subvolume) loadDir(_ context.Context, inode btrfsprim.ObjID, dir *Dir) {
+	*dir = Dir{}
+	fullInode, err := sv.AcquireFullInode(inode)
+	if err != nil {
+		dir.Errs = append(dir.Errs, err)
+		return
+	}
+	dir.FullInode = *fullInode
+	sv.ReleaseFullInode(inode)
+	dir.SV = sv
+
 	dir.ChildrenByName = make(map[string]btrfsitem.DirEntry)
 	dir.ChildrenByIndex = make(map[uint64]btrfsitem.DirEntry)
 	for _, item := range dir.OtherItems {
@@ -337,37 +356,42 @@ func (dir *Dir) AbsPath() (string, error) {
 	if dir.DotDot == nil {
 		return "", fmt.Errorf("missing .. entry in dir inode %v", dir.Inode)
 	}
-	parent, err := dir.SV.LoadDir(dir.DotDot.Inode)
+	parent, err := dir.SV.AcquireDir(dir.DotDot.Inode)
 	if err != nil {
 		return "", err
 	}
 	parentName, err := parent.AbsPath()
+	dir.SV.ReleaseDir(dir.DotDot.Inode)
 	if err != nil {
 		return "", err
 	}
 	return filepath.Join(parentName, string(dir.DotDot.Name)), nil
 }
 
-func (sv *Subvolume) LoadFile(inode btrfsprim.ObjID) (*File, error) {
-	val := containers.LoadOrElse[btrfsprim.ObjID, *File](&sv.fileCache, inode, func(inode btrfsprim.ObjID) (val *File) {
-		val = new(File)
-		fullInode, err := sv.LoadFullInode(inode)
-		if err != nil {
-			val.Errs = append(val.Errs, err)
-			return val
-		}
-		val.FullInode = *fullInode
-		val.SV = sv
-		val.populate()
-		return val
-	})
+func (sv *Subvolume) AcquireFile(inode btrfsprim.ObjID) (*File, error) {
+	val := sv.fileCache.Acquire(sv.ctx, inode)
 	if val.Inode == 0 {
+		sv.fileCache.Release(inode)
 		return nil, val.Errs
 	}
 	return val, nil
 }
 
-func (file *File) populate() {
+func (sv *Subvolume) ReleaseFile(inode btrfsprim.ObjID) {
+	sv.fileCache.Release(inode)
+}
+
+func (sv *Subvolume) loadFile(_ context.Context, inode btrfsprim.ObjID, file *File) {
+	*file = File{}
+	fullInode, err := sv.AcquireFullInode(inode)
+	if err != nil {
+		file.Errs = append(file.Errs, err)
+		return
+	}
+	file.FullInode = *fullInode
+	sv.ReleaseFullInode(inode)
+	file.SV = sv
+
 	for _, item := range file.OtherItems {
 		switch item.Key.ItemType {
 		case btrfsitem.INODE_REF_KEY:
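The new lib/containers/cache.go that all of these call-sites build on is created by this commit, but its contents aren't shown in this view. Judging from the usage above and below (NewARCache, SourceFunc, Acquire/Release/Delete/Flush), its interfaces are presumably along these lines (a sketch inferred from the call-sites, not the verbatim file):

	package containers

	import "context"

	// A Source is what a Cache reads through to on a cache-miss.
	type Source[K comparable, V any] interface {
		// Load fills *v with the value for k.
		Load(ctx context.Context, k K, v *V)
		// Flush writes any changes to *v back to the backing store.
		Flush(ctx context.Context, v *V)
	}

	// SourceFunc adapts a plain load function into a Source whose
	// Flush is a no-op.
	type SourceFunc[K comparable, V any] func(ctx context.Context, k K, v *V)

	func (fn SourceFunc[K, V]) Load(ctx context.Context, k K, v *V) { fn(ctx, k, v) }
	func (SourceFunc[K, V]) Flush(context.Context, *V)              {}

	// A Cache is a read-through cache of mutable values; a value
	// handed out by Acquire is pinned until it is Release()d.
	type Cache[K comparable, V any] interface {
		Acquire(context.Context, K) *V
		Release(K)
		Delete(K)
		Flush(context.Context)
	}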
diff --git a/lib/btrfsutil/open.go b/lib/btrfsutil/open.go
index c5ee314..abbd466 100644
--- a/lib/btrfsutil/open.go
+++ b/lib/btrfsutil/open.go
@@ -30,6 +30,7 @@ func Open(ctx context.Context, flag int, filenames ...string) (*btrfs.FS, error)
 			File: osFile,
 		}
 		bufFile := diskio.NewBufferedFile[btrfsvol.PhysicalAddr](
+			ctx,
 			typedFile,
 			//nolint:gomnd // False positive: gomnd.ignored-functions=[textui.Tunable] doesn't support type params.
 			textui.Tunable[btrfsvol.PhysicalAddr](16*1024), // block size: 16KiB
diff --git a/lib/btrfsutil/rebuilt_forrest.go b/lib/btrfsutil/rebuilt_forrest.go
index b5c646d..811e1ac 100644
--- a/lib/btrfsutil/rebuilt_forrest.go
+++ b/lib/btrfsutil/rebuilt_forrest.go
@@ -7,7 +7,6 @@ package btrfsutil
 import (
 	"context"
 	"fmt"
-	"sync"
 
 	"github.com/datawire/dlib/dlog"
 
@@ -140,12 +139,11 @@ type RebuiltForrest struct {
 	treesMu nestedMutex
 	trees   map[btrfsprim.ObjID]*RebuiltTree // must hold .treesMu to access
 
-	leafs    containers.ARCache[btrfsprim.ObjID, map[btrfsvol.LogicalAddr]containers.Set[btrfsvol.LogicalAddr]]
-	incItems containers.ARCache[btrfsprim.ObjID, *itemIndex]
-	excItems containers.ARCache[btrfsprim.ObjID, *itemIndex]
+	leafs    containers.Cache[btrfsprim.ObjID, map[btrfsvol.LogicalAddr]containers.Set[btrfsvol.LogicalAddr]]
+	incItems containers.Cache[btrfsprim.ObjID, itemIndex]
+	excItems containers.Cache[btrfsprim.ObjID, itemIndex]
 
-	nodesMu sync.Mutex
-	nodes   containers.ARCache[btrfsvol.LogicalAddr, *btrfstree.Node]
+	nodes containers.Cache[btrfsvol.LogicalAddr, btrfstree.Node]
 }
 
 // NewRebuiltForrest returns a new RebuiltForrest instance.  The
@@ -158,23 +156,24 @@ func NewRebuiltForrest(file diskio.File[btrfsvol.LogicalAddr], sb btrfstree.Supe
 		cb: cb,
 
 		trees: make(map[btrfsprim.ObjID]*RebuiltTree),
-
-		leafs: containers.ARCache[btrfsprim.ObjID, map[btrfsvol.LogicalAddr]containers.Set[btrfsvol.LogicalAddr]]{
-			MaxLen: textui.Tunable(8),
-		},
-		incItems: containers.ARCache[btrfsprim.ObjID, *itemIndex]{
-			MaxLen: textui.Tunable(8),
-		},
-		excItems: containers.ARCache[btrfsprim.ObjID, *itemIndex]{
-			MaxLen: textui.Tunable(8),
-		},
-
-		nodes: containers.ARCache[btrfsvol.LogicalAddr, *btrfstree.Node]{
-			MaxLen: textui.Tunable(8),
-			OnRemove: func(_ btrfsvol.LogicalAddr, node *btrfstree.Node) {
-				node.Free()
-			},
-		},
 	}
+
+	ret.leafs = containers.NewARCache[btrfsprim.ObjID, map[btrfsvol.LogicalAddr]containers.Set[btrfsvol.LogicalAddr]](textui.Tunable(8),
+		containers.SourceFunc[btrfsprim.ObjID, map[btrfsvol.LogicalAddr]containers.Set[btrfsvol.LogicalAddr]](
+			func(ctx context.Context, treeID btrfsprim.ObjID, leafs *map[btrfsvol.LogicalAddr]containers.Set[btrfsvol.LogicalAddr]) {
+				*leafs = ret.trees[treeID].uncachedLeafToRoots(ctx)
+			}))
+	ret.incItems = containers.NewARCache[btrfsprim.ObjID, itemIndex](textui.Tunable(8),
+		containers.SourceFunc[btrfsprim.ObjID, itemIndex](func(ctx context.Context, treeID btrfsprim.ObjID, incItems *itemIndex) {
+			*incItems = ret.trees[treeID].uncachedIncItems(ctx)
+		}))
+	ret.excItems = containers.NewARCache[btrfsprim.ObjID, itemIndex](textui.Tunable(8),
+		containers.SourceFunc[btrfsprim.ObjID, itemIndex](func(ctx context.Context, treeID btrfsprim.ObjID, excItems *itemIndex) {
+			*excItems = ret.trees[treeID].uncachedExcItems(ctx)
+		}))
+	ret.nodes = containers.NewARCache[btrfsvol.LogicalAddr, btrfstree.Node](textui.Tunable(8),
+		containers.SourceFunc[btrfsvol.LogicalAddr, btrfstree.Node](ret.readNode))
+
 	if ret.cb == nil {
 		ret.cb = noopRebuiltForrestCallbacks{
 			forrest: ret,
diff --git a/lib/btrfsutil/rebuilt_readitem.go b/lib/btrfsutil/rebuilt_readitem.go
index 03a7cdc..d3a2253 100644
--- a/lib/btrfsutil/rebuilt_readitem.go
+++ b/lib/btrfsutil/rebuilt_readitem.go
@@ -26,18 +26,14 @@ func (ptr ItemPtr) String() string {
 	return fmt.Sprintf("node@%v[%v]", ptr.Node, ptr.Slot)
 }
 
-func (ts *RebuiltForrest) readNode(ctx context.Context, laddr btrfsvol.LogicalAddr) *btrfstree.Node {
-	if cached, ok := ts.nodes.Load(laddr); ok {
-		dlog.Tracef(ctx, "cache-hit node@%v", laddr)
-		return cached
-	}
+func (ts *RebuiltForrest) readNode(ctx context.Context, laddr btrfsvol.LogicalAddr, out *btrfstree.Node) {
+	dlog.Debugf(ctx, "cache-miss node@%v, reading...", laddr)
 
 	graphInfo, ok := ts.graph.Nodes[laddr]
 	if !ok {
 		panic(fmt.Errorf("should not happen: node@%v is not mentioned in the in-memory graph", laddr))
 	}
 
-	dlog.Debugf(ctx, "cache-miss node@%v, reading...", laddr)
-
 	node, err := btrfstree.ReadNode[btrfsvol.LogicalAddr](ts.file, ts.sb, laddr, btrfstree.NodeExpectations{
 		LAddr: containers.OptionalValue(laddr),
 		Level: containers.OptionalValue(graphInfo.Level),
@@ -56,22 +52,20 @@ func (ts *RebuiltForrest) readNode(ctx context.Context, laddr btrfsvol.LogicalAd
 	if err != nil {
 		panic(fmt.Errorf("should not happen: i/o error: %w", err))
 	}
-
-	ts.nodes.Store(laddr, node)
-
-	return node
+	*out = *node
 }
 
 func (ts *RebuiltForrest) readItem(ctx context.Context, ptr ItemPtr) btrfsitem.Item {
-	ts.nodesMu.Lock()
-	defer ts.nodesMu.Unlock()
 	if ts.graph.Nodes[ptr.Node].Level != 0 {
 		panic(fmt.Errorf("should not happen: btrfsutil.RebuiltForrest.readItem called for non-leaf node@%v", ptr.Node))
 	}
 	if ptr.Slot < 0 {
 		panic(fmt.Errorf("should not happen: btrfsutil.RebuiltForrest.readItem called for negative item slot: %v", ptr.Slot))
 	}
-	items := ts.readNode(ctx, ptr.Node).BodyLeaf
+
+	items := ts.nodes.Acquire(ctx, ptr.Node).BodyLeaf
+	defer ts.nodes.Release(ptr.Node)
+
 	if ptr.Slot >= len(items) {
 		panic(fmt.Errorf("should not happen: btrfsutil.RebuiltForrest.readItem called for out-of-bounds item slot: slot=%v len=%v",
 			ptr.Slot, len(items)))
diff --git a/lib/btrfsutil/rebuilt_tree.go b/lib/btrfsutil/rebuilt_tree.go
index 96d5a75..e65a3f6 100644
--- a/lib/btrfsutil/rebuilt_tree.go
+++ b/lib/btrfsutil/rebuilt_tree.go
@@ -47,33 +47,37 @@ type RebuiltTree struct {
 
 // leafToRoots returns all leafs (lvl=0) in the filesystem that pass
 // .isOwnerOK, whether or not they're in the tree.
 func (tree *RebuiltTree) leafToRoots(ctx context.Context) map[btrfsvol.LogicalAddr]containers.Set[btrfsvol.LogicalAddr] {
-	return containers.LoadOrElse[btrfsprim.ObjID, map[btrfsvol.LogicalAddr]containers.Set[btrfsvol.LogicalAddr]](&tree.forrest.leafs, tree.ID, func(btrfsprim.ObjID) map[btrfsvol.LogicalAddr]containers.Set[btrfsvol.LogicalAddr] {
-		ctx = dlog.WithField(ctx, "btrfs.util.rebuilt-tree.index-nodes", fmt.Sprintf("tree=%v", tree.ID))
+	ret := *tree.forrest.leafs.Acquire(ctx, tree.ID)
+	tree.forrest.leafs.Release(tree.ID)
+	return ret
+}
 
-		nodeToRoots := make(map[btrfsvol.LogicalAddr]containers.Set[btrfsvol.LogicalAddr])
+func (tree *RebuiltTree) uncachedLeafToRoots(ctx context.Context) map[btrfsvol.LogicalAddr]containers.Set[btrfsvol.LogicalAddr] {
+	ctx = dlog.WithField(ctx, "btrfs.util.rebuilt-tree.index-nodes", fmt.Sprintf("tree=%v", tree.ID))
 
-		var stats textui.Portion[int]
-		stats.D = len(tree.forrest.graph.Nodes)
-		progressWriter := textui.NewProgress[textui.Portion[int]](ctx, dlog.LogLevelInfo, textui.Tunable(1*time.Second))
-		progress := func() {
-			stats.N = len(nodeToRoots)
-			progressWriter.Set(stats)
-		}
+	nodeToRoots := make(map[btrfsvol.LogicalAddr]containers.Set[btrfsvol.LogicalAddr])
 
-		progress()
-		for _, node := range maps.SortedKeys(tree.forrest.graph.Nodes) {
-			tree.indexNode(ctx, node, nodeToRoots, progress, nil)
-		}
-		progressWriter.Done()
+	var stats textui.Portion[int]
+	stats.D = len(tree.forrest.graph.Nodes)
+	progressWriter := textui.NewProgress[textui.Portion[int]](ctx, dlog.LogLevelInfo, textui.Tunable(1*time.Second))
+	progress := func() {
+		stats.N = len(nodeToRoots)
+		progressWriter.Set(stats)
+	}
 
-		ret := make(map[btrfsvol.LogicalAddr]containers.Set[btrfsvol.LogicalAddr])
-		for node, roots := range nodeToRoots {
-			if tree.forrest.graph.Nodes[node].Level == 0 && len(roots) > 0 {
-				ret[node] = roots
-			}
+	progress()
+	for _, node := range maps.SortedKeys(tree.forrest.graph.Nodes) {
+		tree.indexNode(ctx, node, nodeToRoots, progress, nil)
+	}
+	progressWriter.Done()
+
+	ret := make(map[btrfsvol.LogicalAddr]containers.Set[btrfsvol.LogicalAddr])
+	for node, roots := range nodeToRoots {
+		if tree.forrest.graph.Nodes[node].Level == 0 && len(roots) > 0 {
+			ret[node] = roots
 		}
-		return ret
-	})
+	}
+	return ret
 }
 
 func (tree *RebuiltTree) indexNode(ctx context.Context, node btrfsvol.LogicalAddr, index map[btrfsvol.LogicalAddr]containers.Set[btrfsvol.LogicalAddr], progress func(), stack []btrfsvol.LogicalAddr) {
@@ -136,8 +140,9 @@ func (tree *RebuiltTree) isOwnerOK(owner btrfsprim.ObjID, gen btrfsprim.Generati
 
 // Do not mutate the returned map; it is a pointer to the
 // RebuiltTree's internal map!
 func (tree *RebuiltTree) RebuiltItems(ctx context.Context) *containers.SortedMap[btrfsprim.Key, ItemPtr] {
-	ctx = dlog.WithField(ctx, "btrfs.util.rebuilt-tree.index-inc-items", fmt.Sprintf("tree=%v", tree.ID))
-	return tree.items(ctx, &tree.forrest.incItems, tree.Roots.HasAny)
+	ret := *tree.forrest.incItems.Acquire(ctx, tree.ID)
+	tree.forrest.incItems.Release(tree.ID)
+	return ret
 }
 
 // RebuiltPotentialItems returns a map of items that could be added to
@@ -146,14 +151,25 @@ func (tree *RebuiltTree) RebuiltItems(ctx context.Context) *containers.SortedMap
 // Do not mutate the returned map; it is a pointer to the
 // RebuiltTree's internal map!
 func (tree *RebuiltTree) RebuiltPotentialItems(ctx context.Context) *containers.SortedMap[btrfsprim.Key, ItemPtr] {
+	ret := *tree.forrest.excItems.Acquire(ctx, tree.ID)
+	tree.forrest.excItems.Release(tree.ID)
+	return ret
+}
+
+func (tree *RebuiltTree) uncachedIncItems(ctx context.Context) *containers.SortedMap[btrfsprim.Key, ItemPtr] {
+	ctx = dlog.WithField(ctx, "btrfs.util.rebuilt-tree.index-inc-items", fmt.Sprintf("tree=%v", tree.ID))
+	return tree.items(ctx, tree.Roots.HasAny)
+}
+
+func (tree *RebuiltTree) uncachedExcItems(ctx context.Context) *containers.SortedMap[btrfsprim.Key, ItemPtr] {
 	ctx = dlog.WithField(ctx, "btrfs.util.rebuilt-tree.index-exc-items", fmt.Sprintf("tree=%v", tree.ID))
-	return tree.items(ctx, &tree.forrest.excItems,
+	return tree.items(ctx,
 		func(roots containers.Set[btrfsvol.LogicalAddr]) bool {
 			return !tree.Roots.HasAny(roots)
 		})
 }
 
-type itemIndex = containers.SortedMap[btrfsprim.Key, ItemPtr]
+type itemIndex = *containers.SortedMap[btrfsprim.Key, ItemPtr]
 
 type itemStats struct {
 	Leafs    textui.Portion[int]
@@ -166,54 +182,50 @@ func (s itemStats) String() string {
 		s.Leafs, s.NumItems, s.NumDups)
 }
 
-func (tree *RebuiltTree) items(ctx context.Context, cache containers.Map[btrfsprim.ObjID, *itemIndex],
-	leafFn func(roots containers.Set[btrfsvol.LogicalAddr]) bool,
-) *containers.SortedMap[btrfsprim.Key, ItemPtr] {
+func (tree *RebuiltTree) items(ctx context.Context, leafFn func(roots containers.Set[btrfsvol.LogicalAddr]) bool) *containers.SortedMap[btrfsprim.Key, ItemPtr] {
 	tree.mu.RLock()
 	defer tree.mu.RUnlock()
 
-	return containers.LoadOrElse(cache, tree.ID, func(btrfsprim.ObjID) *itemIndex {
-		var leafs []btrfsvol.LogicalAddr
-		for leaf, roots := range tree.leafToRoots(ctx) {
-			if leafFn(roots) {
-				leafs = append(leafs, leaf)
-			}
+	var leafs []btrfsvol.LogicalAddr
+	for leaf, roots := range tree.leafToRoots(ctx) {
+		if leafFn(roots) {
+			leafs = append(leafs, leaf)
 		}
-		slices.Sort(leafs)
+	}
+	slices.Sort(leafs)
 
-		var stats itemStats
-		stats.Leafs.D = len(leafs)
-		progressWriter := textui.NewProgress[itemStats](ctx, dlog.LogLevelInfo, textui.Tunable(1*time.Second))
+	var stats itemStats
+	stats.Leafs.D = len(leafs)
+	progressWriter := textui.NewProgress[itemStats](ctx, dlog.LogLevelInfo, textui.Tunable(1*time.Second))
 
-		index := new(containers.SortedMap[btrfsprim.Key, ItemPtr])
-		for i, leaf := range leafs {
-			stats.Leafs.N = i
-			progressWriter.Set(stats)
-			for j, itemKey := range tree.forrest.graph.Nodes[leaf].Items {
-				newPtr := ItemPtr{
-					Node: leaf,
-					Slot: j,
-				}
-				if oldPtr, exists := index.Load(itemKey); !exists {
+	index := new(containers.SortedMap[btrfsprim.Key, ItemPtr])
+	for i, leaf := range leafs {
+		stats.Leafs.N = i
+		progressWriter.Set(stats)
+		for j, itemKey := range tree.forrest.graph.Nodes[leaf].Items {
+			newPtr := ItemPtr{
+				Node: leaf,
+				Slot: j,
+			}
+			if oldPtr, exists := index.Load(itemKey); !exists {
+				index.Store(itemKey, newPtr)
+				stats.NumItems++
+			} else {
+				if tree.RebuiltShouldReplace(oldPtr.Node, newPtr.Node) {
 					index.Store(itemKey, newPtr)
-					stats.NumItems++
-				} else {
-					if tree.RebuiltShouldReplace(oldPtr.Node, newPtr.Node) {
-						index.Store(itemKey, newPtr)
-					}
-					stats.NumDups++
 				}
-				progressWriter.Set(stats)
+				stats.NumDups++
 			}
-		}
-		if stats.Leafs.N > 0 {
-			stats.Leafs.N = len(leafs)
 			progressWriter.Set(stats)
-			progressWriter.Done()
 		}
+	}
+	if stats.Leafs.N > 0 {
+		stats.Leafs.N = len(leafs)
+		progressWriter.Set(stats)
+		progressWriter.Done()
+	}
 
-		return index
-	})
+	return index
 }
 
 // main public API /////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/lib/containers/arcache.go b/lib/containers/arcache.go
index 1dc3b7e..3f15f67 100644
--- a/lib/containers/arcache.go
+++ b/lib/containers/arcache.go
@@ -2,13 +2,29 @@
 //
 // SPDX-License-Identifier: GPL-2.0-or-later
 
+// This file should be reasonably readable from top-to-bottom; I've
+// tried to write it in a sort-of "literate programming" style.  That
+// makes the file comparatively huge--but don't let that intimidate
+// you, it's only huge because of the detailed comments; it's less
+// than 300 lines without the comments.
+
 package containers
 
 import (
+	"context"
+	"fmt"
 	"sync"
 )
 
-// ARCache is a thread-safe Adaptive Replacement Cache.
+// NewARCache returns a new thread-safe Adaptive Replacement Cache
+// (ARC).
+//
+// Fundamentally, the point of ARC is to combine both recency
+// information and frequency information together to form a cache
+// policy that is better than either least-recently-used eviction
+// (LRU) or least-frequently-used eviction (LFU); and the balance
+// between how much weight is given to recency vs frequency is
+// "adaptive" based on the characteristics of the current workload.
 //
 // The Adaptive Replacement Cache is patented by IBM (patent
 // US-6,996,676-B2 is set to expire 2024-02-22).
@@ -17,331 +33,598 @@ import (
 // enhanced ARC, which are patented by Sun (now Oracle) (patent
 // US-7,469,320-B2 is set to expire 2027-02-13).
 //
-// It is invalid to adjust any public members after first use.
-type ARCache[K comparable, V any] struct {
-	MaxLen   int        // must be >= 2
-	New      func(K) V  // may be nil (.Load)
-	OnHit    func(K, V) // may be nil (.Load)
-	OnMiss   func(K)    // may be nil (.Load)
-	OnEvict  func(K, V) // may be nil (.Load, .Store)
-	OnRemove func(K, V) // may be nil (.Load, .Store, .Delete)
+// This implementation has a few enhancements compared to standard
+// ARC:
+//
+//   - This implementation supports explicitly deleting/invalidating
+//     cache entries; the standard ARC algorithm assumes that the only
+//     reason an entry is ever removed from the cache is because the
+//     cache is full and the entry had to be evicted to make room for
+//     a new entry.
+//
+//   - This implementation supports pinning entries such that they
+//     cannot be evicted.  This is one of the enhancements from the
+//     enhanced version of ARC used by ZFS, but the version here is
+//     not based on the ZFS version.
+//
+// It is invalid (runtime-panic) to call NewARCache with a
+// non-positive capacity or a nil source.
+//
+//nolint:predeclared // 'cap' is the best name for it.
+func NewARCache[K comparable, V any](cap int, src Source[K, V]) Cache[K, V] {
+	// Pass the parameters in.
+	if cap <= 0 {
+		panic(fmt.Errorf("containers.NewARCache: invalid capacity: %v", cap))
+	}
+	if src == nil {
+		panic(fmt.Errorf("containers.NewARCache: nil source"))
+	}
+	ret := &arCache[K, V]{
+		cap: cap,
+		src: src,
+		// Do allocations up-front.  Don't yet worry about
+		// what these members are; we'll get to that in the
+		// below description of the datatypes.
+		liveByName:  make(map[K]*LinkedListEntry[arcLiveEntry[K, V]], cap),
+		ghostByName: make(map[K]*LinkedListEntry[arcGhostEntry[K]], cap),
+	}
+	for i := 0; i < cap; i++ {
+		ret.unusedLive.Store(new(LinkedListEntry[arcLiveEntry[K, V]]))
+		ret.unusedGhost.Store(new(LinkedListEntry[arcGhostEntry[K]]))
+	}
+	// Return.
+	return ret
+}
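A minimal usage sketch (mirroring how the tests below construct a cache; the int/string types and the source closure are invented for illustration):

	func demo(ctx context.Context) {
		cache := containers.NewARCache[int, string](4,
			containers.SourceFunc[int, string](func(_ context.Context, k int, v *string) {
				*v = fmt.Sprintf("value-%v", k) // fill *v on a cache-miss
			}))
		v := cache.Acquire(ctx, 42) // loads and pins the entry
		fmt.Println(*v)             // "value-42"
		cache.Release(42)           // unpins it; it stays cached until evicted
	}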
 
-	mu sync.RWMutex
-	// Some of the ARC literature calls these "MRU" and "MFU" for
-	// "most {recently,frequently} used", but that's wrong.  The
-	// `frequent` list is still ordered by most-recent-use.  The
-	// distinction is that the `recent` list is entries that have
-	// been used "only once recently", while the `frequent` list
-	// is entries that have been used "at least twice recently"
-	// (to quote the definitions from the original ARC paper); the
-	// affect being that the `recent` list is for
-	// recently-but-not-frequently used entries, while the
-	// `frequent` list is there to ensure that frequently-used
-	// entries don't get evicted.  They're both still MRU lists.
-	recentLive    lruCache[K, V]
-	recentGhost   lruCache[K, struct{}]
-	frequentLive  lruCache[K, V]
-	frequentGhost lruCache[K, struct{}]
-	// recentLiveTarget is the "target" len of recentLive.  We
-	// allow the actual len to deviate from this if the ARCache as
-	// a whole isn't over-len.  That is: recentLiveTarget is used
-	// to decide *which* list to evict from, not *whether* we need
-	// to evict.
-	recentLiveTarget int
+// Related literature:
+//
+// The comments in this file use terminology from the original ARC
+// paper: "ARC: A Self-Tuning, Low Overhead Replacement Cache" by
+// N. Megiddo & D. Modha, FAST 2003.
+// https://www.usenix.org/legacy/events/fast03/tech/full_papers/megiddo/megiddo.pdf
+//
+// But, this file tries to be clear enough that it makes sense
+// without reading the paper.
+//
+// If you do read the paper, a difference to keep an eye out for is
+// that in order to support "delete", several of the assumptions
+// related to DBL(2c) are no longer true.  Specifically, we must
+// handle the cache not being full in cases other than a DBL(2c)
+// miss; and two of the invariants from Π(c) are no longer true (see
+// the bit about invariants below).  Besides the obvious (calls to
+// synchronization primitives, things with "del" or "pin" in the
+// name), places where the standard ARC algorithm is modified to
+// support deletion or pinning should be clearly commented.
+//
+// Background / data structures:
+//
+// `ARC(c)` -- that is, an adaptive replacement cache with capacity
+// `c` -- is most reasonably understood in terms of an "imaginary"
+// simpler `DBL(2c)` algorithm.
+//
+// DBL(2c) is a cache that maintains 2c entries in a set of lists
+// ordered by LRU/MRU.  These lists are called L₁ or "recent" and L₂
+// or "frequent"; |L₁| + |L₂| ≤ 2c.  L₁/recent is for entries that
+// have only been used once "recently", and L₂/frequent is for
+// entries that have been used twice or more "recently" (for a
+// particular definition of "recently" that we don't need to get
+// into).
+//
+// Aside: Some of the ARC literature calls these lists "MRU" and
+// "MFU" for "most {recently,frequently} used", but that's wrong.
+// The L₂/frequent list is still ordered by recency of use.
+//
+// Put another way, L₁/recent is a recency-ordered list of
+// recently-but-not-frequently-used entries, while L₂/frequent is a
+// recency-ordered list of frequently-used entries.
+//
+// We'll get to DBL(2c)'s replacement algorithm later; the above
+// "shape" is enough of an introduction for now.
+//
+// Now, the difference of ARC(c) over DBL(2c) is that ARC(c) splits
+// those lists into segments; L₁ gets split into a "top" part T₁ and
+// a "bottom" part B₁, and similarly L₂ gets split into a "top" part
+// T₂ and a "bottom" part B₂.  The "cache" is only made of T₁ and
+// T₂; entries in B₁ and B₂ are evicted; the 4 lists together make
+// up a "directory" of what would be in DBL(2c).  That is:
+//
+//	cache     = T₁ ∪ T₂
+//	directory = T₁ ∪ T₂ ∪ B₁ ∪ B₂
+//	L₁        = T₁ ∪ B₁
+//	L₂        = T₂ ∪ B₂
+//
+// Let us say that entries in the T₁ or T₂ are "live entries", and
+// entries in B₁ or B₂ are "ghost entries".  The ghost entries make
+// up a record of recent evictions.  We could use the same struct
+// for live entries and ghost entries, and just have everything but
+// the key zeroed-out for ghost entries; but to simplify things
+// let's just have different structures:
+
+type arcLiveEntry[K comparable, V any] struct {
+	key K
+	val V
+
+	refs int           // for pinning
+	del  chan struct{} // non-nil if a delete is waiting on .refs to drop to zero
+}
 
-	// Add a no-op .check() method that the tests can override.
-	noopChecker //nolint:unused // False positive; it is used.
+type arcGhostEntry[K comparable] struct {
+	key K
 }
 
-var _ Map[int, string] = (*ARCache[int, string])(nil)
+type arCache[K comparable, V any] struct {
+	cap int // "c"
+	src Source[K, V]
 
-//nolint:unused // False positive; it is used.
-type noopChecker struct{}
+	mu sync.RWMutex
 
-//nolint:unused // False positive; it is used.
-func (noopChecker) check() {}
+	// Now, the above was a sort of lie for this implementation;
+	// for our pinning implementation, we actually segment L₁ and
+	// L₂ into *three* segments rather than two segments: the top
+	// part is pinned (and thus live) entries, the middle part is
+	// live-but-not-pinned entries, and the bottom part is ghost
+	// entries.
+	//
+	// We don't actually care about the order of the pinned
+	// entries (the lists are ordered by recency-of-use, and
+	// pinned entries are all "in-use", so they're all tied), but
+	// it's convenient to maintain the set of them as sorted lists
+	// the same as everything else.
+
+	// L₁ / recently-but-not-frequently used entries
+	recentPinned LinkedList[arcLiveEntry[K, V]] // top of L₁
+	recentLive   LinkedList[arcLiveEntry[K, V]] // "T₁" for "top of L₁" (but really the middle)
+	recentGhost  LinkedList[arcGhostEntry[K]]   // "B₁" for "bottom of L₁"
+
+	// L₂ / frequently used entries
+	frequentPinned LinkedList[arcLiveEntry[K, V]] // top of L₂
+	frequentLive   LinkedList[arcLiveEntry[K, V]] // "T₂" for "top of L₂" (but really the middle)
+	frequentGhost  LinkedList[arcGhostEntry[K]]   // "B₂" for "bottom of L₂"
+
+	// Now, where to draw the split between the "live" part and
+	// "ghost" parts of each list?  We'll use a parameter
+	// "p"/recentLiveTarget to decide which list to evict
+	// (transition live→ghost) from whenever we need to do an
+	// eviction.
+	//
+	// recentLiveTarget is the "target" len of
+	// recentPinned+recentLive.  We allow the actual len to
+	// deviate from this if the arCache as a whole isn't
+	// over-capacity.  To say it again: recentLiveTarget is used
+	// to decide *which* list to evict from, not *whether* we need
+	// to evict.
+	//
+	// recentLiveTarget is always in the range [0, cap]; it never
+	// goes negative, and never goes beyond cap.  Adjusting this
+	// target is the main way that ARC is "adaptive"; we could
+	// instead define a "fixed replacement cache" policy FRC(p, c)
+	// that has a static target.  But we'll get into that later.
+	recentLiveTarget int // "p"
+
+	// Other book-keeping.
+
+	// For lookups.  The above ordered lists are about
+	// eviction/replacement policies, but they don't help us when
+	// we want to read something from the cache; we'd have to do
+	// an O(n) scan through each list to find the item we're
+	// looking for.  So maintain this separate index in order to
+	// do O(1) lookups when we want to read from the cache.
+	liveByName  map[K]*LinkedListEntry[arcLiveEntry[K, V]]
+	ghostByName map[K]*LinkedListEntry[arcGhostEntry[K]]
+
+	// Free lists.  Like the pinned lists, we don't actually care
+	// about the order here, it's just convenient to use the same
+	// ordered lists.
+	unusedLive  LinkedList[arcLiveEntry[K, V]]
+	unusedGhost LinkedList[arcGhostEntry[K]]
+
+	// For blocking related to pinning.
+	waiters LinkedList[chan struct{}]
+}
 
-func max(a, b int) int {
-	if a > b {
-		return a
+// Algorithms:
+//
+// Now that all of our data structures are defined, let's get into
+// the algorithms of updating them.
+//
+// Before getting to the meaty ARC stuff, let's get some
+// boring/simple synchronization/blocking primitives out of the way:
+
+// waitForAvail is called before storing something into the cache.
+// This is necessary because if the cache is full and all entries are
+// pinned, then we won't have anywhere to store the entry until
+// something gets unpinned ("Release()d").
+func (c *arCache[K, V]) waitForAvail() {
+	if !(c.recentLive.IsEmpty() && c.frequentLive.IsEmpty() && c.unusedLive.IsEmpty()) {
+		// There is already an available `arcLiveEntry` that
+		// we can either use or evict.
+		return
 	}
-	return b
+	ch := make(chan struct{})
+	c.waiters.Store(&LinkedListEntry[chan struct{}]{Value: ch})
+	c.mu.Unlock()
+	<-ch
+	c.mu.Lock()
 }
 
-func bound(low, val, high int) int {
-	if val < low {
-		return low
+// notifyAvail is called when an entry gets unpinned ("Release()d"),
+// and wakes up the highest-priority .waitForAvail() waiter (if there
+// is one).
+func (c *arCache[K, V]) notifyAvail() {
+	waiter := c.waiters.Oldest
+	if waiter == nil {
+		return
 	}
-	if val > high {
-		return high
-	}
-	return val
+	c.waiters.Delete(waiter)
+	close(waiter.Value)
 }
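To make the blocking behavior concrete: with a capacity-1 cache, a second Acquire of a different key has to wait for a Release (a sketch; ctx and src stand for any caller-provided Context and Source):

	cache := containers.NewARCache[int, int](1, src)
	_ = cache.Acquire(ctx, 1) // pins the only entry
	done := make(chan struct{})
	go func() {
		_ = cache.Acquire(ctx, 2) // blocks in waitForAvail: nothing to use or evict
		close(done)
	}()
	cache.Release(1) // notifyAvail wakes the waiter; key 1 can now be evicted
	<-done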
 
-func (c *ARCache[K, V]) onLRUEvictRecent(k K, v V) {
-	if c.recentLive.Len() < c.MaxLen {
-		for c.recentLen() >= c.MaxLen {
-			c.recentGhost.EvictOldest()
-		}
-		for c.ghostLen() >= c.MaxLen {
-			if c.recentGhost.Len() > 0 {
-				c.recentGhost.EvictOldest()
-			} else {
-				c.frequentGhost.EvictOldest()
-			}
-		}
-		c.recentGhost.Store(k, struct{}{})
+// Calling .Delete(k) on an entry that is pinned needs to block until
+// the entry is no longer pinned.
+func (c *arCache[K, V]) unlockAndWaitForDel(entry *LinkedListEntry[arcLiveEntry[K, V]]) {
+	if entry.Value.del == nil {
+		entry.Value.del = make(chan struct{})
 	}
-	if c.OnRemove != nil {
-		c.OnRemove(k, v)
-	}
-	if c.OnEvict != nil {
-		c.OnEvict(k, v)
+	ch := entry.Value.del
+	c.mu.Unlock()
+	<-ch
 }
 
-func (c *ARCache[K, V]) onLRUEvictFrequent(k K, v V) {
-	if c.frequentLive.Len() < c.MaxLen {
-		for c.frequentLen() >= c.MaxLen {
-			c.frequentGhost.EvictOldest()
-		}
-		for c.ghostLen() >= c.MaxLen {
-			if c.frequentGhost.Len() > 0 {
-				c.frequentGhost.EvictOldest()
-			} else {
-				c.recentGhost.EvictOldest()
-			}
-		}
-		c.frequentGhost.Store(k, struct{}{})
-	}
-	if c.OnRemove != nil {
-		c.OnRemove(k, v)
-	}
-	if c.OnEvict != nil {
-		c.OnEvict(k, v)
+// notifyOfDel unblocks any calls to .Delete(k), notifying them that
+// the entry has been deleted and they can now return.
+func (*arCache[K, V]) notifyOfDel(entry *LinkedListEntry[arcLiveEntry[K, V]]) {
+	if entry.Value.del != nil {
+		close(entry.Value.del)
+		entry.Value.del = nil
 	}
 }
 
+// OK, now to the main algorithm(s)!
+//
+// To get this out of the way: Here are the invariants that the
+// algorithm(s) must enforce (see the paper for justification):
+//
+// from DBL(2c):
+//
+//   • 0 ≤ |L₁|+|L₂| ≤ 2c
+//   • 0 ≤ |L₁| ≤ c
+//   • 0 ≤ |L₂| ≤ 2c
+//
+// from Π(c):
+//
+//   Π(c) is the class of policies that ARC(c) belongs to... I
+//   suppose that because of the changes we make to support
+//   deletion, this implementation doesn't actually belong to
+//   Π(c).
+//
+//   • A.1: The lists T₁, B₁, T₂, and B₂ are all mutually
+//     disjoint.
+//   • (not true) A.2: If |L₁|+|L₂| < c, then both B₁ and B₂ are
+//     empty.  But supporting "delete" invalidates this!
+//   • (not true) A.3: If |L₁|+|L₂| ≥ c, then |T₁|+|T₂| = c.  But
+//     supporting "delete" invalidates this!
+//   • A.4(a): Either (T₁ or B₁ is empty), or (the LRU page in T₁
+//     is more recent than the MRU page in B₁).
+//   • A.4(b): Either (T₂ or B₂ is empty), or (the LRU page in T₂
+//     is more recent than the MRU page in B₂).
+//   • A.5: |T₁∪T₂| is the set of pages that would be maintained
+//     by the cache policy π(c).
+//
+//   The algorithm presented in the paper relies on A.2 and A.3 in
+//   order to be correct; the algorithm had to be adjusted in
+//   order to behave properly without relying on those two
+//   invariants.
+//
+// from FRC(p, c):
+//
+//   • 0 ≤ p ≤ c
+//
+// OK, at the beginning I said that ARC(c) is most reasonably
+// understood in terms of DBL(2c); to that end, we'll have two
+// replacement policies that are layered; the "dblReplace" policy
+// that is used in the cases when DBL(2c) would have a cache-miss,
+// and the "arcReplace" policy that is used when ARC(c) has a
+// cache-miss but DBL(2c) wouldn't have (and within dblReplace).
+
+// dblReplace is the DBL(2c) replacement policy.
+//
+// It returns an entry that is not in any list (c.recentPinned,
+// c.recentLive, c.frequentPinned, c.frequentLive, or c.unusedLive),
+// and is ready to be stored into a list by the caller.
+func (c *arCache[K, V]) dblReplace() *LinkedListEntry[arcLiveEntry[K, V]] {
+	c.waitForAvail()
+
+	// The DBL(2c) replacement policy is quite simple: "Replace
+	// the LRU page in L₁, if L₁ contains exactly c pages;
+	// otherwise, replace the LRU page in L₂"
+	//
+	// This looks a touch more complicated than a simple DBL(2c)
+	// implementation would look, but that's just because L₁ and
+	// L₂ are not implemented as uniform lists; instead of "the
+	// LRU entry of L₁" being a simple `c.recent.Oldest`, we have
+	// to check the 3 different segments of L₁.
+
+	recentLen := c.recentPinned.Len + c.recentLive.Len + c.recentGhost.Len // |L₁|
+	switch {
+	case recentLen == c.cap:
+		// Use the LRU entry from L₁.
+		//
+		// Note that *even when* there are available entries
+		// from c.unusedLive, we still do this and evict the
+		// LRU entry from L₁ in order to avoid violating the
+		// `0 ≤ |L₁| ≤ c` invariant.
+		switch {
+		case !c.recentGhost.IsEmpty(): // bottom
+			return c.arcReplace(c.recentGhost.Oldest, true, false)
+		case !c.recentLive.IsEmpty(): // middle
+			entry := c.recentLive.Oldest
+			c.recentLive.Delete(entry)
+			delete(c.liveByName, entry.Value.key)
+			return entry
+		default: // case !c.recentPinned.IsEmpty(): // top
+			// This can't happen because `recentLen == c.cap &&
+			// c.recentGhost.IsEmpty() && c.recentLive.IsEmpty()`
+			// implies that `c.recentPinned.Len == c.cap`, which
+			// can't be true if c.waitForAvail() returned.
+			panic(fmt.Errorf("should not happen: lengths don't match up"))
+		}
+	case recentLen < c.cap:
+		// If the directory is full, use the LRU entry from
+		// L₂; otherwise use a free (unused) entry.
+		switch {
+		// Cache is not full: use a free entry.
+		case !c.unusedLive.IsEmpty():
+			entry := c.unusedLive.Oldest
+			c.unusedLive.Delete(entry)
+			return entry
+		case !c.unusedGhost.IsEmpty():
+			return c.arcReplace(c.unusedGhost.Oldest, false, false)
+		// Cache is full: use the LRU entry from L₂
+		case !c.frequentGhost.IsEmpty():
+			return c.arcReplace(c.frequentGhost.Oldest, false, false)
+		default:
+			// This can't happen because `recentLen < c.cap` implies
+			// that `c.recentGhost.Len < c.cap`, which means that
+			// there is at least one ghost entry that is available
+			// in c.unusedGhost or c.frequentGhost.
+			panic(fmt.Errorf("should not happen: lengths don't match up"))
+		}
+	default: // case recentLen > c.cap:
+		// Can't happen because `0 ≤ |L₁| ≤ c` is one of the
+		// invariants from DBL(2c); the above policy will
+		// never let it happen.
+		panic(fmt.Errorf("should not happen: recentLen:%v > cap:%v", recentLen, c.cap))
+	}
+}
+
-func (c *ARCache[K, V]) init() {
-	c.check()
-	if c.recentLive.OnEvict == nil {
-		c.recentLive.OnEvict = c.onLRUEvictRecent
-		c.frequentLive.OnEvict = c.onLRUEvictFrequent
-	}
-	c.check()
-}
-
-func (c *ARCache[K, V]) liveLen() int     { return c.recentLive.Len() + c.frequentLive.Len() }
-func (c *ARCache[K, V]) ghostLen() int    { return c.recentGhost.Len() + c.frequentGhost.Len() }
-func (c *ARCache[K, V]) recentLen() int   { return c.recentLive.Len() + c.recentGhost.Len() }
-func (c *ARCache[K, V]) frequentLen() int { return c.frequentLive.Len() + c.frequentGhost.Len() }
-func (c *ARCache[K, V]) fullLen() int {
-	return c.recentLive.Len() + c.recentGhost.Len() + c.frequentLive.Len() + c.frequentGhost.Len()
-}
-
-// Store a key/value pair in to the cache.
-func (c *ARCache[K, V]) Store(k K, v V) {
-	c.mu.Lock()
-	defer c.mu.Unlock()
-	c.init()
-
-	c.unlockedStore(k, v)
-}
-
-func (c *ARCache[K, V]) unlockedStore(k K, v V) {
-	// The "Case" comments here reflect Fig. 4. in the original paper "ARC: A
-	// Self-Tuning, Low Overhead Replacement Cache" by N. Megiddo & D. Modha, FAST
-	// 2003.
-	switch {
-	case c.recentLive.Has(k): // Case I(a): cache hit
-		// Make room
-		for c.frequentLen() >= c.MaxLen {
-			if c.frequentGhost.Len() > 0 {
-				c.frequentGhost.EvictOldest()
-			} else {
-				c.frequentLive.EvictOldest()
-			}
-		}
-		c.check()
-		// Move
-		oldV, _ := c.recentLive.Peek(k)
-		c.recentLive.Delete(k)
-		c.frequentLive.Store(k, v)
-		if c.OnRemove != nil {
-			c.OnRemove(k, oldV)
-		}
-		c.check()
-	case c.frequentLive.Has(k): // Case I(b): cache hit
-		oldV, _ := c.frequentLive.Peek(k)
-		c.frequentLive.Store(k, v)
-		if c.OnRemove != nil {
-			c.OnRemove(k, oldV)
-		}
-		c.check()
-	case c.recentGhost.Has(k): // Case II: cache miss (that "should" have been a hit)
-		// Adapt
-		c.recentLiveTarget = bound(
-			0,
-			c.recentLiveTarget+max(1, c.frequentGhost.Len()/c.recentGhost.Len()),
-			c.MaxLen)
-		// Make room
-		for c.liveLen() >= c.MaxLen {
-			if c.recentLive.Len() > c.recentLiveTarget {
-				c.recentLive.EvictOldest()
-			} else {
-				c.frequentLive.EvictOldest()
-			}
-		}
-		for c.frequentLen() >= c.MaxLen {
-			if c.frequentGhost.Len() > 0 {
-				c.frequentGhost.EvictOldest()
-			} else {
-				c.frequentLive.EvictOldest()
-			}
-		}
-		c.check()
-		// Store
-		c.recentGhost.Delete(k)
-		c.frequentLive.Store(k, v)
-		c.check()
-	case c.frequentGhost.Has(k): // Case III: cache miss (that "should" have been a hit)
-		// Adapt
-		c.recentLiveTarget = bound(
-			0,
-			c.recentLiveTarget-max(1, c.recentGhost.Len()/c.frequentGhost.Len()),
-			c.MaxLen)
-		// Make room
-		for c.liveLen() >= c.MaxLen {
-			// TODO(lukeshu): I don't understand why this .recentLiveTarget
-			// check needs to be `>=` instead of `>` like all of the others.
-			if c.recentLive.Len() >= c.recentLiveTarget && c.recentLive.Len() > 0 {
-				c.recentLive.EvictOldest()
-			} else {
-				c.frequentLive.EvictOldest()
-			}
-		}
-		c.check()
-		// Store
-		c.frequentGhost.Delete(k)
-		c.frequentLive.Store(k, v)
-		c.check()
-	default: // Case IV: cache miss
-		// Make room
-		if c.recentLen() < c.MaxLen {
-			for c.liveLen() >= c.MaxLen {
-				if c.recentLive.Len() > c.recentLiveTarget {
-					c.recentLive.EvictOldest()
-				} else {
-					c.frequentLive.EvictOldest()
-				}
-			}
-		} else {
-			c.recentLive.EvictOldest()
-			c.recentGhost.EvictOldest()
-		}
-		c.check()
-		// Store
-		c.recentLive.Store(k, v)
-		c.check()
-	}
-}
-
+// arcReplace is the ARC(c) replacement policy.
+//
+// It returns an entry that is not in any list (c.recentPinned,
+// c.recentLive, c.frequentPinned, c.frequentLive, or c.unusedLive),
+// and is ready to be stored into a list by the caller.
+//
+// If an eviction is performed, `ghostEntry` is a pointer to the entry
+// object that is used as a record of the eviction.  `ghostEntry`
+// should still be present in its old list, in case an eviction is not
+// performed and the `ghostEntry` object is not modified.
+//
+// The `arbitrary` argument is arbitrary, see the quote about it
+// below.
+func (c *arCache[K, V]) arcReplace(ghostEntry *LinkedListEntry[arcGhostEntry[K]], forceEviction, arbitrary bool) *LinkedListEntry[arcLiveEntry[K, V]] {
+	c.waitForAvail()
+
+	// If the cache isn't full, no need to do an eviction.  (This
+	// check is a necessary enhancement over standard ARC in
+	// order to support "delete"; because without "delete",
+	// arcReplace wouldn't ever be called when the cache isn't
+	// full.)
+	if entry := c.unusedLive.Oldest; entry != nil && !forceEviction {
+		c.unusedLive.Delete(entry)
+		return entry
+	}
+
+	// We'll be evicting.  Prep ghostEntry to record that fact.
+	if ghostEntry.List != &c.unusedGhost {
+		delete(c.ghostByName, ghostEntry.Value.key)
+	}
+	ghostEntry.List.Delete(ghostEntry)
+
+	// Note that from here on out, this policy changes *neither*
+	// |L₁| nor |L₂|; shortenings were already done by the above
+	// `ghostEntry.List.Delete(ghostEntry)` call, and lengthenings
+	// will be done by the caller with the returned `entry`.  All
+	// this policy is doing from here on out is changing the split
+	// between T/B.
+
+	// We have to make a binary choice about whether to evict
+	// c.recentLive→c.recentGhost or
+	// c.frequentLive→c.frequentGhost.
+	var evictFrom *LinkedList[arcLiveEntry[K, V]]
+	var evictTo *LinkedList[arcGhostEntry[K]]
+
+	// Make the decision.
+	recentLive := c.recentPinned.Len + c.recentLive.Len
+	switch { // NB: Also check .IsEmpty() in order to support pinning.
+	case recentLive > c.recentLiveTarget, c.frequentLive.IsEmpty():
+		evictFrom, evictTo = &c.recentLive, &c.recentGhost
+	case recentLive < c.recentLiveTarget, c.recentLive.IsEmpty():
+		evictFrom, evictTo = &c.frequentLive, &c.frequentGhost
+	case recentLive == c.recentLiveTarget:
+		// The original paper says "The last replacement
+		// decision is somewhat arbitrary, and can be made
+		// differently if desired."
+		if arbitrary && c.recentLive.Len > 0 {
+			evictFrom, evictTo = &c.recentLive, &c.recentGhost
+		} else {
+			evictFrom, evictTo = &c.frequentLive, &c.frequentGhost
+		}
+	}
+
+	// Act on the decision.
+	entry := evictFrom.Oldest
+	// Evict.
+	delete(c.liveByName, entry.Value.key)
+	evictFrom.Delete(entry)
+	// Record the eviction.
+	ghostEntry.Value.key = entry.Value.key
+	evictTo.Store(ghostEntry)
+	c.ghostByName[ghostEntry.Value.key] = ghostEntry
+
+	return entry
+}
+
+// OK, now that we have our replacement policies defined, it's
+// pretty obvious how to wire them into the "acquire an entry"
+// algorithm.  The only parts here that aren't obvious are:
+//
+//   - the "adapt" step that adjusts c.recentLiveTarget.  Read the
+//     paper for an explanation of why the formulas used do a good
+//     job of quickly adapting to various workloads.
+//
+//   - the `ghostEntry.List == &c.frequentGhost` argument to
+//     arcReplace in the "cache-miss, but would have been a
+//     cache-hit in DBL(2c)" case.  The short answer is that it's
+//     arbitrary (as discussed in comments in arcReplace), but
+//     matches what's in the original paper.
 
-// Load an entry from the cache, recording a "use" for the purposes of
-// "least-recently-used" eviction.
-//
-// Calls .OnHit or .OnMiss depending on whether it's a cache-hit or
-// cache-miss.
-//
-// If .New is non-nil, then .Load will never return (zero, false).
-func (c *ARCache[K, V]) Load(k K) (v V, ok bool) {
+// Acquire implements the 'Cache' interface.
+func (c *arCache[K, V]) Acquire(ctx context.Context, k K) *V {
 	c.mu.Lock()
 	defer c.mu.Unlock()
-	c.init()
-	defer c.check()
-
-	if v, ok := c.recentLive.Peek(k); ok {
-		// Make room
-		for c.frequentLen() >= c.MaxLen {
-			if c.frequentGhost.Len() > 0 {
-				c.frequentGhost.EvictOldest()
-			} else {
-				c.frequentLive.EvictOldest()
-			}
-		}
-		// Store
-		c.recentLive.Delete(k)
-		c.frequentLive.Store(k, v)
-		if c.OnHit != nil {
-			c.OnHit(k, v)
-		}
-		return v, true
-	}
-	if v, ok := c.frequentLive.Load(k); ok {
-		if c.OnHit != nil {
-			c.OnHit(k, v)
-		}
-		return v, true
-	}
-	if c.OnMiss != nil {
-		c.OnMiss(k)
-	}
-	if c.New != nil {
-		v := c.New(k)
-		c.unlockedStore(k, v)
-		return v, true
-	}
-	var zero V
-	return zero, false
+
+	var entry *LinkedListEntry[arcLiveEntry[K, V]]
+	switch {
+	case c.liveByName[k] != nil: // cache-hit
+		entry = c.liveByName[k]
+		if entry.List != &c.frequentPinned {
+			// Move to frequentPinned (unless it's already
+			// there; in which case, don't bother).
+			entry.List.Delete(entry)
+			c.frequentPinned.Store(entry)
+		}
+		entry.Value.refs++
+	case c.ghostByName[k] != nil: // cache-miss, but would have been a cache-hit in DBL(2c)
+		ghostEntry := c.ghostByName[k]
+		// Adapt.
+		switch ghostEntry.List {
+		case &c.recentGhost:
+			// Recency is doing well right now; invest toward recency.
+			c.recentLiveTarget = min(c.recentLiveTarget+max(1, c.frequentGhost.Len/c.recentGhost.Len), c.cap)
+		case &c.frequentGhost:
+			// Frequency is doing well right now; invest toward frequency.
+			c.recentLiveTarget = max(c.recentLiveTarget-max(1, c.recentGhost.Len/c.frequentGhost.Len), 0)
+		}
+		// Whether or not we do an eviction, this ghost entry
+		// needs to go away.
+		ghostEntry.List.Delete(ghostEntry)
+		delete(c.ghostByName, k)
+		c.unusedGhost.Store(ghostEntry)
+		// Replace.
+		entry = c.arcReplace(ghostEntry, false, ghostEntry.List == &c.frequentGhost)
+		entry.Value.key = k
+		c.src.Load(ctx, k, &entry.Value.val)
+		entry.Value.refs = 1
+		c.frequentPinned.Store(entry)
+		c.liveByName[k] = entry
+	default: // cache-miss, and would have even been a cache-miss in DBL(2c)
+		// Replace.
+		entry = c.dblReplace()
+		entry.Value.key = k
+		c.src.Load(ctx, k, &entry.Value.val)
+		entry.Value.refs = 1
+		c.recentPinned.Store(entry)
+		c.liveByName[k] = entry
+	}
+
+	return &entry.Value.val
+}
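To make the "adapt" step concrete, a worked example with invented numbers: say cap = 8, |B₁| = 2, |B₂| = 6, and p (recentLiveTarget) = 3. A hit in B₁ means recency is currently paying off, so p grows by max(1, |B₂|/|B₁|) = max(1, 3) = 3, giving p = min(3+3, 8) = 6. A later hit in B₂ shrinks it by max(1, |B₁|/|B₂|) = max(1, 2/6) = 1 (integer division), giving p = 5. The smaller a ghost list is, the larger the adjustment that a hit on it produces.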
 
+// Given everything that we've already explained, I think it's fair to call
+// the remaining code "boilerplate".
+
+// Delete implements the 'Cache' interface.
+func (c *arCache[K, V]) Delete(k K) {
+	c.mu.Lock()
+
+	if entry := c.liveByName[k]; entry != nil {
+		if entry.Value.refs > 0 {
+			// Let .Release(k) do the deletion when the
+			// refcount drops to 0.
+			c.unlockAndWaitForDel(entry)
+			return
+		}
+		delete(c.liveByName, entry.Value.key)
+		entry.List.Delete(entry)
+		c.unusedLive.Store(entry)
+	} else if entry := c.ghostByName[k]; entry != nil {
+		delete(c.ghostByName, k)
+		entry.List.Delete(entry)
+		c.unusedGhost.Store(entry)
+	}
+
+	// No need to call c.notifyAvail(); if we were able to delete
+	// it, it was already available.
+
+	c.mu.Unlock()
+}
 
-// Delete an entry from the cache.
-func (c *ARCache[K, V]) Delete(k K) {
+// Release implements the 'Cache' interface.
+func (c *arCache[K, V]) Release(k K) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
-	v, ok := c.unlockedPeek(k)
-
-	c.recentLive.Delete(k)
-	c.recentGhost.Delete(k)
-	c.frequentLive.Delete(k)
-	c.frequentGhost.Delete(k)
+	entry := c.liveByName[k]
+	if entry == nil || entry.Value.refs <= 0 {
+		panic(fmt.Errorf("containers.arCache.Release called on key that is not held: %v", k))
+	}
 
-	if ok && c.OnRemove != nil {
-		c.OnRemove(k, v)
+	entry.Value.refs--
+	if entry.Value.refs == 0 {
+		switch {
+		case entry.Value.del != nil:
+			delete(c.liveByName, entry.Value.key)
+			entry.List.Delete(entry)
+			c.unusedLive.Store(entry)
+			c.notifyOfDel(entry)
+		case entry.List == &c.recentPinned:
+			c.recentPinned.Delete(entry)
+			c.recentLive.Store(entry)
+		case entry.List == &c.frequentPinned:
+			c.frequentPinned.Delete(entry)
+			c.frequentLive.Store(entry)
+		}
+		c.notifyAvail()
 	}
 }
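The refs/del bookkeeping gives Delete block-until-unpinned semantics; the interaction, roughly (a sketch of the sequencing, not code from this commit):

	v := cache.Acquire(ctx, k) // refs: 0→1; k is pinned
	go cache.Delete(k)         // blocks in unlockAndWaitForDel while k is pinned
	use(*v)
	cache.Release(k)           // refs: 1→0; Release sees .del set, frees the
	                           // entry, and notifyOfDel lets the Delete return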
-func (c *ARCache[K, V]) Peek(k K) (v V, ok bool) { - c.mu.RLock() - defer c.mu.RUnlock() - - return c.unlockedPeek(k) -} +// Flush implements the 'Cache' interface. +func (c *arCache[K, V]) Flush(ctx context.Context) { + c.mu.Lock() + defer c.mu.Unlock() -func (c *ARCache[K, V]) unlockedPeek(k K) (v V, ok bool) { - if v, ok := c.recentLive.Peek(k); ok { - return v, true + for _, list := range []*LinkedList[arcLiveEntry[K, V]]{ + &c.recentPinned, + &c.recentLive, + &c.frequentPinned, + &c.frequentLive, + &c.unusedLive, + } { + for entry := list.Oldest; entry != nil; entry = entry.Newer { + c.src.Flush(ctx, &entry.Value.val) + } } - - return c.frequentLive.Peek(k) } -// Has returns whether an entry is present in the cache. Calling Has -// does not count as a "use" for "least-recently-used". -func (c *ARCache[K, V]) Has(k K) bool { - c.mu.RLock() - defer c.mu.RUnlock() - - return c.recentLive.Has(k) || c.frequentLive.Has(k) +func min(a, b int) int { + if a < b { + return a + } + return b } -// Len returns the number of entries in the cache. -func (c *ARCache[K, V]) Len() int { - c.mu.RLock() - defer c.mu.RUnlock() - - return c.liveLen() +func max(a, b int) int { + if a > b { + return a + } + return b } diff --git a/lib/containers/arcache_string_test.go b/lib/containers/arcache_string_test.go index b72b21b..86f222c 100644 --- a/lib/containers/arcache_string_test.go +++ b/lib/containers/arcache_string_test.go @@ -5,6 +5,7 @@ package containers import ( + "context" "fmt" "strings" "testing" @@ -12,21 +13,22 @@ import ( "github.com/stretchr/testify/assert" ) -func (c *ARCache[K, V]) String() string { +func (c *arCache[K, V]) String() string { c.mu.RLock() defer c.mu.RUnlock() - keys := make([]string, 0, c.fullLen()) - for entry := c.recentGhost.byAge.oldest; entry != nil; entry = entry.newer { + fullLen := len(c.liveByName) + len(c.ghostByName) + keys := make([]string, 0, fullLen) + for entry := c.recentGhost.Oldest; entry != nil; entry = entry.Newer { keys = append(keys, fmt.Sprint(entry.Value.key)) } - for entry := c.recentLive.byAge.oldest; entry != nil; entry = entry.newer { + for entry := c.recentLive.Oldest; entry != nil; entry = entry.Newer { keys = append(keys, fmt.Sprint(entry.Value.key)) } - for entry := c.frequentLive.byAge.newest; entry != nil; entry = entry.older { + for entry := c.frequentLive.Newest; entry != nil; entry = entry.Older { keys = append(keys, fmt.Sprint(entry.Value.key)) } - for entry := c.frequentGhost.byAge.newest; entry != nil; entry = entry.older { + for entry := c.frequentGhost.Newest; entry != nil; entry = entry.Older { keys = append(keys, fmt.Sprint(entry.Value.key)) } @@ -36,26 +38,26 @@ func (c *ARCache[K, V]) String() string { } var out strings.Builder - blankLeft := c.MaxLen - c.recentLen() - for i := 0; i <= 2*c.MaxLen; i++ { + blankLeft := c.cap - (c.recentLive.Len + c.recentGhost.Len) + for i := 0; i <= 2*c.cap; i++ { sep := []byte(" ") - if i == blankLeft+c.recentGhost.Len() { + if i == blankLeft+c.recentGhost.Len { sep[0] = '[' } - if i == blankLeft+c.recentGhost.Len()+c.recentLive.Len() { + if i == blankLeft+c.recentGhost.Len+c.recentLive.Len { sep[1] = '!' 
} - if i == blankLeft+c.recentGhost.Len()+c.recentLive.Len()-c.recentLiveTarget { + if i == blankLeft+c.recentGhost.Len+c.recentLive.Len-c.recentLiveTarget { sep[2] = '^' } - if i == blankLeft+c.recentGhost.Len()+c.recentLive.Len()+c.frequentLive.Len() { + if i == blankLeft+c.recentGhost.Len+c.recentLive.Len+c.frequentLive.Len { sep[3] = ']' } out.Write(sep) - if i < 2*c.MaxLen { + if i < 2*c.cap { key := "" - if i >= blankLeft && i < blankLeft+c.fullLen() { + if i >= blankLeft && i < blankLeft+fullLen { key = keys[i-blankLeft] } spaceLeft := (keyLen - len(key)) / 2 @@ -70,9 +72,7 @@ func (c *ARCache[K, V]) String() string { func TestARCacheString(t *testing.T) { t.Parallel() - cache := &ARCache[int, int]{ - MaxLen: 4, - } + cache := NewARCache[int, int](4, SourceFunc[int, int](func(context.Context, int, *int) {})).(*arCache[int, int]) assert.Equal(t, ` ___ ___ ___ ___[!^]___ ___ ___ ___ `, cache.String()) } diff --git a/lib/containers/arcache_test.go b/lib/containers/arcache_test.go index 507dd04..3e858ec 100644 --- a/lib/containers/arcache_test.go +++ b/lib/containers/arcache_test.go @@ -9,103 +9,262 @@ package containers import ( "bytes" + "context" "crypto/rand" "fmt" - "math" "math/big" + "sort" "testing" "github.com/datawire/dlib/derror" + "github.com/datawire/dlib/dlog" "github.com/stretchr/testify/require" ) // Add runtime validity checks ///////////////////////////////////////////////// -func (c *ARCache[K, V]) err(e error) error { - return fmt.Errorf("%[1]T(%[1]p): %w (b1:%v t1:%v / t2:%v b2:%v)", - c, e, - c.recentGhost.Len(), - c.recentLive.Len(), - c.frequentLive.Len(), - c.frequentGhost.Len()) +func (c *arc[K, V]) logf(format string, a ...any) { + c.t.Helper() + c.t.Logf("%[1]T(%[1]p): %s (b1:%v t1:%v p1:%v / p1:%v, t2:%v b2:%v)", + c, + fmt.Sprintf(format, a...), + c.recentGhost.Len, + c.recentLive.Len, + c.recentPinned.Len, + c.frequentPinned.Len, + c.frequentLive.Len, + c.frequentGhost.Len) } -func (c *ARCache[K, V]) check() { - if c.MaxLen < 2 { - panic(c.err(fmt.Errorf("MaxLen:%v < 2", c.MaxLen))) - } +func (c *arc[K, V]) fatalf(format string, a ...any) { + c.logf(format, a...) + c.t.FailNow() +} - if fullLen := c.fullLen(); fullLen > 2*c.MaxLen { - panic(c.err(fmt.Errorf("fullLen:%v > MaxLen:%v", fullLen, c.MaxLen))) +func (c *arc[K, V]) check() { + if c.noCheck { + return } + c.t.Helper() - if liveLen := c.liveLen(); liveLen > c.MaxLen { - panic(c.err(fmt.Errorf("liveLen:%v > MaxLen:%v", liveLen, c.MaxLen))) - } - if ghostLen := c.ghostLen(); ghostLen > c.MaxLen { - panic(c.err(fmt.Errorf("ghostLen:%v > MaxLen:%v", ghostLen, c.MaxLen))) - } - if recentLen := c.recentLen(); recentLen > c.MaxLen { - panic(c.err(fmt.Errorf("recentLen:%v > MaxLen:%v", recentLen, c.MaxLen))) + // Do the slow parts for 1/32 of all calls. + fullCheck := getRand(c.t, 32) == 0 + + // Check that the lists are in-sync with the maps. 
diff --git a/lib/containers/arcache_test.go b/lib/containers/arcache_test.go
index 507dd04..3e858ec 100644
--- a/lib/containers/arcache_test.go
+++ b/lib/containers/arcache_test.go
@@ -9,103 +9,262 @@
 package containers
 
 import (
 	"bytes"
+	"context"
 	"crypto/rand"
 	"fmt"
-	"math"
 	"math/big"
+	"sort"
 	"testing"
 
 	"github.com/datawire/dlib/derror"
+	"github.com/datawire/dlib/dlog"
 	"github.com/stretchr/testify/require"
 )
 
 // Add runtime validity checks /////////////////////////////////////////////////
 
-func (c *ARCache[K, V]) err(e error) error {
-	return fmt.Errorf("%[1]T(%[1]p): %w (b1:%v t1:%v / t2:%v b2:%v)",
-		c, e,
-		c.recentGhost.Len(),
-		c.recentLive.Len(),
-		c.frequentLive.Len(),
-		c.frequentGhost.Len())
+func (c *arc[K, V]) logf(format string, a ...any) {
+	c.t.Helper()
+	c.t.Logf("%[1]T(%[1]p): %s (b1:%v t1:%v p1:%v / p2:%v, t2:%v b2:%v)",
+		c,
+		fmt.Sprintf(format, a...),
+		c.recentGhost.Len,
+		c.recentLive.Len,
+		c.recentPinned.Len,
+		c.frequentPinned.Len,
+		c.frequentLive.Len,
+		c.frequentGhost.Len)
 }
 
-func (c *ARCache[K, V]) check() {
-	if c.MaxLen < 2 {
-		panic(c.err(fmt.Errorf("MaxLen:%v < 2", c.MaxLen)))
-	}
+func (c *arc[K, V]) fatalf(format string, a ...any) {
+	c.logf(format, a...)
+	c.t.FailNow()
+}
 
-	if fullLen := c.fullLen(); fullLen > 2*c.MaxLen {
-		panic(c.err(fmt.Errorf("fullLen:%v > MaxLen:%v", fullLen, c.MaxLen)))
+func (c *arc[K, V]) check() {
+	if c.noCheck {
+		return
 	}
+	c.t.Helper()
 
-	if liveLen := c.liveLen(); liveLen > c.MaxLen {
-		panic(c.err(fmt.Errorf("liveLen:%v > MaxLen:%v", liveLen, c.MaxLen)))
-	}
-	if ghostLen := c.ghostLen(); ghostLen > c.MaxLen {
-		panic(c.err(fmt.Errorf("ghostLen:%v > MaxLen:%v", ghostLen, c.MaxLen)))
-	}
-	if recentLen := c.recentLen(); recentLen > c.MaxLen {
-		panic(c.err(fmt.Errorf("recentLen:%v > MaxLen:%v", recentLen, c.MaxLen)))
+	// Do the slow parts for 1/32 of all calls.
+	fullCheck := getRand(c.t, 32) == 0
+
+	// Check that the lists are in-sync with the maps.
+	if fullCheck {
+		liveEntries := make(map[*LinkedListEntry[arcLiveEntry[K, V]]]int, len(c.liveByName))
+		for _, list := range c.liveLists {
+			for entry := list.Oldest; entry != nil; entry = entry.Newer {
+				liveEntries[entry]++
+			}
+		}
+		for _, entry := range c.liveByName {
+			liveEntries[entry]--
+			if liveEntries[entry] == 0 {
+				delete(liveEntries, entry)
+			}
+		}
+		require.Len(c.t, liveEntries, 0)
+
+		ghostEntries := make(map[*LinkedListEntry[arcGhostEntry[K]]]int, len(c.ghostByName))
+		for _, list := range c.ghostLists {
+			for entry := list.Oldest; entry != nil; entry = entry.Newer {
+				ghostEntries[entry]++
+			}
+		}
+		for _, entry := range c.ghostByName {
+			ghostEntries[entry]--
+			if ghostEntries[entry] == 0 {
+				delete(ghostEntries, entry)
+			}
+		}
+		require.Len(c.t, ghostEntries, 0)
+	}
+
+	// Check the invariants.
+
+	// from DBL(2c):
+	//
+	//   • 0 ≤ |L₁|+|L₂| ≤ 2c
+	if fullLen := c.recentPinned.Len + c.recentLive.Len + c.recentGhost.Len + c.frequentPinned.Len + c.frequentLive.Len + c.frequentGhost.Len; fullLen < 0 || fullLen > 2*c.cap {
+		c.fatalf("! ( 0 <= fullLen:%v <= 2*cap:%v )", fullLen, c.cap)
+	}
+	//   • 0 ≤ |L₁| ≤ c
+	if recentLen := c.recentPinned.Len + c.recentLive.Len + c.recentGhost.Len; recentLen < 0 || recentLen > c.cap {
+		c.fatalf("! ( 0 <= recentLen:%v <= cap:%v )", recentLen, c.cap)
+	}
+	//   • 0 ≤ |L₂| ≤ 2c
+	if frequentLen := c.frequentPinned.Len + c.frequentLive.Len + c.frequentGhost.Len; frequentLen < 0 || frequentLen > 2*c.cap {
+		c.fatalf("! ( 0 <= frequentLen:%v <= 2*cap:%v )", frequentLen, c.cap)
+	}
+	//
+	// from Π(c):
+	//
+	//   • A.1: The lists T₁, B₁, T₂, and B₂ are all mutually
+	//     disjoint.
+	if fullCheck {
+		keys := make(map[K]int, len(c.liveByName)+len(c.ghostByName))
+		for _, list := range c.liveLists {
+			for entry := list.Oldest; entry != nil; entry = entry.Newer {
+				keys[entry.Value.key]++
+			}
+		}
+		for _, list := range c.ghostLists {
+			for entry := list.Oldest; entry != nil; entry = entry.Newer {
+				keys[entry.Value.key]++
+			}
+		}
+		for key, cnt := range keys {
+			if cnt > 1 {
+				listNames := make([]string, 0, cnt)
+				for listName, list := range c.liveLists {
+					for entry := list.Oldest; entry != nil; entry = entry.Newer {
+						if entry.Value.key == key {
+							listNames = append(listNames, listName)
+						}
+					}
+				}
+				for listName, list := range c.ghostLists {
+					for entry := list.Oldest; entry != nil; entry = entry.Newer {
+						if entry.Value.key == key {
+							listNames = append(listNames, listName)
+						}
+					}
+				}
+				sort.Strings(listNames)
+				c.fatalf("dup key: %v is in %v", key, listNames)
+			}
+		}
 	}
-	if frequentLen := c.frequentLen(); frequentLen > c.MaxLen {
-		panic(c.err(fmt.Errorf("frequentLen:%v > MaxLen:%v", frequentLen, c.MaxLen)))
+	//   • (not true) A.2: If |L₁|+|L₂| < c, then both B₁ and B₂ are
+	//     empty.  But supporting "delete" invalidates this!
+	//   • (not true) A.3: If |L₁|+|L₂| ≥ c, then |T₁|+|T₂| = c.  But
+	//     supporting "delete" invalidates this!
+	//   • A.4(a): Either (T₁ or B₁ is empty), or (the LRU page in T₁
+	//     is more recent than the MRU page in B₁).
+	//   • A.4(b): Either (T₂ or B₂ is empty), or (the LRU page in T₂
+	//     is more recent than the MRU page in B₂).
+	//   • A.5: |T₁∪T₂| is the set of pages that would be maintained
+	//     by the cache policy π(c).
+	//
+	// from FRC(p, c):
+	//
+	//   • 0 ≤ p ≤ c
+	if c.recentLiveTarget < 0 || c.recentLiveTarget > c.cap {
+		c.fatalf("! ( 0 <= p:%v <= cap:%v )", c.recentLiveTarget, c.cap)
 	}
 }
 
 // Compatibility layer for hashicorp/golang-lru ////////////////////////////////
 
+type lenFunc func() int
+
+func (fn lenFunc) Len() int { return fn() }
+
 type arc[K comparable, V any] struct {
-	ARCache[K, V]
-	t1, t2 *lruCache[K, V]
-	b1, b2 *lruCache[K, struct{}]
+	*arCache[K, V]
+	ctx context.Context //nolint:containedctx // no choice if we want to keep the hashicorp-compatible API
+	t   testing.TB
+
+	t1, t2, b1, b2 lenFunc
+
+	// For speeding up .check()
+	noCheck    bool
+	liveLists  map[string]*LinkedList[arcLiveEntry[K, V]]
+	ghostLists map[string]*LinkedList[arcGhostEntry[K]]
 }
 
-func NewARC[K comparable, V any](size int) (*arc[K, V], error) {
+func NewARC[K comparable, V any](t testing.TB, size int) (*arc[K, V], error) {
+	src := SourceFunc[K, V](func(context.Context, K, *V) {})
+	_, isBench := t.(*testing.B)
 	ret := &arc[K, V]{
-		ARCache: ARCache[K, V]{
-			MaxLen: size,
-		},
-	}
-	ret.t1 = &ret.recentLive
-	ret.t2 = &ret.frequentLive
-	ret.b1 = &ret.recentGhost
-	ret.b2 = &ret.frequentGhost
+		ctx:     dlog.NewTestContext(t, true),
+		t:       t,
+		noCheck: isBench,
+	}
+	ret.init(size, src)
 	return ret, nil
 }
 
-func (c *arc[K, V]) Contains(k K) bool { return c.Has(k) }
-func (c *arc[K, V]) Get(k K) (V, bool) { return c.Load(k) }
-func (c *arc[K, V]) Add(k K, v V)      { c.Store(k, v) }
-func (c *arc[K, V]) Remove(k K)        { c.Delete(k) }
+func (c *arc[K, V]) init(size int, src Source[K, V]) {
+	c.arCache = NewARCache[K, V](size, src).(*arCache[K, V])
+	c.t1 = lenFunc(func() int { return c.arCache.recentLive.Len })
+	c.t2 = lenFunc(func() int { return c.arCache.frequentLive.Len })
+	c.b1 = lenFunc(func() int { return c.arCache.recentGhost.Len })
+	c.b2 = lenFunc(func() int { return c.arCache.frequentGhost.Len })
+
+	c.liveLists = map[string]*LinkedList[arcLiveEntry[K, V]]{
+		"p1": &c.recentPinned,
+		"t1": &c.recentLive,
+		"p2": &c.frequentPinned,
+		"t2": &c.frequentLive,
+	}
+	c.ghostLists = map[string]*LinkedList[arcGhostEntry[K]]{
+		"b1": &c.recentGhost,
+		"b2": &c.frequentGhost,
+	}
+}
+
+// non-mutators
+
 func (c *arc[K, V]) p() int { return c.recentLiveTarget }
 
-func (c *arc[K, V]) Purge() {
-	c.mu.Lock()
-	defer c.mu.Unlock()
-	c.recentLive = lruCache[K, V]{}
-	c.recentGhost = lruCache[K, struct{}]{}
-	c.frequentLive = lruCache[K, V]{}
-	c.frequentGhost = lruCache[K, struct{}]{}
-	c.recentLiveTarget = 0
+func (c *arc[K, V]) Len() int          { return len(c.liveByName) }
+func (c *arc[K, V]) Contains(k K) bool { return c.liveByName[k] != nil }
+
+func (c *arc[K, V]) Peek(k K) (V, bool) {
+	entry := c.liveByName[k]
+	if entry == nil {
+		var zero V
+		return zero, false
+	}
+	return entry.Value.val, true
 }
 
 func (c *arc[K, V]) Keys() []K {
-	c.mu.RLock()
-	defer c.mu.RUnlock()
-	ret := make([]K, 0, c.Len())
-	for entry := c.recentLive.byAge.oldest; entry != nil; entry = entry.newer {
+	ret := make([]K, 0, len(c.liveByName))
+	for entry := c.recentLive.Oldest; entry != nil; entry = entry.Newer {
 		ret = append(ret, entry.Value.key)
 	}
-	for entry := c.frequentLive.byAge.oldest; entry != nil; entry = entry.newer {
+	for entry := c.frequentLive.Oldest; entry != nil; entry = entry.Newer {
 		ret = append(ret, entry.Value.key)
 	}
 	return ret
 }
 
+// mutators
+
+func (c *arc[K, V]) Remove(k K) {
+	defer c.check()
+	c.Delete(k)
+}
+
+func (c *arc[K, V]) Purge() {
+	defer c.check()
+	c.init(c.cap, c.src)
+}
+
+func (c *arc[K, V]) Get(k K) (V, bool) {
+	defer c.check()
+	if !c.Contains(k) {
+		var zero V
+		return zero, false
+	}
+	val := *c.Acquire(c.ctx, k)
+	c.Release(k)
+	return val, true
 }
 
+func (c *arc[K, V]) Add(k K, v V) {
+	defer c.check()
+	ptr := c.Acquire(c.ctx, k)
+	*ptr = v
+	c.Release(k)
+}
+
 // Tests from hashicorp/golang-lru /////////////////////////////////////////////
 
-func getRand(tb testing.TB) int64 {
-	out, err := rand.Int(rand.Reader, big.NewInt(math.MaxInt64))
+func getRand(tb testing.TB, limit int64) int64 {
+	out, err := rand.Int(rand.Reader, big.NewInt(limit))
 	if err != nil {
 		tb.Fatal(err)
 	}
@@ -113,14 +272,14 @@
 }
 
 func BenchmarkARC_Rand(b *testing.B) {
-	l, err := NewARC[int64, int64](8192)
+	l, err := NewARC[int64, int64](b, 8192)
 	if err != nil {
 		b.Fatalf("err: %v", err)
 	}
 
 	trace := make([]int64, b.N*2)
 	for i := 0; i < b.N*2; i++ {
-		trace[i] = getRand(b) % 32768
+		trace[i] = getRand(b, 32768)
 	}
 
 	b.ResetTimer()
@@ -141,7 +300,7 @@
 }
 
 func BenchmarkARC_Freq(b *testing.B) {
-	l, err := NewARC[int64, int64](8192)
+	l, err := NewARC[int64, int64](b, 8192)
 	if err != nil {
 		b.Fatalf("err: %v", err)
 	}
@@ -149,9 +308,9 @@
 	trace := make([]int64, b.N*2)
 	for i := 0; i < b.N*2; i++ {
 		if i%2 == 0 {
-			trace[i] = getRand(b) % 16384
+			trace[i] = getRand(b, 16384)
 		} else {
-			trace[i] = getRand(b) % 32768
+			trace[i] = getRand(b, 32768)
 		}
 	}
 
@@ -234,7 +393,7 @@ func FuzzARC(f *testing.F) {
 func testARC_RandomOps(t *testing.T, ops []arcOp) {
 	size := 128
-	l, err := NewARC[int64, int64](128)
+	l, err := NewARC[int64, int64](t, 128)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
@@ -264,7 +423,7 @@
 func TestARC_Get_RecentToFrequent(t *testing.T) {
 	t.Parallel()
-	l, err := NewARC[int, int](128)
+	l, err := NewARC[int, int](t, 128)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
@@ -309,7 +468,7 @@
 func TestARC_Add_RecentToFrequent(t *testing.T) {
 	t.Parallel()
-	l, err := NewARC[int, int](128)
+	l, err := NewARC[int, int](t, 128)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
@@ -344,7 +503,7 @@
 func TestARC_Adaptive(t *testing.T) {
 	t.Parallel()
-	l, err := NewARC[int, int](4)
+	l, err := NewARC[int, int](t, 4)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
@@ -353,13 +512,16 @@
 	for i := 0; i < 4; i++ {
 		l.Add(i, i)
 	}
+	require.Equal(t, `[ _0_ _1_ _2_ _3_ !^]___ ___ ___ ___ `, l.String())
 	if n := l.t1.Len(); n != 4 {
 		t.Fatalf("bad: %d", n)
 	}
 
 	// Move to t2
 	l.Get(0)
+	require.Equal(t, ` ___[ _1_ _2_ _3_ !^ _0_ ]___ ___ ___ `, l.String())
 	l.Get(1)
+	require.Equal(t, ` ___ ___[ _2_ _3_ !^ _1_ _0_ ]___ ___ `, l.String())
 	if n := l.t2.Len(); n != 2 {
 		t.Fatalf("bad: %d", n)
 	}
@@ -463,7 +625,7 @@
 func TestARC(t *testing.T) {
 	t.Parallel()
-	l, err := NewARC[int, int](128)
+	l, err := NewARC[int, int](t, 128)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
@@ -509,7 +671,7 @@
 // Test that Contains doesn't update recent-ness
 func TestARC_Contains(t *testing.T) {
 	t.Parallel()
-	l, err := NewARC[int, int](2)
+	l, err := NewARC[int, int](t, 2)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
@@ -529,7 +691,7 @@
 // Test that Peek doesn't update recent-ness
 func TestARC_Peek(t *testing.T) {
 	t.Parallel()
-	l, err := NewARC[int, int](2)
+	l, err := NewARC[int, int](t, 2)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
diff --git a/lib/containers/cache.go b/lib/containers/cache.go
new file mode 100644
index 0000000..f38ff7a
--- /dev/null
+++ b/lib/containers/cache.go
@@ -0,0 +1,64 @@
+// Copyright (C) 2023 Luke Shumaker
+//
+// SPDX-License-Identifier: GPL-2.0-or-later
+
+package containers
+
+import (
+	"context"
+)
+
+// A Source is something that a Cache sits in front of.
+type Source[K comparable, V any] interface {
+	// Load updates a 'V' (which is reused across the lifetime of
+	// the cache, and may or may not be zero) to be set to the
+	// value for the 'K'.
+	Load(context.Context, K, *V)
+
+	// Flush does whatever it needs to in order to ensure that if
+	// the program exited right now, no one would be upset.  Flush
+	// being called does not mean that the entry is being evicted
+	// from the cache.
+	Flush(context.Context, *V)
+}
+
+type Cache[K comparable, V any] interface {
+	// Acquire loads the value for `k` (possibly from the cache),
+	// records that value into the cache, and increments the
+	// cache entry's in-use counter, preventing it from being
+	// evicted.
+	//
+	// If the cache is at capacity and all entries are in-use,
+	// then Acquire blocks until an entry becomes available (via
+	// `Release`).
+	Acquire(context.Context, K) *V
+
+	// Release decrements the in-use counter for the cache entry
+	// for `k`.  If the in-use counter drops to 0, then that entry
+	// may be evicted.
+	//
+	// It is invalid (runtime-panic) to call Release for an entry
+	// that does not have a positive in-use counter.
+	Release(K)
+
+	// Delete invalidates/removes an entry from the cache.  Blocks
+	// until the in-use counter drops to 0.
+	//
+	// It is valid to call Delete on an entry that does not exist
+	// in the cache.
+	Delete(K)
+
+	// Flush does whatever it needs to in order to ensure that if
+	// the program exited right now, no one would be upset.  Flush
+	// does not empty the cache.
+	Flush(context.Context)
+}
+
+// SourceFunc implements Source.  Load calls the function, and Flush
+// is a no-op.
+type SourceFunc[K comparable, V any] func(context.Context, K, *V)
+
+var _ Source[int, string] = SourceFunc[int, string](nil)
+
+func (fn SourceFunc[K, V]) Load(ctx context.Context, k K, v *V) { fn(ctx, k, v) }
+func (SourceFunc[K, V]) Flush(context.Context, *V)              {}
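To make the Source contract concrete, here is a hedged sketch of a Source with real load behavior; the diskStore type and its backing map are invented for illustration:

    package main

    import (
    	"context"

    	"git.lukeshu.com/btrfs-progs-ng/lib/containers"
    )

    // diskStore is a hypothetical Source backed by an in-memory map.
    type diskStore struct {
    	backing map[string][]byte
    }

    var _ containers.Source[string, []byte] = (*diskStore)(nil)

    // Load overwrites *v in place; the buffer is reused across cache
    // entries, so it must not be assumed to be zero.
    func (s *diskStore) Load(_ context.Context, k string, v *[]byte) {
    	*v = append((*v)[:0], s.backing[k]...)
    }

    // Flush would persist *v; this sketch has nothing dirty to write.
    func (s *diskStore) Flush(_ context.Context, v *[]byte) {}

One consequence of Flush receiving only the value and not the key is that a value needing write-back must carry its own address; that is why the bufferedBlock type later in this patch grows an Addr field.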
diff --git a/lib/containers/linkedlist.go b/lib/containers/linkedlist.go
index 07b4760..c8e264a 100644
--- a/lib/containers/linkedlist.go
+++ b/lib/containers/linkedlist.go
@@ -5,12 +5,13 @@
 package containers
 
 import (
-	"git.lukeshu.com/go/typedsync"
+	"fmt"
 )
 
 // LinkedListEntry [T] is an entry in a LinkedList [T].
 type LinkedListEntry[T any] struct {
-	older, newer *LinkedListEntry[T]
+	List         *LinkedList[T]
+	Older, Newer *LinkedListEntry[T]
 
 	Value T
 }
@@ -24,18 +25,17 @@ type LinkedListEntry[T any] struct {
 // of a linked-list, because these applications do not need such
 // features.
 //
-// An advantage over `container/list.List` is that LinkedList
-// maintains a Pool of entries, so churning through the list does not
-// churn out garbage.  However, LinkedList also has the disadvantages
-// that it has fewer safety checks and fewer features in general.
+// Compared to `container/list.List`, LinkedList has the
+// disadvantages that it has fewer safety checks and fewer features in
+// general.
 type LinkedList[T any] struct {
-	oldest, newest *LinkedListEntry[T]
-	pool           typedsync.Pool[*LinkedListEntry[T]]
+	Len            int
+	Oldest, Newest *LinkedListEntry[T]
 }
 
 // IsEmpty returns whether the list is empty or not.
 func (l *LinkedList[T]) IsEmpty() bool {
-	return l.oldest == nil
+	return l.Oldest == nil
 }
 
 // Delete removes an entry from the list.  The entry is invalid once
@@ -44,42 +44,50 @@ func (l *LinkedList[T]) IsEmpty() bool {
 //
 // It is invalid (runtime-panic) to call Delete on a nil entry.
 //
-// It is invalid (corrupt the list) to call Delete on an entry that
+// It is invalid (runtime-panic) to call Delete on an entry that
 // isn't in the list.
 func (l *LinkedList[T]) Delete(entry *LinkedListEntry[T]) {
-	if entry.newer == nil {
-		l.newest = entry.older
+	if entry.List != l {
+		panic(fmt.Errorf("LinkedList.Delete: entry %p not in list", entry))
+	}
+	l.Len--
+	if entry.Newer == nil {
+		l.Newest = entry.Older
 	} else {
-		entry.newer.older = entry.older
+		entry.Newer.Older = entry.Older
 	}
-	if entry.older == nil {
-		l.oldest = entry.newer
+	if entry.Older == nil {
+		l.Oldest = entry.Newer
 	} else {
-		entry.older.newer = entry.newer
+		entry.Older.Newer = entry.Newer
 	}
-	*entry = LinkedListEntry[T]{} // no memory leaks
-	l.pool.Put(entry)
+	// no memory leaks
+	entry.List = nil
+	entry.Older = nil
+	entry.Newer = nil
 }
 
 // Store appends a value to the "newest" end of the list, returning
 // the created entry.
-func (l *LinkedList[T]) Store(val T) *LinkedListEntry[T] {
-	entry, ok := l.pool.Get()
-	if !ok {
-		entry = new(LinkedListEntry[T])
-	}
-	*entry = LinkedListEntry[T]{
-		older: l.newest,
-		Value: val,
+//
+// It is invalid (runtime-panic) to call Store on a nil entry.
+//
+// It is invalid (runtime-panic) to call Store on an entry that is
+// already in a list.
+func (l *LinkedList[T]) Store(entry *LinkedListEntry[T]) {
+	if entry.List != nil {
+		panic(fmt.Errorf("LinkedList.Store: entry %p is already in a list", entry))
 	}
-	l.newest = entry
-	if entry.older == nil {
-		l.oldest = entry
+	l.Len++
+	entry.List = l
+	entry.Older = l.Newest
+	l.Newest = entry
+	if entry.Older == nil {
+		l.Oldest = entry
 	} else {
-		entry.older.newer = entry
+		entry.Older.Newer = entry
 	}
-	return entry
 }
 
 // MoveToNewest moves an entry from any position in the list to the
@@ -88,29 +96,26 @@ func (l *LinkedList[T]) Store(val T) *LinkedListEntry[T] {
 //
 // It is invalid (runtime-panic) to call MoveToNewest on a nil entry.
 //
-// It is invalid (corrupt the list) to call MoveToNewest on an entry
-// that isn't in the list.
+// It is invalid (runtime-panic) to call MoveToNewest on an entry that
+// isn't in the list.
 func (l *LinkedList[T]) MoveToNewest(entry *LinkedListEntry[T]) {
-	if entry.newer == nil {
+	if entry.List != l {
+		panic(fmt.Errorf("LinkedList.MoveToNewest: entry %p not in list", entry))
+	}
+	if entry.Newer == nil {
 		// Already newest.
 		return
 	}
-	entry.newer.older = entry.older
-	if entry.older == nil {
-		l.oldest = entry.newer
+	entry.Newer.Older = entry.Older
+	if entry.Older == nil {
+		l.Oldest = entry.Newer
 	} else {
-		entry.older.newer = entry.newer
+		entry.Older.Newer = entry.Newer
 	}
-	entry.older = l.newest
-	l.newest.newer = entry
-
-	entry.newer = nil
-	l.newest = entry
-}
+	entry.Older = l.Newest
+	l.Newest.Newer = entry
 
-// Oldest returns the entry at the "oldest" end of the list, or nil if
-// the list is empty.
-func (l *LinkedList[T]) Oldest() *LinkedListEntry[T] {
-	return l.oldest
+	entry.Newer = nil
+	l.Newest = entry
 }
diff --git a/lib/containers/lrucache.go b/lib/containers/lrucache.go
index 94094b9..d2aff41 100644
--- a/lib/containers/lrucache.go
+++ b/lib/containers/lrucache.go
@@ -4,112 +4,209 @@
 
 package containers
 
+import (
+	"context"
+	"fmt"
+	"sync"
+)
+
+// NewLRUCache returns a new thread-safe Cache with a simple
+// Least-Recently-Used eviction policy.
+//
+// It is invalid (runtime-panic) to call NewLRUCache with a
+// non-positive capacity or a nil source.
+//
+//nolint:predeclared // 'cap' is the best name for it.
+func NewLRUCache[K comparable, V any](cap int, src Source[K, V]) Cache[K, V] {
+	if cap <= 0 {
+		panic(fmt.Errorf("containers.NewLRUCache: invalid capacity: %v", cap))
+	}
+	if src == nil {
+		panic(fmt.Errorf("containers.NewLRUCache: nil source"))
+	}
+	ret := &lruCache[K, V]{
+		cap: cap,
+		src: src,
+
+		byName: make(map[K]*LinkedListEntry[lruEntry[K, V]], cap),
+	}
+	for i := 0; i < cap; i++ {
+		ret.unused.Store(new(LinkedListEntry[lruEntry[K, V]]))
+	}
+	return ret
+}
+
 type lruEntry[K comparable, V any] struct {
 	key K
 	val V
+
+	refs int
+	del  chan struct{} // non-nil if a delete is waiting on .refs to drop to zero
 }
 
-// lruCache is a NON-thread-safe cache with Least Recently Used
-// eviction.
-//
-// lruCache is non-thread-safe and unexported because it only exists
-// for internal use by the more sophisticated ARCache.
-//
-// Similarly, it does not implement some common features of an LRU
-// cache, such as specifying a maximum size (instead requiring the
-// `.EvictOldest` method to be called), because it is only meant for
-// use by the ARCache; which does not need such features.
 type lruCache[K comparable, V any] struct {
-	// OnRemove is (if non-nil) called *after* removal whenever
-	// an entry is removed, for any reason:
-	//
-	//  - evicted by .EvictOldest()
-	//  - replaced by .Store(k, v)
-	//  - deleted by .Delete(k)
-	OnRemove func(K, V)
-	// OnEvict is (if non-nil) called *after* removal whenever an
-	// entry is evicted by .EvictOldest().  If both OnEvict and
-	// OnRemove are set, OnRemove is called first.
-	OnEvict func(K, V)
-
-	byAge  LinkedList[lruEntry[K, V]]
-	byName map[K]*LinkedListEntry[lruEntry[K, V]]
+	cap int
+	src Source[K, V]
+
+	mu sync.Mutex
+
+	// Pinned entries are in .byName, but not in any LinkedList.
+	unused    LinkedList[lruEntry[K, V]]
+	evictable LinkedList[lruEntry[K, V]] // only entries with .refs==0
+	byName    map[K]*LinkedListEntry[lruEntry[K, V]]
+
+	waiters LinkedList[chan struct{}]
 }
 
-var _ Map[int, string] = (*lruCache[int, string])(nil)
+// Blocking primitives /////////////////////////////////////////////////////////
 
-func (c *lruCache[K, V]) rem(entry *LinkedListEntry[lruEntry[K, V]]) {
-	k, v := entry.Value.key, entry.Value.val
-	delete(c.byName, entry.Value.key)
-	c.byAge.Delete(entry)
-	if c.OnRemove != nil {
-		c.OnRemove(k, v)
+// Because of pinning, there might not actually be an available entry
+// for us to use/evict.  If we need an entry to use or evict, we'll
+// call waitForAvail to block until there is an entry that is either
+// unused or evictable.  We'll give waiters FIFO priority.
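Since LinkedList entries are now caller-allocated (the internal typedsync.Pool is gone), here is a quick sketch of the new calling convention; the queue variable and its contents are illustrative:

    // The caller owns the entry's storage; the list only links it.
    var queue containers.LinkedList[string]
    entry := &containers.LinkedListEntry[string]{Value: "hello"}
    queue.Store(entry)        // append at the "newest" end
    queue.MoveToNewest(entry) // no-op here: it is already newest
    queue.Delete(entry)       // unlink; the entry may then be Store'd again

This is what lets lruCache below pre-allocate all of its entries up front in NewLRUCache and then shuffle them between its unused and evictable lists without generating garbage.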
+func (c *lruCache[K, V]) waitForAvail() {
+	if !(c.unused.IsEmpty() && c.evictable.IsEmpty()) {
+		return
 	}
+	ch := make(chan struct{})
+	c.waiters.Store(&LinkedListEntry[chan struct{}]{Value: ch})
+	c.mu.Unlock()
+	<-ch
+	c.mu.Lock()
 }
 
-func (c *lruCache[K, V]) evict(entry *LinkedListEntry[lruEntry[K, V]]) {
-	k, v := entry.Value.key, entry.Value.val
-	c.rem(entry)
-	if c.OnEvict != nil {
-		c.OnEvict(k, v)
+// notifyAvail is called when an entry becomes unused or evictable,
+// and wakes up the highest-priority .waitForAvail() waiter (if there
+// is one).
+func (c *lruCache[K, V]) notifyAvail() {
+	waiter := c.waiters.Oldest
+	if waiter == nil {
+		return
 	}
+	c.waiters.Delete(waiter)
+	close(waiter.Value)
 }
 
-// EvictOldest deletes the oldest entry in the cache.
-//
-// It is a panic to call EvictOldest if the cache is empty.
-func (c *lruCache[K, V]) EvictOldest() {
-	c.evict(c.byAge.Oldest())
+// Calling .Delete(k) on an entry that is pinned needs to block until
+// the entry is no longer pinned.
+func (c *lruCache[K, V]) unlockAndWaitForDel(entry *LinkedListEntry[lruEntry[K, V]]) {
+	if entry.Value.del == nil {
+		entry.Value.del = make(chan struct{})
+	}
+	ch := entry.Value.del
+	c.mu.Unlock()
+	<-ch
 }
 
-// Store a key/value pair in to the cache.
-func (c *lruCache[K, V]) Store(k K, v V) {
-	if c.byName == nil {
-		c.byName = make(map[K]*LinkedListEntry[lruEntry[K, V]])
-	} else if old, ok := c.byName[k]; ok {
-		c.rem(old)
+// notifyOfDel unblocks any calls to .Delete(k), notifying them that
+// the entry has been deleted and they can now return.
+func (*lruCache[K, V]) notifyOfDel(entry *LinkedListEntry[lruEntry[K, V]]) {
+	if entry.Value.del != nil {
+		close(entry.Value.del)
+		entry.Value.del = nil
 	}
-	c.byName[k] = c.byAge.Store(lruEntry[K, V]{key: k, val: v})
 }
 
-// Load an entry from the cache, recording a "use" for the purposes of
-// "least-recently-used" eviction.
-func (c *lruCache[K, V]) Load(k K) (v V, ok bool) {
-	entry, ok := c.byName[k]
-	if !ok {
-		var zero V
-		return zero, false
+// Main implementation /////////////////////////////////////////////////////////
+
+// lruReplace is the LRU(c) replacement policy.  It returns an entry
+// that is not in any list.
+func (c *lruCache[K, V]) lruReplace() *LinkedListEntry[lruEntry[K, V]] {
+	c.waitForAvail()
+
+	// If the cache isn't full, no need to do an eviction.
+	if entry := c.unused.Oldest; entry != nil {
+		c.unused.Delete(entry)
+		return entry
 	}
-	c.byAge.MoveToNewest(entry)
-	return entry.Value.val, true
+
+	// Replace the oldest entry.
+	entry := c.evictable.Oldest
+	c.evictable.Delete(entry)
+	delete(c.byName, entry.Value.key)
+	return entry
 }
 
-// Peek is like Load, but doesn't count as a "use" for
-// "least-recently-used".
-func (c *lruCache[K, V]) Peek(k K) (v V, ok bool) {
-	entry, ok := c.byName[k]
-	if !ok {
-		var zero V
-		return zero, false
+// Acquire implements the 'Cache' interface.
+func (c *lruCache[K, V]) Acquire(ctx context.Context, k K) *V {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	entry := c.byName[k]
+	if entry != nil {
+		if entry.Value.refs == 0 {
+			c.evictable.Delete(entry)
+		}
+		entry.Value.refs++
+	} else {
+		entry = c.lruReplace()
+
+		entry.Value.key = k
+		c.src.Load(ctx, k, &entry.Value.val)
+		entry.Value.refs = 1
+
+		c.byName[k] = entry
 	}
-	return entry.Value.val, true
-}
 
-// Has returns whether an entry is present in the cache.  Calling Has
-// does not count as a "use" for "least-recently-used".
-func (c *lruCache[K, V]) Has(k K) bool {
-	_, ok := c.byName[k]
-	return ok
+	return &entry.Value.val
 }
 
-// Delete an entry from the cache.
+// Delete implements the 'Cache' interface.
 func (c *lruCache[K, V]) Delete(k K) {
-	if entry, ok := c.byName[k]; ok {
-		c.rem(entry)
+	c.mu.Lock()
+
+	entry := c.byName[k]
+	if entry == nil {
+		c.mu.Unlock()
+		return
+	}
+	if entry.Value.refs > 0 {
+		// Let .Release(k) do the deletion when the
+		// refcount drops to 0.
+		c.unlockAndWaitForDel(entry)
+		return
+	}
+	delete(c.byName, k)
+	c.evictable.Delete(entry)
+	c.unused.Store(entry)
+
+	// No need to call c.notifyAvail(); if we were able to delete
+	// it, it was already available.
+
+	c.mu.Unlock()
+}
+
+// Release implements the 'Cache' interface.
+func (c *lruCache[K, V]) Release(k K) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	entry := c.byName[k]
+	if entry == nil || entry.Value.refs <= 0 {
+		panic(fmt.Errorf("containers.lruCache.Release called on key that is not held: %v", k))
+	}
+
+	entry.Value.refs--
+	if entry.Value.refs == 0 {
+		if entry.Value.del != nil {
+			delete(c.byName, k)
+			c.unused.Store(entry)
+			c.notifyOfDel(entry)
+		} else {
+			c.evictable.Store(entry)
+		}
+		c.notifyAvail()
 	}
 }
 
-// Len returns the number of entries in the cache.
-func (c *lruCache[K, V]) Len() int {
-	return len(c.byName)
+// Flush implements the 'Cache' interface.
+func (c *lruCache[K, V]) Flush(ctx context.Context) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	for _, entry := range c.byName {
+		c.src.Flush(ctx, &entry.Value.val)
+	}
+	for entry := c.unused.Oldest; entry != nil; entry = entry.Newer {
+		c.src.Flush(ctx, &entry.Value.val)
+	}
 }
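A subtlety of the blocking semantics above: Delete on a pinned entry parks in unlockAndWaitForDel until the pin count drops to zero, so the matching Release must come from another goroutine. A rough sketch (illustrative only; the key and value types are arbitrary):

    package main

    import (
    	"context"

    	"git.lukeshu.com/btrfs-progs-ng/lib/containers"
    )

    func main() {
    	ctx := context.Background()
    	cache := containers.NewLRUCache[int, string](2,
    		containers.SourceFunc[int, string](func(_ context.Context, _ int, v *string) {
    			*v = "loaded" // hypothetical loader
    		}))

    	_ = cache.Acquire(ctx, 1) // pin key 1

    	done := make(chan struct{})
    	go func() {
    		cache.Delete(1) // blocks while key 1 is pinned
    		close(done)
    	}()

    	cache.Release(1) // unpin; the Delete can now complete
    	<-done
    }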
diff --git a/lib/containers/lrucache_test.go b/lib/containers/lrucache_test.go
index f04df99..4918303 100644
--- a/lib/containers/lrucache_test.go
+++ b/lib/containers/lrucache_test.go
@@ -5,67 +5,97 @@
 package containers
 
 import (
-	"runtime/debug"
+	"context"
 	"testing"
+	"time"
 
+	"github.com/datawire/dlib/dlog"
 	"github.com/stretchr/testify/assert"
 )
 
+func TestLRUBlocking(t *testing.T) {
+	t.Parallel()
+	const tick = time.Second / 2
+
+	ctx := dlog.NewTestContext(t, false)
+
+	cache := NewLRUCache[int, int](4,
+		SourceFunc[int, int](func(_ context.Context, k int, v *int) { *v = k * k }))
+
+	assert.Equal(t, 1, *cache.Acquire(ctx, 1))
+	assert.Equal(t, 4, *cache.Acquire(ctx, 2))
+	assert.Equal(t, 9, *cache.Acquire(ctx, 3))
+	assert.Equal(t, 16, *cache.Acquire(ctx, 4))
+
+	ch := make(chan int)
+	start := time.Now()
+	go func() {
+		ch <- *cache.Acquire(ctx, 5)
+	}()
+	go func() {
+		time.Sleep(tick)
+		cache.Release(3)
+	}()
+	result := <-ch
+	dur := time.Since(start)
+	assert.Equal(t, 25, result)
+	assert.Greater(t, dur, tick)
+}
+
 //nolint:paralleltest // Can't be parallel because we test testing.AllocsPerRun.
-func TestLRU(t *testing.T) {
+func TestLRUAllocs(t *testing.T) {
 	const (
 		cacheLen  = 8
 		bigNumber = 128
 	)
+
+	ctx := dlog.NewTestContext(t, false)
+
 	evictions := 0
-	cache := &lruCache[int, int]{
-		OnEvict: func(_, _ int) {
+	cache := NewLRUCache[int, int](cacheLen, SourceFunc[int, int](func(_ context.Context, k int, v *int) {
+		if *v > 0 {
 			evictions++
-		},
-	}
-	i := 0
-	store := func() {
-		for cache.Len()+1 > cacheLen {
-			cache.EvictOldest()
 		}
-		cache.Store(i, i)
+		*v = k
+	}))
+
+	i := 1
+	store := func() {
+		cache.Acquire(ctx, i)
+		cache.Release(i)
 		i++
 	}
 
-	// Disable the GC temporarily to prevent cache.byAge.pool from
-	// being cleaned in the middle of an AllocsPerRun and causing
-	// spurious allocations.
-	percent := debug.SetGCPercent(-1)
-	defer debug.SetGCPercent(percent)
-
-	// 1 alloc each as we fill the cache
-	assert.Equal(t, float64(1), testing.AllocsPerRun(cacheLen-1, store))
-	assert.Equal(t, 0, evictions)
-	// after that, it should be alloc-free
-	assert.Equal(t, float64(0), testing.AllocsPerRun(1, store))
+	// it should be alloc-free after construction
+	assert.Equal(t, float64(0), testing.AllocsPerRun(cacheLen+1, store))
 	assert.Equal(t, 2, evictions)
 
 	assert.Equal(t, float64(0), testing.AllocsPerRun(bigNumber, store))
 	assert.Equal(t, 3+bigNumber, evictions)
 
 	// check the len
-	assert.Equal(t, cacheLen, len(cache.byName))
+	assert.Equal(t, cacheLen, len(cache.(*lruCache[int, int]).byName))
+	assert.Equal(t, cacheLen, cache.(*lruCache[int, int]).evictable.Len)
 	cnt := 0
-	for entry := cache.byAge.newest; entry != nil; entry = entry.older {
+	for entry := cache.(*lruCache[int, int]).evictable.Oldest; entry != nil; entry = entry.Newer {
 		cnt++
 	}
 	assert.Equal(t, cacheLen, cnt)
 	cnt = 0
-	for entry := cache.byAge.oldest; entry != nil; entry = entry.newer {
+	for entry := cache.(*lruCache[int, int]).evictable.Newest; entry != nil; entry = entry.Older {
 		cnt++
 	}
 	assert.Equal(t, cacheLen, cnt)
 
 	// check contents
 	cnt = 0
-	for j := i - 1; j >= 0; j-- {
+	for j := i - 1; j > 0; j-- {
+		entry, ok := cache.(*lruCache[int, int]).byName[j]
 		if cnt < cacheLen {
-			assert.True(t, cache.Has(j), j)
+			if assert.True(t, ok, j) {
+				val := entry.Value.val
+				assert.Equal(t, j, val, j)
+			}
 			cnt++
 		} else {
-			assert.False(t, cache.Has(j), j)
+			assert.False(t, ok, j)
 		}
 	}
 }
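The diskio changes below turn the block buffer from invalidate-on-write into a write-back cache built on the same Cache machinery. A rough usage sketch follows (the function and variable names and the sizes are illustrative; that callers should Flush before Close is an assumption of this sketch, not something the patch spells out):

    package main

    import (
    	"context"

    	"git.lukeshu.com/btrfs-progs-ng/lib/diskio"
    )

    func writeThrough(ctx context.Context, raw diskio.File[int64]) error {
    	// 4 KiB blocks, up to 128 of them cached; illustrative numbers.
    	bf := diskio.NewBufferedFile[int64](ctx, raw, 4096, 128)

    	// Writes land in the cached block and are marked dirty...
    	if _, err := bf.WriteAt([]byte("hello"), 0); err != nil {
    		return err
    	}

    	// ...and only reach 'raw' when a dirty block is reused for a
    	// different address (Load flushes first) or explicitly flushed.
    	bf.Flush()
    	return bf.Close()
    }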
diff --git a/lib/diskio/file_blockbuf.go b/lib/diskio/file_blockbuf.go
index 0bb3156..8c0fec2 100644
--- a/lib/diskio/file_blockbuf.go
+++ b/lib/diskio/file_blockbuf.go
@@ -5,63 +5,75 @@
 package diskio
 
 import (
+	"context"
 	"sync"
 
-	"git.lukeshu.com/go/typedsync"
+	"github.com/datawire/dlib/dlog"
 
 	"git.lukeshu.com/btrfs-progs-ng/lib/containers"
 )
 
-type bufferedBlock struct {
+type bufferedBlock[A ~int64] struct {
+	Mu    sync.RWMutex
+	Addr  A
+	Dirty bool
+
 	Dat []byte
 	Err error
 }
 
 type bufferedFile[A ~int64] struct {
+	ctx        context.Context //nolint:containedctx // don't have an option while keeping the io.ReaderAt/io.WriterAt API
 	inner      File[A]
-	mu         sync.RWMutex
 	blockSize  A
-	blockCache containers.ARCache[A, bufferedBlock]
-	blockPool  typedsync.Pool[[]byte]
+	blockCache containers.Cache[A, bufferedBlock[A]]
 }
 
 var _ File[assertAddr] = (*bufferedFile[assertAddr])(nil)
 
-func NewBufferedFile[A ~int64](file File[A], blockSize A, cacheSize int) *bufferedFile[A] {
+func NewBufferedFile[A ~int64](ctx context.Context, file File[A], blockSize A, cacheSize int) *bufferedFile[A] {
 	ret := &bufferedFile[A]{
+		ctx:       ctx,
 		inner:     file,
 		blockSize: blockSize,
-		blockCache: containers.ARCache[A, bufferedBlock]{
-			MaxLen: cacheSize,
-		},
 	}
-	ret.blockPool.New = ret.malloc
-	ret.blockCache.OnRemove = ret.free
-	ret.blockCache.New = ret.readBlock
+	ret.blockCache = containers.NewARCache[A, bufferedBlock[A]](cacheSize, bufferedBlockSource[A]{ret})
 	return ret
 }
 
-func (bf *bufferedFile[A]) malloc() []byte {
-	return make([]byte, bf.blockSize)
+type bufferedBlockSource[A ~int64] struct {
+	bf *bufferedFile[A]
 }
 
-func (bf *bufferedFile[A]) free(_ A, buf bufferedBlock) {
-	bf.blockPool.Put(buf.Dat)
+func (src bufferedBlockSource[A]) Flush(ctx context.Context, block *bufferedBlock[A]) {
+	if !block.Dirty {
+		return
+	}
+	if _, err := src.bf.inner.WriteAt(block.Dat, block.Addr); err != nil {
+		dlog.Errorf(ctx, "i/o error: write: %v", err)
+	}
+	block.Dirty = false
 }
 
-func (bf *bufferedFile[A]) readBlock(blockOffset A) bufferedBlock {
-	dat, _ := bf.blockPool.Get()
-	n, err := bf.inner.ReadAt(dat, blockOffset)
-	return bufferedBlock{
-		Dat: dat[:n],
-		Err: err,
+func (src bufferedBlockSource[A]) Load(ctx context.Context, blockAddr A, block *bufferedBlock[A]) {
+	src.Flush(ctx, block)
+	if block.Dat == nil {
+		block.Dat = make([]byte, src.bf.blockSize)
 	}
+	n, err := src.bf.inner.ReadAt(block.Dat[:src.bf.blockSize], blockAddr)
+	block.Addr = blockAddr
+	block.Dat = block.Dat[:n]
+	block.Err = err
 }
 
 func (bf *bufferedFile[A]) Name() string { return bf.inner.Name() }
 func (bf *bufferedFile[A]) Size() A      { return bf.inner.Size() }
 func (bf *bufferedFile[A]) Close() error { return bf.inner.Close() }
 
+func (bf *bufferedFile[A]) Flush() {
+	bf.blockCache.Flush(bf.ctx)
+}
+
 func (bf *bufferedFile[A]) ReadAt(dat []byte, off A) (n int, err error) {
 	done := 0
 	for done < len(dat) {
@@ -75,11 +87,14 @@
 }
 
 func (bf *bufferedFile[A]) maybeShortReadAt(dat []byte, off A) (n int, err error) {
-	bf.mu.RLock()
-	defer bf.mu.RUnlock()
 	offsetWithinBlock := off % bf.blockSize
 	blockOffset := off - offsetWithinBlock
-	cachedBlock, _ := bf.blockCache.Load(blockOffset)
+
+	cachedBlock := bf.blockCache.Acquire(bf.ctx, blockOffset)
+	defer bf.blockCache.Release(blockOffset)
+	cachedBlock.Mu.RLock()
+	defer cachedBlock.Mu.RUnlock()
+
 	n = copy(dat, cachedBlock.Dat[offsetWithinBlock:])
 	if n < len(dat) {
 		return n, cachedBlock.Err
@@ -88,16 +103,30 @@
 }
 
 func (bf *bufferedFile[A]) WriteAt(dat []byte, off A) (n int, err error) {
-	bf.mu.Lock()
-	defer bf.mu.Unlock()
+	done := 0
+	for done < len(dat) {
+		n, err := bf.maybeShortWriteAt(dat[done:], off+A(done))
+		done += n
+		if err != nil {
+			return done, err
+		}
+	}
+	return done, nil
+}
 
-	// Do the work
-	n, err = bf.inner.WriteAt(dat, off)
+func (bf *bufferedFile[A]) maybeShortWriteAt(dat []byte, off A) (n int, err error) {
+	offsetWithinBlock := off % bf.blockSize
+	blockOffset := off - offsetWithinBlock
 
-	// Cache invalidation
-	for blockOffset := off - (off % bf.blockSize); blockOffset < off+A(n); blockOffset += bf.blockSize {
-		bf.blockCache.Delete(blockOffset)
-	}
+	cachedBlock := bf.blockCache.Acquire(bf.ctx, blockOffset)
+	defer bf.blockCache.Release(blockOffset)
+	cachedBlock.Mu.Lock()
+	defer cachedBlock.Mu.Unlock()
 
-	return n, err
+	cachedBlock.Dirty = true
+	n = copy(cachedBlock.Dat[offsetWithinBlock:], dat)
+	if n < len(dat) {
+		return n, cachedBlock.Err
+	}
+	return n, nil
 }
diff --git a/lib/diskio/file_state_test.go b/lib/diskio/file_state_test.go
index 32ca705..b0cc6a7 100644
--- a/lib/diskio/file_state_test.go
+++ b/lib/diskio/file_state_test.go
@@ -9,6 +9,8 @@
 	"testing"
 	"testing/iotest"
 
+	"github.com/datawire/dlib/dlog"
+
 	"git.lukeshu.com/btrfs-progs-ng/lib/diskio"
 )
 
@@ -50,7 +52,8 @@
 		Reader: bytes.NewReader(content),
 		name:   t.Name(),
 	}
-	file = diskio.NewBufferedFile[int64](file, 4, 2)
+	ctx := dlog.NewTestContext(t, false)
+	file = diskio.NewBufferedFile[int64](ctx, file, 4, 2)
 	reader := diskio.NewStatefulFile[int64](file)
 	if err := iotest.TestReader(reader, content); err != nil {
 		t.Error(err)
-- 
cgit v1.2.3-54-g00ecf