Skip to content

OpenMetrics


OpenMetrics is used for collecting metrics using the CNCF-backed OpenMetrics format. This version is the default for all new OpenMetrics checks, and it is compatible with Python 3 only.

Interface

datadog_checks.base.checks.openmetrics.v2.base.OpenMetricsBaseCheckV2 (AgentCheck)

OpenMetricsBaseCheckV2 is an updated class of OpenMetricsBaseCheck to scrape endpoints that emit Prometheus metrics.

Minimal example configuration:

instances:
- openmetrics_endpoint: http://example.com/endpoint
  namespace: "foobar"
  metrics:
  - bar
  - foo
Source code:
class OpenMetricsBaseCheckV2(AgentCheck):
    """
    OpenMetricsBaseCheckV2 is an updated class of OpenMetricsBaseCheck to scrape endpoints that emit Prometheus metrics.

    Minimal example configuration:

    ```yaml
    instances:
    - openmetrics_endpoint: http://example.com/endpoint
      namespace: "foobar"
      metrics:
      - bar
      - foo
    ```

    """

    # Cap on the number of metric contexts a single check run may submit.
    DEFAULT_METRIC_LIMIT = 2000

    # Allow tracing for openmetrics integrations
    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        return traced_class(cls)

    def __init__(self, name, init_config, instances):
        """
        The base class for any OpenMetrics-based integration.
        """
        super().__init__(name, init_config, instances)

        # All configured scrapers keyed by the endpoint
        self.scrapers = {}

        # All desired scraper configurations, which subclasses can override as needed
        self.scraper_configs = [self.instance]

        # Defer scraper creation until the Agent runs check initializations.
        self.check_initializations.append(self.configure_scrapers)

    def check(self, _):
        """Refresh the scraper set, then scrape every configured endpoint under its namespace."""
        self.refresh_scrapers()

        for endpoint in self.scrapers:
            scraper = self.scrapers[endpoint]
            self.log.info('Scraping OpenMetrics endpoint: %s', endpoint)

            with self.adopt_namespace(scraper.namespace):
                scraper.scrape()

    def configure_scrapers(self):
        """
        Creates a scraper configuration for each instance.

        Raises `ConfigurationError` when an instance is missing a valid
        `openmetrics_endpoint` setting.
        """
        fresh_scrapers = {}

        for config in self.scraper_configs:
            endpoint = config.get('openmetrics_endpoint', '')
            if not isinstance(endpoint, str):
                raise ConfigurationError('The setting `openmetrics_endpoint` must be a string')
            if not endpoint:
                raise ConfigurationError('The setting `openmetrics_endpoint` is required')

            fresh_scrapers[endpoint] = self.create_scraper(config)

        # Replace the contents in place so existing references stay valid.
        self.scrapers.clear()
        self.scrapers.update(fresh_scrapers)

    def create_scraper(self, config):
        """Build the default scraper; subclasses can override to return a custom scraper."""
        return OpenMetricsScraper(self, self.get_config_with_defaults(config))

    def set_dynamic_tags(self, *tags):
        """Propagate dynamically discovered tags to every scraper."""
        for scraper in self.scrapers.values():
            scraper.set_dynamic_tags(*tags)

    def get_config_with_defaults(self, config):
        """Layer the instance config over the defaults without copying either mapping."""
        return ChainMap(config, self.get_default_config())

    def get_default_config(self):
        """Default configuration values; subclasses may override."""
        return {}

    def refresh_scrapers(self):
        """Hook invoked before every run to refresh scrapers; no-op by default."""
        pass

    @contextmanager
    def adopt_namespace(self, namespace):
        """Temporarily submit under `namespace` (when truthy), restoring the previous one afterwards."""
        previous_namespace = self.__NAMESPACE__

        try:
            self.__NAMESPACE__ = namespace or previous_namespace
            yield
        finally:
            self.__NAMESPACE__ = previous_namespace

__init__(self, name, init_config, instances) special

The base class for any OpenMetrics-based integration.

Source code in
def __init__(self, name, init_config, instances):
    """
    The base class for any OpenMetrics-based integration.
    """
    super(OpenMetricsBaseCheckV2, self).__init__(name, init_config, instances)

    # Configured scrapers, keyed by endpoint URL.
    self.scrapers = {}

    # Scraper configurations; subclasses may override this list as needed.
    self.scraper_configs = [self.instance]

    # Defer scraper creation until the Agent runs check initializations.
    self.check_initializations.append(self.configure_scrapers)

check(self, _)

Source code in
def check(self, _):
    """Refresh the scraper set, then scrape every configured endpoint under its namespace."""
    self.refresh_scrapers()

    for endpoint in self.scrapers:
        scraper = self.scrapers[endpoint]
        self.log.info('Scraping OpenMetrics endpoint: %s', endpoint)

        with self.adopt_namespace(scraper.namespace):
            scraper.scrape()

configure_scrapers(self)

Creates a scraper configuration for each instance.

Source code in
def configure_scrapers(self):
    """
    Creates a scraper configuration for each instance.

    Raises `ConfigurationError` when an instance is missing a valid
    `openmetrics_endpoint` setting.
    """
    fresh_scrapers = {}

    for config in self.scraper_configs:
        endpoint = config.get('openmetrics_endpoint', '')
        if not isinstance(endpoint, str):
            raise ConfigurationError('The setting `openmetrics_endpoint` must be a string')
        if not endpoint:
            raise ConfigurationError('The setting `openmetrics_endpoint` is required')

        fresh_scrapers[endpoint] = self.create_scraper(config)

    # Replace the contents in place so existing references stay valid.
    self.scrapers.clear()
    self.scrapers.update(fresh_scrapers)

Scrapers

datadog_checks.base.checks.openmetrics.v2.scraper.OpenMetricsScraper

OpenMetricsScraper is a class that can be used to override the default scraping behavior for OpenMetricsBaseCheckV2.

Minimal example configuration:

- openmetrics_endpoint: http://example.com/endpoint
  namespace: "foobar"
  metrics:
  - bar
  - foo
  raw_metric_prefix: "test"
  telemetry: "true"
  hostname_label: node
