Skip to content

API


datadog_checks.base.checks.base.AgentCheck

The base class for any Agent based integration.

In general, you don't need to and you should not override anything from the base class except the check method but sometimes it might be useful for a Check to have its own constructor.

When overriding __init__ you have to remember that, depending on the configuration, the Agent might create several different Check instances and the method would be called as many times.

Agent 6,7 signature:

AgentCheck(name, init_config, instances)    # instances contain only 1 instance
AgentCheck.check(instance)

Agent 8 signature:

AgentCheck(name, init_config, instance)     # one instance
AgentCheck.check()                          # no more instance argument for check method

Note

when loading a Custom check, the Agent will inspect the module searching for a subclass of AgentCheck. If such a class exists but has been derived in turn, it'll be ignored - you should never derive from an existing Check.

Source code in datadog_checks/base/checks/base.py
class AgentCheck(object):
    """
    The base class for any Agent based integration.

    In general, you don't need to and you should not override anything from the base
    class except the `check` method but sometimes it might be useful for a Check to
    have its own constructor.

    When overriding `__init__` you have to remember that, depending on the configuration,
    the Agent might create several different Check instances and the method would be
    called as many times.

    Agent 6,7 signature:

        AgentCheck(name, init_config, instances)    # instances contain only 1 instance
        AgentCheck.check(instance)

    Agent 8 signature:

        AgentCheck(name, init_config, instance)     # one instance
        AgentCheck.check()                          # no more instance argument for check method

    !!! note
        when loading a Custom check, the Agent will inspect the module searching
        for a subclass of `AgentCheck`. If such a class exists but has been derived in
        turn, it'll be ignored - **you should never derive from an existing Check**.
    """

    # If defined, this will be the prefix of every metric/service check and the source type of events
    __NAMESPACE__ = ''

    OK, WARNING, CRITICAL, UNKNOWN = ServiceCheck

    # Used by `self.http` for an instance of RequestsWrapper
    HTTP_CONFIG_REMAPPER = None

    # Used by `create_tls_context` for an instance of RequestsWrapper
    TLS_CONFIG_REMAPPER = None

    # Used by `self.set_metadata` for an instance of MetadataManager
    #
    # This is a mapping of metadata names to functions. When you call `self.set_metadata(name, value, **options)`,
    # if `name` is in this mapping then the corresponding function will be called with the `value`, and the
    # return value(s) will be sent instead.
    #
    # Transformer functions must satisfy the following signature:
    #
    #    def transform_<NAME>(value: Any, options: dict) -> Union[str, Dict[str, str]]:
    #
    # If the return type is a string, then it will be sent as the value for `name`. If the return type is
    # a mapping type, then each key will be considered a `name` and will be sent with its (str) value.
    METADATA_TRANSFORMERS = None

    # Default fields to whitelist for metadata submission
    METADATA_DEFAULT_CONFIG_INIT_CONFIG = None
    METADATA_DEFAULT_CONFIG_INSTANCE = None

    FIRST_CAP_RE = re.compile(br'(.)([A-Z][a-z]+)')
    ALL_CAP_RE = re.compile(br'([a-z0-9])([A-Z])')
    METRIC_REPLACEMENT = re.compile(br'([^a-zA-Z0-9_.]+)|(^[^a-zA-Z]+)')
    TAG_REPLACEMENT = re.compile(br'[,\+\*\-/()\[\]{}\s]')
    MULTIPLE_UNDERSCORE_CLEANUP = re.compile(br'__+')
    DOT_UNDERSCORE_CLEANUP = re.compile(br'_*\._*')

    # allows to set a limit on the number of metric name and tags combination
    # this check can send per run. This is useful for checks that have an unbounded
    # number of tag values that depend on the input payload.
    # The logic counts one set of tags per gauge/rate/monotonic_count call, and de-duplicates
    # sets of tags for other metric types. The first N sets of tags in submission order will
    # be sent to the aggregator, the rest are dropped. The state is reset after each run.
    # See https://github.com/DataDog/integrations-core/pull/2093 for more information.
    DEFAULT_METRIC_LIMIT = 0

    # Allow tracing for classic integrations
    def __init_subclass__(cls, *args, **kwargs):
        try:
            # https://github.com/python/mypy/issues/4660
            super().__init_subclass__(*args, **kwargs)  # type: ignore
            return traced_class(cls)
        except Exception:
            return cls

    def __init__(self, *args, **kwargs):
        # type: (*Any, **Any) -> None
        """
        - **name** (_str_) - the name of the check
        - **init_config** (_dict_) - the `init_config` section of the configuration.
        - **instance** (_List[dict]_) - a one-element list containing the instance options from the
                configuration file (a list is used to keep backward compatibility with
                older versions of the Agent).
        """
        # NOTE: these variable assignments exist to ease type checking when eventually assigned as attributes.
        name = kwargs.get('name', '')
        init_config = kwargs.get('init_config', {})
        agentConfig = kwargs.get('agentConfig', {})
        instances = kwargs.get('instances', [])

        if len(args) > 0:
            name = args[0]
        if len(args) > 1:
            init_config = args[1]
        if len(args) > 2:
            # agent pass instances as tuple but in test we are usually using list, so we are testing for both
            if len(args) > 3 or not isinstance(args[2], (list, tuple)) or 'instances' in kwargs:
                # old-style init: the 3rd argument is `agentConfig`
                agentConfig = args[2]
                if len(args) > 3:
                    instances = args[3]
            else:
                # new-style init: the 3rd argument is `instances`
                instances = args[2]

        # NOTE: Agent 6+ should pass exactly one instance... But we are not abiding by that rule on our side
        # everywhere just yet. It's complicated... See: https://github.com/DataDog/integrations-core/pull/5573
        instance = instances[0] if instances else None

        self.check_id = ''
        self.name = name  # type: str
        self.init_config = init_config  # type: InitConfigType
        self.agentConfig = agentConfig  # type: AgentConfigType
        self.instance = instance  # type: InstanceType
        self.instances = instances  # type: List[InstanceType]
        self.warnings = []  # type: List[str]
        self.disable_generic_tags = (
            is_affirmative(self.instance.get('disable_generic_tags', False)) if instance else False
        )
        self.debug_metrics = {}
        if self.init_config is not None:
            self.debug_metrics.update(self.init_config.get('debug_metrics', {}))
        if self.instance is not None:
            self.debug_metrics.update(self.instance.get('debug_metrics', {}))

        # `self.hostname` is deprecated, use `datadog_agent.get_hostname()` instead
        self.hostname = datadog_agent.get_hostname()  # type: str

        logger = logging.getLogger('{}.{}'.format(__name__, self.name))
        self.log = CheckLoggingAdapter(logger, self)

        metric_patterns = self.instance.get('metric_patterns', {}) if instance else {}
        if not isinstance(metric_patterns, dict):
            raise ConfigurationError('Setting `metric_patterns` must be a mapping')

        self.exclude_metrics_pattern = self._create_metrics_pattern(metric_patterns, 'exclude')
        self.include_metrics_pattern = self._create_metrics_pattern(metric_patterns, 'include')

        # TODO: Remove with Agent 5
        # Set proxy settings
        self.proxies = self._get_requests_proxy()
        if not self.init_config:
            self._use_agent_proxy = True
        else:
            self._use_agent_proxy = is_affirmative(self.init_config.get('use_agent_proxy', True))

        # TODO: Remove with Agent 5
        self.default_integration_http_timeout = float(self.agentConfig.get('default_integration_http_timeout', 9))

        self._deprecations = {
            'increment': (
                False,
                (
                    'DEPRECATION NOTICE: `AgentCheck.increment`/`AgentCheck.decrement` are deprecated, please '
                    'use `AgentCheck.gauge` or `AgentCheck.count` instead, with a different metric name'
                ),
            ),
            'device_name': (
                False,
                (
                    'DEPRECATION NOTICE: `device_name` is deprecated, please use a `device:` '
                    'tag in the `tags` list instead'
                ),
            ),
            'in_developer_mode': (
                False,
                'DEPRECATION NOTICE: `in_developer_mode` is deprecated, please stop using it.',
            ),
            'no_proxy': (
                False,
                (
                    'DEPRECATION NOTICE: The `no_proxy` config option has been renamed '
                    'to `skip_proxy` and will be removed in a future release.'
                ),
            ),
            'service_tag': (
                False,
                (
                    'DEPRECATION NOTICE: The `service` tag is deprecated and has been renamed to `%s`. '
                    'Set `disable_legacy_service_tag` to `true` to disable this warning. '
                    'The default will become `true` and cannot be changed in Agent version 8.'
                ),
            ),
            '_config_renamed': (
                False,
                (
                    'DEPRECATION NOTICE: The `%s` config option has been renamed '
                    'to `%s` and will be removed in a future release.'
                ),
            ),
        }  # type: Dict[str, Tuple[bool, str]]

        # Setup metric limits
        self.metric_limiter = self._get_metric_limiter(self.name, instance=self.instance)

        # Lazily load and validate config
        self._config_model_instance = None  # type: Any
        self._config_model_shared = None  # type: Any

        # Functions that will be called exactly once (if successful) before the first check run
        self.check_initializations = deque([self.send_config_metadata])  # type: Deque[Callable[[], None]]

        if not PY2:
            self.check_initializations.append(self.load_configuration_models)

    def _create_metrics_pattern(self, metric_patterns, option_name):
        all_patterns = metric_patterns.get(option_name, [])

        if not isinstance(all_patterns, list):
            raise ConfigurationError('Setting `{}` of `metric_patterns` must be an array'.format(option_name))

        metrics_patterns = []
        for i, entry in enumerate(all_patterns, 1):
            if not isinstance(entry, str):
                raise ConfigurationError(
                    'Entry #{} of setting `{}` of `metric_patterns` must be a string'.format(i, option_name)
                )
            if not entry:
                self.log.debug(
                    'Entry #%s of setting `%s` of `metric_patterns` must not be empty, ignoring', i, option_name
                )
                continue

            metrics_patterns.append(entry)

        if metrics_patterns:
            return re.compile('|'.join(metrics_patterns))

        return None

    def _get_metric_limiter(self, name, instance=None):
        # type: (str, InstanceType) -> Optional[Limiter]
        limit = self._get_metric_limit(instance=instance)

        if limit > 0:
            return Limiter(name, 'metrics', limit, self.warning)

        return None

    def _get_metric_limit(self, instance=None):
        # type: (InstanceType) -> int
        if instance is None:
            # NOTE: Agent 6+ will now always pass an instance when calling into a check, but we still need to
            # account for this case due to some tests not always passing an instance on init.
            self.log.debug(
                "No instance provided (this is deprecated!). Reverting to the default metric limit: %s",
                self.DEFAULT_METRIC_LIMIT,
            )
            return self.DEFAULT_METRIC_LIMIT

        max_returned_metrics = instance.get('max_returned_metrics', self.DEFAULT_METRIC_LIMIT)

        try:
            limit = int(max_returned_metrics)
        except (ValueError, TypeError):
            self.warning(
                "Configured 'max_returned_metrics' cannot be interpreted as an integer: %s. "
                "Reverting to the default limit: %s",
                max_returned_metrics,
                self.DEFAULT_METRIC_LIMIT,
            )
            return self.DEFAULT_METRIC_LIMIT

        # Do not allow to disable limiting if the class has set a non-zero default value.
        if limit == 0 and self.DEFAULT_METRIC_LIMIT > 0:
            self.warning(
                "Setting 'max_returned_metrics' to zero is not allowed. Reverting to the default metric limit: %s",
                self.DEFAULT_METRIC_LIMIT,
            )
            return self.DEFAULT_METRIC_LIMIT

        return limit

    @staticmethod
    def load_config(yaml_str):
        # type: (str) -> Any
        """
        Convenience wrapper to ease programmatic use of this class from the C API.
        """
        return yaml.safe_load(yaml_str)

    @property
    def http(self):
        # type: () -> RequestsWrapper
        """
        Provides logic to yield consistent network behavior based on user configuration.

        Only new checks or checks on Agent 6.13+ can and should use this for HTTP requests.
        """
        if not hasattr(self, '_http'):
            self._http = RequestsWrapper(self.instance or {}, self.init_config, self.HTTP_CONFIG_REMAPPER, self.log)

        return self._http

    def get_tls_context(self, refresh=False, overrides=None):
        # type: (bool, Dict[AnyStr, Any]) -> ssl.SSLContext
        """
        Creates and cache an SSLContext instance based on user configuration.
        Note that user configuration can be overridden by using `overrides`.
        This should only be applied to older integration that manually set config values.

        Since: Agent 7.24
        """
        if not hasattr(self, '_tls_context_wrapper'):
            self._tls_context_wrapper = TlsContextWrapper(
                self.instance or {}, self.TLS_CONFIG_REMAPPER, overrides=overrides
            )

        if refresh:
            self._tls_context_wrapper.refresh_tls_context()

        return self._tls_context_wrapper.tls_context

    @property
    def metadata_manager(self):
        # type: () -> MetadataManager
        """
        Used for sending metadata via Go bindings.
        """
        if not hasattr(self, '_metadata_manager'):
            if not self.check_id and not using_stub_aggregator:
                raise RuntimeError('Attribute `check_id` must be set')

            self._metadata_manager = MetadataManager(self.name, self.check_id, self.log, self.METADATA_TRANSFORMERS)

        return self._metadata_manager

    @property
    def check_version(self):
        # type: () -> str
        """
        Return the dynamically detected integration version.
        """
        if not hasattr(self, '_check_version'):
            # 'datadog_checks.<PACKAGE>.<MODULE>...'
            module_parts = self.__module__.split('.')
            package_path = '.'.join(module_parts[:2])
            package = importlib.import_module(package_path)

            # Provide a default just in case
            self._check_version = getattr(package, '__version__', '0.0.0')

        return self._check_version

    @property
    def in_developer_mode(self):
        # type: () -> bool
        self._log_deprecation('in_developer_mode')
        return False

    def log_typos_in_options(self, user_config, models_config, level):
        # only import it when running in python 3
        from jellyfish import jaro_winkler_similarity

        user_configs = user_config or {}  # type: Dict[str, Any]
        models_config = models_config or {}
        typos = set()  # type: Set[str]

        known_options = [k for k, _ in models_config]  # type: List[str]
        unknown_options = [option for option in user_configs.keys() if option not in known_options]  # type: List[str]

        for unknown_option in unknown_options:
            similar_known_options = []  # type: List[Tuple[str, int]]
            for known_option in known_options:
                ratio = jaro_winkler_similarity(unknown_option, known_option)
                if ratio > TYPO_SIMILARITY_THRESHOLD:
                    similar_known_options.append((known_option, ratio))
                    typos.add(unknown_option)

            if len(similar_known_options) > 0:
                similar_known_options.sort(key=lambda option: option[1], reverse=True)
                similar_known_options_names = [option[0] for option in similar_known_options]  # type: List[str]
                message = (
                    'Detected potential typo in configuration option in {}/{} section: `{}`. Did you mean {}?'
                ).format(self.name, level, unknown_option, ', or '.join(similar_known_options_names))
                self.log.warning(message)
        return typos

    def load_configuration_models(self, package_path=None):
        if package_path is None:
            # 'datadog_checks.<PACKAGE>.<MODULE>...'
            module_parts = self.__module__.split('.')
            package_path = '{}.config_models'.format('.'.join(module_parts[:2]))
        if self._config_model_shared is None:
            raw_shared_config = self._get_config_model_initialization_data()
            intg_shared_config = self._get_shared_config()
            raw_shared_config.update(intg_shared_config)

            shared_config = self.load_configuration_model(package_path, 'SharedConfig', raw_shared_config)
            try:
                self.log_typos_in_options(intg_shared_config, shared_config, 'init_config')
            except Exception as e:
                self.log.debug("Failed to detect typos in `init_config` section: %s", e)
            if shared_config is not None:
                self._config_model_shared = shared_config

        if self._config_model_instance is None:
            raw_instance_config = self._get_config_model_initialization_data()
            intg_instance_config = self._get_instance_config()
            raw_instance_config.update(intg_instance_config)

            instance_config = self.load_configuration_model(package_path, 'InstanceConfig', raw_instance_config)
            try:
                self.log_typos_in_options(intg_instance_config, instance_config, 'instances')
            except Exception as e:
                self.log.debug("Failed to detect typos in `instances` section: %s", e)
            if instance_config is not None:
                self._config_model_instance = instance_config

    @staticmethod
    def load_configuration_model(import_path, model_name, config):
        try:
            package = importlib.import_module(import_path)
        # TODO: remove the type ignore when we drop Python 2
        except ModuleNotFoundError as e:  # type: ignore
            # Don't fail if there are no models
            if str(e).startswith('No module named '):
                return

            raise

        model = getattr(package, model_name, None)
        if model is not None:
            try:
                config_model = model(**config)
            # TODO: remove the type ignore when we drop Python 2
            except ValidationError as e:  # type: ignore
                errors = e.errors()
                num_errors = len(errors)
                message_lines = [
                    'Detected {} error{} while loading configuration model `{}`:'.format(
                        num_errors, 's' if num_errors > 1 else '', model_name
                    )
                ]

                for error in errors:
                    message_lines.append(
                        ' -> '.join(
                            # Start array indexes at one for user-friendliness
                            str(loc + 1) if isinstance(loc, int) else str(loc)
                            for loc in error['loc']
                        )
                    )
                    message_lines.append('  {}'.format(error['msg']))

                raise_from(ConfigurationError('\n'.join(message_lines)), None)
            else:
                return config_model

    def _get_shared_config(self):
        # Any extra fields will be available during a config model's initial validation stage
        return copy.deepcopy(self.init_config)

    def _get_instance_config(self):
        # Any extra fields will be available during a config model's initial validation stage
        return copy.deepcopy(self.instance)

    def _get_config_model_initialization_data(self):
        # Allow for advanced functionality during the initial root validation stage
        return {'__data': {'logger': self.log, 'warning': self.warning}}

    def register_secret(self, secret):
        # type: (str) -> None
        """
        Register a secret to be scrubbed by `.sanitize()`.
        """
        if not hasattr(self, '_sanitizer'):
            # Configure lazily so that checks that don't use sanitization aren't affected.
            self._sanitizer = SecretsSanitizer()
            self.log.setup_sanitization(sanitize=self.sanitize)

        self._sanitizer.register(secret)

    def sanitize(self, text):
        # type: (str) -> str
        """
        Scrub any registered secrets in `text`.
        """
        try:
            sanitizer = self._sanitizer
        except AttributeError:
            return text
        else:
            return sanitizer.sanitize(text)

    def _context_uid(self, mtype, name, tags=None, hostname=None):
        # type: (int, str, Sequence[str], str) -> str
        return '{}-{}-{}-{}'.format(mtype, name, tags if tags is None else hash(frozenset(tags)), hostname)

    def submit_histogram_bucket(
        self, name, value, lower_bound, upper_bound, monotonic, hostname, tags, raw=False, flush_first_value=False
    ):
        # type: (str, float, int, int, bool, str, Sequence[str], bool, bool) -> None
        if value is None:
            # ignore metric sample
            return

        # make sure the value (bucket count) is an integer
        try:
            value = int(value)
        except ValueError:
            err_msg = 'Histogram: {} has non integer value: {}. Only integer are valid bucket values (count).'.format(
                repr(name), repr(value)
            )
            if using_stub_aggregator:
                raise ValueError(err_msg)
            self.warning(err_msg)
            return

        tags = self._normalize_tags_type(tags, metric_name=name)
        if hostname is None:
            hostname = ''

        aggregator.submit_histogram_bucket(
            self,
            self.check_id,
            self._format_namespace(name, raw),
            value,
            lower_bound,
            upper_bound,
            monotonic,
            hostname,
            tags,
            flush_first_value,
        )

    def database_monitoring_query_sample(self, raw_event):
        # type: (str) -> None
        if raw_event is None:
            return

        aggregator.submit_event_platform_event(self, self.check_id, to_native_string(raw_event), "dbm-samples")

    def database_monitoring_query_metrics(self, raw_event):
        # type: (str) -> None
        if raw_event is None:
            return

        aggregator.submit_event_platform_event(self, self.check_id, to_native_string(raw_event), "dbm-metrics")

    def database_monitoring_query_activity(self, raw_event):
        # type: (str) -> None
        if raw_event is None:
            return

        aggregator.submit_event_platform_event(self, self.check_id, to_native_string(raw_event), "dbm-activity")

    def should_send_metric(self, metric_name):
        return not self._metric_excluded(metric_name) and self._metric_included(metric_name)

    def _metric_included(self, metric_name):
        if self.include_metrics_pattern is None:
            return True

        return self.include_metrics_pattern.search(metric_name) is not None

    def _metric_excluded(self, metric_name):
        if self.exclude_metrics_pattern is None:
            return False

        return self.exclude_metrics_pattern.search(metric_name) is not None

    def _submit_metric(
        self, mtype, name, value, tags=None, hostname=None, device_name=None, raw=False, flush_first_value=False
    ):
        # type: (int, str, float, Sequence[str], str, str, bool, bool) -> None
        if value is None:
            # ignore metric sample
            return

        name = self._format_namespace(name, raw)
        if not self.should_send_metric(name):
            return

        tags = self._normalize_tags_type(tags or [], device_name, name)
        if hostname is None:
            hostname = ''

        if self.metric_limiter:
            if mtype in ONE_PER_CONTEXT_METRIC_TYPES:
                # Fast path for gauges, rates, monotonic counters, assume one set of tags per call
                if self.metric_limiter.is_reached():
                    return
            else:
                # Other metric types have a legit use case for several calls per set of tags, track unique sets of tags
                context = self._context_uid(mtype, name, tags, hostname)
                if self.metric_limiter.is_reached(context):
                    return

        try:
            value = float(value)
        except ValueError:
            err_msg = 'Metric: {} has non float value: {}. Only float values can be submitted as metrics.'.format(
                repr(name), repr(value)
            )
            if using_stub_aggregator:
                raise ValueError(err_msg)
            self.warning(err_msg)
            return

        aggregator.submit_metric(self, self.check_id, mtype, name, value, tags, hostname, flush_first_value)

    def gauge(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
        # type: (str, float, Sequence[str], str, str, bool) -> None
        """Sample a gauge metric.

        **Parameters:**

        - **name** (_str_) - the name of the metric
        - **value** (_float_) - the value for the metric
        - **tags** (_List[str]_) - a list of tags to associate with this metric
        - **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
        - **device_name** (_str_) - **deprecated** add a tag in the form `device:<device_name>` to the `tags`
            list instead.
        - **raw** (_bool_) - whether to ignore any defined namespace prefix
        """
        self._submit_metric(
            aggregator.GAUGE, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
        )

    def count(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
        # type: (str, float, Sequence[str], str, str, bool) -> None
        """Sample a raw count metric.

        - **name** (_str_) - the name of the metric
        - **value** (_float_) - the value for the metric
        - **tags** (_List[str]_) - a list of tags to associate with this metric
        - **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
        - **device_name** (_str_) - **deprecated** add a tag in the form `device:<device_name>` to the `tags`
            list instead.
        - **raw** (_bool_) - whether to ignore any defined namespace prefix
        """
        self._submit_metric(
            aggregator.COUNT, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
        )

    def monotonic_count(
        self, name, value, tags=None, hostname=None, device_name=None, raw=False, flush_first_value=False
    ):
        # type: (str, float, Sequence[str], str, str, bool, bool) -> None
        """Sample an increasing counter metric.

        - **name** (_str_) - the name of the metric
        - **value** (_float_) - the value for the metric
        - **tags** (_List[str]_) - a list of tags to associate with this metric
        - **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
        - **device_name** (_str_) - **deprecated** add a tag in the form `device:<device_name>` to the `tags`
            list instead.
        - **raw** (_bool_) - whether to ignore any defined namespace prefix
        - **flush_first_value** (_bool_) - whether to sample the first value
        """
        self._submit_metric(
            aggregator.MONOTONIC_COUNT,
            name,
            value,
            tags=tags,
            hostname=hostname,
            device_name=device_name,
            raw=raw,
            flush_first_value=flush_first_value,
        )

    def rate(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
        # type: (str, float, Sequence[str], str, str, bool) -> None
        """Sample a point, with the rate calculated at the end of the check.

        - **name** (_str_) - the name of the metric
        - **value** (_float_) - the value for the metric
        - **tags** (_List[str]_) - a list of tags to associate with this metric
        - **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
        - **device_name** (_str_) - **deprecated** add a tag in the form `device:<device_name>` to the `tags`
            list instead.
        - **raw** (_bool_) - whether to ignore any defined namespace prefix
        """
        self._submit_metric(
            aggregator.RATE, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
        )

    def histogram(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
        # type: (str, float, Sequence[str], str, str, bool) -> None
        """Sample a histogram metric.

        - **name** (_str_) - the name of the metric
        - **value** (_float_) - the value for the metric
        - **tags** (_List[str]_) - a list of tags to associate with this metric
        - **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
        - **device_name** (_str_) - **deprecated** add a tag in the form `device:<device_name>` to the `tags`
            list instead.
        - **raw** (_bool_) - whether to ignore any defined namespace prefix
        """
        self._submit_metric(
            aggregator.HISTOGRAM, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
        )

    def historate(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
        # type: (str, float, Sequence[str], str, str, bool) -> None
        """Sample a histogram based on rate metrics.

        - **name** (_str_) - the name of the metric
        - **value** (_float_) - the value for the metric
        - **tags** (_List[str]_) - a list of tags to associate with this metric
        - **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
        - **device_name** (_str_) - **deprecated** add a tag in the form `device:<device_name>` to the `tags`
            list instead.
        - **raw** (_bool_) - whether to ignore any defined namespace prefix
        """
        self._submit_metric(
            aggregator.HISTORATE, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
        )

    def increment(self, name, value=1, tags=None, hostname=None, device_name=None, raw=False):
        # type: (str, float, Sequence[str], str, str, bool) -> None
        """Increment a counter metric.

        - **name** (_str_) - the name of the metric
        - **value** (_float_) - the value for the metric
        - **tags** (_List[str]_) - a list of tags to associate with this metric
        - **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
        - **device_name** (_str_) - **deprecated** add a tag in the form `device:<device_name>` to the `tags`
            list instead.
        - **raw** (_bool_) - whether to ignore any defined namespace prefix
        """
        self._log_deprecation('increment')
        self._submit_metric(
            aggregator.COUNTER, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
        )

    def decrement(self, name, value=-1, tags=None, hostname=None, device_name=None, raw=False):
        # type: (str, float, Sequence[str], str, str, bool) -> None
        """Decrement a counter metric.

        - **name** (_str_) - the name of the metric
        - **value** (_float_) - the value for the metric
        - **tags** (_List[str]_) - a list of tags to associate with this metric
        - **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
        - **device_name** (_str_) - **deprecated** add a tag in the form `device:<device_name>` to the `tags`
            list instead.
        - **raw** (_bool_) - whether to ignore any defined namespace prefix
        """
        self._log_deprecation('increment')
        self._submit_metric(
            aggregator.COUNTER, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
        )

    def service_check(self, name, status, tags=None, hostname=None, message=None, raw=False):
        # type: (str, ServiceCheckStatus, Sequence[str], str, str, bool) -> None
        """Send the status of a service.

        - **name** (_str_) - the name of the service check
        - **status** (_int_) - a constant describing the service status.
        - **tags** (_List[str]_) - a list of tags to associate with this service check
        - **message** (_str_) - additional information or a description of why this status occurred.
        - **raw** (_bool_) - whether to ignore any defined namespace prefix
        """
        tags = self._normalize_tags_type(tags or [])
        if hostname is None:
            hostname = ''
        if message is None:
            message = ''
        else:
            message = to_native_string(message)

        message = self.sanitize(message)

        aggregator.submit_service_check(
            self, self.check_id, self._format_namespace(name, raw), status, tags, hostname, message
        )

    def _log_deprecation(self, deprecation_key, *args):
        # type: (str, *str) -> None
        """
        Logs a deprecation notice at most once per AgentCheck instance, for the pre-defined `deprecation_key`
        """
        sent, message = self._deprecations[deprecation_key]
        if sent:
            return

        self.warning(message, *args)
        self._deprecations[deprecation_key] = (True, message)

    # TODO: Remove once our checks stop calling it
    def service_metadata(self, meta_name, value):
        # type: (str, Any) -> None
        pass

    def set_metadata(self, name, value, **options):
        # type: (str, Any, **Any) -> None
        """Updates the cached metadata ``name`` with ``value``, which is then sent by the Agent at regular intervals.

        :param str name: the name of the metadata
        :param object value: the value for the metadata. if ``name`` has no transformer defined then the
                             raw ``value`` will be submitted and therefore it must be a ``str``
        :param options: keyword arguments to pass to any defined transformer
        """
        self.metadata_manager.submit(name, value, options)

    def send_config_metadata(self):
        # type: () -> None
        self.set_metadata('config', self.instance, section='instance', whitelist=self.METADATA_DEFAULT_CONFIG_INSTANCE)
        self.set_metadata(
            'config', self.init_config, section='init_config', whitelist=self.METADATA_DEFAULT_CONFIG_INIT_CONFIG
        )

    @staticmethod
    def is_metadata_collection_enabled():
        # type: () -> bool
        return is_affirmative(datadog_agent.get_config('enable_metadata_collection'))

    @classmethod
    def metadata_entrypoint(cls, method):
        # type: (Callable[..., None]) -> Callable[..., None]
        """
        Skip execution of the decorated method if metadata collection is disabled on the Agent.

        Usage:

        ```python
        class MyCheck(AgentCheck):
            @AgentCheck.metadata_entrypoint
            def collect_metadata(self):
                ...
        ```
        """

        @functools.wraps(method)
        def entrypoint(self, *args, **kwargs):
            # type: (AgentCheck, *Any, **Any) -> None
            if not self.is_metadata_collection_enabled():
                return

            # NOTE: error handling still at the discretion of the wrapped method.
            method(self, *args, **kwargs)

        return entrypoint

    def _persistent_cache_id(self, key):
        # type: (str) -> str
        return '{}_{}'.format(self.check_id, key)

    def read_persistent_cache(self, key):
        # type: (str) -> str
        """Returns the value previously stored with `write_persistent_cache` for the same `key`.

        - **key** (_str_) - The key to retrieve
        """
        return datadog_agent.read_persistent_cache(self._persistent_cache_id(key))

    def write_persistent_cache(self, key, value):
        # type: (str, str) -> None
        """Stores `value` in a persistent cache for this check instance.
        The cache is located in a path where the agent is guaranteed to have read & write permissions. Namely in
            - `%ProgramData%\\Datadog\\run` on Windows.
            - `/opt/datadog-agent/run` everywhere else.
        The cache is persistent between agent restarts but will be rebuilt if the check instance configuration changes.

        - **key** (_str_) - Identifier used to build the filename
        - **value** (_str_) - Value to store
        """
        datadog_agent.write_persistent_cache(self._persistent_cache_id(key), value)

    def set_external_tags(self, external_tags):
        # type: (Sequence[ExternalTagType]) -> None
        # Example of external_tags format
        # [
        #     ('hostname', {'src_name': ['test:t1']}),
        #     ('hostname2', {'src2_name': ['test2:t3']})
        # ]
        try:
            new_tags = []
            for hostname, source_map in external_tags:
                new_tags.append((to_native_string(hostname), source_map))
                for src_name, tags in iteritems(source_map):
                    source_map[src_name] = self._normalize_tags_type(tags)
            datadog_agent.set_external_tags(new_tags)
        except IndexError:
            self.log.exception('Unexpected external tags format: %s', external_tags)
            raise

    def convert_to_underscore_separated(self, name):
        # type: (Union[str, bytes]) -> bytes
        """
        Convert from CamelCase to camel_case
        And substitute illegal metric characters
        """
        name = ensure_bytes(name)
        metric_name = self.FIRST_CAP_RE.sub(br'\1_\2', name)
        metric_name = self.ALL_CAP_RE.sub(br'\1_\2', metric_name).lower()
        metric_name = self.METRIC_REPLACEMENT.sub(br'_', metric_name)
        return self.DOT_UNDERSCORE_CLEANUP.sub(br'.', metric_name).strip(b'_')

    def warning(self, warning_message, *args, **kwargs):
        # type: (str, *Any, **Any) -> None
        """Log a warning message and display it in the Agent's status page.

        Using *args is intended to make warning work like log.warn/debug/info/etc
        and make it compliant with flake8 logging format linter.

        :param str warning_message: the warning message.
        :param list args: format string args used to format warning_message e.g. `warning_message % args`
        :param dict kwargs: not used for now, but added to match Python logger's `warning` method signature
        """
        warning_message = to_native_string(warning_message)
        # Interpolate message only if args is not empty. Same behavior as python logger:
        # https://github.com/python/cpython/blob/1dbe5373851acb85ba91f0be7b83c69563acd68d/Lib/logging/__init__.py#L368-L369
        if args:
            warning_message = warning_message % args
        frame = inspect.currentframe().f_back  # type: ignore
        lineno = frame.f_lineno
        # only log the last part of the filename, not the full path
        filename = basename(frame.f_code.co_filename)

        self.log.warning(warning_message, extra={'_lineno': lineno, '_filename': filename, '_check_id': self.check_id})
        self.warnings.append(warning_message)

    def get_warnings(self):
        # type: () -> List[str]
        """
        Return the list of warnings messages to be displayed in the info page
        """
        warnings = self.warnings
        self.warnings = []
        return warnings

    def _get_requests_proxy(self):
        # type: () -> ProxySettings
        # TODO: Remove with Agent 5
        no_proxy_settings = {'http': None, 'https': None, 'no': []}  # type: ProxySettings

        # First we read the proxy configuration from datadog.conf
        proxies = self.agentConfig.get('proxy', datadog_agent.get_config('proxy'))
        if proxies:
            proxies = proxies.copy()

        # requests compliant dict
        if proxies and 'no_proxy' in proxies:
            proxies['no'] = proxies.pop('no_proxy')

        return proxies if proxies else no_proxy_settings

    def _format_namespace(self, s, raw=False):
        # type: (str, bool) -> str
        if not raw and self.__NAMESPACE__:
            return '{}.{}'.format(self.__NAMESPACE__, to_native_string(s))

        return to_native_string(s)

    def normalize(self, metric, prefix=None, fix_case=False):
        # type: (Union[str, bytes], Union[str, bytes], bool) -> str
        """
        Turn a metric into a well-formed metric name
        prefix.b.c
        :param metric The metric name to normalize
        :param prefix A prefix to to add to the normalized name, default None
        :param fix_case A boolean, indicating whether to make sure that the metric name returned is in "snake_case"
        """
        if isinstance(metric, text_type):
            metric = unicodedata.normalize('NFKD', metric).encode('ascii', 'ignore')

        if fix_case:
            name = self.convert_to_underscore_separated(metric)
            if prefix is not None:
                prefix = self.convert_to_underscore_separated(prefix)
        else:
            name = self.METRIC_REPLACEMENT.sub(br'_', metric)
            name = self.DOT_UNDERSCORE_CLEANUP.sub(br'.', name).strip(b'_')

        name = self.MULTIPLE_UNDERSCORE_CLEANUP.sub(br'_', name)

        if prefix is not None:
            name = ensure_bytes(prefix) + b"." + name

        return to_native_string(name)

    def normalize_tag(self, tag):
        # type: (Union[str, bytes]) -> str
        """Normalize tag values.

        This happens for legacy reasons, when we cleaned up some characters (like '-')
        which are allowed in tags.
        """
        if isinstance(tag, text_type):
            tag = tag.encode('utf-8', 'ignore')
        tag = self.TAG_REPLACEMENT.sub(br'_', tag)
        tag = self.MULTIPLE_UNDERSCORE_CLEANUP.sub(br'_', tag)
        tag = self.DOT_UNDERSCORE_CLEANUP.sub(br'.', tag).strip(b'_')
        return to_native_string(tag)

    def check(self, instance):
        # type: (InstanceType) -> None
        raise NotImplementedError

    def cancel(self):
        # type: () -> None
        """
        This method is called when the check in unscheduled by the agent. This
        is SIGNAL that the check is being unscheduled and can be called while
        the check is running. It's up to the python implementation to make sure
        cancel is thread safe and won't block.
        """
        pass

    def run(self):
        # type: () -> str
        try:
            while self.check_initializations:
                initialization = self.check_initializations.popleft()
                try:
                    initialization()
                except Exception:
                    self.check_initializations.appendleft(initialization)
                    raise

            instance = copy.deepcopy(self.instances[0])

            if 'set_breakpoint' in self.init_config:
                from ..utils.agent.debug import enter_pdb

                enter_pdb(self.check, line=self.init_config['set_breakpoint'], args=(instance,))
            elif 'profile_memory' in self.init_config or (
                datadog_agent.tracemalloc_enabled() and should_profile_memory(datadog_agent, self.name)
            ):
                from ..utils.agent.memory import profile_memory

                metrics = profile_memory(
                    self.check, self.init_config, namespaces=self.check_id.split(':', 1), args=(instance,)
                )

                tags = self.get_debug_metric_tags()
                tags.extend(instance.get('__memory_profiling_tags', []))
                for m in metrics:
                    self.gauge(m.name, m.value, tags=tags, raw=True)
            else:
                self.check(instance)

            result = ''
        except Exception as e:
            message = self.sanitize(str(e))
            tb = self.sanitize(traceback.format_exc())
            result = json.dumps([{'message': message, 'traceback': tb}])
        finally:
            if self.metric_limiter:
                if is_affirmative(self.debug_metrics.get('metric_contexts', False)):
                    debug_metrics = self.metric_limiter.get_debug_metrics()

                    # Reset so we can actually submit the metrics
                    self.metric_limiter.reset()

                    tags = self.get_debug_metric_tags()
                    for metric_name, value in debug_metrics:
                        self.gauge(metric_name, value, tags=tags, raw=True)

                self.metric_limiter.reset()

        return result

    def event(self, event):
        # type: (Event) -> None
        """Send an event.

        An event is a dictionary with the following keys and data types:

        ```python
        {
            "timestamp": int,        # the epoch timestamp for the event
            "event_type": str,       # the event name
            "api_key": str,          # the api key for your account
            "msg_title": str,        # the title of the event
            "msg_text": str,         # the text body of the event
            "aggregation_key": str,  # a key to use for aggregating events
            "alert_type": str,       # (optional) one of ('error', 'warning', 'success', 'info'), defaults to 'info'
            "source_type_name": str, # (optional) the source type name
            "host": str,             # (optional) the name of the host
            "tags": list,            # (optional) a list of tags to associate with this event
            "priority": str,         # (optional) specifies the priority of the event ("normal" or "low")
        }
        ```

        - **event** (_dict_) - the event to be sent
        """
        # Enforce types of some fields, considerably facilitates handling in go bindings downstream
        for key, value in iteritems(event):
            if not isinstance(value, (text_type, binary_type)):
                continue

            try:
                event[key] = to_native_string(value)  # type: ignore
                # ^ Mypy complains about dynamic key assignment -- arguably for good reason.
                # Ideally we should convert this to a dict literal so that submitted events only include known keys.
            except UnicodeError:
                self.log.warning('Encoding error with field `%s`, cannot submit event', key)
                return

        if event.get('tags'):
            event['tags'] = self._normalize_tags_type(event['tags'])
        if event.get('timestamp'):
            event['timestamp'] = int(event['timestamp'])
        if event.get('aggregation_key'):
            event['aggregation_key'] = to_native_string(event['aggregation_key'])

        if self.__NAMESPACE__:
            event.setdefault('source_type_name', self.__NAMESPACE__)

        aggregator.submit_event(self, self.check_id, event)

    def _normalize_tags_type(self, tags, device_name=None, metric_name=None):
        # type: (Sequence[Union[None, str, bytes]], str, str) -> List[str]
        """
        Normalize tags contents and type:
        - append `device_name` as `device:` tag
        - normalize tags type
        - doesn't mutate the passed list, returns a new list
        """
        normalized_tags = []

        if device_name:
            self._log_deprecation('device_name')
            try:
                normalized_tags.append('device:{}'.format(to_native_string(device_name)))
            except UnicodeError:
                self.log.warning(
                    'Encoding error with device name `%r` for metric `%r`, ignoring tag', device_name, metric_name
                )

        for tag in tags:
            if tag is None:
                continue
            try:
                tag = to_native_string(tag)
            except UnicodeError:
                self.log.warning('Encoding error with tag `%s` for metric `%s`, ignoring tag', tag, metric_name)
                continue
            if self.disable_generic_tags:
                normalized_tags.append(self.degeneralise_tag(tag))
            else:
                normalized_tags.append(tag)
        return normalized_tags

    def degeneralise_tag(self, tag):
        split_tag = tag.split(':', 1)
        if len(split_tag) > 1:
            tag_name, value = split_tag
        else:
            tag_name = tag
            value = None

        if tag_name in GENERIC_TAGS:
            new_name = '{}_{}'.format(self.name, tag_name)
            if value:
                return '{}:{}'.format(new_name, value)
            else:
                return new_name
        else:
            return tag

    def get_debug_metric_tags(self):
        tags = ['check_name:{}'.format(self.name), 'check_version:{}'.format(self.check_version)]
        tags.extend(self.instance.get('tags', []))
        return tags

__init__(self, *args, **kwargs) special

  • name (str) - the name of the check
  • init_config (dict) - the init_config section of the configuration.
  • instance (List[dict]) - a one-element list containing the instance options from the configuration file (a list is used to keep backward compatibility with older versions of the Agent).
Source code in datadog_checks/base/checks/base.py
def __init__(self, *args, **kwargs):
    # type: (*Any, **Any) -> None
    """
    - **name** (_str_) - the name of the check
    - **init_config** (_dict_) - the `init_config` section of the configuration.
    - **instance** (_List[dict]_) - a one-element list containing the instance options from the
            configuration file (a list is used to keep backward compatibility with
            older versions of the Agent).
    """
    # NOTE: these variable assignments exist to ease type checking when eventually assigned as attributes.
    name = kwargs.get('name', '')
    init_config = kwargs.get('init_config', {})
    agentConfig = kwargs.get('agentConfig', {})
    instances = kwargs.get('instances', [])

    if len(args) > 0:
        name = args[0]
    if len(args) > 1:
        init_config = args[1]
    if len(args) > 2:
        # agent pass instances as tuple but in test we are usually using list, so we are testing for both
        if len(args) > 3 or not isinstance(args[2], (list, tuple)) or 'instances' in kwargs:
            # old-style init: the 3rd argument is `agentConfig`
            agentConfig = args[2]
            if len(args) > 3:
                instances = args[3]
        else:
            # new-style init: the 3rd argument is `instances`
            instances = args[2]

    # NOTE: Agent 6+ should pass exactly one instance... But we are not abiding by that rule on our side
    # everywhere just yet. It's complicated... See: https://github.com/DataDog/integrations-core/pull/5573
    instance = instances[0] if instances else None

    self.check_id = ''
    self.name = name  # type: str
    self.init_config = init_config  # type: InitConfigType
    self.agentConfig = agentConfig  # type: AgentConfigType
    self.instance = instance  # type: InstanceType
    self.instances = instances  # type: List[InstanceType]
    self.warnings = []  # type: List[str]
    self.disable_generic_tags = (
        is_affirmative(self.instance.get('disable_generic_tags', False)) if instance else False
    )
    self.debug_metrics = {}
    if self.init_config is not None:
        self.debug_metrics.update(self.init_config.get('debug_metrics', {}))
    if self.instance is not None:
        self.debug_metrics.update(self.instance.get('debug_metrics', {}))

    # `self.hostname` is deprecated, use `datadog_agent.get_hostname()` instead
    self.hostname = datadog_agent.get_hostname()  # type: str

    logger = logging.getLogger('{}.{}'.format(__name__, self.name))
    self.log = CheckLoggingAdapter(logger, self)

    metric_patterns = self.instance.get('metric_patterns', {}) if instance else {}
    if not isinstance(metric_patterns, dict):
        raise ConfigurationError('Setting `metric_patterns` must be a mapping')

    self.exclude_metrics_pattern = self._create_metrics_pattern(metric_patterns, 'exclude')
    self.include_metrics_pattern = self._create_metrics_pattern(metric_patterns, 'include')

    # TODO: Remove with Agent 5
    # Set proxy settings
    self.proxies = self._get_requests_proxy()
    if not self.init_config:
        self._use_agent_proxy = True
    else:
        self._use_agent_proxy = is_affirmative(self.init_config.get('use_agent_proxy', True))

    # TODO: Remove with Agent 5
    self.default_integration_http_timeout = float(self.agentConfig.get('default_integration_http_timeout', 9))

    self._deprecations = {
        'increment': (
            False,
            (
                'DEPRECATION NOTICE: `AgentCheck.increment`/`AgentCheck.decrement` are deprecated, please '
                'use `AgentCheck.gauge` or `AgentCheck.count` instead, with a different metric name'
            ),
        ),
        'device_name': (
            False,
            (
                'DEPRECATION NOTICE: `device_name` is deprecated, please use a `device:` '
                'tag in the `tags` list instead'
            ),
        ),
        'in_developer_mode': (
            False,
            'DEPRECATION NOTICE: `in_developer_mode` is deprecated, please stop using it.',
        ),
        'no_proxy': (
            False,
            (
                'DEPRECATION NOTICE: The `no_proxy` config option has been renamed '
                'to `skip_proxy` and will be removed in a future release.'
            ),
        ),
        'service_tag': (
            False,
            (
                'DEPRECATION NOTICE: The `service` tag is deprecated and has been renamed to `%s`. '
                'Set `disable_legacy_service_tag` to `true` to disable this warning. '
                'The default will become `true` and cannot be changed in Agent version 8.'
            ),
        ),
        '_config_renamed': (
            False,
            (
                'DEPRECATION NOTICE: The `%s` config option has been renamed '
                'to `%s` and will be removed in a future release.'
            ),
        ),
    }  # type: Dict[str, Tuple[bool, str]]

    # Setup metric limits
    self.metric_limiter = self._get_metric_limiter(self.name, instance=self.instance)

    # Lazily load and validate config
    self._config_model_instance = None  # type: Any
    self._config_model_shared = None  # type: Any

    # Functions that will be called exactly once (if successful) before the first check run
    self.check_initializations = deque([self.send_config_metadata])  # type: Deque[Callable[[], None]]

    if not PY2:
        self.check_initializations.append(self.load_configuration_models)

count(self, name, value, tags=None, hostname=None, device_name=None, raw=False)

Sample a raw count metric.

  • name (str) - the name of the metric
  • value (float) - the value for the metric
  • tags (List[str]) - a list of tags to associate with this metric
  • hostname (str) - a hostname to associate with this metric. Defaults to the current host.
  • device_name (str) - deprecated add a tag in the form device:<device_name> to the tags list instead.
  • raw (bool) - whether to ignore any defined namespace prefix
Source code in datadog_checks/base/checks/base.py
def count(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
    # type: (str, float, Sequence[str], str, str, bool) -> None
    """Sample a raw count metric.

    - **name** (_str_) - the name of the metric
    - **value** (_float_) - the value for the metric
    - **tags** (_List[str]_) - a list of tags to associate with this metric
    - **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
    - **device_name** (_str_) - **deprecated** add a tag in the form `device:<device_name>` to the `tags`
        list instead.
    - **raw** (_bool_) - whether to ignore any defined namespace prefix
    """
    self._submit_metric(
        aggregator.COUNT, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
    )

event(self, event)

Send an event.

An event is a dictionary with the following keys and data types:

{
    "timestamp": int,        # the epoch timestamp for the event
    "event_type": str,       # the event name
    "api_key": str,          # the api key for your account
    "msg_title": str,        # the title of the event
    "msg_text": str,         # the text body of the event
    "aggregation_key": str,  # a key to use for aggregating events
    "alert_type": str,       # (optional) one of ('error', 'warning', 'success', 'info'), defaults to 'info'
    "source_type_name": str, # (optional) the source type name
    "host": str,             # (optional) the name of the host
    "tags": list,            # (optional) a list of tags to associate with this event
    "priority": str,         # (optional) specifies the priority of the event ("normal" or "low")
}
  • event (dict) - the event to be sent
Source code in datadog_checks/base/checks/base.py
def event(self, event):
    # type: (Event) -> None
    """Send an event.

    An event is a dictionary with the following keys and data types:

    ```python
    {
        "timestamp": int,        # the epoch timestamp for the event
        "event_type": str,       # the event name
        "api_key": str,          # the api key for your account
        "msg_title": str,        # the title of the event
        "msg_text": str,         # the text body of the event
        "aggregation_key": str,  # a key to use for aggregating events
        "alert_type": str,       # (optional) one of ('error', 'warning', 'success', 'info'), defaults to 'info'
        "source_type_name": str, # (optional) the source type name
        "host": str,             # (optional) the name of the host
        "tags": list,            # (optional) a list of tags to associate with this event
        "priority": str,         # (optional) specifies the priority of the event ("normal" or "low")
    }
    ```

    - **event** (_dict_) - the event to be sent
    """
    # Enforce types of some fields, considerably facilitates handling in go bindings downstream
    for key, value in iteritems(event):
        if not isinstance(value, (text_type, binary_type)):
            continue

        try:
            event[key] = to_native_string(value)  # type: ignore
            # ^ Mypy complains about dynamic key assignment -- arguably for good reason.
            # Ideally we should convert this to a dict literal so that submitted events only include known keys.
        except UnicodeError:
            self.log.warning('Encoding error with field `%s`, cannot submit event', key)
            return

    if event.get('tags'):
        event['tags'] = self._normalize_tags_type(event['tags'])
    if event.get('timestamp'):
        event['timestamp'] = int(event['timestamp'])
    if event.get('aggregation_key'):
        event['aggregation_key'] = to_native_string(event['aggregation_key'])

    if self.__NAMESPACE__:
        event.setdefault('source_type_name', self.__NAMESPACE__)

    aggregator.submit_event(self, self.check_id, event)

gauge(self, name, value, tags=None, hostname=None, device_name=None, raw=False)

Sample a gauge metric.

Parameters:

  • name (str) - the name of the metric
  • value (float) - the value for the metric
  • tags (List[str]) - a list of tags to associate with this metric
  • hostname (str) - a hostname to associate with this metric. Defaults to the current host.
  • device_name (str) - deprecated add a tag in the form device:<device_name> to the tags list instead.
  • raw (bool) - whether to ignore any defined namespace prefix
Source code in datadog_checks/base/checks/base.py
def gauge(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
    # type: (str, float, Sequence[str], str, str, bool) -> None
    """Sample a gauge metric.

    **Parameters:**

    - **name** (_str_) - the name of the metric
    - **value** (_float_) - the value for the metric
    - **tags** (_List[str]_) - a list of tags to associate with this metric
    - **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
    - **device_name** (_str_) - **deprecated** add a tag in the form `device:<device_name>` to the `tags`
        list instead.
    - **raw** (_bool_) - whether to ignore any defined namespace prefix
    """
    self._submit_metric(
        aggregator.GAUGE, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
    )

histogram(self, name, value, tags=None, hostname=None, device_name=None, raw=False)

Sample a histogram metric.

  • name (str) - the name of the metric
  • value (float) - the value for the metric
  • tags (List[str]) - a list of tags to associate with this metric
  • hostname (str) - a hostname to associate with this metric. Defaults to the current host.
  • device_name (str) - deprecated add a tag in the form device:<device_name> to the tags list instead.
  • raw (bool) - whether to ignore any defined namespace prefix
Source code in datadog_checks/base/checks/base.py
def histogram(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
    # type: (str, float, Sequence[str], str, str, bool) -> None
    """Sample a histogram metric.

    - **name** (_str_) - the name of the metric
    - **value** (_float_) - the value for the metric
    - **tags** (_List[str]_) - a list of tags to associate with this metric
    - **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
    - **device_name** (_str_) - **deprecated** add a tag in the form `device:<device_name>` to the `tags`
        list instead.
    - **raw** (_bool_) - whether to ignore any defined namespace prefix
    """
    self._submit_metric(
        aggregator.HISTOGRAM, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
    )

historate(self, name, value, tags=None, hostname=None, device_name=None, raw=False)

Sample a histogram based on rate metrics.

  • name (str) - the name of the metric
  • value (float) - the value for the metric
  • tags (List[str]) - a list of tags to associate with this metric
  • hostname (str) - a hostname to associate with this metric. Defaults to the current host.
  • device_name (str) - deprecated add a tag in the form device:<device_name> to the tags list instead.
  • raw (bool) - whether to ignore any defined namespace prefix
Source code in datadog_checks/base/checks/base.py
def historate(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
    # type: (str, float, Sequence[str], str, str, bool) -> None
    """Sample a histogram based on rate metrics.

    - **name** (_str_) - the name of the metric
    - **value** (_float_) - the value for the metric
    - **tags** (_List[str]_) - a list of tags to associate with this metric
    - **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
    - **device_name** (_str_) - **deprecated** add a tag in the form `device:<device_name>` to the `tags`
        list instead.
    - **raw** (_bool_) - whether to ignore any defined namespace prefix
    """
    self._submit_metric(
        aggregator.HISTORATE, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
    )

metadata_entrypoint(method) classmethod

Skip execution of the decorated method if metadata collection is disabled on the Agent.

Usage:

class MyCheck(AgentCheck):
    @AgentCheck.metadata_entrypoint
    def collect_metadata(self):
        ...
Source code in datadog_checks/base/checks/base.py
@classmethod
def metadata_entrypoint(cls, method):
    # type: (Callable[..., None]) -> Callable[..., None]
    """
    Skip execution of the decorated method if metadata collection is disabled on the Agent.

    Usage:

    ```python
    class MyCheck(AgentCheck):
        @AgentCheck.metadata_entrypoint
        def collect_metadata(self):
            ...
    ```
    """

    @functools.wraps(method)
    def entrypoint(self, *args, **kwargs):
        # type: (AgentCheck, *Any, **Any) -> None
        if not self.is_metadata_collection_enabled():
            return

        # NOTE: error handling still at the discretion of the wrapped method.
        method(self, *args, **kwargs)

    return entrypoint

monotonic_count(self, name, value, tags=None, hostname=None, device_name=None, raw=False, flush_first_value=False)

Sample an increasing counter metric.

  • name (str) - the name of the metric
  • value (float) - the value for the metric
  • tags (List[str]) - a list of tags to associate with this metric
  • hostname (str) - a hostname to associate with this metric. Defaults to the current host.
  • device_name (str) - deprecated add a tag in the form device:<device_name> to the tags list instead.
  • raw (bool) - whether to ignore any defined namespace prefix
  • flush_first_value (bool) - whether to sample the first value
Source code in datadog_checks/base/checks/base.py
def monotonic_count(
    self, name, value, tags=None, hostname=None, device_name=None, raw=False, flush_first_value=False
):
    # type: (str, float, Sequence[str], str, str, bool, bool) -> None
    """Sample an increasing counter metric.

    - **name** (_str_) - the name of the metric
    - **value** (_float_) - the value for the metric
    - **tags** (_List[str]_) - a list of tags to associate with this metric
    - **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
    - **device_name** (_str_) - **deprecated** add a tag in the form `device:<device_name>` to the `tags`
        list instead.
    - **raw** (_bool_) - whether to ignore any defined namespace prefix
    - **flush_first_value** (_bool_) - whether to sample the first value
    """
    self._submit_metric(
        aggregator.MONOTONIC_COUNT,
        name,
        value,
        tags=tags,
        hostname=hostname,
        device_name=device_name,
        raw=raw,
        flush_first_value=flush_first_value,
    )

rate(self, name, value, tags=None, hostname=None, device_name=None, raw=False)

Sample a point, with the rate calculated at the end of the check.

  • name (str) - the name of the metric
  • value (float) - the value for the metric
  • tags (List[str]) - a list of tags to associate with this metric
  • hostname (str) - a hostname to associate with this metric. Defaults to the current host.
  • device_name (str) - deprecated add a tag in the form device:<device_name> to the tags list instead.
  • raw (bool) - whether to ignore any defined namespace prefix
Source code in datadog_checks/base/checks/base.py
def rate(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
    # type: (str, float, Sequence[str], str, str, bool) -> None
    """Sample a point, with the rate calculated at the end of the check.

    - **name** (_str_) - the name of the metric
    - **value** (_float_) - the value for the metric
    - **tags** (_List[str]_) - a list of tags to associate with this metric
    - **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
    - **device_name** (_str_) - **deprecated** add a tag in the form `device:<device_name>` to the `tags`
        list instead.
    - **raw** (_bool_) - whether to ignore any defined namespace prefix
    """
    self._submit_metric(
        aggregator.RATE, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
    )

service_check(self, name, status, tags=None, hostname=None, message=None, raw=False)

Send the status of a service.

  • name (str) - the name of the service check
  • status (int) - a constant describing the service status.
  • tags (List[str]) - a list of tags to associate with this service check
  • message (str) - additional information or a description of why this status occurred.
  • raw (bool) - whether to ignore any defined namespace prefix
Source code in datadog_checks/base/checks/base.py
def service_check(self, name, status, tags=None, hostname=None, message=None, raw=False):
    # type: (str, ServiceCheckStatus, Sequence[str], str, str, bool) -> None
    """Send the status of a service.

    - **name** (_str_) - the name of the service check
    - **status** (_int_) - a constant describing the service status.
    - **tags** (_List[str]_) - a list of tags to associate with this service check
    - **message** (_str_) - additional information or a description of why this status occurred.
    - **raw** (_bool_) - whether to ignore any defined namespace prefix
    """
    tags = self._normalize_tags_type(tags or [])
    if hostname is None:
        hostname = ''
    if message is None:
        message = ''
    else:
        message = to_native_string(message)

    message = self.sanitize(message)

    aggregator.submit_service_check(
        self, self.check_id, self._format_namespace(name, raw), status, tags, hostname, message
    )

set_metadata(self, name, value, **options)

Updates the cached metadata name with value, which is then sent by the Agent at regular intervals.

:param str name: the name of the metadata :param object value: the value for the metadata. if name has no transformer defined then the raw value will be submitted and therefore it must be a str :param options: keyword arguments to pass to any defined transformer

Source code in datadog_checks/base/checks/base.py
def set_metadata(self, name, value, **options):
    # type: (str, Any, **Any) -> None
    """Updates the cached metadata ``name`` with ``value``, which is then sent by the Agent at regular intervals.

    :param str name: the name of the metadata
    :param object value: the value for the metadata. if ``name`` has no transformer defined then the
                         raw ``value`` will be submitted and therefore it must be a ``str``
    :param options: keyword arguments to pass to any defined transformer
    """
    self.metadata_manager.submit(name, value, options)

Stubs

datadog_checks.base.stubs.aggregator.AggregatorStub

This implements the methods defined by the Agent's C bindings which in turn call the Go backend.

It also provides utility methods for test assertions.

Source code in datadog_checks/base/stubs/aggregator.py
class AggregatorStub(object):
    """
    This implements the methods defined by the Agent's
    [C bindings](https://github.com/DataDog/datadog-agent/blob/master/rtloader/common/builtins/aggregator.c)
    which in turn call the
    [Go backend](https://github.com/DataDog/datadog-agent/blob/master/pkg/collector/python/aggregator.go).

    It also provides utility methods for test assertions.
    """

    # Replicate the Enum we have on the Agent
    METRIC_ENUM_MAP = OrderedDict(
        (
            ('gauge', 0),
            ('rate', 1),
            ('count', 2),
            ('monotonic_count', 3),
            ('counter', 4),
            ('histogram', 5),
            ('historate', 6),
        )
    )
    METRIC_ENUM_MAP_REV = {v: k for k, v in iteritems(METRIC_ENUM_MAP)}
    GAUGE, RATE, COUNT, MONOTONIC_COUNT, COUNTER, HISTOGRAM, HISTORATE = list(METRIC_ENUM_MAP.values())
    AGGREGATE_TYPES = {COUNT, COUNTER}
    IGNORED_METRICS = {'datadog.agent.profile.memory.check_run_alloc'}
    METRIC_TYPE_SUBMISSION_TO_BACKEND_MAP = {
        'gauge': 'gauge',
        'rate': 'gauge',
        'count': 'count',
        'monotonic_count': 'count',
        'counter': 'rate',
        'histogram': 'rate',  # Checking .count only, the other are gauges
        'historate': 'rate',  # Checking .count only, the other are gauges
    }

    def __init__(self):
        self.reset()

    @classmethod
    def is_aggregate(cls, mtype):
        return mtype in cls.AGGREGATE_TYPES

    @classmethod
    def ignore_metric(cls, name):
        return name in cls.IGNORED_METRICS

    def submit_metric(self, check, check_id, mtype, name, value, tags, hostname, flush_first_value):
        check_tag_names(name, tags)
        if not self.ignore_metric(name):
            self._metrics[name].append(MetricStub(name, mtype, value, tags, hostname, None, flush_first_value))

    def submit_metric_e2e(
        self, check, check_id, mtype, name, value, tags, hostname, device=None, flush_first_value=False
    ):
        check_tag_names(name, tags)
        # Device is only present in metrics read from the real agent in e2e tests. Normally it is submitted as a tag
        if not self.ignore_metric(name):
            self._metrics[name].append(MetricStub(name, mtype, value, tags, hostname, device, flush_first_value))

    def submit_service_check(self, check, check_id, name, status, tags, hostname, message):
        if status == ServiceCheck.OK and message:
            raise Exception("Expected empty message on OK service check")

        check_tag_names(name, tags)
        self._service_checks[name].append(ServiceCheckStub(check_id, name, status, tags, hostname, message))

    def submit_event(self, check, check_id, event):
        self._events.append(event)

    def submit_event_platform_event(self, check, check_id, raw_event, event_type):
        self._event_platform_events[event_type].append(raw_event)

    def submit_histogram_bucket(
        self,
        check,
        check_id,
        name,
        value,
        lower_bound,
        upper_bound,
        monotonic,
        hostname,
        tags,
        flush_first_value=False,
    ):
        check_tag_names(name, tags)
        self._histogram_buckets[name].append(
            HistogramBucketStub(name, value, lower_bound, upper_bound, monotonic, hostname, tags, flush_first_value)
        )

    def metrics(self, name):
        """
        Return the metrics received under the given name
        """
        return [
            MetricStub(
                ensure_unicode(stub.name),
                stub.type,
                stub.value,
                normalize_tags(stub.tags),
                ensure_unicode(stub.hostname),
                stub.device,
                stub.flush_first_value,
            )
            for stub in self._metrics.get(to_native_string(name), [])
        ]

    def service_checks(self, name):
        """
        Return the service checks received under the given name
        """
        return [
            ServiceCheckStub(
                ensure_unicode(stub.check_id),
                ensure_unicode(stub.name),
                stub.status,
                normalize_tags(stub.tags),
                ensure_unicode(stub.hostname),
                ensure_unicode(stub.message),
            )
            for stub in self._service_checks.get(to_native_string(name), [])
        ]

    @property
    def events(self):
        """
        Return all events
        """
        return self._events

    def get_event_platform_events(self, event_type, parse_json=True):
        """
        Return all event platform events for the event_type
        """
        return [json.loads(e) if parse_json else e for e in self._event_platform_events[event_type]]

    def histogram_bucket(self, name):
        """
        Return the histogram buckets received under the given name
        """
        return [
            HistogramBucketStub(
                ensure_unicode(stub.name),
                stub.value,
                stub.lower_bound,
                stub.upper_bound,
                stub.monotonic,
                ensure_unicode(stub.hostname),
                normalize_tags(stub.tags),
                stub.flush_first_value,
            )
            for stub in self._histogram_buckets.get(to_native_string(name), [])
        ]

    def assert_metric_has_tag(self, metric_name, tag, count=None, at_least=1):
        """
        Assert a metric is tagged with tag
        """
        self._asserted.add(metric_name)

        candidates = []
        candidates_with_tag = []
        for metric in self.metrics(metric_name):
            candidates.append(metric)
            if tag in metric.tags:
                candidates_with_tag.append(metric)

        if candidates_with_tag:  # The metric was found with the tag but not enough times
            msg = "The metric '{}' with tag '{}' was only found {}/{} times".format(metric_name, tag, count, at_least)
        elif candidates:
            msg = (
                "The metric '{}' was found but not with the tag '{}'.\n"
                + "Similar submitted:\n"
                + "\n".join(["     {}".format(m) for m in candidates])
            )
        else:
            msg = "Metric '{}' not found".format(metric_name)

        if count is not None:
            assert len(candidates_with_tag) == count, msg
        else:
            assert len(candidates_with_tag) >= at_least, msg

    # Potential kwargs: aggregation_key, alert_type, event_type,
    # msg_title, source_type_name
    def assert_event(self, msg_text, count=None, at_least=1, exact_match=True, tags=None, **kwargs):
        candidates = []
        for e in self.events:
            if exact_match and msg_text != e['msg_text'] or msg_text not in e['msg_text']:
                continue
            if tags and set(tags) != set(e['tags']):
                continue
            for name, value in iteritems(kwargs):
                if e[name] != value:
                    break
            else:
                candidates.append(e)

        msg = "Candidates size assertion for `{}`, count: {}, at_least: {}) failed".format(msg_text, count, at_least)
        if count is not None:
            assert len(candidates) == count, msg
        else:
            assert len(candidates) >= at_least, msg

    def assert_histogram_bucket(
        self,
        name,
        value,
        lower_bound,
        upper_bound,
        monotonic,
        hostname,
        tags,
        count=None,
        at_least=1,
        flush_first_value=None,
    ):
        expected_tags = normalize_tags(tags, sort=True)

        candidates = []
        for bucket in self.histogram_bucket(name):
            if value is not None and value != bucket.value:
                continue

            if expected_tags and expected_tags != sorted(bucket.tags):
                continue

            if hostname and hostname != bucket.hostname:
                continue

            if monotonic != bucket.monotonic:
                continue

            if flush_first_value is not None and flush_first_value != bucket.flush_first_value:
                continue

            candidates.append(bucket)

        expected_bucket = HistogramBucketStub(
            name, value, lower_bound, upper_bound, monotonic, hostname, tags, flush_first_value
        )

        if count is not None:
            msg = "Needed exactly {} candidates for '{}', got {}".format(count, name, len(candidates))
            condition = len(candidates) == count
        else:
            msg = "Needed at least {} candidates for '{}', got {}".format(at_least, name, len(candidates))
            condition = len(candidates) >= at_least
        self._assert(
            condition=condition, msg=msg, expected_stub=expected_bucket, submitted_elements=self._histogram_buckets
        )

    def assert_metric(
        self,
        name,
        value=None,
        tags=None,
        count=None,
        at_least=1,
        hostname=None,
        metric_type=None,
        device=None,
        flush_first_value=None,
    ):
        """
        Assert a metric was processed by this stub
        """

        self._asserted.add(name)
        expected_tags = normalize_tags(tags, sort=True)

        candidates = []
        for metric in self.metrics(name):
            if value is not None and not self.is_aggregate(metric.type) and value != metric.value:
                continue

            if expected_tags and expected_tags != sorted(metric.tags):
                continue

            if hostname is not None and hostname != metric.hostname:
                continue

            if metric_type is not None and metric_type != metric.type:
                continue

            if device is not None and device != metric.device:
                continue

            if flush_first_value is not None and flush_first_value != metric.flush_first_value:
                continue

            candidates.append(metric)

        expected_metric = MetricStub(name, metric_type, value, expected_tags, hostname, device, flush_first_value)

        if value is not None and candidates and all(self.is_aggregate(m.type) for m in candidates):
            got = sum(m.value for m in candidates)
            msg = "Expected count value for '{}': {}, got {}".format(name, value, got)
            condition = value == got
        elif count is not None:
            msg = "Needed exactly {} candidates for '{}', got {}".format(count, name, len(candidates))
            condition = len(candidates) == count
        else:
            msg = "Needed at least {} candidates for '{}', got {}".format(at_least, name, len(candidates))
            condition = len(candidates) >= at_least
        self._assert(condition, msg=msg, expected_stub=expected_metric, submitted_elements=self._metrics)

    def assert_service_check(self, name, status=None, tags=None, count=None, at_least=1, hostname=None, message=None):
        """
        Assert a service check was processed by this stub
        """
        tags = normalize_tags(tags, sort=True)
        candidates = []
        for sc in self.service_checks(name):
            if status is not None and status != sc.status:
                continue

            if tags and tags != sorted(sc.tags):
                continue

            if hostname is not None and hostname != sc.hostname:
                continue

            if message is not None and message != sc.message:
                continue

            candidates.append(sc)

        expected_service_check = ServiceCheckStub(
            None, name=name, status=status, tags=tags, hostname=hostname, message=message
        )

        if count is not None:
            msg = "Needed exactly {} candidates for '{}', got {}".format(count, name, len(candidates))
            condition = len(candidates) == count
        else:
            msg = "Needed at least {} candidates for '{}', got {}".format(at_least, name, len(candidates))
            condition = len(candidates) >= at_least
        self._assert(
            condition=condition, msg=msg, expected_stub=expected_service_check, submitted_elements=self._service_checks
        )

    @staticmethod
    def _assert(condition, msg, expected_stub, submitted_elements):
        new_msg = msg
        if not condition:  # It's costly to build the message with similar metrics, so it's built only on failure.
            new_msg = "{}\n{}".format(msg, build_similar_elements_msg(expected_stub, submitted_elements))
        assert condition, new_msg

    def assert_all_metrics_covered(self):
        # use `condition` to avoid building the `msg` if not needed
        condition = self.metrics_asserted_pct >= 100.0
        msg = ''
        if not condition:
            prefix = '\n\t- '
            msg = 'Some metrics are collected but not asserted:'
            msg += '\nAsserted Metrics:{}{}'.format(prefix, prefix.join(sorted(self._asserted)))
            msg += '\nFound metrics that are not asserted:{}{}'.format(prefix, prefix.join(sorted(self.not_asserted())))
        assert condition, msg

    def assert_metrics_using_metadata(
        self, metadata_metrics, check_metric_type=True, check_submission_type=False, exclude=None
    ):
        """
        Assert metrics using metadata.csv

        Checking type: By default we are asserting the in-app metric type (`check_submission_type=False`),
        asserting this type make sense for e2e (metrics collected from agent).
        For integrations tests, we can check the submission type with `check_submission_type=True`, or
        use `check_metric_type=False` not to check types.

        Usage:

            from datadog_checks.dev.utils import get_metadata_metrics
            aggregator.assert_metrics_using_metadata(get_metadata_metrics())

        """

        exclude = exclude or []
        errors = set()
        for metric_name, metric_stubs in iteritems(self._metrics):
            if metric_name in exclude:
                continue
            for metric_stub in metric_stubs:
                metric_stub_name = backend_normalize_metric_name(metric_stub.name)
                actual_metric_type = AggregatorStub.METRIC_ENUM_MAP_REV[metric_stub.type]

                # We only check `*.count` metrics for histogram and historate submissions
                # Note: all Openmetrics histogram and summary metrics are actually separately submitted
                if check_submission_type and actual_metric_type in ['histogram', 'historate']:
                    metric_stub_name += '.count'

                # Checking the metric is in `metadata.csv`
                if metric_stub_name not in metadata_metrics:
                    errors.add("Expect `{}` to be in metadata.csv.".format(metric_stub_name))
                    continue

                expected_metric_type = metadata_metrics[metric_stub_name]['metric_type']
                if check_submission_type:
                    # Integration tests type mapping
                    actual_metric_type = AggregatorStub.METRIC_TYPE_SUBMISSION_TO_BACKEND_MAP[actual_metric_type]
                else:
                    # E2E tests
                    if actual_metric_type == 'monotonic_count' and expected_metric_type == 'count':
                        actual_metric_type = 'count'

                if check_metric_type:
                    if expected_metric_type != actual_metric_type:
                        errors.add(
                            "Expect `{}` to have type `{}` but got `{}`.".format(
                                metric_stub_name, expected_metric_type, actual_metric_type
                            )
                        )

        assert not errors, "Metadata assertion errors using metadata.csv:" + "\n\t- ".join([''] + sorted(errors))

    def assert_no_duplicate_all(self):
        """
        Assert no duplicate metrics and service checks have been submitted.
        """
        self.assert_no_duplicate_metrics()
        self.assert_no_duplicate_service_checks()

    def assert_no_duplicate_metrics(self):
        """
        Assert no duplicate metrics have been submitted.

        Metrics are considered duplicate when all following fields match:

        - metric name
        - type (gauge, rate, etc)
        - tags
        - hostname
        """
        # metric types that intended to be called multiple times are ignored
        ignored_types = [self.COUNT, self.COUNTER]
        metric_stubs = [m for metrics in self._metrics.values() for m in metrics if m.type not in ignored_types]

        def stub_to_key_fn(stub):
            return stub.name, stub.type, str(sorted(stub.tags)), stub.hostname

        self._assert_no_duplicate_stub('metric', metric_stubs, stub_to_key_fn)

    def assert_no_duplicate_service_checks(self):
        """
        Assert no duplicate service checks have been submitted.

        Service checks are considered duplicate when all following fields match:
            - metric name
            - status
            - tags
            - hostname
        """
        service_check_stubs = [m for metrics in self._service_checks.values() for m in metrics]

        def stub_to_key_fn(stub):
            return stub.name, stub.status, str(sorted(stub.tags)), stub.hostname

        self._assert_no_duplicate_stub('service_check', service_check_stubs, stub_to_key_fn)

    @staticmethod
    def _assert_no_duplicate_stub(stub_type, all_metrics, stub_to_key_fn):
        all_contexts = defaultdict(list)
        for metric in all_metrics:
            context = stub_to_key_fn(metric)
            all_contexts[context].append(metric)

        dup_contexts = defaultdict(list)
        for context, metrics in iteritems(all_contexts):
            if len(metrics) > 1:
                dup_contexts[context] = metrics

        err_msg_lines = ["Duplicate {}s found:".format(stub_type)]
        for key in sorted(dup_contexts):
            contexts = dup_contexts[key]
            err_msg_lines.append('- {}'.format(contexts[0].name))
            for metric in contexts:
                err_msg_lines.append('    ' + str(metric))

        assert len(dup_contexts) == 0, "\n".join(err_msg_lines)

    def reset(self):
        """
        Set the stub to its initial state
        """
        self._metrics = defaultdict(list)
        self._asserted = set()
        self._service_checks = defaultdict(list)
        self._events = []
        # dict[event_type, [events]]
        self._event_platform_events = defaultdict(list)
        self._histogram_buckets = defaultdict(list)

    def all_metrics_asserted(self):
        assert self.metrics_asserted_pct >= 100.0

    def not_asserted(self):
        present_metrics = {ensure_unicode(m) for m in self._metrics}
        return present_metrics - set(self._asserted)

    def assert_metric_has_tag_prefix(self, metric_name, tag_prefix, count=None, at_least=1):
        candidates = []
        self._asserted.add(metric_name)

        for metric in self.metrics(metric_name):
            tags = metric.tags
            gtags = [t for t in tags if t.startswith(tag_prefix)]
            if len(gtags) > 0:
                candidates.append(metric)

        msg = "Candidates size assertion for `{}`, count: {}, at_least: {}) failed".format(metric_name, count, at_least)
        if count is not None:
            assert len(candidates) == count, msg
        else:
            assert len(candidates) >= at_least, msg

    @property
    def metrics_asserted_pct(self):
        """
        Return the metrics assertion coverage
        """
        num_metrics = len(self._metrics)
        num_asserted = len(self._asserted)

        if num_metrics == 0:
            if num_asserted == 0:
                return 100
            else:
                return 0

        # If it there have been assertions with at_least=0 the length of the num_metrics and num_asserted can match
        # even if there are different metrics in each set
        not_asserted = self.not_asserted()
        return (num_metrics - len(not_asserted)) / num_metrics * 100

    @property
    def metric_names(self):
        """
        Return all the metric names we've seen so far
        """
        return [ensure_unicode(name) for name in self._metrics.keys()]

    @property
    def service_check_names(self):
        """
        Return all the service checks names seen so far
        """
        return [ensure_unicode(name) for name in self._service_checks.keys()]

assert_all_metrics_covered(self)

Source code in datadog_checks/base/stubs/aggregator.py
def assert_all_metrics_covered(self):
    # use `condition` to avoid building the `msg` if not needed
    condition = self.metrics_asserted_pct >= 100.0
    msg = ''
    if not condition:
        prefix = '\n\t- '
        msg = 'Some metrics are collected but not asserted:'
        msg += '\nAsserted Metrics:{}{}'.format(prefix, prefix.join(sorted(self._asserted)))
        msg += '\nFound metrics that are not asserted:{}{}'.format(prefix, prefix.join(sorted(self.not_asserted())))
    assert condition, msg

assert_event(self, msg_text, count=None, at_least=1, exact_match=True, tags=None, **kwargs)

Source code in datadog_checks/base/stubs/aggregator.py
def assert_event(self, msg_text, count=None, at_least=1, exact_match=True, tags=None, **kwargs):
    candidates = []
    for e in self.events:
        if exact_match and msg_text != e['msg_text'] or msg_text not in e['msg_text']:
            continue
        if tags and set(tags) != set(e['tags']):
            continue
        for name, value in iteritems(kwargs):
            if e[name] != value:
                break
        else:
            candidates.append(e)

    msg = "Candidates size assertion for `{}`, count: {}, at_least: {}) failed".format(msg_text, count, at_least)
    if count is not None:
        assert len(candidates) == count, msg
    else:
        assert len(candidates) >= at_least, msg

assert_metric(self, name, value=None, tags=None, count=None, at_least=1, hostname=None, metric_type=None, device=None, flush_first_value=None)

Assert a metric was processed by this stub

Source code in datadog_checks/base/stubs/aggregator.py
def assert_metric(
    self,
    name,
    value=None,
    tags=None,
    count=None,
    at_least=1,
    hostname=None,
    metric_type=None,
    device=None,
    flush_first_value=None,
):
    """
    Assert a metric was processed by this stub
    """

    self._asserted.add(name)
    expected_tags = normalize_tags(tags, sort=True)

    candidates = []
    for metric in self.metrics(name):
        if value is not None and not self.is_aggregate(metric.type) and value != metric.value:
            continue

        if expected_tags and expected_tags != sorted(metric.tags):
            continue

        if hostname is not None and hostname != metric.hostname:
            continue

        if metric_type is not None and metric_type != metric.type:
            continue

        if device is not None and device != metric.device:
            continue

        if flush_first_value is not None and flush_first_value != metric.flush_first_value:
            continue

        candidates.append(metric)

    expected_metric = MetricStub(name, metric_type, value, expected_tags, hostname, device, flush_first_value)

    if value is not None and candidates and all(self.is_aggregate(m.type) for m in candidates):
        got = sum(m.value for m in candidates)
        msg = "Expected count value for '{}': {}, got {}".format(name, value, got)
        condition = value == got
    elif count is not None:
        msg = "Needed exactly {} candidates for '{}', got {}".format(count, name, len(candidates))
        condition = len(candidates) == count
    else:
        msg = "Needed at least {} candidates for '{}', got {}".format(at_least, name, len(candidates))
        condition = len(candidates) >= at_least
    self._assert(condition, msg=msg, expected_stub=expected_metric, submitted_elements=self._metrics)

assert_metric_has_tag(self, metric_name, tag, count=None, at_least=1)

Assert a metric is tagged with tag

Source code in datadog_checks/base/stubs/aggregator.py
def assert_metric_has_tag(self, metric_name, tag, count=None, at_least=1):
    """
    Assert a metric is tagged with tag
    """
    self._asserted.add(metric_name)

    candidates = []
    candidates_with_tag = []
    for metric in self.metrics(metric_name):
        candidates.append(metric)
        if tag in metric.tags:
            candidates_with_tag.append(metric)

    if candidates_with_tag:  # The metric was found with the tag but not enough times
        msg = "The metric '{}' with tag '{}' was only found {}/{} times".format(metric_name, tag, count, at_least)
    elif candidates:
        msg = (
            "The metric '{}' was found but not with the tag '{}'.\n"
            + "Similar submitted:\n"
            + "\n".join(["     {}".format(m) for m in candidates])
        )
    else:
        msg = "Metric '{}' not found".format(metric_name)

    if count is not None:
        assert len(candidates_with_tag) == count, msg
    else:
        assert len(candidates_with_tag) >= at_least, msg

assert_metric_has_tag_prefix(self, metric_name, tag_prefix, count=None, at_least=1)

Source code in datadog_checks/base/stubs/aggregator.py
def assert_metric_has_tag_prefix(self, metric_name, tag_prefix, count=None, at_least=1):
    candidates = []
    self._asserted.add(metric_name)

    for metric in self.metrics(metric_name):
        tags = metric.tags
        gtags = [t for t in tags if t.startswith(tag_prefix)]
        if len(gtags) > 0:
            candidates.append(metric)

    msg = "Candidates size assertion for `{}`, count: {}, at_least: {}) failed".format(metric_name, count, at_least)
    if count is not None:
        assert len(candidates) == count, msg
    else:
        assert len(candidates) >= at_least, msg

assert_no_duplicate_metrics(self)

Assert no duplicate metrics have been submitted.

Metrics are considered duplicate when all following fields match:

  • metric name
  • type (gauge, rate, etc)
  • tags
  • hostname
Source code in datadog_checks/base/stubs/aggregator.py
def assert_no_duplicate_metrics(self):
    """
    Assert no duplicate metrics have been submitted.

    Metrics are considered duplicate when all following fields match:

    - metric name
    - type (gauge, rate, etc)
    - tags
    - hostname
    """
    # metric types that intended to be called multiple times are ignored
    ignored_types = [self.COUNT, self.COUNTER]
    metric_stubs = [m for metrics in self._metrics.values() for m in metrics if m.type not in ignored_types]

    def stub_to_key_fn(stub):
        return stub.name, stub.type, str(sorted(stub.tags)), stub.hostname

    self._assert_no_duplicate_stub('metric', metric_stubs, stub_to_key_fn)

assert_no_duplicate_service_checks(self)

Assert no duplicate service checks have been submitted.

Service checks are considered duplicate when all following fields match: - metric name - status - tags - hostname

Source code in datadog_checks/base/stubs/aggregator.py
def assert_no_duplicate_service_checks(self):
    """
    Assert no duplicate service checks have been submitted.

    Service checks are considered duplicate when all following fields match:
        - metric name
        - status
        - tags
        - hostname
    """
    service_check_stubs = [m for metrics in self._service_checks.values() for m in metrics]

    def stub_to_key_fn(stub):
        return stub.name, stub.status, str(sorted(stub.tags)), stub.hostname

    self._assert_no_duplicate_stub('service_check', service_check_stubs, stub_to_key_fn)

assert_service_check(self, name, status=None, tags=None, count=None, at_least=1, hostname=None, message=None)

Assert a service check was processed by this stub

Source code in datadog_checks/base/stubs/aggregator.py
def assert_service_check(self, name, status=None, tags=None, count=None, at_least=1, hostname=None, message=None):
    """
    Assert a service check was processed by this stub
    """
    tags = normalize_tags(tags, sort=True)
    candidates = []
    for sc in self.service_checks(name):
        if status is not None and status != sc.status:
            continue

        if tags and tags != sorted(sc.tags):
            continue

        if hostname is not None and hostname != sc.hostname:
            continue

        if message is not None and message != sc.message:
            continue

        candidates.append(sc)

    expected_service_check = ServiceCheckStub(
        None, name=name, status=status, tags=tags, hostname=hostname, message=message
    )

    if count is not None:
        msg = "Needed exactly {} candidates for '{}', got {}".format(count, name, len(candidates))
        condition = len(candidates) == count
    else:
        msg = "Needed at least {} candidates for '{}', got {}".format(at_least, name, len(candidates))
        condition = len(candidates) >= at_least
    self._assert(
        condition=condition, msg=msg, expected_stub=expected_service_check, submitted_elements=self._service_checks
    )

reset(self)

Set the stub to its initial state

Source code in datadog_checks/base/stubs/aggregator.py
def reset(self):
    """
    Set the stub to its initial state
    """
    self._metrics = defaultdict(list)
    self._asserted = set()
    self._service_checks = defaultdict(list)
    self._events = []
    # dict[event_type, [events]]
    self._event_platform_events = defaultdict(list)
    self._histogram_buckets = defaultdict(list)

datadog_checks.base.stubs.datadog_agent.DatadogAgentStub

This implements the methods defined by the Agent's C bindings which in turn call the Go backend.

It also provides utility methods for test assertions.

Source code in datadog_checks/base/stubs/datadog_agent.py
class DatadogAgentStub(object):
    """
    This implements the methods defined by the Agent's
    [C bindings](https://github.com/DataDog/datadog-agent/blob/master/rtloader/common/builtins/datadog_agent.c)
    which in turn call the
    [Go backend](https://github.com/DataDog/datadog-agent/blob/master/pkg/collector/python/datadog_agent.go).

    It also provides utility methods for test assertions.
    """

    def __init__(self):
        self._metadata = {}
        self._cache = {}
        self._config = self.get_default_config()
        self._hostname = 'stubbed.hostname'
        self._process_start_time = 0

    def get_default_config(self):
        return {'enable_metadata_collection': True, 'disable_unsafe_yaml': True}

    def reset(self):
        self._metadata.clear()
        self._cache.clear()
        self._config = self.get_default_config()
        self._process_start_time = 0

    def assert_metadata(self, check_id, data):
        actual = {}
        for name in data:
            key = (check_id, name)
            if key in self._metadata:
                actual[name] = self._metadata[key]
        assert data == actual

    def assert_metadata_count(self, count):
        assert len(self._metadata) == count

    def get_hostname(self):
        return self._hostname

    def set_hostname(self, hostname):
        self._hostname = hostname

    def reset_hostname(self):
        self._hostname = 'stubbed.hostname'

    def get_config(self, config_option):
        return self._config.get(config_option, '')

    def get_version(self):
        return '0.0.0'

    def log(self, *args, **kwargs):
        pass

    def set_check_metadata(self, check_id, name, value):
        self._metadata[(check_id, name)] = value

    def set_external_tags(self, *args, **kwargs):
        pass

    def tracemalloc_enabled(self, *args, **kwargs):
        return False

    def write_persistent_cache(self, key, value):
        self._cache[key] = value

    def read_persistent_cache(self, key):
        return self._cache.get(key, '')

    def obfuscate_sql(self, query, options=None):
        # Full obfuscation implementation is in go code.
        if options:
            # Options provided is a JSON string because the Go stub requires it, whereas
            # the python stub does not for things such as testing.
            if json.loads(options).get('return_json_metadata', False):
                return json.dumps({'query': re.sub(r'\s+', ' ', query or '').strip(), 'metadata': {}})
        return re.sub(r'\s+', ' ', query or '').strip()

    def obfuscate_sql_exec_plan(self, plan, normalize=False):
        # Passthrough stub: obfuscation implementation is in Go code.
        return plan

    def get_process_start_time(self):
        return self._process_start_time

    def set_process_start_time(self, time):
        self._process_start_time = time

assert_metadata(self, check_id, data)

Source code in datadog_checks/base/stubs/datadog_agent.py
def assert_metadata(self, check_id, data):
    actual = {}
    for name in data:
        key = (check_id, name)
        if key in self._metadata:
            actual[name] = self._metadata[key]
    assert data == actual

assert_metadata_count(self, count)

Source code in datadog_checks/base/stubs/datadog_agent.py
def assert_metadata_count(self, count):
    assert len(self._metadata) == count

reset(self)

Source code in datadog_checks/base/stubs/datadog_agent.py
def reset(self):
    self._metadata.clear()
    self._cache.clear()
    self._config = self.get_default_config()
    self._process_start_time = 0

Last update: June 22, 2020