metrics/recorder/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
use std::{cell::Cell, marker::PhantomData, ptr::NonNull};

mod cell;
use self::cell::RecorderOnceCell;

mod errors;
pub use self::errors::SetRecorderError;

mod noop;
pub use self::noop::NoopRecorder;

use crate::{Counter, Gauge, Histogram, Key, KeyName, Metadata, SharedString, Unit};

static NOOP_RECORDER: NoopRecorder = NoopRecorder;
static GLOBAL_RECORDER: RecorderOnceCell = RecorderOnceCell::new();

thread_local! {
    static LOCAL_RECORDER: Cell<Option<NonNull<dyn Recorder>>> = Cell::new(None);
}

/// A trait for registering and recording metrics.
///
/// This is the core trait that allows interoperability between exporter implementations and the macros provided by
/// `metrics`.
pub trait Recorder {
    /// Describes a counter.
    ///
    /// Callers may provide the unit or a description of the counter being registered. Whether or not a metric can be
    /// re-registered to provide a unit/description, if one was already passed or not, as well as how units/descriptions
    /// are used by the underlying recorder, is an implementation detail.
    fn describe_counter(&self, key: KeyName, unit: Option<Unit>, description: SharedString);

    /// Describes a gauge.
    ///
    /// Callers may provide the unit or a description of the gauge being registered. Whether or not a metric can be
    /// re-registered to provide a unit/description, if one was already passed or not, as well as how units/descriptions
    /// are used by the underlying recorder, is an implementation detail.
    fn describe_gauge(&self, key: KeyName, unit: Option<Unit>, description: SharedString);

    /// Describes a histogram.
    ///
    /// Callers may provide the unit or a description of the histogram being registered. Whether or not a metric can be
    /// re-registered to provide a unit/description, if one was already passed or not, as well as how units/descriptions
    /// are used by the underlying recorder, is an implementation detail.
    fn describe_histogram(&self, key: KeyName, unit: Option<Unit>, description: SharedString);

