diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/btrfs/btrfssum/shortsum.go | 10 | ||||
-rw-r--r-- | lib/btrfsprogs/btrfsinspect/mount.go | 5 | ||||
-rw-r--r-- | lib/btrfsprogs/btrfsinspect/rebuildnodes/btrees/forrest.go | 3 | ||||
-rw-r--r-- | lib/containers/set.go | 4 | ||||
-rw-r--r-- | lib/containers/syncmap.go | 53 | ||||
-rw-r--r-- | lib/profile/cobra.go | 109 | ||||
-rw-r--r-- | lib/profile/profile.go | 89 | ||||
-rw-r--r-- | lib/streamio/runescanner.go | 106 | ||||
-rw-r--r-- | lib/textui/log.go | 62 | ||||
-rw-r--r-- | lib/textui/progress.go | 12 |
10 files changed, 362 insertions, 91 deletions
diff --git a/lib/btrfs/btrfssum/shortsum.go b/lib/btrfs/btrfssum/shortsum.go index c3d6f8b..e3441ae 100644 --- a/lib/btrfs/btrfssum/shortsum.go +++ b/lib/btrfs/btrfssum/shortsum.go @@ -248,7 +248,7 @@ func (sg SumRunWithGaps[Addr]) EncodeJSON(w io.Writer) error { } fallthrough default: - if err := lowmemjson.Encode(w, run); err != nil { + if err := lowmemjson.NewEncoder(w).Encode(run); err != nil { return err } cur = run.Addr.Add(run.Size()) @@ -274,18 +274,18 @@ func (sg *SumRunWithGaps[Addr]) DecodeJSON(r io.RuneScanner) error { var name string return lowmemjson.DecodeObject(r, func(r io.RuneScanner) error { - return lowmemjson.Decode(r, &name) + return lowmemjson.NewDecoder(r).Decode(&name) }, func(r io.RuneScanner) error { switch name { case "Addr": - return lowmemjson.Decode(r, &sg.Addr) + return lowmemjson.NewDecoder(r).Decode(&sg.Addr) case "Size": - return lowmemjson.Decode(r, &sg.Size) + return lowmemjson.NewDecoder(r).Decode(&sg.Size) case "Runs": return lowmemjson.DecodeArray(r, func(r io.RuneScanner) error { var run SumRun[Addr] - if err := lowmemjson.Decode(r, &run); err != nil { + if err := lowmemjson.NewDecoder(r).Decode(&run); err != nil { return err } if run.ChecksumSize > 0 { diff --git a/lib/btrfsprogs/btrfsinspect/mount.go b/lib/btrfsprogs/btrfsinspect/mount.go index ee3c0ec..0ac8497 100644 --- a/lib/btrfsprogs/btrfsinspect/mount.go +++ b/lib/btrfsprogs/btrfsinspect/mount.go @@ -14,6 +14,7 @@ import ( "sync/atomic" "syscall" + "git.lukeshu.com/go/typedsync" "github.com/datawire/dlib/dcontext" "github.com/datawire/dlib/dgroup" "github.com/datawire/dlib/dlog" @@ -109,8 +110,8 @@ type subvolume struct { fuseutil.NotImplementedFileSystem lastHandle uint64 - dirHandles containers.SyncMap[fuseops.HandleID, *dirState] - fileHandles containers.SyncMap[fuseops.HandleID, *fileState] + dirHandles typedsync.Map[fuseops.HandleID, *dirState] + fileHandles typedsync.Map[fuseops.HandleID, *fileState] subvolMu sync.Mutex subvols containers.Set[string] diff --git a/lib/btrfsprogs/btrfsinspect/rebuildnodes/btrees/forrest.go b/lib/btrfsprogs/btrfsinspect/rebuildnodes/btrees/forrest.go index ff6b1c5..45a5bb2 100644 --- a/lib/btrfsprogs/btrfsinspect/rebuildnodes/btrees/forrest.go +++ b/lib/btrfsprogs/btrfsinspect/rebuildnodes/btrees/forrest.go @@ -7,6 +7,7 @@ package btrees import ( "context" + "git.lukeshu.com/go/typedsync" "github.com/datawire/dlib/dlog" "git.lukeshu.com/btrfs-progs-ng/lib/btrfs/btrfsitem" @@ -62,7 +63,7 @@ type RebuiltForrest struct { cbLookupUUID func(ctx context.Context, uuid btrfsprim.UUID) (id btrfsprim.ObjID, ok bool) // mutable - trees containers.SyncMap[btrfsprim.ObjID, *RebuiltTree] + trees typedsync.Map[btrfsprim.ObjID, *RebuiltTree] leafs *containers.LRUCache[btrfsprim.ObjID, map[btrfsvol.LogicalAddr]containers.Set[btrfsvol.LogicalAddr]] incItems *containers.LRUCache[btrfsprim.ObjID, *itemIndex] excItems *containers.LRUCache[btrfsprim.ObjID, *itemIndex] diff --git a/lib/containers/set.go b/lib/containers/set.go index 4fc8aad..b2af494 100644 --- a/lib/containers/set.go +++ b/lib/containers/set.go @@ -71,7 +71,7 @@ func (o Set[T]) EncodeJSON(w io.Writer) error { return less(keys[i], keys[j]) }) - return lowmemjson.Encode(w, keys) + return lowmemjson.NewEncoder(w).Encode(keys) } func (o *Set[T]) DecodeJSON(r io.RuneScanner) error { @@ -87,7 +87,7 @@ func (o *Set[T]) DecodeJSON(r io.RuneScanner) error { *o = Set[T]{} return lowmemjson.DecodeArray(r, func(r io.RuneScanner) error { var val T - if err := lowmemjson.Decode(r, &val); err != nil { + if err := lowmemjson.NewDecoder(r).Decode(&val); err != nil { return err } (*o)[val] = struct{}{} diff --git a/lib/containers/syncmap.go b/lib/containers/syncmap.go deleted file mode 100644 index 74da4b3..0000000 --- a/lib/containers/syncmap.go +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright (C) 2022-2023 Luke Shumaker <lukeshu@lukeshu.com> -// -// SPDX-License-Identifier: GPL-2.0-or-later - -package containers - -import ( - "sync" -) - -type SyncMap[K comparable, V any] struct { - inner sync.Map -} - -func (m *SyncMap[K, V]) Delete(key K) { - m.inner.Delete(key) -} - -func (m *SyncMap[K, V]) Load(key K) (value V, ok bool) { - _value, ok := m.inner.Load(key) - if ok { - //nolint:forcetypeassert // Typed wrapper around untyped lib. - value = _value.(V) - } - return value, ok -} - -func (m *SyncMap[K, V]) LoadAndDelete(key K) (value V, loaded bool) { - _value, ok := m.inner.LoadAndDelete(key) - if ok { - //nolint:forcetypeassert // Typed wrapper around untyped lib. - value = _value.(V) - } - return value, ok -} - -func (m *SyncMap[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) { - _actual, loaded := m.inner.LoadOrStore(key, value) - //nolint:forcetypeassert // Typed wrapper around untyped lib. - actual = _actual.(V) - return actual, loaded -} - -func (m *SyncMap[K, V]) Range(f func(key K, value V) bool) { - m.inner.Range(func(key, value any) bool { - //nolint:forcetypeassert // Typed wrapper around untyped lib. - return f(key.(K), value.(V)) - }) -} - -func (m *SyncMap[K, V]) Store(key K, value V) { - m.inner.Store(key, value) -} diff --git a/lib/profile/cobra.go b/lib/profile/cobra.go new file mode 100644 index 0000000..3094015 --- /dev/null +++ b/lib/profile/cobra.go @@ -0,0 +1,109 @@ +// Copyright (C) 2023 Luke Shumaker <lukeshu@lukeshu.com> +// +// SPDX-License-Identifier: GPL-2.0-or-later + +package profile + +import ( + "io" + "os" + + "github.com/datawire/dlib/derror" + "github.com/spf13/cobra" + "github.com/spf13/pflag" +) + +type flagSet struct { + shutdown []StopFunc +} + +func (fs *flagSet) Stop() error { + var errs derror.MultiError + for _, fn := range fs.shutdown { + if err := fn(); err != nil { + errs = append(errs, err) + } + } + if len(errs) > 0 { + return errs + } + return nil +} + +type flagValue struct { + parent *flagSet + start startFunc + curVal string +} + +var _ pflag.Value = (*flagValue)(nil) + +// String implements pflag.Value. +func (fv *flagValue) String() string { return fv.curVal } + +// Set implements pflag.Value. +func (fv *flagValue) Set(filename string) error { + if filename == "" { + return nil + } + w, err := os.Create(filename) + if err != nil { + return err + } + shutdown, err := fv.start(w) + if err != nil { + return err + } + fv.curVal = filename + fv.parent.shutdown = append(fv.parent.shutdown, func() error { + err1 := shutdown() + err2 := w.Close() + if err1 != nil { + return err1 + } + return err2 + }) + return nil +} + +// Type implements pflag.Value. +func (fv *flagValue) Type() string { return "filename" } + +func pStart(name string) startFunc { + return func(w io.Writer) (StopFunc, error) { + return Profile(w, name) + } +} + +// AddProfileFlags adds flags to a pflag.FlagSet to write any (or all) +// of the standard profiles to a file, and returns a "stop" function +// to be called at program shutdown. +func AddProfileFlags(flags *pflag.FlagSet, prefix string) StopFunc { + var root flagSet + + flags.Var(&flagValue{parent: &root, start: CPU}, prefix+"cpu", "Write a CPU profile to the file `cpu.pprof`") + _ = cobra.MarkFlagFilename(flags, prefix+"cpu") + + flags.Var(&flagValue{parent: &root, start: Trace}, prefix+"trace", "Write a trace (https://pkg.go.dev/runtime/trace) to the file `trace.out`") + _ = cobra.MarkFlagFilename(flags, prefix+"trace") + + flags.Var(&flagValue{parent: &root, start: pStart("goroutine")}, prefix+"goroutine", "Write a goroutine profile to the file `goroutine.pprof`") + _ = cobra.MarkFlagFilename(flags, prefix+"goroutine") + + flags.Var(&flagValue{parent: &root, start: pStart("threadcreate")}, prefix+"threadcreate", "Write a threadcreate profile to the file `threadcreate.pprof`") + _ = cobra.MarkFlagFilename(flags, prefix+"threadcreate") + + flags.Var(&flagValue{parent: &root, start: pStart("heap")}, prefix+"heap", "Write a heap profile to the file `heap.pprof`") + _ = cobra.MarkFlagFilename(flags, prefix+"heap") + + flags.Var(&flagValue{parent: &root, start: pStart("allocs")}, prefix+"allocs", "Write an allocs profile to the file `allocs.pprof`") + _ = cobra.MarkFlagFilename(flags, prefix+"allocs") + + flags.Var(&flagValue{parent: &root, start: pStart("block")}, prefix+"block", "Write a block profile to the file `block.pprof`") + _ = cobra.MarkFlagFilename(flags, prefix+"block") + + flags.Var(&flagValue{parent: &root, start: pStart("mutex")}, prefix+"mutex", "Write a mutex profile to the file `mutex.pprof`") + _ = cobra.MarkFlagFilename(flags, prefix+"mutex") + + return root.Stop +} diff --git a/lib/profile/profile.go b/lib/profile/profile.go new file mode 100644 index 0000000..27c7e61 --- /dev/null +++ b/lib/profile/profile.go @@ -0,0 +1,89 @@ +// Copyright (C) 2023 Luke Shumaker <lukeshu@lukeshu.com> +// +// SPDX-License-Identifier: GPL-2.0-or-later + +package profile + +import ( + "io" + "runtime/pprof" + "runtime/trace" +) + +type StopFunc = func() error + +type startFunc = func(io.Writer) (StopFunc, error) + +// CPU arranges to write a CPU profile to the given Writer, and +// returns a function to be called on shutdown. +func CPU(w io.Writer) (StopFunc, error) { + if err := pprof.StartCPUProfile(w); err != nil { + return nil, err + } + return func() error { + pprof.StopCPUProfile() + return nil + }, nil +} + +var _ startFunc = CPU + +// Profile arranges to write the given named-profile to the given +// Writer, and returns a function to be called on shutdown. +// +// CPU profiles are not named profiles; there is a separate .CPU() +// function for writing CPU profiles. +// +// The Go runtime has several built-in named profiles, and it is +// possible for programs to create their own named profiles with +// runtime/pprof.NewProfile(). +// +// This package provides ProfileXXX constants for the built-in named +// profiles, and a .Profiles() function that return the list of all +// profile names. +func Profile(w io.Writer, name string) (StopFunc, error) { + return func() error { + if prof := pprof.Lookup(name); prof != nil { + return prof.WriteTo(w, 0) + } + return nil + }, nil +} + +// Profiles returns a list of all profile names that may be passed to +// .Profile(); both profiles built-in to the Go runtime, and +// program-added profiles. +func Profiles() []string { + full := pprof.Profiles() + names := make([]string, len(full)) + for i, prof := range full { + names[i] = prof.Name() + } + return names +} + +// The Go runtime's built-in named profiles; to be passed to +// .Profile(). +const ( + ProfileGoroutine = "goroutine" + ProfileThreadCreate = "threadcreate" + ProfileHeap = "heap" + ProfileAllocs = "allocs" + ProfileBlock = "block" + ProfileMutex = "mutex" +) + +// Trace arranges to write a trace (https://pkg.go.dev/runtime/trace) +// to the given Writer, and returns a function to be called on +// shutdown. +func Trace(w io.Writer) (StopFunc, error) { + if err := trace.Start(w); err != nil { + return nil, err + } + return func() error { + trace.Stop() + return nil + }, nil +} + +var _ startFunc = Trace diff --git a/lib/streamio/runescanner.go b/lib/streamio/runescanner.go new file mode 100644 index 0000000..451f32f --- /dev/null +++ b/lib/streamio/runescanner.go @@ -0,0 +1,106 @@ +// Copyright (C) 2022-2023 Luke Shumaker <lukeshu@lukeshu.com> +// +// SPDX-License-Identifier: GPL-2.0-or-later + +package streamio + +import ( + "bufio" + "context" + "io" + "os" + "time" + + "github.com/datawire/dlib/dlog" + + "git.lukeshu.com/btrfs-progs-ng/lib/textui" +) + +type RuneScanner interface { + io.RuneScanner + io.Closer +} + +type runeScanner struct { + ctx context.Context //nolint:containedctx // For detecting shutdown from methods + done <-chan struct{} + progress textui.Portion[int64] + progressWriter *textui.Progress[textui.Portion[int64]] + unreadCnt uint64 + reader *bufio.Reader + closer io.Closer +} + +// NewRuneScanner returns an io.RuneScanner (and io.Closer) that +// bufferes a file, similar to bufio.NewReader. There are two +// advantages over bufio.NewReader: +// +// - It takes a Context, and causes reads to fail once the Context is +// canceled; allowing large parse operations to be gracefully cut +// short. +// +// - It logs the progress of reading the file via textui.Progress. +func NewRuneScanner(ctx context.Context, fh *os.File) (RuneScanner, error) { + fi, err := fh.Stat() + if err != nil { + return nil, err + } + ret := &runeScanner{ + ctx: ctx, + done: ctx.Done(), + progress: textui.Portion[int64]{ + D: fi.Size(), + }, + progressWriter: textui.NewProgress[textui.Portion[int64]](ctx, dlog.LogLevelInfo, textui.Tunable(1*time.Second)), + reader: bufio.NewReader(fh), + closer: fh, + } + return ret, nil +} + +func isClosed(ch <-chan struct{}) bool { + select { + case <-ch: + return true + default: + return false + } +} + +//nolint:gomnd // False positive: gomnd.ignored-functions=[textui.Tunable] doesn't support type params. +var runeThrottle = textui.Tunable[int64](64) + +// ReadRune implements io.RuneReader. +func (rs *runeScanner) ReadRune() (r rune, size int, err error) { + // According to the profiler, checking if the rs.ctx.Done() + // channel is closed is faster than checking if rs.ctx.Err() + // is non-nil. + if rs.unreadCnt == 0 && isClosed(rs.done) { + return 0, 0, rs.ctx.Err() + } + r, size, err = rs.reader.ReadRune() + if rs.unreadCnt > 0 { + rs.unreadCnt-- + } else { + rs.progress.N += int64(size) + if rs.progress.D < runeThrottle || rs.progress.N%runeThrottle == 0 || rs.progress.N > rs.progress.D-runeThrottle { + rs.progressWriter.Set(rs.progress) + } + } + return +} + +// ReadRune implements io.RuneScanner. +func (rs *runeScanner) UnreadRune() error { + if err := rs.reader.UnreadRune(); err != nil { + return err + } + rs.unreadCnt++ + return nil +} + +// ReadRune implements io.Closer. +func (rs *runeScanner) Close() error { + rs.progressWriter.Done() + return rs.closer.Close() +} diff --git a/lib/textui/log.go b/lib/textui/log.go index f73b271..2bcd9af 100644 --- a/lib/textui/log.go +++ b/lib/textui/log.go @@ -18,12 +18,12 @@ import ( "path/filepath" "runtime" "sort" - "strconv" "strings" "sync" "time" "unicode" + "git.lukeshu.com/go/typedsync" "github.com/datawire/dlib/dlog" "github.com/spf13/pflag" ) @@ -153,7 +153,11 @@ func (l *logger) UnformattedLogf(lvl dlog.LogLevel, format string, args ...any) } var ( - logBuf bytes.Buffer + logBufPool = typedsync.Pool[*bytes.Buffer]{ + New: func() *bytes.Buffer { + return new(bytes.Buffer) + }, + } logMu sync.Mutex thisModDir string ) @@ -169,13 +173,8 @@ func (l *logger) log(lvl dlog.LogLevel, writeMsg func(io.Writer)) { if lvl > l.lvl { return } - // This is optimized for mostly-single-threaded usage. If I cared more - // about multi-threaded performance, I'd trade in some - // memory-use/allocations and (1) instead of using a static `logBuf`, - // I'd have a `logBufPool` `sync.Pool`, and (2) have the final call to - // `l.out.Write()` be the only thing protected by `logMu`. - logMu.Lock() - defer logMu.Unlock() + logBuf, _ := logBufPool.Get() + defer logBufPool.Put(logBuf) defer logBuf.Reset() // time //////////////////////////////////////////////////////////////// @@ -222,19 +221,19 @@ func (l *logger) log(lvl dlog.LogLevel, writeMsg func(io.Writer)) { nextField = i break } - writeField(&logBuf, fieldKey, fields[fieldKey]) + writeField(logBuf, fieldKey, fields[fieldKey]) } // message ///////////////////////////////////////////////////////////// logBuf.WriteString(" : ") - writeMsg(&logBuf) + writeMsg(logBuf) // fields (late) /////////////////////////////////////////////////////// if nextField < len(fieldKeys) { logBuf.WriteString(" :") } for _, fieldKey := range fieldKeys[nextField:] { - writeField(&logBuf, fieldKey, fields[fieldKey]) + writeField(logBuf, fieldKey, fields[fieldKey]) } // caller ////////////////////////////////////////////////////////////// @@ -258,13 +257,16 @@ func (l *logger) log(lvl dlog.LogLevel, writeMsg func(io.Writer)) { logBuf.WriteString(" :") } file := f.File[strings.LastIndex(f.File, thisModDir+"/")+len(thisModDir+"/"):] - fmt.Fprintf(&logBuf, " (from %s:%d)", file, f.Line) + fmt.Fprintf(logBuf, " (from %s:%d)", file, f.Line) break } // boilerplate ///////////////////////////////////////////////////////// logBuf.WriteByte('\n') + + logMu.Lock() _, _ = l.out.Write(logBuf.Bytes()) + logMu.Unlock() } // fieldOrd returns the sort-position for a given log-field-key. Lower return @@ -344,35 +346,49 @@ func fieldOrd(key string) int { } func writeField(w io.Writer, key string, val any) { - valStr := printer.Sprint(val) + valBuf, _ := logBufPool.Get() + defer func() { + // The wrapper `func()` is important to defer + // evaluating `valBuf`, since we might re-assign it + // below. + valBuf.Reset() + logBufPool.Put(valBuf) + }() + _, _ = printer.Fprint(valBuf, val) needsQuote := false - if strings.HasPrefix(valStr, `"`) { + if bytes.HasPrefix(valBuf.Bytes(), []byte(`"`)) { needsQuote = true } else { - for _, r := range valStr { - if !(unicode.IsPrint(r) && r != ' ') { + for _, r := range valBuf.Bytes() { + if !(unicode.IsPrint(rune(r)) && r != ' ') { needsQuote = true break } } } if needsQuote { - valStr = strconv.Quote(valStr) + valBuf2, _ := logBufPool.Get() + fmt.Fprintf(valBuf2, "%q", valBuf.Bytes()) + valBuf.Reset() + logBufPool.Put(valBuf) + valBuf = valBuf2 } + valStr := valBuf.Bytes() name := key switch { case name == "THREAD": name = "thread" - switch valStr { - case "", "/main": + switch { + case len(valStr) == 0 || bytes.Equal(valStr, []byte("/main")): return default: - if strings.HasPrefix(valStr, "/main/") { - valStr = strings.TrimPrefix(valStr, "/main") + if bytes.HasPrefix(valStr, []byte("/main/")) { + valStr = valStr[len("/main/"):] + } else if bytes.HasPrefix(valStr, []byte("/")) { + valStr = valStr[len("/"):] } - valStr = strings.TrimPrefix(valStr, "/") } case strings.HasSuffix(name, ".pass"): fmt.Fprintf(w, "/pass-%s", valStr) diff --git a/lib/textui/progress.go b/lib/textui/progress.go index 68d986f..48a3901 100644 --- a/lib/textui/progress.go +++ b/lib/textui/progress.go @@ -7,9 +7,9 @@ package textui import ( "context" "fmt" - "sync/atomic" "time" + "git.lukeshu.com/go/typedsync" "github.com/datawire/dlib/dlog" ) @@ -26,7 +26,7 @@ type Progress[T Stats] struct { cancel context.CancelFunc done chan struct{} - cur atomic.Value // Value[T] + cur typedsync.Value[T] oldStat T oldLine string } @@ -45,7 +45,7 @@ func NewProgress[T Stats](ctx context.Context, lvl dlog.LogLevel, interval time. } func (p *Progress[T]) Set(val T) { - if p.cur.Swap(val) == nil { + if _, hadOld := p.cur.Swap(val); !hadOld { go p.run() } } @@ -56,8 +56,10 @@ func (p *Progress[T]) Done() { } func (p *Progress[T]) flush(force bool) { - //nolint:forcetypeassert // It wasn't worth it to me (yet?) to make a typed wrapper around atomic.Value. - cur := p.cur.Load().(T) + cur, ok := p.cur.Load() + if !ok { + panic("should not happen") + } if !force && cur == p.oldStat { return } |