Skip to content

Commit 81b3c24

Browse files
committed
abstract the metadata function and use WithErrorReason
Signed-off-by: Andreas Gkizas <andreas.gkizas@elastic.co>
1 parent 570f81d commit 81b3c24

File tree

4 files changed

+55
-38
lines changed

4 files changed

+55
-38
lines changed

processor/ratelimitprocessor/gubernator.go

+33-22
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,8 @@ func (r *gubernatorRateLimiter) Shutdown(ctx context.Context) error {
9797
}
9898

9999
func (r *gubernatorRateLimiter) RateLimit(ctx context.Context, hits int) error {
100-
projectID := getProjectIDFromMetadata(ctx)
101100
uniqueKey := getUniqueKey(ctx, r.cfg.MetadataKeys)
101+
102102
createdAt := time.Now().UnixMilli()
103103
getRateLimitsResp, err := r.client.GetRateLimits(ctx, &gubernator.GetRateLimitsReq{
104104
Requests: []*gubernator.RateLimitReq{{
@@ -115,32 +115,35 @@ func (r *gubernatorRateLimiter) RateLimit(ctx context.Context, hits int) error {
115115
})
116116
if err != nil {
117117
r.set.Logger.Error("error executing gubernator rate limit request", zap.Error(err))
118-
r.telemetryBuilder.RatelimitRequests.Add(ctx, 1, metric.WithAttributeSet(attribute.NewSet(
119-
telemetry.WithProjectID(projectID),
120-
telemetry.WithLimitReason("request_error"),
118+
attrs := []attribute.KeyValue{
119+
telemetry.WithErrorReason(telemetry.BadRequest),
121120
telemetry.WithDecision("rejected"),
122-
)))
121+
}
122+
attrs = attrsFromMetadata(ctx, r.cfg.MetadataKeys, attrs)
123+
r.telemetryBuilder.RatelimitRequests.Add(ctx, 1, metric.WithAttributeSet(attribute.NewSet(attrs...)))
123124
return errRateLimitInternalError
124125
}
125126

126127
// Inside the gRPC response, we should have a single-item list of responses.
127128
responses := getRateLimitsResp.GetResponses()
128129
if n := len(responses); n != 1 {
129-
r.telemetryBuilder.RatelimitRequests.Add(ctx, 1, metric.WithAttributeSet(attribute.NewSet(
130-
telemetry.WithProjectID(projectID),
131-
telemetry.WithLimitReason("request_error"),
130+
attrs := []attribute.KeyValue{
131+
telemetry.WithErrorReason(telemetry.Invalid),
132132
telemetry.WithDecision("accepted"),
133-
)))
133+
}
134+
attrs = attrsFromMetadata(ctx, r.cfg.MetadataKeys, attrs)
135+
r.telemetryBuilder.RatelimitRequests.Add(ctx, 1, metric.WithAttributeSet(attribute.NewSet(attrs...)))
134136
return fmt.Errorf("expected 1 response from gubernator, got %d", n)
135137
}
136138
resp := responses[0]
137139
if resp.GetError() != "" {
138140
r.set.Logger.Error("failed to get response from gubernator", zap.Error(errors.New(resp.GetError())))
139-
r.telemetryBuilder.RatelimitRequests.Add(ctx, 1, metric.WithAttributeSet(attribute.NewSet(
140-
telemetry.WithProjectID(projectID),
141-
telemetry.WithLimitReason("limit_error"),
141+
attrs := []attribute.KeyValue{
142+
telemetry.WithErrorReason(telemetry.ServerError),
142143
telemetry.WithDecision("rejected"),
143-
)))
144+
}
145+
attrs = attrsFromMetadata(ctx, r.cfg.MetadataKeys, attrs)
146+
r.telemetryBuilder.RatelimitRequests.Add(ctx, 1, metric.WithAttributeSet(attribute.NewSet(attrs...)))
144147
return errRateLimitInternalError
145148
}
146149

@@ -154,28 +157,36 @@ func (r *gubernatorRateLimiter) RateLimit(ctx context.Context, hits int) error {
154157
zap.String("processor_id", r.set.ID.String()),
155158
zap.Strings("metadata_keys", r.cfg.MetadataKeys),
156159
)
157-
r.telemetryBuilder.RatelimitRequests.Add(ctx, 1, metric.WithAttributeSet(attribute.NewSet(
158-
telemetry.WithProjectID(projectID),
159-
telemetry.WithLimitReason("throttled"),
160-
telemetry.WithDecision("accepted"),
161-
)))
160+
attrs := []attribute.KeyValue{
161+
telemetry.WithErrorReason(telemetry.StatusOverLimit),
162+
telemetry.WithDecision("rejected"),
163+
}
164+
attrs = attrsFromMetadata(ctx, r.cfg.MetadataKeys, attrs)
165+
r.telemetryBuilder.RatelimitRequests.Add(ctx, 1, metric.WithAttributeSet(attribute.NewSet(attrs...)))
162166
return errTooManyRequests
163167
case ThrottleBehaviorDelay:
164168
delay := time.Duration(resp.GetResetTime()-createdAt) * time.Millisecond
165169
timer := time.NewTimer(delay)
166170
defer timer.Stop()
167171
select {
168172
case <-ctx.Done():
173+
attrs := []attribute.KeyValue{
174+
telemetry.WithErrorReason(telemetry.StatusOverLimit),
175+
telemetry.WithDecision("accepted"),
176+
}
177+
attrs = attrsFromMetadata(ctx, r.cfg.MetadataKeys, attrs)
178+
r.telemetryBuilder.RatelimitRequests.Add(ctx, 1, metric.WithAttributeSet(attribute.NewSet(attrs...)))
169179
return ctx.Err()
170180
case <-timer.C:
171181
}
172182
}
173183
}
174184

175-
r.telemetryBuilder.RatelimitRequests.Add(ctx, 1, metric.WithAttributeSet(attribute.NewSet(
176-
telemetry.WithProjectID(projectID),
177-
telemetry.WithLimitReason("under_limit"),
185+
attrs := []attribute.KeyValue{
186+
telemetry.WithErrorReason(telemetry.StatusUnderLimit),
178187
telemetry.WithDecision("accepted"),
179-
)))
188+
}
189+
attrs = attrsFromMetadata(ctx, r.cfg.MetadataKeys, attrs)
190+
r.telemetryBuilder.RatelimitRequests.Add(ctx, 1, metric.WithAttributeSet(attribute.NewSet(attrs...)))
180191
return nil
181192
}

processor/ratelimitprocessor/gubernator_test.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,12 @@ func TestGubernatorRateLimiter_RateLimit_Meterprovider(t *testing.T) {
211211
mp := metric.NewMeterProvider(metric.WithReader(reader))
212212
otel.SetMeterProvider(mp)
213213

214-
cfg := createDefaultConfig().(*Config)
214+
cfg := &Config{
215+
Rate: 1,
216+
Burst: 2,
217+
MetadataKeys: []string{"x-elastic-project-id"},
218+
ThrottleBehavior: ThrottleBehaviorError,
219+
}
215220
server, rl := newTestGubernatorRateLimiter(t, cfg, mp)
216221

217222
//We provide the x-elastic-project-id in oder to set the ProjectID

processor/ratelimitprocessor/internal/telemetry/attributes.go

+6-10
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,13 @@ const (
3131
outcomeKey = "outcome"
3232
errorReasonKey = "error_reason"
3333
descisionKey = "ratelimit_decision"
34-
limitReasonKey = "reason"
3534

36-
TooLarge ErrorReason = "too_large"
37-
BadRequest ErrorReason = "bad_request"
38-
Invalid ErrorReason = "invalid"
39-
TooMany ErrorReason = "too_many"
35+
TooLarge ErrorReason = "too_large"
36+
BadRequest ErrorReason = "bad_request"
37+
Invalid ErrorReason = "invalid"
38+
TooMany ErrorReason = "too_many"
39+
StatusUnderLimit ErrorReason = "underl_limit"
40+
StatusOverLimit ErrorReason = "throttled"
4041

4142
SignalTrace = "trace"
4243
SignalMetric = "metric"
@@ -56,11 +57,6 @@ func WithDecision(decision string) attribute.KeyValue {
5657
return attribute.String(descisionKey, decision)
5758
}
5859

59-
// WithLimitReason returns limitReason attribute with key.
60-
func WithLimitReason(limitReason string) attribute.KeyValue {
61-
return attribute.String(limitReasonKey, limitReason)
62-
}
63-
6460
// WithProtocol returns a protocol attribute with key.
6561
func WithProtocol(protocol string) attribute.KeyValue {
6662
return attribute.String(protocolKey, protocol)

processor/ratelimitprocessor/ratelimiter.go

+10-5
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"strings"
2424

2525
"go.opentelemetry.io/collector/client"
26+
"go.opentelemetry.io/otel/attribute"
2627
)
2728

2829
var (
@@ -56,13 +57,17 @@ const (
5657
ProjectID = "x-elastic-project-id"
5758
)
5859

59-
func getProjectIDFromMetadata(ctx context.Context) string {
60+
func attrsFromMetadata(ctx context.Context, metadataKeys []string, attrs []attribute.KeyValue) []attribute.KeyValue {
6061
clientInfo := client.FromContext(ctx)
61-
values := clientInfo.Metadata.Get(ProjectID)
62-
if len(values) > 0 {
63-
return values[0]
62+
63+
for _, key := range metadataKeys {
64+
values := clientInfo.Metadata.Get(key)
65+
if len(values) > 0 {
66+
attrs = append(attrs, attribute.String(key, strings.Join(values, ",")))
67+
}
6468
}
65-
return ""
69+
70+
return attrs
6671
}
6772

6873
func getUniqueKey(ctx context.Context, metadataKeys []string) string {

0 commit comments

Comments
 (0)