From 5b57069df62182b23d5db38180df345fc3c86a85 Mon Sep 17 00:00:00 2001 From: Luke Shumaker Date: Sun, 26 Mar 2023 10:47:43 -0600 Subject: Move more things --- lib/diskio/file_blockbuf.go | 98 +++++++++++++++++++++++++++---------------- lib/diskio/file_state_test.go | 5 ++- 2 files changed, 67 insertions(+), 36 deletions(-) (limited to 'lib/diskio') diff --git a/lib/diskio/file_blockbuf.go b/lib/diskio/file_blockbuf.go index 0bb3156..580e55a 100644 --- a/lib/diskio/file_blockbuf.go +++ b/lib/diskio/file_blockbuf.go @@ -5,63 +5,74 @@ package diskio import ( + "context" "sync" - "git.lukeshu.com/go/typedsync" + "github.com/datawire/dlib/dlog" - "git.lukeshu.com/btrfs-progs-ng/lib/containers" + "git.lukeshu.com/btrfs-progs-ng/lib/caching" ) -type bufferedBlock struct { +type bufferedBlock[A ~int64] struct { + Mu sync.RWMutex + Addr A + Dirty bool + Dat []byte Err error } type bufferedFile[A ~int64] struct { + ctx context.Context inner File[A] - mu sync.RWMutex blockSize A - blockCache containers.ARCache[A, bufferedBlock] - blockPool typedsync.Pool[[]byte] + blockCache caching.Cache[A, bufferedBlock[A]] } var _ File[assertAddr] = (*bufferedFile[assertAddr])(nil) -func NewBufferedFile[A ~int64](file File[A], blockSize A, cacheSize int) *bufferedFile[A] { +func NewBufferedFile[A ~int64](ctx context.Context, file File[A], blockSize A, cacheSize int) *bufferedFile[A] { ret := &bufferedFile[A]{ inner: file, blockSize: blockSize, - blockCache: containers.ARCache[A, bufferedBlock]{ - MaxLen: cacheSize, - }, } - ret.blockPool.New = ret.malloc - ret.blockCache.OnRemove = ret.free - ret.blockCache.New = ret.readBlock + ret.blockCache = caching.NewLRUCache[A, bufferedBlock[A]](cacheSize, bufferedBlockSource[A]{ret}) return ret } -func (bf *bufferedFile[A]) malloc() []byte { - return make([]byte, bf.blockSize) +type bufferedBlockSource[A ~int64] struct { + bf *bufferedFile[A] } -func (bf *bufferedFile[A]) free(_ A, buf bufferedBlock) { - bf.blockPool.Put(buf.Dat) +func (src bufferedBlockSource[A]) Flush(ctx context.Context, block *bufferedBlock[A]) { + if !block.Dirty { + return + } + if _, err := src.bf.inner.WriteAt(block.Dat, block.Addr); err != nil { + dlog.Errorf(src.bf.ctx, "i/o error: write: %v", err) + } + block.Dirty = false } -func (bf *bufferedFile[A]) readBlock(blockOffset A) bufferedBlock { - dat, _ := bf.blockPool.Get() - n, err := bf.inner.ReadAt(dat, blockOffset) - return bufferedBlock{ - Dat: dat[:n], - Err: err, +func (src bufferedBlockSource[A]) Load(ctx context.Context, blockAddr A, block *bufferedBlock[A]) { + src.Flush(ctx, block) + if block.Dat == nil { + block.Dat = make([]byte, src.bf.blockSize) } + n, err := src.bf.inner.ReadAt(block.Dat[:src.bf.blockSize], blockAddr) + block.Addr = blockAddr + block.Dat = block.Dat[:n] + block.Err = err } func (bf *bufferedFile[A]) Name() string { return bf.inner.Name() } func (bf *bufferedFile[A]) Size() A { return bf.inner.Size() } func (bf *bufferedFile[A]) Close() error { return bf.inner.Close() } +func (bf *bufferedFile[A]) Flush() { + bf.blockCache.Flush(bf.ctx) +} + func (bf *bufferedFile[A]) ReadAt(dat []byte, off A) (n int, err error) { done := 0 for done < len(dat) { @@ -75,11 +86,14 @@ func (bf *bufferedFile[A]) ReadAt(dat []byte, off A) (n int, err error) { } func (bf *bufferedFile[A]) maybeShortReadAt(dat []byte, off A) (n int, err error) { - bf.mu.RLock() - defer bf.mu.RUnlock() offsetWithinBlock := off % bf.blockSize blockOffset := off - offsetWithinBlock - cachedBlock, _ := bf.blockCache.Load(blockOffset) + + cachedBlock := bf.blockCache.Acquire(bf.ctx, blockOffset) + defer bf.blockCache.Release(blockOffset) + cachedBlock.Mu.RLock() + defer cachedBlock.Mu.RUnlock() + n = copy(dat, cachedBlock.Dat[offsetWithinBlock:]) if n < len(dat) { return n, cachedBlock.Err @@ -88,16 +102,30 @@ func (bf *bufferedFile[A]) maybeShortReadAt(dat []byte, off A) (n int, err error } func (bf *bufferedFile[A]) WriteAt(dat []byte, off A) (n int, err error) { - bf.mu.Lock() - defer bf.mu.Unlock() + done := 0 + for done < len(dat) { + n, err := bf.maybeShortWriteAt(dat[done:], off+A(done)) + done += n + if err != nil { + return done, err + } + } + return done, nil +} - // Do the work - n, err = bf.inner.WriteAt(dat, off) +func (bf *bufferedFile[A]) maybeShortWriteAt(dat []byte, off A) (n int, err error) { + offsetWithinBlock := off % bf.blockSize + blockOffset := off - offsetWithinBlock - // Cache invalidation - for blockOffset := off - (off % bf.blockSize); blockOffset < off+A(n); blockOffset += bf.blockSize { - bf.blockCache.Delete(blockOffset) - } + cachedBlock := bf.blockCache.Acquire(bf.ctx, blockOffset) + defer bf.blockCache.Release(blockOffset) + cachedBlock.Mu.Lock() + defer cachedBlock.Mu.Unlock() - return n, err + cachedBlock.Dirty = true + n = copy(cachedBlock.Dat[offsetWithinBlock:], dat) + if n < len(dat) { + return n, cachedBlock.Err + } + return n, nil } diff --git a/lib/diskio/file_state_test.go b/lib/diskio/file_state_test.go index 32ca705..b0cc6a7 100644 --- a/lib/diskio/file_state_test.go +++ b/lib/diskio/file_state_test.go @@ -9,6 +9,8 @@ import ( "testing" "testing/iotest" + "github.com/datawire/dlib/dlog" + "git.lukeshu.com/btrfs-progs-ng/lib/diskio" ) @@ -50,7 +52,8 @@ func FuzzStatefulBufferedReader(f *testing.F) { Reader: bytes.NewReader(content), name: t.Name(), } - file = diskio.NewBufferedFile[int64](file, 4, 2) + ctx := dlog.NewTestContext(t, false) + file = diskio.NewBufferedFile[int64](ctx, file, 4, 2) reader := diskio.NewStatefulFile[int64](file) if err := iotest.TestReader(reader, content); err != nil { t.Error(err) -- cgit v1.2.3-54-g00ecf