exp/cmd/camserver: add server that serves webcam images

A first cut at a webcam server. Only works on MacOS machines with built
in webcams for now. Requires ffmpeg be installed in the system PATH.

Change-Id: Ie87e7f39e955bd660573e55e99997e1ad5c6a5f8
Reviewed-on: https://upspin-review.googlesource.com/9761
Reviewed-by: Rob Pike <r@golang.org>
diff --git a/cmd/camserver/main.go b/cmd/camserver/main.go
new file mode 100644
index 0000000..fb03da3
--- /dev/null
+++ b/cmd/camserver/main.go
@@ -0,0 +1,343 @@
+// Copyright 2017 The Upspin Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Command camserver is an Upspin Directory and Store server that serves JPEG
+// images read from a webcam. It requires an ffmpeg binary be present in PATH.
+// It only works with the built in camera on MacOS machines, for now.
+package main
+
+// TODO(adg): configurable access controls.
+// TODO(adg): implement Watch.
+
+import (
+	"io/ioutil"
+	"mime/multipart"
+	"net/http"
+	"os/exec"
+	"sync"
+	"time"
+
+	"upspin.io/access"
+	"upspin.io/cache"
+	"upspin.io/cloud/https"
+	"upspin.io/config"
+	"upspin.io/errors"
+	"upspin.io/flags"
+	"upspin.io/key/sha256key"
+	"upspin.io/log"
+	"upspin.io/pack"
+	"upspin.io/path"
+	"upspin.io/rpc/dirserver"
+	"upspin.io/rpc/storeserver"
+	"upspin.io/serverutil"
+	"upspin.io/transports"
+	"upspin.io/upspin"
+
+	_ "upspin.io/pack/eeintegrity"
+)
+
+func main() {
+	flags.Parse(flags.Server)
+
+	cfg, err := config.FromFile(flags.Config)
+	if err != nil {
+		log.Fatal(err)
+	}
+	transports.Init(cfg)
+	addr := upspin.NetAddr(flags.NetAddr)
+	ep := upspin.Endpoint{
+		Transport: upspin.Remote,
+		NetAddr:   addr,
+	}
+
+	s, err := newServer(cfg, ep)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	http.Handle("/api/Dir/", dirserver.New(cfg, dirServer{server: s}, addr))
+	http.Handle("/api/Store/", storeserver.New(cfg, storeServer{server: s}, addr))
+
+	https.ListenAndServeFromFlags(nil)
+}
+
+// server is the base for a combined Upspin DirServer and StoreServer
+// implementation that serves frames from a webcam.
+// Each frame is served as frame.jpg in the root of the tree of cfg's user.
+// As each new frame is read from the camera, it replaces frame.jpg.
+type server struct {
+	// Set by newServer.
+	cfg         upspin.Config
+	ep          upspin.Endpoint
+	accessEntry *upspin.DirEntry
+	accessBytes []byte
+
+	// Set by Dial.
+	user upspin.UserName
+
+	// state is embedded here as a struct pointer so that the Dial methods
+	// do not make a copy of it when they copy the server struct.
+	*state
+}
+
+// state contains mutable state shared by all users of the server.
+type state struct {
+	frameData *cache.LRU // map[upspin.Reference][]byte
+	sequence  int64      // read/written only by capture method
+
+	mu         sync.Mutex
+	frameEntry *upspin.DirEntry // The current frame.
+}
+
+// dirServer is a shim around server that implements upspin.DirServer.
+type dirServer struct {
+	*server
+	stubService
+}
+
+// storeServer is a shim around server that implements upspin.StoreServer.
+type storeServer struct {
+	*server
+	stubService
+}
+
+const (
+	accessFile     = "Read, List: all\n"
+	accessFileName = access.AccessFile
+	accessRef      = upspin.Reference(accessFileName)
+	frameFileName  = "frame.jpg"
+	packing        = upspin.EEIntegrityPack
+	numFrames      = 100 // The number of frames to keep in memory.
+)
+
+var (
+	accessRefdata     = upspin.Refdata{Reference: accessRef}
+	errNotImplemented = errors.Str("not implemented")
+)
+
+// newServer initializes a server with the given Config and Endpoint,
+// and starts ffmpeg to read frames from the built-in webcam.
+func newServer(cfg upspin.Config, ep upspin.Endpoint) (*server, error) {
+	s := &server{
+		cfg: cfg,
+		ep:  ep,
+		state: &state{
+			frameData: cache.NewLRU(numFrames),
+		},
+	}
+
+	var err error
+	s.accessEntry, s.accessBytes, err = s.pack(accessFileName, accessRef, []byte(accessFile), 0)
+	if err != nil {
+		return nil, err
+	}
+
+	if err := s.capture(); err != nil {
+		return nil, err
+	}
+
+	return s, nil
+}
+
+// pack packs the given file using packing
+// and returns the resulting DirEntry and ciphertext.
+func (s *server) pack(filePath string, ref upspin.Reference, data []byte, seq int64) (*upspin.DirEntry, []byte, error) {
+	name := upspin.PathName(s.cfg.UserName()) + "/" + upspin.PathName(filePath)
+	de := &upspin.DirEntry{
+		Writer:     s.cfg.UserName(),
+		Name:       name,
+		SignedName: name,
+		Packing:    packing,
+		Time:       upspin.Now(),
+		Sequence:   seq,
+	}
+
+	bp, err := pack.Lookup(packing).Pack(s.cfg, de)
+	if err != nil {
+		return nil, nil, err
+	}
+	cipher, err := bp.Pack(data)
+	if err != nil {
+		return nil, nil, err
+	}
+	bp.SetLocation(upspin.Location{
+		Endpoint:  s.ep,
+		Reference: ref,
+	})
+	return de, cipher, bp.Close()
+}
+
+// capture starts ffmpeg to read a video stream from the built-in webcam,
+// packing each frame as a DirEntry and storing it in frameEntry and
+// frameBytes. It returns after the first frame has been packed.
+func (s *server) capture() error {
+	// TODO(adg): make this command line configurable.
+	cmd := exec.Command("ffmpeg",
+		// Input from the FaceTime webcam (present in most Macs).
+		"-f", "avfoundation", "-pix_fmt", "0rgb", "-s", "1280x720", "-r", "30", "-i", "FaceTime",
+		// Output Motion JPEG at 2fps at high quality.
+		"-f", "mpjpeg", "-r", "2", "-b:v", "1M", "-")
+	out, err := cmd.StdoutPipe()
+	if err != nil {
+		return err
+	}
+	if err := cmd.Start(); err != nil {
+		return err
+	}
+	mr := multipart.NewReader(out, "ffserver")
+	readFrame := func() error {
+		p, err := mr.NextPart()
+		if err != nil {
+			return err
+		}
+		b, err := ioutil.ReadAll(p)
+		if err != nil {
+			return err
+		}
+		ref := upspin.Reference(sha256key.Of(b).String())
+		de, data, err := s.pack(frameFileName, ref, b, s.sequence)
+		if err != nil {
+			return err
+		}
+		s.sequence++
+		s.frameData.Add(ref, data)
+		s.mu.Lock()
+		s.frameEntry = de
+		s.mu.Unlock()
+		return nil
+	}
+	// Read the first frame so that when this function returns
+	// the frameEntry and frameData fields are initialized.
+	// This has the pleasant side effect of making sure that
+	// ffmpeg is working correctly.
+	if err := readFrame(); err != nil {
+		return err
+	}
+	go func() {
+		for {
+			if err := readFrame(); err != nil {
+				log.Println("readFrame:", err)
+				return
+			}
+		}
+	}()
+	return nil
+}
+
+// upspin.Service and upspin.Dialer methods.
+
+func (s dirServer) Endpoint() upspin.Endpoint { return s.ep }
+
+func (s dirServer) Dial(cfg upspin.Config, ep upspin.Endpoint) (upspin.Service, error) {
+	s2 := *s.server
+	s2.user = cfg.UserName()
+	return dirServer{server: &s2}, nil
+}
+
+func (s storeServer) Endpoint() upspin.Endpoint { return s.ep }
+
+func (s storeServer) Dial(cfg upspin.Config, ep upspin.Endpoint) (upspin.Service, error) {
+	s2 := *s.server
+	s2.user = cfg.UserName()
+	return storeServer{server: &s2}, nil
+}
+
+// upspin.DirServer methods.
+
+func (s dirServer) Lookup(name upspin.PathName) (*upspin.DirEntry, error) {
+	p, err := path.Parse(name)
+	if err != nil {
+		return nil, err
+	}
+	if p.User() != s.cfg.UserName() {
+		return nil, errors.E(name, errors.NotExist)
+	}
+
+	fp := p.FilePath()
+	switch fp {
+	case "": // Root directory.
+		return &upspin.DirEntry{
+			Name:       p.Path(),
+			SignedName: p.Path(),
+			Attr:       upspin.AttrDirectory,
+			Time:       upspin.Now(),
+		}, nil
+	case accessFileName:
+		return s.accessEntry, nil
+	case frameFileName:
+		s.mu.Lock()
+		defer s.mu.Unlock()
+		return s.frameEntry, nil
+	default:
+		return nil, errors.E(name, errors.NotExist)
+	}
+}
+
+func (s dirServer) Glob(pattern string) ([]*upspin.DirEntry, error) {
+	return serverutil.Glob(pattern, s.Lookup, s.listDir)
+}
+
+func (s dirServer) listDir(name upspin.PathName) ([]*upspin.DirEntry, error) {
+	p, err := path.Parse(name)
+	if err != nil {
+		return nil, err
+	}
+	if p.User() != s.cfg.UserName() || p.FilePath() != "" {
+		return nil, errors.E(name, errors.NotExist)
+	}
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return []*upspin.DirEntry{
+		s.accessEntry,
+		s.frameEntry,
+	}, nil
+}
+
+func (s dirServer) WhichAccess(upspin.PathName) (*upspin.DirEntry, error) {
+	return s.accessEntry, nil
+}
+
+func (s dirServer) Watch(name upspin.PathName, order int64, done <-chan struct{}) (<-chan upspin.Event, error) {
+	return nil, upspin.ErrNotSupported
+}
+
+func (s dirServer) Put(*upspin.DirEntry) (*upspin.DirEntry, error) {
+	return nil, errNotImplemented
+}
+
+func (s dirServer) Delete(upspin.PathName) (*upspin.DirEntry, error) {
+	return nil, errNotImplemented
+}
+
+// upspin.StoreServer methods.
+
+func (s storeServer) Get(ref upspin.Reference) ([]byte, *upspin.Refdata, []upspin.Location, error) {
+	if ref == accessRef {
+		return s.accessBytes, &accessRefdata, nil, nil
+	}
+	if b, ok := s.frameData.Get(ref); ok {
+		return b.([]byte), &upspin.Refdata{
+			Reference: ref,
+			Volatile:  true,
+			Duration:  time.Second,
+		}, nil, nil
+	}
+	return nil, nil, nil, errors.E(errors.NotExist)
+}
+
+func (s storeServer) Put([]byte) (*upspin.Refdata, error) {
+	return nil, errNotImplemented
+}
+
+func (s storeServer) Delete(upspin.Reference) error {
+	return errNotImplemented
+}
+
+// stubService provides a stub implementation of upspin.Service.
+type stubService struct {
+}
+
+func (s stubService) Endpoint() upspin.Endpoint { return upspin.Endpoint{} }
+func (s stubService) Ping() bool                { return true }
+func (s stubService) Close()                    {}
diff --git a/cmd/camserver/test.sh b/cmd/camserver/test.sh
new file mode 100755
index 0000000..6a2658d
--- /dev/null
+++ b/cmd/camserver/test.sh
@@ -0,0 +1,6 @@
+#!/bin/sh
+
+echo '
+ls camserver@example.com
+cp camserver@example.com/frame.jpg .
+' | upbox -config=upbox.config
diff --git a/cmd/camserver/upbox.config b/cmd/camserver/upbox.config
new file mode 100644
index 0000000..dae571f
--- /dev/null
+++ b/cmd/camserver/upbox.config
@@ -0,0 +1,14 @@
+users:
+  - name: user
+  - name: camserver
+    dirserver: $camserver
+    storeserver: $camserver
+
+servers:
+  - name: keyserver
+  - name: storeserver
+  - name: dirserver
+  - name: camserver
+    importpath: upspin.io/exp/cmd/camserver
+
+domain: example.com