cmd/upspin-warden: add daemon to run and monitor Upspin processes

This is a starting point for a backend for the various native console
implementations, as described here:
https://github.com/upspin/upspin/issues/312#issuecomment-345548105

Update upspin/upspin#312

Change-Id: I79aeeec8654c3204cc72e05ff1088cf392e4ee1b
Reviewed-on: https://upspin-review.googlesource.com/16560
Reviewed-by: David Presotto <presotto@gmail.com>
Reviewed-by: Rob Pike <r@golang.org>
diff --git a/cmd/upspin-warden/log.go b/cmd/upspin-warden/log.go
new file mode 100644
index 0000000..d08e10f
--- /dev/null
+++ b/cmd/upspin-warden/log.go
@@ -0,0 +1,53 @@
+// Copyright 2017 The Upspin Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+	"bytes"
+	"sync"
+)
+
+var maxBacklog = 64 * 1024 // Tests override this.
+
+// rollingLog is an io.Writer that buffers all data written to it, purging
+// earlier entries to maintain a buffer size of maxBacklog bytes.
+// Its methods are safe for concurrent use.
+type rollingLog struct {
+	mu  sync.Mutex
+	buf []byte
+}
+
+func (l *rollingLog) Write(b []byte) (int, error) {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	if len(b) >= maxBacklog {
+		l.buf = append(l.buf[:0], b...)
+		return len(b), nil
+	}
+	if len(l.buf)+len(b) > maxBacklog {
+		// Make room for b.
+		i := len(b)
+		if len(l.buf) > maxBacklog {
+			i += len(l.buf) - maxBacklog
+		}
+		b2 := l.buf[i:]
+		// Start at the first line feed,
+		// so that we don't keep partial lines.
+		if i := bytes.IndexByte(b2, '\n'); i >= 0 {
+			b2 = b2[i+1:]
+		}
+		// Replace buffer.
+		l.buf = append(l.buf[:0], b2...)
+	}
+	l.buf = append(l.buf, b...)
+	return len(b), nil
+}
+
+// Log returns a copy of the log buffer.
+func (l *rollingLog) Log() []byte {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	return append([]byte(nil), l.buf...)
+}
diff --git a/cmd/upspin-warden/log_test.go b/cmd/upspin-warden/log_test.go
new file mode 100644
index 0000000..90dcf98
--- /dev/null
+++ b/cmd/upspin-warden/log_test.go
@@ -0,0 +1,74 @@
+// Copyright 2017 The Upspin Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+	"bytes"
+	"fmt"
+	"math/rand"
+	"strconv"
+	"strings"
+	"testing"
+)
+
+func TestRollingLog(t *testing.T) {
+	oldMax := maxBacklog
+	defer func() { maxBacklog = oldMax }()
+	maxBacklog = 1024
+	l := rollingLog{}
+
+	for i := 0; i < 2000; i++ {
+		n := rand.Intn(100)
+		fmt.Fprintf(&l, "%.2d%s\n", n, strings.Repeat("n", n))
+		err := validate(l.Log())
+		if err != nil {
+			t.Fatalf("iteration %d: %v", n, err)
+		}
+	}
+
+	// Write a >maxBacklog string of m's and n's;
+	// it should just replace the log.
+	mm := strings.Repeat("m", 512)
+	nn := strings.Repeat("n", 512)
+	want := fmt.Sprintf("%s\n%s\n", mm, nn)
+	l.Write([]byte(want))
+	if got := string(l.Log()); got != want {
+		t.Fatalf("mismatch after long write\ngot %d bytes: %q\nwant %d bytes: %q",
+			len(got), got, len(want), want)
+	}
+
+	// Now this next write should leave us with
+	// the run of n's followed by "hello".
+	s := "hello\n"
+	l.Write([]byte(s))
+	want = fmt.Sprintf("%s\n%s", nn, s)
+	if got := string(l.Log()); got != want {
+		t.Fatalf("mismatch after short write after long write\ngot %d bytes: %q\nwant %d bytes: %q",
+			len(got), got, len(want), want)
+	}
+}
+
+func validate(b []byte) error {
+	lines := bytes.Split(b, []byte("\n"))
+	for i, l := range lines {
+		if len(l) == 0 {
+			if i != len(lines)-1 {
+				return fmt.Errorf("found empty line mid-log at %d", i)
+			}
+			return nil
+		}
+		if len(l) < 2 {
+			return fmt.Errorf("line %d too short", i)
+		}
+		n, err := strconv.Atoi(string(l[:2]))
+		if err != nil {
+			return fmt.Errorf("invalid length of line %d: %v", i, err)
+		}
+		if !bytes.Equal(l[2:], bytes.Repeat([]byte("n"), n)) {
+			return fmt.Errorf("bad line %d: %q", i, l)
+		}
+	}
+	return nil
+}
diff --git a/cmd/upspin-warden/main.go b/cmd/upspin-warden/main.go
new file mode 100644
index 0000000..05847c6
--- /dev/null
+++ b/cmd/upspin-warden/main.go
@@ -0,0 +1,175 @@
+// Copyright 2017 The Upspin Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Command upspin-warden runs Upspin client daemons, such as upspinfs and
+// cacheserver, and exports information about them to external programs.
+package main
+
+import (
+	"bytes"
+	"flag"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"os/exec"
+	"sort"
+	"strings"
+	"sync"
+	"time"
+
+	"upspin.io/flags"
+	"upspin.io/log"
+)
+
+func main() {
+	cmd := flag.String("cmd", "cacheserver,upspinfs,upspin-sharebot", "comma-separated list of `commands` to run")
+	flags.Parse(nil, "log", "config", "http")
+	w := NewWarden(strings.Split(*cmd, ","))
+	log.Fatal(http.ListenAndServe(flags.HTTPAddr, w))
+}
+
+// restartInterval specifies the time between daemon restarts.
+const restartInterval = 10 * time.Second
+
+// Warden implements the upspin-warden daemon.
+type Warden struct {
+	log   rollingLog
+	procs map[string]*Process
+}
+
+// NewWarden creates a Warden that runs the given commands.
+// It implements a http.Handler that exports server state and logs.
+// It redirects global Upspin log output to its internal rolling log.
+func NewWarden(cmds []string) *Warden {
+	w := &Warden{procs: map[string]*Process{}}
+	for _, c := range cmds {
+		w.procs[c] = &Process{name: c}
+	}
+	log.SetOutput(io.MultiWriter(os.Stderr, &w.log))
+	for _, p := range w.procs {
+		go p.Run()
+	}
+	return w
+}
+
+// ServeHTTP implements http.Handler.
+func (w *Warden) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
+	switch name := r.URL.Path[1:]; name {
+	case "": // Root.
+		// Show truncated warden logs.
+		fmt.Fprintln(rw, "warden:")
+		fprintLastNLines(rw, w.log.Log(), 10, "\t")
+		// Show processes, their states, and truncated logs.
+		var names []string
+		for n := range w.procs {
+			names = append(names, n)
+		}
+		sort.Strings(names)
+		for _, n := range names {
+			p := w.procs[n]
+			fmt.Fprintf(rw, "\n%s: %s\n", n, p.State())
+			fprintLastNLines(rw, p.log.Log(), 10, "\t")
+		}
+	case "warden":
+		// Show complete warden log.
+		rw.Write(w.log.Log())
+	default:
+		// Show log for the given process.
+		p, ok := w.procs[name]
+		if !ok {
+			http.NotFound(rw, r)
+			return
+		}
+		rw.Write(p.log.Log())
+	}
+}
+
+// fprintLastNLines writes the last n lines of buf to w,
+// adding prefix to the start of each line.
+func fprintLastNLines(w io.Writer, buf []byte, n int, prefix string) {
+	lines := make([][]byte, 0, n)
+	for i := 0; i <= n; i++ {
+		j := bytes.LastIndexByte(buf, '\n')
+		if j <= 0 {
+			if len(buf) > 0 {
+				lines = append(lines, buf)
+			}
+			break
+		}
+		lines = append(lines, buf[j+1:])
+		buf = buf[:j]
+	}
+	for i := len(lines) - 1; i >= 0; i-- {
+		fmt.Fprintf(w, "%s%s\n", prefix, lines[i])
+	}
+}
+
+// ProcessState describes the state of a Process.
+type ProcessState int
+
+//go:generate stringer -type ProcessState
+
+const (
+	NotStarted ProcessState = iota
+	Starting
+	Running
+	Error
+)
+
+// Process manages the execution of a daemon process and captures its logs.
+type Process struct {
+	name string
+	log  rollingLog
+
+	mu    sync.Mutex
+	state ProcessState
+}
+
+// State reports the state of the process.
+func (p *Process) State() ProcessState {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	return p.state
+}
+
+// Run executes the process in a loop, restarting it after restartInterval
+// since its last start.
+func (p *Process) Run() {
+	for {
+		started := time.Now()
+		err := p.exec()
+		log.Error.Printf("%v: %v", p.name, err)
+		if d := time.Since(started); d < restartInterval {
+			i := restartInterval - d
+			log.Debug.Printf("%v: waiting %v before restarting", p.name, i)
+			time.Sleep(i)
+		}
+	}
+}
+
+// Exec starts the process and waits for it to return,
+// updating the process's state field as necessary.
+func (p *Process) exec() error {
+	cmd := exec.Command(p.name,
+		"-log="+flags.Log.String(),
+		"-config="+flags.Config)
+	cmd.Stdout = &p.log
+	cmd.Stderr = &p.log
+	p.setState(Starting)
+	if err := cmd.Start(); err != nil {
+		return err
+	}
+	p.setState(Running)
+	err := cmd.Wait()
+	p.setState(Error)
+	return err
+}
+
+func (p *Process) setState(s ProcessState) {
+	p.mu.Lock()
+	p.state = s
+	p.mu.Unlock()
+	log.Debug.Printf("%s: %s", p.name, s)
+}
diff --git a/cmd/upspin-warden/processstate_string.go b/cmd/upspin-warden/processstate_string.go
new file mode 100644
index 0000000..acd0d2d
--- /dev/null
+++ b/cmd/upspin-warden/processstate_string.go
@@ -0,0 +1,16 @@
+// Code generated by "stringer -type ProcessState"; DO NOT EDIT.
+
+package main
+
+import "strconv"
+
+const _ProcessState_name = "StoppedStartingRunningError"
+
+var _ProcessState_index = [...]uint8{0, 7, 15, 22, 27}
+
+func (i ProcessState) String() string {
+	if i < 0 || i >= ProcessState(len(_ProcessState_index)-1) {
+		return "ProcessState(" + strconv.FormatInt(int64(i), 10) + ")"
+	}
+	return _ProcessState_name[_ProcessState_index[i]:_ProcessState_index[i+1]]
+}