// Copyright 2016 The Upspin Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package tree

// TODOs:
// - The watcher "tails" the log, starting from a given sequence number. It is
// done in a goroutine because the sequence can be very far from the current state
// and we don't want to block the caller until all such state is sent on the
// Event channel. However, once the watcher has caught up with the current
// state of the Tree, there's no longer a need for a goroutine or for reading
// the log directly (and thus spending time on disk I/O, unmarshaling, etc.).
// We can simply note that the end of the log was reached, quit the goroutine and
// send events as they come in. This requires some extra synchronization code
// and ensuring that sending does not block the Tree (we can keep the
// goroutine if we don't want to impose a short timeout on the channel).
import (
"sync/atomic"
"time"
"upspin.io/dir/server/serverlog"
"upspin.io/errors"
"upspin.io/log"
"upspin.io/path"
"upspin.io/upspin"
)
const (
// watcherTimeout is the timeout to send notifications on a watcher
// channel. It happens in a goroutine, so it's safe to hang for a while.
watcherTimeout = 1 * time.Minute
)
var (
errTimeout = errors.E(errors.IO, "channel operation timed out")
errClosed = errors.E(errors.IO, "channel closed")
)
// watcher holds together the done channel and the event channel for a given
// watch point.
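//
// A watcher is created by Tree.Watch, registered through addWatcher and woken
// by notifyWatchers via its hasWork channel. It exits, closing its events
// channel and calling doneFunc, when the client's done channel or the tree's
// shutdown channel is closed, or when sending an event fails.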
type watcher struct {
// The path name this watcher watches.
path path.Parsed
// events is the Event channel shared with the client. The watcher only
// writes to it.
events chan *upspin.Event
// done is the client's done channel. When it's closed, this watcher
// dies.
done <-chan struct{}
// hasWork is an internal channel that the Tree uses to tell the watcher
// goroutine to look for work at the end of the log.
hasWork chan bool
// log is a reader instance of the Tree's log that keeps track of this
// watcher's progress.
log *serverlog.Reader
// closed indicates whether the watcher is closed (1) or open (0).
// It must be loaded and stored atomically.
closed int32
// shutdown is closed by Tree.Close to signal that the watcher should
// exit. It is never closed by the watcher itself.
shutdown chan struct{}
// doneFunc must be called by this watcher before it exits its watch
// loop. It decrements the owning tree's watchers wait group.
doneFunc func()
}
// Watch implements upspin.DirServer.Watch.
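//
// A minimal sketch of how a caller might consume the returned channel; the
// variable names and error handling here are illustrative only:
//
//	done := make(chan struct{})
//	events, err := tree.Watch(p, upspin.WatchCurrent, done)
//	if err != nil {
//		// Handle the immediate error.
//	}
//	for e := range events {
//		if e.Error != nil {
//			// Handle an error delivered on the channel.
//			break
//		}
//		// Process e.Entry; e.Delete reports whether it was deleted.
//	}
//	close(done) // Tell the watcher to stop.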
func (t *Tree) Watch(p path.Parsed, sequence int64, done <-chan struct{}) (<-chan *upspin.Event, error) {
t.mu.Lock()
defer t.mu.Unlock()
// First, ensure the tree is not shutting down.
select {
case <-t.shutdown:
return nil, errors.Str("can't start Watch; tree shutting down")
default:
}
// Watch can watch non-existent files, but not non-existent roots.
// Therefore, we ensure the root exists before we proceed.
err := t.loadRoot()
if err != nil {
return nil, err
}
// Clone the log so we can keep reading it while the current tree
// continues to be updated (we're about to unlock this tree).
cLog, err := t.user.NewReader()
if err != nil {
return nil, err
}
// Create a watcher, but do not attach it to any node yet.
// TODO: limit number of watchers on any given node/tree?
w := &watcher{
path: p,
events: make(chan *upspin.Event),
done: done,
hasWork: make(chan bool, 1),
log: cLog,
closed: 0,
shutdown: t.shutdown,
}
w.doneFunc = func() {
// Remove this watcher from watchers when done.
t.mu.Lock()
t.removeWatcher(p, w)
t.mu.Unlock()
// Signal to the closing tree that we're done.
t.watcherWG.Done()
}
if sequence == upspin.WatchCurrent {
// Send the current state first. We must flush the tree so we
// know our logs are current (or we need to recover the tree
// from the logs).
err := t.flush()
if err != nil {
return nil, err
}
// Make a copy of the tree so we have an immutable tree in
// memory, at a fixed log position.
offset := t.user.AppendOffset()
clonedUser, err := t.user.ReadOnlyClone()
if err != nil {
return nil, err
}
clone := &Tree{
user: clonedUser,
config: t.config,
packer: t.packer,
shutdown: make(chan struct{}),
// there are no watchers on the clone.
}
// Start sending the current state of the cloned tree and set up
// the watcher for this tree once the current state is sent.
t.watcherWG.Add(1)
go w.sendCurrentAndWatch(clone, t, p, offset)
} else {
var (
offset int64
offsetErr error // Will be delivered on the channel, not here.
)
if sequence == upspin.WatchNew {
// We must flush the tree so we know our logs are current (or we
// need to recover the tree from the logs).
err := t.flush()
if err != nil {
return nil, err
}
offset = t.user.AppendOffset()
} else {
offset = t.user.OffsetOf(sequence)
if offset < 0 {
offsetErr = errors.E(errors.Invalid, p.Path(), errors.Errorf("unknown sequence %d", sequence))
}
}
// Set up the notification hook.
t.addWatcher(p, w)
// Start the watcher.
t.watcherWG.Add(1)
go w.watch(offset, offsetErr)
}
return w.events, nil
}
// addWatcher adds a watcher at the given path.
// t.mu must be held.
func (t *Tree) addWatcher(p path.Parsed, w *watcher) {
name := p.Path()
t.watchers[name] = append(t.watchers[name], w)
}
// removeWatcher removes the given watcher from the given path.
// t.mu must be held.
func (t *Tree) removeWatcher(p path.Parsed, w *watcher) {
name := p.Path()
ws := t.watchers[name]
for i := range ws {
if ws[i] == w {
ws = append(ws[:i], ws[i+1:]...)
break
}
}
if len(ws) == 0 {
delete(t.watchers, name)
} else {
t.watchers[name] = ws
}
}
// notifyWatchers wakes any watchers that are watching the given path (or any
// of its ancestors).
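// For example, a change to joe@example.com/dir/file wakes any watchers on that
// path, on joe@example.com/dir and on the root joe@example.com/.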
// t.mu must be held.
func (t *Tree) notifyWatchers(name upspin.PathName) {
p, _ := path.Parse(name)
for {
ws := t.watchers[p.Path()]
for _, w := range ws {
select {
case w.hasWork <- true:
default:
}
}
if p.IsRoot() {
break
}
p = p.Drop(1)
}
}
// sendCurrentAndWatch takes an original tree and its clone and sends the state
// of the clone starting from the subtree rooted at p. The offset refers to the
// last log offset saved by the original tree. When sendCurrentAndWatch returns,
// the watcher and the cloned tree are closed.
// It must run in a goroutine. Errors are logged.
func (w *watcher) sendCurrentAndWatch(clone, orig *Tree, p path.Parsed, offset int64) {
defer clone.Close()
n, err := clone.loadPath(p)
if err != nil && !errors.Is(errors.NotExist, err) {
w.sendError(err)
w.close()
return
}
// If p exists, traverse the sub-tree and send its current state on the
// events channel.
if err == nil {
fn := func(n *node, level int) error {
logEntry := &serverlog.Entry{
Op: serverlog.Put,
Entry: n.entry,
}
err := w.sendEvent(logEntry, offset)
if err == errTimeout || err == errClosed {
return nil
}
return err
}
err = clone.traverse(n, 0, fn)
if err != nil {
w.sendError(err)
w.close()
return
}
}
// Set up the notification hook on the original tree. We must lock it.
orig.mu.Lock()
orig.addWatcher(p, w)
orig.mu.Unlock()
// Start the watcher (in this goroutine -- don't start a new one here).
w.watch(offset, nil)
}
// sendEvent sends a single logEntry read from the log at offset position
// to the event channel. If the channel blocks for longer than watcherTimeout,
// the operation fails and the watcher is invalidated (marked for deletion).
func (w *watcher) sendEvent(logEntry *serverlog.Entry, offset int64) error {
var event *upspin.Event
// Strip block information for directories. We avoid an extra copy
// if it's not a directory.
if logEntry.Entry.IsDir() {
entry := logEntry.Entry
entry.MarkIncomplete()
event = &upspin.Event{
Entry: &entry, // already a copy.
Delete: logEntry.Op == serverlog.Delete,
}
} else {
event = &upspin.Event{
Entry: &logEntry.Entry, // already a copy.
Delete: logEntry.Op == serverlog.Delete,
}
}
timer := time.NewTimer(watcherTimeout)
defer timer.Stop()
select {
case <-w.shutdown:
return errClosed
case <-w.done:
// Client is done receiving events.
return errClosed
case w.events <- event:
// Event was sent.
return nil
case <-timer.C:
// Oops. Client didn't read fast enough.
return errTimeout
}
}
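// sendError delivers err to the client on the event channel. It allows a
// longer timeout than sendEvent; if even that expires, the error is logged
// and dropped.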
func (w *watcher) sendError(err error) {
e := &upspin.Event{
Error: err,
}
select {
case w.events <- e:
// Error event was sent.
case <-time.After(3 * watcherTimeout):
// Can't send the error since we timed out again. Log it
// instead; the caller is responsible for closing the watcher.
log.Error.Printf("dir/server/tree.sendError: %s", errTimeout)
}
}
// sendEventFromLog sends notifications on the watcher's event channel for all
// log entries under the watched path, reading from the watcher's log starting
// at the given offset until it reaches the end of the log. It returns the
// next offset to read.
func (w *watcher) sendEventFromLog(offset int64) (int64, error) {
curr := offset
for {
// Is the receiver still interested in reading events and the
// tree still open for business?
select {
case <-w.done:
return 0, errClosed
case <-w.shutdown:
return 0, errClosed
default:
}
logEntry, next, err := w.log.ReadAt(curr)
if err != nil {
return next, errors.E(errors.Invalid, errors.Errorf("cannot read log at offset %d: %v", curr, err))
}
if next == curr {
return curr, nil
}
curr = next
name := logEntry.Entry.Name
if !isPrefixPath(name, w.path) {
// Not a log entry of interest.
continue
}
err = w.sendEvent(&logEntry, curr)
if err != nil {
return 0, err
}
}
}
// watch, which runs in a goroutine, reads from the log starting at a given
// offset and sends notifications on the event channel until the end of the log
// is reached. It waits to be notified of more work or until the client's
// done channel is closed, in which case it terminates.
// The DirServer.Watch API requires that an error for an invalid sequence be
// returned on the channel, not from the call itself. The initialErr argument
// is present for that case: if non-nil, we deliver the error and stop.
// Otherwise, if offset is negative, the first event will be an Invalid error.
func (w *watcher) watch(offset int64, initialErr error) {
defer w.close()
if initialErr != nil {
w.sendError(initialErr)
return
}
for {
var err error
offset, err = w.sendEventFromLog(offset)
if err != nil {
if err != errTimeout && err != errClosed {
log.Debug.Printf("watch: sending error to client: %s", err)
w.sendError(err)
}
return
}
select {
case <-w.done:
// Done channel was closed. Close watcher and quit this
// goroutine.
return
case <-w.shutdown:
// Tree has closed, nothing else to do.
return
case <-w.hasWork:
// Wake up and work from where we left off.
}
}
}
// isClosed reports whether this watcher has been closed.
func (w *watcher) isClosed() bool {
return atomic.LoadInt32(&w.closed) == 1
}
// close closes the watcher. Must only be called internally by the watcher's
// goroutine.
func (w *watcher) close() {
atomic.StoreInt32(&w.closed, 1)
close(w.events)
w.doneFunc()
}
// isPrefixPath reports whether prefix is a path-wise prefix of name.
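// For example, joe@example.com/a is a path-wise prefix of joe@example.com/a/b
// but not of joe@example.com/ab.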
func isPrefixPath(name upspin.PathName, prefix path.Parsed) bool {
parsed, err := path.Parse(name)
if err != nil {
log.Debug.Printf("dir/server/tree.isPrefixPath: error parsing path %q: %v", name, err)
return false
}
return parsed.HasPrefix(prefix)
}