blob: cf6f010de14682439508860cbd75bd20c721986d [file] [log] [blame]
// Copyright 2017 The Upspin Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Command upspin-sharebot watches the root for the user in the provided config,
// detecting Access changes and re-wrapping any files whose reader set changed.
package main
import (
"bytes"
"crypto/sha256"
"fmt"
"sort"
"strings"
"sync"
"time"
"upspin.io/access"
"upspin.io/bind"
"upspin.io/client"
"upspin.io/config"
"upspin.io/errors"
"upspin.io/factotum"
"upspin.io/flags"
"upspin.io/log"
"upspin.io/pack"
"upspin.io/path"
"upspin.io/shutdown"
"upspin.io/upspin"
_ "upspin.io/transports"
)
func main() {
flags.Parse(flags.Client)
cfg, err := config.FromFile(flags.Config)
if err != nil {
log.Fatal(err)
}
w, err := NewWatcher(cfg)
if err != nil {
log.Fatal(err)
}
shutdown.Handle(w.Shutdown)
select {}
}
// Watcher monitors a user root for Access file changes and re-wraps the keys
// for each file whose set of readers is affected by the change.
type Watcher struct {
cfg upspin.Config
dir upspin.DirServer
key upspin.KeyServer
seq int64 // owned by watch
buffer chan upspin.PathName
check chan upspin.PathName
shutdown chan struct{} // closed to signal shutdown
done chan struct{} // closed when checkLoop exits
mu sync.Mutex
s *Sharer
}
// NewWatcher initializes, starts, and returns a new Watcher for the user in
// the provided config.
func NewWatcher(cfg upspin.Config) (*Watcher, error) {
if cfg.Factotum() == nil {
return nil, errors.Str("no factotum in config")
}
dir, err := bind.DirServer(cfg, cfg.DirEndpoint())
if err != nil {
return nil, err
}
key, err := bind.KeyServer(cfg, cfg.KeyEndpoint())
if err != nil {
return nil, err
}
w := &Watcher{
cfg: cfg,
dir: dir,
key: key,
seq: upspin.WatchCurrent,
buffer: make(chan upspin.PathName),
check: make(chan upspin.PathName),
shutdown: make(chan struct{}),
done: make(chan struct{}),
s: newSharer(cfg, dir, key),
}
go w.bufferLoop()
go w.checkLoop()
go w.watchLoop()
return w, nil
}
// bufferLoop receives path names from buffer and sends them to check,
// buffering and de-duplicating them in between.
func (w *Watcher) bufferLoop() {
defer close(w.check)
files := make(map[upspin.PathName]bool)
for {
var name upspin.PathName
var check chan upspin.PathName
if len(files) > 0 {
// Pick one entry at random from the files map.
for name = range files {
break
}
check = w.check
}
select {
case check <- name:
delete(files, name)
case newName, active := <-w.buffer:
if !active {
return
}
files[newName] = true
case <-w.shutdown:
return
}
}
}
// checkLoop receives path names from check, inspects each for inconsistencies
// between readers and wrapped keys, and fixes them if found.
func (w *Watcher) checkLoop() {
defer close(w.done)
for name := range w.check {
e, err := w.dir.Lookup(name)
if errors.Is(errors.NotExist, err) {
log.Debug.Printf("watcher: %v: no longer exists; skipping", name)
continue
}
if err != nil {
log.Error.Print(err)
continue
}
if e.Packing != upspin.EEPack {
log.Debug.Printf("watcher: %v: unknown packing %v", e.Name, e.Packing)
continue
}
w.mu.Lock()
readers, keyUsers, self, err := w.s.readers(e)
w.mu.Unlock()
if err != nil {
log.Error.Print("watcher: ", err)
continue
}
msg := fmt.Sprintf("%v self=%v\n\treaders: %v\n\tkeys: %v", e.Name, self, readers, keyUsers)
if !self && readers.String() == keyUsers.String() {
log.Debug.Print("watcher: ", msg)
continue
}
log.Info.Printf("watcher: fixing inconsistency: %v", msg)
w.mu.Lock()
if err := w.s.fixShare(e, readers); err != nil {
log.Error.Print("watcher: ", err)
}
w.mu.Unlock()
}
}
// watchLoop watches the user root, retrying if a watch fails.
func (w *Watcher) watchLoop() {
for {
dialed := time.Now()
if err := w.watch(); err != nil {
log.Error.Printf("watcher: %v", err)
}
select {
case <-w.shutdown:
return
default:
}
// Wait a minute between watches.
const wait = 1 + time.Minute
if elapsed := time.Since(dialed); elapsed < wait {
time.Sleep(wait - elapsed)
}
}
}
// watch watches the user root for new files.
// When it sees an Access file it passes it to addAccess.
// Otherwise it sends the file's name to buffer.
func (w *Watcher) watch() error {
var (
name = upspin.PathName(w.cfg.UserName() + "/")
done = make(chan struct{})
)
events, err := w.dir.Watch(name, w.seq, done)
if err != nil {
return err
}
for {
log.Debug.Print("watcher: waiting for event")
var e upspin.Event
var ok bool
select {
case e, ok = <-events:
case <-w.shutdown:
return nil
}
if !ok {
return nil
}
if e.Error != nil {
return err
}
log.Debug.Printf("watcher: received event: %v delete=%t seq=%d", e.Entry.Name, e.Delete, e.Entry.Sequence)
w.seq = e.Entry.Sequence
if e.Entry.IsDir() {
continue
}
if access.IsAccessFile(e.Entry.Name) {
w.mu.Lock()
if e.Delete {
log.Debug.Printf("watcher: removeAccess: %v", e.Entry.Name)
w.s.removeAccess(e.Entry.Name)
} else {
log.Debug.Printf("watcher: addAccess: %v", e.Entry.Name)
if err := w.s.addAccess(e.Entry.Name); err != nil {
log.Error.Print("watcher: ", err)
}
}
w.mu.Unlock()
p, err := path.Parse(e.Entry.Name)
if err != nil {
log.Error.Print("watcher: ", err)
continue
}
go w.checkDir(p.Drop(1).Path())
continue
}
if e.Delete {
continue
}
select {
case <-w.shutdown:
return nil
case w.buffer <- e.Entry.Name:
}
}
}
// checkDir recursively walks the given directory and sends each file to
// buffer. It will not descend into a directory that contains an Access file.
func (w *Watcher) checkDir(dir upspin.PathName) {
des, err := w.dir.Glob(upspin.AllFilesGlob(dir))
if err != nil {
log.Print("watcher: ", err)
return
}
for _, e := range des {
if access.IsAccessFile(e.Name) {
continue
}
if e.IsDir() {
// If there's no Access file in the
// directory then descend into it.
accessFile := path.Join(e.Name, "Access")
_, err := w.dir.Lookup(accessFile)
if errors.Is(errors.NotExist, err) {
w.checkDir(e.Name)
}
continue
}
select {
case w.buffer <- e.Name:
case <-w.shutdown:
return
}
}
}
func (w *Watcher) Shutdown() {
log.Debug.Print("watcher: shutting down")
close(w.shutdown)
<-w.done
log.Debug.Print("watcher: shutdown complete")
}
// Sharer holds the state for the share calculation. It holds some caches to
// avoid calling on the server too much.
// TODO(adg): clean this up further; this is a bunch of hacked up code from cmd/upspin.
type Sharer struct {
cfg upspin.Config
cli upspin.Client
dir upspin.DirServer
key upspin.KeyServer
// accessFiles contains the parsed Access files, keyed by directory to which it applies.
accessFiles map[upspin.PathName]*access.Access
// users caches per-directory user lists computed from Access files.
users map[upspin.PathName]userList
// userKeys holds the keys we've looked up for each user.
userKeys map[upspin.UserName]upspin.PublicKey
// userByHash maps the SHA-256 hashes of each user's key to the user name.
userByHash map[[sha256.Size]byte]upspin.UserName
}
func newSharer(cfg upspin.Config, dir upspin.DirServer, key upspin.KeyServer) *Sharer {
return &Sharer{
cfg: cfg,
cli: client.New(cfg),
dir: dir,
key: key,
accessFiles: make(map[upspin.PathName]*access.Access),
users: make(map[upspin.PathName]userList),
userKeys: make(map[upspin.UserName]upspin.PublicKey),
userByHash: make(map[[sha256.Size]byte]upspin.UserName),
}
}
// readers returns two lists, the list of users with access according to the
// access file, and the the pretty-printed string of user names recovered from
// looking at the list of hashed keys in the packdata.
// It also returns a boolean reporting whether key rewrapping is needed for self.
func (s *Sharer) readers(entry *upspin.DirEntry) (users, keyUsers userList, self bool, err error) {
if entry.IsDir() {
// Directories don't have readers.
return nil, nil, self, nil
}
p, _ := path.Parse(entry.Name)
for {
p = p.Drop(1)
var ok bool
users, ok = s.users[p.Path()]
if ok {
break
}
if p.IsRoot() {
users = userList{p.User()}
break
}
}
for _, user := range users {
if _, err := s.lookupKey(user); err != nil {
log.Error.Printf("watcher: %v: %v", entry.Name, err)
}
}
packer := s.lookupPacker(entry)
if packer == nil {
return users, nil, self, errors.Errorf("no packer registered for packer %s", entry.Packing)
}
hashes, err := packer.ReaderHashes(entry.Packdata)
if err != nil {
return nil, nil, self, err
}
for _, hash := range hashes {
var thisUser upspin.UserName
switch packer.Packing() {
case upspin.EEPack:
if len(hash) != sha256.Size {
log.Error.Printf("watcher: %v: hash size is %d; expected %d", entry.Name, len(hash), sha256.Size)
continue
}
var h [sha256.Size]byte
copy(h[:], hash)
var ok bool
thisUser, ok = s.userByHash[h]
if !ok {
// Check old keys in Factotum.
f := s.cfg.Factotum()
if _, err := f.PublicKeyFromHash(hash); err == nil {
thisUser = s.cfg.UserName()
ok = true
self = true
}
}
if !ok && bytes.Equal(factotum.AllUsersKeyHash, hash) {
ok = true
thisUser = access.AllUsers
}
if !ok {
thisUser = "unknown"
}
default:
log.Error.Printf("watcher: %v: unrecognized packing %s", entry.Name, packer)
continue
}
keyUsers = append(keyUsers, thisUser)
}
return users, keyUsers, self, nil
}
// lookupPacker returns the Packer implementation for the entry, or
// nil if none is available.
func (s *Sharer) lookupPacker(entry *upspin.DirEntry) upspin.Packer {
if entry.IsDir() {
// Directories are not packed.
return nil
}
packer := pack.Lookup(entry.Packing)
if packer == nil {
log.Error.Printf("watcher: %v: no registered packer for %d; ignoring\n", entry.Name, entry.Packing)
}
return packer
}
// addAccess reads the given Access file, adds it to the accessFiles map, and
// adds the readers defined by the Access file to the users map.
func (s *Sharer) addAccess(name upspin.PathName) error {
b, err := s.cli.Get(name)
if err != nil {
return err
}
a, err := access.Parse(name, b)
if err != nil {
return err
}
readers, err := a.Users(access.Read, s.cli.Get)
if err != nil {
return errors.E(name, err)
}
dir := path.DropPath(name, 1)
s.accessFiles[dir] = a
s.users[dir] = userList(readers)
return nil
}
// removeAccess removes the given Access file and its readers set from the
// accessFiles and users maps.
func (s *Sharer) removeAccess(name upspin.PathName) {
dir := path.DropPath(name, 1)
delete(s.accessFiles, dir)
delete(s.users, dir)
}
// fixShare updates the packdata of the named file to contain wrapped keys for all the users.
func (s *Sharer) fixShare(entry *upspin.DirEntry, users userList) error {
if entry.IsDir() {
return errors.E(entry.Name, errors.IsDir, "cannot fix directory")
}
packer := s.lookupPacker(entry) // Won't be nil.
if packer.Packing() != upspin.EEPack {
return errors.E(entry.Name, errors.Invalid, errors.Errorf("unexpected packing %v", packer))
}
// If it's an Access or Group file, share with all users.
all := access.IsAccessControlFile(entry.Name)
keys := make([]upspin.PublicKey, 0, len(users))
for _, user := range users {
if user == access.AllUsers {
all = true
continue
}
// Erroneous or wildcard users will have empty keys here, and be ignored.
k, err := s.lookupKey(user)
if err != nil {
return errors.E(entry.Name, user, err)
}
if len(k) > 0 {
keys = append(keys, k)
}
}
// Add the AllUsersKey if we're sharing with all users.
if all {
keys = append(keys, upspin.AllUsersKey)
}
packer.Share(s.cfg, keys, []*[]byte{&entry.Packdata})
if entry.Packdata == nil {
return errors.E(entry.Name, "packing skipped")
}
_, err := s.dir.Put(entry)
return err
}
// lookupKey returns the public key for the user.
// If the user does not exist, is the "all" user, or is a wildcard
// (*@example.com), it returns the empty string.
func (s *Sharer) lookupKey(user upspin.UserName) (upspin.PublicKey, error) {
if user == access.AllUsers {
return upspin.AllUsersKey, nil
}
key, ok := s.userKeys[user] // Use an empty (zero-valued) key to cache failed lookups.
if ok {
return key, nil
}
if user == access.AllUsers {
s.userKeys[user] = "<all>"
return "", nil
}
if isWildcardUser(user) {
s.userKeys[user] = ""
return "", nil
}
u, err := s.key.Lookup(user)
if err != nil {
s.userKeys[user] = ""
return "", err
}
// Remember the lookup, failed or otherwise.
key = u.PublicKey
if len(key) == 0 {
s.userKeys[user] = ""
return "", errors.E(user, "empty public key")
}
s.userKeys[user] = key
s.userByHash[sha256.Sum256([]byte(key))] = user
return key, nil
}
func isWildcardUser(user upspin.UserName) bool {
return strings.HasPrefix(string(user), "*@")
}
// userList stores a list of users, and its string representation
// presents them in sorted order for easy comparison.
type userList []upspin.UserName
func (u userList) Len() int { return len(u) }
func (u userList) Less(i, j int) bool { return u[i] < u[j] }
func (u userList) Swap(i, j int) { u[i], u[j] = u[j], u[i] }
// String returns a canonically formatted, sorted list of the users.
func (u userList) String() string {
if u == nil {
return "<nil>"
}
sort.Sort(u)
userString := fmt.Sprint([]upspin.UserName(u))
return userString[1 : len(userString)-1]
}