cmd/upspin-warden: add daemon to run and monitor Upspin processes
This is a starting point for a backend for the various native console
implementations, as described here:
https://github.com/upspin/upspin/issues/312#issuecomment-345548105
Update upspin/upspin#312
Change-Id: I79aeeec8654c3204cc72e05ff1088cf392e4ee1b
Reviewed-on: https://upspin-review.googlesource.com/16560
Reviewed-by: David Presotto <presotto@gmail.com>
Reviewed-by: Rob Pike <r@golang.org>
diff --git a/cmd/upspin-warden/log.go b/cmd/upspin-warden/log.go
new file mode 100644
index 0000000..d08e10f
--- /dev/null
+++ b/cmd/upspin-warden/log.go
@@ -0,0 +1,53 @@
+// Copyright 2017 The Upspin Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+ "bytes"
+ "sync"
+)
+
+var maxBacklog = 64 * 1024 // Tests override this.
+
+// rollingLog is an io.Writer that buffers all data written to it, purging
+// earlier entries to maintain a buffer size of maxBacklog bytes.
+// Its methods are safe for concurrent use.
+type rollingLog struct {
+ mu sync.Mutex
+ buf []byte
+}
+
+func (l *rollingLog) Write(b []byte) (int, error) {
+ l.mu.Lock()
+ defer l.mu.Unlock()
+ if len(b) >= maxBacklog {
+ l.buf = append(l.buf[:0], b...)
+ return len(b), nil
+ }
+ if len(l.buf)+len(b) > maxBacklog {
+ // Make room for b.
+ i := len(b)
+ if len(l.buf) > maxBacklog {
+ i += len(l.buf) - maxBacklog
+ }
+ b2 := l.buf[i:]
+ // Start at the first line feed,
+ // so that we don't keep partial lines.
+ if i := bytes.IndexByte(b2, '\n'); i >= 0 {
+ b2 = b2[i+1:]
+ }
+ // Replace buffer.
+ l.buf = append(l.buf[:0], b2...)
+ }
+ l.buf = append(l.buf, b...)
+ return len(b), nil
+}
+
+// Log returns a copy of the log buffer.
+func (l *rollingLog) Log() []byte {
+ l.mu.Lock()
+ defer l.mu.Unlock()
+ return append([]byte(nil), l.buf...)
+}
diff --git a/cmd/upspin-warden/log_test.go b/cmd/upspin-warden/log_test.go
new file mode 100644
index 0000000..90dcf98
--- /dev/null
+++ b/cmd/upspin-warden/log_test.go
@@ -0,0 +1,74 @@
+// Copyright 2017 The Upspin Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+ "bytes"
+ "fmt"
+ "math/rand"
+ "strconv"
+ "strings"
+ "testing"
+)
+
+func TestRollingLog(t *testing.T) {
+ oldMax := maxBacklog
+ defer func() { maxBacklog = oldMax }()
+ maxBacklog = 1024
+ l := rollingLog{}
+
+ for i := 0; i < 2000; i++ {
+ n := rand.Intn(100)
+ fmt.Fprintf(&l, "%.2d%s\n", n, strings.Repeat("n", n))
+ err := validate(l.Log())
+ if err != nil {
+ t.Fatalf("iteration %d: %v", n, err)
+ }
+ }
+
+ // Write a >maxBacklog string of m's and n's;
+ // it should just replace the log.
+ mm := strings.Repeat("m", 512)
+ nn := strings.Repeat("n", 512)
+ want := fmt.Sprintf("%s\n%s\n", mm, nn)
+ l.Write([]byte(want))
+ if got := string(l.Log()); got != want {
+ t.Fatalf("mismatch after long write\ngot %d bytes: %q\nwant %d bytes: %q",
+ len(got), got, len(want), want)
+ }
+
+ // Now this next write should leave us with
+ // the run of n's followed by "hello".
+ s := "hello\n"
+ l.Write([]byte(s))
+ want = fmt.Sprintf("%s\n%s", nn, s)
+ if got := string(l.Log()); got != want {
+ t.Fatalf("mismatch after short write after long write\ngot %d bytes: %q\nwant %d bytes: %q",
+ len(got), got, len(want), want)
+ }
+}
+
+func validate(b []byte) error {
+ lines := bytes.Split(b, []byte("\n"))
+ for i, l := range lines {
+ if len(l) == 0 {
+ if i != len(lines)-1 {
+ return fmt.Errorf("found empty line mid-log at %d", i)
+ }
+ return nil
+ }
+ if len(l) < 2 {
+ return fmt.Errorf("line %d too short", i)
+ }
+ n, err := strconv.Atoi(string(l[:2]))
+ if err != nil {
+ return fmt.Errorf("invalid length of line %d: %v", i, err)
+ }
+ if !bytes.Equal(l[2:], bytes.Repeat([]byte("n"), n)) {
+ return fmt.Errorf("bad line %d: %q", i, l)
+ }
+ }
+ return nil
+}
diff --git a/cmd/upspin-warden/main.go b/cmd/upspin-warden/main.go
new file mode 100644
index 0000000..05847c6
--- /dev/null
+++ b/cmd/upspin-warden/main.go
@@ -0,0 +1,175 @@
+// Copyright 2017 The Upspin Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Command upspin-warden runs Upspin client daemons, such as upspinfs and
+// cacheserver, and exports information about them to external programs.
+package main
+
+import (
+ "bytes"
+ "flag"
+ "fmt"
+ "io"
+ "net/http"
+ "os"
+ "os/exec"
+ "sort"
+ "strings"
+ "sync"
+ "time"
+
+ "upspin.io/flags"
+ "upspin.io/log"
+)
+
+func main() {
+ cmd := flag.String("cmd", "cacheserver,upspinfs,upspin-sharebot", "comma-separated list of `commands` to run")
+ flags.Parse(nil, "log", "config", "http")
+ w := NewWarden(strings.Split(*cmd, ","))
+ log.Fatal(http.ListenAndServe(flags.HTTPAddr, w))
+}
+
+// restartInterval specifies the time between daemon restarts.
+const restartInterval = 10 * time.Second
+
+// Warden implements the upspin-warden daemon.
+type Warden struct {
+ log rollingLog
+ procs map[string]*Process
+}
+
+// NewWarden creates a Warden that runs the given commands.
+// It implements a http.Handler that exports server state and logs.
+// It redirects global Upspin log output to its internal rolling log.
+func NewWarden(cmds []string) *Warden {
+ w := &Warden{procs: map[string]*Process{}}
+ for _, c := range cmds {
+ w.procs[c] = &Process{name: c}
+ }
+ log.SetOutput(io.MultiWriter(os.Stderr, &w.log))
+ for _, p := range w.procs {
+ go p.Run()
+ }
+ return w
+}
+
+// ServeHTTP implements http.Handler.
+func (w *Warden) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
+ switch name := r.URL.Path[1:]; name {
+ case "": // Root.
+ // Show truncated warden logs.
+ fmt.Fprintln(rw, "warden:")
+ fprintLastNLines(rw, w.log.Log(), 10, "\t")
+ // Show processes, their states, and truncated logs.
+ var names []string
+ for n := range w.procs {
+ names = append(names, n)
+ }
+ sort.Strings(names)
+ for _, n := range names {
+ p := w.procs[n]
+ fmt.Fprintf(rw, "\n%s: %s\n", n, p.State())
+ fprintLastNLines(rw, p.log.Log(), 10, "\t")
+ }
+ case "warden":
+ // Show complete warden log.
+ rw.Write(w.log.Log())
+ default:
+ // Show log for the given process.
+ p, ok := w.procs[name]
+ if !ok {
+ http.NotFound(rw, r)
+ return
+ }
+ rw.Write(p.log.Log())
+ }
+}
+
+// fprintLastNLines writes the last n lines of buf to w,
+// adding prefix to the start of each line.
+func fprintLastNLines(w io.Writer, buf []byte, n int, prefix string) {
+ lines := make([][]byte, 0, n)
+ for i := 0; i <= n; i++ {
+ j := bytes.LastIndexByte(buf, '\n')
+ if j <= 0 {
+ if len(buf) > 0 {
+ lines = append(lines, buf)
+ }
+ break
+ }
+ lines = append(lines, buf[j+1:])
+ buf = buf[:j]
+ }
+ for i := len(lines) - 1; i >= 0; i-- {
+ fmt.Fprintf(w, "%s%s\n", prefix, lines[i])
+ }
+}
+
+// ProcessState describes the state of a Process.
+type ProcessState int
+
+//go:generate stringer -type ProcessState
+
+const (
+ NotStarted ProcessState = iota
+ Starting
+ Running
+ Error
+)
+
+// Process manages the execution of a daemon process and captures its logs.
+type Process struct {
+ name string
+ log rollingLog
+
+ mu sync.Mutex
+ state ProcessState
+}
+
+// State reports the state of the process.
+func (p *Process) State() ProcessState {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ return p.state
+}
+
+// Run executes the process in a loop, restarting it after restartInterval
+// since its last start.
+func (p *Process) Run() {
+ for {
+ started := time.Now()
+ err := p.exec()
+ log.Error.Printf("%v: %v", p.name, err)
+ if d := time.Since(started); d < restartInterval {
+ i := restartInterval - d
+ log.Debug.Printf("%v: waiting %v before restarting", p.name, i)
+ time.Sleep(i)
+ }
+ }
+}
+
+// Exec starts the process and waits for it to return,
+// updating the process's state field as necessary.
+func (p *Process) exec() error {
+ cmd := exec.Command(p.name,
+ "-log="+flags.Log.String(),
+ "-config="+flags.Config)
+ cmd.Stdout = &p.log
+ cmd.Stderr = &p.log
+ p.setState(Starting)
+ if err := cmd.Start(); err != nil {
+ return err
+ }
+ p.setState(Running)
+ err := cmd.Wait()
+ p.setState(Error)
+ return err
+}
+
+func (p *Process) setState(s ProcessState) {
+ p.mu.Lock()
+ p.state = s
+ p.mu.Unlock()
+ log.Debug.Printf("%s: %s", p.name, s)
+}
diff --git a/cmd/upspin-warden/processstate_string.go b/cmd/upspin-warden/processstate_string.go
new file mode 100644
index 0000000..acd0d2d
--- /dev/null
+++ b/cmd/upspin-warden/processstate_string.go
@@ -0,0 +1,16 @@
+// Code generated by "stringer -type ProcessState"; DO NOT EDIT.
+
+package main
+
+import "strconv"
+
+const _ProcessState_name = "StoppedStartingRunningError"
+
+var _ProcessState_index = [...]uint8{0, 7, 15, 22, 27}
+
+func (i ProcessState) String() string {
+ if i < 0 || i >= ProcessState(len(_ProcessState_index)-1) {
+ return "ProcessState(" + strconv.FormatInt(int64(i), 10) + ")"
+ }
+ return _ProcessState_name[_ProcessState_index[i]:_ProcessState_index[i+1]]
+}