summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jwg.go88
-rw-r--r--main.go23
2 files changed, 101 insertions, 10 deletions
diff --git a/jwg.go b/jwg.go
new file mode 100644
index 0000000..6ef06d0
--- /dev/null
+++ b/jwg.go
@@ -0,0 +1,88 @@
+package main
+
+import (
+ "fmt"
+ "os"
+ "sort"
+ "strings"
+ "sync"
+ "time"
+)
+
+type JobWaitGroup struct {
+ lock sync.RWMutex
+ jobs map[string]bool
+ wg sync.WaitGroup
+}
+
+func (jwg *JobWaitGroup) Do(name string, fn func()) {
+ jwg.lock.Lock()
+ defer jwg.lock.Unlock()
+
+ if jwg.jobs == nil {
+ jwg.jobs = make(map[string]bool)
+ }
+ if _, dup := jwg.jobs[name]; dup {
+ panic(fmt.Sprintf("job %q already exists", name))
+ }
+ jwg.jobs[name] = true
+ //fmt.Fprintf(os.Stderr, "add %p %v\n", &jwg.wg, jwg.wg)
+ jwg.wg.Add(1)
+ go func() {
+ defer func() {
+ jwg.lock.Lock()
+ defer jwg.lock.Unlock()
+ jwg.jobs[name] = false
+ jwg.wg.Done()
+ //fmt.Fprintf(os.Stderr, "del %p %v\n", &jwg.wg, jwg.wg)
+ }()
+ fn()
+ }()
+}
+
+func (jwg *JobWaitGroup) Wait() {
+ jwg.wg.Wait()
+}
+
+func (jwg *JobWaitGroup) Status() (int, []string) {
+ jwg.lock.RLock()
+
+ n := len(jwg.jobs)
+ var jobs []string
+ for job, active := range jwg.jobs {
+ if active {
+ jobs = append(jobs, job)
+ }
+ }
+ defer jwg.lock.RUnlock()
+ sort.Strings(jobs)
+
+ return n, jobs
+}
+
+func (jwg *JobWaitGroup) Watch(d time.Duration) {
+ ticker := time.NewTicker(d)
+ done := make(chan struct{})
+ go func() {
+ jwg.Wait()
+ ticker.Stop()
+ done <- struct{}{}
+ }()
+ for {
+ select {
+ case <-ticker.C:
+ n, jobs := jwg.Status()
+ if len(jobs) == 0 {
+ panic("no active jobs, but wg still waiting")
+ }
+ line := fmt.Sprintf("%d/%d : %v", len(jobs), n, strings.Join(jobs, ", "))
+ if len(line) > 70 {
+ line = line[:67] + "..."
+ }
+ fmt.Fprintf(os.Stderr, "\r%-70s", line)
+ case <-done:
+ fmt.Fprintf(os.Stderr, "\r%-70s\n", "done")
+ return
+ }
+ }
+}
diff --git a/main.go b/main.go
index eba5024..4f4b983 100644
--- a/main.go
+++ b/main.go
@@ -8,6 +8,7 @@ import (
"path/filepath"
"strings"
"sync"
+ "time"
)
func fmtAddress(node, port string) string {
@@ -23,7 +24,7 @@ func isIPv6(node string) bool {
return ip != nil && ip.To4() == nil
}
-var wg sync.WaitGroup
+var jwg JobWaitGroup
func Emit(pt Point) {
if pt == nil {
@@ -33,23 +34,25 @@ func Emit(pt Point) {
}
func DoHostfile(fname string) {
- defer wg.Done()
hostname := filepath.Base(fname)
+
cfg, err := readConfigFile(fname)
if err != nil {
Emit(NewPoint("public", map[string]string{"host": hostname}, map[string]interface{}{"error": err.Error()}))
return
}
+
+ addresses := make(map[string]struct{})
for _, address := range getAddresses(cfg) {
- addressStr := fmtAddress(address.Node, address.Port)
- wg.Add(2)
- go Emit(DoAddress(hostname, "tcp4", addressStr))
- go Emit(DoAddress(hostname, "tcp6", addressStr))
+ addresses[fmtAddress(address.Node, address.Port)] = struct{}{}
+ }
+ for address := range addresses {
+ jwg.Do(hostname+"/"+address+"/tcp4", func() { Emit(DoAddress(hostname, "tcp4", address)) })
+ jwg.Do(hostname+"/"+address+"/tcp6", func() { Emit(DoAddress(hostname, "tcp6", address)) })
}
}
func DoAddress(host, network, address string) Point {
- defer wg.Done()
tags := map[string]string{
"host": host,
"network": network,
@@ -100,6 +103,7 @@ func Hello(addr *net.TCPAddr) (name, version string, err error) {
return "", "", err
}
defer conn.Close()
+ conn.SetDeadline(time.Now().Add(10 * time.Second))
conn.CloseWrite()
all, _ := ioutil.ReadAll(conn)
line := strings.TrimRight(string(all), "\n")
@@ -112,8 +116,7 @@ func Hello(addr *net.TCPAddr) (name, version string, err error) {
func main() {
for _, fname := range os.Args[1:] {
- wg.Add(1)
- go DoHostfile(fname)
+ DoHostfile(fname)
}
- wg.Wait()
+ jwg.Watch(time.Second)
}