    /// Registers a counter.
    fn register_counter(&self, key: &Key, metadata: &Metadata<'_>) -> Counter;

    /// Registers a gauge.
    fn register_gauge(&self, key: &Key, metadata: &Metadata<'_>) -> Gauge;

    /// Registers a histogram.
    fn register_histogram(&self, key: &Key, metadata: &Metadata<'_>) -> Histogram;
}

// Blanket implementations.
macro_rules! impl_recorder {
    ($inner_ty:ident, $ptr_ty:ty) => {
        impl<$inner_ty> $crate::Recorder for $ptr_ty
        where
            $inner_ty: $crate::Recorder + ?Sized,
        {
            fn describe_counter(
                &self,
                key: $crate::KeyName,
                unit: Option<$crate::Unit>,
                description: $crate::SharedString,
            ) {
                std::ops::Deref::deref(self).describe_counter(key, unit, description)
            }

            fn describe_gauge(
                &self,
                key: $crate::KeyName,
                unit: Option<$crate::Unit>,
                description: $crate::SharedString,
            ) {
                std::ops::Deref::deref(self).describe_gauge(key, unit, description)
            }

            fn describe_histogram(
                &self,
                key: $crate::KeyName,
                unit: Option<$crate::Unit>,
                description: $crate::SharedString,
            ) {
                std::ops::Deref::deref(self).describe_histogram(key, unit, description)
            }

            fn register_counter(
                &self,
                key: &$crate::Key,
                metadata: &$crate::Metadata<'_>,
            ) -> $crate::Counter {
                std::ops::Deref::deref(self).register_counter(key, metadata)
            }

            fn register_gauge(
                &self,
                key: &$crate::Key,
                metadata: &$crate::Metadata<'_>,
            ) -> $crate::Gauge {
                std::ops::Deref::deref(self).register_gauge(key, metadata)
            }

            fn register_histogram(
                &self,
                key: &$crate::Key,
                metadata: &$crate::Metadata<'_>,
            ) -> $crate::Histogram {
                std::ops::Deref::deref(self).register_histogram(key, metadata)
            }
        }
    };
}

impl_recorder!(T, &T);
impl_recorder!(T, &mut T);
impl_recorder!(T, std::boxed::Box<T>);
impl_recorder!(T, std::sync::Arc<T>);

/// Guard for setting a local recorder.
///
/// When using a local recorder, we take a reference to the recorder and only hold it for as long as the duration of the
/// closure. However, we must store this reference in a static variable (thread-local storage) so that it can be
/// accessed by the macros. This guard ensures that the pointer we store to the reference is cleared when the guard is
/// dropped, so that it can't be used after the closure has finished, even if the closure panics and unwinds the stack.
///
/// ## Note
///
/// The guard has a lifetime parameter `'a` that is bounded using a `PhantomData` type. This upholds the guard's
/// contravariance, it must live _at most as long_ as the recorder it takes a reference to. The bounded lifetime
/// prevents accidental use-after-free errors when using a guard directly through [`crate::set_default_local_recorder`].
pub struct LocalRecorderGuard<'a> {
    prev_recorder: Option<NonNull<dyn Recorder>>,
    phantom: PhantomData<&'a dyn Recorder>,
}

impl<'a> LocalRecorderGuard<'a> {
    /// Creates a new `LocalRecorderGuard` and sets the thread-local recorder.
    fn new(recorder: &'a dyn Recorder) -> Self {
        // SAFETY: While we take a lifetime-less pointer to the given reference, the reference we derive _from_ the
        // pointer is given the same lifetime of the reference used to construct the guard -- captured in the guard type
        // itself -- and so derived references never outlive the source reference.
        let recorder_ptr = unsafe { NonNull::new_unchecked(recorder as *const _ as *mut _) };

        let prev_recorder =
            LOCAL_RECORDER.with(|local_recorder| local_recorder.replace(Some(recorder_ptr)));

        Self { prev_recorder, phantom: PhantomData }
    }
}

impl<'a> Drop for LocalRecorderGuard<'a> {
    fn drop(&mut self) {
        // Clear the thread-local recorder.
        LOCAL_RECORDER.with(|local_recorder| local_recorder.replace(self.prev_recorder.take()));
    }
}

/// Sets the global recorder.
///
/// This function may only be called once in the lifetime of a program. Any metrics recorded before this method is
/// called will be completely ignored.
///
/// This function does not typically need to be called manually.  Metrics implementations should provide an
/// initialization method that installs the recorder internally.
///
/// # Errors
///
/// An error is returned if a recorder has already been set.
pub fn set_global_recorder<R>(recorder: R) -> Result<(), SetRecorderError<R>>
where
    R: Recorder + Sync + 'static,
{
    GLOBAL_RECORDER.set(recorder)
}

/// Sets the recorder as the default for the current thread for the duration of the lifetime of the returned
/// [`LocalRecorderGuard`].
///
/// This function is suitable for capturing metrics in asynchronous code, in particular when using a single-threaded
/// runtime. Any metrics registered prior to the returned guard will remain attached to the recorder that was present at
/// the time of registration, and so this cannot be used to intercept existing metrics.
///
/// Additionally, local recorders can be used in a nested fashion. When setting a new default local recorder, the
/// previous default local recorder will be captured if one was set, and will be restored when the returned guard drops.
/// the lifetime of the returned [`LocalRecorderGuard`].
///
/// Any metrics recorded before a guard is returned will be completely ignored.  Metrics implementations should provide
/// an initialization method that installs the recorder internally.
///
/// The function is suitable for capturing metrics in asynchronous code that uses a single threaded runtime.
///
/// If a global recorder is set, it will be restored once the guard is dropped.
#[must_use]
pub fn set_default_local_recorder(recorder: &dyn Recorder) -> LocalRecorderGuard {
    LocalRecorderGuard::new(recorder)
}

/// Runs the closure with the given recorder set as the global recorder for the duration.
///
/// This only applies as long as the closure is running, and on the thread where `with_local_recorder` is called. This
/// does not extend to other threads, and so is not suitable for capturing metrics in asynchronous code where multiple
/// threads are involved.
pub fn with_local_recorder<T>(recorder: &dyn Recorder, f: impl FnOnce() -> T) -> T {
    let _local = LocalRecorderGuard::new(recorder);
    f()
}

/// Runs the closure with a reference to the current recorder for this scope.
///
/// If a local recorder has been set, it will be used. Otherwise, the global recorder will be used.  If neither a local
/// recorder or global recorder have been set, a no-op recorder will be used.
///
/// It should typically not be necessary to call this function directly, as it is used primarily by generated code. You
/// should prefer working with the macros provided by `metrics` instead: `counter!`, `gauge!`, `histogram!`, etc.
pub fn with_recorder<T>(f: impl FnOnce(&dyn Recorder) -> T) -> T {
    LOCAL_RECORDER.with(|local_recorder| {
        if let Some(recorder) = local_recorder.get() {
            // SAFETY: If we have a local recorder, we know that it is valid because it can only be set during the
            // duration of a closure that is passed to `with_local_recorder`, which is the only time this method can be
            // called and have a local recorder set. This ensures that the lifetime of the recorder is valid for the
            // duration of this method call.
            unsafe { f(recorder.as_ref()) }
        } else if let Some(global_recorder) = GLOBAL_RECORDER.try_load() {
            f(global_recorder)
        } else {
            f(&NOOP_RECORDER)
        }
    })
}

#[cfg(test)]
mod tests {
    use std::sync::{atomic::Ordering, Arc};

    use crate::{with_local_recorder, NoopRecorder};

    use super::{Recorder, RecorderOnceCell};

    #[test]
    fn boxed_recorder_dropped_on_existing_set() {
        // This test simply ensures that if a boxed recorder is handed to us to install, and another
        // recorder has already been installed, that we drop the new boxed recorder instead of
        // leaking it.
        let recorder_cell = RecorderOnceCell::new();

        // This is the first set of the cell, so it should always succeed.
        let (first_recorder, _) = test_recorders::TrackOnDropRecorder::new();
        let first_set_result = recorder_cell.set(first_recorder);
        assert!(first_set_result.is_ok());

        // Since the cell is already set, this second set should fail. We'll also then assert that
        // our atomic boolean is set to `true`, indicating the drop logic ran for it.
        let (second_recorder, was_dropped) = test_recorders::TrackOnDropRecorder::new();
        assert!(!was_dropped.load(Ordering::SeqCst));

        let second_set_result = recorder_cell.set(second_recorder);
        assert!(second_set_result.is_err());
        assert!(!was_dropped.load(Ordering::SeqCst));
        drop(second_set_result);
        assert!(was_dropped.load(Ordering::SeqCst));
    }

    #[test]
    fn blanket_implementations() {
        fn is_recorder<T: Recorder>(_recorder: T) {}

        let mut local = NoopRecorder;

        is_recorder(NoopRecorder);
        is_recorder(Arc::new(NoopRecorder));
        is_recorder(Box::new(NoopRecorder));
        is_recorder(&local);
        is_recorder(&mut local);
    }

    #[test]
    fn thread_scoped_recorder_guards() {
        // This test ensures that when a recorder is installed through
        // `crate::set_default_local_recorder` it will only be valid in the scope of the
        // thread.
        //
        // The goal of the test is to give confidence that no invalid memory
        // access errors are present when operating with locally scoped
        // recorders.
        let t1_recorder = test_recorders::SimpleCounterRecorder::new();
        let t2_recorder = test_recorders::SimpleCounterRecorder::new();
        let t3_recorder = test_recorders::SimpleCounterRecorder::new();
        // Start a new thread scope to take references to each recorder in the
        // closures passed to the thread.
        std::thread::scope(|s| {
            s.spawn(|| {
                let _guard = crate::set_default_local_recorder(&t1_recorder);
                crate::counter!("t1_counter").increment(1);
            });

            s.spawn(|| {
                with_local_recorder(&t2_recorder, || {
                    crate::counter!("t2_counter").increment(2);
                })
            });

            s.spawn(|| {
                let _guard = crate::set_default_local_recorder(&t3_recorder);
                crate::counter!("t3_counter").increment(3);
            });
        });

        assert!(t1_recorder.get_value() == 1);
        assert!(t2_recorder.get_value() == 2);
        assert!(t3_recorder.get_value() == 3);
    }

    #[test]
    fn local_recorder_restored_when_dropped() {
        // This test ensures that any previously installed local recorders are
        // restored when the subsequently installed recorder's guard is dropped.
        let root_recorder = test_recorders::SimpleCounterRecorder::new();
        // Install the root recorder and increment the counter once.
        let _guard = crate::set_default_local_recorder(&root_recorder);
        crate::counter!("test_counter").increment(1);

        // Install a second recorder and increment its counter once.
        let next_recorder = test_recorders::SimpleCounterRecorder::new();
        let next_guard = crate::set_default_local_recorder(&next_recorder);
        crate::counter!("test_counter").increment(1);
        let final_recorder = test_recorders::SimpleCounterRecorder::new();
        crate::with_local_recorder(&final_recorder, || {
            // Final recorder increments the counter by 10. At the end of the
            // closure, the guard should be dropped, and `next_recorder`
            // restored.
            crate::counter!("test_counter").increment(10);
        });
        // Since `next_recorder` is restored, we can increment it once and check
        // that the value is 2 (+1 before and after the closure).
        crate::counter!("test_counter").increment(1);
        assert!(next_recorder.get_value() == 2);
        drop(next_guard);

        // At the end, increment the counter again by an arbitrary value. Since
        // `next_guard` is dropped, the root recorder is restored.
        crate::counter!("test_counter").increment(20);
        assert!(root_recorder.get_value() == 21);
    }

    mod test_recorders {
        use std::sync::{
            atomic::{AtomicBool, AtomicU64, Ordering},
            Arc,
        };

        use crate::Recorder;

        #[derive(Debug)]
        // Tracks how many times the recorder was dropped
        pub struct TrackOnDropRecorder(Arc<AtomicBool>);

        impl TrackOnDropRecorder {
            pub fn new() -> (Self, Arc<AtomicBool>) {
                let arc = Arc::new(AtomicBool::new(false));
                (Self(arc.clone()), arc)
            }
        }

        // === impl TrackOnDropRecorder ===

        impl Recorder for TrackOnDropRecorder {
            fn describe_counter(
                &self,
                _: crate::KeyName,
                _: Option<crate::Unit>,
                _: crate::SharedString,
            ) {
            }
            fn describe_gauge(
                &self,
                _: crate::KeyName,
                _: Option<crate::Unit>,
                _: crate::SharedString,
            ) {
            }
            fn describe_histogram(
                &self,
                _: crate::KeyName,
                _: Option<crate::Unit>,
                _: crate::SharedString,
            ) {
            }

            fn register_counter(&self, _: &crate::Key, _: &crate::Metadata<'_>) -> crate::Counter {
                crate::Counter::noop()
            }

            fn register_gauge(&self, _: &crate::Key, _: &crate::Metadata<'_>) -> crate::Gauge {
                crate::Gauge::noop()
            }

            fn register_histogram(
                &self,
                _: &crate::Key,
                _: &crate::Metadata<'_>,
            ) -> crate::Histogram {
                crate::Histogram::noop()
            }
        }

        impl Drop for TrackOnDropRecorder {
            fn drop(&mut self) {
                self.0.store(true, Ordering::SeqCst);
            }
        }

        // A simple recorder that only implements `register_counter`.
        #[derive(Debug)]
        pub struct SimpleCounterRecorder {
            state: Arc<AtomicU64>,
        }

        impl SimpleCounterRecorder {
            pub fn new() -> Self {
                Self { state: Arc::new(AtomicU64::default()) }
            }

            pub fn get_value(&self) -> u64 {
                self.state.load(Ordering::Acquire)
            }
        }

        struct SimpleCounterHandle {
            state: Arc<AtomicU64>,
        }

        impl crate::CounterFn for SimpleCounterHandle {
            fn increment(&self, value: u64) {
                self.state.fetch_add(value, Ordering::Acquire);
            }

            fn absolute(&self, _value: u64) {
                unimplemented!()
            }
        }

        // === impl SimpleCounterRecorder ===

        impl Recorder for SimpleCounterRecorder {
            fn describe_counter(
                &self,
                _: crate::KeyName,
                _: Option<crate::Unit>,
                _: crate::SharedString,
            ) {
            }
            fn describe_gauge(
                &self,
                _: crate::KeyName,
                _: Option<crate::Unit>,
                _: crate::SharedString,
            ) {
            }
            fn describe_histogram(
                &self,
                _: crate::KeyName,
                _: Option<crate::Unit>,
                _: crate::SharedString,
            ) {
            }

            fn register_counter(&self, _: &crate::Key, _: &crate::Metadata<'_>) -> crate::Counter {
                crate::Counter::from_arc(Arc::new(SimpleCounterHandle {
                    state: self.state.clone(),
                }))
            }

            fn register_gauge(&self, _: &crate::Key, _: &crate::Metadata<'_>) -> crate::Gauge {
                crate::Gauge::noop()
            }

            fn register_histogram(
                &self,
                _: &crate::Key,
                _: &crate::Metadata<'_>,
            ) -> crate::Histogram {
                crate::Histogram::noop()
            }
        }
    }
}