Source code:
class OpenMetricsScraper:
    """
    OpenMetricsScraper is a class that can be used to override the default scraping behavior for OpenMetricsBaseCheckV2.

    Minimal example configuration:

    ```yaml
    - openmetrics_endpoint: http://example.com/endpoint
      namespace: "foobar"
      metrics:
      - bar
      - foo
      raw_metric_prefix: "test"
      telemetry: "true"
      hostname_label: node
    ```

    """

    # Name of the service check used to report the health of the scraped endpoint.
    SERVICE_CHECK_HEALTH = 'openmetrics.health'

    def __init__(self, check, config):
        """
        The base class for any scraper overrides.

        `check` is the owning check instance; `config` is the instance
        configuration mapping. Invalid settings raise `ConfigurationError`.
        """

        self.config = config

        # Save a reference to the check instance
        self.check = check

        # Parse the configuration
        self.endpoint = config['openmetrics_endpoint']

        # Helpers that map parsed metrics to submissions and aggregate labels across metrics.
        self.metric_transformer = MetricTransformer(self.check, config)
        self.label_aggregator = LabelAggregator(self.check, config)

        self.enable_telemetry = is_affirmative(config.get('telemetry', False))
        # Make every telemetry submission method a no-op to avoid many lookups of `self.enable_telemetry`
        if not self.enable_telemetry:
            for name, _ in inspect.getmembers(self, predicate=inspect.ismethod):
                if name.startswith('submit_telemetry_'):
                    setattr(self, name, no_op)

        # Prevent overriding an integration's defined namespace
        self.namespace = check.__NAMESPACE__ or config.get('namespace', '')
        if not isinstance(self.namespace, str):
            raise ConfigurationError('Setting `namespace` must be a string')

        # Prefix stripped from every raw metric name before any other processing (see `parse_metrics`).
        self.raw_metric_prefix = config.get('raw_metric_prefix', '')
        if not isinstance(self.raw_metric_prefix, str):
            raise ConfigurationError('Setting `raw_metric_prefix` must be a string')

        self.enable_health_service_check = is_affirmative(config.get('enable_health_service_check', True))

        # Optional label whose value overrides the hostname for each sample (see `generate_sample_data`).
        self.hostname_label = config.get('hostname_label', '')
        if not isinstance(self.hostname_label, str):
            raise ConfigurationError('Setting `hostname_label` must be a string')

        hostname_format = config.get('hostname_format', '')
        if not isinstance(hostname_format, str):
            raise ConfigurationError('Setting `hostname_format` must be a string')

        # Only the first occurrence of the placeholder is replaced.
        self.hostname_formatter = None
        if self.hostname_label and hostname_format:
            placeholder = '<HOSTNAME>'
            if placeholder not in hostname_format:
                raise ConfigurationError(f'Setting `hostname_format` does not contain the placeholder `{placeholder}`')

            self.hostname_formatter = lambda hostname: hostname_format.replace('<HOSTNAME>', hostname, 1)

        exclude_labels = config.get('exclude_labels', [])
        if not isinstance(exclude_labels, list):
            raise ConfigurationError('Setting `exclude_labels` must be an array')

        self.exclude_labels = set()
        for i, entry in enumerate(exclude_labels, 1):
            if not isinstance(entry, str):
                raise ConfigurationError(f'Entry #{i} of setting `exclude_labels` must be a string')

            self.exclude_labels.add(entry)

        include_labels = config.get('include_labels', [])
        if not isinstance(include_labels, list):
            raise ConfigurationError('Setting `include_labels` must be an array')
        self.include_labels = set()
        for i, entry in enumerate(include_labels, 1):
            if not isinstance(entry, str):
                raise ConfigurationError(f'Entry #{i} of setting `include_labels` must be a string')
            # Exclusion takes precedence at sample time (see `generate_sample_data`),
            # so a conflicting entry is only logged here.
            if entry in self.exclude_labels:
                self.log.debug(
                    'Label `%s` is set in both `exclude_labels` and `include_labels`. Excluding label.', entry
                )
            self.include_labels.add(entry)

        self.rename_labels = config.get('rename_labels', {})
        if not isinstance(self.rename_labels, dict):
            raise ConfigurationError('Setting `rename_labels` must be a mapping')

        for key, value in self.rename_labels.items():
            if not isinstance(value, str):
                raise ConfigurationError(f'Value for label `{key}` of setting `rename_labels` must be a string')

        exclude_metrics = config.get('exclude_metrics', [])
        if not isinstance(exclude_metrics, list):
            raise ConfigurationError('Setting `exclude_metrics` must be an array')

        self.exclude_metrics = set()
        self.exclude_metrics_pattern = None
        exclude_metrics_patterns = []
        for i, entry in enumerate(exclude_metrics, 1):
            if not isinstance(entry, str):
                raise ConfigurationError(f'Entry #{i} of setting `exclude_metrics` must be a string')

            # An entry unchanged by `re.escape` contains no regex metacharacters, so it is
            # treated as an exact metric name; anything else is collected as a pattern.
            escaped_entry = re.escape(entry)
            if entry == escaped_entry:
                self.exclude_metrics.add(entry)
            else:
                exclude_metrics_patterns.append(entry)

        if exclude_metrics_patterns:
            self.exclude_metrics_pattern = re.compile('|'.join(exclude_metrics_patterns))

        # Maps a label name to a predicate over label values; `true` excludes every value.
        self.exclude_metrics_by_labels = {}
        exclude_metrics_by_labels = config.get('exclude_metrics_by_labels', {})
        if not isinstance(exclude_metrics_by_labels, dict):
            raise ConfigurationError('Setting `exclude_metrics_by_labels` must be a mapping')
        elif exclude_metrics_by_labels:
            for label, values in exclude_metrics_by_labels.items():
                if values is True:
                    self.exclude_metrics_by_labels[label] = return_true
                elif isinstance(values, list):
                    for i, value in enumerate(values, 1):
                        if not isinstance(value, str):
                            raise ConfigurationError(
                                f'Value #{i} for label `{label}` of setting `exclude_metrics_by_labels` '
                                f'must be a string'
                            )

                    # The compiled pattern is bound as a default argument to avoid
                    # late binding of the loop variable `values`.
                    self.exclude_metrics_by_labels[label] = (
                        lambda label_value, pattern=re.compile('|'.join(values)): pattern.search(label_value)
                        is not None
                    )
                else:
                    raise ConfigurationError(
                        f'Label `{label}` of setting `exclude_metrics_by_labels` must be an array or set to `true`'
                    )

        custom_tags = config.get('tags', [])
        if not isinstance(custom_tags, list):
            raise ConfigurationError('Setting `tags` must be an array')

        for i, entry in enumerate(custom_tags, 1):
            if not isinstance(entry, str):
                raise ConfigurationError(f'Entry #{i} of setting `tags` must be a string')

        # Some tags can be ignored to reduce the cardinality.
        # This can be useful for cost optimization in containerized environments
        # when the openmetrics check is configured to collect custom metrics.
        # Even when the Agent's Tagger is configured to add low-cardinality tags only,
        # some tags can still generate unwanted metric contexts (e.g pod annotations as tags).
        ignore_tags = config.get('ignore_tags', [])
        if ignore_tags:
            ignored_tags_re = re.compile('|'.join(set(ignore_tags)))
            custom_tags = [tag for tag in custom_tags if not ignored_tags_re.search(tag)]

        # These will be applied only to service checks
        self.static_tags = [f'endpoint:{self.endpoint}']
        self.static_tags.extend(custom_tags)
        self.static_tags = tuple(self.static_tags)

        # These will be applied to everything except service checks
        self.tags = self.static_tags

        self.raw_line_filter = None
        raw_line_filters = config.get('raw_line_filters', [])
        if not isinstance(raw_line_filters, list):
            raise ConfigurationError('Setting `raw_line_filters` must be an array')
        elif raw_line_filters:
            for i, entry in enumerate(raw_line_filters, 1):
                if not isinstance(entry, str):
                    raise ConfigurationError(f'Entry #{i} of setting `raw_line_filters` must be a string')

            self.raw_line_filter = re.compile('|'.join(raw_line_filters))

        # HTTP client built from the instance/init_config; used by `send_request`.
        self.http = RequestsWrapper(config, self.check.init_config, self.check.HTTP_CONFIG_REMAPPER, self.check.log)

        # Decide how strictly we will adhere to the latest version of the specification
        if is_affirmative(config.get('use_latest_spec', False)):
            self.parse_metric_families = parse_metric_families_strict
            # https://github.com/prometheus/client_python/blob/v0.9.0/prometheus_client/openmetrics/exposition.py#L7
            self.http.options['headers'].setdefault(
                'Accept', 'application/openmetrics-text; version=0.0.1; charset=utf-8'
            )
        else:
            self.parse_metric_families = parse_metric_families
            self.http.options['headers'].setdefault('Accept', 'text/plain')

        self.use_process_start_time = is_affirmative(config.get('use_process_start_time'))

        # Used for monotonic counts
        self.flush_first_value = False

    def scrape(self):
        """
        Execute a scrape, and for each metric collected, transform the metric.
        """
        runtime_data = {'flush_first_value': self.flush_first_value, 'static_tags': self.static_tags}

        for metric in self.consume_metrics(runtime_data):
            transformer = self.metric_transformer.get(metric)
            # Metrics without a registered transformer are silently skipped.
            if transformer is None:
                continue

            transformer(metric, self.generate_sample_data(metric), runtime_data)

        # Record that at least one complete scrape has occurred.
        self.flush_first_value = True

    def consume_metrics(self, runtime_data):
        """
        Yield the processed metrics and filter out excluded metrics.
        """

        metric_parser = self.parse_metrics()
        # On the very first scrape, optionally filter based on the process start time.
        if not self.flush_first_value and self.use_process_start_time:
            metric_parser = first_scrape_handler(metric_parser, runtime_data, datadog_agent.get_process_start_time())
        if self.label_aggregator.configured:
            metric_parser = self.label_aggregator(metric_parser)

        for metric in metric_parser:
            # Excluded either by exact name or by the combined exclusion pattern.
            if metric.name in self.exclude_metrics or (
                self.exclude_metrics_pattern is not None and self.exclude_metrics_pattern.search(metric.name)
            ):
                self.submit_telemetry_number_of_ignored_metric_samples(metric)
                continue

            yield metric

    def parse_metrics(self):
        """
        Get the line streamer and yield processed metrics.
        """

        line_streamer = self.stream_connection_lines()
        if self.raw_line_filter is not None:
            line_streamer = self.filter_connection_lines(line_streamer)

        for metric in self.parse_metric_families(line_streamer):
            self.submit_telemetry_number_of_total_metric_samples(metric)

            # It is critical that the prefix is removed immediately so that
            # all other configuration may reference the trimmed metric name
            if self.raw_metric_prefix and metric.name.startswith(self.raw_metric_prefix):
                metric.name = metric.name[len(self.raw_metric_prefix) :]

            yield metric

    def generate_sample_data(self, metric):
        """
        Yield a sample of processed data.

        Yields `(sample, tags, hostname)` tuples for every valid sample of `metric`.
        """

        label_normalizer = get_label_normalizer(metric.type)

        for sample in metric.samples:
            value = sample.value
            # NaN and infinite values cannot be submitted meaningfully.
            if isnan(value) or isinf(value):
                self.log.debug('Ignoring sample for metric `%s` as it has an invalid value: %s', metric.name, value)
                continue

            tags = []
            skip_sample = False
            labels = sample.labels
            self.label_aggregator.populate(labels)
            label_normalizer(labels)

            for label_name, label_value in labels.items():
                # Per-label excluders (from `exclude_metrics_by_labels`) take precedence
                # and drop the entire sample, not just the label.
                sample_excluder = self.exclude_metrics_by_labels.get(label_name)
                if sample_excluder is not None and sample_excluder(label_value):
                    skip_sample = True
                    break
                elif label_name in self.exclude_labels:
                    continue
                elif self.include_labels and label_name not in self.include_labels:
                    continue

                label_name = self.rename_labels.get(label_name, label_name)
                tags.append(f'{label_name}:{label_value}')

            if skip_sample:
                continue

            tags.extend(self.tags)

            # A configured hostname label overrides the default hostname for this sample.
            hostname = self.hostname
            if self.hostname_label and self.hostname_label in labels:
                hostname = labels[self.hostname_label]
                if self.hostname_formatter is not None:
                    hostname = self.hostname_formatter(hostname)

            self.submit_telemetry_number_of_processed_metric_samples()
            yield sample, tags, hostname

    def stream_connection_lines(self):
        """
        Yield the connection line.
        """

        with self.get_connection() as connection:
            for line in connection.iter_lines(decode_unicode=True):
                yield line

    def filter_connection_lines(self, line_streamer):
        """
        Filter connection lines in the line streamer.

        Lines matching `raw_line_filters` are dropped and counted as telemetry.
        """

        for line in line_streamer:
            if self.raw_line_filter.search(line):
                self.submit_telemetry_number_of_ignored_lines()
            else:
                yield line

    def get_connection(self):
        """
        Send a request to scrape metrics. Return the response or throw an exception.

        Submits the health service check (CRITICAL on failure, OK on success).
        """

        try:
            response = self.send_request()
        except Exception as e:
            self.submit_health_check(ServiceCheck.CRITICAL, message=str(e))
            raise
        else:
            try:
                response.raise_for_status()
            except Exception as e:
                self.submit_health_check(ServiceCheck.CRITICAL, message=str(e))
                # Close the streamed response since it will not be consumed.
                response.close()
                raise
            else:
                self.submit_health_check(ServiceCheck.OK)

                # Never derive the encoding from the locale
                if response.encoding is None:
                    response.encoding = 'utf-8'

                self.submit_telemetry_endpoint_response_size(response)
                return response

    def send_request(self, **kwargs):
        """
        Send an HTTP GET request to the `openmetrics_endpoint` value.
        """

        # Stream the response so lines can be consumed lazily.
        kwargs['stream'] = True
        return self.http.get(self.endpoint, **kwargs)

    def set_dynamic_tags(self, *tags):
        """
        Set dynamic tags.
        """

        self.tags = tuple(chain(self.static_tags, tags))

    def submit_health_check(self, status, **kwargs):
        """
        If health service check is enabled, send an `openmetrics.health` service check.
        """

        if self.enable_health_service_check:
            self.service_check(self.SERVICE_CHECK_HEALTH, status, tags=self.static_tags, **kwargs)

    # Telemetry: total number of metric samples read from the endpoint.
    def submit_telemetry_number_of_total_metric_samples(self, metric):
        self.count('telemetry.metrics.input.count', len(metric.samples), tags=self.tags)

    # Telemetry: number of samples dropped by metric exclusion settings.
    def submit_telemetry_number_of_ignored_metric_samples(self, metric):
        self.count('telemetry.metrics.ignored.count', len(metric.samples), tags=self.tags)

    # Telemetry: number of samples that made it through processing.
    def submit_telemetry_number_of_processed_metric_samples(self):
        self.count('telemetry.metrics.processed.count', 1, tags=self.tags)

    # Telemetry: number of raw lines dropped by `raw_line_filters`.
    def submit_telemetry_number_of_ignored_lines(self):
        self.count('telemetry.metrics.blacklist.count', 1, tags=self.tags)

    # Telemetry: size of the endpoint payload, from the header when available.
    def submit_telemetry_endpoint_response_size(self, response):
        content_length = response.headers.get('Content-Length')
        if content_length is not None:
            content_length = int(content_length)
        else:
            content_length = len(response.content)

        self.gauge('telemetry.payload.size', content_length, tags=self.tags)

    def __getattr__(self, name):
        # Forward all unknown attribute lookups to the check instance for access to submission methods, hostname, etc.
        # The attribute is cached on this instance so later lookups bypass __getattr__.
        attribute = getattr(self.check, name)
        setattr(self, name, attribute)
        return attribute

