blob: b930bcd816fccd8e35be151095c00effcb79115b [file] [log] [blame]
// Copyright 2016 The Upspin Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package storeserver is a wrapper for an upspin.StoreServer implementation
// that presents it as an authenticated service.
package storeserver // import "upspin.io/rpc/storeserver"
import (
"fmt"
"net/http"
pb "github.com/golang/protobuf/proto"
"upspin.io/config"
"upspin.io/errors"
"upspin.io/log"
"upspin.io/rpc"
"upspin.io/upspin"
"upspin.io/upspin/proto"
)
type server struct {
config upspin.Config
// The underlying storage implementation.
store upspin.StoreServer
}
func New(cfg upspin.Config, store upspin.StoreServer, _ upspin.NetAddr) http.Handler {
// TODO(adg): remove addr argument
s := &server{
config: cfg,
store: store,
}
return rpc.NewServer(cfg, rpc.Service{
Name: "Store",
Methods: map[string]rpc.Method{
"Get": s.Get,
"Put": s.Put,
"Delete": s.Delete,
},
})
}
func (s *server) serverFor(session rpc.Session, reqBytes []byte, req pb.Message) (upspin.StoreServer, error) {
if err := pb.Unmarshal(reqBytes, req); err != nil {
return nil, err
}
e := s.store.Endpoint()
if ep := session.ProxiedEndpoint(); ep.Transport != upspin.Unassigned {
e = ep
}
svc, err := s.store.Dial(config.SetUserName(s.config, session.User()), e)
if err != nil {
return nil, err
}
return svc.(upspin.StoreServer), nil
}
// Get implements proto.StoreServer.
func (s *server) Get(session rpc.Session, reqBytes []byte) (pb.Message, error) {
var req proto.StoreGetRequest
store, err := s.serverFor(session, reqBytes, &req)
if err != nil {
return nil, err
}
op := s.logf(session, "Get(%q)", req.Reference)
data, refdata, locs, err := store.Get(upspin.Reference(req.Reference))
if err != nil {
op.log(err)
return &proto.StoreGetResponse{Error: errors.MarshalError(err)}, nil
}
resp := &proto.StoreGetResponse{
Data: data,
Refdata: proto.RefdataProto(refdata),
Locations: proto.Locations(locs),
}
return resp, nil
}
// Put implements proto.StoreServer.
func (s *server) Put(session rpc.Session, reqBytes []byte) (pb.Message, error) {
var req proto.StorePutRequest
store, err := s.serverFor(session, reqBytes, &req)
if err != nil {
return nil, err
}
op := s.logf(session, "Put(%.16x...) (%d bytes)", req.Data, len(req.Data))
refdata, err := store.Put(req.Data)
if err != nil {
op.log(err)
return &proto.StorePutResponse{Error: errors.MarshalError(err)}, nil
}
resp := &proto.StorePutResponse{
Refdata: proto.RefdataProto(refdata),
}
return resp, nil
}
// Empty struct we can allocate just once.
var deleteResponse proto.StoreDeleteResponse
// Delete implements proto.StoreServer.
func (s *server) Delete(session rpc.Session, reqBytes []byte) (pb.Message, error) {
var req proto.StoreGetRequest
store, err := s.serverFor(session, reqBytes, &req)
if err != nil {
return nil, err
}
op := s.logf(session, "Delete(%q)", req.Reference)
err = store.Delete(upspin.Reference(req.Reference))
if err != nil {
op.log(err)
return &proto.StoreDeleteResponse{Error: errors.MarshalError(err)}, nil
}
return &deleteResponse, nil
}
func (s *server) logf(sess rpc.Session, format string, args ...interface{}) operation {
op := fmt.Sprintf("rpc/storeserver: %q: store.", sess.User())
op += fmt.Sprintf(format, args...)
log.Debug.Print(op)
return operation(op)
}
type operation string
func (op operation) log(err error) {
log.Debug.Print(op)
}