// Copyright 2017 The Upspin Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package main

import (
	"bytes"
	"flag"
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"time"

	"upspin.io/path"
	"upspin.io/upspin"
)
// This file implements the directory scan. Because the network time of flight
// is a significant factor in throughput, the scan is parallelized, which makes
// the code more intricate than we'd like.
// The code walks the directory tree using Glob. We could in principle use
// Watch(-1), but snapshots are problematic for Watch. We take care to avoid
// scanning a directory we've already seen, which Watch does not do on the
// server; that care is what makes it practical to scan the snapshot tree.

const scanParallelism = 10 // Empirically chosen: speedup significant, not too many resources.

type dirScanner struct {
	State    *State
	inFlight sync.WaitGroup        // Count of directories we have seen but not yet processed.
	buffer   chan *upspin.DirEntry // Where to send directories for processing.
	dirsToDo chan *upspin.DirEntry // Receive from here to find next directory to process.
	done     chan *upspin.DirEntry // Entries are sent here once they are completely done, including children.
}

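// sizeMap records the size of each reference, grouped by the endpoint
// of the store server that holds it.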
type sizeMap map[upspin.Endpoint]map[upspin.Reference]int64

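// addRef records the size of the block with the given reference, held by the
// store server at the given endpoint, allocating the inner map if needed.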
func (m sizeMap) addRef(ep upspin.Endpoint, ref upspin.Reference, size int64) {
	refs := m[ep]
	if refs == nil {
		refs = make(map[upspin.Reference]int64)
		m[ep] = refs
	}
	refs[ref] = size
}

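// scanDirectories implements "audit scandir". It walks the named user roots
// in parallel, sums the storage consumed per endpoint, and writes the
// per-user results to files in the data directory.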
func (s *State) scanDirectories(args []string) {
	const help = `
Audit scandir scans the directory tree for the named user roots.
For now it just prints the total storage consumed.`

	fs := flag.NewFlagSet("scandir", flag.ExitOnError)
	glob := fs.Bool("glob", true, "apply glob processing to the arguments")
	dataDir := dataDirFlag(fs)
	s.ParseFlags(fs, args, help, "audit scandir root ...")

	if fs.NArg() == 0 || fs.Arg(0) == "help" {
		fs.Usage()
		os.Exit(2)
	}

	if err := os.MkdirAll(*dataDir, 0700); err != nil {
		s.Exit(err)
	}

	var paths []upspin.PathName
	if *glob {
		paths = s.GlobAllUpspinPath(fs.Args())
	} else {
		for _, p := range fs.Args() {
			paths = append(paths, upspin.PathName(p))
		}
	}

	// Check that the arguments are user roots.
	for _, p := range paths {
		parsed, err := path.Parse(p)
		if err != nil {
			s.Exit(err)
		}
		if !parsed.IsRoot() {
			s.Exitf("%q is not a user root", p)
		}
	}

	now := time.Now()

	sc := dirScanner{
		State:    s,
		buffer:   make(chan *upspin.DirEntry),
		dirsToDo: make(chan *upspin.DirEntry),
		done:     make(chan *upspin.DirEntry),
	}

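	// Start the parallel workers and the loop that buffers work for them.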
	for i := 0; i < scanParallelism; i++ {
		go sc.dirWorker()
	}
	go sc.bufferLoop()

	// Prime the pump.
	for _, p := range paths {
		de, err := s.DirServer(p).Lookup(p)
		if err != nil {
			s.Exit(err)
		}
		sc.do(de)
	}

	// Shut down the processing network once nothing is in flight.
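	// Closing done lets the collection loop below terminate.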
	go func() {
		sc.inFlight.Wait()
		close(sc.buffer)
		close(sc.done)
	}()

	// Receive and collect the data.
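	// size aggregates blocks across all roots; users holds a per-user breakdown.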
	size := make(sizeMap)
	users := make(map[upspin.UserName]sizeMap)
	for de := range sc.done {
		p, err := path.Parse(de.Name)
		if err != nil {
			s.Fail(err)
			continue
		}
		userSize := users[p.User()]
		if userSize == nil {
			userSize = make(sizeMap)
			users[p.User()] = userSize
		}
		for _, block := range de.Blocks {
			ep := block.Location.Endpoint
			size.addRef(ep, block.Location.Reference, block.Size)
			userSize.addRef(ep, block.Location.Reference, block.Size)
		}
	}

	// Print a summary.
	total := int64(0)
	for ep, refs := range size {
		sum := int64(0)
		for _, s := range refs {
			sum += s
		}
		total += sum
		fmt.Printf("%s: %d bytes (%s) (%d references)\n", ep.NetAddr, sum, ByteSize(sum), len(refs))
	}
	if len(size) > 1 {
		fmt.Printf("%d bytes total (%s)\n", total, ByteSize(total))
	}

	// Write the data to files, one for each user/endpoint combo.
	for u, size := range users {
		for ep, refs := range size {
			file := filepath.Join(*dataDir, fmt.Sprintf("%s%s_%s_%d", dirFilePrefix, ep.NetAddr, u, now.Unix()))
			s.writeItems(file, itemMapToSlice(refs))
		}
	}
}

// do processes a DirEntry. If it's a file, we deliver it to the done channel.
// Otherwise it's a directory and we buffer it for expansion.
func (sc *dirScanner) do(entry *upspin.DirEntry) {
	if !entry.IsDir() {
		sc.done <- entry
	} else {
		sc.inFlight.Add(1)
		sc.buffer <- entry
	}
}

// bufferLoop gathers work to do and distributes it to the workers. It acts as
// an intermediary, buffering work to avoid deadlock; without this loop, workers
// would both send to and receive from the dirsToDo channel. Once nothing is
// pending or in flight, bufferLoop shuts down the processing network.
func (sc *dirScanner) bufferLoop() {
	defer close(sc.dirsToDo)
	entriesPending := make(map[*upspin.DirEntry]bool)
	seen := make(map[string]bool) // Directories we have seen, keyed by the references within.
	buffer := sc.buffer
	var keyBuf bytes.Buffer // For creating keys for the seen map.
	for {
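		// dirsToDo stays nil unless there is an entry to hand out;
		// a nil channel disables its send case in the select below.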
		var entry *upspin.DirEntry
		var dirsToDo chan *upspin.DirEntry
		if len(entriesPending) > 0 {
			// Pick one entry at random from the map.
			for entry = range entriesPending {
				break
			}
			dirsToDo = sc.dirsToDo
		} else if buffer == nil {
			return
		}
		select {
		case dirsToDo <- entry:
			delete(entriesPending, entry)
		case entry, active := <-buffer:
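			// A closed buffer means no more work is coming; setting it
			// to nil disables this case so we can drain entriesPending.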
			if !active {
				buffer = nil
				break
			}
			// If this directory has already been done, don't do it again.
			// This situation arises when scanning a snapshot tree, as most of
			// the directories are just dups of those in the main tree.
			// We identify duplication by comparing the list of references within.
			// TODO: Find a less expensive check.
			keyBuf.Reset()
			for i := range entry.Blocks {
				b := &entry.Blocks[i]
				fmt.Fprintf(&keyBuf, "%q %q\n", b.Location.Endpoint, b.Location.Reference)
			}
			key := keyBuf.String()
			if seen[key] {
				sc.inFlight.Done()
			} else {
				seen[key] = true
				entriesPending[entry] = true
			}
		}
	}
}

// dirWorker receives DirEntries for directories from the dirsToDo channel
// and processes them, descending into their components and delivering
// the results to the buffer channel.
func (sc *dirScanner) dirWorker() {
	for dir := range sc.dirsToDo {
		des, err := sc.State.DirServer(dir.Name).Glob(upspin.AllFilesGlob(dir.Name))
		if err != nil {
			sc.State.Fail(err)
		} else {
			for _, de := range des {
				sc.do(de)
			}
		}
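		// All children have been dispatched; deliver the directory itself
		// and mark it no longer in flight.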
		sc.done <- dir
		sc.inFlight.Done()
	}
}