blob: 95e515286bbca157da246d9a971866f57d6fe99e [file] [log] [blame]
// Copyright 2016 The Upspin Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package dirserver provides a wrapper for an upspin.DirServer implementation
// that presents it as an authenticated service.
package dirserver // import "upspin.io/rpc/dirserver"
import (
"fmt"
"net/http"
pb "github.com/golang/protobuf/proto"
"upspin.io/config"
"upspin.io/errors"
"upspin.io/log"
"upspin.io/rpc"
"upspin.io/upspin"
"upspin.io/upspin/proto"
)
type server struct {
config upspin.Config
// What this server reports itself as through its Endpoint method.
endpoint upspin.Endpoint
// The underlying dirserver implementation.
dir upspin.DirServer
}
func New(cfg upspin.Config, dir upspin.DirServer, addr upspin.NetAddr) http.Handler {
s := &server{
config: cfg,
endpoint: upspin.Endpoint{
Transport: upspin.Remote,
NetAddr: addr,
},
dir: dir,
}
return rpc.NewServer(cfg, rpc.Service{
Name: "Dir",
Methods: map[string]rpc.Method{
"Delete": s.Delete,
"Glob": s.Glob,
"Lookup": s.Lookup,
"Put": s.Put,
"WhichAccess": s.WhichAccess,
},
Streams: map[string]rpc.Stream{
"Watch": s.Watch,
},
})
}
func (s *server) serverFor(session rpc.Session, reqBytes []byte, req pb.Message) (upspin.DirServer, error) {
if err := pb.Unmarshal(reqBytes, req); err != nil {
return nil, err
}
e := s.dir.Endpoint()
if ep := session.ProxiedEndpoint(); ep.Transport != upspin.Unassigned {
e = ep
}
svc, err := s.dir.Dial(config.SetUserName(s.config, session.User()), e)
if err != nil {
return nil, err
}
return svc.(upspin.DirServer), nil
}
// Lookup implements proto.DirServer.
func (s *server) Lookup(session rpc.Session, reqBytes []byte) (pb.Message, error) {
var req proto.DirLookupRequest
dir, err := s.serverFor(session, reqBytes, &req)
if err != nil {
return nil, err
}
op := logf(session, "Lookup(%q)", req.Name)
return op.entryError(dir.Lookup(upspin.PathName(req.Name)))
}
// Put implements proto.DirServer.
func (s *server) Put(session rpc.Session, reqBytes []byte) (pb.Message, error) {
var req proto.DirPutRequest
dir, err := s.serverFor(session, reqBytes, &req)
if err != nil {
return nil, err
}
entry, err := proto.UpspinDirEntry(req.Entry)
if err != nil {
return &proto.EntryError{Error: errors.MarshalError(err)}, nil
}
op := logf(session, "Put(%q)", entry.Name)
return op.entryError(dir.Put(entry))
}
// Glob implements proto.DirServer.
func (s *server) Glob(session rpc.Session, reqBytes []byte) (pb.Message, error) {
var req proto.DirGlobRequest
dir, err := s.serverFor(session, reqBytes, &req)
if err != nil {
return nil, err
}
op := logf(session, "Glob(%q)", req.Pattern)
entries, globErr := dir.Glob(req.Pattern)
if globErr != nil && globErr != upspin.ErrFollowLink {
op.log(globErr)
return globError(globErr), nil
}
// Fall through OK for ErrFollowLink.
b, err := proto.DirEntryBytes(entries)
if err != nil {
op.log(err)
return globError(err), nil
}
return &proto.EntriesError{
Entries: b,
Error: errors.MarshalError(globErr),
}, nil
}
func globError(err error) *proto.EntriesError {
return &proto.EntriesError{Error: errors.MarshalError(err)}
}
// Watch implements proto.Watch.
func (s *server) Watch(session rpc.Session, reqBytes []byte, done <-chan struct{}) (<-chan pb.Message, error) {
var req proto.DirWatchRequest
dir, err := s.serverFor(session, reqBytes, &req)
if err != nil {
return nil, err
}
op := logf(session, "Watch(%q, %d)", req.Name, req.Sequence)
events, err := dir.Watch(upspin.PathName(req.Name), req.Sequence, done)
if err != nil {
op.log(err)
return nil, err
}
out := make(chan pb.Message)
go func() {
defer close(out)
for e := range events {
ep, err := proto.EventProto(&e)
if err != nil {
op.logf("error converting event to proto: %v", err)
return
}
out <- ep
}
}()
return out, nil
}
// Delete implements proto.DirServer.
func (s *server) Delete(session rpc.Session, reqBytes []byte) (pb.Message, error) {
var req proto.DirDeleteRequest
dir, err := s.serverFor(session, reqBytes, &req)
if err != nil {
return nil, err
}
op := logf(session, "Delete(%q)", req.Name)
return op.entryError(dir.Delete(upspin.PathName(req.Name)))
}
// WhichAccess implements proto.DirServer.
func (s *server) WhichAccess(session rpc.Session, reqBytes []byte) (pb.Message, error) {
var req proto.DirWhichAccessRequest
dir, err := s.serverFor(session, reqBytes, &req)
if err != nil {
return nil, err
}
op := logf(session, "WhichAccess(%q)", req.Name)
return op.entryError(dir.WhichAccess(upspin.PathName(req.Name)))
}
func logf(sess rpc.Session, format string, args ...interface{}) operation {
op := fmt.Sprintf("rpc/dirserver: %q: dir.", sess.User())
op += fmt.Sprintf(format, args...)
log.Debug.Print(op)
return operation(op)
}
type operation string
func (op operation) log(err error) {
log.Debug.Printf("%s failed: %s", op, err)
}
func (op operation) logf(format string, args ...interface{}) {
log.Debug.Printf("%s: "+format, append([]interface{}{op}, args...)...)
}
// entryError performs the common operation of converting a directory entry
// and error result pair into the corresponding protocol buffer.
func (op operation) entryError(entry *upspin.DirEntry, err error) (*proto.EntryError, error) {
var b []byte
if entry != nil {
var mErr error
b, mErr = entry.Marshal()
if mErr != nil {
return nil, mErr
}
}
return &proto.EntryError{Entry: b, Error: errors.MarshalError(err)}, nil
}