| // Copyright 2017 The Upspin Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| // Command camclient serves a video stream generated by |
| // exp.upspin.io/cmd/camserver over HTTP as a Motion JPEG stream. |
| package main // import "exp.upspin.io/cmd/camclient" |
| |
| import ( |
| "flag" |
| "fmt" |
| "mime/multipart" |
| "net/http" |
| "net/textproto" |
| "os" |
| "sync" |
| "time" |
| |
| "upspin.io/client" |
| "upspin.io/client/clientutil" |
| "upspin.io/config" |
| "upspin.io/flags" |
| "upspin.io/log" |
| "upspin.io/transports" |
| "upspin.io/upspin" |
| ) |
| |
| func main() { |
| flag.Usage = func() { |
| fmt.Fprintln(os.Stderr, "usage: camclient [flags] <Upspin path>") |
| flag.PrintDefaults() |
| } |
| |
| flags.Parse(flags.Client, "http") |
| |
| if flag.NArg() != 1 { |
| fmt.Fprintln(os.Stderr, "camclient: need exactly one path name argument") |
| flag.Usage() |
| os.Exit(2) |
| } |
| name := upspin.PathName(flag.Arg(0)) |
| |
| cfg, err := config.FromFile(flags.Config) |
| if err != nil { |
| log.Fatal(err) |
| } |
| transports.Init(cfg) |
| |
| h, err := newHandler(cfg, name) |
| if err != nil { |
| log.Fatal(err) |
| } |
| http.Handle("/", h) |
| log.Fatal(http.ListenAndServe(flags.HTTPAddr, nil)) |
| } |
| |
| // handler is an http.Handler that serves a Motion JPEG of a camserver stream. |
| type handler struct { |
| mu sync.Mutex |
| update *sync.Cond |
| frame []byte |
| } |
| |
| // newHandler initializes a handler that streams the provided camserver path |
| // with the given config. The handler only watches and fetches each frame once |
| // regardless of the number of concurrent viewers. |
| func newHandler(cfg upspin.Config, name upspin.PathName) (http.Handler, error) { |
| h := &handler{} |
| h.update = sync.NewCond(&h.mu) |
| |
| c := client.New(cfg) |
| dir, err := c.DirServer(name) |
| if err != nil { |
| return nil, err |
| } |
| entries := make(chan *upspin.DirEntry) |
| go func() { |
| for e := range entries { |
| // Read the latest frame. |
| frame, err := clientutil.ReadAll(cfg, e) |
| if err != nil { |
| log.Error.Print(err) |
| continue |
| } |
| // Share it with viewers. |
| h.mu.Lock() |
| h.frame = frame |
| h.mu.Unlock() |
| h.update.Broadcast() |
| } |
| }() |
| go func() { |
| var ( |
| fetched, skipped int |
| lastUpdate = time.Now() |
| done chan struct{} |
| ) |
| for { |
| if done != nil { |
| close(done) |
| } |
| done = make(chan struct{}) |
| events, err := dir.Watch(name, 0, done) |
| if err != nil { |
| log.Error.Print(err) |
| time.Sleep(5 * time.Second) |
| continue |
| } |
| for e := range events { |
| if e.Error != nil { |
| log.Error.Print(e.Error) |
| break |
| } |
| // Do a non-blocking send here so that we skip |
| // this frame if we're still fetching an old |
| // frame, to prevent us from failling behind. |
| select { |
| case entries <- e.Entry: |
| log.Debug.Printf("fetching frame %d", e.Entry.Sequence) |
| fetched++ |
| default: |
| log.Debug.Printf("skipped frame %d", e.Entry.Sequence) |
| skipped++ |
| } |
| if d := time.Since(lastUpdate); d > 10*time.Second { |
| sec := float64(d) / float64(time.Second) |
| fps := float64(fetched) / sec |
| sps := float64(skipped) / sec |
| log.Info.Printf("frames per second: %.3g fetched, %.3g skipped", fps, sps) |
| fetched, skipped, lastUpdate = 0, 0, time.Now() |
| } |
| } |
| } |
| }() |
| |
| return h, nil |
| } |
| |
| func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
| // Motion JPEG is a mulitpart MIME-encoded series of JPEG images. |
| mw := multipart.NewWriter(w) |
| w.Header().Set("Content-Type", "multipart/x-mixed-replace;boundary="+mw.Boundary()) |
| partHeader := textproto.MIMEHeader{"Content-Type": {"image/jpeg"}} |
| for { |
| // Wait for a new frame to become available. |
| h.mu.Lock() |
| h.update.Wait() |
| frame := h.frame |
| h.mu.Unlock() |
| // Write that frame as a new MIME part. |
| w, err := mw.CreatePart(partHeader) |
| if err != nil { |
| log.Println(err) |
| return |
| } |
| _, err = w.Write(frame) |
| if err != nil { |
| log.Println(err) |
| return |
| } |
| } |
| } |