__init__(self, check, config) special

The base class for any scraper overrides.

Source code in
def __init__(self, check, config):
    """
    The base class for any scraper overrides.

    `check` is the owning check instance; `config` is the instance
    configuration mapping. Invalid settings raise `ConfigurationError`.
    """

    self.config = config

    # Save a reference to the check instance
    self.check = check

    # Parse the configuration
    self.endpoint = config['openmetrics_endpoint']

    # Helpers that map parsed metrics to submissions and aggregate labels across metrics.
    self.metric_transformer = MetricTransformer(self.check, config)
    self.label_aggregator = LabelAggregator(self.check, config)

    self.enable_telemetry = is_affirmative(config.get('telemetry', False))
    # Make every telemetry submission method a no-op to avoid many lookups of `self.enable_telemetry`
    if not self.enable_telemetry:
        for name, _ in inspect.getmembers(self, predicate=inspect.ismethod):
            if name.startswith('submit_telemetry_'):
                setattr(self, name, no_op)

    # Prevent overriding an integration's defined namespace
    self.namespace = check.__NAMESPACE__ or config.get('namespace', '')
    if not isinstance(self.namespace, str):
        raise ConfigurationError('Setting `namespace` must be a string')

    # Prefix stripped from every raw metric name before any other processing.
    self.raw_metric_prefix = config.get('raw_metric_prefix', '')
    if not isinstance(self.raw_metric_prefix, str):
        raise ConfigurationError('Setting `raw_metric_prefix` must be a string')

    self.enable_health_service_check = is_affirmative(config.get('enable_health_service_check', True))

    # Optional label whose value overrides the hostname for each sample.
    self.hostname_label = config.get('hostname_label', '')
    if not isinstance(self.hostname_label, str):
        raise ConfigurationError('Setting `hostname_label` must be a string')

    hostname_format = config.get('hostname_format', '')
    if not isinstance(hostname_format, str):
        raise ConfigurationError('Setting `hostname_format` must be a string')

    # Only the first occurrence of the placeholder is replaced.
    self.hostname_formatter = None
    if self.hostname_label and hostname_format:
        placeholder = '<HOSTNAME>'
        if placeholder not in hostname_format:
            raise ConfigurationError(f'Setting `hostname_format` does not contain the placeholder `{placeholder}`')

        self.hostname_formatter = lambda hostname: hostname_format.replace('<HOSTNAME>', hostname, 1)

    exclude_labels = config.get('exclude_labels', [])
    if not isinstance(exclude_labels, list):
        raise ConfigurationError('Setting `exclude_labels` must be an array')

    self.exclude_labels = set()
    for i, entry in enumerate(exclude_labels, 1):
        if not isinstance(entry, str):
            raise ConfigurationError(f'Entry #{i} of setting `exclude_labels` must be a string')

        self.exclude_labels.add(entry)

    include_labels = config.get('include_labels', [])
    if not isinstance(include_labels, list):
        raise ConfigurationError('Setting `include_labels` must be an array')
    self.include_labels = set()
    for i, entry in enumerate(include_labels, 1):
        if not isinstance(entry, str):
            raise ConfigurationError(f'Entry #{i} of setting `include_labels` must be a string')
        # Exclusion takes precedence at sample time, so a conflicting entry is only logged here.
        if entry in self.exclude_labels:
            self.log.debug(
                'Label `%s` is set in both `exclude_labels` and `include_labels`. Excluding label.', entry
            )
        self.include_labels.add(entry)

    self.rename_labels = config.get('rename_labels', {})
    if not isinstance(self.rename_labels, dict):
        raise ConfigurationError('Setting `rename_labels` must be a mapping')

    for key, value in self.rename_labels.items():
        if not isinstance(value, str):
            raise ConfigurationError(f'Value for label `{key}` of setting `rename_labels` must be a string')

    exclude_metrics = config.get('exclude_metrics', [])
    if not isinstance(exclude_metrics, list):
        raise ConfigurationError('Setting `exclude_metrics` must be an array')

    self.exclude_metrics = set()
    self.exclude_metrics_pattern = None
    exclude_metrics_patterns = []
    for i, entry in enumerate(exclude_metrics, 1):
        if not isinstance(entry, str):
            raise ConfigurationError(f'Entry #{i} of setting `exclude_metrics` must be a string')

        # An entry unchanged by `re.escape` contains no regex metacharacters, so it is
        # treated as an exact metric name; anything else is collected as a pattern.
        escaped_entry = re.escape(entry)
        if entry == escaped_entry:
            self.exclude_metrics.add(entry)
        else:
            exclude_metrics_patterns.append(entry)

    if exclude_metrics_patterns:
        self.exclude_metrics_pattern = re.compile('|'.join(exclude_metrics_patterns))

    # Maps a label name to a predicate over label values; `true` excludes every value.
    self.exclude_metrics_by_labels = {}
    exclude_metrics_by_labels = config.get('exclude_metrics_by_labels', {})
    if not isinstance(exclude_metrics_by_labels, dict):
        raise ConfigurationError('Setting `exclude_metrics_by_labels` must be a mapping')
    elif exclude_metrics_by_labels:
        for label, values in exclude_metrics_by_labels.items():
            if values is True:
                self.exclude_metrics_by_labels[label] = return_true
            elif isinstance(values, list):
                for i, value in enumerate(values, 1):
                    if not isinstance(value, str):
                        raise ConfigurationError(
                            f'Value #{i} for label `{label}` of setting `exclude_metrics_by_labels` '
                            f'must be a string'
                        )

                # The compiled pattern is bound as a default argument to avoid
                # late binding of the loop variable `values`.
                self.exclude_metrics_by_labels[label] = (
                    lambda label_value, pattern=re.compile('|'.join(values)): pattern.search(label_value)
                    is not None
                )
            else:
                raise ConfigurationError(
                    f'Label `{label}` of setting `exclude_metrics_by_labels` must be an array or set to `true`'
                )

    custom_tags = config.get('tags', [])
    if not isinstance(custom_tags, list):
        raise ConfigurationError('Setting `tags` must be an array')

    for i, entry in enumerate(custom_tags, 1):
        if not isinstance(entry, str):
            raise ConfigurationError(f'Entry #{i} of setting `tags` must be a string')

    # Some tags can be ignored to reduce the cardinality.
    # This can be useful for cost optimization in containerized environments
    # when the openmetrics check is configured to collect custom metrics.
    # Even when the Agent's Tagger is configured to add low-cardinality tags only,
    # some tags can still generate unwanted metric contexts (e.g pod annotations as tags).
    ignore_tags = config.get('ignore_tags', [])
    if ignore_tags:
        ignored_tags_re = re.compile('|'.join(set(ignore_tags)))
        custom_tags = [tag for tag in custom_tags if not ignored_tags_re.search(tag)]

    # These will be applied only to service checks
    self.static_tags = [f'endpoint:{self.endpoint}']
    self.static_tags.extend(custom_tags)
    self.static_tags = tuple(self.static_tags)

    # These will be applied to everything except service checks
    self.tags = self.static_tags

    self.raw_line_filter = None
    raw_line_filters = config.get('raw_line_filters', [])
    if not isinstance(raw_line_filters, list):
        raise ConfigurationError('Setting `raw_line_filters` must be an array')
    elif raw_line_filters:
        for i, entry in enumerate(raw_line_filters, 1):
            if not isinstance(entry, str):
                raise ConfigurationError(f'Entry #{i} of setting `raw_line_filters` must be a string')

        self.raw_line_filter = re.compile('|'.join(raw_line_filters))

    # HTTP client built from the instance/init_config; used by `send_request`.
    self.http = RequestsWrapper(config, self.check.init_config, self.check.HTTP_CONFIG_REMAPPER, self.check.log)

    # Decide how strictly we will adhere to the latest version of the specification
    if is_affirmative(config.get('use_latest_spec', False)):
        self.parse_metric_families = parse_metric_families_strict
        # https://github.com/prometheus/client_python/blob/v0.9.0/prometheus_client/openmetrics/exposition.py#L7
        self.http.options['headers'].setdefault(
            'Accept', 'application/openmetrics-text; version=0.0.1; charset=utf-8'
        )
    else:
        self.parse_metric_families = parse_metric_families
        self.http.options['headers'].setdefault('Accept', 'text/plain')

    self.use_process_start_time = is_affirmative(config.get('use_process_start_time'))

    # Used for monotonic counts
    self.flush_first_value = False

