Diffstat (limited to 'lib')
-rw-r--r--  lib/btrfs/btrfssum/shortsum.go                              |  10
-rw-r--r--  lib/btrfsprogs/btrfsinspect/mount.go                        |   5
-rw-r--r--  lib/btrfsprogs/btrfsinspect/rebuildnodes/btrees/forrest.go  |   3
-rw-r--r--  lib/containers/set.go                                       |   4
-rw-r--r--  lib/containers/syncmap.go                                   |  53
-rw-r--r--  lib/profile/cobra.go                                        | 109
-rw-r--r--  lib/profile/profile.go                                      |  89
-rw-r--r--  lib/streamio/runescanner.go                                 | 106
-rw-r--r--  lib/textui/log.go                                           |  62
-rw-r--r--  lib/textui/progress.go                                      |  12
10 files changed, 362 insertions(+), 91 deletions(-)
diff --git a/lib/btrfs/btrfssum/shortsum.go b/lib/btrfs/btrfssum/shortsum.go
index c3d6f8b..e3441ae 100644
--- a/lib/btrfs/btrfssum/shortsum.go
+++ b/lib/btrfs/btrfssum/shortsum.go
@@ -248,7 +248,7 @@ func (sg SumRunWithGaps[Addr]) EncodeJSON(w io.Writer) error {
}
fallthrough
default:
- if err := lowmemjson.Encode(w, run); err != nil {
+ if err := lowmemjson.NewEncoder(w).Encode(run); err != nil {
return err
}
cur = run.Addr.Add(run.Size())
@@ -274,18 +274,18 @@ func (sg *SumRunWithGaps[Addr]) DecodeJSON(r io.RuneScanner) error {
var name string
return lowmemjson.DecodeObject(r,
func(r io.RuneScanner) error {
- return lowmemjson.Decode(r, &name)
+ return lowmemjson.NewDecoder(r).Decode(&name)
},
func(r io.RuneScanner) error {
switch name {
case "Addr":
- return lowmemjson.Decode(r, &sg.Addr)
+ return lowmemjson.NewDecoder(r).Decode(&sg.Addr)
case "Size":
- return lowmemjson.Decode(r, &sg.Size)
+ return lowmemjson.NewDecoder(r).Decode(&sg.Size)
case "Runs":
return lowmemjson.DecodeArray(r, func(r io.RuneScanner) error {
var run SumRun[Addr]
- if err := lowmemjson.Decode(r, &run); err != nil {
+ if err := lowmemjson.NewDecoder(r).Decode(&run); err != nil {
return err
}
if run.ChecksumSize > 0 {
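The hunks above (and the matching ones in set.go below) are a mechanical migration from lowmemjson's package-level Encode/Decode helpers to explicit Encoder/Decoder values. A minimal standalone sketch of the new calling convention, inferred from the calls in this diff; the round-trip data is illustrative, not from the repository:

package main

import (
	"bytes"
	"fmt"
	"strings"

	"git.lukeshu.com/go/lowmemjson"
)

func main() {
	// Encode with an explicit Encoder instead of lowmemjson.Encode(w, v).
	var out bytes.Buffer
	if err := lowmemjson.NewEncoder(&out).Encode([]int{1, 2, 3}); err != nil {
		panic(err)
	}
	fmt.Println(out.String()) // [1,2,3]

	// Decode with an explicit Decoder instead of lowmemjson.Decode(r, &v);
	// the Decoder reads from an io.RuneScanner, which strings.Reader satisfies.
	var back []int
	if err := lowmemjson.NewDecoder(strings.NewReader(out.String())).Decode(&back); err != nil {
		panic(err)
	}
	fmt.Println(back) // [1 2 3]
}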
diff --git a/lib/btrfsprogs/btrfsinspect/mount.go b/lib/btrfsprogs/btrfsinspect/mount.go
index ee3c0ec..0ac8497 100644
--- a/lib/btrfsprogs/btrfsinspect/mount.go
+++ b/lib/btrfsprogs/btrfsinspect/mount.go
@@ -14,6 +14,7 @@ import (
"sync/atomic"
"syscall"
+ "git.lukeshu.com/go/typedsync"
"github.com/datawire/dlib/dcontext"
"github.com/datawire/dlib/dgroup"
"github.com/datawire/dlib/dlog"
@@ -109,8 +110,8 @@ type subvolume struct {
fuseutil.NotImplementedFileSystem
lastHandle uint64
- dirHandles containers.SyncMap[fuseops.HandleID, *dirState]
- fileHandles containers.SyncMap[fuseops.HandleID, *fileState]
+ dirHandles typedsync.Map[fuseops.HandleID, *dirState]
+ fileHandles typedsync.Map[fuseops.HandleID, *fileState]
subvolMu sync.Mutex
subvols containers.Set[string]
diff --git a/lib/btrfsprogs/btrfsinspect/rebuildnodes/btrees/forrest.go b/lib/btrfsprogs/btrfsinspect/rebuildnodes/btrees/forrest.go
index ff6b1c5..45a5bb2 100644
--- a/lib/btrfsprogs/btrfsinspect/rebuildnodes/btrees/forrest.go
+++ b/lib/btrfsprogs/btrfsinspect/rebuildnodes/btrees/forrest.go
@@ -7,6 +7,7 @@ package btrees
import (
"context"
+ "git.lukeshu.com/go/typedsync"
"github.com/datawire/dlib/dlog"
"git.lukeshu.com/btrfs-progs-ng/lib/btrfs/btrfsitem"
@@ -62,7 +63,7 @@ type RebuiltForrest struct {
cbLookupUUID func(ctx context.Context, uuid btrfsprim.UUID) (id btrfsprim.ObjID, ok bool)
// mutable
- trees containers.SyncMap[btrfsprim.ObjID, *RebuiltTree]
+ trees typedsync.Map[btrfsprim.ObjID, *RebuiltTree]
leafs *containers.LRUCache[btrfsprim.ObjID, map[btrfsvol.LogicalAddr]containers.Set[btrfsvol.LogicalAddr]]
incItems *containers.LRUCache[btrfsprim.ObjID, *itemIndex]
excItems *containers.LRUCache[btrfsprim.ObjID, *itemIndex]
diff --git a/lib/containers/set.go b/lib/containers/set.go
index 4fc8aad..b2af494 100644
--- a/lib/containers/set.go
+++ b/lib/containers/set.go
@@ -71,7 +71,7 @@ func (o Set[T]) EncodeJSON(w io.Writer) error {
return less(keys[i], keys[j])
})
- return lowmemjson.Encode(w, keys)
+ return lowmemjson.NewEncoder(w).Encode(keys)
}
func (o *Set[T]) DecodeJSON(r io.RuneScanner) error {
@@ -87,7 +87,7 @@ func (o *Set[T]) DecodeJSON(r io.RuneScanner) error {
*o = Set[T]{}
return lowmemjson.DecodeArray(r, func(r io.RuneScanner) error {
var val T
- if err := lowmemjson.Decode(r, &val); err != nil {
+ if err := lowmemjson.NewDecoder(r).Decode(&val); err != nil {
return err
}
(*o)[val] = struct{}{}
diff --git a/lib/containers/syncmap.go b/lib/containers/syncmap.go
deleted file mode 100644
index 74da4b3..0000000
--- a/lib/containers/syncmap.go
+++ /dev/null
@@ -1,53 +0,0 @@
-// Copyright (C) 2022-2023 Luke Shumaker <lukeshu@lukeshu.com>
-//
-// SPDX-License-Identifier: GPL-2.0-or-later
-
-package containers
-
-import (
- "sync"
-)
-
-type SyncMap[K comparable, V any] struct {
- inner sync.Map
-}
-
-func (m *SyncMap[K, V]) Delete(key K) {
- m.inner.Delete(key)
-}
-
-func (m *SyncMap[K, V]) Load(key K) (value V, ok bool) {
- _value, ok := m.inner.Load(key)
- if ok {
- //nolint:forcetypeassert // Typed wrapper around untyped lib.
- value = _value.(V)
- }
- return value, ok
-}
-
-func (m *SyncMap[K, V]) LoadAndDelete(key K) (value V, loaded bool) {
- _value, ok := m.inner.LoadAndDelete(key)
- if ok {
- //nolint:forcetypeassert // Typed wrapper around untyped lib.
- value = _value.(V)
- }
- return value, ok
-}
-
-func (m *SyncMap[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
- _actual, loaded := m.inner.LoadOrStore(key, value)
- //nolint:forcetypeassert // Typed wrapper around untyped lib.
- actual = _actual.(V)
- return actual, loaded
-}
-
-func (m *SyncMap[K, V]) Range(f func(key K, value V) bool) {
- m.inner.Range(func(key, value any) bool {
- //nolint:forcetypeassert // Typed wrapper around untyped lib.
- return f(key.(K), value.(V))
- })
-}
-
-func (m *SyncMap[K, V]) Store(key K, value V) {
- m.inner.Store(key, value)
-}
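The containers.SyncMap deleted above is replaced throughout this commit by typedsync.Map from git.lukeshu.com/go/typedsync. A minimal sketch of how the replacement is expected to be used, assuming its method set mirrors the deleted wrapper (Store/Load/LoadOrStore/Delete/Range) with typed signatures:

package main

import (
	"fmt"

	"git.lukeshu.com/go/typedsync"
)

func main() {
	var m typedsync.Map[string, int]

	m.Store("a", 1)
	if v, ok := m.Load("a"); ok {
		fmt.Println("a =", v) // v is already an int; no type assertion needed
	}

	actual, loaded := m.LoadOrStore("b", 2)
	fmt.Println(actual, loaded) // 2 false

	m.Range(func(k string, v int) bool {
		fmt.Println(k, v)
		return true // keep iterating
	})

	m.Delete("a")
}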
diff --git a/lib/profile/cobra.go b/lib/profile/cobra.go
new file mode 100644
index 0000000..3094015
--- /dev/null
+++ b/lib/profile/cobra.go
@@ -0,0 +1,109 @@
+// Copyright (C) 2023 Luke Shumaker <lukeshu@lukeshu.com>
+//
+// SPDX-License-Identifier: GPL-2.0-or-later
+
+package profile
+
+import (
+ "io"
+ "os"
+
+ "github.com/datawire/dlib/derror"
+ "github.com/spf13/cobra"
+ "github.com/spf13/pflag"
+)
+
+type flagSet struct {
+ shutdown []StopFunc
+}
+
+func (fs *flagSet) Stop() error {
+ var errs derror.MultiError
+ for _, fn := range fs.shutdown {
+ if err := fn(); err != nil {
+ errs = append(errs, err)
+ }
+ }
+ if len(errs) > 0 {
+ return errs
+ }
+ return nil
+}
+
+type flagValue struct {
+ parent *flagSet
+ start startFunc
+ curVal string
+}
+
+var _ pflag.Value = (*flagValue)(nil)
+
+// String implements pflag.Value.
+func (fv *flagValue) String() string { return fv.curVal }
+
+// Set implements pflag.Value.
+func (fv *flagValue) Set(filename string) error {
+ if filename == "" {
+ return nil
+ }
+ w, err := os.Create(filename)
+ if err != nil {
+ return err
+ }
+ shutdown, err := fv.start(w)
+ if err != nil {
+ return err
+ }
+ fv.curVal = filename
+ fv.parent.shutdown = append(fv.parent.shutdown, func() error {
+ err1 := shutdown()
+ err2 := w.Close()
+ if err1 != nil {
+ return err1
+ }
+ return err2
+ })
+ return nil
+}
+
+// Type implements pflag.Value.
+func (fv *flagValue) Type() string { return "filename" }
+
+func pStart(name string) startFunc {
+ return func(w io.Writer) (StopFunc, error) {
+ return Profile(w, name)
+ }
+}
+
+// AddProfileFlags adds flags to a pflag.FlagSet to write any (or all)
+// of the standard profiles to a file, and returns a "stop" function
+// to be called at program shutdown.
+func AddProfileFlags(flags *pflag.FlagSet, prefix string) StopFunc {
+ var root flagSet
+
+ flags.Var(&flagValue{parent: &root, start: CPU}, prefix+"cpu", "Write a CPU profile to the file `cpu.pprof`")
+ _ = cobra.MarkFlagFilename(flags, prefix+"cpu")
+
+ flags.Var(&flagValue{parent: &root, start: Trace}, prefix+"trace", "Write a trace (https://pkg.go.dev/runtime/trace) to the file `trace.out`")
+ _ = cobra.MarkFlagFilename(flags, prefix+"trace")
+
+ flags.Var(&flagValue{parent: &root, start: pStart("goroutine")}, prefix+"goroutine", "Write a goroutine profile to the file `goroutine.pprof`")
+ _ = cobra.MarkFlagFilename(flags, prefix+"goroutine")
+
+ flags.Var(&flagValue{parent: &root, start: pStart("threadcreate")}, prefix+"threadcreate", "Write a threadcreate profile to the file `threadcreate.pprof`")
+ _ = cobra.MarkFlagFilename(flags, prefix+"threadcreate")
+
+ flags.Var(&flagValue{parent: &root, start: pStart("heap")}, prefix+"heap", "Write a heap profile to the file `heap.pprof`")
+ _ = cobra.MarkFlagFilename(flags, prefix+"heap")
+
+ flags.Var(&flagValue{parent: &root, start: pStart("allocs")}, prefix+"allocs", "Write an allocs profile to the file `allocs.pprof`")
+ _ = cobra.MarkFlagFilename(flags, prefix+"allocs")
+
+ flags.Var(&flagValue{parent: &root, start: pStart("block")}, prefix+"block", "Write a block profile to the file `block.pprof`")
+ _ = cobra.MarkFlagFilename(flags, prefix+"block")
+
+ flags.Var(&flagValue{parent: &root, start: pStart("mutex")}, prefix+"mutex", "Write a mutex profile to the file `mutex.pprof`")
+ _ = cobra.MarkFlagFilename(flags, prefix+"mutex")
+
+ return root.Stop
+}
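A hypothetical example of wiring AddProfileFlags into a cobra command; the command name, the "profile." prefix, and the error handling are illustrative only, not taken from this repository:

package main

import (
	"os"

	"github.com/spf13/cobra"

	"git.lukeshu.com/btrfs-progs-ng/lib/profile"
)

func main() {
	cmd := &cobra.Command{
		Use: "example",
		RunE: func(cmd *cobra.Command, args []string) error {
			// ... the work to be profiled goes here ...
			return nil
		},
	}

	// Registers --profile.cpu, --profile.trace, --profile.heap, etc.
	stop := profile.AddProfileFlags(cmd.PersistentFlags(), "profile.")

	err := cmd.Execute()
	// Flush and close any profiles that were requested on the command line.
	if stopErr := stop(); err == nil {
		err = stopErr
	}
	if err != nil {
		os.Exit(1)
	}
}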
diff --git a/lib/profile/profile.go b/lib/profile/profile.go
new file mode 100644
index 0000000..27c7e61
--- /dev/null
+++ b/lib/profile/profile.go
@@ -0,0 +1,89 @@
+// Copyright (C) 2023 Luke Shumaker <lukeshu@lukeshu.com>
+//
+// SPDX-License-Identifier: GPL-2.0-or-later
+
+package profile
+
+import (
+ "io"
+ "runtime/pprof"
+ "runtime/trace"
+)
+
+type StopFunc = func() error
+
+type startFunc = func(io.Writer) (StopFunc, error)
+
+// CPU arranges to write a CPU profile to the given Writer, and
+// returns a function to be called on shutdown.
+func CPU(w io.Writer) (StopFunc, error) {
+ if err := pprof.StartCPUProfile(w); err != nil {
+ return nil, err
+ }
+ return func() error {
+ pprof.StopCPUProfile()
+ return nil
+ }, nil
+}
+
+var _ startFunc = CPU
+
+// Profile arranges to write the given named-profile to the given
+// Writer, and returns a function to be called on shutdown.
+//
+// CPU profiles are not named profiles; there is a separate .CPU()
+// function for writing CPU profiles.
+//
+// The Go runtime has several built-in named profiles, and it is
+// possible for programs to create their own named profiles with
+// runtime/pprof.NewProfile().
+//
+// This package provides ProfileXXX constants for the built-in named
+// profiles, and a .Profiles() function that returns the list of all
+// profile names.
+func Profile(w io.Writer, name string) (StopFunc, error) {
+ return func() error {
+ if prof := pprof.Lookup(name); prof != nil {
+ return prof.WriteTo(w, 0)
+ }
+ return nil
+ }, nil
+}
+
+// Profiles returns a list of all profile names that may be passed to
+// .Profile(); both profiles built-in to the Go runtime, and
+// program-added profiles.
+func Profiles() []string {
+ full := pprof.Profiles()
+ names := make([]string, len(full))
+ for i, prof := range full {
+ names[i] = prof.Name()
+ }
+ return names
+}
+
+// The Go runtime's built-in named profiles; to be passed to
+// .Profile().
+const (
+ ProfileGoroutine = "goroutine"
+ ProfileThreadCreate = "threadcreate"
+ ProfileHeap = "heap"
+ ProfileAllocs = "allocs"
+ ProfileBlock = "block"
+ ProfileMutex = "mutex"
+)
+
+// Trace arranges to write a trace (https://pkg.go.dev/runtime/trace)
+// to the given Writer, and returns a function to be called on
+// shutdown.
+func Trace(w io.Writer) (StopFunc, error) {
+ if err := trace.Start(w); err != nil {
+ return nil, err
+ }
+ return func() error {
+ trace.Stop()
+ return nil
+ }, nil
+}
+
+var _ startFunc = Trace
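A minimal sketch of using this package directly, without the cobra glue from cobra.go above; the output file names are illustrative:

package main

import (
	"os"

	"git.lukeshu.com/btrfs-progs-ng/lib/profile"
)

func main() {
	// CPU profiling starts immediately and is flushed by the stop function.
	cpuFile, err := os.Create("cpu.pprof")
	if err != nil {
		panic(err)
	}
	defer cpuFile.Close()
	stopCPU, err := profile.CPU(cpuFile)
	if err != nil {
		panic(err)
	}
	defer func() { _ = stopCPU() }()

	// Named profiles are only written when the stop function is called.
	heapFile, err := os.Create("heap.pprof")
	if err != nil {
		panic(err)
	}
	defer heapFile.Close()
	stopHeap, err := profile.Profile(heapFile, profile.ProfileHeap)
	if err != nil {
		panic(err)
	}
	defer func() { _ = stopHeap() }()

	// ... program body ...
}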
diff --git a/lib/streamio/runescanner.go b/lib/streamio/runescanner.go
new file mode 100644
index 0000000..451f32f
--- /dev/null
+++ b/lib/streamio/runescanner.go
@@ -0,0 +1,106 @@
+// Copyright (C) 2022-2023 Luke Shumaker <lukeshu@lukeshu.com>
+//
+// SPDX-License-Identifier: GPL-2.0-or-later
+
+package streamio
+
+import (
+ "bufio"
+ "context"
+ "io"
+ "os"
+ "time"
+
+ "github.com/datawire/dlib/dlog"
+
+ "git.lukeshu.com/btrfs-progs-ng/lib/textui"
+)
+
+type RuneScanner interface {
+ io.RuneScanner
+ io.Closer
+}
+
+type runeScanner struct {
+ ctx context.Context //nolint:containedctx // For detecting shutdown from methods
+ done <-chan struct{}
+ progress textui.Portion[int64]
+ progressWriter *textui.Progress[textui.Portion[int64]]
+ unreadCnt uint64
+ reader *bufio.Reader
+ closer io.Closer
+}
+
+// NewRuneScanner returns an io.RuneScanner (and io.Closer) that
+// buffers a file, similar to bufio.NewReader. There are two
+// advantages over bufio.NewReader:
+//
+// - It takes a Context, and causes reads to fail once the Context is
+// canceled; allowing large parse operations to be gracefully cut
+// short.
+//
+// - It logs the progress of reading the file via textui.Progress.
+func NewRuneScanner(ctx context.Context, fh *os.File) (RuneScanner, error) {
+ fi, err := fh.Stat()
+ if err != nil {
+ return nil, err
+ }
+ ret := &runeScanner{
+ ctx: ctx,
+ done: ctx.Done(),
+ progress: textui.Portion[int64]{
+ D: fi.Size(),
+ },
+ progressWriter: textui.NewProgress[textui.Portion[int64]](ctx, dlog.LogLevelInfo, textui.Tunable(1*time.Second)),
+ reader: bufio.NewReader(fh),
+ closer: fh,
+ }
+ return ret, nil
+}
+
+func isClosed(ch <-chan struct{}) bool {
+ select {
+ case <-ch:
+ return true
+ default:
+ return false
+ }
+}
+
+//nolint:gomnd // False positive: gomnd.ignored-functions=[textui.Tunable] doesn't support type params.
+var runeThrottle = textui.Tunable[int64](64)
+
+// ReadRune implements io.RuneReader.
+func (rs *runeScanner) ReadRune() (r rune, size int, err error) {
+ // According to the profiler, checking if the rs.ctx.Done()
+ // channel is closed is faster than checking if rs.ctx.Err()
+ // is non-nil.
+ if rs.unreadCnt == 0 && isClosed(rs.done) {
+ return 0, 0, rs.ctx.Err()
+ }
+ r, size, err = rs.reader.ReadRune()
+ if rs.unreadCnt > 0 {
+ rs.unreadCnt--
+ } else {
+ rs.progress.N += int64(size)
+ if rs.progress.D < runeThrottle || rs.progress.N%runeThrottle == 0 || rs.progress.N > rs.progress.D-runeThrottle {
+ rs.progressWriter.Set(rs.progress)
+ }
+ }
+ return
+}
+
+// UnreadRune implements io.RuneScanner.
+func (rs *runeScanner) UnreadRune() error {
+ if err := rs.reader.UnreadRune(); err != nil {
+ return err
+ }
+ rs.unreadCnt++
+ return nil
+}
+
+// Close implements io.Closer.
+func (rs *runeScanner) Close() error {
+ rs.progressWriter.Done()
+ return rs.closer.Close()
+}
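A hypothetical caller of NewRuneScanner; the input file name is illustrative, and the downstream decode step is only named in a comment:

package main

import (
	"context"
	"os"

	"git.lukeshu.com/btrfs-progs-ng/lib/streamio"
)

func main() {
	ctx := context.Background()

	fh, err := os.Open("dump.json") // hypothetical input file
	if err != nil {
		panic(err)
	}

	rs, err := streamio.NewRuneScanner(ctx, fh)
	if err != nil {
		_ = fh.Close()
		panic(err)
	}
	// Closing the RuneScanner closes the underlying file and finishes the
	// textui.Progress logging.
	defer func() { _ = rs.Close() }()

	// rs satisfies io.RuneScanner, so it can be handed to a rune-oriented
	// parser (e.g. lowmemjson.NewDecoder(rs).Decode(...)); reads start
	// failing once ctx is canceled.
	_, _, _ = rs.ReadRune()
}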
diff --git a/lib/textui/log.go b/lib/textui/log.go
index f73b271..2bcd9af 100644
--- a/lib/textui/log.go
+++ b/lib/textui/log.go
@@ -18,12 +18,12 @@ import (
"path/filepath"
"runtime"
"sort"
- "strconv"
"strings"
"sync"
"time"
"unicode"
+ "git.lukeshu.com/go/typedsync"
"github.com/datawire/dlib/dlog"
"github.com/spf13/pflag"
)
@@ -153,7 +153,11 @@ func (l *logger) UnformattedLogf(lvl dlog.LogLevel, format string, args ...any)
}
var (
- logBuf bytes.Buffer
+ logBufPool = typedsync.Pool[*bytes.Buffer]{
+ New: func() *bytes.Buffer {
+ return new(bytes.Buffer)
+ },
+ }
logMu sync.Mutex
thisModDir string
)
@@ -169,13 +173,8 @@ func (l *logger) log(lvl dlog.LogLevel, writeMsg func(io.Writer)) {
if lvl > l.lvl {
return
}
- // This is optimized for mostly-single-threaded usage. If I cared more
- // about multi-threaded performance, I'd trade in some
- // memory-use/allocations and (1) instead of using a static `logBuf`,
- // I'd have a `logBufPool` `sync.Pool`, and (2) have the final call to
- // `l.out.Write()` be the only thing protected by `logMu`.
- logMu.Lock()
- defer logMu.Unlock()
+ logBuf, _ := logBufPool.Get()
+ defer logBufPool.Put(logBuf)
defer logBuf.Reset()
// time ////////////////////////////////////////////////////////////////
@@ -222,19 +221,19 @@ func (l *logger) log(lvl dlog.LogLevel, writeMsg func(io.Writer)) {
nextField = i
break
}
- writeField(&logBuf, fieldKey, fields[fieldKey])
+ writeField(logBuf, fieldKey, fields[fieldKey])
}
// message /////////////////////////////////////////////////////////////
logBuf.WriteString(" : ")
- writeMsg(&logBuf)
+ writeMsg(logBuf)
// fields (late) ///////////////////////////////////////////////////////
if nextField < len(fieldKeys) {
logBuf.WriteString(" :")
}
for _, fieldKey := range fieldKeys[nextField:] {
- writeField(&logBuf, fieldKey, fields[fieldKey])
+ writeField(logBuf, fieldKey, fields[fieldKey])
}
// caller //////////////////////////////////////////////////////////////
@@ -258,13 +257,16 @@ func (l *logger) log(lvl dlog.LogLevel, writeMsg func(io.Writer)) {
logBuf.WriteString(" :")
}
file := f.File[strings.LastIndex(f.File, thisModDir+"/")+len(thisModDir+"/"):]
- fmt.Fprintf(&logBuf, " (from %s:%d)", file, f.Line)
+ fmt.Fprintf(logBuf, " (from %s:%d)", file, f.Line)
break
}
// boilerplate /////////////////////////////////////////////////////////
logBuf.WriteByte('\n')
+
+ logMu.Lock()
_, _ = l.out.Write(logBuf.Bytes())
+ logMu.Unlock()
}
// fieldOrd returns the sort-position for a given log-field-key. Lower return
@@ -344,35 +346,49 @@ func fieldOrd(key string) int {
}
func writeField(w io.Writer, key string, val any) {
- valStr := printer.Sprint(val)
+ valBuf, _ := logBufPool.Get()
+ defer func() {
+ // The wrapper `func()` is important to defer
+ // evaluating `valBuf`, since we might re-assign it
+ // below.
+ valBuf.Reset()
+ logBufPool.Put(valBuf)
+ }()
+ _, _ = printer.Fprint(valBuf, val)
needsQuote := false
- if strings.HasPrefix(valStr, `"`) {
+ if bytes.HasPrefix(valBuf.Bytes(), []byte(`"`)) {
needsQuote = true
} else {
- for _, r := range valStr {
- if !(unicode.IsPrint(r) && r != ' ') {
+ for _, r := range valBuf.Bytes() {
+ if !(unicode.IsPrint(rune(r)) && r != ' ') {
needsQuote = true
break
}
}
}
if needsQuote {
- valStr = strconv.Quote(valStr)
+ valBuf2, _ := logBufPool.Get()
+ fmt.Fprintf(valBuf2, "%q", valBuf.Bytes())
+ valBuf.Reset()
+ logBufPool.Put(valBuf)
+ valBuf = valBuf2
}
+ valStr := valBuf.Bytes()
name := key
switch {
case name == "THREAD":
name = "thread"
- switch valStr {
- case "", "/main":
+ switch {
+ case len(valStr) == 0 || bytes.Equal(valStr, []byte("/main")):
return
default:
- if strings.HasPrefix(valStr, "/main/") {
- valStr = strings.TrimPrefix(valStr, "/main")
+ if bytes.HasPrefix(valStr, []byte("/main/")) {
+ valStr = valStr[len("/main/"):]
+ } else if bytes.HasPrefix(valStr, []byte("/")) {
+ valStr = valStr[len("/"):]
}
- valStr = strings.TrimPrefix(valStr, "/")
}
case strings.HasSuffix(name, ".pass"):
fmt.Fprintf(w, "/pass-%s", valStr)
diff --git a/lib/textui/progress.go b/lib/textui/progress.go
index 68d986f..48a3901 100644
--- a/lib/textui/progress.go
+++ b/lib/textui/progress.go
@@ -7,9 +7,9 @@ package textui
import (
"context"
"fmt"
- "sync/atomic"
"time"
+ "git.lukeshu.com/go/typedsync"
"github.com/datawire/dlib/dlog"
)
@@ -26,7 +26,7 @@ type Progress[T Stats] struct {
cancel context.CancelFunc
done chan struct{}
- cur atomic.Value // Value[T]
+ cur typedsync.Value[T]
oldStat T
oldLine string
}
@@ -45,7 +45,7 @@ func NewProgress[T Stats](ctx context.Context, lvl dlog.LogLevel, interval time.
}
func (p *Progress[T]) Set(val T) {
- if p.cur.Swap(val) == nil {
+ if _, hadOld := p.cur.Swap(val); !hadOld {
go p.run()
}
}
@@ -56,8 +56,10 @@ func (p *Progress[T]) Done() {
}
func (p *Progress[T]) flush(force bool) {
- //nolint:forcetypeassert // It wasn't worth it to me (yet?) to make a typed wrapper around atomic.Value.
- cur := p.cur.Load().(T)
+ cur, ok := p.cur.Load()
+ if !ok {
+ panic("should not happen")
+ }
if !force && cur == p.oldStat {
return
}
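A minimal sketch of the typedsync.Value API as inferred from the progress.go change above (Load reports whether a value has been stored yet, and Swap reports whether it replaced one); the values used are illustrative:

package main

import (
	"fmt"

	"git.lukeshu.com/go/typedsync"
)

func main() {
	var v typedsync.Value[int]

	if _, ok := v.Load(); !ok {
		fmt.Println("nothing stored yet")
	}

	if _, hadOld := v.Swap(7); !hadOld {
		// Analogous to Progress.Set starting its goroutine on the first Set.
		fmt.Println("first value stored")
	}

	cur, _ := v.Load()
	fmt.Println("current:", cur) // 7
}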