Skip to content

API


datadog_checks.base.checks.base.AgentCheck

The base class for any Agent based integration.

In general, you neither need to nor should override anything from the base class except the check method. However, it is sometimes useful for a check to have its own constructor.

When overriding __init__, remember that, depending on the configuration, the Agent might create several different check instances, and __init__ will be called once for each of them.

Agent 6,7 signature:

AgentCheck(name, init_config, instances)    # instances contain only 1 instance
AgentCheck.check(instance)

Agent 8 signature:

AgentCheck(name, init_config, instance)     # one instance
AgentCheck.check()                          # no more instance argument for check method

Note

When loading a custom check, the Agent inspects the module, searching for a subclass of AgentCheck. If such a class exists but has itself been subclassed, it is ignored - you should never derive from an existing check.

__init__(self, *args, **kwargs) special

  • name (str) - the name of the check
  • init_config (dict) - the init_config section of the configuration.
  • instances (List[dict]) - a one-element list containing the instance options from the configuration file (a list is used to keep backward compatibility with older versions of the Agent).
Source code in
def __init__(self, *args, **kwargs):
    # type: (*Any, **Any) -> None
    """
    Initialize the check from positional and/or keyword arguments.

    - **name** (_str_) - the name of the check
    - **init_config** (_dict_) - the `init_config` section of the configuration.
    - **instances** (_List[dict]_) - a one-element list containing the instance options from the
            configuration file (a list is used to keep backward compatibility with
            older versions of the Agent).

    Positional arguments override keyword arguments. The old-style positional form
    `(name, init_config, agentConfig, instances)` is also accepted.
    """
    # NOTE: these variable assignments exist to ease type checking when eventually assigned as attributes.
    name = kwargs.get('name', '')
    init_config = kwargs.get('init_config', {})
    agentConfig = kwargs.get('agentConfig', {})
    instances = kwargs.get('instances', [])

    # Positional arguments, when present, take precedence over the keyword values read above.
    if len(args) > 0:
        name = args[0]
    if len(args) > 1:
        init_config = args[1]
    if len(args) > 2:
        # agent pass instances as tuple but in test we are usually using list, so we are testing for both
        # The 3rd positional argument is treated as `agentConfig` when a 4th positional argument exists,
        # when it is not a list/tuple, or when `instances` was also given as a keyword.
        if len(args) > 3 or not isinstance(args[2], (list, tuple)) or 'instances' in kwargs:
            # old-style init: the 3rd argument is `agentConfig`
            agentConfig = args[2]
            if len(args) > 3:
                instances = args[3]
        else:
            # new-style init: the 3rd argument is `instances`
            instances = args[2]

    # NOTE: Agent 6+ should pass exactly one instance... But we are not abiding by that rule on our side
    # everywhere just yet. It's complicated... See: https://github.com/DataDog/integrations-core/pull/5573
    # Only the first instance is exposed as `self.instance`; it is `None` when no instances were given.
    instance = instances[0] if instances else None

    self.check_id = ''
    self.name = name  # type: str
    self.init_config = init_config  # type: InitConfigType
    self.agentConfig = agentConfig  # type: AgentConfigType
    self.instance = instance  # type: InstanceType
    self.instances = instances  # type: List[InstanceType]
    self.warnings = []  # type: List[str]
    # Read from the instance options; falls back to `False` when there is no (or an empty) instance.
    self.disable_generic_tags = (
        is_affirmative(self.instance.get('disable_generic_tags', False)) if instance else False
    )

    # `self.hostname` is deprecated, use `datadog_agent.get_hostname()` instead
    self.hostname = datadog_agent.get_hostname()  # type: str

    # The logger is named after this module and the check name.
    logger = logging.getLogger('{}.{}'.format(__name__, self.name))
    self.log = CheckLoggingAdapter(logger, self)

    # TODO: Remove with Agent 5
    # Set proxy settings
    self.proxies = self._get_requests_proxy()
    # The Agent proxy is used unless `init_config` explicitly sets `use_agent_proxy` to a non-affirmative value.
    if not self.init_config:
        self._use_agent_proxy = True
    else:
        self._use_agent_proxy = is_affirmative(self.init_config.get('use_agent_proxy', True))

    # TODO: Remove with Agent 5
    # Taken from `agentConfig`, defaulting to 9 when the option is absent.
    self.default_integration_http_timeout = float(self.agentConfig.get('default_integration_http_timeout', 9))

    # Maps a deprecation key to a `(flag, message)` pair; every flag starts as `False`.
    # NOTE(review): the flag presumably records whether the notice has already been emitted --
    # confirm against the code that consumes `_deprecations`.
    self._deprecations = {
        'increment': (
            False,
            (
                'DEPRECATION NOTICE: `AgentCheck.increment`/`AgentCheck.decrement` are deprecated, please '
                'use `AgentCheck.gauge` or `AgentCheck.count` instead, with a different metric name'
            ),
        ),
        'device_name': (
            False,
            (
                'DEPRECATION NOTICE: `device_name` is deprecated, please use a `device:` '
                'tag in the `tags` list instead'
            ),
        ),
        'in_developer_mode': (
            False,
            'DEPRECATION NOTICE: `in_developer_mode` is deprecated, please stop using it.',
        ),
        'no_proxy': (
            False,
            (
                'DEPRECATION NOTICE: The `no_proxy` config option has been renamed '
                'to `skip_proxy` and will be removed in a future release.'
            ),
        ),
        'service_tag': (
            False,
            (
                'DEPRECATION NOTICE: The `service` tag is deprecated and has been renamed to `%s`. '
                'Set `disable_legacy_service_tag` to `true` to disable this warning. '
                'The default will become `true` and cannot be changed in Agent version 8.'
            ),
        ),
        '_config_renamed': (
            False,
            (
                'DEPRECATION NOTICE: The `%s` config option has been renamed '
                'to `%s` and will be removed in a future release.'
            ),
        ),
    }  # type: Dict[str, Tuple[bool, str]]

    # Setup metric limits
    self.metric_limiter = self._get_metric_limiter(self.name, instance=self.instance)

    # Lazily load and validate config
    self._config_model_instance = None  # type: Any
    self._config_model_shared = None  # type: Any

    # Functions that will be called exactly once (if successful) before the first check run
    self.check_initializations = deque([self.send_config_metadata])  # type: Deque[Callable[[], None]]

    # Loading of configuration models is only registered when not running on Python 2.
    if not PY2:
        self.check_initializations.append(self.load_configuration_models)

