cloud/storage/openstack: implement storage.Lister
This will allow using upspin-audit scanstore.
The implementation is a bit convoluted as gophercloud does not
expect a client that proxies pagination.
Running against my storage gives:
remote,$domain:443: 181746788561 bytes total (169.26GB) in 271390 references
which matches the storage provider's summary data in the UI.
Change-Id: Ibc8cbc0a5de8ff2d8d3532a8a217b9f78a10f07e
Reviewed-on: https://upspin-review.googlesource.com/17320
Reviewed-by: Andrew Gerrand <adg@golang.org>
diff --git a/cloud/storage/openstack/openstack.go b/cloud/storage/openstack/openstack.go
index 6efe324..79dbb58 100755
--- a/cloud/storage/openstack/openstack.go
+++ b/cloud/storage/openstack/openstack.go
@@ -13,6 +13,7 @@
"github.com/gophercloud/gophercloud/openstack"
"github.com/gophercloud/gophercloud/openstack/objectstorage/v1/containers"
"github.com/gophercloud/gophercloud/openstack/objectstorage/v1/objects"
+ "github.com/gophercloud/gophercloud/pagination"
"upspin.io/cloud/storage"
"upspin.io/errors"
@@ -102,7 +103,10 @@
}
}
-var _ storage.Storage = (*openstackStorage)(nil)
+var (
+ _ storage.Storage = (*openstackStorage)(nil)
+ _ storage.Lister = (*openstackStorage)(nil)
+)
// LinkBase will return the URL if the container has read access for everybody
// and an unsupported error in case it does not. Still, it might return an
@@ -161,3 +165,63 @@
}
return nil
}
+
+func (s *openstackStorage) pager(url string, perPage int) pagination.Pager {
+ // First page, can use objects.List().
+ if url == "" {
+ return objects.List(s.client, s.container, objects.ListOpts{
+ Full: true,
+ Limit: perPage,
+ })
+ }
+ // Continuation page, need custom pager.
+ return pagination.NewPager(s.client, url, func(r pagination.PageResult) pagination.Page {
+ p := objects.ObjectPage{
+ MarkerPageBase: pagination.MarkerPageBase{PageResult: r},
+ }
+ p.MarkerPageBase.Owner = p
+ return p
+ })
+}
+
+func (s *openstackStorage) list(url string, perPage int) (refs []upspin.ListRefsItem, nextToken string, err error) {
+ const op = "cloud/storage/openstack.List"
+
+ pager := s.pager(url, perPage)
+
+ err = pager.EachPage(func(page pagination.Page) (bool, error) {
+ objs, err := objects.ExtractInfo(page)
+ if err != nil {
+ return false, err
+ }
+ for _, o := range objs {
+ refs = append(refs, upspin.ListRefsItem{
+ Ref: upspin.Reference(o.Name),
+ Size: o.Bytes,
+ })
+ }
+ token, err := page.NextPageURL()
+ if err != nil {
+ return false, err
+ }
+ nextToken = token
+
+ // Stop pagination after the first page. Let the Upspin client
+ // do the pagination. If we called pager.AllPages() we'd have
+ // to wait until all pagination is done and the client would
+ // probably give up waiting for a response.
+ return false, nil
+ })
+
+ if err != nil {
+ err = errors.E(op, errors.IO, errors.Errorf("%q: %v", s.container, err))
+ }
+
+ return
+}
+
+// List implements storage.Lister. In this implementation, the token is in fact
+// the URL for the next page.
+func (s *openstackStorage) List(token string) ([]upspin.ListRefsItem, string, error) {
+ return s.list(token, 0)
+}
diff --git a/cloud/storage/openstack/openstack_test.go b/cloud/storage/openstack/openstack_test.go
index cd12c57..00fcb57 100755
--- a/cloud/storage/openstack/openstack_test.go
+++ b/cloud/storage/openstack/openstack_test.go
@@ -13,15 +13,17 @@
"testing"
"time"
+ "github.com/gophercloud/gophercloud/openstack"
"github.com/gophercloud/gophercloud/openstack/objectstorage/v1/containers"
"upspin.io/cloud/storage"
"upspin.io/errors"
"upspin.io/log"
+ "upspin.io/upspin"
)
const (
- defaultTestRegion = "BHS3"
+ defaultTestRegion = "WAW1"
defaultTestContainer = "upspin-test-container"
)
@@ -101,6 +103,66 @@
}
}
+func TestListingEmptyContainer(t *testing.T) {
+ l := client.(*openstackStorage)
+ refs, nextToken, err := l.List("")
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(refs) != 0 {
+ t.Errorf("List returned %d refs, want 0", len(refs))
+ }
+ if nextToken != "" {
+ t.Errorf("List returned token %q, want empty string", nextToken)
+ }
+}
+
+func TestListingWithPagination(t *testing.T) {
+ putRefs := make([]string, 10)
+ for i := 0; i < 10; i++ {
+ ref := fmt.Sprintf("ref%d", i)
+ putRefs[i] = ref
+ if err := client.Put(ref, objectContents); err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ // Try to clean up so the container can be deleted.
+ defer func() {
+ for _, ref := range putRefs {
+ client.Delete(ref)
+ }
+ }()
+
+ refs, callCount, err := getAllRefs(3, len(putRefs))
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(refs) != len(putRefs) {
+ t.Errorf("Listed %d refs, want %d", len(refs), len(putRefs))
+ }
+ if want := 4; callCount != want {
+ t.Errorf("List split into %d pages, want %d", callCount, want)
+ }
+}
+
+func getAllRefs(perPage int, maxCalls int) (allRefs []upspin.ListRefsItem, callCount int, err error) {
+ l := client.(*openstackStorage)
+ var token string
+ for ; callCount < maxCalls; callCount++ {
+ var refs []upspin.ListRefsItem
+ refs, token, err = l.list(token, perPage)
+ if err != nil {
+ break
+ }
+ allRefs = append(allRefs, refs...)
+ if token == "" {
+ break
+ }
+ }
+ return
+}
+
func TestMain(m *testing.M) {
flag.Parse()
if !*useOpenStack {
@@ -117,10 +179,20 @@
// Create client that writes to test container.
var err error
+ // It is easier to source an openrc file than pass all via command-line
+ // flags.
+ ao, err := openstack.AuthOptionsFromEnv()
+ if err != nil {
+ log.Fatalf("cloud/storage/openstack: could not get auth opts from env: %v", err)
+ }
client, err = storage.Dial(
"OpenStack",
storage.WithKeyValue("openstackRegion", *testRegion),
storage.WithKeyValue("openstackContainer", *testContainer),
+ storage.WithKeyValue("openstackAuthURL", ao.IdentityEndpoint),
+ storage.WithKeyValue("privateOpenstackTenantName", ao.TenantName),
+ storage.WithKeyValue("privateOpenstackUsername", ao.Username),
+ storage.WithKeyValue("privateOpenstackPassword", ao.Password),
)
if err != nil {
log.Fatalf("cloud/storage/openstack: couldn't set up client: %v", err)