consume_metrics(self, runtime_data)

Yield the processed metrics and filter out excluded metrics.

Source code in
def consume_metrics(self, runtime_data):
    """
    Yield the processed metrics and filter out excluded metrics.
    """
    parser = self.parse_metrics()

    # On the very first scrape, optionally filter based on the process start time.
    if not self.flush_first_value and self.use_process_start_time:
        parser = first_scrape_handler(parser, runtime_data, datadog_agent.get_process_start_time())
    if self.label_aggregator.configured:
        parser = self.label_aggregator(parser)

    for metric in parser:
        # Exclusion by exact name, then by the combined exclusion pattern.
        excluded = metric.name in self.exclude_metrics
        if not excluded and self.exclude_metrics_pattern is not None:
            excluded = self.exclude_metrics_pattern.search(metric.name) is not None

        if excluded:
            self.submit_telemetry_number_of_ignored_metric_samples(metric)
            continue

        yield metric

filter_connection_lines(self, line_streamer)

Filter connection lines in the line streamer.

Source code in
def filter_connection_lines(self, line_streamer):
    """
    Filter connection lines in the line streamer.

    Lines matching `raw_line_filter` are dropped and counted as telemetry.
    """
    matches = self.raw_line_filter.search

    for line in line_streamer:
        if matches(line) is None:
            yield line
        else:
            self.submit_telemetry_number_of_ignored_lines()

