// blob: f50afd19e0f13cc13b44b990a6d8ab1e38dbcca0 [file] [log] [blame]
// Copyright 2016 The Upspin Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package server implements upspin.StoreServer using storage.Storage as its
// storage back end.
package server // import "upspin.io/store/server"
import (
"fmt"
"strings"
"sync"
"upspin.io/cloud/storage"
"upspin.io/errors"
"upspin.io/key/sha256key"
"upspin.io/log"
"upspin.io/metric"
"upspin.io/upspin"
)
// server implements upspin.StoreServer on top of a storage.Storage back end.
// A single server value is shared by every caller of Dial; refCount tracks
// how many of those callers have not yet called Close.
type server struct {
// storage is the back end used by Put, Get and Delete.
// Close sets it to nil once refCount drops back to zero.
storage storage.Storage
mu sync.RWMutex // Protects fields below.
// refCount is incremented by Dial and decremented by Close.
refCount uint64 // How many clones of us exist.
// linkBase caches the value returned by storage.LinkBase, as served by Get
// for upspin.HTTPBaseMetadata. It is nil until the first successful fetch.
linkBase []byte
}
// Compile-time assertion that *server satisfies upspin.StoreServer.
var _ upspin.StoreServer = (*server)(nil)
// New returns a StoreServer backed by a storage.Storage implementation.
//
// Each option is a plain string. An option of the form "backend=NAME" names
// the storage back end to dial; if it is given more than once, the last
// occurrence wins. Every other option is forwarded unchanged to the back end
// through storage.WithOptions.
//
// New reports errors.Invalid when no non-empty backend name was supplied, and
// wraps any error returned by storage.Dial.
func New(options ...string) (upspin.StoreServer, error) {
	const op errors.Op = "store/server.New"
	const backendPrefix = "backend="

	var (
		name string
		opts []storage.DialOpts
	)
	for _, o := range options {
		if !strings.HasPrefix(o, backendPrefix) {
			// Not an option we understand; let the storage back end have it.
			opts = append(opts, storage.WithOptions(o))
			continue
		}
		name = strings.TrimPrefix(o, backendPrefix)
	}
	if name == "" {
		return nil, errors.E(op, errors.Invalid, `storage "backend" option is missing`)
	}

	store, err := storage.Dial(name, opts...)
	if err != nil {
		return nil, errors.E(op, err)
	}
	return &server{storage: store}, nil
}
// Put implements upspin.StoreServer.
//
// The data is stored in the back end under the reference computed by
// sha256key.Of(data), and that reference is returned in the Refdata. The
// returned Refdata is never volatile and carries a zero Duration. Any error
// from the back end is wrapped with this operation's name.
func (s *server) Put(data []byte) (*upspin.Refdata, error) {
	const op errors.Op = "store/server.Put"

	m, span := metric.NewSpan(op)
	span.SetAnnotation(fmt.Sprintf("size=%d", len(data)))
	defer m.Done()
	defer span.End()

	key := sha256key.Of(data).String()
	err := s.storage.Put(key, data)
	if err != nil {
		return nil, errors.E(op, err)
	}
	// Volatile and Duration are left at their zero values (false and 0).
	return &upspin.Refdata{Reference: upspin.Reference(key)}, nil
}
// Get implements upspin.StoreServer.
//
// For the special reference upspin.HTTPBaseMetadata, Get returns the storage
// back end's link base. The value is fetched with storage.LinkBase on first
// use and cached in s.linkBase afterwards. If the back end reports
// upspin.ErrNotSupported, the error returned has kind errors.NotExist.
//
// For any other reference, Get downloads the data stored under ref from the
// back end.
//
// On success the returned Refdata echoes ref; the []upspin.Location result is
// always nil. On failure all non-error results are nil.
func (s *server) Get(ref upspin.Reference) ([]byte, *upspin.Refdata, []upspin.Location, error) {
	const op errors.Op = "store/server.Get"
	m, sp := metric.NewSpan(op)
	defer m.Done()
	defer sp.End()

	if ref == upspin.HTTPBaseMetadata {
		refData := &upspin.Refdata{Reference: ref}

		// Fast path: the link base is already cached. Only a shared (read)
		// lock is needed to look at it, so concurrent readers do not
		// serialize behind one another.
		s.mu.RLock()
		base := s.linkBase
		s.mu.RUnlock()
		if base != nil {
			return base, refData, nil, nil
		}

		// Slow path: ask the back end. The lock is not held across this
		// call, so several callers may fetch concurrently; each then stores
		// its own result below.
		baseStr, err := s.storage.LinkBase()
		switch {
		case err == upspin.ErrNotSupported:
			return nil, nil, nil, errors.E(op, errors.NotExist)
		case err != nil:
			return nil, nil, nil, errors.E(op, err)
		}
		base = []byte(baseStr)

		s.mu.Lock()
		s.linkBase = base
		s.mu.Unlock()
		return base, refData, nil, nil
	}

	data, err := s.storage.Download(string(ref))
	if err != nil {
		return nil, nil, nil, errors.E(op, err)
	}
	refdata := &upspin.Refdata{
		Reference: ref,
		Volatile:  false,
		Duration:  0,
	}
	// Annotate only successful downloads, with the length of the reference.
	sp.SetAnnotation(fmt.Sprintf("refsize=%d", len(ref)))
	return data, refdata, nil, nil
}
// Delete implements upspin.StoreServer.
//
// It asks the storage back end to delete the data stored under ref. On
// failure the returned error is built from both the reference and the back
// end's error, tagged with this operation's name.
func (s *server) Delete(ref upspin.Reference) error {
	const op errors.Op = "store/server.Delete"

	m, _ := metric.NewSpan(op)
	defer m.Done()

	if err := s.storage.Delete(string(ref)); err != nil {
		return errors.E(op, errors.Errorf("%s: %s", ref, err))
	}
	return nil
}
// Dial implements upspin.Service.
//
// It does not create a new server: it bumps the reference count under the
// mutex and hands back the receiver itself. The config and endpoint
// arguments are ignored. Dial never returns an error.
func (s *server) Dial(config upspin.Config, e upspin.Endpoint) (upspin.Service, error) {
	s.mu.Lock()
	s.refCount++
	s.mu.Unlock()

	return s, nil
}
// Close implements upspin.Service.
//
// It releases one reference acquired through Dial. When the last reference
// is released the storage back end is dropped (set to nil). Calling Close on
// a server with no outstanding references only logs an error.
func (s *server) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()

	switch s.refCount {
	case 0:
		// Nothing to release: Close was called more often than Dial.
		log.Error.Printf("store/server: closing store that was not dialed")
	case 1:
		// Last reference is going away; let go of the back end too.
		s.refCount = 0
		s.storage = nil
	default:
		s.refCount--
	}
}
// Endpoint implements upspin.Service.
//
// This server has no endpoint of its own, so the zero upspin.Endpoint is
// always returned.
func (s *server) Endpoint() upspin.Endpoint {
	var none upspin.Endpoint // No endpoint.
	return none
}