count(self, name, value, tags=None, hostname=None, device_name=None, raw=False)

Sample a raw count metric.

  • name (str) - the name of the metric
  • value (float) - the value for the metric
  • tags (List[str]) - a list of tags to associate with this metric
  • hostname (str) - a hostname to associate with this metric. Defaults to the current host.
  • device_name (str) - deprecated; add a tag in the form device:<device_name> to the tags list instead.
  • raw (bool) - whether to ignore any defined namespace prefix
Source code in
def count(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
    # type: (str, float, Sequence[str], str, str, bool) -> None
    """Sample a raw count metric.

    - **name** (_str_) - the name of the metric
    - **value** (_float_) - the value for the metric
    - **tags** (_List[str]_) - a list of tags to associate with this metric
    - **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
    - **device_name** (_str_) - **deprecated**; add a tag in the form `device:<device_name>` to the `tags`
        list instead.
    - **raw** (_bool_) - whether to ignore any defined namespace prefix
    """
    # Thin wrapper: every argument is forwarded to `_submit_metric` with the `aggregator.COUNT` metric type.
    self._submit_metric(
        aggregator.COUNT, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
    )

event(self, event)

Send an event.

An event is a dictionary with the following keys and data types:

{
    "timestamp": int,        # the epoch timestamp for the event
    "event_type": str,       # the event name
    "api_key": str,          # the api key for your account
    "msg_title": str,        # the title of the event
    "msg_text": str,         # the text body of the event
    "aggregation_key": str,  # a key to use for aggregating events
    "alert_type": str,       # (optional) one of ('error', 'warning', 'success', 'info'), defaults to 'info'
    "source_type_name": str, # (optional) the source type name
    "host": str,             # (optional) the name of the host
    "tags": list,            # (optional) a list of tags to associate with this event
    "priority": str,         # (optional) specifies the priority of the event ("normal" or "low")
}
  • event (dict) - the event to be sent
Source code in
def event(self, event):
    # type: (Event) -> None
    """Send an event.

    An event is a dictionary with the following keys and data types:

    ```python
    {
        "timestamp": int,        # the epoch timestamp for the event
        "event_type": str,       # the event name
        "api_key": str,          # the api key for your account
        "msg_title": str,        # the title of the event
        "msg_text": str,         # the text body of the event
        "aggregation_key": str,  # a key to use for aggregating events
        "alert_type": str,       # (optional) one of ('error', 'warning', 'success', 'info'), defaults to 'info'
        "source_type_name": str, # (optional) the source type name
        "host": str,             # (optional) the name of the host
        "tags": list,            # (optional) a list of tags to associate with this event
        "priority": str,         # (optional) specifies the priority of the event ("normal" or "low")
    }
    ```

    - **event** (_dict_) - the event to be sent

    The dictionary is normalized in place before being submitted. If a text or binary field
    cannot be converted, a warning is logged and the event is not submitted.
    """
    # Enforce types of some fields, considerably facilitates handling in go bindings downstream
    for field, raw_value in iteritems(event):
        if isinstance(raw_value, (text_type, binary_type)):
            try:
                event[field] = to_native_string(raw_value)  # type: ignore
                # ^ Mypy complains about dynamic key assignment -- arguably for good reason.
                # Ideally we should convert this to a dict literal so that submitted events only include known keys.
            except UnicodeError:
                self.log.warning('Encoding error with field `%s`, cannot submit event', field)
                return

    # Each of the following fields is only touched when it is present and truthy.
    tags = event.get('tags')
    if tags:
        event['tags'] = self._normalize_tags_type(tags)

    timestamp = event.get('timestamp')
    if timestamp:
        event['timestamp'] = int(timestamp)

    aggregation_key = event.get('aggregation_key')
    if aggregation_key:
        event['aggregation_key'] = to_native_string(aggregation_key)

    # The namespace only fills in `source_type_name` when the event does not already carry one.
    namespace = self.__NAMESPACE__
    if namespace:
        event.setdefault('source_type_name', namespace)

    aggregator.submit_event(self, self.check_id, event)

gauge(self, name, value, tags=None, hostname=None, device_name=None, raw=False)

Sample a gauge metric.

Parameters:

  • name (str) - the name of the metric
  • value (float) - the value for the metric
  • tags (List[str]) - a list of tags to associate with this metric
  • hostname (str) - a hostname to associate with this metric. Defaults to the current host.
  • device_name (str) - deprecated; add a tag in the form device:<device_name> to the tags list instead.
  • raw (bool) - whether to ignore any defined namespace prefix
Source code in
def gauge(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
    # type: (str, float, Sequence[str], str, str, bool) -> None
    """Sample a gauge metric.

    **Parameters:**

    - **name** (_str_) - the name of the metric
    - **value** (_float_) - the value for the metric
    - **tags** (_List[str]_) - a list of tags to associate with this metric
    - **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
    - **device_name** (_str_) - **deprecated**; add a tag in the form `device:<device_name>` to the `tags`
        list instead.
    - **raw** (_bool_) - whether to ignore any defined namespace prefix
    """
    # Thin wrapper: every argument is forwarded to `_submit_metric` with the `aggregator.GAUGE` metric type.
    self._submit_metric(
        aggregator.GAUGE, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
    )

histogram(self, name, value, tags=None, hostname=None, device_name=None, raw=False)

Sample a histogram metric.

  • name (str) - the name of the metric
  • value (float) - the value for the metric
  • tags (List[str]) - a list of tags to associate with this metric
  • hostname (str) - a hostname to associate with this metric. Defaults to the current host.
  • device_name (str) - deprecated; add a tag in the form device:<device_name> to the tags list instead.
  • raw (bool) - whether to ignore any defined namespace prefix
Source code in
def histogram(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
    # type: (str, float, Sequence[str], str, str, bool) -> None
    """Sample a histogram metric.

    - **name** (_str_) - the name of the metric
    - **value** (_float_) - the value for the metric
    - **tags** (_List[str]_) - a list of tags to associate with this metric
    - **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
    - **device_name** (_str_) - **deprecated**; add a tag in the form `device:<device_name>` to the `tags`
        list instead.
    - **raw** (_bool_) - whether to ignore any defined namespace prefix
    """
    # Thin wrapper: every argument is forwarded to `_submit_metric` with the `aggregator.HISTOGRAM` metric type.
    self._submit_metric(
        aggregator.HISTOGRAM, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
    )

historate(self, name, value, tags=None, hostname=None, device_name=None, raw=False)

Sample a histogram based on rate metrics.

  • name (str) - the name of the metric
  • value (float) - the value for the metric
  • tags (List[str]) - a list of tags to associate with this metric
  • hostname (str) - a hostname to associate with this metric. Defaults to the current host.
  • device_name (str) - deprecated; add a tag in the form device:<device_name> to the tags list instead.
  • raw (bool) - whether to ignore any defined namespace prefix
Source code in
def historate(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
    # type: (str, float, Sequence[str], str, str, bool) -> None
    """Sample a histogram based on rate metrics.

    - **name** (_str_) - the name of the metric
    - **value** (_float_) - the value for the metric
    - **tags** (_List[str]_) - a list of tags to associate with this metric
    - **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
    - **device_name** (_str_) - **deprecated**; add a tag in the form `device:<device_name>` to the `tags`
        list instead.
    - **raw** (_bool_) - whether to ignore any defined namespace prefix
    """
    # Thin wrapper: every argument is forwarded to `_submit_metric` with the `aggregator.HISTORATE` metric type.
    self._submit_metric(
        aggregator.HISTORATE, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
    )

metadata_entrypoint(method) classmethod

Skip execution of the decorated method if metadata collection is disabled on the Agent.

Usage:

class MyCheck(AgentCheck):
    @AgentCheck.metadata_entrypoint
    def collect_metadata(self):
        ...
Source code in
@classmethod
def metadata_entrypoint(cls, method):
    # type: (Callable[..., None]) -> Callable[..., None]
    """
    Skip execution of the decorated method if metadata collection is disabled on the Agent.

    The returned wrapper always returns `None`; any value returned by the decorated method is discarded.

    Usage:

    ```python
    class MyCheck(AgentCheck):
        @AgentCheck.metadata_entrypoint
        def collect_metadata(self):
            ...
    ```
    """

    @functools.wraps(method)
    def entrypoint(self, *args, **kwargs):
        # type: (AgentCheck, *Any, **Any) -> None
        # Only delegate when the check reports that metadata collection is enabled.
        # NOTE: error handling still at the discretion of the wrapped method.
        if self.is_metadata_collection_enabled():
            method(self, *args, **kwargs)

    return entrypoint

monotonic_count(self, name, value, tags=None, hostname=None, device_name=None, raw=False, flush_first_value=False)

Sample an increasing counter metric.

  • name (str) - the name of the metric
  • value (float) - the value for the metric
  • tags (List[str]) - a list of tags to associate with this metric
  • hostname (str) - a hostname to associate with this metric. Defaults to the current host.
  • device_name (str) - deprecated; add a tag in the form device:<device_name> to the tags list instead.
  • raw (bool) - whether to ignore any defined namespace prefix
  • flush_first_value (bool) - whether to sample the first value
Source code in
def monotonic_count(
    self, name, value, tags=None, hostname=None, device_name=None, raw=False, flush_first_value=False
):
    # type: (str, float, Sequence[str], str, str, bool, bool) -> None
    """Sample an increasing counter metric.

    - **name** (_str_) - the name of the metric
    - **value** (_float_) - the value for the metric
    - **tags** (_List[str]_) - a list of tags to associate with this metric
    - **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
    - **device_name** (_str_) - **deprecated**; add a tag in the form `device:<device_name>` to the `tags`
        list instead.
    - **raw** (_bool_) - whether to ignore any defined namespace prefix
    - **flush_first_value** (_bool_) - whether to sample the first value
    """
    # Thin wrapper: every argument, including `flush_first_value`, is forwarded to `_submit_metric`
    # with the `aggregator.MONOTONIC_COUNT` metric type.
    self._submit_metric(
        aggregator.MONOTONIC_COUNT,
        name,
        value,
        tags=tags,
        hostname=hostname,
        device_name=device_name,
        raw=raw,
        flush_first_value=flush_first_value,
    )

rate(self, name, value, tags=None, hostname=None, device_name=None, raw=False)

Sample a point, with the rate calculated at the end of the check.

  • name (str) - the name of the metric
  • value (float) - the value for the metric
  • tags (List[str]) - a list of tags to associate with this metric
  • hostname (str) - a hostname to associate with this metric. Defaults to the current host.
  • device_name (str) - deprecated; add a tag in the form device:<device_name> to the tags list instead.
  • raw (bool) - whether to ignore any defined namespace prefix
Source code in
def rate(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
    # type: (str, float, Sequence[str], str, str, bool) -> None
    """Sample a point, with the rate calculated at the end of the check.

    - **name** (_str_) - the name of the metric
    - **value** (_float_) - the value for the metric
    - **tags** (_List[str]_) - a list of tags to associate with this metric
    - **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
    - **device_name** (_str_) - **deprecated**; add a tag in the form `device:<device_name>` to the `tags`
        list instead.
    - **raw** (_bool_) - whether to ignore any defined namespace prefix
    """
    # Thin wrapper: every argument is forwarded to `_submit_metric` with the `aggregator.RATE` metric type.
    self._submit_metric(
        aggregator.RATE, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
    )

service_check(self, name, status, tags=None, hostname=None, message=None, raw=False)

Send the status of a service.

  • name (str) - the name of the service check
  • status (int) - a constant describing the service status.
  • tags (List[str]) - a list of tags to associate with this service check
  • message (str) - additional information or a description of why this status occurred.
  • raw (bool) - whether to ignore any defined namespace prefix
Source code in
def service_check(self, name, status, tags=None, hostname=None, message=None, raw=False):
    # type: (str, ServiceCheckStatus, Sequence[str], str, str, bool) -> None
    """Send the status of a service.

    - **name** (_str_) - the name of the service check
    - **status** (_int_) - a constant describing the service status.
    - **tags** (_List[str]_) - a list of tags to associate with this service check
    - **hostname** (_str_) - a hostname to associate with this service check. `None` is submitted as
        an empty string.
    - **message** (_str_) - additional information or a description of why this status occurred.
    - **raw** (_bool_) - whether to ignore any defined namespace prefix
    """
    normalized_tags = self._normalize_tags_type(tags or [])

    # Missing hostname/message are submitted as empty strings rather than `None`.
    host = '' if hostname is None else hostname
    text = '' if message is None else to_native_string(message)
    text = self.sanitize(text)

    # The namespace prefix is applied to the name unless `raw` asks to skip it.
    check_name = self._format_namespace(name, raw)

    aggregator.submit_service_check(self, self.check_id, check_name, status, normalized_tags, host, text)

set_metadata(self, name, value, **options)

Updates the cached metadata name with value, which is then sent by the Agent at regular intervals.

  • name (str) - the name of the metadata
  • value (object) - the value for the metadata. If name has no transformer defined then the raw value will be submitted and therefore it must be a str
  • options - keyword arguments to pass to any defined transformer

Source code in
def set_metadata(self, name, value, **options):
    # type: (str, Any, **Any) -> None
    """Updates the cached metadata `name` with `value`, which is then sent by the Agent at regular intervals.

    - **name** (_str_) - the name of the metadata
    - **value** (_object_) - the value for the metadata. If `name` has no transformer defined then the
        raw `value` will be submitted and therefore it must be a `str`
    - **options** - keyword arguments to pass to any defined transformer
    """
    # The keyword options are handed to the metadata manager as a single dict.
    self.metadata_manager.submit(name, value, options)

Stubs

datadog_checks.base.stubs.aggregator.AggregatorStub

This implements the methods defined by the Agent's C bindings which in turn call the Go backend.

It also provides utility methods for test assertions.

assert_all_metrics_covered(self)

Source code in
def assert_all_metrics_covered(self):
    """
    Assert that `metrics_asserted_pct` has reached 100.

    On failure the message lists the metrics asserted so far and those reported by `not_asserted()`.
    """
    fully_covered = self.metrics_asserted_pct >= 100.0
    report = ''
    # The report is only assembled when the assertion is going to fail.
    if not fully_covered:
        bullet = '\n\t- '
        asserted = bullet.join(sorted(self._asserted))
        missing = bullet.join(sorted(self.not_asserted()))
        report = ''.join(
            (
                'Some metrics are missing:',
                '\nAsserted Metrics:',
                bullet,
                asserted,
                '\nMissing Metrics:',
                bullet,
                missing,
            )
        )
    assert fully_covered, report

assert_event(self, msg_text, count=None, at_least=1, exact_match=True, tags=None, **kwargs)

Source code in
def assert_event(self, msg_text, count=None, at_least=1, exact_match=True, tags=None, **kwargs):
    """
    Assert that events matching the given criteria were submitted to this stub.

    - msg_text: the text to look for in each event's `msg_text`
    - count: when not None, the exact number of matching events required
    - at_least: the minimum number of matching events required when `count` is None
    - exact_match: when true, `msg_text` must equal the event's `msg_text`; otherwise it only has
      to be contained in it
    - tags: when non-empty, the event's tags must be the same set of tags
    - kwargs: additional event fields that must be present with an equal value

    An event that has no `tags` key, or lacks one of the fields named in `kwargs`, is treated as
    not matching instead of raising `KeyError`.

    Raises `AssertionError` when the number of matching events does not satisfy `count`/`at_least`.
    """
    # Build the expected tag set once instead of once per event.
    expected_tags = set(tags) if tags else None

    candidates = []
    for e in self.events:
        event_text = e['msg_text']
        if exact_match:
            if msg_text != event_text:
                continue
        elif msg_text not in event_text:
            continue

        # `tags` is an optional event field, so its absence means "no tags".
        if expected_tags is not None and expected_tags != set(e.get('tags') or ()):
            continue

        if any(field not in e or e[field] != expected for field, expected in kwargs.items()):
            continue

        candidates.append(e)

    msg = "Candidates size assertion for `{}`, count: {}, at_least: {}) failed".format(msg_text, count, at_least)
    if count is not None:
        assert len(candidates) == count, msg
    else:
        assert len(candidates) >= at_least, msg

assert_metric(self, name, value=None, tags=None, count=None, at_least=1, hostname=None, metric_type=None, device=None)

Assert a metric was processed by this stub

Source code in
def assert_metric(
    self, name, value=None, tags=None, count=None, at_least=1, hostname=None, metric_type=None, device=None
):
    """
    Assert a metric was processed by this stub

    Submissions of `name` are filtered by every criterion that is provided (`value`, `tags`,
    `hostname`, `metric_type`, `device`). The value criterion is skipped for metrics whose type
    is reported as aggregate by `is_aggregate`. When a value is given and all remaining
    candidates are aggregate, the sum of their values is compared against `value`; otherwise
    the number of candidates is checked against `count` (exact) or `at_least` (minimum).
    """

    self._asserted.add(name)
    expected_tags = normalize_tags(tags, sort=True)

    def is_candidate(metric):
        # Criteria are evaluated in order and stop at the first mismatch.
        if value is not None and not self.is_aggregate(metric.type) and value != metric.value:
            return False
        if expected_tags and expected_tags != sorted(metric.tags):
            return False
        if hostname is not None and hostname != metric.hostname:
            return False
        if metric_type is not None and metric_type != metric.type:
            return False
        if device is not None and device != metric.device:
            return False
        return True

    candidates = [metric for metric in self.metrics(name) if is_candidate(metric)]
    found = len(candidates)

    # Stub describing what was expected; handed to `_assert` together with the failure message.
    expected_metric = MetricStub(name, metric_type, value, tags, hostname, device)

    if value is not None and candidates and all(self.is_aggregate(m.type) for m in candidates):
        total = sum(m.value for m in candidates)
        condition = value == total
        msg = "Expected count value for '{}': {}, got {}".format(name, value, total)
    elif count is not None:
        condition = found == count
        msg = "Needed exactly {} candidates for '{}', got {}".format(count, name, found)
    else:
        condition = found >= at_least
        msg = "Needed at least {} candidates for '{}', got {}".format(at_least, name, found)

    self._assert(condition, msg=msg, expected_stub=expected_metric, submitted_elements=self._metrics)

assert_metric_has_tag(self, metric_name, tag, count=None, at_least=1)

Assert a metric is tagged with tag

Source code in
def assert_metric_has_tag(self, metric_name, tag, count=None, at_least=1):
    """
    Assert a metric is tagged with tag

    The number of submissions of `metric_name` carrying `tag` must equal `count` when it is
    given, and be at least `at_least` otherwise.
    """
    self._asserted.add(metric_name)

    tagged = [metric for metric in self.metrics(metric_name) if tag in metric.tags]
    found = len(tagged)

    msg = "Candidates size assertion for `{}`, count: {}, at_least: {}) failed".format(metric_name, count, at_least)
    if count is None:
        assert found >= at_least, msg
    else:
        assert found == count, msg

assert_metric_has_tag_prefix(self, metric_name, tag_prefix, count=None, at_least=1)

Source code in
def assert_metric_has_tag_prefix(self, metric_name, tag_prefix, count=None, at_least=1):
    """
    Assert a metric has at least one tag starting with tag_prefix

    The number of submissions of `metric_name` carrying such a tag must equal `count` when it
    is given, and be at least `at_least` otherwise.
    """
    self._asserted.add(metric_name)

    # `any` stops at the first matching tag instead of collecting every match just to count them.
    candidates = [
        metric for metric in self.metrics(metric_name) if any(t.startswith(tag_prefix) for t in metric.tags)
    ]

    msg = "Candidates size assertion for `{}`, count: {}, at_least: {}) failed".format(metric_name, count, at_least)
    if count is not None:
        assert len(candidates) == count, msg
    else:
        assert len(candidates) >= at_least, msg

assert_no_duplicate_metrics(self)

Assert no duplicate metrics have been submitted.

Metrics are considered duplicate when all following fields match:

  • metric name
  • type (gauge, rate, etc)
  • tags
  • hostname
Source code in
def assert_no_duplicate_metrics(self):
    """
    Assert no duplicate metrics have been submitted.

    Metrics are considered duplicate when all following fields match:

    - metric name
    - type (gauge, rate, etc)
    - tags
    - hostname
    """
    # metric types that intended to be called multiple times are ignored
    skipped_types = [self.COUNT, self.COUNTER]

    submitted = []
    for stubs in self._metrics.values():
        for stub in stubs:
            if stub.type not in skipped_types:
                submitted.append(stub)

    def duplicate_key(stub):
        # Tags are sorted so that their submission order does not matter.
        return (stub.name, stub.type, str(sorted(stub.tags)), stub.hostname)

    self._assert_no_duplicate_stub('metric', submitted, duplicate_key)

assert_no_duplicate_service_checks(self)

Assert no duplicate service checks have been submitted.

Service checks are considered duplicate when all following fields match:

  • metric name
  • status
  • tags
  • hostname

Source code in
def assert_no_duplicate_service_checks(self):
    """
    Assert no duplicate service checks have been submitted.

    Service checks are considered duplicate when all following fields match:
        - metric name
        - status
        - tags
        - hostname
    """
    submitted = []
    for stubs in self._service_checks.values():
        submitted.extend(stubs)

    def duplicate_key(stub):
        # Tags are sorted so that their submission order does not matter.
        return (stub.name, stub.status, str(sorted(stub.tags)), stub.hostname)

    self._assert_no_duplicate_stub('service_check', submitted, duplicate_key)

assert_service_check(self, name, status=None, tags=None, count=None, at_least=1, hostname=None, message=None)

Assert a service check was processed by this stub

Source code in
def assert_service_check(self, name, status=None, tags=None, count=None, at_least=1, hostname=None, message=None):
    """
    Assert a service check was processed by this stub

    Submissions of `name` are filtered by every criterion that is provided (`status`, `tags`,
    `hostname`, `message`). The number of remaining candidates must equal `count` when it is
    given, and be at least `at_least` otherwise.
    """
    expected_tags = normalize_tags(tags, sort=True)

    def is_candidate(sc):
        # Criteria are evaluated in order and stop at the first mismatch.
        if status is not None and status != sc.status:
            return False
        if expected_tags and expected_tags != sorted(sc.tags):
            return False
        if hostname is not None and hostname != sc.hostname:
            return False
        if message is not None and message != sc.message:
            return False
        return True

    candidates = [sc for sc in self.service_checks(name) if is_candidate(sc)]
    found = len(candidates)

    # Stub describing what was expected (with the normalized tags); handed to `_assert` with the message.
    expected_service_check = ServiceCheckStub(
        None, name=name, status=status, tags=expected_tags, hostname=hostname, message=message
    )

    if count is None:
        condition = found >= at_least
        msg = "Needed at least {} candidates for '{}', got {}".format(at_least, name, found)
    else:
        condition = found == count
        msg = "Needed exactly {} candidates for '{}', got {}".format(count, name, found)

    self._assert(
        condition=condition, msg=msg, expected_stub=expected_service_check, submitted_elements=self._service_checks
    )

reset(self)

Set the stub to its initial state

Source code in
def reset(self):
    """
    Set the stub to its initial state

    Every store is replaced by a fresh, empty container.
    """
    # Stores that create an empty list for a key on first access.
    self._metrics = defaultdict(list)
    self._service_checks = defaultdict(list)
    self._event_platform_events = defaultdict(list)

    # Flat collections.
    self._events = []
    self._asserted = set()

datadog_checks.base.stubs.datadog_agent.DatadogAgentStub

This implements the methods defined by the Agent's C bindings which in turn call the Go backend.

It also provides utility methods for test assertions.

assert_metadata(self, check_id, data)

Source code in
def assert_metadata(self, check_id, data):
    """
    Assert the metadata stored for `check_id` matches `data`.

    - check_id: the check id the metadata was submitted under
    - data: mapping of metadata name to expected value

    Only the names present in `data` are looked up; extra metadata stored for the check is not
    considered. Raises `AssertionError`, with the expected and actual values in the message,
    when a name is missing or a value differs.
    """
    actual = {
        name: self._metadata[(check_id, name)] for name in data if (check_id, name) in self._metadata
    }
    # An explicit message keeps the failure readable even where the assert is not rewritten by the test runner.
    assert data == actual, 'Expected metadata {!r} for check `{}`, got {!r}'.format(data, check_id, actual)

assert_metadata_count(self, count)

Source code in
def assert_metadata_count(self, count):
    """
    Assert exactly `count` metadata entries are stored in this stub.

    Raises `AssertionError`, with the expected and actual counts in the message, otherwise.
    """
    actual = len(self._metadata)
    # An explicit message keeps the failure readable even where the assert is not rewritten by the test runner.
    assert actual == count, 'Expected {} metadata entries, got {}'.format(count, actual)

reset(self)

Source code in
def reset(self):
    """
    Empty the stored metadata and cache in place and restore the default configuration.
    """
    # Both stores are cleared in place, so existing references to them stay valid.
    for store in (self._metadata, self._cache):
        store.clear()
    self._config = self.get_default_config()

Last update: June 22, 2020