API¶
datadog_checks.base.checks.base.AgentCheck
¶
The base class for any Agent based integration.
In general, you don't need to and you should not override anything from the base class except the check
method but sometimes it might be useful for a Check to have its own constructor.
When overriding __init__
you have to remember that, depending on the configuration, the Agent might create several different Check instances and the method would be called as many times.
Agent 6,7 signature:
AgentCheck(name, init_config, instances) # instances contain only 1 instance
AgentCheck.check(instance)
Agent 8 signature:
AgentCheck(name, init_config, instance) # one instance
AgentCheck.check() # no more instance argument for check method
Note
when loading a Custom check, the Agent will inspect the module searching for a subclass of AgentCheck
. If such a class exists but has been derived in turn, it'll be ignored - you should never derive from an existing Check.
__init__(self, *args, **kwargs)
special
¶
- name (str) - the name of the check
- init_config (dict) - the
init_config
section of the configuration. - instance (List[dict]) - a one-element list containing the instance options from the configuration file (a list is used to keep backward compatibility with older versions of the Agent).
Source code in
def __init__(self, *args, **kwargs):
# type: (*Any, **Any) -> None
"""
- **name** (_str_) - the name of the check
- **init_config** (_dict_) - the `init_config` section of the configuration.
- **instance** (_List[dict]_) - a one-element list containing the instance options from the
configuration file (a list is used to keep backward compatibility with
older versions of the Agent).
"""
# NOTE: these variable assignments exist to ease type checking when eventually assigned as attributes.
name = kwargs.get('name', '')
init_config = kwargs.get('init_config', {})
agentConfig = kwargs.get('agentConfig', {})
instances = kwargs.get('instances', [])
if len(args) > 0:
name = args[0]
if len(args) > 1:
init_config = args[1]
if len(args) > 2:
# agent pass instances as tuple but in test we are usually using list, so we are testing for both
if len(args) > 3 or not isinstance(args[2], (list, tuple)) or 'instances' in kwargs:
# old-style init: the 3rd argument is `agentConfig`
agentConfig = args[2]
if len(args) > 3:
instances = args[3]
else:
# new-style init: the 3rd argument is `instances`
instances = args[2]
# NOTE: Agent 6+ should pass exactly one instance... But we are not abiding by that rule on our side
# everywhere just yet. It's complicated... See: https://github.com/DataDog/integrations-core/pull/5573
instance = instances[0] if instances else None
self.check_id = ''
self.name = name # type: str
self.init_config = init_config # type: InitConfigType
self.agentConfig = agentConfig # type: AgentConfigType
self.instance = instance # type: InstanceType
self.instances = instances # type: List[InstanceType]
self.warnings = [] # type: List[str]
# `self.hostname` is deprecated, use `datadog_agent.get_hostname()` instead
self.hostname = datadog_agent.get_hostname() # type: str
logger = logging.getLogger('{}.{}'.format(__name__, self.name))
self.log = CheckLoggingAdapter(logger, self)
# TODO: Remove with Agent 5
# Set proxy settings
self.proxies = self._get_requests_proxy()
if not self.init_config:
self._use_agent_proxy = True
else:
self._use_agent_proxy = is_affirmative(self.init_config.get('use_agent_proxy', True))
# TODO: Remove with Agent 5
self.default_integration_http_timeout = float(self.agentConfig.get('default_integration_http_timeout', 9))
self._deprecations = {
'increment': (
False,
(
'DEPRECATION NOTICE: `AgentCheck.increment`/`AgentCheck.decrement` are deprecated, please '
'use `AgentCheck.gauge` or `AgentCheck.count` instead, with a different metric name'
),
),
'device_name': (
False,
(
'DEPRECATION NOTICE: `device_name` is deprecated, please use a `device:` '
'tag in the `tags` list instead'
),
),
'in_developer_mode': (
False,
'DEPRECATION NOTICE: `in_developer_mode` is deprecated, please stop using it.',
),
'no_proxy': (
False,
(
'DEPRECATION NOTICE: The `no_proxy` config option has been renamed '
'to `skip_proxy` and will be removed in a future release.'
),
),
'service_tag': (
False,
(
'DEPRECATION NOTICE: The `service` tag is deprecated and has been renamed to `%s`. '
'Set `disable_legacy_service_tag` to `true` to disable this warning. '
'The default will become `true` and cannot be changed in Agent version 8.'
),
),
} # type: Dict[str, Tuple[bool, str]]
# Setup metric limits
self.metric_limiter = self._get_metric_limiter(self.name, instance=self.instance)
# Lazily load and validate config
self._config_model_instance = None # type: Any
self._config_model_shared = None # type: Any
# Functions that will be called exactly once (if successful) before the first check run
self.check_initializations = deque([self.send_config_metadata]) # type: Deque[Callable[[], None]]
if not PY2:
self.check_initializations.append(self.load_configuration_models)
count(self, name, value, tags=None, hostname=None, device_name=None, raw=False)
¶
Sample a raw count metric.
- name (str) - the name of the metric
- value (float) - the value for the metric
- tags (List[str]) - a list of tags to associate with this metric
- hostname (str) - a hostname to associate with this metric. Defaults to the current host.
- device_name (str) - deprecated add a tag in the form
device:<device_name>
to thetags
list instead. - raw (bool) - whether to ignore any defined namespace prefix
Source code in
def count(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
# type: (str, float, Sequence[str], str, str, bool) -> None
"""Sample a raw count metric.
- **name** (_str_) - the name of the metric
- **value** (_float_) - the value for the metric
- **tags** (_List[str]_) - a list of tags to associate with this metric
- **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
- **device_name** (_str_) - **deprecated** add a tag in the form `device:<device_name>` to the `tags`
list instead.
- **raw** (_bool_) - whether to ignore any defined namespace prefix
"""
self._submit_metric(
aggregator.COUNT, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
)
event(self, event)
¶
Send an event.
An event is a dictionary with the following keys and data types:
{
"timestamp": int, # the epoch timestamp for the event
"event_type": str, # the event name
"api_key": str, # the api key for your account
"msg_title": str, # the title of the event
"msg_text": str, # the text body of the event
"aggregation_key": str, # a key to use for aggregating events
"alert_type": str, # (optional) one of ('error', 'warning', 'success', 'info'), defaults to 'info'
"source_type_name": str, # (optional) the source type name
"host": str, # (optional) the name of the host
"tags": list, # (optional) a list of tags to associate with this event
"priority": str, # (optional) specifies the priority of the event ("normal" or "low")
}
- event (dict) - the event to be sent
Source code in
def event(self, event):
# type: (Event) -> None
"""Send an event.
An event is a dictionary with the following keys and data types:
```python
{
"timestamp": int, # the epoch timestamp for the event
"event_type": str, # the event name
"api_key": str, # the api key for your account
"msg_title": str, # the title of the event
"msg_text": str, # the text body of the event
"aggregation_key": str, # a key to use for aggregating events
"alert_type": str, # (optional) one of ('error', 'warning', 'success', 'info'), defaults to 'info'
"source_type_name": str, # (optional) the source type name
"host": str, # (optional) the name of the host
"tags": list, # (optional) a list of tags to associate with this event
"priority": str, # (optional) specifies the priority of the event ("normal" or "low")
}
```
- **event** (_dict_) - the event to be sent
"""
# Enforce types of some fields, considerably facilitates handling in go bindings downstream
for key, value in iteritems(event):
if not isinstance(value, (text_type, binary_type)):
continue
try:
event[key] = to_native_string(value) # type: ignore
# ^ Mypy complains about dynamic key assignment -- arguably for good reason.
# Ideally we should convert this to a dict literal so that submitted events only include known keys.
except UnicodeError:
self.log.warning('Encoding error with field `%s`, cannot submit event', key)
return
if event.get('tags'):
event['tags'] = self._normalize_tags_type(event['tags'])
if event.get('timestamp'):
event['timestamp'] = int(event['timestamp'])
if event.get('aggregation_key'):
event['aggregation_key'] = to_native_string(event['aggregation_key'])
if self.__NAMESPACE__:
event.setdefault('source_type_name', self.__NAMESPACE__)
aggregator.submit_event(self, self.check_id, event)
gauge(self, name, value, tags=None, hostname=None, device_name=None, raw=False)
¶
Sample a gauge metric.
Parameters:
- name (str) - the name of the metric
- value (float) - the value for the metric
- tags (List[str]) - a list of tags to associate with this metric
- hostname (str) - a hostname to associate with this metric. Defaults to the current host.
- device_name (str) - deprecated add a tag in the form
device:<device_name>
to thetags
list instead. - raw (bool) - whether to ignore any defined namespace prefix
Source code in
def gauge(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
# type: (str, float, Sequence[str], str, str, bool) -> None
"""Sample a gauge metric.
**Parameters:**
- **name** (_str_) - the name of the metric
- **value** (_float_) - the value for the metric
- **tags** (_List[str]_) - a list of tags to associate with this metric
- **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
- **device_name** (_str_) - **deprecated** add a tag in the form `device:<device_name>` to the `tags`
list instead.
- **raw** (_bool_) - whether to ignore any defined namespace prefix
"""
self._submit_metric(
aggregator.GAUGE, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
)
histogram(self, name, value, tags=None, hostname=None, device_name=None, raw=False)
¶
Sample a histogram metric.
- name (str) - the name of the metric
- value (float) - the value for the metric
- tags (List[str]) - a list of tags to associate with this metric
- hostname (str) - a hostname to associate with this metric. Defaults to the current host.
- device_name (str) - deprecated add a tag in the form
device:<device_name>
to thetags
list instead. - raw (bool) - whether to ignore any defined namespace prefix
Source code in
def histogram(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
# type: (str, float, Sequence[str], str, str, bool) -> None
"""Sample a histogram metric.
- **name** (_str_) - the name of the metric
- **value** (_float_) - the value for the metric
- **tags** (_List[str]_) - a list of tags to associate with this metric
- **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
- **device_name** (_str_) - **deprecated** add a tag in the form `device:<device_name>` to the `tags`
list instead.
- **raw** (_bool_) - whether to ignore any defined namespace prefix
"""
self._submit_metric(
aggregator.HISTOGRAM, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
)
historate(self, name, value, tags=None, hostname=None, device_name=None, raw=False)
¶
Sample a histogram based on rate metrics.
- name (str) - the name of the metric
- value (float) - the value for the metric
- tags (List[str]) - a list of tags to associate with this metric
- hostname (str) - a hostname to associate with this metric. Defaults to the current host.
- device_name (str) - deprecated add a tag in the form
device:<device_name>
to thetags
list instead. - raw (bool) - whether to ignore any defined namespace prefix
Source code in
def historate(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
# type: (str, float, Sequence[str], str, str, bool) -> None
"""Sample a histogram based on rate metrics.
- **name** (_str_) - the name of the metric
- **value** (_float_) - the value for the metric
- **tags** (_List[str]_) - a list of tags to associate with this metric
- **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
- **device_name** (_str_) - **deprecated** add a tag in the form `device:<device_name>` to the `tags`
list instead.
- **raw** (_bool_) - whether to ignore any defined namespace prefix
"""
self._submit_metric(
aggregator.HISTORATE, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
)
metadata_entrypoint(method)
classmethod
¶
Skip execution of the decorated method if metadata collection is disabled on the Agent.
Usage:
class MyCheck(AgentCheck):
@AgentCheck.metadata_entrypoint
def collect_metadata(self):
...
Source code in
@classmethod
def metadata_entrypoint(cls, method):
# type: (Callable[..., None]) -> Callable[..., None]
"""
Skip execution of the decorated method if metadata collection is disabled on the Agent.
Usage:
```python
class MyCheck(AgentCheck):
@AgentCheck.metadata_entrypoint
def collect_metadata(self):
...
```
"""
@functools.wraps(method)
def entrypoint(self, *args, **kwargs):
# type: (AgentCheck, *Any, **Any) -> None
if not self.is_metadata_collection_enabled():
return
# NOTE: error handling still at the discretion of the wrapped method.
method(self, *args, **kwargs)
return entrypoint
monotonic_count(self, name, value, tags=None, hostname=None, device_name=None, raw=False, flush_first_value=False)
¶
Sample an increasing counter metric.
- name (str) - the name of the metric
- value (float) - the value for the metric
- tags (List[str]) - a list of tags to associate with this metric
- hostname (str) - a hostname to associate with this metric. Defaults to the current host.
- device_name (str) - deprecated add a tag in the form
device:<device_name>
to thetags
list instead. - raw (bool) - whether to ignore any defined namespace prefix
- flush_first_value (bool) - whether to sample the first value
Source code in
def monotonic_count(
self, name, value, tags=None, hostname=None, device_name=None, raw=False, flush_first_value=False
):
# type: (str, float, Sequence[str], str, str, bool, bool) -> None
"""Sample an increasing counter metric.
- **name** (_str_) - the name of the metric
- **value** (_float_) - the value for the metric
- **tags** (_List[str]_) - a list of tags to associate with this metric
- **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
- **device_name** (_str_) - **deprecated** add a tag in the form `device:<device_name>` to the `tags`
list instead.
- **raw** (_bool_) - whether to ignore any defined namespace prefix
- **flush_first_value** (_bool_) - whether to sample the first value
"""
self._submit_metric(
aggregator.MONOTONIC_COUNT,
name,
value,
tags=tags,
hostname=hostname,
device_name=device_name,
raw=raw,
flush_first_value=flush_first_value,
)
rate(self, name, value, tags=None, hostname=None, device_name=None, raw=False)
¶
Sample a point, with the rate calculated at the end of the check.
- name (str) - the name of the metric
- value (float) - the value for the metric
- tags (List[str]) - a list of tags to associate with this metric
- hostname (str) - a hostname to associate with this metric. Defaults to the current host.
- device_name (str) - deprecated add a tag in the form
device:<device_name>
to thetags
list instead. - raw (bool) - whether to ignore any defined namespace prefix
Source code in
def rate(self, name, value, tags=None, hostname=None, device_name=None, raw=False):
# type: (str, float, Sequence[str], str, str, bool) -> None
"""Sample a point, with the rate calculated at the end of the check.
- **name** (_str_) - the name of the metric
- **value** (_float_) - the value for the metric
- **tags** (_List[str]_) - a list of tags to associate with this metric
- **hostname** (_str_) - a hostname to associate with this metric. Defaults to the current host.
- **device_name** (_str_) - **deprecated** add a tag in the form `device:<device_name>` to the `tags`
list instead.
- **raw** (_bool_) - whether to ignore any defined namespace prefix
"""
self._submit_metric(
aggregator.RATE, name, value, tags=tags, hostname=hostname, device_name=device_name, raw=raw
)
service_check(self, name, status, tags=None, hostname=None, message=None, raw=False)
¶
Send the status of a service.
- name (str) - the name of the service check
- status (int) - a constant describing the service status.
- tags (List[str]) - a list of tags to associate with this service check
- message (str) - additional information or a description of why this status occurred.
- raw (bool) - whether to ignore any defined namespace prefix
Source code in
def service_check(self, name, status, tags=None, hostname=None, message=None, raw=False):
# type: (str, ServiceCheckStatus, Sequence[str], str, str, bool) -> None
"""Send the status of a service.
- **name** (_str_) - the name of the service check
- **status** (_int_) - a constant describing the service status.
- **tags** (_List[str]_) - a list of tags to associate with this service check
- **message** (_str_) - additional information or a description of why this status occurred.
- **raw** (_bool_) - whether to ignore any defined namespace prefix
"""
tags = self._normalize_tags_type(tags or [])
if hostname is None:
hostname = ''
if message is None:
message = ''
else:
message = to_native_string(message)
message = self.sanitize(message)
aggregator.submit_service_check(
self, self.check_id, self._format_namespace(name, raw), status, tags, hostname, message
)
set_metadata(self, name, value, **options)
¶
Updates the cached metadata name
with value
, which is then sent by the Agent at regular intervals.
:param str name: the name of the metadata :param object value: the value for the metadata. if name
has no transformer defined then the raw value
will be submitted and therefore it must be a str
:param options: keyword arguments to pass to any defined transformer
Source code in
def set_metadata(self, name, value, **options):
# type: (str, Any, **Any) -> None
"""Updates the cached metadata ``name`` with ``value``, which is then sent by the Agent at regular intervals.
:param str name: the name of the metadata
:param object value: the value for the metadata. if ``name`` has no transformer defined then the
raw ``value`` will be submitted and therefore it must be a ``str``
:param options: keyword arguments to pass to any defined transformer
"""
self.metadata_manager.submit(name, value, options)
Stubs¶
datadog_checks.base.stubs.aggregator.AggregatorStub
¶
This implements the methods defined by the Agent's C bindings which in turn call the Go backend.
It also provides utility methods for test assertions.
assert_all_metrics_covered(self)
¶
Source code in
def assert_all_metrics_covered(self):
# use `condition` to avoid building the `msg` if not needed
condition = self.metrics_asserted_pct >= 100.0
msg = ''
if not condition:
prefix = '\n\t- '
msg = 'Some metrics are missing:'
msg += '\nAsserted Metrics:{}{}'.format(prefix, prefix.join(sorted(self._asserted)))
msg += '\nMissing Metrics:{}{}'.format(prefix, prefix.join(sorted(self.not_asserted())))
assert condition, msg
assert_event(self, msg_text, count=None, at_least=1, exact_match=True, tags=None, **kwargs)
¶
Source code in
def assert_event(self, msg_text, count=None, at_least=1, exact_match=True, tags=None, **kwargs):
candidates = []
for e in self.events:
if exact_match and msg_text != e['msg_text'] or msg_text not in e['msg_text']:
continue
if tags and set(tags) != set(e['tags']):
continue
for name, value in iteritems(kwargs):
if e[name] != value:
break
else:
candidates.append(e)
msg = "Candidates size assertion for `{}`, count: {}, at_least: {}) failed".format(msg_text, count, at_least)
if count is not None:
assert len(candidates) == count, msg
else:
assert len(candidates) >= at_least, msg
assert_metric(self, name, value=None, tags=None, count=None, at_least=1, hostname=None, metric_type=None, device=None)
¶
Assert a metric was processed by this stub
Source code in
def assert_metric(
self, name, value=None, tags=None, count=None, at_least=1, hostname=None, metric_type=None, device=None
):
"""
Assert a metric was processed by this stub
"""
self._asserted.add(name)
expected_tags = normalize_tags(tags, sort=True)
candidates = []
for metric in self.metrics(name):
if value is not None and not self.is_aggregate(metric.type) and value != metric.value:
continue
if expected_tags and expected_tags != sorted(metric.tags):
continue
if hostname is not None and hostname != metric.hostname:
continue
if metric_type is not None and metric_type != metric.type:
continue
if device is not None and device != metric.device:
continue
candidates.append(metric)
expected_metric = MetricStub(name, metric_type, value, tags, hostname, device)
if value is not None and candidates and all(self.is_aggregate(m.type) for m in candidates):
got = sum(m.value for m in candidates)
msg = "Expected count value for '{}': {}, got {}".format(name, value, got)
condition = value == got
elif count is not None:
msg = "Needed exactly {} candidates for '{}', got {}".format(count, name, len(candidates))
condition = len(candidates) == count
else:
msg = "Needed at least {} candidates for '{}', got {}".format(at_least, name, len(candidates))
condition = len(candidates) >= at_least
self._assert(condition, msg=msg, expected_stub=expected_metric, submitted_elements=self._metrics)
assert_metric_has_tag(self, metric_name, tag, count=None, at_least=1)
¶
Assert a metric is tagged with tag
Source code in
def assert_metric_has_tag(self, metric_name, tag, count=None, at_least=1):
"""
Assert a metric is tagged with tag
"""
self._asserted.add(metric_name)
candidates = []
for metric in self.metrics(metric_name):
if tag in metric.tags:
candidates.append(metric)
msg = "Candidates size assertion for `{}`, count: {}, at_least: {}) failed".format(metric_name, count, at_least)
if count is not None:
assert len(candidates) == count, msg
else:
assert len(candidates) >= at_least, msg
assert_metric_has_tag_prefix(self, metric_name, tag_prefix, count=None, at_least=1)
¶
Source code in
def assert_metric_has_tag_prefix(self, metric_name, tag_prefix, count=None, at_least=1):
candidates = []
self._asserted.add(metric_name)
for metric in self.metrics(metric_name):
tags = metric.tags
gtags = [t for t in tags if t.startswith(tag_prefix)]
if len(gtags) > 0:
candidates.append(metric)
msg = "Candidates size assertion for `{}`, count: {}, at_least: {}) failed".format(metric_name, count, at_least)
if count is not None:
assert len(candidates) == count, msg
else:
assert len(candidates) >= at_least, msg
assert_no_duplicate_metrics(self)
¶
Assert no duplicate metrics have been submitted.
Metrics are considered duplicate when all following fields match:
- metric name
- type (gauge, rate, etc)
- tags
- hostname
Source code in
def assert_no_duplicate_metrics(self):
"""
Assert no duplicate metrics have been submitted.
Metrics are considered duplicate when all following fields match:
- metric name
- type (gauge, rate, etc)
- tags
- hostname
"""
# metric types that intended to be called multiple times are ignored
ignored_types = [self.COUNT, self.MONOTONIC_COUNT, self.COUNTER]
metric_stubs = [m for metrics in self._metrics.values() for m in metrics if m.type not in ignored_types]
def stub_to_key_fn(stub):
return stub.name, stub.type, str(sorted(stub.tags)), stub.hostname
self._assert_no_duplicate_stub('metric', metric_stubs, stub_to_key_fn)
assert_no_duplicate_service_checks(self)
¶
Assert no duplicate service checks have been submitted.
Service checks are considered duplicate when all following fields match: - metric name - status - tags - hostname
Source code in
def assert_no_duplicate_service_checks(self):
"""
Assert no duplicate service checks have been submitted.
Service checks are considered duplicate when all following fields match:
- metric name
- status
- tags
- hostname
"""
service_check_stubs = [m for metrics in self._service_checks.values() for m in metrics]
def stub_to_key_fn(stub):
return stub.name, stub.status, str(sorted(stub.tags)), stub.hostname
self._assert_no_duplicate_stub('service_check', service_check_stubs, stub_to_key_fn)
assert_service_check(self, name, status=None, tags=None, count=None, at_least=1, hostname=None, message=None)
¶
Assert a service check was processed by this stub
Source code in
def assert_service_check(self, name, status=None, tags=None, count=None, at_least=1, hostname=None, message=None):
"""
Assert a service check was processed by this stub
"""
tags = normalize_tags(tags, sort=True)
candidates = []
for sc in self.service_checks(name):
if status is not None and status != sc.status:
continue
if tags and tags != sorted(sc.tags):
continue
if hostname is not None and hostname != sc.hostname:
continue
if message is not None and message != sc.message:
continue
candidates.append(sc)
expected_service_check = ServiceCheckStub(
None, name=name, status=status, tags=tags, hostname=hostname, message=message
)
if count is not None:
msg = "Needed exactly {} candidates for '{}', got {}".format(count, name, len(candidates))
condition = len(candidates) == count
else:
msg = "Needed at least {} candidates for '{}', got {}".format(at_least, name, len(candidates))
condition = len(candidates) >= at_least
self._assert(
condition=condition, msg=msg, expected_stub=expected_service_check, submitted_elements=self._service_checks
)
reset(self)
¶
Set the stub to its initial state
Source code in
def reset(self):
"""
Set the stub to its initial state
"""
self._metrics = defaultdict(list)
self._asserted = set()
self._service_checks = defaultdict(list)
self._events = []
datadog_checks.base.stubs.datadog_agent.DatadogAgentStub
¶
This implements the methods defined by the Agent's C bindings which in turn call the Go backend.
It also provides utility methods for test assertions.
assert_metadata(self, check_id, data)
¶
Source code in
def assert_metadata(self, check_id, data):
actual = {}
for name in data:
key = (check_id, name)
if key in self._metadata:
actual[name] = self._metadata[key]
assert data == actual
assert_metadata_count(self, count)
¶
Source code in
def assert_metadata_count(self, count):
assert len(self._metadata) == count
reset(self)
¶
Source code in
def reset(self):
self._metadata.clear()
self._cache.clear()
self._config = self.get_default_config()