blob: 6aefd12dea0ceb15d6bbbab65fe317cbd951fee4 [file] [log] [blame]
// Copyright 2016 The Upspin Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package remote implements an in-process directory server that uses RPC to
// connect to a remote directory server.
package remote // import "upspin.io/dir/remote"
import (
"fmt"
pb "github.com/golang/protobuf/proto"
"upspin.io/bind"
"upspin.io/errors"
"upspin.io/log"
"upspin.io/rpc"
"upspin.io/upspin"
"upspin.io/upspin/proto"
)
// dialConfig contains the destination and authenticated user of the dial.
type dialConfig struct {
// endpoint is the directory endpoint this service stands for; when the
// connection goes through a cache it is the proxied endpoint, not the
// cache's own address. Endpoint reports this value.
endpoint upspin.Endpoint
// userName is the user name taken from the config supplied at dial time.
userName upspin.UserName
}
// remote implements upspin.DirServer.
// Each method forwards its request through the embedded rpc.Client.
type remote struct {
rpc.Client // For sessions and Close.
// cfg records the endpoint and user this instance was dialed with.
// It is the zero value for the instance registered in init.
cfg dialConfig
}
// Compile-time assertion that *remote implements upspin.DirServer.
var _ upspin.DirServer = (*remote)(nil)
// Glob implements upspin.DirServer.Glob.
//
// The pattern is sent to the remote server with the "Dir/Glob" method.
// A transport failure, or a failure to decode the returned entries, is
// reported as an errors.IO error. If the server reports
// upspin.ErrFollowLink, the decoded entries are returned together with
// that exact error value; any other server error is returned with nil
// entries.
func (r *remote) Glob(pattern string) ([]*upspin.DirEntry, error) {
	op := r.opf("Glob", "%q", pattern)

	request := &proto.DirGlobRequest{Pattern: pattern}
	response := new(proto.EntriesError)
	if callErr := r.Invoke("Dir/Glob", request, response, nil, nil); callErr != nil {
		return nil, op.error(errors.IO, callErr)
	}

	// Only nil and ErrFollowLink allow the entries to be decoded.
	remoteErr := unmarshalError(response.Error)
	switch remoteErr {
	case nil, upspin.ErrFollowLink:
		// Fall through to decoding below.
	default:
		return nil, op.error(remoteErr)
	}

	entries, decodeErr := proto.UpspinDirEntries(response.Entries)
	if decodeErr != nil {
		return nil, op.error(errors.IO, decodeErr)
	}
	return entries, op.error(remoteErr)
}
// entryName returns the quoted name of entry for use in log and error
// text, or the literal "<nil>" when entry is nil.
func entryName(entry *upspin.DirEntry) string {
	if entry != nil {
		return fmt.Sprintf("%q", entry.Name)
	}
	return "<nil>"
}
// Put implements upspin.DirServer.Put.
//
// The entry is marshaled and sent to the remote server with the "Dir/Put"
// method. A nil entry is rejected with an errors.Invalid error instead of
// being handed to Marshal. A marshaling failure is returned wrapped with
// the operation; the reply is converted by invoke.
func (r *remote) Put(entry *upspin.DirEntry) (*upspin.DirEntry, error) {
	op := r.opf("Put", "%s", entryName(entry))

	// entryName tolerates nil for logging, but Marshal needs a real entry.
	if entry == nil {
		return nil, op.error(errors.Invalid, "nil entry")
	}

	b, err := entry.Marshal()
	if err != nil {
		return nil, op.error(err)
	}
	return r.invoke(op, "Dir/Put", &proto.DirPutRequest{
		Entry: b,
	})
}
// WhichAccess implements upspin.DirServer.WhichAccess.
// It forwards pathName to the remote server with the "Dir/WhichAccess"
// method and returns the entry and error decoded from the reply.
func (r *remote) WhichAccess(pathName upspin.PathName) (*upspin.DirEntry, error) {
	op := r.opf("WhichAccess", "%q", pathName)
	request := &proto.DirWhichAccessRequest{Name: string(pathName)}
	return r.invoke(op, "Dir/WhichAccess", request)
}
// Delete implements upspin.DirServer.Delete.
// It forwards pathName to the remote server with the "Dir/Delete" method
// and returns the entry and error decoded from the reply.
func (r *remote) Delete(pathName upspin.PathName) (*upspin.DirEntry, error) {
	op := r.opf("Delete", "%q", pathName)
	request := &proto.DirDeleteRequest{Name: string(pathName)}
	return r.invoke(op, "Dir/Delete", request)
}
// Lookup implements upspin.DirServer.Lookup.
// It forwards pathName to the remote server with the "Dir/Lookup" method
// and returns the entry and error decoded from the reply.
func (r *remote) Lookup(pathName upspin.PathName) (*upspin.DirEntry, error) {
	op := r.opf("Lookup", "%q", pathName)
	request := &proto.DirLookupRequest{Name: string(pathName)}
	return r.invoke(op, "Dir/Lookup", request)
}
// invoke issues the named RPC with req, expecting an EntryError reply,
// and converts the outcome into a directory entry and error pair using
// op.entryError.
func (r *remote) invoke(op *operation, method string, req pb.Message) (*upspin.DirEntry, error) {
	reply := new(proto.EntryError)
	callErr := r.Invoke(method, req, reply, nil, nil)
	entry, err := op.entryError(reply, callErr)
	return entry, err
}
// Watch implements upspin.DirServer.Watch.
//
// It starts a goroutine that converts each proto.Event arriving on the
// RPC stream into an upspin.Event and forwards it on the returned channel.
// The goroutine stops, closing the returned channel, when the stream is
// closed, when done is closed, or when an event cannot be converted (the
// conversion error is logged). If the "Dir/Watch" call itself fails, the
// stream is closed so the goroutine exits, and the error is returned;
// upspin.ErrNotSupported is returned unwrapped so callers can compare
// against it directly.
func (r *remote) Watch(name upspin.PathName, sequence int64, done <-chan struct{}) (<-chan upspin.Event, error) {
	op := r.opf("Watch", "%q sequence %d", name, sequence)

	request := &proto.DirWatchRequest{
		Name:     string(name),
		Sequence: sequence,
	}

	incoming := make(eventStream)
	outgoing := make(chan upspin.Event)

	// forward relays events from the RPC stream to the caller's channel.
	forward := func() {
		defer close(outgoing)
		for {
			// Wait for the next wire event or for cancellation.
			var wire proto.Event
			select {
			case <-done:
				return
			case next, open := <-incoming:
				if !open {
					return
				}
				wire = next
			}

			event, convErr := proto.UpspinEvent(&wire)
			if convErr != nil {
				op.logErr(convErr)
				return
			}

			// Deliver the event unless the caller gives up first.
			select {
			case <-done:
				return
			case outgoing <- *event:
			}
		}
	}
	go forward()

	callErr := r.Invoke("Dir/Watch", request, nil, incoming, done)
	if callErr == nil {
		return outgoing, nil
	}

	// The call failed, so nothing else will close the stream; close it
	// here to release the forwarding goroutine.
	close(incoming)
	if callErr == upspin.ErrNotSupported {
		return nil, callErr
	}
	return nil, op.error(callErr)
}
// eventStream is a channel of proto.Event values. Its Send, Close and
// Error methods allow it to be passed as the stream argument of
// rpc.Client.Invoke, as Watch does.
type eventStream chan proto.Event
// Send decodes b as a proto.Event and delivers it on the stream.
// It returns the decoding error, if any. If done is closed before the
// event can be delivered, the event is dropped and Send returns nil.
func (s eventStream) Send(b []byte, done <-chan struct{}) error {
	var event proto.Event
	err := pb.Unmarshal(b, &event)
	if err != nil {
		return err
	}

	select {
	case <-done:
		// Receiver has gone away; drop the event.
	case s <- event:
	}
	return nil
}
// Close closes the underlying channel, signalling to the receiver that
// no more events will arrive.
func (s eventStream) Close() {
	close(s)
}
// Error delivers err to the receiver as a proto.Event whose Error field
// holds the marshaled error.
//
// NOTE(review): the send does not observe any done channel, so it blocks
// until a receiver takes the event — confirm a receiver is always present
// when this is called.
func (s eventStream) Error(err error) {
	failure := proto.Event{Error: errors.MarshalError(err)}
	s <- failure
}
// Endpoint implements upspin.DirServer.Endpoint.
// It reports the endpoint recorded when this service was dialed.
func (r *remote) Endpoint() upspin.Endpoint {
	endpoint := r.cfg.endpoint
	return endpoint
}
// dialCache returns a service that reaches proxyFor through the cache
// server named in config. It returns (nil, nil) when config has no cache
// endpoint assigned, and a nil service with the error when the RPC client
// cannot be created.
func dialCache(config upspin.Config, proxyFor upspin.Endpoint) (upspin.Service, error) {
	// Are we using a cache?
	cacheEndpoint := config.CacheEndpoint()
	if cacheEndpoint.Unassigned() {
		return nil, nil
	}

	// Call the cache. The cache is local so don't bother with TLS.
	client, err := rpc.NewClient(config, cacheEndpoint.NetAddr, rpc.NoSecurity, proxyFor)
	if err != nil {
		return nil, err
	}

	cfg := dialConfig{
		endpoint: proxyFor,
		userName: config.UserName(),
	}
	return &remote{Client: client, cfg: cfg}, nil
}
// Dial implements upspin.Service.
//
// It rejects endpoints whose transport is not upspin.Remote with an
// errors.Invalid error. Otherwise it first tries to connect through the
// cache configured in config; if no cache is configured it opens a secure
// RPC connection to e.NetAddr. Failure to create that direct connection is
// reported as an errors.IO error.
func (r *remote) Dial(config upspin.Config, e upspin.Endpoint) (upspin.Service, error) {
	op := r.opf("Dial", "%q, %q", config.UserName(), e)

	if e.Transport != upspin.Remote {
		return nil, op.error(errors.Invalid, "unrecognized transport")
	}

	// First try a cache
	cached, err := dialCache(config, e)
	if err != nil {
		return nil, op.error(err)
	}
	if cached != nil {
		return cached, nil
	}

	client, err := rpc.NewClient(config, e.NetAddr, rpc.Secure, upspin.Endpoint{})
	if err != nil {
		return nil, op.error(errors.IO, err)
	}

	// The connection is closed when this service is released (see Bind.Release)
	dialed := &remote{
		Client: client,
		cfg: dialConfig{
			endpoint: e,
			userName: config.UserName(),
		},
	}
	return dialed, nil
}
// transport is the transport under which init registers this package's
// DirServer implementation.
const transport = upspin.Remote
func init() {
r := &remote{} // uninitialized until Dial time.
bind.RegisterDirServer(transport, r)
}
// unmarshalError calls errors.UnmarshalError, but if the resulting error
// has the same message as upspin.ErrFollowLink, it returns that exact
// value so the caller can use == to test for links.
func unmarshalError(b []byte) error {
	decoded := errors.UnmarshalError(b)
	if decoded == nil {
		return nil
	}
	if decoded.Error() != upspin.ErrFollowLink.Error() {
		return decoded
	}
	return upspin.ErrFollowLink
}
// opf builds the operation descriptor for a call to method, formatting
// the call's arguments with format and args. The operation name includes
// the network address of this remote's endpoint. The new operation is
// written to the debug log before being returned.
func (r *remote) opf(method string, format string, args ...interface{}) *operation {
	name := fmt.Sprintf("dir/remote(%q).%s", r.cfg.endpoint.NetAddr, method)
	op := &operation{
		op:   errors.Op(name),
		args: fmt.Sprintf(format, args...),
	}
	log.Debug.Print(op)
	return op
}
// operation pairs the errors.Op naming a method call with the call's
// formatted arguments, for use when logging and when building errors.
type operation struct {
// op names the method being performed, as built by opf.
op errors.Op
// args holds the already-formatted arguments of the call.
args string
}
// String renders the operation as its name followed by the formatted
// arguments in parentheses.
func (op *operation) String() string {
	const layout = "%s(%s)"
	return fmt.Sprintf(layout, op.op, op.args)
}
// logErr writes err to the error log, prefixed with the operation.
func (op *operation) logErr(err error) {
	const layout = "%s: %s"
	log.Error.Printf(layout, op, err)
}
// error builds an error for this operation from args, which are passed
// to errors.E with the operation's Op prepended.
//
// Two single-argument cases are passed through untouched: a nil argument
// yields nil, and upspin.ErrFollowLink is returned as is so that callers
// can still compare against it with ==. In every other case the error is
// also written to the debug log. Calling error with no arguments panics.
func (op *operation) error(args ...interface{}) error {
	switch len(args) {
	case 0:
		panic("error called with zero args")
	case 1:
		only := args[0]
		if only == nil {
			return nil
		}
		if e, ok := only.(error); ok && e == upspin.ErrFollowLink {
			return e
		}
	}

	log.Debug.Printf("%v error: %v", op, errors.E(args...))

	// Prepend the Op so the returned error records which method failed.
	withOp := make([]interface{}, 0, len(args)+1)
	withOp = append(withOp, op.op)
	withOp = append(withOp, args...)
	return errors.E(withOp...)
}
// entryError performs the common operation of converting an EntryError
// protocol buffer into a directory entry and error pair.
//
// err is the result of the RPC itself; when non-nil it is reported as an
// errors.IO error. Otherwise the error carried in p is decoded: anything
// other than nil or upspin.ErrFollowLink is returned with a nil entry.
// For nil and upspin.ErrFollowLink the entry in p is decoded and returned
// alongside that error.
func (op *operation) entryError(p *proto.EntryError, err error) (*upspin.DirEntry, error) {
	if err != nil {
		return nil, op.error(errors.IO, err)
	}

	remoteErr := unmarshalError(p.Error)
	isLink := remoteErr == upspin.ErrFollowLink
	if remoteErr != nil && !isLink {
		return nil, op.error(remoteErr)
	}

	entry, decodeErr := proto.UpspinDirEntry(p.Entry)
	if decodeErr != nil {
		return nil, op.error(decodeErr)
	}
	return entry, op.error(remoteErr)
}