"""A collection of utilities that make it easier to write test cases
that interact with metrics.

Registries are read through ``registry.collect()``; a "frozen registry" is
a deep-copied list of the collected metrics (see ``save_registry``) that
can later be compared against the live values.
"""

import copy

from prometheus_client import REGISTRY

METRIC_EQUALS_ERR_EXPLANATION = """ %s%s = %s, expected %s. The values for %s are: %s"""

# Counterpart of METRIC_EQUALS_ERR_EXPLANATION for the negated assertion,
# so a failure does not claim the forbidden value was "expected".
METRIC_NOT_EQUALS_ERR_EXPLANATION = (
    """ %s%s = %s, expected any value other than %s. The values for %s are: %s"""
)

METRIC_DIFF_ERR_EXPLANATION = (
    """ %s%s changed by %f, expected %f. Value before: %s Value after: %s """
)

# Counterpart of METRIC_DIFF_ERR_EXPLANATION for the negated assertion.
METRIC_NO_DIFF_ERR_EXPLANATION = (
    """ %s%s changed by %f, expected any change other than %f."""
    """ Value before: %s Value after: %s """
)

METRIC_COMPARE_ERR_EXPLANATION = (
    """ The change in value of %s%s didn't match the predicate."""
    """ Value before: %s Value after: %s """
)

METRIC_DIFF_ERR_NONE_EXPLANATION = (
    """ %s%s was None after. Value before: %s Value after: %s """
)


def _metric_change(frozen_registry, metric_name, registry, labels):
    """Look up a metric before/after and compute how much it changed.

    Asserts that the metric currently exists in ``registry``. A metric that
    is missing from ``frozen_registry`` is treated as having started at 0.0.

    Returns a ``(saved_value, current_value, diff)`` tuple.
    """
    saved_value = get_metric_from_frozen_registry(
        metric_name, frozen_registry, **labels
    )
    current_value = get_metric(metric_name, registry=registry, **labels)
    assert current_value is not None, METRIC_DIFF_ERR_NONE_EXPLANATION % (
        metric_name,
        format_labels(labels),
        saved_value,
        current_value,
    )
    return saved_value, current_value, current_value - (saved_value or 0.0)


def assert_metric_equal(expected_value, metric_name, registry=REGISTRY, **labels):
    """Asserts that metric_name{**labels} == expected_value.

    On failure the message lists every labelled value of ``metric_name``
    found in ``registry`` to help diagnose label mismatches.
    """
    value = get_metric(metric_name, registry=registry, **labels)
    # The message is only evaluated when the assertion fails, so the
    # registry is not collected a second time on the success path.
    assert expected_value == value, METRIC_EQUALS_ERR_EXPLANATION % (
        metric_name,
        format_labels(labels),
        value,
        expected_value,
        metric_name,
        # Report values from the registry under test, not the default one.
        format_vector(get_metrics_vector(metric_name, registry=registry)),
    )


def assert_metric_diff(
    frozen_registry, expected_diff, metric_name, registry=REGISTRY, **labels
):
    """Asserts that metric_name{**labels} changed by expected_diff between
    the frozen registry and now.

    A frozen registry can be obtained by calling save_registry, typically
    at the beginning of a test case.
    """
    saved_value, current_value, diff = _metric_change(
        frozen_registry, metric_name, registry, labels
    )
    assert expected_diff == diff, METRIC_DIFF_ERR_EXPLANATION % (
        metric_name,
        format_labels(labels),
        diff,
        expected_diff,
        saved_value,
        current_value,
    )


def assert_metric_no_diff(
    frozen_registry, expected_diff, metric_name, registry=REGISTRY, **labels
):
    """Asserts that metric_name{**labels} did NOT change by expected_diff
    between the frozen registry and now.

    A frozen registry can be obtained by calling save_registry, typically
    at the beginning of a test case.
    """
    saved_value, current_value, diff = _metric_change(
        frozen_registry, metric_name, registry, labels
    )
    assert expected_diff != diff, METRIC_NO_DIFF_ERR_EXPLANATION % (
        metric_name,
        format_labels(labels),
        diff,
        expected_diff,
        saved_value,
        current_value,
    )


