blob: 61a5b541a665b7ee020773ff99f048230e80b55e [file] [log] [blame]
// Copyright 2017 The Upspin Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package inprocess
import (
"time"
"upspin.io/access"
"upspin.io/errors"
"upspin.io/log"
"upspin.io/path"
"upspin.io/upspin"
)
const watchTimeout = 10 * time.Second
// A listener connects the event manager to an events channel on which
// to deliver events to a single client.
type listener struct {
eventMgr *eventManager
root path.Parsed // The root of the subtree of interest.
server *server // Holds the user info; needed for access control.
done <-chan struct{} // From the Watch method; signals termination.
events chan<- upspin.Event
sequence int64 // The point in the event stream the listener has reached.
}
// want reports whether this listener is interested in the event, which means that the
// event is below its root in the tree and is accessible to its user. If the user does not
// have read permission, the event is trimmed and marked incomplete.
func (l *listener) want(event upspin.Event, parsed path.Parsed) (upspin.Event, bool) {
if !parsed.HasPrefix(l.root) {
return upspin.Event{}, false
}
// Errors in access checks don't matter here; we just ignore things that fail or are blocked.
if canAny, _ := l.server.can(access.AnyRight, parsed); !canAny {
return upspin.Event{}, false
}
canRead, _ := l.server.can(access.Read, parsed)
if event.Entry.IsDir() || (!canRead && !access.IsAccessControlFile(event.Entry.SignedName)) {
// Must make a copy of the entry before cleaning it.
entry := *event.Entry
entry.MarkIncomplete()
event.Entry = &entry
}
return event, true
}
func (l *listener) doneHandler() {
<-l.done
l.eventMgr.listenerDone <- l
}
// sendAll sends, if possible, all the events. It returns false if it cannot
// complete the list. l.sequence keeps track of our progress.
func (l *listener) sendAll(events []upspin.Event) bool {
for _, event := range events {
parsed, err := path.Parse(event.Entry.Name)
if err != nil {
// Shouldn't happen.
log.Info.Printf("dir/inprocess.Sendall: parse error for %q: %v", event.Entry.Name, err)
continue
}
if cleanEvent, ok := l.want(event, parsed); ok {
select {
case l.events <- cleanEvent:
// Delivered.
l.sequence++
case <-time.After(watchTimeout):
// Failed to deliver; client is not keeping up.
return false
}
}
}
return true
}
// sendTree sends events for the tree rooted at name. The boolean
// return reports whether it sent all valid info to the client. Thus a
// false return means this listener did not start properly, but it is
// not an error if name does not exist, as it may appear later.
// If it returns false, it attempts to send an error event unless the
// problem was an unresponsive client.
func (l *listener) sendTree(name upspin.PathName) bool {
const op errors.Op = "dir/inprocess.Watch"
parsed, err := path.Parse(name)
if err != nil {
// Shouldn't happen.
log.Info.Printf("%s: parse error in sendTree for %q: %v", op, name, err)
l.sendEvent(upspin.Event{Error: errors.E(op, err)})
return false
}
entry, err := l.server.lookup(op, parsed, false)
if err != nil {
// Ignore error. The problem (existence, permission) might resolve later.
return true
}
event, ok := l.want(upspin.Event{Entry: entry}, parsed)
if !ok {
return true
}
if !l.sendEvent(event) {
return false
}
if !entry.IsDir() {
return true
}
entries, err := l.server.listDir(entry.Name)
if err != nil {
return true
}
for _, entry := range entries {
if !l.sendTree(entry.Name) {
return false
}
}
return true
}
// sendEvent sends an event on the channel, or returns false if it cannot.
func (l *listener) sendEvent(event upspin.Event) bool {
select {
case l.events <- event:
// Delivered.
// Do not increment sequence: these events are not part of the standard stream.
return true
case <-time.After(watchTimeout):
// Failed to deliver; client is not keeping up.
return false
}
}
// eventManager is the structure that delivers events to all the listeners.
type eventManager struct {
events []upspin.Event
listeners []*listener
// These channels mediate all access to the event manager.
eventsSoFar chan []upspin.Event
newListener chan *listener
newEvent chan upspin.Event
listenerDone chan *listener
}
// newEventManager returns a new event manager. There is one per database.
func newEventManager() *eventManager {
// The numbers here are arbitrary, but for the testing purposes of
// this package should be enough to prevent blocking the client.
// A more serious attempt would scale these dynamically as needed.
em := &eventManager{
newListener: make(chan *listener, 100),
newEvent: make(chan upspin.Event, 100),
eventsSoFar: make(chan []upspin.Event),
}
go em.run()
return em
}
// run is the goroutine managing the single event manager.
// All interaction with the event manager is through the channels handled
// in this goroutine, obviating explicit mutexes.
func (e *eventManager) run() {
// Invariant: Each element of e.listeners has its sequence at the current point.
// When we receive an event, each element of that list is ready for it.
// We only add a new listener to the list once it has caught up to the rest.
for {
select {
case e.eventsSoFar <- e.events:
// Nothing to do.
case listener := <-e.listenerDone:
e.delete(listener)
case listener := <-e.newListener:
// New listener has been created, but it may be behind.
if listener.sequence < int64(len(e.events)) && !listener.sendAll(e.events[listener.sequence:]) {
// It couldn't catch up, so ignore it and don't install it.
close(listener.events)
continue
}
e.listeners = append(e.listeners, listener)
case event := <-e.newEvent:
parsed, err := path.Parse(event.Entry.Name)
if err != nil {
// Shouldn't happen.
log.Info.Printf("dir/inprocess: parse error in event for %q: %v", event.Entry.Name, err)
continue
}
n := len(e.listeners)
for i := 0; i < n; i++ {
l := e.listeners[i]
if cleanEvent, ok := l.want(event, parsed); ok {
if l.sendEvent(cleanEvent) {
// Delivered.
l.sequence++
} else {
// Failed to deliver; client is not keeping up.
e.deleteNth(i)
i-- // Back up the loop counter; ith guy is gone.
n--
}
}
}
e.events = append(e.events, event)
}
}
}
// deleteNth deletes the Nth listener from the eventManager's list.
func (e *eventManager) deleteNth(i int) {
close(e.listeners[i].events)
copy(e.listeners[i:], e.listeners[i+1:])
e.listeners = e.listeners[:len(e.listeners)-1]
}
// delete deletes the listener from the eventManager's list.
func (e *eventManager) delete(which *listener) {
for i, l := range e.listeners {
if l == which {
e.deleteNth(i)
return
}
}
panic("listener not found in event loop")
}
// watch is the implementation of DirServer.Watch after basic checking is done.
func (e *eventManager) watch(server *server, root path.Parsed, sequence int64, done <-chan struct{}) (<-chan upspin.Event, error) {
const op errors.Op = "dir/inprocess.Watch"
events := make(chan upspin.Event, 10)
l := &listener{
eventMgr: e,
root: root,
server: server,
done: done,
events: events,
sequence: 0,
}
go l.doneHandler()
eventsSoFar := <-e.eventsSoFar
// A sequence other than the special cases 0 and -1 must exist.
// The special case of an invalid sequence is returned as an event with an "invalid" error.
if sequence != 0 && sequence != -1 {
if sequence < 0 || int64(len(eventsSoFar)) < sequence {
events <- upspin.Event{Error: errors.E(op, errors.Invalid, "bad sequence")}
close(events)
return events, nil
}
}
// Must do this in the background and return so client can receive initialization events.
go func() {
switch sequence {
case upspin.WatchStart:
// 0 is a special case in the API, but it's not a special case here.
fallthrough
default:
if !l.sendAll(eventsSoFar) {
log.Printf("dir/inprocess.Watch %q could not send all initial events", root)
return
}
case upspin.WatchCurrent:
// Send state of tree under name.
if !l.sendTree(root.Path()) {
log.Printf("dir/inprocess.Watch %q could not send all initial events", root)
return
}
fallthrough
case upspin.WatchNew:
// Start transmitting from where we were before sendTree.
l.sequence = int64(len(eventsSoFar))
}
e.newListener <- l
}()
return events, nil
}