From 493613b9b3b12d8d7235f2a187a1e3154c5dfa78 Mon Sep 17 00:00:00 2001 From: Luke Shumaker Date: Tue, 17 Apr 2018 18:26:53 -0400 Subject: cooler job tracking --- jwg.go | 88 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ main.go | 23 +++++++++-------- 2 files changed, 101 insertions(+), 10 deletions(-) create mode 100644 jwg.go diff --git a/jwg.go b/jwg.go new file mode 100644 index 0000000..6ef06d0 --- /dev/null +++ b/jwg.go @@ -0,0 +1,88 @@ +package main + +import ( + "fmt" + "os" + "sort" + "strings" + "sync" + "time" +) + +type JobWaitGroup struct { + lock sync.RWMutex + jobs map[string]bool + wg sync.WaitGroup +} + +func (jwg *JobWaitGroup) Do(name string, fn func()) { + jwg.lock.Lock() + defer jwg.lock.Unlock() + + if jwg.jobs == nil { + jwg.jobs = make(map[string]bool) + } + if _, dup := jwg.jobs[name]; dup { + panic(fmt.Sprintf("job %q already exists", name)) + } + jwg.jobs[name] = true + //fmt.Fprintf(os.Stderr, "add %p %v\n", &jwg.wg, jwg.wg) + jwg.wg.Add(1) + go func() { + defer func() { + jwg.lock.Lock() + defer jwg.lock.Unlock() + jwg.jobs[name] = false + jwg.wg.Done() + //fmt.Fprintf(os.Stderr, "del %p %v\n", &jwg.wg, jwg.wg) + }() + fn() + }() +} + +func (jwg *JobWaitGroup) Wait() { + jwg.wg.Wait() +} + +func (jwg *JobWaitGroup) Status() (int, []string) { + jwg.lock.RLock() + + n := len(jwg.jobs) + var jobs []string + for job, active := range jwg.jobs { + if active { + jobs = append(jobs, job) + } + } + defer jwg.lock.RUnlock() + sort.Strings(jobs) + + return n, jobs +} + +func (jwg *JobWaitGroup) Watch(d time.Duration) { + ticker := time.NewTicker(d) + done := make(chan struct{}) + go func() { + jwg.Wait() + ticker.Stop() + done <- struct{}{} + }() + for { + select { + case <-ticker.C: + n, jobs := jwg.Status() + if len(jobs) == 0 { + panic("no active jobs, but wg still waiting") + } + line := fmt.Sprintf("%d/%d : %v", len(jobs), n, strings.Join(jobs, ", ")) + if len(line) > 70 { + line = line[:67] + "..." + } + fmt.Fprintf(os.Stderr, "\r%-70s", line) + case <-done: + fmt.Fprintf(os.Stderr, "\r%-70s\n", "done") + return + } + } +} diff --git a/main.go b/main.go index eba5024..4f4b983 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( "path/filepath" "strings" "sync" + "time" ) func fmtAddress(node, port string) string { @@ -23,7 +24,7 @@ func isIPv6(node string) bool { return ip != nil && ip.To4() == nil } -var wg sync.WaitGroup +var jwg JobWaitGroup func Emit(pt Point) { if pt == nil { @@ -33,23 +34,25 @@ func Emit(pt Point) { } func DoHostfile(fname string) { - defer wg.Done() hostname := filepath.Base(fname) + cfg, err := readConfigFile(fname) if err != nil { Emit(NewPoint("public", map[string]string{"host": hostname}, map[string]interface{}{"error": err.Error()})) return } + + addresses := make(map[string]struct{}) for _, address := range getAddresses(cfg) { - addressStr := fmtAddress(address.Node, address.Port) - wg.Add(2) - go Emit(DoAddress(hostname, "tcp4", addressStr)) - go Emit(DoAddress(hostname, "tcp6", addressStr)) + addresses[fmtAddress(address.Node, address.Port)] = struct{}{} + } + for address := range addresses { + jwg.Do(hostname+"/"+address+"/tcp4", func() { Emit(DoAddress(hostname, "tcp4", address)) }) + jwg.Do(hostname+"/"+address+"/tcp6", func() { Emit(DoAddress(hostname, "tcp6", address)) }) } } func DoAddress(host, network, address string) Point { - defer wg.Done() tags := map[string]string{ "host": host, "network": network, @@ -100,6 +103,7 @@ func Hello(addr *net.TCPAddr) (name, version string, err error) { return "", "", err } defer conn.Close() + conn.SetDeadline(time.Now().Add(10 * time.Second)) conn.CloseWrite() all, _ := ioutil.ReadAll(conn) line := strings.TrimRight(string(all), "\n") @@ -112,8 +116,7 @@ func Hello(addr *net.TCPAddr) (name, version string, err error) { func main() { for _, fname := range os.Args[1:] { - wg.Add(1) - go DoHostfile(fname) + DoHostfile(fname) } - wg.Wait() + jwg.Watch(time.Second) } -- cgit v1.2.3