def assert_metric_not_equal(expected_value, metric_name, registry=REGISTRY, **labels):
    """Asserts that metric_name{**labels} != expected_value.

    On failure the message lists every labelled value of ``metric_name``
    found in ``registry``.
    """
    value = get_metric(metric_name, registry=registry, **labels)
    assert expected_value != value, METRIC_NOT_EQUALS_ERR_EXPLANATION % (
        metric_name,
        format_labels(labels),
        value,
        expected_value,
        metric_name,
        # Report values from the registry under test, not the default one.
        format_vector(get_metrics_vector(metric_name, registry=registry)),
    )


def assert_metric_compare(
    frozen_registry, predicate, metric_name, registry=REGISTRY, **labels
):
    """Asserts that metric_name{**labels} changed according to a provided
    predicate function between the frozen registry and now.

    ``predicate`` is called as ``predicate(saved_value, current_value)`` and
    must return exactly ``True`` (other truthy values fail the assertion).
    ``saved_value`` is ``None`` when the metric was absent from the frozen
    registry.

    A frozen registry can be obtained by calling save_registry, typically
    at the beginning of a test case.
    """
    saved_value, current_value, _ = _metric_change(
        frozen_registry, metric_name, registry, labels
    )
    assert predicate(saved_value, current_value) is True, (
        METRIC_COMPARE_ERR_EXPLANATION
        % (
            metric_name,
            format_labels(labels),
            saved_value,
            current_value,
        )
    )


def save_registry(registry=REGISTRY):
    """Freezes a registry.

    This lets a user test changes to a metric instead of testing the
    absolute value. A typical use case looks like:

        registry = save_registry()
        doStuff()
        assert_metric_diff(registry, 1, 'stuff_done_total')
    """
    # Deep copy so later updates to the live metrics cannot leak into the
    # snapshot.
    return copy.deepcopy(list(registry.collect()))


def get_metric(metric_name, registry=REGISTRY, **labels):
    """Gets a single metric value from ``registry``, or None if absent."""
    return get_metric_from_frozen_registry(metric_name, registry.collect(), **labels)


def get_metrics_vector(metric_name, registry=REGISTRY):
    """Returns the values for all labels of a given metric.

    The result is returned as a list of (labels, value) tuples, where
    `labels` is a dict.

    This is quite a hack since it relies on the internal representation of
    the prometheus_client, and it should probably be provided as a function
    there instead.
    """
    return get_metric_vector_from_frozen_registry(metric_name, registry.collect())


def get_metric_vector_from_frozen_registry(metric_name, frozen_registry):
    """Like get_metrics_vector, but from a frozen registry."""
    # Samples are indexed positionally: (name, labels, value, ...).
    return [
        (sample[1], sample[2])
        for metric in frozen_registry
        for sample in metric.samples
        if sample[0] == metric_name
    ]


def get_metric_from_frozen_registry(metric_name, frozen_registry, **labels):
    """Gets a single metric from a frozen registry.

    Returns the value of the first sample whose name equals ``metric_name``
    and whose label dict equals ``labels`` exactly, or None when no sample
    matches.
    """
    for metric in frozen_registry:
        for sample in metric.samples:
            if sample[0] == metric_name and sample[1] == labels:
                return sample[2]
    return None


def format_labels(labels):
    """Format a set of labels to Prometheus representation.

    In:
      {'method': 'GET', 'port': '80'}

    Out:
      '{method="GET",port="80"}'
    """
    return "{{{}}}".format(",".join(f'{k}="{v}"' for k, v in labels.items()))


def format_vector(vector):
    """Formats a list of (labels, value) where labels is a dict into a
    human-readable representation, one ``{labels} = value`` entry per line.
    """
    return "\n".join(f"{format_labels(labels)} = {value}" for labels, value in vector)