generate_sample_data(self, metric)

Yield a sample of processed data.

Source code in
def generate_sample_data(self, metric):
    """
    Yield a sample of processed data.

    For each sample of `metric`, yields a `(sample, tags, hostname)` triple:

    - samples with NaN or infinite values are skipped
    - a match in `exclude_metrics_by_labels` drops the entire sample
    - labels are filtered via `exclude_labels`/`include_labels`, renamed via
      `rename_labels`, and turned into `name:value` tags
    - the hostname may be overridden by the value of `hostname_label`,
      optionally passed through `hostname_formatter`
    """

    label_normalizer = get_label_normalizer(metric.type)

    for sample in metric.samples:
        value = sample.value
        # NaN/inf cannot be submitted as metric values, so skip them.
        if isnan(value) or isinf(value):
            self.log.debug('Ignoring sample for metric `%s` as it has an invalid value: %s', metric.name, value)
            continue

        tags = []
        skip_sample = False
        labels = sample.labels
        self.label_aggregator.populate(labels)
        label_normalizer(labels)

        for label_name, label_value in labels.items():
            # A matching label-based excluder discards the whole sample,
            # not just the individual label.
            sample_excluder = self.exclude_metrics_by_labels.get(label_name)
            if sample_excluder is not None and sample_excluder(label_value):
                skip_sample = True
                break
            elif label_name in self.exclude_labels:
                continue
            elif self.include_labels and label_name not in self.include_labels:
                continue

            label_name = self.rename_labels.get(label_name, label_name)
            tags.append(f'{label_name}:{label_value}')

        if skip_sample:
            continue

        tags.extend(self.tags)

        # Allow a designated label to override the hostname for this sample.
        hostname = self.hostname
        if self.hostname_label and self.hostname_label in labels:
            hostname = labels[self.hostname_label]
            if self.hostname_formatter is not None:
                hostname = self.hostname_formatter(hostname)

        self.submit_telemetry_number_of_processed_metric_samples()
        yield sample, tags, hostname

get_connection(self)

Send a request to scrape metrics. Return the response or throw an exception.

