cmd/cam{server,client}: reliability tweaks

In camserver, replace the LRU of previous frames with a map of frame
data and an ordered list of frame references, so the oldest frames are
always freed first. (Using an LRU for this purpose actually has
pathological worst-case behavior when a client falls behind.)

In camclient, continue fetching frames if we fail to fetch one (if we
fall behind, for instance), and log frame rate statistics.

Change-Id: Iacc3f3a6e06ab8445f5de9e47231f949c5af07b2
Reviewed-on: https://upspin-review.googlesource.com/15900
Reviewed-by: Rob Pike <r@golang.org>
diff --git a/cmd/camclient/main.go b/cmd/camclient/main.go
index ee8262b..4250515 100644
--- a/cmd/camclient/main.go
+++ b/cmd/camclient/main.go
@@ -14,6 +14,7 @@
 	"net/textproto"
 	"os"
 	"sync"
+	"time"
 
 	"upspin.io/client"
 	"upspin.io/client/clientutil"
@@ -72,23 +73,14 @@
 	if err != nil {
 		return nil, err
 	}
-	done := make(chan struct{})
-	events, err := dir.Watch(name, 0, done)
-	if err != nil {
-		return nil, err
-	}
+	entries := make(chan *upspin.DirEntry)
 	go func() {
-		defer close(done)
-		for e := range events {
-			if e.Error != nil {
-				log.Println(e.Error)
-				return
-			}
+		for e := range entries {
 			// Read the latest frame.
-			frame, err := clientutil.ReadAll(cfg, e.Entry)
+			frame, err := clientutil.ReadAll(cfg, e)
 			if err != nil {
-				log.Println(err)
-				return
+				log.Error.Print(err)
+				continue
 			}
 			// Share it with viewers.
 			h.mu.Lock()
@@ -97,6 +89,49 @@
 			h.update.Broadcast()
 		}
 	}()
+	go func() {
+		var (
+			fetched, skipped int
+			lastUpdate       = time.Now()
+			done             chan struct{}
+		)
+		for {
+			if done != nil {
+				close(done)
+			}
+			done = make(chan struct{})
+			events, err := dir.Watch(name, 0, done)
+			if err != nil {
+				log.Error.Print(err)
+				time.Sleep(5 * time.Second)
+				continue
+			}
+			for e := range events {
+				if e.Error != nil {
+					log.Error.Print(e.Error)
+					break
+				}
+				// Do a non-blocking send here so that we skip
+				// this frame if we're still fetching an old
+				// frame, to prevent us from falling behind.
+				select {
+				case entries <- e.Entry:
+					log.Debug.Printf("fetching frame %d", e.Entry.Sequence)
+					fetched++
+				default:
+					log.Debug.Printf("skipped frame %d", e.Entry.Sequence)
+					skipped++
+				}
+				if d := time.Since(lastUpdate); d > 10*time.Second {
+					sec := float64(d) / float64(time.Second)
+					fps := float64(fetched) / sec
+					sps := float64(skipped) / sec
+					log.Info.Printf("frames per second: %.3g fetched, %.3g skipped", fps, sps)
+					fetched, skipped, lastUpdate = 0, 0, time.Now()
+				}
+			}
+		}
+	}()
 
 	return h, nil
 }
diff --git a/cmd/camserver/main.go b/cmd/camserver/main.go
index d0cc539..9dc6be6 100644
--- a/cmd/camserver/main.go
+++ b/cmd/camserver/main.go
@@ -19,7 +19,6 @@
 
 	"upspin.io/access"
 	"upspin.io/bind"
-	"upspin.io/cache"
 	"upspin.io/client"
 	"upspin.io/cloud/https"
 	"upspin.io/config"
@@ -89,12 +88,13 @@
 
 // state contains mutable state shared by all users of the server.
 type state struct {
-	frameData *cache.LRU // map[upspin.Reference][]byte
-	sequence  int64      // read/written only by capture method
+	sequence int64 // read/written only by capture method
 
 	mu         sync.Mutex
 	update     *sync.Cond
-	frameEntry *upspin.DirEntry // The current frame.
+	frameEntry *upspin.DirEntry            // The current frame.
+	frames     map[upspin.Reference][]byte // Frame data.
+	frameSeq   []upspin.Reference          // Ordered list of frames (oldest first).
 }
 
 // dirServer is a shim around server that implements upspin.DirServer.
@@ -113,8 +113,8 @@
 	accessFileName    = access.AccessFile
 	accessRef         = upspin.Reference(accessFileName)
 	frameFileName     = "frame.jpg"
-	numFrames         = 100 // The number of frames to keep in memory.
-	framesPerSecond   = 30
+	numFrames         = 500 // The number of frames to keep in memory.
+	framesPerSecond   = 5
 	watchEventTimeout = 5 * time.Second
 )
 
@@ -130,9 +130,7 @@
 		cfg:          cfg,
 		ep:           ep,
 		framePacking: upspin.EEPack,
-		state: &state{
-			frameData: cache.NewLRU(numFrames),
-		},
+		state:        &state{frames: map[upspin.Reference][]byte{}},
 	}
 	s.update = sync.NewCond(&s.mu)
 
@@ -269,9 +267,15 @@
 			pack.Lookup(upspin.EEPack).Share(s.cfg, s.readerKeys, packdata)
 		}
 
-		// Update frameData and frameEntry.
-		s.frameData.Add(ref, cipher)
+		// Update frameSeq, frames, and frameEntry.
 		s.mu.Lock()
+		for len(s.frameSeq) >= numFrames {
+			// Delete the oldest frames.
+			delete(s.frames, s.frameSeq[0])
+			s.frameSeq = s.frameSeq[1:]
+		}
+		s.frames[ref] = cipher
+		s.frameSeq = append(s.frameSeq, ref)
 		s.frameEntry = de
 		s.mu.Unlock()
 
@@ -491,14 +495,17 @@
 	if ref == accessRef {
 		return s.accessBytes, &accessRefdata, nil, nil
 	}
-	if b, ok := s.frameData.Get(ref); ok {
-		return b.([]byte), &upspin.Refdata{
-			Reference: ref,
-			Volatile:  true,
-			Duration:  time.Second,
-		}, nil, nil
+	s.mu.Lock()
+	b, ok := s.frames[ref]
+	s.mu.Unlock()
+	if !ok {
+		return nil, nil, nil, errors.E(errors.NotExist)
 	}
-	return nil, nil, nil, errors.E(errors.NotExist)
+	return b, &upspin.Refdata{
+		Reference: ref,
+		Volatile:  true,
+		Duration:  time.Second,
+	}, nil, nil
 }
 
 func (s storeServer) Put([]byte) (*upspin.Refdata, error) {
diff --git a/cmd/camserver/test.sh b/cmd/camserver/test.sh
index a290e38..4cc336a 100755
--- a/cmd/camserver/test.sh
+++ b/cmd/camserver/test.sh
@@ -4,4 +4,4 @@
 ls camserver@example.com
 info camserver@example.com/frame.jpg
 cp camserver@example.com/frame.jpg .
-' | upbox -config=upbox.config
+' | upbox -schema=camserver.upbox