Skip to content

API


datadog_checks.base.checks.base.AgentCheck

The base class for any Agent based integration.

In general, you don't need to and you should not override anything from the base class except the check method but sometimes it might be useful for a Check to have its own constructor.

When overriding __init__ you have to remember that, depending on the configuration, the Agent might create several different Check instances and the method would be called as many times.

Agent 6,7 signature:

AgentCheck(name, init_config, instances)    # instances contain only 1 instance
AgentCheck.check(instance)

Agent 8 signature:

AgentCheck(name, init_config, instance)     # one instance
AgentCheck.check()                          # no more instance argument for check method

Note

when loading a Custom check, the Agent will inspect the module searching for a subclass of AgentCheck. If such a class exists but has been derived in turn, it'll be ignored - you should never derive from an existing Check.

Source code in datadog_checks_base/datadog_checks/base/checks/base.py
@traced_class
class AgentCheck(object):
    """
    The base class for any Agent based integration.

    In general, you don't need to and you should not override anything from the base
    class except the `check` method but sometimes it might be useful for a Check to
    have its own constructor.

    When overriding `__init__` you have to remember that, depending on the configuration,
    the Agent might create several different Check instances and the method would be
    called as many times.

    Agent 6,7 signature:

        AgentCheck(name, init_config, instances)    # instances contain only 1 instance
        AgentCheck.check(instance)

    Agent 8 signature:

        AgentCheck(name, init_config, instance)     # one instance
        AgentCheck.check()                          # no more instance argument for check method

    !!! note
        when loading a Custom check, the Agent will inspect the module searching
        for a subclass of `AgentCheck`. If such a class exists but has been derived in
        turn, it'll be ignored - **you should never derive from an existing Check**.
    """

    # If defined, this will be the prefix of every metric/service check and the source type of events
    __NAMESPACE__ = ''

    OK, WARNING, CRITICAL, UNKNOWN = ServiceCheck

    # Used by `self.http` for an instance of RequestsWrapper
    HTTP_CONFIG_REMAPPER = None

    # Used by `create_tls_context` for an instance of RequestsWrapper
    TLS_CONFIG_REMAPPER = None

    # Used by `self.set_metadata` for an instance of MetadataManager
    #
    # This is a mapping of metadata names to functions. When you call `self.set_metadata(name, value, **options)`,
    # if `name` is in this mapping then the corresponding function will be called with the `value`, and the
    # return value(s) will be sent instead.
    #
    # Transformer functions must satisfy the following signature:
    #
    #    def transform_<NAME>(value: Any, options: dict) -> Union[str, Dict[str, str]]:
    #
    # If the return type is a string, then it will be sent as the value for `name`. If the return type is
    # a mapping type, then each key will be considered a `name` and will be sent with its (str) value.
    METADATA_TRANSFORMERS = None

    FIRST_CAP_RE = re.compile(br'(.)([A-Z][a-z]+)')
    ALL_CAP_RE = re.compile(br'([a-z0-9])([A-Z])')
    METRIC_REPLACEMENT = re.compile(br'([^a-zA-Z0-9_.]+)|(^[^a-zA-Z]+)')
    TAG_REPLACEMENT = re.compile(br'[,\+\*\-/()\[\]{}\s]')
    MULTIPLE_UNDERSCORE_CLEANUP = re.compile(br'__+')
    DOT_UNDERSCORE_CLEANUP = re.compile(br'_*\._*')

    # allows to set a limit on the number of metric name and tags combination
    # this check can send per run. This is useful for checks that have an unbounded
    # number of tag values that depend on the input payload.
    # The logic counts one set of tags per gauge/rate/monotonic_count call, and de-duplicates
    # sets of tags for other metric types. The first N sets of tags in submission order will
    # be sent to the aggregator, the rest are dropped. The state is reset after each run.
    # See https://github.com/DataDog/integrations-core/pull/2093 for more information.
    DEFAULT_METRIC_LIMIT = 0

    # Allow tracing for classic integrations
    def __init_subclass__(cls, *args, **kwargs):
        try:
            # https://github.com/python/mypy/issues/4660
            super().__init_subclass__(*args, **kwargs)  # type: ignore
            return traced_class(cls)
        except Exception:
            return cls

    def __init__(self, *args, **kwargs):
        # type: (*Any, **Any) -> None
        """
        Parameters:
            name (str):
                the name of the check
            init_config (dict):
                the `init_config` section of the configuration.
            instance (list[dict]):
                a one-element list containing the instance options from the
                configuration file (a list is used to keep backward compatibility with
                older versions of the Agent).
        """
        # NOTE: these variable assignments exist to ease type checking when eventually assigned as attributes.
        name = kwargs.get('name', '')
        init_config = kwargs.get('init_config', {})
        agentConfig = kwargs.get('agentConfig', {})
        instances = kwargs.get('instances', [])

        if len(args) > 0:
            name = args[0]
        if len(args) > 1:
            init_config = args[1]
        if len(args) > 2:
            # agent pass instances as tuple but in test we are usually using list, so we are testing for both
            if len(args) > 3 or not isinstance(args[2], (list, tuple)) or 'instances' in kwargs:
                # old-style init: the 3rd argument is `agentConfig`
                agentConfig = args[2]
                if len(args) > 3:
                    instances = args[3]
            else:
                # new-style init: the 3rd argument is `instances`
                instances = args[2]

        # NOTE: Agent 6+ should pass exactly one instance... But we are not abiding by that rule on our side
        # everywhere just yet. It's complicated... See: https://github.com/DataDog/integrations-core/pull/5573
        instance = instances[0] if instances else None

        self.check_id = ''
        self.name = name  # type: str
        self.init_config = init_config  # type: InitConfigType
        self.agentConfig = agentConfig  # type: AgentConfigType
        self.instance = instance  # type: InstanceType
        self.instances = instances  # type: List[InstanceType]
        self.warnings = []  # type: List[str]
        self.disable_generic_tags = (
            is_affirmative(self.instance.get('disable_generic_tags', False)) if instance else False
        )
        self.debug_metrics = {}
        if self.init_config is not None:
            self.debug_metrics.update(self.init_config.get('debug_metrics', {}))
        if self.instance is not None:
            self.debug_metrics.update(self.instance.get('debug_metrics', {}))

        # `self.hostname` is deprecated, use `datadog_agent.get_hostname()` instead
        self.hostname = datadog_agent.get_hostname()  # type: str

        logger = logging.getLogger('{}.{}'.format(__name__, self.name))
        self.log = CheckLoggingAdapter(logger, self)

        metric_patterns = self.instance.get('metric_patterns', {}) if instance else {}
        if not isinstance(metric_patterns, dict):
            raise ConfigurationError('Setting `metric_patterns` must be a mapping')

        self.exclude_metrics_pattern = self._create_metrics_pattern(metric_patterns, 'exclude')
        self.include_metrics_pattern = self._create_metrics_pattern(metric_patterns, 'include')

        # TODO: Remove with Agent 5
        # Set proxy settings
        self.proxies = self._get_requests_proxy()
        if not self.init_config:
            self._use_agent_proxy = True
        else:
            self._use_agent_proxy = is_affirmative(self.init_config.get('use_agent_proxy', True))

        # TODO: Remove with Agent 5
        self.default_integration_http_timeout = float(self.agentConfig.get('default_integration_http_timeout', 9))

        self._deprecations = {
            'increment': (
                False,
                (
                    'DEPRECATION NOTICE: `AgentCheck.increment`/`AgentCheck.decrement` are deprecated, please '
                    'use `AgentCheck.gauge` or `AgentCheck.count` instead, with a different metric name'
                ),
            ),
            'device_name': (
                False,
                (
                    'DEPRECATION NOTICE: `device_name` is deprecated, please use a `device:` '
                    'tag in the `tags` list instead'
                ),
            ),
            'in_developer_mode': (
                False,
                'DEPRECATION NOTICE: `in_developer_mode` is deprecated, please stop using it.',
            ),
            'no_proxy': (
                False,
                (
                    'DEPRECATION NOTICE: The `no_proxy` config option has been renamed '
                    'to `skip_proxy` and will be removed in a future release.'
                ),
            ),
            'service_tag': (
                False,
                (
                    'DEPRECATION NOTICE: The `service` tag is deprecated and has been renamed to `%s`. '
                    'Set `disable_legacy_service_tag` to `true` to disable this warning. '
                    'The default will become `true` and cannot be changed in Agent version 8.'
                ),
            ),
            '_config_renamed': (
                False,
                (
                    'DEPRECATION NOTICE: The `%s` config option has been renamed '
                    'to `%s` and will be removed in a future release.'
                ),
            ),
        }  # type: Dict[str, Tuple[bool, str]]

        # Setup metric limits
        self.metric_limiter = self._get_metric_limiter(self.name, instance=self.instance)

        # Lazily load and validate config
        self._config_model_instance = None  # type: Any
        self._config_model_shared = None  # type: Any

        # Functions that will be called exactly once (if successful) before the first check run
        self.check_initializations = deque()  # type: Deque[Callable[[], None]]

        self.check_initializations.append(self.load_configuration_models)

        self.__formatted_tags = None
        self.__logs_enabled = None

        if os.environ.get("GOFIPS", "0") == "1":
            enable_fips()

    def _create_metrics_pattern(self, metric_patterns, option_name):
        all_patterns = metric_patterns.get(option_name, [])

        if not isinstance(all_patterns, list):
            raise ConfigurationError('Setting `{}` of `metric_patterns` must be an array'.format(option_name))

        metrics_patterns = []
        for i, entry in enumerate(all_patterns, 1):
            if not isinstance(entry, str):
                raise ConfigurationError(
                    'Entry #{} of setting `{}` of `metric_patterns` must be a string'.format(i, option_name)
                )
            if not entry:
                self.log.debug(
                    'Entry #%s of setting `%s` of `metric_patterns` must not be empty, ignoring', i, option_name
                )
                continue

            metrics_patterns.append(entry)

        if metrics_patterns:
            return re.compile('|'.join(metrics_patterns))

        return None

    def _get_metric_limiter(self, name, instance=None):
        # type: (str, InstanceType) -> Optional[Limiter]
        limit = self._get_metric_limit(instance=instance)

        if limit > 0:
            return Limiter(name, 'metrics', limit, self.warning)

        return None

    def _get_metric_limit(self, instance=None):
        # type: (InstanceType) -> int
        if instance is None:
            # NOTE: Agent 6+ will now always pass an instance when calling into a check, but we still need to
            # account for this case due to some tests not always passing an instance on init.
            self.log.debug(
                "No instance provided (this is deprecated!). Reverting to the default metric limit: %s",
                self.DEFAULT_METRIC_LIMIT,
            )
            return self.DEFAULT_METRIC_LIMIT

        max_returned_metrics = instance.get('max_returned_metrics', self.DEFAULT_METRIC_LIMIT)

        try:
            limit = int(max_returned_metrics)
        except (ValueError, TypeError):
            self.warning(
                "Configured 'max_returned_metrics' cannot be interpreted as an integer: %s. "
                "Reverting to the default limit: %s",
                max_returned_metrics,
                self.DEFAULT_METRIC_LIMIT,
            )
            return self.DEFAULT_METRIC_LIMIT

        # Do not allow to disable limiting if the class has set a non-zero default value.
        if limit == 0 and self.DEFAULT_METRIC_LIMIT > 0:
            self.warning(
                "Setting 'max_returned_metrics' to zero is not allowed. Reverting to the default metric limit: %s",
                self.DEFAULT_METRIC_LIMIT,
            )
            return self.DEFAULT_METRIC_LIMIT

        return limit

    @staticmethod
    def load_config(yaml_str):
        # type: (str) -> Any
        """
        Convenience wrapper to ease programmatic use of this class from the C API.
        """
        return yaml.safe_load(yaml_str)

    @property
    def http(self):
        # type: () -> RequestsWrapper
        """
        Provides logic to yield consistent network behavior based on user configuration.

        Only new checks or checks on Agent 6.13+ can and should use this for HTTP requests.
        """
        if not hasattr(self, '_http'):
            self._http = RequestsWrapper(self.instance or {}, self.init_config, self.HTTP_CONFIG_REMAPPER, self.log)

        return self._http

    @property
    def logs_enabled(self):
        # type: () -> bool
        """
        Returns True if logs are enabled, False otherwise.
        """
        if self.__logs_enabled is None:
            self.__logs_enabled = bool(datadog_agent.get_config('logs_enabled'))

        return self.__logs_enabled

    @property
    def formatted_tags(self):
        # type: () -> str
        if self.__formatted_tags is None:
            normalized_tags = set()
            for tag in self.instance.get('tags', []):
                key, _, value = tag.partition(':')
                if not value:
                    continue

                if self.disable_generic_tags and key in GENERIC_TAGS:
                    key = '{}_{}'.format(self.name, key)

                normalized_tags.add('{}:{}'.format(key, value))

            self.__formatted_tags = ','.join(sorted(normalized_tags))

        return self.__formatted_tags

    @property
    def diagnosis(self):
        # type: () -> Diagnosis
        """
        A Diagnosis object to register explicit diagnostics and record diagnoses.
        """
        if not hasattr(self, '_diagnosis'):
            self._diagnosis = Diagnosis(sanitize=self.sanitize)
        return self._diagnosis

    def get_tls_context(self, refresh=False, overrides=None):
        # type: (bool, Dict[AnyStr, Any]) -> ssl.SSLContext
        """
        Creates and cache an SSLContext instance based on user configuration.
        Note that user configuration can be overridden by using `overrides`.
        This should only be applied to older integration that manually set config values.

        Since: Agent 7.24
        """
        if not hasattr(self, '_tls_context_wrapper'):
            self._tls_context_wrapper = TlsContextWrapper(
                self.instance or {}, self.TLS_CONFIG_REMAPPER, overrides=overrides
            )

        if refresh:
            self._tls_context_wrapper.refresh_tls_context()

        return self._tls_context_wrapper.tls_context

    @property
    def metadata_manager(self):
        # type: () -> MetadataManager
        """
        Used for sending metadata via Go bindings.
        """
        if not hasattr(self, '_metadata_manager'):
            if not self.check_id and AGENT_RUNNING:
                raise RuntimeError('Attribute `check_id` must be set')

            self._metadata_manager = MetadataManager(self.name, self.check_id, self.log, self.METADATA_TRANSFORMERS)

        return self._metadata_manager

    @property
    def check_version(self):
        # type: () -> str
        """
        Return the dynamically detected integration version.
        """
        if not hasattr(self, '_check_version'):
            # 'datadog_checks.<PACKAGE>.<MODULE>...'
            module_parts = self.__module__.split('.')
            package_path = '.'.join(module_parts[:2])
            package = importlib.import_module(package_path)

            # Provide a default just in case
            self._check_version = getattr(package, '__version__', '0.0.0')

        return self._check_version

    @property
    def in_developer_mode(self):
        # type: () -> bool
        self._log_deprecation('in_developer_mode')
        return False

    def log_typos_in_options(self, user_config, models_config, level):
        # only import it when running in python 3
        from jellyfish import jaro_winkler_similarity

        user_configs = user_config or {}  # type: Dict[str, Any]
        models_config = models_config or {}
        typos = set()  # type: Set[str]

        known_options = {k for k, _ in models_config}  # type: Set[str]

        if isinstance(models_config, BaseModel):
            # Also add aliases, if any
            known_options.update(set(models_config.model_dump(by_alias=True)))

        unknown_options = [option for option in user_configs.keys() if option not in known_options]  # type: List[str]

        for unknown_option in unknown_options:
            similar_known_options = []  # type: List[Tuple[str, int]]
            for known_option in known_options:
                ratio = jaro_winkler_similarity(unknown_option, known_option)
                if ratio > TYPO_SIMILARITY_THRESHOLD:
                    similar_known_options.append((known_option, ratio))
                    typos.add(unknown_option)

            if len(similar_known_options) > 0:
                similar_known_options.sort(key=lambda option: option[1], reverse=True)
                similar_known_options_names = [option[0] for option in similar_known_options]  # type: List[str]
                message = (
                    'Detected potential typo in configuration option in {}/{} section: `{}`. Did you mean {}?'
                ).format(self.name, level, unknown_option, ', or '.join(similar_known_options_names))
                self.log.warning(message)
        return typos

    def load_configuration_models(self, package_path=None):
        if package_path is None:
            # 'datadog_checks.<PACKAGE>.<MODULE>...'
            module_parts = self.__module__.split('.')
            package_path = '{}.config_models'.format('.'.join(module_parts[:2]))
        if self._config_model_shared is None:
            shared_config = copy.deepcopy(self.init_config)
            context = self._get_config_model_context(shared_config)
            shared_model = self.load_configuration_model(package_path, 'SharedConfig', shared_config, context)
            try:
                self.log_typos_in_options(shared_config, shared_model, 'init_config')
            except Exception as e:
                self.log.debug("Failed to detect typos in `init_config` section: %s", e)
            if shared_model is not None:
                self._config_model_shared = shared_model

        if self._config_model_instance is None:
            instance_config = copy.deepcopy(self.instance)
            context = self._get_config_model_context(instance_config)
            instance_model = self.load_configuration_model(package_path, 'InstanceConfig', instance_config, context)
            try:
                self.log_typos_in_options(instance_config, instance_model, 'instances')
            except Exception as e:
                self.log.debug("Failed to detect typos in `instances` section: %s", e)
            if instance_model is not None:
                self._config_model_instance = instance_model

    @staticmethod
    def load_configuration_model(import_path, model_name, config, context):
        try:
            package = importlib.import_module(import_path)
        except ModuleNotFoundError as e:
            # Don't fail if there are no models
            if str(e).startswith('No module named '):
                return

            raise

        model = getattr(package, model_name, None)
        if model is not None:
            try:
                config_model = model.model_validate(config, context=context)
            except ValidationError as e:
                errors = e.errors()
                num_errors = len(errors)
                message_lines = [
                    'Detected {} error{} while loading configuration model `{}`:'.format(
                        num_errors, 's' if num_errors > 1 else '', model_name
                    )
                ]

                for error in errors:
                    message_lines.append(
                        ' -> '.join(
                            # Start array indexes at one for user-friendliness
                            str(loc + 1) if isinstance(loc, int) else str(loc)
                            for loc in error['loc']
                        )
                    )
                    message_lines.append('  {}'.format(error['msg']))

                raise ConfigurationError('\n'.join(message_lines)) from None
            else:
                return config_model

    def _get_config_model_context(self, config):
        return {'logger': self.log, 'warning': self.warning, 'configured_fields': frozenset(config)}

    def register_secret(self, secret):
        # type: (str) -> None
        """
        Register a secret to be scrubbed by `.sanitize()`.
        """
        if not hasattr(self, '_sanitizer'):
            # Configure lazily so that checks that don't use sanitization aren't affected.
            self._sanitizer = SecretsSanitizer()
            self.log.setup_sanitization(sanitize=self.sanitize)

        self._sanitizer.register(secret)

    def sanitize(self, text):
        # type: (str) -> str
        """
        Scrub any registered secrets in `text`.
        """
        try:
            sanitizer = self._sanitizer
        except AttributeError:
            return text
        else:
            return sanitizer.sanitize(text)

    def _context_uid(self, mtype, name, tags=None, hostname=None):
        # type: (int, str, Sequence[str], str) -> str
        return '{}-{}-{}-{}'.format(mtype, name, tags if tags is None else hash(frozenset(tags)), hostname)

    def submit_histogram_bucket(
        self, name, value, lower_bound, upper_bound, monotonic, hostname, tags, raw=False, flush_first_value=False
    ):
        # type: (str, float, int, int, bool, str, Sequence[str], bool, bool) -> None
        if value is None:
            # ignore metric sample
            return

        # make sure the value (bucket count) is an integer
        try:
            value = int(value)
        except ValueError:
            err_msg = 'Histogram: {} has non integer value: {}. Only integer are valid bucket values (count).'.format(
                repr(name), repr(value)
            )
            if not AGENT_RUNNING:
                raise ValueError(err_msg)
            self.warning(err_msg)
            return

        tags = self._normalize_tags_type(tags, metric_name=name)
        if hostname is None:
            hostname = ''

        aggregator.submit_histogram_bucket(
            self,
            self.check_id,
            self._format_namespace(name, raw),
            value,
            lower_bound,
            upper_bound,
            monotonic,
            hostname,
            tags,
            flush_first_value,
        )

    def database_monitoring_query_sample(self, raw_event):
        # type: (str) -> None
        if raw_event is None:
            return

        aggregator.submit_event_platform_event(self, self.check_id, to_native_string(raw_event), "dbm-samples")

    def database_monitoring_query_metrics(self, raw_event):
        # type: (str) -> None
        if raw_event is None:
            return

        aggregator.submit_event_platform_event(self, self.check_id, to_native_string(raw_event), "dbm-metrics")

    def database_monitoring_query_activity(self, raw_event):
        # type: (str) -> None
        if raw_event is None:
            return

        aggregator.submit_event_platform_event(self, self.check_id, to_native_string(raw_event), "dbm-activity")

    def database_monitoring_metadata(self, raw_event):
        # type: (str) -> None
        if raw_event is None:
            return

        aggregator.submit_event_platform_event(self, self.check_id, to_native_string(raw_event), "dbm-metadata")

    def event_platform_event(self, raw_event, event_track_type):
        # type: (str, str) -> None
        """Send an event platform event.

        Parameters:
            raw_event (str):
                JSON formatted string representing the event to send
            event_track_type (str):
                type of event ingested and processed by the event platform
        """
        if raw_event is None:
            return
        aggregator.submit_event_platform_event(self, self.check_id, to_native_string(raw_event), event_track_type)

    def should_send_metric(self, metric_name):
        return not self._metric_excluded(metric_name) and self._metric_included(metric_name)

    def _metric_included(self, metric_name):
        if self.include_metrics_pattern is None:
            return True

        return self.include_metrics_pattern.search(metric_name) is not None

    def _metric_excluded(self, metric_name):
        if self.exclude_metrics_pattern is None:
            return False

        return self.exclude_metrics_pattern.search(metric_name) is not None

    def _submit_metric(
        self, mtype, name, value, tags=None, hostname=None, device_name=None, raw=False, flush_first_value=False
    ):
        # type: (int, str, float, Sequence[str], str, str, bool, bool) -> None
        if value is None:
            # ignore metric sample
            return

        name = self._format_namespace(name, raw)
        if not self.should_send_metric(name):
            return

        tags = self._normalize_tags_type(tags or [], device_name, name)
        if hostname is None:
            hostname = ''

        if self.metric_limiter:
            if mtype in ONE_PER_CONTEXT_METRIC_TYPES:
                # Fast path for gauges, rates, monotonic counters, assume one set of tags per call
                if self.metric_limiter.is_reached():
                    return
            else:
                # Other metric types have a legit use case for several calls per set of tags, track unique sets of tags
                context = self._context_uid(mtype, name, tags, hostname)
                if self.metric_limiter.is_reached(context):
                    return

        try:
            value = float(value)
        except ValueError:
            err_msg = 'Metric: {} has non float value: {}. Only float values can be submitted as metrics.'.format(
                repr(name), repr(value)
            )
            if not AGENT_RUNNING:
                raise ValueError(err_msg)
            self.warning(err_msg)
            return

        aggregator.submit_metric(self, self.check_id, mtype, name, value, tags, hostname, flush_first_value)

    def gauge(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
        # type: (str, float, Sequence[str], str, str, bool) -> None
        """Sample a gauge metric.

        Parameters:
            name (str):
                the name of the metric
            value (float):
                the value for the metric
            tags (list[str]):
                a list of tags to associate with this metric
            hostname (str):
                a hostname to associate with this metric. Defaults to the current host.
            device_name (str):
                **deprecated** add a tag in the form `device:<device_name>` to the `tags` list instead.
            raw (bool):
                whether to ignore any defined namespace prefix
        """
        self._submit_metric(
            aggregator.GAUGE, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
        )

    def count(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
        # type: (str, float, Sequence[str], str, str, bool) -> None
        """Sample a raw count metric.

        Parameters:
            name (str):
                the name of the metric
            value (float):
                the value for the metric
            tags (list[str]):
                a list of tags to associate with this metric
            hostname (str):
                a hostname to associate with this metric. Defaults to the current host.
            device_name (str):
                **deprecated** add a tag in the form `device:<device_name>` to the `tags` list instead.
            raw (bool):
                whether to ignore any defined namespace prefix
        """
        self._submit_metric(
            aggregator.COUNT, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
        )

    def monotonic_count(
        self, name, value, tags=None, hostname=None, device_name=None, raw=False, flush_first_value=False
    ):
        # type: (str, float, Sequence[str], str, str, bool, bool) -> None
        """Sample an increasing counter metric.

        Parameters:
            name (str):
                the name of the metric
            value (float):
                the value for the metric
            tags (list[str]):
                a list of tags to associate with this metric
            hostname (str):
                a hostname to associate with this metric. Defaults to the current host.
            device_name (str):
                **deprecated** add a tag in the form `device:<device_name>` to the `tags` list instead.
            raw (bool):
                whether to ignore any defined namespace prefix
            flush_first_value (bool):
                whether to sample the first value
        """
        self._submit_metric(
            aggregator.MONOTONIC_COUNT,
            name,
            value,
            tags=tags,
            hostname=hostname,
            device_name=device_name,
            raw=raw,
            flush_first_value=flush_first_value,
        )

    def rate(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
        # type: (str, float, Sequence[str], str, str, bool) -> None
        """Sample a point, with the rate calculated at the end of the check.

        Parameters:
            name (str):
                the name of the metric
            value (float):
                the value for the metric
            tags (list[str]):
                a list of tags to associate with this metric
            hostname (str):
                a hostname to associate with this metric. Defaults to the current host.
            device_name (str):
                **deprecated** add a tag in the form `device:<device_name>` to the `tags` list instead.
            raw (bool):
                whether to ignore any defined namespace prefix
        """
        self._submit_metric(
            aggregator.RATE, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
        )

    def histogram(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
        # type: (str, float, Sequence[str], str, str, bool) -> None
        """Sample a histogram metric.

        Parameters:
            name (str):
                the name of the metric
            value (float):
                the value for the metric
            tags (list[str]):
                a list of tags to associate with this metric
            hostname (str):
                a hostname to associate with this metric. Defaults to the current host.
            device_name (str):
                **deprecated** add a tag in the form `device:<device_name>` to the `tags` list instead.
            raw (bool):
                whether to ignore any defined namespace prefix
        """
        self._submit_metric(
            aggregator.HISTOGRAM, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
        )

    def historate(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
        # type: (str, float, Sequence[str], str, str, bool) -> None
        """Sample a histogram based on rate metrics.

        Parameters:
            name (str):
                the name of the metric
            value (float):
                the value for the metric
            tags (list[str]):
                a list of tags to associate with this metric
            hostname (str):
                a hostname to associate with this metric. Defaults to the current host.
            device_name (str):
                **deprecated** add a tag in the form `device:<device_name>` to the `tags` list instead.
            raw (bool):
                whether to ignore any defined namespace prefix
        """
        self._submit_metric(
            aggregator.HISTORATE, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
        )

    def increment(self, name, value=1, tags=None, hostname=None, device_name=None, raw=False):
        # type: (str, float, Sequence[str], str, str, bool) -> None
        """Increment a counter metric.

        Parameters:
            name (str):
                the name of the metric
            value (float):
                the value for the metric
            tags (list[str]):
                a list of tags to associate with this metric
            hostname (str):
                a hostname to associate with this metric. Defaults to the current host.
            device_name (str):
                **deprecated** add a tag in the form `device:<device_name>` to the `tags` list instead.
            raw (bool):
                whether to ignore any defined namespace prefix
        """
        self._log_deprecation('increment')
        self._submit_metric(
            aggregator.COUNTER, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
        )

    def decrement(self, name, value=-1, tags=None, hostname=None, device_name=None, raw=False):
        # type: (str, float, Sequence[str], str, str, bool) -> None
        """Decrement a counter metric.

        Parameters:
            name (str):
                the name of the metric
            value (float):
                the value for the metric
            tags (list[str]):
                a list of tags to associate with this metric
            hostname (str):
                a hostname to associate with this metric. Defaults to the current host.
            device_name (str):
                **deprecated** add a tag in the form `device:<device_name>` to the `tags` list instead.
            raw (bool):
                whether to ignore any defined namespace prefix
        """
        self._log_deprecation('increment')
        self._submit_metric(
            aggregator.COUNTER, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
        )

    def service_check(self, name, status, tags=None, hostname=None, message=None, raw=False):
        # type: (str, ServiceCheckStatus, Sequence[str], str, str, bool) -> None
        """Send the status of a service.

        Parameters:
            name (str):
                the name of the service check
            status (int):
                a constant describing the service status
            tags (list[str]):
                a list of tags to associate with this service check
            message (str):
                additional information or a description of why this status occurred.
            raw (bool):
                whether to ignore any defined namespace prefix
        """
        tags = self._normalize_tags_type(tags or [])
        if hostname is None:
            hostname = ''
        if message is None:
            message = ''
        else:
            message = to_native_string(message)

        message = self.sanitize(message)

        aggregator.submit_service_check(
            self, self.check_id, self._format_namespace(name, raw), status, tags, hostname, message
        )

    def send_log(self, data, cursor=None, stream='default'):
        # type: (dict[str, str], dict[str, Any] | None, str) -> None
        """Send a log for submission.

        Parameters:
            data (dict[str, str]):
                The log data to send. The following keys are treated specially, if present:

                - timestamp: should be an integer or float representing the number of seconds since the Unix epoch
                - ddtags: if not defined, it will automatically be set based on the instance's `tags` option
            cursor (dict[str, Any] or None):
                Metadata associated with the log which will be saved to disk. The most recent value may be
                retrieved with the `get_log_cursor` method.
            stream (str):
                The stream associated with this log, used for accurate cursor persistence.
                Has no effect if `cursor` argument is `None`.
        """
        attributes = data.copy()
        if 'ddtags' not in attributes and self.formatted_tags:
            attributes['ddtags'] = self.formatted_tags

        timestamp = attributes.get('timestamp')
        if timestamp is not None:
            # convert seconds to milliseconds
            attributes['timestamp'] = int(timestamp * 1000)

        datadog_agent.send_log(to_json(attributes), self.check_id)
        if cursor is not None:
            self.write_persistent_cache('log_cursor_{}'.format(stream), to_json(cursor))

    def get_log_cursor(self, stream='default'):
        # type: (str) -> dict[str, Any] | None
        """Returns the most recent log cursor from disk."""
        data = self.read_persistent_cache('log_cursor_{}'.format(stream))
        return from_json(data) if data else None

    def _log_deprecation(self, deprecation_key, *args):
        # type: (str, *str) -> None
        """
        Logs a deprecation notice at most once per AgentCheck instance, for the pre-defined `deprecation_key`
        """
        sent, message = self._deprecations[deprecation_key]
        if sent:
            return

        self.warning(message, *args)
        self._deprecations[deprecation_key] = (True, message)

    # TODO: Remove once our checks stop calling it
    def service_metadata(self, meta_name, value):
        # type: (str, Any) -> None
        pass

    def set_metadata(self, name, value, **options):
        # type: (str, Any, **Any) -> None
        """Updates the cached metadata `name` with `value`, which is then sent by the Agent at regular intervals.

        Parameters:
            name (str):
                the name of the metadata
            value (Any):
                the value for the metadata. if ``name`` has no transformer defined then the
                raw ``value`` will be submitted and therefore it must be a ``str``
            options (Any):
                keyword arguments to pass to any defined transformer
        """
        self.metadata_manager.submit(name, value, options)

    @staticmethod
    def is_metadata_collection_enabled():
        # type: () -> bool
        return is_affirmative(datadog_agent.get_config('enable_metadata_collection'))

    @classmethod
    def metadata_entrypoint(cls, method):
        # type: (Callable[..., None]) -> Callable[..., None]
        """
        Skip execution of the decorated method if metadata collection is disabled on the Agent.

        Usage:

        ```python
        class MyCheck(AgentCheck):
            @AgentCheck.metadata_entrypoint
            def collect_metadata(self):
                ...
        ```
        """

        @functools.wraps(method)
        def entrypoint(self, *args, **kwargs):
            # type: (AgentCheck, *Any, **Any) -> None
            if not self.is_metadata_collection_enabled():
                return

            # NOTE: error handling still at the discretion of the wrapped method.
            method(self, *args, **kwargs)

        return entrypoint

    def _persistent_cache_id(self, key):
        # type: (str) -> str
        return '{}_{}'.format(self.check_id, key)

    def read_persistent_cache(self, key):
        # type: (str) -> str
        """Returns the value previously stored with `write_persistent_cache` for the same `key`.

        Parameters:
            key (str):
                the key to retrieve
        """
        return datadog_agent.read_persistent_cache(self._persistent_cache_id(key))

    def write_persistent_cache(self, key, value):
        # type: (str, str) -> None
        """Stores `value` in a persistent cache for this check instance.
        The cache is located in a path where the agent is guaranteed to have read & write permissions. Namely in
            - `%ProgramData%\\Datadog\\run` on Windows.
            - `/opt/datadog-agent/run` everywhere else.
        The cache is persistent between agent restarts but will be rebuilt if the check instance configuration changes.

        Parameters:
            key (str):
                the key to retrieve
            value (str):
                the value to store
        """
        datadog_agent.write_persistent_cache(self._persistent_cache_id(key), value)

    def set_external_tags(self, external_tags):
        # type: (Sequence[ExternalTagType]) -> None
        # Example of external_tags format
        # [
        #     ('hostname', {'src_name': ['test:t1']}),
        #     ('hostname2', {'src2_name': ['test2:t3']})
        # ]
        try:
            new_tags = []
            for hostname, source_map in external_tags:
                new_tags.append((to_native_string(hostname), source_map))
                for src_name, tags in source_map.items():
                    source_map[src_name] = self._normalize_tags_type(tags)
            datadog_agent.set_external_tags(new_tags)
        except IndexError:
            self.log.exception('Unexpected external tags format: %s', external_tags)
            raise

    def convert_to_underscore_separated(self, name):
        # type: (Union[str, bytes]) -> bytes
        """
        Convert from CamelCase to camel_case
        And substitute illegal metric characters
        """
        name = ensure_bytes(name)
        metric_name = self.FIRST_CAP_RE.sub(br'\1_\2', name)
        metric_name = self.ALL_CAP_RE.sub(br'\1_\2', metric_name).lower()
        metric_name = self.METRIC_REPLACEMENT.sub(br'_', metric_name)
        return self.DOT_UNDERSCORE_CLEANUP.sub(br'.', metric_name).strip(b'_')

    def warning(self, warning_message, *args, **kwargs):
        # type: (str, *Any, **Any) -> None
        """Log a warning message, display it in the Agent's status page and in-app.

        Using *args is intended to make warning work like log.warn/debug/info/etc
        and make it compliant with flake8 logging format linter.

        Parameters:
            warning_message (str):
                the warning message
            args (Any):
                format string args used to format the warning message e.g. `warning_message % args`
            kwargs (Any):
                not used for now, but added to match Python logger's `warning` method signature
        """
        warning_message = to_native_string(warning_message)
        # Interpolate message only if args is not empty. Same behavior as python logger:
        # https://github.com/python/cpython/blob/1dbe5373851acb85ba91f0be7b83c69563acd68d/Lib/logging/__init__.py#L368-L369
        if args:
            warning_message = warning_message % args
        frame = inspect.currentframe().f_back  # type: ignore
        lineno = frame.f_lineno
        # only log the last part of the filename, not the full path
        filename = basename(frame.f_code.co_filename)

        self.log.warning(warning_message, extra={'_lineno': lineno, '_filename': filename, '_check_id': self.check_id})
        self.warnings.append(warning_message)

    def get_warnings(self):
        # type: () -> List[str]
        """
        Return the list of warnings messages to be displayed in the info page
        """
        warnings = self.warnings
        self.warnings = []
        return warnings

    def get_diagnoses(self):
        # type: () -> str
        """
        Return the list of diagnosis as a JSON encoded string.

        The agent calls this method to retrieve diagnostics from integrations. This method
        runs explicit diagnostics if available.
        """
        return to_json([d._asdict() for d in (self.diagnosis.diagnoses + self.diagnosis.run_explicit())])

    def _get_requests_proxy(self):
        # type: () -> ProxySettings
        # TODO: Remove with Agent 5
        no_proxy_settings = {'http': None, 'https': None, 'no': []}  # type: ProxySettings

        # First we read the proxy configuration from datadog.conf
        proxies = self.agentConfig.get('proxy', datadog_agent.get_config('proxy'))
        if proxies:
            proxies = proxies.copy()

        # requests compliant dict
        if proxies and 'no_proxy' in proxies:
            proxies['no'] = proxies.pop('no_proxy')

        return proxies if proxies else no_proxy_settings

    def _format_namespace(self, s, raw=False):
        # type: (str, bool) -> str
        if not raw and self.__NAMESPACE__:
            return '{}.{}'.format(self.__NAMESPACE__, to_native_string(s))

        return to_native_string(s)

    def normalize(self, metric, prefix=None, fix_case=False):
        # type: (Union[str, bytes], Union[str, bytes], bool) -> str
        """
        Turn a metric into a well-formed metric name prefix.b.c

        Parameters:
            metric: The metric name to normalize
            prefix: A prefix to to add to the normalized name, default None
            fix_case: A boolean, indicating whether to make sure that the metric name returned is in "snake_case"
        """
        if isinstance(metric, str):
            metric = unicodedata.normalize('NFKD', metric).encode('ascii', 'ignore')

        if fix_case:
            name = self.convert_to_underscore_separated(metric)
            if prefix is not None:
                prefix = self.convert_to_underscore_separated(prefix)
        else:
            name = self.METRIC_REPLACEMENT.sub(br'_', metric)
            name = self.DOT_UNDERSCORE_CLEANUP.sub(br'.', name).strip(b'_')

        name = self.MULTIPLE_UNDERSCORE_CLEANUP.sub(br'_', name)

        if prefix is not None:
            name = ensure_bytes(prefix) + b"." + name

        return to_native_string(name)

    def normalize_tag(self, tag):
        # type: (Union[str, bytes]) -> str
        """Normalize tag values.

        This happens for legacy reasons, when we cleaned up some characters (like '-')
        which are allowed in tags.
        """
        if isinstance(tag, str):
            tag = tag.encode('utf-8', 'ignore')
        tag = self.TAG_REPLACEMENT.sub(br'_', tag)
        tag = self.MULTIPLE_UNDERSCORE_CLEANUP.sub(br'_', tag)
        tag = self.DOT_UNDERSCORE_CLEANUP.sub(br'.', tag).strip(b'_')
        return to_native_string(tag)

    def check(self, instance):
        # type: (InstanceType) -> None
        raise NotImplementedError

    def cancel(self):
        # type: () -> None
        """
        This method is called when the check in unscheduled by the agent. This
        is SIGNAL that the check is being unscheduled and can be called while
        the check is running. It's up to the python implementation to make sure
        cancel is thread safe and won't block.
        """
        pass

    def run(self):
        # type: () -> str
        try:
            self.diagnosis.clear()
            # Ignore check initializations if running in a separate process
            if is_affirmative(self.instance.get('process_isolation', self.init_config.get('process_isolation', False))):
                from ..utils.replay.execute import run_with_isolation

                run_with_isolation(self, aggregator, datadog_agent)
            else:
                while self.check_initializations:
                    initialization = self.check_initializations.popleft()
                    try:
                        initialization()
                    except Exception:
                        self.check_initializations.appendleft(initialization)
                        raise

                instance = copy.deepcopy(self.instances[0])

                if 'set_breakpoint' in self.init_config:
                    from ..utils.agent.debug import enter_pdb

                    enter_pdb(self.check, line=self.init_config['set_breakpoint'], args=(instance,))
                elif self.should_profile_memory():
                    # self.init_config['profile_memory'] could be `/tmp/datadog-agent-memory-profiler*`
                    # that is generated by Datadog Agent.
                    # If we use `--m-dir` for `agent check` command, a hidden flag, it should be same as a given value.
                    namespaces = [self.init_config['profile_memory']]
                    for id in self.check_id.split(":"):
                        namespaces.append(id)
                    self.profile_memory(func=self.check, namespaces=namespaces, args=(instance,))
                else:
                    self.check(instance)

            error_report = ''
        except Exception as e:
            message = self.sanitize(str(e))
            tb = self.sanitize(traceback.format_exc())
            error_report = to_json([{'message': message, 'traceback': tb}])
        finally:
            if self.metric_limiter:
                if is_affirmative(self.debug_metrics.get('metric_contexts', False)):
                    debug_metrics = self.metric_limiter.get_debug_metrics()

                    # Reset so we can actually submit the metrics
                    self.metric_limiter.reset()

                    tags = self.get_debug_metric_tags()
                    for metric_name, value in debug_metrics:
                        self.gauge(metric_name, value, tags=tags, raw=True)

                self.metric_limiter.reset()

        return error_report

    def event(self, event):
        # type: (Event) -> None
        """Send an event.

        An event is a dictionary with the following keys and data types:

        ```python
        {
            "timestamp": int,        # the epoch timestamp for the event
            "event_type": str,       # the event name
            "api_key": str,          # the api key for your account
            "msg_title": str,        # the title of the event
            "msg_text": str,         # the text body of the event
            "aggregation_key": str,  # a key to use for aggregating events
            "alert_type": str,       # (optional) one of ('error', 'warning', 'success', 'info'), defaults to 'info'
            "source_type_name": str, # (optional) the source type name
            "host": str,             # (optional) the name of the host
            "tags": list,            # (optional) a list of tags to associate with this event
            "priority": str,         # (optional) specifies the priority of the event ("normal" or "low")
        }
        ```

        Parameters:
            event (dict[str, Any]):
                the event to be sent
        """
        # Enforce types of some fields, considerably facilitates handling in go bindings downstream
        for key, value in event.items():
            if not isinstance(value, (str, bytes)):
                continue

            try:
                event[key] = to_native_string(value)  # type: ignore
                # ^ Mypy complains about dynamic key assignment -- arguably for good reason.
                # Ideally we should convert this to a dict literal so that submitted events only include known keys.
            except UnicodeError:
                self.log.warning('Encoding error with field `%s`, cannot submit event', key)
                return

        if event.get('tags'):
            event['tags'] = self._normalize_tags_type(event['tags'])
        if event.get('timestamp'):
            event['timestamp'] = int(event['timestamp'])
        if event.get('aggregation_key'):
            event['aggregation_key'] = to_native_string(event['aggregation_key'])

        if self.__NAMESPACE__:
            event.setdefault('source_type_name', self.__NAMESPACE__)

        aggregator.submit_event(self, self.check_id, event)

    def _normalize_tags_type(self, tags, device_name=None, metric_name=None):
        # type: (Sequence[Union[None, str, bytes]], str, str) -> List[str]
        """
        Normalize tags contents and type:
        - append `device_name` as `device:` tag
        - normalize tags type
        - doesn't mutate the passed list, returns a new list
        """
        normalized_tags = []

        if device_name:
            self._log_deprecation('device_name')
            try:
                normalized_tags.append('device:{}'.format(to_native_string(device_name)))
            except UnicodeError:
                self.log.warning(
                    'Encoding error with device name `%r` for metric `%r`, ignoring tag', device_name, metric_name
                )

        for tag in tags:
            if tag is None:
                continue
            try:
                tag = to_native_string(tag)
            except UnicodeError:
                self.log.warning('Encoding error with tag `%s` for metric `%s`, ignoring tag', tag, metric_name)
                continue
            if self.disable_generic_tags:
                normalized_tags.append(self.degeneralise_tag(tag))
            else:
                normalized_tags.append(tag)
        return normalized_tags

    def degeneralise_tag(self, tag):
        split_tag = tag.split(':', 1)
        if len(split_tag) > 1:
            tag_name, value = split_tag
        else:
            tag_name = tag
            value = None

        if tag_name in GENERIC_TAGS:
            new_name = '{}_{}'.format(self.name, tag_name)
            if value:
                return '{}:{}'.format(new_name, value)
            else:
                return new_name
        else:
            return tag

    def get_debug_metric_tags(self):
        tags = ['check_name:{}'.format(self.name), 'check_version:{}'.format(self.check_version)]
        tags.extend(self.instance.get('tags', []))
        return tags

    def get_memory_profile_tags(self):
        # type: () -> List[str]
        tags = self.get_debug_metric_tags()
        tags.extend(self.instance.get('__memory_profiling_tags', []))
        return tags

    def should_profile_memory(self):
        # type: () -> bool
        return 'profile_memory' in self.init_config or (
            datadog_agent.tracemalloc_enabled() and should_profile_memory(datadog_agent, self.name)
        )

    def profile_memory(self, func, namespaces=None, args=(), kwargs=None, extra_tags=None):
        # type: (Callable[..., Any], Optional[Sequence[str]], Sequence[Any], Optional[Dict[str, Any]], Optional[List[str]]) -> None  # noqa: E501
        from ..utils.agent.memory import profile_memory

        if namespaces is None:
            namespaces = self.check_id.split(':', 1)

        tags = self.get_memory_profile_tags()
        if extra_tags is not None:
            tags.extend(extra_tags)

        metrics = profile_memory(func, self.init_config, namespaces=namespaces, args=args, kwargs=kwargs)

        for m in metrics:
            self.gauge(m.name, m.value, tags=tags, raw=True)

http property

Provides logic to yield consistent network behavior based on user configuration.

Only new checks or checks on Agent 6.13+ can and should use this for HTTP requests.

gauge(name, value, tags=None, hostname=None, device_name=None, raw=False)

Sample a gauge metric.

Parameters:

Name Type Description Default
name str

the name of the metric

required
value float

the value for the metric

required
tags list[str]

a list of tags to associate with this metric

None
hostname str

a hostname to associate with this metric. Defaults to the current host.

None
device_name str

deprecated add a tag in the form device:<device_name> to the tags list instead.

None
raw bool

whether to ignore any defined namespace prefix

False
Source code in datadog_checks_base/datadog_checks/base/checks/base.py
def gauge(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
    # type: (str, float, Sequence[str], str, str, bool) -> None
    """Sample a gauge metric.

    Parameters:
        name (str):
            the name of the metric
        value (float):
            the value for the metric
        tags (list[str]):
            a list of tags to associate with this metric
        hostname (str):
            a hostname to associate with this metric. Defaults to the current host.
        device_name (str):
            **deprecated** add a tag in the form `device:<device_name>` to the `tags` list instead.
        raw (bool):
            whether to ignore any defined namespace prefix
    """
    self._submit_metric(
        aggregator.GAUGE, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
    )

count(name, value, tags=None, hostname=None, device_name=None, raw=False)

Sample a raw count metric.

Parameters:

Name Type Description Default
name str

the name of the metric

required
value float

the value for the metric

required
tags list[str]

a list of tags to associate with this metric

None
hostname str

a hostname to associate with this metric. Defaults to the current host.

None
device_name str

deprecated add a tag in the form device:<device_name> to the tags list instead.

None
raw bool

whether to ignore any defined namespace prefix

False
Source code in datadog_checks_base/datadog_checks/base/checks/base.py
def count(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
    # type: (str, float, Sequence[str], str, str, bool) -> None
    """Sample a raw count metric.

    Parameters:
        name (str):
            the name of the metric
        value (float):
            the value for the metric
        tags (list[str]):
            a list of tags to associate with this metric
        hostname (str):
            a hostname to associate with this metric. Defaults to the current host.
        device_name (str):
            **deprecated** add a tag in the form `device:<device_name>` to the `tags` list instead.
        raw (bool):
            whether to ignore any defined namespace prefix
    """
    self._submit_metric(
        aggregator.COUNT, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
    )

monotonic_count(name, value, tags=None, hostname=None, device_name=None, raw=False, flush_first_value=False)

Sample an increasing counter metric.

Parameters:

Name Type Description Default
name str

the name of the metric

required
value float

the value for the metric

required
tags list[str]

a list of tags to associate with this metric

None
hostname str

a hostname to associate with this metric. Defaults to the current host.

None
device_name str

deprecated add a tag in the form device:<device_name> to the tags list instead.

None
raw bool

whether to ignore any defined namespace prefix

False
flush_first_value bool

whether to sample the first value

False
Source code in datadog_checks_base/datadog_checks/base/checks/base.py
def monotonic_count(
    self, name, value, tags=None, hostname=None, device_name=None, raw=False, flush_first_value=False
):
    # type: (str, float, Sequence[str], str, str, bool, bool) -> None
    """Sample an increasing counter metric.

    Parameters:
        name (str):
            the name of the metric
        value (float):
            the value for the metric
        tags (list[str]):
            a list of tags to associate with this metric
        hostname (str):
            a hostname to associate with this metric. Defaults to the current host.
        device_name (str):
            **deprecated** add a tag in the form `device:<device_name>` to the `tags` list instead.
        raw (bool):
            whether to ignore any defined namespace prefix
        flush_first_value (bool):
            whether to sample the first value
    """
    self._submit_metric(
        aggregator.MONOTONIC_COUNT,
        name,
        value,
        tags=tags,
        hostname=hostname,
        device_name=device_name,
        raw=raw,
        flush_first_value=flush_first_value,
    )

rate(name, value, tags=None, hostname=None, device_name=None, raw=False)

Sample a point, with the rate calculated at the end of the check.

Parameters:

Name Type Description Default
name str

the name of the metric

required
value float

the value for the metric

required
tags list[str]

a list of tags to associate with this metric

None
hostname str

a hostname to associate with this metric. Defaults to the current host.

None
device_name str

deprecated add a tag in the form device:<device_name> to the tags list instead.

None
raw bool

whether to ignore any defined namespace prefix

False
Source code in datadog_checks_base/datadog_checks/base/checks/base.py
def rate(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
    # type: (str, float, Sequence[str], str, str, bool) -> None
    """Sample a point, with the rate calculated at the end of the check.

    Parameters:
        name (str):
            the name of the metric
        value (float):
            the value for the metric
        tags (list[str]):
            a list of tags to associate with this metric
        hostname (str):
            a hostname to associate with this metric. Defaults to the current host.
        device_name (str):
            **deprecated** add a tag in the form `device:<device_name>` to the `tags` list instead.
        raw (bool):
            whether to ignore any defined namespace prefix
    """
    self._submit_metric(
        aggregator.RATE, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
    )

histogram(name, value, tags=None, hostname=None, device_name=None, raw=False)

Sample a histogram metric.

Parameters:

Name Type Description Default
name str

the name of the metric

required
value float

the value for the metric

required
tags list[str]

a list of tags to associate with this metric

None
hostname str

a hostname to associate with this metric. Defaults to the current host.

None
device_name str

deprecated add a tag in the form device:<device_name> to the tags list instead.

None
raw bool

whether to ignore any defined namespace prefix

False
Source code in datadog_checks_base/datadog_checks/base/checks/base.py
def histogram(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
    # type: (str, float, Sequence[str], str, str, bool) -> None
    """Sample a histogram metric.

    Parameters:
        name (str):
            the name of the metric
        value (float):
            the value for the metric
        tags (list[str]):
            a list of tags to associate with this metric
        hostname (str):
            a hostname to associate with this metric. Defaults to the current host.
        device_name (str):
            **deprecated** add a tag in the form `device:<device_name>` to the `tags` list instead.
        raw (bool):
            whether to ignore any defined namespace prefix
    """
    self._submit_metric(
        aggregator.HISTOGRAM, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
    )

historate(name, value, tags=None, hostname=None, device_name=None, raw=False)

Sample a histogram based on rate metrics.

Parameters:

Name Type Description Default
name str

the name of the metric

required
value float

the value for the metric

required
tags list[str]

a list of tags to associate with this metric

None
hostname str

a hostname to associate with this metric. Defaults to the current host.

None
device_name str

deprecated add a tag in the form device:<device_name> to the tags list instead.

None
raw bool

whether to ignore any defined namespace prefix

False
Source code in datadog_checks_base/datadog_checks/base/checks/base.py
def historate(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
    # type: (str, float, Sequence[str], str, str, bool) -> None
    """Sample a histogram based on rate metrics.

    Parameters:
        name (str):
            the name of the metric
        value (float):
            the value for the metric
        tags (list[str]):
            a list of tags to associate with this metric
        hostname (str):
            a hostname to associate with this metric. Defaults to the current host.
        device_name (str):
            **deprecated** add a tag in the form `device:<device_name>` to the `tags` list instead.
        raw (bool):
            whether to ignore any defined namespace prefix
    """
    self._submit_metric(
        aggregator.HISTORATE, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
    )

service_check(name, status, tags=None, hostname=None, message=None, raw=False)

Send the status of a service.

Parameters:

Name Type Description Default
name str

the name of the service check

required
status int

a constant describing the service status

required
tags list[str]

a list of tags to associate with this service check

None
message str

additional information or a description of why this status occurred.

None
raw bool

whether to ignore any defined namespace prefix

False
Source code in datadog_checks_base/datadog_checks/base/checks/base.py
def service_check(self, name, status, tags=None, hostname=None, message=None, raw=False):
    # type: (str, ServiceCheckStatus, Sequence[str], str, str, bool) -> None
    """Send the status of a service.

    Parameters:
        name (str):
            the name of the service check
        status (int):
            a constant describing the service status
        tags (list[str]):
            a list of tags to associate with this service check
        message (str):
            additional information or a description of why this status occurred.
        raw (bool):
            whether to ignore any defined namespace prefix
    """
    tags = self._normalize_tags_type(tags or [])
    if hostname is None:
        hostname = ''
    if message is None:
        message = ''
    else:
        message = to_native_string(message)

    message = self.sanitize(message)

    aggregator.submit_service_check(
        self, self.check_id, self._format_namespace(name, raw), status, tags, hostname, message
    )

event(event)

Send an event.

An event is a dictionary with the following keys and data types:

{
    "timestamp": int,        # the epoch timestamp for the event
    "event_type": str,       # the event name
    "api_key": str,          # the api key for your account
    "msg_title": str,        # the title of the event
    "msg_text": str,         # the text body of the event
    "aggregation_key": str,  # a key to use for aggregating events
    "alert_type": str,       # (optional) one of ('error', 'warning', 'success', 'info'), defaults to 'info'
    "source_type_name": str, # (optional) the source type name
    "host": str,             # (optional) the name of the host
    "tags": list,            # (optional) a list of tags to associate with this event
    "priority": str,         # (optional) specifies the priority of the event ("normal" or "low")
}

Parameters:

Name Type Description Default
event dict[str, Any]

the event to be sent

required
Source code in datadog_checks_base/datadog_checks/base/checks/base.py
def event(self, event):
    # type: (Event) -> None
    """Send an event.

    An event is a dictionary with the following keys and data types:

    ```python
    {
        "timestamp": int,        # the epoch timestamp for the event
        "event_type": str,       # the event name
        "api_key": str,          # the api key for your account
        "msg_title": str,        # the title of the event
        "msg_text": str,         # the text body of the event
        "aggregation_key": str,  # a key to use for aggregating events
        "alert_type": str,       # (optional) one of ('error', 'warning', 'success', 'info'), defaults to 'info'
        "source_type_name": str, # (optional) the source type name
        "host": str,             # (optional) the name of the host
        "tags": list,            # (optional) a list of tags to associate with this event
        "priority": str,         # (optional) specifies the priority of the event ("normal" or "low")
    }
    ```

    Parameters:
        event (dict[str, Any]):
            the event to be sent
    """
    # Enforce types of some fields, considerably facilitates handling in go bindings downstream
    for key, value in event.items():
        if not isinstance(value, (str, bytes)):
            continue

        try:
            event[key] = to_native_string(value)  # type: ignore
            # ^ Mypy complains about dynamic key assignment -- arguably for good reason.
            # Ideally we should convert this to a dict literal so that submitted events only include known keys.
        except UnicodeError:
            self.log.warning('Encoding error with field `%s`, cannot submit event', key)
            return

    if event.get('tags'):
        event['tags'] = self._normalize_tags_type(event['tags'])
    if event.get('timestamp'):
        event['timestamp'] = int(event['timestamp'])
    if event.get('aggregation_key'):
        event['aggregation_key'] = to_native_string(event['aggregation_key'])

    if self.__NAMESPACE__:
        event.setdefault('source_type_name', self.__NAMESPACE__)

    aggregator.submit_event(self, self.check_id, event)

set_metadata(name, value, **options)

Updates the cached metadata name with value, which is then sent by the Agent at regular intervals.

Parameters:

Name Type Description Default
name str

the name of the metadata

required
value Any

the value for the metadata. if name has no transformer defined then the raw value will be submitted and therefore it must be a str

required
options Any

keyword arguments to pass to any defined transformer

{}
Source code in datadog_checks_base/datadog_checks/base/checks/base.py
def set_metadata(self, name, value, **options):
    # type: (str, Any, **Any) -> None
    """Updates the cached metadata `name` with `value`, which is then sent by the Agent at regular intervals.

    Parameters:
        name (str):
            the name of the metadata
        value (Any):
            the value for the metadata. if ``name`` has no transformer defined then the
            raw ``value`` will be submitted and therefore it must be a ``str``
        options (Any):
            keyword arguments to pass to any defined transformer
    """
    self.metadata_manager.submit(name, value, options)

metadata_entrypoint(method) classmethod

Skip execution of the decorated method if metadata collection is disabled on the Agent.

Usage:

class MyCheck(AgentCheck):
    @AgentCheck.metadata_entrypoint
    def collect_metadata(self):
        ...
Source code in datadog_checks_base/datadog_checks/base/checks/base.py
@classmethod
def metadata_entrypoint(cls, method):
    # type: (Callable[..., None]) -> Callable[..., None]
    """
    Skip execution of the decorated method if metadata collection is disabled on the Agent.

    Usage:

    ```python
    class MyCheck(AgentCheck):
        @AgentCheck.metadata_entrypoint
        def collect_metadata(self):
            ...
    ```
    """

    @functools.wraps(method)
    def entrypoint(self, *args, **kwargs):
        # type: (AgentCheck, *Any, **Any) -> None
        if not self.is_metadata_collection_enabled():
            return

        # NOTE: error handling still at the discretion of the wrapped method.
        method(self, *args, **kwargs)

    return entrypoint

read_persistent_cache(key)

Returns the value previously stored with write_persistent_cache for the same key.

Parameters:

Name Type Description Default
key str

the key to retrieve

required
Source code in datadog_checks_base/datadog_checks/base/checks/base.py
def read_persistent_cache(self, key):
    # type: (str) -> str
    """Returns the value previously stored with `write_persistent_cache` for the same `key`.

    Parameters:
        key (str):
            the key to retrieve
    """
    return datadog_agent.read_persistent_cache(self._persistent_cache_id(key))

write_persistent_cache(key, value)

Stores value in a persistent cache for this check instance. The cache is located in a path where the agent is guaranteed to have read & write permissions. Namely in - %ProgramData%\Datadog\run on Windows. - /opt/datadog-agent/run everywhere else. The cache is persistent between agent restarts but will be rebuilt if the check instance configuration changes.

Parameters:

Name Type Description Default
key str

the key to retrieve

required
value str

the value to store

required
Source code in datadog_checks_base/datadog_checks/base/checks/base.py
def write_persistent_cache(self, key, value):
    # type: (str, str) -> None
    """Stores `value` in a persistent cache for this check instance.
    The cache is located in a path where the agent is guaranteed to have read & write permissions. Namely in
        - `%ProgramData%\\Datadog\\run` on Windows.
        - `/opt/datadog-agent/run` everywhere else.
    The cache is persistent between agent restarts but will be rebuilt if the check instance configuration changes.

    Parameters:
        key (str):
            the key to retrieve
        value (str):
            the value to store
    """
    datadog_agent.write_persistent_cache(self._persistent_cache_id(key), value)

send_log(data, cursor=None, stream='default')

Send a log for submission.

Parameters:

Name Type Description Default
data dict[str, str]

The log data to send. The following keys are treated specially, if present:

  • timestamp: should be an integer or float representing the number of seconds since the Unix epoch
  • ddtags: if not defined, it will automatically be set based on the instance's tags option
required
cursor dict[str, Any] or None

Metadata associated with the log which will be saved to disk. The most recent value may be retrieved with the get_log_cursor method.

None
stream str

The stream associated with this log, used for accurate cursor persistence. Has no effect if cursor argument is None.

'default'
Source code in datadog_checks_base/datadog_checks/base/checks/base.py
def send_log(self, data, cursor=None, stream='default'):
    # type: (dict[str, str], dict[str, Any] | None, str) -> None
    """Send a log for submission.

    Parameters:
        data (dict[str, str]):
            The log data to send. The following keys are treated specially, if present:

            - timestamp: should be an integer or float representing the number of seconds since the Unix epoch
            - ddtags: if not defined, it will automatically be set based on the instance's `tags` option
        cursor (dict[str, Any] or None):
            Metadata associated with the log which will be saved to disk. The most recent value may be
            retrieved with the `get_log_cursor` method.
        stream (str):
            The stream associated with this log, used for accurate cursor persistence.
            Has no effect if `cursor` argument is `None`.
    """
    attributes = data.copy()
    if 'ddtags' not in attributes and self.formatted_tags:
        attributes['ddtags'] = self.formatted_tags

    timestamp = attributes.get('timestamp')
    if timestamp is not None:
        # convert seconds to milliseconds
        attributes['timestamp'] = int(timestamp * 1000)

    datadog_agent.send_log(to_json(attributes), self.check_id)
    if cursor is not None:
        self.write_persistent_cache('log_cursor_{}'.format(stream), to_json(cursor))

get_log_cursor(stream='default')

Returns the most recent log cursor from disk.

Source code in datadog_checks_base/datadog_checks/base/checks/base.py
def get_log_cursor(self, stream='default'):
    # type: (str) -> dict[str, Any] | None
    """Returns the most recent log cursor from disk."""
    data = self.read_persistent_cache('log_cursor_{}'.format(stream))
    return from_json(data) if data else None

warning(warning_message, *args, **kwargs)

Log a warning message, display it in the Agent's status page and in-app.

Using *args is intended to make warning work like log.warn/debug/info/etc and make it compliant with flake8 logging format linter.

Parameters:

Name Type Description Default
warning_message str

the warning message

required
args Any

format string args used to format the warning message e.g. warning_message % args

()
kwargs Any

not used for now, but added to match Python logger's warning method signature

{}
Source code in datadog_checks_base/datadog_checks/base/checks/base.py
def warning(self, warning_message, *args, **kwargs):
    # type: (str, *Any, **Any) -> None
    """Log a warning message, display it in the Agent's status page and in-app.

    Using *args is intended to make warning work like log.warn/debug/info/etc
    and make it compliant with flake8 logging format linter.

    Parameters:
        warning_message (str):
            the warning message
        args (Any):
            format string args used to format the warning message e.g. `warning_message % args`
        kwargs (Any):
            not used for now, but added to match Python logger's `warning` method signature
    """
    warning_message = to_native_string(warning_message)
    # Interpolate message only if args is not empty. Same behavior as python logger:
    # https://github.com/python/cpython/blob/1dbe5373851acb85ba91f0be7b83c69563acd68d/Lib/logging/__init__.py#L368-L369
    if args:
        warning_message = warning_message % args
    frame = inspect.currentframe().f_back  # type: ignore
    lineno = frame.f_lineno
    # only log the last part of the filename, not the full path
    filename = basename(frame.f_code.co_filename)

    self.log.warning(warning_message, extra={'_lineno': lineno, '_filename': filename, '_check_id': self.check_id})
    self.warnings.append(warning_message)

Stubs

datadog_checks.base.stubs.aggregator.AggregatorStub

This implements the methods defined by the Agent's C bindings which in turn call the Go backend.

It also provides utility methods for test assertions.

Source code in datadog_checks_base/datadog_checks/base/stubs/aggregator.py
class AggregatorStub(object):
    """
    This implements the methods defined by the Agent's
    [C bindings](https://github.com/DataDog/datadog-agent/blob/master/rtloader/common/builtins/aggregator.c)
    which in turn call the
    [Go backend](https://github.com/DataDog/datadog-agent/blob/master/pkg/collector/python/aggregator.go).

    It also provides utility methods for test assertions.
    """

    # Replicate the Enum we have on the Agent
    METRIC_ENUM_MAP = OrderedDict(
        (
            ('gauge', 0),
            ('rate', 1),
            ('count', 2),
            ('monotonic_count', 3),
            ('counter', 4),
            ('histogram', 5),
            ('historate', 6),
        )
    )
    METRIC_ENUM_MAP_REV = {v: k for k, v in METRIC_ENUM_MAP.items()}
    GAUGE, RATE, COUNT, MONOTONIC_COUNT, COUNTER, HISTOGRAM, HISTORATE = list(METRIC_ENUM_MAP.values())
    AGGREGATE_TYPES = {COUNT, COUNTER}
    IGNORED_METRICS = {'datadog.agent.profile.memory.check_run_alloc'}
    METRIC_TYPE_SUBMISSION_TO_BACKEND_MAP = {
        'gauge': 'gauge',
        'rate': 'gauge',
        'count': 'count',
        'monotonic_count': 'count',
        'counter': 'rate',
        'histogram': 'rate',  # Checking .count only, the other are gauges
        'historate': 'rate',  # Checking .count only, the other are gauges
    }

    def __init__(self):
        self.reset()

    @classmethod
    def is_aggregate(cls, mtype):
        return mtype in cls.AGGREGATE_TYPES

    @classmethod
    def ignore_metric(cls, name):
        return name in cls.IGNORED_METRICS

    def submit_metric(self, check, check_id, mtype, name, value, tags, hostname, flush_first_value):
        check_tag_names(name, tags)
        if not self.ignore_metric(name):
            self._metrics[name].append(MetricStub(name, mtype, value, tags, hostname, None, flush_first_value))

    def submit_metric_e2e(
        self, check, check_id, mtype, name, value, tags, hostname, device=None, flush_first_value=False
    ):
        check_tag_names(name, tags)
        # Device is only present in metrics read from the real agent in e2e tests. Normally it is submitted as a tag
        if not self.ignore_metric(name):
            self._metrics[name].append(MetricStub(name, mtype, value, tags, hostname, device, flush_first_value))

    def submit_service_check(self, check, check_id, name, status, tags, hostname, message):
        if status == ServiceCheck.OK and message:
            raise Exception("Expected empty message on OK service check")

        check_tag_names(name, tags)
        self._service_checks[name].append(ServiceCheckStub(check_id, name, status, tags, hostname, message))

    def submit_event(self, check, check_id, event):
        self._events.append(event)

    def submit_event_platform_event(self, check, check_id, raw_event, event_type):
        self._event_platform_events[event_type].append(raw_event)

    def submit_histogram_bucket(
        self,
        check,
        check_id,
        name,
        value,
        lower_bound,
        upper_bound,
        monotonic,
        hostname,
        tags,
        flush_first_value=False,
    ):
        check_tag_names(name, tags)
        self._histogram_buckets[name].append(
            HistogramBucketStub(name, value, lower_bound, upper_bound, monotonic, hostname, tags, flush_first_value)
        )

    def metrics(self, name):
        """
        Return the metrics received under the given name
        """
        return [
            MetricStub(
                ensure_unicode(stub.name),
                stub.type,
                stub.value,
                normalize_tags(stub.tags),
                ensure_unicode(stub.hostname),
                stub.device,
                stub.flush_first_value,
            )
            for stub in self._metrics.get(to_native_string(name), [])
        ]

    def service_checks(self, name):
        """
        Return the service checks received under the given name
        """
        return [
            ServiceCheckStub(
                ensure_unicode(stub.check_id),
                ensure_unicode(stub.name),
                stub.status,
                normalize_tags(stub.tags),
                ensure_unicode(stub.hostname),
                ensure_unicode(stub.message),
            )
            for stub in self._service_checks.get(to_native_string(name), [])
        ]

    @property
    def events(self):
        """
        Return all events
        """
        return self._events

    def get_event_platform_events(self, event_type, parse_json=True):
        """
        Return all event platform events for the event_type
        """
        return [json.loads(e) if parse_json else e for e in self._event_platform_events[event_type]]

    def histogram_bucket(self, name):
        """
        Return the histogram buckets received under the given name
        """
        return [
            HistogramBucketStub(
                ensure_unicode(stub.name),
                stub.value,
                stub.lower_bound,
                stub.upper_bound,
                stub.monotonic,
                ensure_unicode(stub.hostname),
                normalize_tags(stub.tags),
                stub.flush_first_value,
            )
            for stub in self._histogram_buckets.get(to_native_string(name), [])
        ]

    def assert_metric_has_tags(self, metric_name, tags, count=None, at_least=1):
        for tag in tags:
            self.assert_metric_has_tag(metric_name, tag, count, at_least)

    def assert_metric_has_tag(self, metric_name, tag, count=None, at_least=1):
        """
        Assert a metric is tagged with tag
        """
        self._asserted.add(metric_name)

        candidates = []
        candidates_with_tag = []
        for metric in self.metrics(metric_name):
            candidates.append(metric)
            if tag in metric.tags:
                candidates_with_tag.append(metric)

        if candidates_with_tag:  # The metric was found with the tag but not enough times
            msg = "The metric '{}' with tag '{}' was only found {}/{} times".format(metric_name, tag, count, at_least)
        elif candidates:
            msg = (
                "The metric '{}' was found but not with the tag '{}'.\n".format(metric_name, tag)
                + "Similar submitted:\n"
                + "\n".join(["     {}".format(m) for m in candidates])
            )
        else:
            expected_stub = MetricStub(metric_name, type=None, value=None, tags=[tag], hostname=None, device=None)
            msg = "Metric '{}' not found".format(metric_name)
            msg = "{}\n{}".format(msg, build_similar_elements_msg(expected_stub, self._metrics))

        if count is not None:
            assert len(candidates_with_tag) == count, msg
        else:
            assert len(candidates_with_tag) >= at_least, msg

    # Potential kwargs: aggregation_key, alert_type, event_type,
    # msg_title, source_type_name
    def assert_event(self, msg_text, count=None, at_least=1, exact_match=True, tags=None, **kwargs):
        candidates = []
        for e in self.events:
            if exact_match and msg_text != e['msg_text'] or msg_text not in e['msg_text']:
                continue
            if tags and set(tags) != set(e['tags']):
                continue
            for name, value in kwargs.items():
                if e[name] != value:
                    break
            else:
                candidates.append(e)

        msg = "Candidates size assertion for `{}`, count: {}, at_least: {}) failed".format(msg_text, count, at_least)
        if count is not None:
            assert len(candidates) == count, msg
        else:
            assert len(candidates) >= at_least, msg

    def assert_histogram_bucket(
        self,
        name,
        value,
        lower_bound,
        upper_bound,
        monotonic,
        hostname,
        tags,
        count=None,
        at_least=1,
        flush_first_value=None,
    ):
        expected_tags = normalize_tags(tags, sort=True)

        candidates = []
        for bucket in self.histogram_bucket(name):
            if value is not None and value != bucket.value:
                continue

            if expected_tags and expected_tags != sorted(bucket.tags):
                continue

            if hostname and hostname != bucket.hostname:
                continue

            if monotonic != bucket.monotonic:
                continue

            if flush_first_value is not None and flush_first_value != bucket.flush_first_value:
                continue

            candidates.append(bucket)

        expected_bucket = HistogramBucketStub(
            name, value, lower_bound, upper_bound, monotonic, hostname, tags, flush_first_value
        )

        if count is not None:
            msg = "Needed exactly {} candidates for '{}', got {}".format(count, name, len(candidates))
            condition = len(candidates) == count
        else:
            msg = "Needed at least {} candidates for '{}', got {}".format(at_least, name, len(candidates))
            condition = len(candidates) >= at_least
        self._assert(
            condition=condition, msg=msg, expected_stub=expected_bucket, submitted_elements=self._histogram_buckets
        )

    def assert_metric(
        self,
        name,
        value=None,
        tags=None,
        count=None,
        at_least=1,
        hostname=None,
        metric_type=None,
        device=None,
        flush_first_value=None,
    ):
        """
        Assert a metric was processed by this stub
        """

        self._asserted.add(name)
        expected_tags = normalize_tags(tags, sort=True)

        candidates = []
        for metric in self.metrics(name):
            if value is not None and not self.is_aggregate(metric.type) and value != metric.value:
                continue

            if expected_tags and expected_tags != sorted(metric.tags):
                continue

            if hostname is not None and hostname != metric.hostname:
                continue

            if metric_type is not None and metric_type != metric.type:
                continue

            if device is not None and device != metric.device:
                continue

            if flush_first_value is not None and flush_first_value != metric.flush_first_value:
                continue

            candidates.append(metric)

        expected_metric = MetricStub(name, metric_type, value, expected_tags, hostname, device, flush_first_value)

        if value is not None and candidates and all(self.is_aggregate(m.type) for m in candidates):
            got = sum(m.value for m in candidates)
            msg = "Expected count value for '{}': {}, got {}".format(name, value, got)
            condition = value == got
        elif count is not None:
            msg = "Needed exactly {} candidates for '{}', got {}".format(count, name, len(candidates))
            condition = len(candidates) == count
        else:
            msg = "Needed at least {} candidates for '{}', got {}".format(at_least, name, len(candidates))
            condition = len(candidates) >= at_least
        self._assert(condition, msg=msg, expected_stub=expected_metric, submitted_elements=self._metrics)

    def assert_service_check(self, name, status=None, tags=None, count=None, at_least=1, hostname=None, message=None):
        """
        Assert a service check was processed by this stub
        """
        tags = normalize_tags(tags, sort=True)
        candidates = []
        for sc in self.service_checks(name):
            if status is not None and status != sc.status:
                continue

            if tags and tags != sorted(sc.tags):
                continue

            if hostname is not None and hostname != sc.hostname:
                continue

            if message is not None and message != sc.message:
                continue

            candidates.append(sc)

        expected_service_check = ServiceCheckStub(
            None, name=name, status=status, tags=tags, hostname=hostname, message=message
        )

        if count is not None:
            msg = "Needed exactly {} candidates for '{}', got {}".format(count, name, len(candidates))
            condition = len(candidates) == count
        else:
            msg = "Needed at least {} candidates for '{}', got {}".format(at_least, name, len(candidates))
            condition = len(candidates) >= at_least
        self._assert(
            condition=condition, msg=msg, expected_stub=expected_service_check, submitted_elements=self._service_checks
        )

    @staticmethod
    def _assert(condition, msg, expected_stub, submitted_elements):
        new_msg = msg
        if not condition:  # It's costly to build the message with similar metrics, so it's built only on failure.
            new_msg = "{}\n{}".format(msg, build_similar_elements_msg(expected_stub, submitted_elements))
        assert condition, new_msg

    def assert_all_metrics_covered(self):
        # use `condition` to avoid building the `msg` if not needed
        condition = self.metrics_asserted_pct >= 100.0
        msg = ''
        if not condition:
            prefix = '\n\t- '
            msg = 'Some metrics are collected but not asserted:'
            msg += '\nAsserted Metrics:{}{}'.format(prefix, prefix.join(sorted(self._asserted)))
            msg += '\nFound metrics that are not asserted:{}{}'.format(prefix, prefix.join(sorted(self.not_asserted())))
        assert condition, msg

    def assert_metrics_using_metadata(
        self, metadata_metrics, check_metric_type=True, check_submission_type=False, exclude=None
    ):
        """
        Assert metrics using metadata.csv

        Checking type: By default we are asserting the in-app metric type (`check_submission_type=False`),
        asserting this type make sense for e2e (metrics collected from agent).
        For integrations tests, we can check the submission type with `check_submission_type=True`, or
        use `check_metric_type=False` not to check types.

        Usage:

            from datadog_checks.dev.utils import get_metadata_metrics
            aggregator.assert_metrics_using_metadata(get_metadata_metrics())

        """

        exclude = exclude or []
        errors = set()
        for metric_name, metric_stubs in self._metrics.items():
            if metric_name in exclude:
                continue
            for metric_stub in metric_stubs:
                metric_stub_name = backend_normalize_metric_name(metric_stub.name)
                actual_metric_type = AggregatorStub.METRIC_ENUM_MAP_REV[metric_stub.type]

                # We only check `*.count` metrics for histogram and historate submissions
                # Note: all Openmetrics histogram and summary metrics are actually separately submitted
                if check_submission_type and actual_metric_type in ['histogram', 'historate']:
                    metric_stub_name += '.count'

                # Checking the metric is in `metadata.csv`
                if metric_stub_name not in metadata_metrics:
                    errors.add("Expect `{}` to be in metadata.csv.".format(metric_stub_name))
                    continue

                expected_metric_type = metadata_metrics[metric_stub_name]['metric_type']
                if check_submission_type:
                    # Integration tests type mapping
                    actual_metric_type = AggregatorStub.METRIC_TYPE_SUBMISSION_TO_BACKEND_MAP[actual_metric_type]
                else:
                    # E2E tests
                    if actual_metric_type == 'monotonic_count' and expected_metric_type == 'count':
                        actual_metric_type = 'count'

                if check_metric_type:
                    if expected_metric_type != actual_metric_type:
                        errors.add(
                            "Expect `{}` to have type `{}` but got `{}`.".format(
                                metric_stub_name, expected_metric_type, actual_metric_type
                            )
                        )

        assert not errors, "Metadata assertion errors using metadata.csv:" + "\n\t- ".join([''] + sorted(errors))

    def assert_service_checks(self, service_checks):
        """
        Assert service checks using service_checks.json

        Usage:

            from datadog_checks.dev.utils import get_service_checks
            aggregator.assert_service_checks(get_service_checks())

        """

        errors = set()

        for service_check_name, service_check_stubs in self._service_checks.items():
            for service_check_stub in service_check_stubs:
                # Checking the metric is in `service_checks.json`
                if service_check_name not in [sc['check'] for sc in service_checks]:
                    errors.add("Expect `{}` to be in service_check.json.".format(service_check_name))
                    continue

                status_string = {value: key for key, value in ServiceCheck._asdict().items()}[
                    service_check_stub.status
                ].lower()
                service_check = [c for c in service_checks if c['check'] == service_check_name][0]

                if status_string not in service_check['statuses']:
                    errors.add(
                        "Expect `{}` value to be in service_check.json for service check {}.".format(
                            status_string, service_check_stub.name
                        )
                    )

        assert not errors, "Service checks assertion errors using service_checks.json:" + "\n\t- ".join(
            [''] + sorted(errors)
        )

    def assert_no_duplicate_all(self):
        """
        Assert no duplicate metrics and service checks have been submitted.
        """
        self.assert_no_duplicate_metrics()
        self.assert_no_duplicate_service_checks()

    def assert_no_duplicate_metrics(self):
        """
        Assert no duplicate metrics have been submitted.

        Metrics are considered duplicate when all following fields match:

        - metric name
        - type (gauge, rate, etc)
        - tags
        - hostname
        """
        # metric types that intended to be called multiple times are ignored
        ignored_types = [self.COUNT, self.COUNTER]
        metric_stubs = [m for metrics in self._metrics.values() for m in metrics if m.type not in ignored_types]

        def stub_to_key_fn(stub):
            return stub.name, stub.type, str(sorted(stub.tags)), stub.hostname

        self._assert_no_duplicate_stub('metric', metric_stubs, stub_to_key_fn)

    def assert_no_duplicate_service_checks(self):
        """
        Assert no duplicate service checks have been submitted.

        Service checks are considered duplicate when all following fields match:
            - metric name
            - status
            - tags
            - hostname
        """
        service_check_stubs = [m for metrics in self._service_checks.values() for m in metrics]

        def stub_to_key_fn(stub):
            return stub.name, stub.status, str(sorted(stub.tags)), stub.hostname

        self._assert_no_duplicate_stub('service_check', service_check_stubs, stub_to_key_fn)

    @staticmethod
    def _assert_no_duplicate_stub(stub_type, all_metrics, stub_to_key_fn):
        all_contexts = defaultdict(list)
        for metric in all_metrics:
            context = stub_to_key_fn(metric)
            all_contexts[context].append(metric)

        dup_contexts = defaultdict(list)
        for context, metrics in all_contexts.items():
            if len(metrics) > 1:
                dup_contexts[context] = metrics

        err_msg_lines = ["Duplicate {}s found:".format(stub_type)]
        for key in sorted(dup_contexts):
            contexts = dup_contexts[key]
            err_msg_lines.append('- {}'.format(contexts[0].name))
            for metric in contexts:
                err_msg_lines.append('    ' + str(metric))

        assert len(dup_contexts) == 0, "\n".join(err_msg_lines)

    def reset(self):
        """
        Set the stub to its initial state
        """
        self._metrics = defaultdict(list)
        self._asserted = set()
        self._service_checks = defaultdict(list)
        self._events = []
        # dict[event_type, [events]]
        self._event_platform_events = defaultdict(list)
        self._histogram_buckets = defaultdict(list)

    def all_metrics_asserted(self):
        assert self.metrics_asserted_pct >= 100.0

    def not_asserted(self):
        present_metrics = {ensure_unicode(m) for m in self._metrics}
        return present_metrics - set(self._asserted)

    def assert_metric_has_tag_prefix(self, metric_name, tag_prefix, count=None, at_least=1):
        candidates = []
        self._asserted.add(metric_name)

        for metric in self.metrics(metric_name):
            tags = metric.tags
            gtags = [t for t in tags if t.startswith(tag_prefix)]
            if len(gtags) > 0:
                candidates.append(metric)

        msg = "Candidates size assertion for `{}`, count: {}, at_least: {}) failed".format(metric_name, count, at_least)
        if count is not None:
            assert len(candidates) == count, msg
        else:
            assert len(candidates) >= at_least, msg

    @property
    def metrics_asserted_pct(self):
        """
        Return the metrics assertion coverage
        """
        num_metrics = len(self._metrics)
        num_asserted = len(self._asserted)

        if num_metrics == 0:
            if num_asserted == 0:
                return 100
            else:
                return 0

        # If it there have been assertions with at_least=0 the length of the num_metrics and num_asserted can match
        # even if there are different metrics in each set
        not_asserted = self.not_asserted()
        return (num_metrics - len(not_asserted)) / num_metrics * 100

    @property
    def metric_names(self):
        """
        Return all the metric names we've seen so far
        """
        return [ensure_unicode(name) for name in self._metrics.keys()]

    @property
    def service_check_names(self):
        """
        Return all the service checks names seen so far
        """
        return [ensure_unicode(name) for name in self._service_checks.keys()]

assert_metric(name, value=None, tags=None, count=None, at_least=1, hostname=None, metric_type=None, device=None, flush_first_value=None)

Assert a metric was processed by this stub

Source code in datadog_checks_base/datadog_checks/base/stubs/aggregator.py
def assert_metric(
    self,
    name,
    value=None,
    tags=None,
    count=None,
    at_least=1,
    hostname=None,
    metric_type=None,
    device=None,
    flush_first_value=None,
):
    """
    Assert a metric was processed by this stub
    """

    self._asserted.add(name)
    expected_tags = normalize_tags(tags, sort=True)

    candidates = []
    for metric in self.metrics(name):
        if value is not None and not self.is_aggregate(metric.type) and value != metric.value:
            continue

        if expected_tags and expected_tags != sorted(metric.tags):
            continue

        if hostname is not None and hostname != metric.hostname:
            continue

        if metric_type is not None and metric_type != metric.type:
            continue

        if device is not None and device != metric.device:
            continue

        if flush_first_value is not None and flush_first_value != metric.flush_first_value:
            continue

        candidates.append(metric)

    expected_metric = MetricStub(name, metric_type, value, expected_tags, hostname, device, flush_first_value)

    if value is not None and candidates and all(self.is_aggregate(m.type) for m in candidates):
        got = sum(m.value for m in candidates)
        msg = "Expected count value for '{}': {}, got {}".format(name, value, got)
        condition = value == got
    elif count is not None:
        msg = "Needed exactly {} candidates for '{}', got {}".format(count, name, len(candidates))
        condition = len(candidates) == count
    else:
        msg = "Needed at least {} candidates for '{}', got {}".format(at_least, name, len(candidates))
        condition = len(candidates) >= at_least
    self._assert(condition, msg=msg, expected_stub=expected_metric, submitted_elements=self._metrics)

assert_metric_has_tag(metric_name, tag, count=None, at_least=1)

Assert a metric is tagged with tag

Source code in datadog_checks_base/datadog_checks/base/stubs/aggregator.py
def assert_metric_has_tag(self, metric_name, tag, count=None, at_least=1):
    """
    Assert a metric is tagged with tag
    """
    self._asserted.add(metric_name)

    candidates = []
    candidates_with_tag = []
    for metric in self.metrics(metric_name):
        candidates.append(metric)
        if tag in metric.tags:
            candidates_with_tag.append(metric)

    if candidates_with_tag:  # The metric was found with the tag but not enough times
        msg = "The metric '{}' with tag '{}' was only found {}/{} times".format(metric_name, tag, count, at_least)
    elif candidates:
        msg = (
            "The metric '{}' was found but not with the tag '{}'.\n".format(metric_name, tag)
            + "Similar submitted:\n"
            + "\n".join(["     {}".format(m) for m in candidates])
        )
    else:
        expected_stub = MetricStub(metric_name, type=None, value=None, tags=[tag], hostname=None, device=None)
        msg = "Metric '{}' not found".format(metric_name)
        msg = "{}\n{}".format(msg, build_similar_elements_msg(expected_stub, self._metrics))

    if count is not None:
        assert len(candidates_with_tag) == count, msg
    else:
        assert len(candidates_with_tag) >= at_least, msg

assert_metric_has_tag_prefix(metric_name, tag_prefix, count=None, at_least=1)

Source code in datadog_checks_base/datadog_checks/base/stubs/aggregator.py
def assert_metric_has_tag_prefix(self, metric_name, tag_prefix, count=None, at_least=1):
    candidates = []
    self._asserted.add(metric_name)

    for metric in self.metrics(metric_name):
        tags = metric.tags
        gtags = [t for t in tags if t.startswith(tag_prefix)]
        if len(gtags) > 0:
            candidates.append(metric)

    msg = "Candidates size assertion for `{}`, count: {}, at_least: {}) failed".format(metric_name, count, at_least)
    if count is not None:
        assert len(candidates) == count, msg
    else:
        assert len(candidates) >= at_least, msg

assert_service_check(name, status=None, tags=None, count=None, at_least=1, hostname=None, message=None)

Assert a service check was processed by this stub

Source code in datadog_checks_base/datadog_checks/base/stubs/aggregator.py
def assert_service_check(self, name, status=None, tags=None, count=None, at_least=1, hostname=None, message=None):
    """
    Assert a service check was processed by this stub
    """
    tags = normalize_tags(tags, sort=True)
    candidates = []
    for sc in self.service_checks(name):
        if status is not None and status != sc.status:
            continue

        if tags and tags != sorted(sc.tags):
            continue

        if hostname is not None and hostname != sc.hostname:
            continue

        if message is not None and message != sc.message:
            continue

        candidates.append(sc)

    expected_service_check = ServiceCheckStub(
        None, name=name, status=status, tags=tags, hostname=hostname, message=message
    )

    if count is not None:
        msg = "Needed exactly {} candidates for '{}', got {}".format(count, name, len(candidates))
        condition = len(candidates) == count
    else:
        msg = "Needed at least {} candidates for '{}', got {}".format(at_least, name, len(candidates))
        condition = len(candidates) >= at_least
    self._assert(
        condition=condition, msg=msg, expected_stub=expected_service_check, submitted_elements=self._service_checks
    )

assert_event(msg_text, count=None, at_least=1, exact_match=True, tags=None, **kwargs)

Source code in datadog_checks_base/datadog_checks/base/stubs/aggregator.py
def assert_event(self, msg_text, count=None, at_least=1, exact_match=True, tags=None, **kwargs):
    candidates = []
    for e in self.events:
        if exact_match and msg_text != e['msg_text'] or msg_text not in e['msg_text']:
            continue
        if tags and set(tags) != set(e['tags']):
            continue
        for name, value in kwargs.items():
            if e[name] != value:
                break
        else:
            candidates.append(e)

    msg = "Candidates size assertion for `{}`, count: {}, at_least: {}) failed".format(msg_text, count, at_least)
    if count is not None:
        assert len(candidates) == count, msg
    else:
        assert len(candidates) >= at_least, msg

assert_histogram_bucket(name, value, lower_bound, upper_bound, monotonic, hostname, tags, count=None, at_least=1, flush_first_value=None)

Source code in datadog_checks_base/datadog_checks/base/stubs/aggregator.py
def assert_histogram_bucket(
    self,
    name,
    value,
    lower_bound,
    upper_bound,
    monotonic,
    hostname,
    tags,
    count=None,
    at_least=1,
    flush_first_value=None,
):
    expected_tags = normalize_tags(tags, sort=True)

    candidates = []
    for bucket in self.histogram_bucket(name):
        if value is not None and value != bucket.value:
            continue

        if expected_tags and expected_tags != sorted(bucket.tags):
            continue

        if hostname and hostname != bucket.hostname:
            continue

        if monotonic != bucket.monotonic:
            continue

        if flush_first_value is not None and flush_first_value != bucket.flush_first_value:
            continue

        candidates.append(bucket)

    expected_bucket = HistogramBucketStub(
        name, value, lower_bound, upper_bound, monotonic, hostname, tags, flush_first_value
    )

    if count is not None:
        msg = "Needed exactly {} candidates for '{}', got {}".format(count, name, len(candidates))
        condition = len(candidates) == count
    else:
        msg = "Needed at least {} candidates for '{}', got {}".format(at_least, name, len(candidates))
        condition = len(candidates) >= at_least
    self._assert(
        condition=condition, msg=msg, expected_stub=expected_bucket, submitted_elements=self._histogram_buckets
    )

assert_metrics_using_metadata(metadata_metrics, check_metric_type=True, check_submission_type=False, exclude=None)

Assert metrics using metadata.csv

Checking type: By default we are asserting the in-app metric type (check_submission_type=False), asserting this type make sense for e2e (metrics collected from agent). For integrations tests, we can check the submission type with check_submission_type=True, or use check_metric_type=False not to check types.

Usage:

from datadog_checks.dev.utils import get_metadata_metrics
aggregator.assert_metrics_using_metadata(get_metadata_metrics())
Source code in datadog_checks_base/datadog_checks/base/stubs/aggregator.py
def assert_metrics_using_metadata(
    self, metadata_metrics, check_metric_type=True, check_submission_type=False, exclude=None
):
    """
    Assert metrics using metadata.csv

    Checking type: By default we are asserting the in-app metric type (`check_submission_type=False`),
    asserting this type make sense for e2e (metrics collected from agent).
    For integrations tests, we can check the submission type with `check_submission_type=True`, or
    use `check_metric_type=False` not to check types.

    Usage:

        from datadog_checks.dev.utils import get_metadata_metrics
        aggregator.assert_metrics_using_metadata(get_metadata_metrics())

    """

    exclude = exclude or []
    errors = set()
    for metric_name, metric_stubs in self._metrics.items():
        if metric_name in exclude:
            continue
        for metric_stub in metric_stubs:
            metric_stub_name = backend_normalize_metric_name(metric_stub.name)
            actual_metric_type = AggregatorStub.METRIC_ENUM_MAP_REV[metric_stub.type]

            # We only check `*.count` metrics for histogram and historate submissions
            # Note: all Openmetrics histogram and summary metrics are actually separately submitted
            if check_submission_type and actual_metric_type in ['histogram', 'historate']:
                metric_stub_name += '.count'

            # Checking the metric is in `metadata.csv`
            if metric_stub_name not in metadata_metrics:
                errors.add("Expect `{}` to be in metadata.csv.".format(metric_stub_name))
                continue

            expected_metric_type = metadata_metrics[metric_stub_name]['metric_type']
            if check_submission_type:
                # Integration tests type mapping
                actual_metric_type = AggregatorStub.METRIC_TYPE_SUBMISSION_TO_BACKEND_MAP[actual_metric_type]
            else:
                # E2E tests
                if actual_metric_type == 'monotonic_count' and expected_metric_type == 'count':
                    actual_metric_type = 'count'

            if check_metric_type:
                if expected_metric_type != actual_metric_type:
                    errors.add(
                        "Expect `{}` to have type `{}` but got `{}`.".format(
                            metric_stub_name, expected_metric_type, actual_metric_type
                        )
                    )

    assert not errors, "Metadata assertion errors using metadata.csv:" + "\n\t- ".join([''] + sorted(errors))

assert_all_metrics_covered()

Source code in datadog_checks_base/datadog_checks/base/stubs/aggregator.py
def assert_all_metrics_covered(self):
    # use `condition` to avoid building the `msg` if not needed
    condition = self.metrics_asserted_pct >= 100.0
    msg = ''
    if not condition:
        prefix = '\n\t- '
        msg = 'Some metrics are collected but not asserted:'
        msg += '\nAsserted Metrics:{}{}'.format(prefix, prefix.join(sorted(self._asserted)))
        msg += '\nFound metrics that are not asserted:{}{}'.format(prefix, prefix.join(sorted(self.not_asserted())))
    assert condition, msg

assert_no_duplicate_metrics()

Assert no duplicate metrics have been submitted.

Metrics are considered duplicate when all following fields match:

  • metric name
  • type (gauge, rate, etc)
  • tags
  • hostname
Source code in datadog_checks_base/datadog_checks/base/stubs/aggregator.py
def assert_no_duplicate_metrics(self):
    """
    Assert no duplicate metrics have been submitted.

    Metrics are considered duplicate when all following fields match:

    - metric name
    - type (gauge, rate, etc)
    - tags
    - hostname
    """
    # metric types that intended to be called multiple times are ignored
    ignored_types = [self.COUNT, self.COUNTER]
    metric_stubs = [m for metrics in self._metrics.values() for m in metrics if m.type not in ignored_types]

    def stub_to_key_fn(stub):
        return stub.name, stub.type, str(sorted(stub.tags)), stub.hostname

    self._assert_no_duplicate_stub('metric', metric_stubs, stub_to_key_fn)

assert_no_duplicate_service_checks()

Assert no duplicate service checks have been submitted.

Service checks are considered duplicate when all following fields match
  • metric name
  • status
  • tags
  • hostname
Source code in datadog_checks_base/datadog_checks/base/stubs/aggregator.py
def assert_no_duplicate_service_checks(self):
    """
    Assert no duplicate service checks have been submitted.

    Service checks are considered duplicate when all following fields match:
        - metric name
        - status
        - tags
        - hostname
    """
    service_check_stubs = [m for metrics in self._service_checks.values() for m in metrics]

    def stub_to_key_fn(stub):
        return stub.name, stub.status, str(sorted(stub.tags)), stub.hostname

    self._assert_no_duplicate_stub('service_check', service_check_stubs, stub_to_key_fn)

assert_no_duplicate_all()

Assert no duplicate metrics and service checks have been submitted.

Source code in datadog_checks_base/datadog_checks/base/stubs/aggregator.py
def assert_no_duplicate_all(self):
    """
    Assert no duplicate metrics and service checks have been submitted.
    """
    self.assert_no_duplicate_metrics()
    self.assert_no_duplicate_service_checks()

all_metrics_asserted()

Source code in datadog_checks_base/datadog_checks/base/stubs/aggregator.py
def all_metrics_asserted(self):
    assert self.metrics_asserted_pct >= 100.0

reset()

Set the stub to its initial state

Source code in datadog_checks_base/datadog_checks/base/stubs/aggregator.py
def reset(self):
    """
    Set the stub to its initial state
    """
    self._metrics = defaultdict(list)
    self._asserted = set()
    self._service_checks = defaultdict(list)
    self._events = []
    # dict[event_type, [events]]
    self._event_platform_events = defaultdict(list)
    self._histogram_buckets = defaultdict(list)

datadog_checks.base.stubs.datadog_agent.DatadogAgentStub

This implements the methods defined by the Agent's C bindings which in turn call the Go backend.

It also provides utility methods for test assertions.

Source code in datadog_checks_base/datadog_checks/base/stubs/datadog_agent.py
class DatadogAgentStub(object):
    """
    This implements the methods defined by the Agent's
    [C bindings](https://github.com/DataDog/datadog-agent/blob/master/rtloader/common/builtins/datadog_agent.c)
    which in turn call the
    [Go backend](https://github.com/DataDog/datadog-agent/blob/master/pkg/collector/python/datadog_agent.go).

    It also provides utility methods for test assertions.
    """

    def __init__(self):
        self._sent_logs = defaultdict(list)
        self._metadata = {}
        self._cache = {}
        self._config = self.get_default_config()
        self._hostname = 'stubbed.hostname'
        self._process_start_time = 0
        self._external_tags = []
        self._host_tags = "{}"
        self._sent_telemetry = defaultdict(list)

    def get_default_config(self):
        return {'enable_metadata_collection': True, 'disable_unsafe_yaml': True}

    def reset(self):
        self._sent_logs.clear()
        self._metadata.clear()
        self._cache.clear()
        self._config = self.get_default_config()
        self._process_start_time = 0
        self._external_tags = []
        self._host_tags = "{}"

    def assert_logs(self, check_id, logs):
        sent_logs = self._sent_logs[check_id]
        assert sent_logs == logs, 'Expected {} logs for check {}, found {}. Submitted logs: {}'.format(
            len(logs), check_id, len(self._sent_logs[check_id]), repr(self._sent_logs)
        )

    def assert_metadata(self, check_id, data):
        actual = {}
        for name in data:
            key = (check_id, name)
            if key in self._metadata:
                actual[name] = self._metadata[key]
        assert data == actual

    def assert_metadata_count(self, count):
        metadata_items = len(self._metadata)
        assert metadata_items == count, 'Expected {} metadata items, found {}. Submitted metadata: {}'.format(
            count, metadata_items, repr(self._metadata)
        )

    def assert_external_tags(self, hostname, external_tags, match_tags_order=False):
        for h, tags in self._external_tags:
            if h == hostname:
                if not match_tags_order:
                    external_tags = {k: sorted(v) for (k, v) in external_tags.items()}
                    tags = {k: sorted(v) for (k, v) in tags.items()}

                assert (
                    external_tags == tags
                ), 'Expected {} external tags for hostname {}, found {}. Submitted external tags: {}'.format(
                    external_tags, hostname, tags, repr(self._external_tags)
                )
                return

        raise AssertionError('Hostname {} not found in external tags {}'.format(hostname, repr(self._external_tags)))

    def assert_external_tags_count(self, count):
        tags_count = len(self._external_tags)
        assert tags_count == count, 'Expected {} external tags items, found {}. Submitted external tags: {}'.format(
            count, tags_count, repr(self._external_tags)
        )

    def assert_telemetry(self, check_name, metric_name, metric_type, metric_value):
        values = self._sent_telemetry[(check_name, metric_name, metric_type)]
        assert metric_value in values, 'Expected value {} for check {}, metric {}, type {}. Found {}.'.format(
            metric_value, check_name, metric_name, metric_type, values
        )

    def get_hostname(self):
        return self._hostname

    def set_hostname(self, hostname):
        self._hostname = hostname

    def reset_hostname(self):
        self._hostname = 'stubbed.hostname'

    def get_host_tags(self):
        return self._host_tags

    def _set_host_tags(self, tags_dict):
        self._host_tags = json.dumps(tags_dict)

    def _reset_host_tags(self):
        self._host_tags = "{}"

    def get_config(self, config_option):
        return self._config.get(config_option, '')

    def get_version(self):
        return '0.0.0'

    def log(self, *args, **kwargs):
        pass

    def set_check_metadata(self, check_id, name, value):
        self._metadata[(check_id, name)] = value

    def send_log(self, log_line, check_id):
        self._sent_logs[check_id].append(from_json(log_line))

    def set_external_tags(self, external_tags):
        self._external_tags = external_tags

    def tracemalloc_enabled(self, *args, **kwargs):
        return False

    def write_persistent_cache(self, key, value):
        self._cache[key] = value

    def read_persistent_cache(self, key):
        return self._cache.get(key, '')

    def obfuscate_sql(self, query, options=None):
        # Full obfuscation implementation is in go code.
        if options:
            # Options provided is a JSON string because the Go stub requires it, whereas
            # the python stub does not for things such as testing.
            if from_json(options).get('return_json_metadata', False):
                return to_json({'query': re.sub(r'\s+', ' ', query or '').strip(), 'metadata': {}})
        return re.sub(r'\s+', ' ', query or '').strip()

    def obfuscate_sql_exec_plan(self, plan, normalize=False):
        # Passthrough stub: obfuscation implementation is in Go code.
        return plan

    def get_process_start_time(self):
        return self._process_start_time

    def set_process_start_time(self, time):
        self._process_start_time = time

    def obfuscate_mongodb_string(self, command):
        # Passthrough stub: obfuscation implementation is in Go code.
        return command

    def emit_agent_telemetry(self, check_name, metric_name, metric_value, metric_type):
        self._sent_telemetry[(check_name, metric_name, metric_type)].append(metric_value)

assert_metadata(check_id, data)

Source code in datadog_checks_base/datadog_checks/base/stubs/datadog_agent.py
def assert_metadata(self, check_id, data):
    actual = {}
    for name in data:
        key = (check_id, name)
        if key in self._metadata:
            actual[name] = self._metadata[key]
    assert data == actual

assert_metadata_count(count)

Source code in datadog_checks_base/datadog_checks/base/stubs/datadog_agent.py
def assert_metadata_count(self, count):
    metadata_items = len(self._metadata)
    assert metadata_items == count, 'Expected {} metadata items, found {}. Submitted metadata: {}'.format(
        count, metadata_items, repr(self._metadata)
    )

reset()

Source code in datadog_checks_base/datadog_checks/base/stubs/datadog_agent.py
def reset(self):
    self._sent_logs.clear()
    self._metadata.clear()
    self._cache.clear()
    self._config = self.get_default_config()
    self._process_start_time = 0
    self._external_tags = []
    self._host_tags = "{}"

Last update: September 4, 2024