Source code in
def get_connection(self):
    """
    Send a request to scrape metrics. Return the response or throw an exception.

    A health service check is submitted on every path: CRITICAL when the
    request fails or returns an error status, OK otherwise.
    """

    try:
        response = self.send_request()
    except Exception as e:
        # The request itself failed; report the endpoint as down and re-raise.
        self.submit_health_check(ServiceCheck.CRITICAL, message=str(e))
        raise
    else:
        try:
            response.raise_for_status()
        except Exception as e:
            self.submit_health_check(ServiceCheck.CRITICAL, message=str(e))
            # Release the connection before propagating the HTTP error.
            response.close()
            raise
        else:
            self.submit_health_check(ServiceCheck.OK)

            # Never derive the encoding from the locale
            if response.encoding is None:
                response.encoding = 'utf-8'

            self.submit_telemetry_endpoint_response_size(response)
            return response

parse_metrics(self)

Get the line streamer and yield processed metrics.

Source code in
def parse_metrics(self):
    """
    Get the line streamer and yield processed metrics.
    """
    lines = self.stream_connection_lines()
    if self.raw_line_filter is not None:
        lines = self.filter_connection_lines(lines)

    prefix = self.raw_metric_prefix
    for metric in self.parse_metric_families(lines):
        self.submit_telemetry_number_of_total_metric_samples(metric)

        # It is critical that the prefix is removed immediately so that
        # all other configuration may reference the trimmed metric name.
        if prefix and metric.name.startswith(prefix):
            metric.name = metric.name[len(prefix):]

        yield metric

scrape(self)

Execute a scrape, and for each metric collected, transform the metric.

Source code in
def scrape(self):
    """
    Execute a scrape, and for each metric collected, transform the metric.
    """
    # Snapshot the per-run state the transformers need before the scrape.
    runtime_data = {'flush_first_value': self.flush_first_value, 'static_tags': self.static_tags}

    for metric in self.consume_metrics(runtime_data):
        transformer = self.metric_transformer.get(metric)
        if transformer is not None:
            transformer(metric, self.generate_sample_data(metric), runtime_data)

    # All subsequent scrapes may flush monotonic counts immediately.
    self.flush_first_value = True

set_dynamic_tags(self, *tags)

Set dynamic tags.

Source code in
def set_dynamic_tags(self, *tags):
    """
    Set dynamic tags.

    The resulting tag tuple is the static tags followed by the given
    dynamic tags.
    """
    self.tags = (*self.static_tags, *tags)

stream_connection_lines(self)

Yield the connection lines.

Source code in
def stream_connection_lines(self):
    """
    Yield the connection lines.
    """
    # The context manager guarantees the connection is released once the
    # generator is exhausted or closed.
    with self.get_connection() as connection:
        yield from connection.iter_lines(decode_unicode=True)

submit_health_check(self, status, **kwargs)

If health service check is enabled, send an openmetrics.health service check.

Source code in
def submit_health_check(self, status, **kwargs):
    """
    If health service check is enabled, send an `openmetrics.health` service check.
    """
    if not self.enable_health_service_check:
        return

    self.service_check(self.SERVICE_CHECK_HEALTH, status, tags=self.static_tags, **kwargs)

Transformers

datadog_checks.base.checks.openmetrics.v2.transform.Transformers

Source code in
class Transformers(object):
    """Intentionally empty placeholder class."""

counter(check, metric_name, modifiers, global_options)

https://prometheus.io/docs/concepts/metric_types/#counter https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md#counter-1

Source code in
def get_counter(check, metric_name, modifiers, global_options):
    """
    https://prometheus.io/docs/concepts/metric_types/#counter
    https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md#counter-1
    """
    submit = check.monotonic_count
    submitted_name = f'{metric_name}.count'

    def counter(metric, sample_data, runtime_data):
        flush_first_value = runtime_data['flush_first_value']

        for sample, tags, hostname in sample_data:
            # Only samples suffixed by `_total` are submitted.
            if not sample.name.endswith('_total'):
                continue

            submit(
                submitted_name,
                sample.value,
                tags=tags,
                hostname=hostname,
                flush_first_value=flush_first_value,
            )

    del check
    del modifiers
    del global_options
    return counter

counter_gauge(check, metric_name, modifiers, global_options)

This submits metrics as both a monotonic_count suffixed by .count and a gauge suffixed by .total.

Source code in
def get_counter_gauge(check, metric_name, modifiers, global_options):
    """
    This submits metrics as both a `monotonic_count` suffixed by `.count` and a `gauge` suffixed by `.total`.
    """
    submit_gauge = check.gauge
    submit_count = check.monotonic_count

    gauge_name = f'{metric_name}.total'
    count_name = f'{metric_name}.count'

    def counter_gauge(metric, sample_data, runtime_data):
        flush_first_value = runtime_data['flush_first_value']

        # Every sample is reported twice: once raw, once as a monotonic count.
        for sample, tags, hostname in sample_data:
            submit_gauge(gauge_name, sample.value, tags=tags, hostname=hostname)
            submit_count(
                count_name,
                sample.value,
                tags=tags,
                hostname=hostname,
                flush_first_value=flush_first_value,
            )

    del check
    del metric_name
    del modifiers
    del global_options
    return counter_gauge

gauge(check, metric_name, modifiers, global_options)

https://prometheus.io/docs/concepts/metric_types/#gauge https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md#gauge-1

Source code in
def get_gauge(check, metric_name, modifiers, global_options):
    """
    https://prometheus.io/docs/concepts/metric_types/#gauge
    https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md#gauge-1
    """
    submit = check.gauge

    def gauge(metric, sample_data, runtime_data):
        # Point-in-time values; submit each sample directly.
        for sample, tags, hostname in sample_data:
            submit(metric_name, sample.value, tags=tags, hostname=hostname)

    del check
    del modifiers
    del global_options
    return gauge

histogram(check, metric_name, modifiers, global_options)

https://prometheus.io/docs/concepts/metric_types/#histogram https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md#histogram-1

