OpenMetrics¶
OpenMetrics is used to collect metrics exposed in the CNCF-backed OpenMetrics format. This implementation is the default for all new OpenMetrics-based checks, and it is compatible with Python 3 only.
Interface¶
datadog_checks.base.checks.openmetrics.v2.base.OpenMetricsBaseCheckV2
OpenMetricsBaseCheckV2 is an updated version of OpenMetricsBaseCheck that scrapes endpoints emitting Prometheus or OpenMetrics metrics. A minimal subclass sketch follows the source listing below.
Minimal example configuration:
instances:
- openmetrics_endpoint: http://example.com/endpoint
  namespace: "foobar"
  metrics:
  - bar
  - foo
Source code in datadog_checks_base/datadog_checks/base/checks/openmetrics/v2/base.py
class OpenMetricsBaseCheckV2(AgentCheck):
    """
    OpenMetricsBaseCheckV2 is an updated class of OpenMetricsBaseCheck to scrape endpoints that emit Prometheus metrics.

    Minimal example configuration:

    ```yaml
    instances:
    - openmetrics_endpoint: http://example.com/endpoint
      namespace: "foobar"
      metrics:
      - bar
      - foo
    ```
    """

    DEFAULT_METRIC_LIMIT = 2000

    # Allow tracing for openmetrics integrations
    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        return traced_class(cls)

    def __init__(self, name, init_config, instances):
        """
        The base class for any OpenMetrics-based integration.

        Subclasses are expected to override this to add their custom scrapers or transformers.
        When overriding, make sure to call this (the parent's) __init__ first!
        """
        super(OpenMetricsBaseCheckV2, self).__init__(name, init_config, instances)

        # All desired scraper configurations, which subclasses can override as needed
        self.scraper_configs = [self.instance]

        # All configured scrapers keyed by the endpoint
        self.scrapers = {}

        self.check_initializations.append(self.configure_scrapers)

    def check(self, _):
        """
        Perform an openmetrics-based check.

        Subclasses should typically not need to override this, as most common customization
        needs are covered by the use of custom scrapers.
        Another thing to note is that this check ignores its instance argument completely.
        We take care of instance-level customization at initialization time.
        """
        self.refresh_scrapers()

        for endpoint, scraper in self.scrapers.items():
            self.log.debug('Scraping OpenMetrics endpoint: %s', endpoint)

            with self.adopt_namespace(scraper.namespace):
                try:
                    scraper.scrape()
                except (ConnectionError, RequestException) as e:
                    self.log.error("There was an error scraping endpoint %s: %s", endpoint, str(e))
                    raise type(e)("There was an error scraping endpoint {}: {}".format(endpoint, e)) from None

    def configure_scrapers(self):
        """
        Creates a scraper configuration for each instance.
        """
        scrapers = {}

        for config in self.scraper_configs:
            endpoint = config.get('openmetrics_endpoint', '')
            if not isinstance(endpoint, str):
                raise ConfigurationError('The setting `openmetrics_endpoint` must be a string')
            elif not endpoint:
                raise ConfigurationError('The setting `openmetrics_endpoint` is required')

            scrapers[endpoint] = self.create_scraper(config)

        self.scrapers.clear()
        self.scrapers.update(scrapers)

    def create_scraper(self, config):
        """
        Subclasses can override to return a custom scraper based on instance configuration.
        """
        return OpenMetricsScraper(self, self.get_config_with_defaults(config))

    def set_dynamic_tags(self, *tags):
        for scraper in self.scrapers.values():
            scraper.set_dynamic_tags(*tags)

    def get_config_with_defaults(self, config):
        return ChainMap(config, self.get_default_config())

    def get_default_config(self):
        return {}

    def refresh_scrapers(self):
        pass

    @contextmanager
    def adopt_namespace(self, namespace):
        old_namespace = self.__NAMESPACE__

        try:
            self.__NAMESPACE__ = namespace or old_namespace
            yield
        finally:
            self.__NAMESPACE__ = old_namespace
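A minimal subclass usually only sets a namespace and some defaults. The following is a hedged sketch, assuming a hypothetical `my_integration` check and metric mapping; only the base class and the `get_default_config` hook come from the listing above.

```python
from datadog_checks.base import OpenMetricsBaseCheckV2


class MyIntegrationCheck(OpenMetricsBaseCheckV2):
    # Prefix applied to every metric and service check this check submits
    __NAMESPACE__ = 'my_integration'

    def get_default_config(self):
        # Merged under each instance via get_config_with_defaults() (a ChainMap),
        # so users can still override any of these keys in their instance config.
        return {
            'metrics': [{'process_cpu_seconds': 'process.cpu_seconds'}],
        }
```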
__init__(name, init_config, instances)
The base class for any OpenMetrics-based integration.
Subclasses are expected to override this to add their custom scrapers or transformers. When overriding, make sure to call this (the parent's) __init__ first! A short override sketch follows the source listing below.
Source code in datadog_checks_base/datadog_checks/base/checks/openmetrics/v2/base.py
def __init__(self, name, init_config, instances):
    """
    The base class for any OpenMetrics-based integration.

    Subclasses are expected to override this to add their custom scrapers or transformers.
    When overriding, make sure to call this (the parent's) __init__ first!
    """
    super(OpenMetricsBaseCheckV2, self).__init__(name, init_config, instances)

    # All desired scraper configurations, which subclasses can override as needed
    self.scraper_configs = [self.instance]

    # All configured scrapers keyed by the endpoint
    self.scrapers = {}

    self.check_initializations.append(self.configure_scrapers)
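As a hedged sketch of the pattern described above (call the parent's __init__ first, then customize), this hypothetical check scrapes a second endpoint by appending another configuration to self.scraper_configs; the `health_endpoint` instance option is an assumption for the example.

```python
from datadog_checks.base import OpenMetricsBaseCheckV2


class MyCheck(OpenMetricsBaseCheckV2):
    __NAMESPACE__ = 'my_check'

    def __init__(self, name, init_config, instances):
        # Always call the parent's __init__ first
        super().__init__(name, init_config, instances)

        # Optionally scrape a second, hypothetical endpoint with the same settings;
        # configure_scrapers() later builds one scraper per configuration.
        extra_endpoint = self.instance.get('health_endpoint')
        if extra_endpoint:
            extra_config = dict(self.instance)
            extra_config['openmetrics_endpoint'] = extra_endpoint
            self.scraper_configs.append(extra_config)
```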
check(_)
Perform an openmetrics-based check.
Subclasses should typically not need to override this, as most common customization needs are covered by custom scrapers. Note that this method ignores its instance argument entirely; instance-level customization is handled at initialization time.
Source code in datadog_checks_base/datadog_checks/base/checks/openmetrics/v2/base.py
def check(self, _):
    """
    Perform an openmetrics-based check.

    Subclasses should typically not need to override this, as most common customization
    needs are covered by the use of custom scrapers.
    Another thing to note is that this check ignores its instance argument completely.
    We take care of instance-level customization at initialization time.
    """
    self.refresh_scrapers()

    for endpoint, scraper in self.scrapers.items():
        self.log.debug('Scraping OpenMetrics endpoint: %s', endpoint)

        with self.adopt_namespace(scraper.namespace):
            try:
                scraper.scrape()
            except (ConnectionError, RequestException) as e:
                self.log.error("There was an error scraping endpoint %s: %s", endpoint, str(e))
                raise type(e)("There was an error scraping endpoint {}: {}".format(endpoint, e)) from None
configure_scrapers()
Creates a scraper configuration for each instance.
Source code in datadog_checks_base/datadog_checks/base/checks/openmetrics/v2/base.py
def configure_scrapers(self):
    """
    Creates a scraper configuration for each instance.
    """
    scrapers = {}

    for config in self.scraper_configs:
        endpoint = config.get('openmetrics_endpoint', '')
        if not isinstance(endpoint, str):
            raise ConfigurationError('The setting `openmetrics_endpoint` must be a string')
        elif not endpoint:
            raise ConfigurationError('The setting `openmetrics_endpoint` is required')

        scrapers[endpoint] = self.create_scraper(config)

    self.scrapers.clear()
    self.scrapers.update(scrapers)
create_scraper(config)
Subclasses can override to return a custom scraper based on instance configuration.
Source code in datadog_checks_base/datadog_checks/base/checks/openmetrics/v2/base.py
def create_scraper(self, config):
    """
    Subclasses can override to return a custom scraper based on instance configuration.
    """
    return OpenMetricsScraper(self, self.get_config_with_defaults(config))
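A hedged sketch of such an override, assuming a hypothetical `legacy_format` instance option and a hypothetical LegacyFormatScraper subclass; the base classes and helpers come from datadog_checks.base.

```python
from datadog_checks.base import OpenMetricsBaseCheckV2, is_affirmative
from datadog_checks.base.checks.openmetrics.v2.scraper import OpenMetricsScraper


class LegacyFormatScraper(OpenMetricsScraper):
    """Hypothetical scraper override used for older endpoints."""


class MyCheck(OpenMetricsBaseCheckV2):
    def create_scraper(self, config):
        config = self.get_config_with_defaults(config)
        # `legacy_format` is a hypothetical per-instance option for this sketch
        if is_affirmative(config.get('legacy_format', False)):
            return LegacyFormatScraper(self, config)
        return OpenMetricsScraper(self, config)
```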
Scrapers¶
datadog_checks.base.checks.openmetrics.v2.scraper.OpenMetricsScraper
OpenMetricsScraper implements the default scraping behavior for OpenMetricsBaseCheckV2 and can be subclassed to override it. A sketch of a custom scraper follows the source listing below.
Minimal example configuration:
- openmetrics_endpoint: http://example.com/endpoint
  namespace: "foobar"
  metrics:
  - bar
  - foo
  raw_metric_prefix: "test"
  telemetry: "true"
  hostname_label: node
Source code in datadog_checks_base/datadog_checks/base/checks/openmetrics/v2/scraper.py
class OpenMetricsScraper:
    """
    OpenMetricsScraper is a class that can be used to override the default scraping behavior for OpenMetricsBaseCheckV2.

    Minimal example configuration:

    ```yaml
    - openmetrics_endpoint: http://example.com/endpoint
      namespace: "foobar"
      metrics:
      - bar
      - foo
      raw_metric_prefix: "test"
      telemetry: "true"
      hostname_label: node
    ```
    """

    SERVICE_CHECK_HEALTH = 'openmetrics.health'

    def __init__(self, check, config):
        """
        The base class for any scraper overrides.
        """
        self.config = config

        # Save a reference to the check instance
        self.check = check

        # Parse the configuration
        self.endpoint = config['openmetrics_endpoint']

        self.metric_transformer = MetricTransformer(self.check, config)
        self.label_aggregator = LabelAggregator(self.check, config)

        self.enable_telemetry = is_affirmative(config.get('telemetry', False))
        # Make every telemetry submission method a no-op to avoid many lookups of `self.enable_telemetry`
        if not self.enable_telemetry:
            for name, _ in inspect.getmembers(self, predicate=inspect.ismethod):
                if name.startswith('submit_telemetry_'):
                    setattr(self, name, no_op)

        # Prevent overriding an integration's defined namespace
        self.namespace = check.__NAMESPACE__ or config.get('namespace', '')
        if not isinstance(self.namespace, str):
            raise ConfigurationError('Setting `namespace` must be a string')

        self.raw_metric_prefix = config.get('raw_metric_prefix', '')
        if not isinstance(self.raw_metric_prefix, str):
            raise ConfigurationError('Setting `raw_metric_prefix` must be a string')

        self.enable_health_service_check = is_affirmative(config.get('enable_health_service_check', True))
        self.ignore_connection_errors = is_affirmative(config.get('ignore_connection_errors', False))

        self.hostname_label = config.get('hostname_label', '')
        if not isinstance(self.hostname_label, str):
            raise ConfigurationError('Setting `hostname_label` must be a string')

        hostname_format = config.get('hostname_format', '')
        if not isinstance(hostname_format, str):
            raise ConfigurationError('Setting `hostname_format` must be a string')

        self.hostname_formatter = None
        if self.hostname_label and hostname_format:
            placeholder = '<HOSTNAME>'
            if placeholder not in hostname_format:
                raise ConfigurationError(f'Setting `hostname_format` does not contain the placeholder `{placeholder}`')

            self.hostname_formatter = lambda hostname: hostname_format.replace('<HOSTNAME>', hostname, 1)

        exclude_labels = config.get('exclude_labels', [])
        if not isinstance(exclude_labels, list):
            raise ConfigurationError('Setting `exclude_labels` must be an array')

        self.exclude_labels = set()
        for i, entry in enumerate(exclude_labels, 1):
            if not isinstance(entry, str):
                raise ConfigurationError(f'Entry #{i} of setting `exclude_labels` must be a string')

            self.exclude_labels.add(entry)

        include_labels = config.get('include_labels', [])
        if not isinstance(include_labels, list):
            raise ConfigurationError('Setting `include_labels` must be an array')

        self.include_labels = set()
        for i, entry in enumerate(include_labels, 1):
            if not isinstance(entry, str):
                raise ConfigurationError(f'Entry #{i} of setting `include_labels` must be a string')
            if entry in self.exclude_labels:
                self.log.debug(
                    'Label `%s` is set in both `exclude_labels` and `include_labels`. Excluding label.', entry
                )
            self.include_labels.add(entry)

        self.rename_labels = config.get('rename_labels', {})
        if not isinstance(self.rename_labels, dict):
            raise ConfigurationError('Setting `rename_labels` must be a mapping')

        for key, value in self.rename_labels.items():
            if not isinstance(value, str):
                raise ConfigurationError(f'Value for label `{key}` of setting `rename_labels` must be a string')

        exclude_metrics = config.get('exclude_metrics', [])
        if not isinstance(exclude_metrics, list):
            raise ConfigurationError('Setting `exclude_metrics` must be an array')

        self.exclude_metrics = set()
        self.exclude_metrics_pattern = None
        exclude_metrics_patterns = []
        for i, entry in enumerate(exclude_metrics, 1):
            if not isinstance(entry, str):
                raise ConfigurationError(f'Entry #{i} of setting `exclude_metrics` must be a string')

            escaped_entry = re.escape(entry)
            if entry == escaped_entry:
                self.exclude_metrics.add(entry)
            else:
                exclude_metrics_patterns.append(entry)

        if exclude_metrics_patterns:
            self.exclude_metrics_pattern = re.compile('|'.join(exclude_metrics_patterns))

        self.exclude_metrics_by_labels = {}
        exclude_metrics_by_labels = config.get('exclude_metrics_by_labels', {})
        if not isinstance(exclude_metrics_by_labels, dict):
            raise ConfigurationError('Setting `exclude_metrics_by_labels` must be a mapping')
        elif exclude_metrics_by_labels:
            for label, values in exclude_metrics_by_labels.items():
                if values is True:
                    self.exclude_metrics_by_labels[label] = return_true
                elif isinstance(values, list):
                    for i, value in enumerate(values, 1):
                        if not isinstance(value, str):
                            raise ConfigurationError(
                                f'Value #{i} for label `{label}` of setting `exclude_metrics_by_labels` '
                                f'must be a string'
                            )

                    self.exclude_metrics_by_labels[label] = (
                        lambda label_value, pattern=re.compile('|'.join(values)): pattern.search(  # noqa: B008
                            label_value
                        )  # noqa: B008, E501
                        is not None
                    )
                else:
                    raise ConfigurationError(
                        f'Label `{label}` of setting `exclude_metrics_by_labels` must be an array or set to `true`'
                    )

        custom_tags = config.get('tags', [])  # type: List[str]
        if not isinstance(custom_tags, list):
            raise ConfigurationError('Setting `tags` must be an array')

        for i, entry in enumerate(custom_tags, 1):
            if not isinstance(entry, str):
                raise ConfigurationError(f'Entry #{i} of setting `tags` must be a string')

        # Some tags can be ignored to reduce the cardinality.
        # This can be useful for cost optimization in containerized environments
        # when the openmetrics check is configured to collect custom metrics.
        # Even when the Agent's Tagger is configured to add low-cardinality tags only,
        # some tags can still generate unwanted metric contexts (e.g pod annotations as tags).
        ignore_tags = config.get('ignore_tags', [])
        if ignore_tags:
            ignored_tags_re = re.compile('|'.join(set(ignore_tags)))
            custom_tags = [tag for tag in custom_tags if not ignored_tags_re.search(tag)]

        self.static_tags = copy(custom_tags)
        if is_affirmative(self.config.get('tag_by_endpoint', True)):
            self.static_tags.append(f'endpoint:{self.endpoint}')

        # These will be applied only to service checks
        self.static_tags = tuple(self.static_tags)
        # These will be applied to everything except service checks
        self.tags = self.static_tags

        self.raw_line_filter = None
        raw_line_filters = config.get('raw_line_filters', [])
        if not isinstance(raw_line_filters, list):
            raise ConfigurationError('Setting `raw_line_filters` must be an array')
        elif raw_line_filters:
            for i, entry in enumerate(raw_line_filters, 1):
                if not isinstance(entry, str):
                    raise ConfigurationError(f'Entry #{i} of setting `raw_line_filters` must be a string')

            self.raw_line_filter = re.compile('|'.join(raw_line_filters))

        self.http = RequestsWrapper(config, self.check.init_config, self.check.HTTP_CONFIG_REMAPPER, self.check.log)

        self._content_type = ''
        self._use_latest_spec = is_affirmative(config.get('use_latest_spec', False))
        if self._use_latest_spec:
            accept_header = 'application/openmetrics-text;version=1.0.0,application/openmetrics-text;version=0.0.1'
        else:
            accept_header = 'text/plain'

        # Request the appropriate exposition format
        if self.http.options['headers'].get('Accept') == '*/*':
            self.http.options['headers']['Accept'] = accept_header

        self.use_process_start_time = is_affirmative(config.get('use_process_start_time'))

        # Used for monotonic counts
        self.flush_first_value = False

    def scrape(self):
        """
        Execute a scrape, and for each metric collected, transform the metric.
        """
        runtime_data = {'flush_first_value': self.flush_first_value, 'static_tags': self.static_tags}

        for metric in self.consume_metrics(runtime_data):
            transformer = self.metric_transformer.get(metric)
            if transformer is None:
                continue

            transformer(metric, self.generate_sample_data(metric), runtime_data)

        self.flush_first_value = True

    def consume_metrics(self, runtime_data):
        """
        Yield the processed metrics and filter out excluded metrics.
        """
        metric_parser = self.parse_metrics()
        if not self.flush_first_value and self.use_process_start_time:
            metric_parser = first_scrape_handler(metric_parser, runtime_data, datadog_agent.get_process_start_time())
        if self.label_aggregator.configured:
            metric_parser = self.label_aggregator(metric_parser)

        for metric in metric_parser:
            if metric.name in self.exclude_metrics or (
                self.exclude_metrics_pattern is not None and self.exclude_metrics_pattern.search(metric.name)
            ):
                self.submit_telemetry_number_of_ignored_metric_samples(metric)
                continue

            yield metric

    def parse_metrics(self):
        """
        Get the line streamer and yield processed metrics.
        """
        line_streamer = self.stream_connection_lines()
        if self.raw_line_filter is not None:
            line_streamer = self.filter_connection_lines(line_streamer)

        # Since we determine `self.parse_metric_families` dynamically from the response and that's done as a
        # side effect inside the `line_streamer` generator, we need to consume the first line in order to
        # trigger that side effect.
        try:
            line_streamer = chain([next(line_streamer)], line_streamer)
        except StopIteration:
            # If line_streamer is an empty iterator, next(line_streamer) fails.
            return

        for metric in self.parse_metric_families(line_streamer):
            self.submit_telemetry_number_of_total_metric_samples(metric)

            # It is critical that the prefix is removed immediately so that
            # all other configuration may reference the trimmed metric name
            if self.raw_metric_prefix and metric.name.startswith(self.raw_metric_prefix):
                metric.name = metric.name[len(self.raw_metric_prefix) :]

            yield metric

    @property
    def parse_metric_families(self):
        media_type = self._content_type.split(';')[0]
        # Setting `use_latest_spec` forces the use of the OpenMetrics format, otherwise
        # the format will be chosen based on the media type specified in the response's content-header.
        # The selection is based on what Prometheus does:
        # https://github.com/prometheus/prometheus/blob/v2.43.0/model/textparse/interface.go#L83-L90
        return (
            parse_openmetrics
            if self._use_latest_spec or media_type == 'application/openmetrics-text'
            else parse_prometheus
        )

    def generate_sample_data(self, metric):
        """
        Yield a sample of processed data.
        """
        label_normalizer = get_label_normalizer(metric.type)

        for sample in metric.samples:
            value = sample.value
            if isnan(value) or isinf(value):
                self.log.debug('Ignoring sample for metric `%s` as it has an invalid value: %s', metric.name, value)
                continue

            tags = []
            skip_sample = False

            labels = sample.labels
            self.label_aggregator.populate(labels)
            label_normalizer(labels)

            for label_name, label_value in labels.items():
                sample_excluder = self.exclude_metrics_by_labels.get(label_name)
                if sample_excluder is not None and sample_excluder(label_value):
                    skip_sample = True
                    break
                elif label_name in self.exclude_labels:
                    continue
                elif self.include_labels and label_name not in self.include_labels:
                    continue

                label_name = self.rename_labels.get(label_name, label_name)
                tags.append(f'{label_name}:{label_value}')

            if skip_sample:
                continue

            tags.extend(self.tags)

            hostname = ""
            if self.hostname_label and self.hostname_label in labels:
                hostname = labels[self.hostname_label]
                if self.hostname_formatter is not None:
                    hostname = self.hostname_formatter(hostname)

            self.submit_telemetry_number_of_processed_metric_samples()

            yield sample, tags, hostname

    def stream_connection_lines(self):
        """
        Yield the connection line.
        """
        try:
            with self.get_connection() as connection:
                # Media type will be used to select parser dynamically
                self._content_type = connection.headers.get('Content-Type', '')
                for line in connection.iter_lines(decode_unicode=True):
                    yield line
        except ConnectionError as e:
            if self.ignore_connection_errors:
                self.log.warning("OpenMetrics endpoint %s is not accessible", self.endpoint)
            else:
                raise e

    def filter_connection_lines(self, line_streamer):
        """
        Filter connection lines in the line streamer.
        """
        for line in line_streamer:
            if self.raw_line_filter.search(line):
                self.submit_telemetry_number_of_ignored_lines()
            else:
                yield line

    def get_connection(self):
        """
        Send a request to scrape metrics. Return the response or throw an exception.
        """
        try:
            response = self.send_request()
        except Exception as e:
            self.submit_health_check(ServiceCheck.CRITICAL, message=str(e))
            raise
        else:
            try:
                response.raise_for_status()
            except Exception as e:
                self.submit_health_check(ServiceCheck.CRITICAL, message=str(e))
                response.close()
                raise
            else:
                self.submit_health_check(ServiceCheck.OK)

                # Never derive the encoding from the locale
                if response.encoding is None:
                    response.encoding = 'utf-8'

                self.submit_telemetry_endpoint_response_size(response)

                return response

    def send_request(self, **kwargs):
        """
        Send an HTTP GET request to the `openmetrics_endpoint` value.
        """
        kwargs['stream'] = True

        return self.http.get(self.endpoint, **kwargs)

    def set_dynamic_tags(self, *tags):
        """
        Set dynamic tags.
        """
        self.tags = tuple(chain(self.static_tags, tags))

    def submit_health_check(self, status, **kwargs):
        """
        If health service check is enabled, send an `openmetrics.health` service check.
        """
        if self.enable_health_service_check:
            self.service_check(self.SERVICE_CHECK_HEALTH, status, tags=self.static_tags, **kwargs)

    def submit_telemetry_number_of_total_metric_samples(self, metric):
        self.count('telemetry.metrics.input.count', len(metric.samples), tags=self.tags)

    def submit_telemetry_number_of_ignored_metric_samples(self, metric):
        self.count('telemetry.metrics.ignored.count', len(metric.samples), tags=self.tags)

    def submit_telemetry_number_of_processed_metric_samples(self):
        self.count('telemetry.metrics.processed.count', 1, tags=self.tags)

    def submit_telemetry_number_of_ignored_lines(self):
        self.count('telemetry.metrics.blacklist.count', 1, tags=self.tags)

    def submit_telemetry_endpoint_response_size(self, response):
        content_length = response.headers.get('Content-Length')
        if content_length is not None:
            content_length = int(content_length)
        else:
            content_length = len(response.content)

        self.gauge('telemetry.payload.size', content_length, tags=self.tags)

    def __getattr__(self, name):
        # Forward all unknown attribute lookups to the check instance for access to submission methods, hostname, etc.
        attribute = getattr(self.check, name)
        setattr(self, name, attribute)
        return attribute
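A hedged sketch of a scraper override and how it is wired into a check through create_scraper. The extra query parameter, its name, and the check/scraper class names are assumptions for the example; passing extra request keyword arguments through send_request to the underlying HTTP wrapper is assumed to behave like standard requests options.

```python
from datadog_checks.base import OpenMetricsBaseCheckV2
from datadog_checks.base.checks.openmetrics.v2.scraper import OpenMetricsScraper


class MyScraper(OpenMetricsScraper):
    def send_request(self, **kwargs):
        # Pass an extra (hypothetical) query parameter to the endpoint;
        # everything else, including streaming, is handled by the parent class.
        kwargs.setdefault('params', {'format': 'compact'})
        return super().send_request(**kwargs)


class MyCheck(OpenMetricsBaseCheckV2):
    __NAMESPACE__ = 'my_check'

    def create_scraper(self, config):
        # Return the custom scraper instead of the default OpenMetricsScraper
        return MyScraper(self, self.get_config_with_defaults(config))
```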
__init__(check, config)
The base class for any scraper overrides.
Source code in datadog_checks_base/datadog_checks/base/checks/openmetrics/v2/scraper.py
def __init__(self, check, config):
    """
    The base class for any scraper overrides.
    """
    self.config = config

    # Save a reference to the check instance
    self.check = check

    # Parse the configuration
    self.endpoint = config['openmetrics_endpoint']

    self.metric_transformer = MetricTransformer(self.check, config)
    self.label_aggregator = LabelAggregator(self.check, config)

    self.enable_telemetry = is_affirmative(config.get('telemetry', False))
    # Make every telemetry submission method a no-op to avoid many lookups of `self.enable_telemetry`
    if not self.enable_telemetry:
        for name, _ in inspect.getmembers(self, predicate=inspect.ismethod):
            if name.startswith('submit_telemetry_'):
                setattr(self, name, no_op)

    # Prevent overriding an integration's defined namespace
    self.namespace = check.__NAMESPACE__ or config.get('namespace', '')
    if not isinstance(self.namespace, str):
        raise ConfigurationError('Setting `namespace` must be a string')

    self.raw_metric_prefix = config.get('raw_metric_prefix', '')
    if not isinstance(self.raw_metric_prefix, str):
        raise ConfigurationError('Setting `raw_metric_prefix` must be a string')

    self.enable_health_service_check = is_affirmative(config.get('enable_health_service_check', True))
    self.ignore_connection_errors = is_affirmative(config.get('ignore_connection_errors', False))

    self.hostname_label = config.get('hostname_label', '')
    if not isinstance(self.hostname_label, str):
        raise ConfigurationError('Setting `hostname_label` must be a string')

    hostname_format = config.get('hostname_format', '')
    if not isinstance(hostname_format, str):
        raise ConfigurationError('Setting `hostname_format` must be a string')

    self.hostname_formatter = None
    if self.hostname_label and hostname_format:
        placeholder = '<HOSTNAME>'
        if placeholder not in hostname_format:
            raise ConfigurationError(f'Setting `hostname_format` does not contain the placeholder `{placeholder}`')

        self.hostname_formatter = lambda hostname: hostname_format.replace('<HOSTNAME>', hostname, 1)

    exclude_labels = config.get('exclude_labels', [])
    if not isinstance(exclude_labels, list):
        raise ConfigurationError('Setting `exclude_labels` must be an array')

    self.exclude_labels = set()
    for i, entry in enumerate(exclude_labels, 1):
        if not isinstance(entry, str):
            raise ConfigurationError(f'Entry #{i} of setting `exclude_labels` must be a string')

        self.exclude_labels.add(entry)

    include_labels = config.get('include_labels', [])
    if not isinstance(include_labels, list):
        raise ConfigurationError('Setting `include_labels` must be an array')

    self.include_labels = set()
    for i, entry in enumerate(include_labels, 1):
        if not isinstance(entry, str):
            raise ConfigurationError(f'Entry #{i} of setting `include_labels` must be a string')
        if entry in self.exclude_labels:
            self.log.debug(
                'Label `%s` is set in both `exclude_labels` and `include_labels`. Excluding label.', entry
            )
        self.include_labels.add(entry)

    self.rename_labels = config.get('rename_labels', {})
    if not isinstance(self.rename_labels, dict):
        raise ConfigurationError('Setting `rename_labels` must be a mapping')

    for key, value in self.rename_labels.items():
        if not isinstance(value, str):
            raise ConfigurationError(f'Value for label `{key}` of setting `rename_labels` must be a string')

    exclude_metrics = config.get('exclude_metrics', [])
    if not isinstance(exclude_metrics, list):
        raise ConfigurationError('Setting `exclude_metrics` must be an array')

    self.exclude_metrics = set()
    self.exclude_metrics_pattern = None
    exclude_metrics_patterns = []
    for i, entry in enumerate(exclude_metrics, 1):
        if not isinstance(entry, str):
            raise ConfigurationError(f'Entry #{i} of setting `exclude_metrics` must be a string')

        escaped_entry = re.escape(entry)
        if entry == escaped_entry:
            self.exclude_metrics.add(entry)
        else:
            exclude_metrics_patterns.append(entry)

    if exclude_metrics_patterns:
        self.exclude_metrics_pattern = re.compile('|'.join(exclude_metrics_patterns))

    self.exclude_metrics_by_labels = {}
    exclude_metrics_by_labels = config.get('exclude_metrics_by_labels', {})
    if not isinstance(exclude_metrics_by_labels, dict):
        raise ConfigurationError('Setting `exclude_metrics_by_labels` must be a mapping')
    elif exclude_metrics_by_labels:
        for label, values in exclude_metrics_by_labels.items():
            if values is True:
                self.exclude_metrics_by_labels[label] = return_true
            elif isinstance(values, list):
                for i, value in enumerate(values, 1):
                    if not isinstance(value, str):
                        raise ConfigurationError(
                            f'Value #{i} for label `{label}` of setting `exclude_metrics_by_labels` '
                            f'must be a string'
                        )

                self.exclude_metrics_by_labels[label] = (
                    lambda label_value, pattern=re.compile('|'.join(values)): pattern.search(  # noqa: B008
                        label_value
                    )  # noqa: B008, E501
                    is not None
                )
            else:
                raise ConfigurationError(
                    f'Label `{label}` of setting `exclude_metrics_by_labels` must be an array or set to `true`'
                )

    custom_tags = config.get('tags', [])  # type: List[str]
    if not isinstance(custom_tags, list):
        raise ConfigurationError('Setting `tags` must be an array')

    for i, entry in enumerate(custom_tags, 1):
        if not isinstance(entry, str):
            raise ConfigurationError(f'Entry #{i} of setting `tags` must be a string')

    # Some tags can be ignored to reduce the cardinality.
    # This can be useful for cost optimization in containerized environments
    # when the openmetrics check is configured to collect custom metrics.
    # Even when the Agent's Tagger is configured to add low-cardinality tags only,
    # some tags can still generate unwanted metric contexts (e.g pod annotations as tags).
    ignore_tags = config.get('ignore_tags', [])
    if ignore_tags:
        ignored_tags_re = re.compile('|'.join(set(ignore_tags)))
        custom_tags = [tag for tag in custom_tags if not ignored_tags_re.search(tag)]

    self.static_tags = copy(custom_tags)
    if is_affirmative(self.config.get('tag_by_endpoint', True)):
        self.static_tags.append(f'endpoint:{self.endpoint}')

    # These will be applied only to service checks
    self.static_tags = tuple(self.static_tags)
    # These will be applied to everything except service checks
    self.tags = self.static_tags

    self.raw_line_filter = None
    raw_line_filters = config.get('raw_line_filters', [])
    if not isinstance(raw_line_filters, list):
        raise ConfigurationError('Setting `raw_line_filters` must be an array')
    elif raw_line_filters:
        for i, entry in enumerate(raw_line_filters, 1):
            if not isinstance(entry, str):
                raise ConfigurationError(f'Entry #{i} of setting `raw_line_filters` must be a string')

        self.raw_line_filter = re.compile('|'.join(raw_line_filters))

    self.http = RequestsWrapper(config, self.check.init_config, self.check.HTTP_CONFIG_REMAPPER, self.check.log)

    self._content_type = ''
    self._use_latest_spec = is_affirmative(config.get('use_latest_spec', False))
    if self._use_latest_spec:
        accept_header = 'application/openmetrics-text;version=1.0.0,application/openmetrics-text;version=0.0.1'
    else:
        accept_header = 'text/plain'

    # Request the appropriate exposition format
    if self.http.options['headers'].get('Accept') == '*/*':
        self.http.options['headers']['Accept'] = accept_header

    self.use_process_start_time = is_affirmative(config.get('use_process_start_time'))

    # Used for monotonic counts
    self.flush_first_value = False
scrape()
Execute a scrape, and for each metric collected, transform the metric.
Source code in datadog_checks_base/datadog_checks/base/checks/openmetrics/v2/scraper.py
def scrape(self):
    """
    Execute a scrape, and for each metric collected, transform the metric.
    """
    runtime_data = {'flush_first_value': self.flush_first_value, 'static_tags': self.static_tags}

    for metric in self.consume_metrics(runtime_data):
        transformer = self.metric_transformer.get(metric)
        if transformer is None:
            continue

        transformer(metric, self.generate_sample_data(metric), runtime_data)

    self.flush_first_value = True
consume_metrics(runtime_data)
Yield the processed metrics and filter out excluded metrics.
Source code in datadog_checks_base/datadog_checks/base/checks/openmetrics/v2/scraper.py
def consume_metrics(self, runtime_data):
    """
    Yield the processed metrics and filter out excluded metrics.
    """
    metric_parser = self.parse_metrics()
    if not self.flush_first_value and self.use_process_start_time:
        metric_parser = first_scrape_handler(metric_parser, runtime_data, datadog_agent.get_process_start_time())
    if self.label_aggregator.configured:
        metric_parser = self.label_aggregator(metric_parser)

    for metric in metric_parser:
        if metric.name in self.exclude_metrics or (
            self.exclude_metrics_pattern is not None and self.exclude_metrics_pattern.search(metric.name)
        ):
            self.submit_telemetry_number_of_ignored_metric_samples(metric)
            continue

        yield metric
parse_metrics()
Get the line streamer and yield processed metrics.
Source code in datadog_checks_base/datadog_checks/base/checks/openmetrics/v2/scraper.py
def parse_metrics(self):
    """
    Get the line streamer and yield processed metrics.
    """
    line_streamer = self.stream_connection_lines()
    if self.raw_line_filter is not None:
        line_streamer = self.filter_connection_lines(line_streamer)

    # Since we determine `self.parse_metric_families` dynamically from the response and that's done as a
    # side effect inside the `line_streamer` generator, we need to consume the first line in order to
    # trigger that side effect.
    try:
        line_streamer = chain([next(line_streamer)], line_streamer)
    except StopIteration:
        # If line_streamer is an empty iterator, next(line_streamer) fails.
        return

    for metric in self.parse_metric_families(line_streamer):
        self.submit_telemetry_number_of_total_metric_samples(metric)

        # It is critical that the prefix is removed immediately so that
        # all other configuration may reference the trimmed metric name
        if self.raw_metric_prefix and metric.name.startswith(self.raw_metric_prefix):
            metric.name = metric.name[len(self.raw_metric_prefix) :]

        yield metric
generate_sample_data(metric)
Yield a sample of processed data.
Source code in datadog_checks_base/datadog_checks/base/checks/openmetrics/v2/scraper.py
def generate_sample_data(self, metric):
    """
    Yield a sample of processed data.
    """
    label_normalizer = get_label_normalizer(metric.type)

    for sample in metric.samples:
        value = sample.value
        if isnan(value) or isinf(value):
            self.log.debug('Ignoring sample for metric `%s` as it has an invalid value: %s', metric.name, value)
            continue

        tags = []
        skip_sample = False

        labels = sample.labels
        self.label_aggregator.populate(labels)
        label_normalizer(labels)

        for label_name, label_value in labels.items():
            sample_excluder = self.exclude_metrics_by_labels.get(label_name)
            if sample_excluder is not None and sample_excluder(label_value):
                skip_sample = True
                break
            elif label_name in self.exclude_labels:
                continue
            elif self.include_labels and label_name not in self.include_labels:
                continue

            label_name = self.rename_labels.get(label_name, label_name)
            tags.append(f'{label_name}:{label_value}')

        if skip_sample:
            continue

        tags.extend(self.tags)

        hostname = ""
        if self.hostname_label and self.hostname_label in labels:
            hostname = labels[self.hostname_label]
            if self.hostname_formatter is not None:
                hostname = self.hostname_formatter(hostname)

        self.submit_telemetry_number_of_processed_metric_samples()

        yield sample, tags, hostname
stream_connection_lines()
Yield the connection line.
Source code in datadog_checks_base/datadog_checks/base/checks/openmetrics/v2/scraper.py
def stream_connection_lines(self):
    """
    Yield the connection line.
    """
    try:
        with self.get_connection() as connection:
            # Media type will be used to select parser dynamically
            self._content_type = connection.headers.get('Content-Type', '')
            for line in connection.iter_lines(decode_unicode=True):
                yield line
    except ConnectionError as e:
        if self.ignore_connection_errors:
            self.log.warning("OpenMetrics endpoint %s is not accessible", self.endpoint)
        else:
            raise e
filter_connection_lines(line_streamer)
Filter connection lines in the line streamer.
Source code in datadog_checks_base/datadog_checks/base/checks/openmetrics/v2/scraper.py
def filter_connection_lines(self, line_streamer):
    """
    Filter connection lines in the line streamer.
    """
    for line in line_streamer:
        if self.raw_line_filter.search(line):
            self.submit_telemetry_number_of_ignored_lines()
        else:
            yield line
get_connection()
Send a request to scrape metrics. Return the response or throw an exception.
Source code in datadog_checks_base/datadog_checks/base/checks/openmetrics/v2/scraper.py
def get_connection(self):
    """
    Send a request to scrape metrics. Return the response or throw an exception.
    """
    try:
        response = self.send_request()
    except Exception as e:
        self.submit_health_check(ServiceCheck.CRITICAL, message=str(e))
        raise
    else:
        try:
            response.raise_for_status()
        except Exception as e:
            self.submit_health_check(ServiceCheck.CRITICAL, message=str(e))
            response.close()
            raise
        else:
            self.submit_health_check(ServiceCheck.OK)

            # Never derive the encoding from the locale
            if response.encoding is None:
                response.encoding = 'utf-8'

            self.submit_telemetry_endpoint_response_size(response)

            return response
set_dynamic_tags(*tags)
Set dynamic tags.
Source code in datadog_checks_base/datadog_checks/base/checks/openmetrics/v2/scraper.py
def set_dynamic_tags(self, *tags):
    """
    Set dynamic tags.
    """
    self.tags = tuple(chain(self.static_tags, tags))
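Dynamic tags are applied to everything except service checks and can be refreshed on every run. A hedged sketch, assuming a hypothetical `reported_version` instance option, that refreshes them from the check side (OpenMetricsBaseCheckV2.set_dynamic_tags forwards to every scraper):

```python
from datadog_checks.base import OpenMetricsBaseCheckV2


class MyCheck(OpenMetricsBaseCheckV2):
    __NAMESPACE__ = 'my_check'

    def refresh_scrapers(self):
        # Runs at the start of every check run, before the scrapers are used
        version = self.instance.get('reported_version', 'unknown')  # hypothetical option
        self.set_dynamic_tags(f'server_version:{version}')
```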
submit_health_check(status, **kwargs)
If the health service check is enabled, send an `openmetrics.health` service check.
Source code in datadog_checks_base/datadog_checks/base/checks/openmetrics/v2/scraper.py
def submit_health_check(self, status, **kwargs):
    """
    If health service check is enabled, send an `openmetrics.health` service check.
    """
    if self.enable_health_service_check:
        self.service_check(self.SERVICE_CHECK_HEALTH, status, tags=self.static_tags, **kwargs)
Transformers¶
datadog_checks.base.checks.openmetrics.v2.transform.Transformers
Source code in datadog_checks_base/datadog_checks/base/checks/openmetrics/v2/transform.py
class Transformers(object):
    pass
Options¶
For complete documentation on every option, see the associated templates for the instance and init_config sections.
Legacy¶
This OpenMetrics implementation is the updated version of the original Prometheus/OpenMetrics implementation. The docs for the deprecated implementation are still available as a reference.