| // Copyright 2016 The Upspin Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| // Package gcs implements a storage backend that saves data to Google Cloud Storage. |
| package gcs // import "gcp.upspin.io/cloud/storage/gcs" |
| |
| import ( |
| "bytes" |
| "encoding/base64" |
| "io/ioutil" |
| "net/http" |
| "strings" |
| "time" |
| |
| gContext "golang.org/x/net/context" |
| "golang.org/x/oauth2" |
| "golang.org/x/oauth2/google" |
| "google.golang.org/api/googleapi" |
| gcsBE "google.golang.org/api/storage/v1" |
| |
| "upspin.io/cloud/storage" |
| "upspin.io/errors" |
| "upspin.io/log" |
| ) |
| |
| const ( |
| scope = gcsBE.DevstorageFullControlScope |
| ) |
| |
// Predefined ACL names that may be applied when writing data to Google Cloud
// Storage. The strings follow the definitions in
// https://github.com/google/google-api-go-client/blob/master/storage/v1/storage-gen.go.
const (
	// PublicRead gives project team owners owner access and gives all
	// users reader access.
	PublicRead = "publicRead"

	// Private gives project team owners owner access.
	Private = "private"

	// ProjectPrivate gives project team members access according to
	// their roles.
	ProjectPrivate = "projectPrivate"

	// BucketOwnerFullCtrl gives owner access to both the object owner and
	// the project team owners.
	BucketOwnerFullCtrl = "bucketOwnerFullControl"
)
| |
// Names of the dial options that New looks up in storage.Opts.
const (
	// bucketName is the key of the required option naming the bucket.
	bucketName = "gcpBucketName"

	// defaultACL is the key of the required option holding the predefined
	// ACL applied to objects written by Put.
	defaultACL = "defaultACL"

	// privateKeyData is the key of the optional option holding
	// base64-encoded JSON service account credentials.
	privateKeyData = "privateKeyData"
)
| |
| // gcsImpl is an implementation of Storage that connects to a Google Cloud Storage (GCS) backend. |
| type gcsImpl struct { |
| client *http.Client |
| service *gcsBE.Service |
| bucketName string |
| defaultWriteACL string |
| } |
| |
| // New initializes a Storage implementation that stores data to Google Cloud Storage. |
| func New(opts *storage.Opts) (storage.Storage, error) { |
| const op = "cloud/storage/gcs.New" |
| |
| bucket, ok := opts.Opts[bucketName] |
| if !ok { |
| return nil, errors.E(op, errors.Invalid, errors.Errorf("%q option is required", bucketName)) |
| } |
| acl, ok := opts.Opts[defaultACL] |
| if !ok { |
| return nil, errors.E(op, errors.Invalid, errors.Errorf("%q option is required", defaultACL)) |
| } |
| |
| var client *http.Client |
| if keyData, ok := opts.Opts[privateKeyData]; !ok { |
| // Authentication is provided by the associated service account |
| // when running on Compute Engine. |
| // TODO(adg): remove this once we have deprecated passing |
| // seviceaccount.json around. We should return an error here. |
| var err error |
| client, err = google.DefaultClient(gContext.Background(), scope) |
| if err != nil { |
| return nil, errors.E(op, errors.IO, errors.Errorf("unable to get default client: %s", err)) |
| } |
| } else { |
| b, err := base64.StdEncoding.DecodeString(keyData) |
| if err != nil { |
| return nil, errors.E(op, errors.IO, errors.Errorf("unable to decode %s: %s", privateKeyData, err)) |
| } |
| cfg, err := google.JWTConfigFromJSON(b, scope) |
| if err != nil { |
| return nil, errors.E(op, errors.Invalid, err) |
| } |
| ctx := gContext.Background() |
| client = oauth2.NewClient(ctx, cfg.TokenSource(ctx)) |
| } |
| |
| service, err := gcsBE.New(client) |
| if err != nil { |
| return nil, errors.E(op, errors.IO, errors.Errorf("unable to create storage service: %s", err)) |
| } |
| |
| return &gcsImpl{ |
| client: client, |
| service: service, |
| bucketName: bucket, |
| defaultWriteACL: acl, |
| }, nil |
| } |
| |
| func init() { |
| storage.Register("GCS", New) |
| } |
| |
| // Guarantee we implement the Storage interface. |
| var _ storage.Storage = (*gcsImpl)(nil) |
| |
| // LinkBase implements Storage. |
| func (gcs *gcsImpl) LinkBase() (base string, err error) { |
| return "https://storage.googleapis.com/" + gcs.bucketName + "/", nil |
| } |
| |
| // Download implements Storage. |
| func (gcs *gcsImpl) Download(ref string) ([]byte, error) { |
| const op = "cloud/storage/gcs.Download" |
| resp, err := gcs.service.Objects.Get(gcs.bucketName, ref).Download() |
| if err != nil { |
| if gcsErr, ok := err.(*googleapi.Error); ok && gcsErr.Code == 404 { |
| return nil, errors.E(op, errors.NotExist, err) |
| } |
| return nil, err |
| } |
| defer resp.Body.Close() |
| buf, err := ioutil.ReadAll(resp.Body) |
| if err != nil { |
| return nil, err |
| } |
| return buf, nil |
| } |
| |
| // Put implements Storage. |
| func (gcs *gcsImpl) Put(ref string, contents []byte) error { |
| const op = "cloud/storage/gcs.Put" |
| for tries := 0; ; tries++ { |
| _, err := gcs.service.Objects.Insert(gcs.bucketName, &gcsBE.Object{Name: ref}). |
| Media(bytes.NewReader(contents)). |
| PredefinedAcl(gcs.defaultWriteACL). |
| Do() |
| if err == nil { |
| return nil |
| } |
| if !strings.Contains(err.Error(), "503") || tries > 4 { |
| return errors.E(op, errors.Transient, err) |
| } |
| log.Info.Printf("cloud/storage/gcs: WARNING: retrying Insert(%s): %s", ref, err) |
| time.Sleep(time.Duration(100*(tries+1)) * time.Millisecond) |
| } |
| } |
| |
| // Delete implements Storage. |
| func (gcs *gcsImpl) Delete(ref string) error { |
| return gcs.service.Objects.Delete(gcs.bucketName, ref).Do() |
| } |
| |
| // emptyBucket completely removes all files in a bucket permanently. |
| // If verbose is true, every attempt to delete a file is logged to the standard logger. |
| // This is an expensive operation. It is also dangerous, so use with care. |
| // Use for testing only. |
| func (gcs *gcsImpl) emptyBucket(verbose bool) error { |
| const maxParallelDeletes = 10 |
| pageToken := "" |
| var firstErr error |
| recordErr := func(err error) bool { |
| if err == nil { |
| return false |
| } |
| if firstErr == nil { |
| firstErr = err |
| } |
| return true |
| } |
| for { |
| objs, err := gcs.service.Objects.List(gcs.bucketName).MaxResults(maxParallelDeletes).Fields("items(name),nextPageToken").PageToken(pageToken).Do() |
| if recordErr(err) { |
| log.Error.Printf("emptyBucket: List(%q): %v", gcs.bucketName, err) |
| break |
| } |
| if verbose { |
| log.Printf("Going to delete %d items from bucket %s", len(objs.Items), gcs.bucketName) |
| } |
| for _, o := range objs.Items { |
| if verbose { |
| log.Printf("Deleting: %q", o.Name) |
| } |
| err = gcs.Delete(o.Name) |
| if recordErr(err) { |
| log.Error.Printf("emptyBucket: Delete(%q): %v", o.Name, err) |
| continue |
| } |
| } |
| if objs.NextPageToken == "" { |
| break |
| } |
| pageToken = objs.NextPageToken |
| } |
| return firstErr |
| } |