Source code in
def get_histogram(check, metric_name, modifiers, global_options):
    """
    https://prometheus.io/docs/concepts/metric_types/#histogram
    https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md#histogram-1

    The submission strategy is selected once, up front, from `global_options`,
    so the returned `histogram` closure performs no option branching per call:

    - buckets as distribution metrics, optionally also with `.sum`/`.count`
      monotonic counts
    - buckets as `.bucket` monotonic counts, decumulated or cumulative
    - no buckets at all: only `.sum` and `.count` monotonic counts
    """
    if global_options['collect_histogram_buckets']:
        if global_options['histogram_buckets_as_distributions']:
            # Buckets are submitted as distribution metrics.
            logger = check.log
            submit_histogram_bucket_method = check.submit_histogram_bucket

            if global_options['collect_counters_with_distributions']:
                monotonic_count_method = check.monotonic_count
                sum_metric = f'{metric_name}.sum'
                count_metric = f'{metric_name}.count'

                # Distributions plus `.sum`/`.count` monotonic counts.
                def histogram(metric, sample_data, runtime_data):
                    flush_first_value = runtime_data['flush_first_value']

                    for sample, tags, hostname in decumulate_histogram_buckets(sample_data):
                        sample_name = sample.name
                        if sample_name.endswith('_sum'):
                            monotonic_count_method(
                                sum_metric,
                                sample.value,
                                tags=tags,
                                hostname=hostname,
                                flush_first_value=flush_first_value,
                            )
                        elif sample_name.endswith('_count'):
                            monotonic_count_method(
                                count_metric,
                                sample.value,
                                tags=tags,
                                hostname=hostname,
                                flush_first_value=flush_first_value,
                            )
                        elif sample_name.endswith('_bucket'):
                            lower_bound = canonicalize_numeric_label(sample.labels['lower_bound'])
                            upper_bound = canonicalize_numeric_label(sample.labels['upper_bound'])

                            if lower_bound == upper_bound:
                                # this can happen for -inf/-inf bucket that we don't want to send (always 0)
                                logger.warning(
                                    'Metric: %s has bucket boundaries equal, skipping: %s', metric_name, sample.labels
                                )
                                continue

                            submit_histogram_bucket_method(
                                metric_name,
                                sample.value,
                                lower_bound,
                                upper_bound,
                                True,
                                hostname,
                                tags,
                                flush_first_value=flush_first_value,
                            )

            else:

                # Distributions only; non-bucket samples are ignored.
                def histogram(metric, sample_data, runtime_data):
                    flush_first_value = runtime_data['flush_first_value']

                    for sample, tags, hostname in decumulate_histogram_buckets(sample_data):
                        if not sample.name.endswith('_bucket'):
                            continue

                        lower_bound = canonicalize_numeric_label(sample.labels['lower_bound'])
                        upper_bound = canonicalize_numeric_label(sample.labels['upper_bound'])

                        if lower_bound == upper_bound:
                            # this can happen for -inf/-inf bucket that we don't want to send (always 0)
                            logger.warning(
                                'Metric: %s has bucket boundaries equal, skipping: %s', metric_name, sample.labels
                            )
                            continue

                        submit_histogram_bucket_method(
                            metric_name,
                            sample.value,
                            lower_bound,
                            upper_bound,
                            True,
                            hostname,
                            tags,
                            flush_first_value=flush_first_value,
                        )

        else:
            # Buckets are submitted as `.bucket` monotonic counts.
            monotonic_count_method = check.monotonic_count
            bucket_metric = f'{metric_name}.bucket'
            sum_metric = f'{metric_name}.sum'
            count_metric = f'{metric_name}.count'

            if global_options['non_cumulative_histogram_buckets']:

                # Bucket samples are decumulated before submission.
                def histogram(metric, sample_data, runtime_data):
                    flush_first_value = runtime_data['flush_first_value']

                    for sample, tags, hostname in decumulate_histogram_buckets(sample_data):
                        sample_name = sample.name
                        if sample_name.endswith('_sum'):
                            monotonic_count_method(
                                sum_metric,
                                sample.value,
                                tags=tags,
                                hostname=hostname,
                                flush_first_value=flush_first_value,
                            )
                        elif sample_name.endswith('_count'):
                            monotonic_count_method(
                                count_metric,
                                sample.value,
                                tags=tags,
                                hostname=hostname,
                                flush_first_value=flush_first_value,
                            )
                        # Skip infinity upper bound as that is otherwise the
                        # same context as the sample suffixed by `_count`
                        elif sample_name.endswith('_bucket') and not sample.labels['upper_bound'].endswith('inf'):
                            monotonic_count_method(
                                bucket_metric,
                                sample.value,
                                tags=tags,
                                hostname=hostname,
                                flush_first_value=flush_first_value,
                            )

            # Default behavior
            else:

                # Bucket samples are submitted as-is (cumulative values).
                def histogram(metric, sample_data, runtime_data):
                    flush_first_value = runtime_data['flush_first_value']

                    for sample, tags, hostname in sample_data:
                        sample_name = sample.name
                        if sample_name.endswith('_sum'):
                            monotonic_count_method(
                                sum_metric,
                                sample.value,
                                tags=tags,
                                hostname=hostname,
                                flush_first_value=flush_first_value,
                            )
                        elif sample_name.endswith('_count'):
                            monotonic_count_method(
                                count_metric,
                                sample.value,
                                tags=tags,
                                hostname=hostname,
                                flush_first_value=flush_first_value,
                            )
                        # Skip infinity upper bound as that is otherwise the
                        # same context as the sample suffixed by `_count`
                        elif sample_name.endswith('_bucket') and not sample.labels['upper_bound'].endswith('inf'):
                            monotonic_count_method(
                                bucket_metric,
                                sample.value,
                                tags=tags,
                                hostname=hostname,
                                flush_first_value=flush_first_value,
                            )

    else:
        # Buckets are not collected: only `.sum` and `.count` are submitted.
        monotonic_count_method = check.monotonic_count
        sum_metric = f'{metric_name}.sum'
        count_metric = f'{metric_name}.count'

        def histogram(metric, sample_data, runtime_data):
            flush_first_value = runtime_data['flush_first_value']

            for sample, tags, hostname in sample_data:
                sample_name = sample.name
                if sample_name.endswith('_sum'):
                    monotonic_count_method(
                        sum_metric,
                        sample.value,
                        tags=tags,
                        hostname=hostname,
                        flush_first_value=flush_first_value,
                    )
                elif sample_name.endswith('_count'):
                    monotonic_count_method(
                        count_metric,
                        sample.value,
                        tags=tags,
                        hostname=hostname,
                        flush_first_value=flush_first_value,
                    )

    del check
    del modifiers
    del global_options
    return histogram

metadata(check, metric_name, modifiers, global_options)

This allows for the submission of instance metadata like a product's version. The required modifier label indicates which label contains the desired information. For more information, see: https://datadoghq.dev/integrations-core/base/metadata/

Source code in
def get_metadata(check, metric_name, modifiers, global_options):
    """
    This allows for the submission of instance metadata like a product's version. The required modifier
    `label` indicates which label contains the desired information. For more information, see:
    https://datadoghq.dev/integrations-core/base/metadata/
    """
    submit_metadata = check.set_metadata

    # Copy so that popping `label` never mutates the caller's modifiers.
    options = deepcopy(modifiers)
    label = options.pop('label', '')
    if not isinstance(label, str):
        raise TypeError('the `label` parameter must be a string')

    if not label:
        raise ValueError('the `label` parameter is required')

    def metadata(metric, sample_data, runtime_data):
        # Remaining modifier options are forwarded to `set_metadata`.
        for sample, _tags, _hostname in sample_data:
            submit_metadata(metric_name, sample.labels[label], **options)

    del check
    del modifiers
    del global_options
    return metadata

native(check, metric_name, modifiers, global_options)

Uses whatever the endpoint describes as the metric type in the first occurrence.

Source code in
def get_native_transformer(check, metric_name, modifiers, global_options):
    """
    Uses whatever the endpoint describes as the metric type in the first occurrence.
    """
    # One-element cache; the real transformer is resolved lazily from the
    # type of the first metric seen, then reused for every later call.
    cached = []

    def native(metric, sample_data, runtime_data):
        if not cached:
            cached.append(NATIVE_TRANSFORMERS[metric.type](check, metric_name, modifiers, global_options))

        cached[0](metric, sample_data, runtime_data)

    return native

