| // Copyright 2016 The Upspin Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| // Package gcpmetric implements a metric.Saver that records metrics with |
| // Google's Cloud Trace API. |
| package gcpmetric // import "gcp.upspin.io/cloud/gcpmetric" |
| |
| import ( |
| "fmt" |
| "math/rand" |
| "time" |
| |
| "golang.org/x/net/context" |
| "golang.org/x/oauth2/google" |
| trace "google.golang.org/api/cloudtrace/v1" |
| |
| "upspin.io/errors" |
| "upspin.io/log" |
| "upspin.io/metric" |
| "upspin.io/serverutil" |
| ) |
| |
| // traceSaver is an interface to cloudtrace API. It is used mostly for testing. |
| type traceSaver interface { |
| // SaveTraces saves the traces to GCP. |
| Save(*trace.Traces) error |
| } |
| |
| // gcpSaver is a Saver that saves to GCP Traces. |
| type gcpSaver struct { |
| projectID string |
| api traceSaver |
| saverQueue chan *metric.Metric |
| staticLabels map[string]string |
| n int // sampling 1 out of every n metrics. |
| rate *serverutil.RateLimiter |
| } |
| |
| var _ metric.Saver = (*gcpSaver)(nil) |
| |
// onFlush is a hook invoked by save after every attempt to upload a batch,
// whether or not the upload succeeded. It exists so that tests can observe
// flushes; the default does nothing.
var onFlush = func() {}
| |
| // NewSaver returns a metric.Saver that saves metrics to GCP Traces for a GCP |
| // projectID. The caller must have enabled the StackDriver Traces API for the |
| // projectID and have sufficient permission to use the scope "cloud-platform". |
| // |
| // A random sampling ratio of 1-to-n is taken. |
| // Setting n to 1 reports every event. |
| // |
| // A maximum number of outbound network requests per second (maxQPS) when |
| // uploading metrics to the GCP backend is enforced. Values equal to or larger |
| // than 1000 mean unlimited. |
| // |
| // An optional set of string key-value pairs can be given and they will be saved |
| // as labels on GCP. They are useful, for example, in the case of |
| // differentiating a metric coming from a test instance versus production. |
| func NewSaver(projectID string, n, maxQPS int, labels ...string) (metric.Saver, error) { |
| const op = "gcpmetric.New" |
| // Authentication is provided by the gcloud tool when running locally, and |
| // by the associated service account when running on Compute Engine. |
| client, err := google.DefaultClient(context.Background(), trace.CloudPlatformScope) |
| if err != nil { |
| return nil, errors.E(op, errors.Internal, errors.Errorf("unable to get default client: %v", err)) |
| } |
| if n < 1 { |
| return nil, errors.E(op, errors.Invalid, errors.Errorf("invalid sampling rate n=%d", n)) |
| } |
| |
| srv, err := trace.New(client) |
| if err != nil { |
| return nil, errors.E(op, errors.IO, err) |
| } |
| if len(labels)%2 != 0 { |
| return nil, errors.E(op, errors.Invalid, errors.Str("metric labels must come in pairs")) |
| } |
| |
| var rate *serverutil.RateLimiter |
| if maxQPS < 1000 { |
| maxQPS++ |
| rate = &serverutil.RateLimiter{ |
| Backoff: time.Second / time.Duration(maxQPS), |
| Max: time.Second / time.Duration(maxQPS), |
| } |
| } |
| rand.Seed(time.Now().Unix()) |
| |
| return &gcpSaver{ |
| projectID: projectID, |
| api: &traceSaverImpl{ |
| projectID: projectID, |
| api: srv.Projects, |
| }, |
| staticLabels: makeLabels(labels), |
| n: n, |
| rate: rate, |
| }, nil |
| } |
| |
| func (g *gcpSaver) Register(queue chan *metric.Metric) { |
| g.saverQueue = queue |
| go g.saverLoop() |
| } |
| |
| func (g *gcpSaver) saverLoop() { |
| const idleTimeout = time.Hour |
| var ( |
| batchTimeout time.Duration |
| traces []*trace.Trace |
| ) |
| if g.rate != nil { |
| batchTimeout = g.rate.Backoff |
| } else { |
| batchTimeout = 50 * time.Millisecond |
| } |
| maybeSave := func() bool { |
| if g.rate != nil { |
| if pass, _ := g.rate.Pass("metric"); !pass { |
| return false |
| } |
| } |
| g.save(traces) |
| traces = traces[:0] |
| return true |
| } |
| timer := time.NewTimer(idleTimeout) |
| for { |
| select { |
| case m := <-g.saverQueue: |
| if len(traces) >= metric.SaveQueueLength/2 { |
| // Buffer is half full. Start trying to save. |
| maybeSave() |
| } |
| if g.n > 1 { |
| // Only 1 out every n is buffered to be saved. |
| if rand.Intn(g.n) > 0 { |
| continue |
| } |
| } |
| // Buffer metric for later. |
| traces = append(traces, g.prepareToSave(m)) |
| // While we're getting new metrics, reset the timer so |
| // we have time to fill the buffer. |
| if !timer.Stop() { |
| <-timer.C |
| } |
| timer.Reset(batchTimeout) |
| case <-timer.C: |
| // Timer expired. Try to flush if there's anything |
| // buffered. |
| if len(traces) == 0 || maybeSave() { |
| // Nothing left to save. Revert to the long |
| // timeout. |
| timer.Reset(idleTimeout) |
| } else { |
| // Buffered metrics remain. Try again soon. |
| timer.Reset(batchTimeout) |
| } |
| } |
| } |
| } |
| |
| // save serializes the metric in a GCP-friendly way and saves it to the |
| // specific GCP backend configured when the Saver was created. |
| func (g *gcpSaver) prepareToSave(m *metric.Metric) *trace.Trace { |
| spans := m.Spans() |
| traceSpans := make([]*trace.TraceSpan, len(spans)) |
| for i, s := range spans { |
| var annotation map[string]string |
| if s.Annotation != "" { |
| annotation = map[string]string{"txt": s.Annotation} |
| } |
| traceSpans[i] = &trace.TraceSpan{ |
| SpanId: uint64(i + 1), |
| Name: s.Name, |
| StartTime: formatTime(s.StartTime), |
| EndTime: formatTime(s.EndTime), |
| Kind: toKindString(s.Kind), |
| Labels: mergeMaps(g.staticLabels, annotation), |
| } |
| if s.ParentSpan != nil { |
| // This can be N^2 if every span has a parent. But we should not have zillions of spans, so ok. |
| r := findSpanRank(s.ParentSpan, m) |
| if r != -1 { |
| traceSpans[i].ParentSpanId = uint64(r + 1) |
| } |
| } |
| } |
| return &trace.Trace{ |
| ProjectId: g.projectID, |
| TraceId: makeTraceID(), |
| Spans: traceSpans, |
| } |
| } |
| |
| func (g *gcpSaver) save(traces []*trace.Trace) { |
| batch := &trace.Traces{ |
| Traces: traces, |
| } |
| err := g.api.Save(batch) |
| if err != nil { |
| log.Error.Printf("metric: error saving to GCP: %v", err) |
| } |
| onFlush() |
| } |
| |
| func toKindString(k metric.Kind) string { |
| switch k { |
| case metric.Server: |
| return "RPC_SERVER" |
| case metric.Client: |
| return "RPC_CLIENT" |
| default: |
| return "SPAN_KIND_UNSPECIFIED" |
| } |
| } |
| |
| // findSpanRank returns the position (rank) of a span within a metric. |
| // If not found -1 is returned. |
| func findSpanRank(s *metric.Span, m *metric.Metric) int { |
| for i, span := range m.Spans() { |
| if s == span { |
| return i |
| } |
| } |
| return -1 |
| } |
| |
// formatTime returns tm in the timestamp format expected by GCP Traces: an
// RFC 3339 string in UTC with up to nanosecond precision, such as
// "2016-06-02T14:01:23.045123456Z". Trailing zeros of the fractional seconds
// are omitted.
func formatTime(tm time.Time) string {
	// Convert to UTC first: Format renders a time in its own location, which
	// would otherwise emit a numeric zone offset instead of the "Z" suffix.
	return tm.UTC().Format(time.RFC3339Nano)
}
| |
// makeTraceID returns a random trace ID in the form GCP Traces expects: 32
// hexadecimal digits encoding 16 bytes. IDs need to be unique within a
// project, which is why they are drawn at random.
func makeTraceID() string {
	var b [16]byte
	if n, err := rand.Read(b[:]); err != nil || n != len(b) {
		// Will probably never happen, but if it does, fall back to the
		// current time in nanoseconds, zero-padded to the expected width.
		// (Formatting a time.Time with %x hex-encodes its textual form,
		// whose first 16 bytes only have minute resolution.)
		return fmt.Sprintf("%032x", uint64(time.Now().UnixNano()))
	}
	return fmt.Sprintf("%x", b)
}
| |
// makeLabels converts a slice of alternating keys and values into a map.
// It returns nil when labels does not hold at least one complete pair.
// A trailing key without a value is ignored instead of causing an
// out-of-range panic; callers that want to reject odd-length input must check
// the length themselves.
func makeLabels(labels []string) map[string]string {
	if len(labels) < 2 {
		return nil
	}
	m := make(map[string]string, len(labels)/2)
	// Stop as soon as no complete key/value pair is left.
	for i := 0; i+1 < len(labels); i += 2 {
		m[labels[i]] = labels[i+1]
	}
	return m
}
| |
// mergeMaps returns a newly allocated map holding every entry of m1 and m2;
// neither argument is modified. When a key is present in both, the value from
// m2 wins. If both maps are empty the result is nil.
func mergeMaps(m1, m2 map[string]string) map[string]string {
	total := len(m1) + len(m2)
	if total == 0 {
		return nil
	}
	merged := make(map[string]string, total)
	copyMap(merged, m1)
	// m2 is copied last so that its values take precedence.
	copyMap(merged, m2)
	return merged
}

// copyMap adds every entry of src to dst, overwriting entries already present
// in dst. dst must be non-nil unless src is empty.
func copyMap(dst, src map[string]string) {
	for key := range src {
		dst[key] = src[key]
	}
}
| |
| // traceSaverImpl is a concrete implementation of traceSaver that writes to GCP. |
| type traceSaverImpl struct { |
| projectID string |
| api *trace.ProjectsService |
| } |
| |
| var _ traceSaver = (*traceSaverImpl)(nil) |
| |
| // Save implement SaveTraces. |
| func (s *traceSaverImpl) Save(traces *trace.Traces) error { |
| _, err := s.api.PatchTraces(s.projectID, traces).Do() |
| return err |
| } |