Test framework


Environments

Most integrations monitor services like databases or web servers, rather than system properties like CPU usage. For such cases, you'll want to spin up an environment and gracefully tear it down when tests finish.

We define all environment actions in a fixture called dd_environment that looks semantically like this:

@pytest.fixture(scope='session')
def dd_environment():
    try:
        set_up_env()
        yield some_default_config
    finally:
        tear_down_env()

This is not only used for regular tests, but is also the basis of our E2E testing. The start command executes everything before the yield and the stop command executes everything after it.
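
The split around the yield can be illustrated with a plain generator. The following is a minimal sketch, not how the start and stop commands are implemented: `set_up_env`, `tear_down_env`, the `events` list, and the yielded config are hypothetical stand-ins, and the pytest decorator is left out so the generator can be driven by hand.

```python
events = []


def set_up_env():
    # Hypothetical setup step; a real one would start a service.
    events.append('up')


def tear_down_env():
    # Hypothetical teardown step.
    events.append('down')


def dd_environment():
    try:
        set_up_env()
        yield {'host': 'localhost'}
    finally:
        tear_down_env()


# "start": run everything before the yield and receive the default config.
env = dd_environment()
config = next(env)
started = list(events)

# "stop": closing the generator runs the finally block after the yield.
env.close()
```

After the first step only the setup has run; the teardown runs when the generator is closed, even if the code using the environment failed in between.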

We provide a few utilities for common environment types.

Docker

The docker_run utility makes it easy to create services using docker-compose.

import os

import pytest
from datadog_checks.dev import docker_run

HERE = os.path.dirname(os.path.abspath(__file__))

@pytest.fixture(scope='session')
def dd_environment():
    with docker_run(os.path.join(HERE, 'docker', 'compose.yaml')):
        yield ...

Read the reference for more information.

Terraform

The terraform_run utility makes it easy to create services from a directory of Terraform files.

import os

import pytest
from datadog_checks.dev.terraform import terraform_run

HERE = os.path.dirname(os.path.abspath(__file__))

@pytest.fixture(scope='session')
def dd_environment():
    with terraform_run(os.path.join(HERE, 'terraform')):
        yield ...

Currently, we only use this for services that would be too complex to set up with Docker (like OpenStack) or that cannot be provided by Docker (like vSphere). We provide some ready-to-use cloud templates that are available for referencing by default. We prefer using GCP when possible.

Terraform E2E tests are not run in our public CI as that would needlessly slow down builds.

Read the reference for more information.

Mocker

The mocker fixture is provided by the pytest-mock plugin. This fixture automatically restores anything that was mocked at the end of each test and is more ergonomic to use than stacking decorators or nesting context managers.

Here's an example from their docs:

def test_foo(mocker):
    # all valid calls
    mocker.patch('os.remove')
    mocker.patch.object(os, 'listdir', autospec=True)
    mocked_isfile = mocker.patch('os.path.isfile')

It also has many other nice features, like using pytest introspection when comparing calls.
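
For comparison, here is a sketch of similar patching written with the standard library's `unittest.mock`, which pytest-mock wraps. It shows the context-manager style the fixture lets you avoid and the restoration that the fixture performs for you at the end of each test. `remove_if_present` and the file path are hypothetical.

```python
import os
from unittest import mock


def remove_if_present(path):
    # Hypothetical function under test.
    if os.path.isfile(path):
        os.remove(path)
        return True
    return False


original_isfile = os.path.isfile

# Without the fixture, each patch is its own context manager.
with mock.patch('os.path.isfile', return_value=True) as mocked_isfile, \
        mock.patch('os.remove') as mocked_remove:
    removed = remove_if_present('/hypothetical/file')
    mocked_isfile.assert_called_once_with('/hypothetical/file')
    mocked_remove.assert_called_once_with('/hypothetical/file')

# Leaving the block restores the real functions.
restored = os.path.isfile is original_isfile
```

With the fixture, the two patches would be plain `mocker.patch(...)` calls at the top of the test, with no `with` block needed.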

Benchmarks

The benchmark fixture is provided by the pytest-benchmark plugin. It enables the profiling of functions with the low-overhead cProfile module.

It is quite useful for seeing the approximate time a given check takes to run, as well as gaining insight into any potential performance bottlenecks. You would use it like this:

def test_large_payload(benchmark, dd_run_check):
    check = AwesomeCheck('awesome', {}, [instance])

    # Run once to get any initialization out of the way.
    dd_run_check(check)

    benchmark(dd_run_check, check)

To add benchmarks, define environments in tox.ini with bench somewhere in their names:

[tox]
...
envlist =
    ...
    bench

...

[testenv:bench]

By default, the test command skips all benchmark environments. To run only benchmark environments, use the --bench/-b flag. The results are sorted by tottime, which is the total time spent in the given function, excluding time spent in calls to sub-functions.
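
The meaning of tottime is easier to see on a small profile. The sketch below uses only the standard library's `cProfile` and `pstats` and is not how the benchmark fixture is wired internally; `inner` and `outer` are hypothetical stand-ins for check code.

```python
import cProfile
import io
import pstats


def inner():
    # Nearly all of the real work happens here.
    return sum(i * i for i in range(50000))


def outer():
    # Spends almost no time itself; it only calls inner().
    return [inner() for _ in range(5)]


profiler = cProfile.Profile()
profiler.runcall(outer)

# Sort by tottime: time spent in a function itself, excluding callees.
stream = io.StringIO()
stats = pstats.Stats(profiler, stream=stream)
stats.sort_stats('tottime').print_stats()
report = stream.getvalue()
```

In a report like this, `outer` has a large cumulative time but a small tottime, because its time is attributed to the functions it calls. Sorting by tottime therefore surfaces the code that is actually doing the work.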

Logs

We provide an easy way to utilize log collection with E2E Docker environments.

  1. Pass mount_logs=True to docker_run. This will use the logs example in the integration's config spec. For example, the following defines 2 example log files:

    - template: logs
      example:
      - type: file
        path: /var/log/apache2/access.log
        source: apache
        service: apache
      - type: file
        path: /var/log/apache2/error.log
        source: apache
        service: apache
    
    Alternatives
    • If mount_logs is a sequence of int, only the selected indices (starting at 1) will be used. So, using the Apache example above, to only monitor the error log you would set it to [2].
    • In lieu of a config spec, for whatever reason, you may set mount_logs to a dict containing the standard logs key.
  2. All requested log files are available to reference as environment variables for any Docker calls as DD_LOG_<LOG_CONFIG_INDEX> where the indices start at 1.

    volumes:
    - ${DD_LOG_1}:/usr/local/apache2/logs/access_log
    - ${DD_LOG_2}:/usr/local/apache2/logs/error_log
    
  3. To send logs to a custom URL, set log_url for the configured organization.
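
The indexing convention in steps 1 and 2 can be sketched as a small function. This is an illustration only, not the library's implementation: `select_log_configs` is a hypothetical helper, and it assumes that a selected log keeps the index of its position in the example list.

```python
def select_log_configs(example_logs, mount_logs):
    """Return {env_var_name: log_config} for the requested example logs."""
    if isinstance(mount_logs, dict):
        # A dict replaces the config spec and carries its own `logs` list.
        example_logs = mount_logs['logs']
        wanted = None
    elif mount_logs is True:
        # Use every example log.
        wanted = None
    else:
        # A sequence of 1-based indices selects a subset.
        wanted = set(mount_logs)

    selected = {}
    for index, config in enumerate(example_logs, 1):
        if wanted is None or index in wanted:
            selected['DD_LOG_{}'.format(index)] = config
    return selected


apache_logs = [
    {'type': 'file', 'path': '/var/log/apache2/access.log', 'source': 'apache', 'service': 'apache'},
    {'type': 'file', 'path': '/var/log/apache2/error.log', 'source': 'apache', 'service': 'apache'},
]

# Only the error log, as in the [2] example above.
error_only = select_log_configs(apache_logs, [2])
```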

Reference

datadog_checks.dev.docker

compose_file_active(compose_file)

Returns a bool indicating whether or not a compose file has any active services.

Source code in datadog_checks/dev/docker.py
def compose_file_active(compose_file):
    """
    Returns a `bool` indicating whether or not a compose file has any active services.
    """
    command = ['docker-compose', '-f', compose_file, 'ps']
    lines = run_command(command, capture='out', check=True).stdout.splitlines()

    for i, line in enumerate(lines, 1):
        if set(line.strip()) == {'-'}:
            return len(lines[i:]) >= 1

    return False
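
The parsing step can be tried without Docker by feeding it text directly. In this sketch, `has_active_services` is a hypothetical name for the same loop, and the sample output only approximates the table printed by `docker-compose ps`: a header, a rule made of dashes, then one row per service.

```python
def has_active_services(ps_output):
    """Return True if the table has at least one row after the dashed rule."""
    lines = ps_output.splitlines()

    for i, line in enumerate(lines, 1):
        # The header is closed by a line consisting only of dashes.
        if set(line.strip()) == {'-'}:
            # enumerate starts at 1, so lines[i:] is everything after the rule.
            return len(lines[i:]) >= 1

    return False


# Illustrative output only; real column widths and contents will differ.
idle = 'Name   Command   State   Ports\n------------------------------'
running = idle + '\nweb_1   nginx -g daemon off;   Up   80/tcp'

idle_result = has_active_services(idle)
running_result = has_active_services(running)
```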

docker_run(compose_file=None, build=False, service_name=None, up=None, down=None, on_error=None, sleep=None, endpoints=None, log_patterns=None, mount_logs=False, conditions=None, env_vars=None, wrappers=None, attempts=None, attempts_wait=1)

A convenient context manager for safely setting up and tearing down Docker environments.

  • compose_file (str) - A path to a Docker compose file. A custom tear down is not required when using this.
  • build (bool) - Whether or not to build images when compose_file is provided
  • service_name (str) - Optional service name to use when compose_file is provided
  • up (callable) - A custom setup callable
  • down (callable) - A custom tear down callable. This is required when using a custom setup.
  • on_error (callable) - A callable called in case of an unhandled exception
  • sleep (float) - Number of seconds to wait before yielding. This occurs after all conditions are successful.
  • endpoints (List[str]) - Endpoints to verify access for before yielding. Shorthand for adding CheckEndpoints(endpoints) to the conditions argument.
  • log_patterns (List[str|re.Pattern]) - Regular expression patterns to find in Docker logs before yielding. This is only available when compose_file is provided. Shorthand for adding CheckDockerLogs(compose_file, log_patterns) to the conditions argument.
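  • mount_logs (bool|list|set|dict) - Whether or not to mount log files in Agent containers based on example logs configuration. A list or set selects example logs by index (starting at 1), and a dict supplies the logs configuration directly.
  • conditions (List[callable]) - A list of callable objects that will be executed before yielding to check for errors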
  • env_vars (dict) - A dictionary to update os.environ with during execution
  • wrappers (List[callable]) - A list of context managers to use during execution
  • attempts (int) - Number of attempts to run up successfully
  • attempts_wait (int) - Time to wait between attempts
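
A custom condition can be a small callable object. The sketch below is hypothetical: `WaitForPort`, its parameters, and its retry policy are illustrative and not part of datadog_checks.dev. It assumes only that a condition is called with no arguments and signals failure by raising.

```python
import socket
import time


class WaitForPort:
    """Hypothetical condition: succeed once a TCP port accepts connections."""

    def __init__(self, host, port, attempts=10, wait=0.5):
        self.host = host
        self.port = port
        self.attempts = attempts
        self.wait = wait

    def __call__(self):
        last_error = None
        for _ in range(self.attempts):
            try:
                # A successful connect means something is listening.
                with socket.create_connection((self.host, self.port), timeout=1):
                    return True
            except OSError as e:
                last_error = e
                time.sleep(self.wait)

        raise RuntimeError(
            'Port {} on {} never opened: {}'.format(self.port, self.host, last_error)
        )
```

Under that assumption, it would be passed as `conditions=[WaitForPort('localhost', 8080)]`, where the host and port are placeholders for your service.
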
Source code in datadog_checks/dev/docker.py
@contextmanager
def docker_run(
    compose_file=None,
    build=False,
    service_name=None,
    up=None,
    down=None,
    on_error=None,
    sleep=None,
    endpoints=None,
    log_patterns=None,
    mount_logs=False,
    conditions=None,
    env_vars=None,
    wrappers=None,
    attempts=None,
    attempts_wait=1,
):
    """
    A convenient context manager for safely setting up and tearing down Docker environments.

    - **compose_file** (_str_) - A path to a Docker compose file. A custom tear
      down is not required when using this.
    - **build** (_bool_) - Whether or not to build images for when `compose_file` is provided
    - **service_name** (_str_) - Optional name for when ``compose_file`` is provided
    - **up** (_callable_) - A custom setup callable
    - **down** (_callable_) - A custom tear down callable. This is required when using a custom setup.
    - **on_error** (_callable_) - A callable called in case of an unhandled exception
    - **sleep** (_float_) - Number of seconds to wait before yielding. This occurs after all conditions are successful.
    - **endpoints** (_List[str]_) - Endpoints to verify access for before yielding. Shorthand for adding
      `CheckEndpoints(endpoints)` to the `conditions` argument.
    - **log_patterns** (_List[str|re.Pattern]_) - Regular expression patterns to find in Docker logs before yielding.
      This is only available when `compose_file` is provided. Shorthand for adding
      `CheckDockerLogs(compose_file, log_patterns)` to the `conditions` argument.
    - **mount_logs** (_bool_) - Whether or not to mount log files in Agent containers based on example logs
      configuration
    - **conditions** (_callable_) - A list of callable objects that will be executed before yielding to
      check for errors
    - **env_vars** (_dict_) - A dictionary to update `os.environ` with during execution
    - **wrappers** (_List[callable]_) - A list of context managers to use during execution
    - **attempts** (_int_) - Number of attempts to run `up` successfully
    - **attempts_wait** (_int_) - Time to wait between attempts
    """
    if compose_file and up:
        raise TypeError('You must select either a compose file or a custom setup callable, not both.')

    if compose_file is not None:
        if not isinstance(compose_file, string_types):
            raise TypeError('The path to the compose file is not a string: {}'.format(repr(compose_file)))

        set_up = ComposeFileUp(compose_file, build=build, service_name=service_name)
        if down is not None:
            tear_down = down
        else:
            tear_down = ComposeFileDown(compose_file)
        if on_error is None:
            on_error = ComposeFileLogs(compose_file)
    else:
        set_up = up
        tear_down = down

    if attempts is not None:
        saved_set_up = set_up

        @retry(wait=wait_fixed(attempts_wait), stop=stop_after_attempt(attempts))
        def set_up_with_retry():
            return saved_set_up()

        set_up = set_up_with_retry

    docker_conditions = []

    if log_patterns is not None:
        if compose_file is None:
            raise ValueError(
                'The `log_patterns` convenience is unavailable when using '
                'a custom setup. Please use a custom condition instead.'
            )
        docker_conditions.append(CheckDockerLogs(compose_file, log_patterns))

    if conditions is not None:
        docker_conditions.extend(conditions)

    wrappers = list(wrappers) if wrappers is not None else []

    if mount_logs:
        if isinstance(mount_logs, dict):
            wrappers.append(shared_logs(mount_logs['logs']))
        # Easy mode, read example config
        else:
            # An extra level deep because of the context manager
            check_root = find_check_root(depth=2)

            example_log_configs = _read_example_logs_config(check_root)
            if mount_logs is True:
                wrappers.append(shared_logs(example_log_configs))
            elif isinstance(mount_logs, (list, set)):
                wrappers.append(shared_logs(example_log_configs, mount_whitelist=mount_logs))
            else:
                raise TypeError(
                    'mount_logs: expected True, a list or a set, but got {}'.format(type(mount_logs).__name__)
                )

    with environment_run(
        up=set_up,
        down=tear_down,
        on_error=on_error,
        sleep=sleep,
        endpoints=endpoints,
        conditions=docker_conditions,
        env_vars=env_vars,
        wrappers=wrappers,
    ) as result:
        yield result

get_container_ip(container_id_or_name)

Get a Docker container's IP address from its ID or name.

Source code in datadog_checks/dev/docker.py
def get_container_ip(container_id_or_name):
    """
    Get a Docker container's IP address from its ID or name.
    """
    command = [
        'docker',
        'inspect',
        '-f',
        '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}',
        container_id_or_name,
    ]

    return run_command(command, capture='out', check=True).stdout.strip()

get_docker_hostname()

Determine the hostname Docker uses based on the environment, defaulting to localhost.

Source code in datadog_checks/dev/docker.py
def get_docker_hostname():
    """
    Determine the hostname Docker uses based on the environment, defaulting to `localhost`.
    """
    return urlparse(os.getenv('DOCKER_HOST', '')).hostname or 'localhost'
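
A few sample values show how the fallback behaves. This sketch uses Python 3's `urllib.parse` and takes the environment as an argument so it can be run without changing `os.environ`; `docker_hostname` is a hypothetical name and the addresses are examples.

```python
from urllib.parse import urlparse


def docker_hostname(environ):
    # Same expression as above, reading from a mapping instead of os.environ.
    return urlparse(environ.get('DOCKER_HOST', '')).hostname or 'localhost'


# A TCP endpoint has a hostname, which is returned as-is.
remote = docker_hostname({'DOCKER_HOST': 'tcp://192.168.99.100:2376'})

# A Unix socket URL has no hostname, so the default applies.
local_socket = docker_hostname({'DOCKER_HOST': 'unix:///var/run/docker.sock'})

# The same is true when the variable is not set at all.
unset = docker_hostname({})
```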

datadog_checks.dev.terraform

terraform_run(directory, sleep=None, endpoints=None, conditions=None, env_vars=None, wrappers=None)

A convenient context manager for safely setting up and tearing down Terraform environments.

  • directory (str) - A path containing Terraform files
  • sleep (float) - Number of seconds to wait before yielding. This occurs after all conditions are successful.
  • endpoints (List[str]) - Endpoints to verify access for before yielding. Shorthand for adding CheckEndpoints(endpoints) to the conditions argument.
  • conditions (List[callable]) - A list of callable objects that will be executed before yielding to check for errors
  • env_vars (dict) - A dictionary to update os.environ with during execution
  • wrappers (List[callable]) - A list of context managers to use during execution
Source code in datadog_checks/dev/terraform.py
@contextmanager
def terraform_run(directory, sleep=None, endpoints=None, conditions=None, env_vars=None, wrappers=None):
    """
    A convenient context manager for safely setting up and tearing down Terraform environments.

    - **directory** (_str_) - A path containing Terraform files
    - **sleep** (_float_) - Number of seconds to wait before yielding. This occurs after all conditions are successful.
    - **endpoints** (_List[str]_) - Endpoints to verify access for before yielding. Shorthand for adding
      `CheckEndpoints(endpoints)` to the `conditions` argument.
    - **conditions** (_callable_) - A list of callable objects that will be executed before yielding to
      check for errors
    - **env_vars** (_dict_) - A dictionary to update `os.environ` with during execution
    - **wrappers** (_List[callable]_) - A list of context managers to use during execution
    """
    if not which('terraform'):
        pytest.skip('Terraform not available')

    set_up = TerraformUp(directory)
    tear_down = TerraformDown(directory)

    with environment_run(
        up=set_up,
        down=tear_down,
        sleep=sleep,
        endpoints=endpoints,
        conditions=conditions,
        env_vars=env_vars,
        wrappers=wrappers,
    ) as result:
        yield result

Last update: September 30, 2021