native_dynamic(check, metric_name, modifiers, global_options)

Uses whatever the endpoint describes as the metric type.

Source code in
def get_native_dynamic_transformer(check, metric_name, modifiers, global_options):
    """
    Uses whatever the endpoint describes as the metric type.
    """
    # Memoize one transformer per metric type encountered.
    transformers_by_type = {}

    def native_dynamic(metric, sample_data, runtime_data):
        metric_type = metric.type
        if metric_type not in transformers_by_type:
            transformers_by_type[metric_type] = NATIVE_TRANSFORMERS[metric_type](
                check, metric_name, modifiers, global_options
            )

        transformers_by_type[metric_type](metric, sample_data, runtime_data)

    return native_dynamic

rate(check, metric_name, modifiers, global_options)

Send with the AgentCheck.rate method.

Source code in
def get_rate(check, metric_name, modifiers, global_options):
    """
    Send with the `AgentCheck.rate` method.
    """
    submit = check.rate

    def rate(metric, sample_data, runtime_data):
        for sample, tags, hostname in sample_data:
            submit(metric_name, sample.value, tags=tags, hostname=hostname)

    del check
    del modifiers
    del global_options
    return rate

service_check(check, metric_name, modifiers, global_options)

This submits metrics as service checks.

The required modifier status_map is a mapping of values to statuses. Valid statuses include:

  • OK
  • WARNING
  • CRITICAL
  • UNKNOWN

Any encountered values that are not defined will be sent as UNKNOWN.

Source code in
def get_service_check(check, metric_name, modifiers, global_options):
    """
    This submits metrics as service checks.

    The required modifier `status_map` is a mapping of values to statuses. Valid statuses include:

    - `OK`
    - `WARNING`
    - `CRITICAL`
    - `UNKNOWN`

    Any encountered values that are not defined will be sent as `UNKNOWN`.
    """
    # Do work in a separate function to avoid having to `del` a bunch of variables
    status_map = compile_service_check_statuses(modifiers)

    submit = check.service_check

    def service_check(metric, sample_data, runtime_data):
        static_tags = runtime_data['static_tags']

        for sample, _, hostname in sample_data:
            # Sample values are truncated to integers for the status lookup;
            # anything unmapped falls back to UNKNOWN.
            status = status_map.get(int(sample.value), ServiceCheck.UNKNOWN)
            submit(metric_name, status, tags=static_tags, hostname=hostname)

    del check
    del modifiers
    del global_options
    return service_check

summary(check, metric_name, modifiers, global_options)

https://prometheus.io/docs/concepts/metric_types/#summary https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md#summary-1

Source code in
def get_summary(check, metric_name, modifiers, global_options):
    """
    https://prometheus.io/docs/concepts/metric_types/#summary
    https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md#summary-1
    """
    submit_gauge = check.gauge
    submit_count = check.monotonic_count
    sum_name = f'{metric_name}.sum'
    count_name = f'{metric_name}.count'
    quantile_name = f'{metric_name}.quantile'

    def summary(metric, sample_data, runtime_data):
        flush_first_value = runtime_data['flush_first_value']

        for sample, tags, hostname in sample_data:
            name = sample.name
            if name.endswith('_sum'):
                submit_count(
                    sum_name,
                    sample.value,
                    tags=tags,
                    hostname=hostname,
                    flush_first_value=flush_first_value,
                )
            elif name.endswith('_count'):
                submit_count(
                    count_name,
                    sample.value,
                    tags=tags,
                    hostname=hostname,
                    flush_first_value=flush_first_value,
                )
            elif name == metric.name:
                # Quantile samples share the metric's own name.
                submit_gauge(quantile_name, sample.value, tags=tags, hostname=hostname)

    del check
    del modifiers
    del global_options
    return summary

temporal_percent(check, metric_name, modifiers, global_options)

This calculates values as a percentage of time since the last check run.

For example, say the result is a forever increasing counter representing the total time spent pausing for garbage collection since start up. That number by itself is quite useless, but as a percentage of time spent pausing since the previous collection interval it becomes a useful metric.

There is one required parameter called scale that indicates what unit of time the result should be considered. Valid values are:

  • second
  • millisecond
  • microsecond
  • nanosecond

You may also define the unit as an integer number of parts compared to seconds e.g. millisecond is equivalent to 1000.

Source code in
def get_temporal_percent(check, metric_name, modifiers, global_options):
    """
    This calculates values as a percentage of time since the last check run.

    For example, say the result is a forever increasing counter representing the total time spent pausing for
    garbage collection since start up. That number by itself is quite useless, but as a percentage of time spent
    pausing since the previous collection interval it becomes a useful metric.

    There is one required parameter called `scale` that indicates what unit of time the result should be considered.
    Valid values are:

    - `second`
    - `millisecond`
    - `microsecond`
    - `nanosecond`

    You may also define the unit as an integer number of parts compared to seconds e.g. `millisecond` is
    equivalent to `1000`.
    """
    # Validate and normalize `scale` to an integer number of parts per second.
    scale = modifiers.get('scale')
    if scale is None:
        raise ValueError('the `scale` parameter is required')

    if isinstance(scale, str):
        scale = TIME_UNITS.get(scale.lower())
        if scale is None:
            raise ValueError(f"the `scale` parameter must be one of: {' | '.join(sorted(TIME_UNITS))}")
    elif not isinstance(scale, int):
        raise ValueError(
            'the `scale` parameter must be an integer representing parts of a second e.g. 1000 for millisecond'
        )

    submit_rate = check.rate

    def temporal_percent(metric, sample_data, runtime_data):
        for sample, tags, hostname in sample_data:
            percent = total_time_to_temporal_percent(sample.value, scale=scale)
            submit_rate(metric_name, percent, tags=tags, hostname=hostname)

    del check
    del modifiers
    del global_options
    return temporal_percent

time_elapsed(check, metric_name, modifiers, global_options)

This sends the number of seconds elapsed from a time in the past as a gauge.

Source code in
def get_time_elapsed(check, metric_name, modifiers, global_options):
    """
    This sends the number of seconds elapsed from a time in the past as a `gauge`.
    """
    submit = check.gauge

    def time_elapsed(metric, sample_data, runtime_data):
        for sample, tags, hostname in sample_data:
            # The sample value is a past timestamp; report its age.
            elapsed = get_timestamp() - sample.value
            submit(metric_name, elapsed, tags=tags, hostname=hostname)

    del check
    del modifiers
    del global_options
    return time_elapsed

Options

For complete documentation on every option, see the associated templates for the instance and init_config sections.

Legacy

This OpenMetrics implementation is the updated version of the original Prometheus/OpenMetrics implementation. The docs for the deprecated implementation are still available as a reference.


Last update: November 10, 2021