Skip to content

Databases


No matter the database you wish to monitor, the base package provides a standard way to define and collect data from arbitrary queries.

The core premise is that you define a function that accepts a query (usually a str) and returns a sequence of rows, with every row having the same length.

Interface

All the functionality is exposed by the Query and QueryManager classes.

datadog_checks.base.utils.db.query.Query

This class accepts a single dict argument which is necessary to run the query. The representation is based on our custom_queries format originally designed and implemented in !1528.

It is now part of all our database integrations and other products have since adopted this format.

__init__(self, query_data) special

Source code in
def __init__(self, query_data):
    # type: (Dict[str, Any]) -> Query
    """Hold a single query definition; every other attribute is populated by `compile`."""
    # Contains the data to fill the rest of the attributes; deleted after compilation
    self.query_data = deepcopy(query_data or {})  # type: Dict[str, Any]
    # Human-readable identifier of the query; also doubles as the "already compiled" marker
    self.name = None  # type: str
    # The actual query (usually a str) handed to the executor
    self.query = None  # type: str
    # Contains a mapping of column_name -> column_type, transformer
    self.column_transformers = None  # type: Tuple[Tuple[str, Tuple[str, Transformer]]]
    # These transformers are used to collect extra metrics calculated from the query result
    self.extra_transformers = None  # type: List[Tuple[str, Transformer]]
    # Contains the tags defined in query_data, more tags can be added later from the query result
    self.base_tags = None  # type: List[str]

compile(self, column_transformers, extra_transformers)

This idempotent method will be called by QueryManager.compile_queries so you should never need to call it directly.

Source code in
def compile(
    self,
    column_transformers,  # type: Dict[str, TransformerFactory]
    extra_transformers,  # type: Dict[str, TransformerFactory]
):
    # type: (...) -> None

    """
    This idempotent method will be called by `QueryManager.compile_queries` so you
    should never need to call it directly.

    Validates `self.query_data` and populates `name`, `query`, `column_transformers`,
    `extra_transformers` and `base_tags`, raising `ValueError` on any malformed field.
    On success the raw `query_data` is deleted.
    """
    # Check for previous compilation (`name` is only ever assigned at the very end)
    if self.name is not None:
        return

    query_name = self.query_data.get('name')
    if not query_name:
        raise ValueError('query field `name` is required')
    elif not isinstance(query_name, str):
        raise ValueError('query field `name` must be a string')

    query = self.query_data.get('query')
    if not query:
        raise ValueError('field `query` for {} is required'.format(query_name))
    # NOTE(review): the string requirement is only enforced for auto-named custom
    # queries; presumably non-custom queries may be non-string objects — confirm.
    elif query_name.startswith('custom query #') and not isinstance(query, str):
        raise ValueError('field `query` for {} must be a string'.format(query_name))

    columns = self.query_data.get('columns')
    if not columns:
        raise ValueError('field `columns` for {} is required'.format(query_name))
    elif not isinstance(columns, list):
        raise ValueError('field `columns` for {} must be a list'.format(query_name))

    tags = self.query_data.get('tags', [])
    if tags is not None and not isinstance(tags, list):
        raise ValueError('field `tags` for {} must be a list'.format(query_name))

    # Keep track of all defined names; columns and extras share a single namespace
    sources = {}

    column_data = []
    # Enumerate from 1 so error messages use human-friendly positions
    for i, column in enumerate(columns, 1):
        # Columns can be ignored via configuration.
        if not column:
            column_data.append((None, None))
            continue
        elif not isinstance(column, dict):
            raise ValueError('column #{} of {} is not a mapping'.format(i, query_name))

        column_name = column.get('name')
        if not column_name:
            raise ValueError('field `name` for column #{} of {} is required'.format(i, query_name))
        elif not isinstance(column_name, str):
            raise ValueError('field `name` for column #{} of {} must be a string'.format(i, query_name))
        elif column_name in sources:
            raise ValueError(
                'the name {} of {} was already defined in {} #{}'.format(
                    column_name, query_name, sources[column_name]['type'], sources[column_name]['index']
                )
            )

        sources[column_name] = {'type': 'column', 'index': i}

        column_type = column.get('type')
        if not column_type:
            raise ValueError('field `type` for column {} of {} is required'.format(column_name, query_name))
        elif not isinstance(column_type, str):
            raise ValueError('field `type` for column {} of {} must be a string'.format(column_name, query_name))
        elif column_type == 'source':
            # `source` columns submit nothing themselves; their values are only
            # collected at runtime for other transformers to reference
            column_data.append((column_name, (None, None)))
            continue
        elif column_type not in column_transformers:
            raise ValueError('unknown type `{}` for column {} of {}'.format(column_type, column_name, query_name))

        # Every key other than `name`/`type` is forwarded to the transformer factory
        modifiers = {key: value for key, value in column.items() if key not in ('name', 'type')}

        try:
            transformer = column_transformers[column_type](column_transformers, column_name, **modifiers)
        except Exception as e:
            error = 'error compiling type `{}` for column {} of {}: {}'.format(
                column_type, column_name, query_name, e
            )

            # Prepend helpful error text.
            #
            # When an exception is raised in the context of another one, both will be printed. To avoid
            # this we set the context to None. https://www.python.org/dev/peps/pep-0409/
            raise_from(type(e)(error), None)
        else:
            if column_type in ('tag', 'tag_list'):
                column_data.append((column_name, (column_type, transformer)))
            else:
                # All these would actually submit data. As that is the default case, we represent it as
                # a reference to None since if we use e.g. `value` it would never be checked anyway.
                column_data.append((column_name, (None, transformer)))

    # Extras may reference any submission transformer, but not the tag types
    submission_transformers = column_transformers.copy()  # type: Dict[str, Transformer]
    submission_transformers.pop('tag')
    submission_transformers.pop('tag_list')

    extras = self.query_data.get('extras', [])  # type: List[Dict[str, Any]]
    if not isinstance(extras, list):
        raise ValueError('field `extras` for {} must be a list'.format(query_name))

    extra_data = []  # type: List[Tuple[str, Transformer]]
    for i, extra in enumerate(extras, 1):
        if not isinstance(extra, dict):
            raise ValueError('extra #{} of {} is not a mapping'.format(i, query_name))

        extra_name = extra.get('name')  # type: str
        if not extra_name:
            raise ValueError('field `name` for extra #{} of {} is required'.format(i, query_name))
        elif not isinstance(extra_name, str):
            raise ValueError('field `name` for extra #{} of {} must be a string'.format(i, query_name))
        elif extra_name in sources:
            raise ValueError(
                'the name {} of {} was already defined in {} #{}'.format(
                    extra_name, query_name, sources[extra_name]['type'], sources[extra_name]['index']
                )
            )

        sources[extra_name] = {'type': 'extra', 'index': i}

        extra_type = extra.get('type')  # type: str  # Is the key in a transformers dict
        if not extra_type:
            # A bare `expression` key implies the `expression` extra type
            if 'expression' in extra:
                extra_type = 'expression'
            else:
                raise ValueError('field `type` for extra {} of {} is required'.format(extra_name, query_name))
        elif not isinstance(extra_type, str):
            raise ValueError('field `type` for extra {} of {} must be a string'.format(extra_name, query_name))
        elif extra_type not in extra_transformers and extra_type not in submission_transformers:
            raise ValueError('unknown type `{}` for extra {} of {}'.format(extra_type, extra_name, query_name))

        # Extra-specific transformers take precedence over submission transformers
        transformer_factory = extra_transformers.get(
            extra_type, submission_transformers.get(extra_type)
        )  # type: TransformerFactory

        extra_source = extra.get('source')
        if extra_type in submission_transformers:
            # Submission-based extras read their value from a named source at runtime
            if not extra_source:
                raise ValueError('field `source` for extra {} of {} is required'.format(extra_name, query_name))

            modifiers = {key: value for key, value in extra.items() if key not in ('name', 'type', 'source')}
        else:
            modifiers = {key: value for key, value in extra.items() if key not in ('name', 'type')}
            # Give the transformer access to every name defined so far
            modifiers['sources'] = sources

        try:
            transformer = transformer_factory(submission_transformers, extra_name, **modifiers)
        except Exception as e:
            error = 'error compiling type `{}` for extra {} of {}: {}'.format(extra_type, extra_name, query_name, e)

            # Suppress the original exception context as above (PEP 409)
            raise_from(type(e)(error), None)
        else:
            if extra_type in submission_transformers:
                transformer = create_extra_transformer(transformer, extra_source)

            extra_data.append((extra_name, transformer))

    # Assigning `name` marks the query as compiled (see the guard at the top)
    self.name = query_name
    self.query = query
    self.column_transformers = tuple(column_data)
    self.extra_transformers = tuple(extra_data)
    self.base_tags = tags
    # The raw definition is no longer needed
    del self.query_data

datadog_checks.base.utils.db.core.QueryManager

This class is in charge of running any number of Query instances for a single Check instance.

You will most often see it created during Check initialization like this:

self._query_manager = QueryManager(
    self,
    self.execute_query,
    queries=[
        queries.SomeQuery1,
        queries.SomeQuery2,
        queries.SomeQuery3,
        queries.SomeQuery4,
        queries.SomeQuery5,
    ],
    tags=self.instance.get('tags', []),
    error_handler=self._error_sanitizer,
)
self.check_initializations.append(self._query_manager.compile_queries)

Note: This class is not in charge of opening or closing connections, just running queries.

__init__(self, check, executor, queries=None, tags=None, error_handler=None, hostname=None) special

  • check (AgentCheck) - an instance of a Check
  • executor (callable) - a callable accepting a str query as its sole argument and returning a sequence representing either the full result set or an iterator over the result set
  • queries (List[Dict]) - a list of queries in dict format
  • tags (List[str]) - a list of tags to associate with every submission
  • error_handler (callable) - a callable accepting a str error as its sole argument and returning a sanitized string, useful for scrubbing potentially sensitive information libraries emit
Source code in
def __init__(
    self,
    check,  # type: AgentCheck
    executor,  # type:  QueryExecutor
    queries=None,  # type: List[Dict[str, Any]]
    tags=None,  # type: List[str]
    error_handler=None,  # type: Callable[[str], str]
    hostname=None,  # type: str
):  # type: (...) -> QueryManager
    """
    - **check** (_AgentCheck_) - an instance of a Check
    - **executor** (_callable_) - a callable accepting a `str` query as its sole argument and returning
      a sequence representing either the full result set or an iterator over the result set
    - **queries** (_List[Dict]_) - a list of queries in dict format
    - **tags** (_List[str]_) - a list of tags to associate with every submission
    - **error_handler** (_callable_) - a callable accepting a `str` error as its sole argument and returning
      a sanitized string, useful for scrubbing potentially sensitive information libraries emit
    - **hostname** (_str_) - the hostname passed along with every transformer submission
    """
    self.check = check  # type: AgentCheck
    self.executor = executor  # type:  QueryExecutor
    self.tags = tags or []
    self.error_handler = error_handler
    # Statement queries defined by the integration itself
    self.queries = [Query(payload) for payload in queries or []]  # type: List[Query]
    self.hostname = hostname  # type: str
    self.logger = self.check.log

    # When enabled, the integration-defined queries above are dropped (see below)
    only_custom_queries = is_affirmative(self.check.instance.get('only_custom_queries', False))  # type: bool
    custom_queries = list(self.check.instance.get('custom_queries', []))  # type: List[str]
    # May be a boolean-like value or the literal string 'extend'
    use_global_custom_queries = self.check.instance.get('use_global_custom_queries', True)  # type: str

    # Handle overrides
    if use_global_custom_queries == 'extend':
        custom_queries.extend(self.check.init_config.get('global_custom_queries', []))
    elif (
        not custom_queries
        and 'global_custom_queries' in self.check.init_config
        and is_affirmative(use_global_custom_queries)
    ):
        # Fall back to the shared queries only when the instance defines none of its own
        custom_queries = self.check.init_config.get('global_custom_queries', [])

    # Override statement queries if only running custom queries
    if only_custom_queries:
        self.queries = []

    # Deduplicate, assigning a positional default name to each custom query
    for i, custom_query in enumerate(iter_unique(custom_queries), 1):
        query = Query(custom_query)
        query.query_data.setdefault('name', 'custom query #{}'.format(i))
        self.queries.append(query)

    if len(self.queries) == 0:
        self.logger.warning('QueryManager initialized with no query')

compile_queries(self)

This method compiles every Query object.

Source code in
def compile_queries(self):
    """Compile every `Query` object registered with this manager."""
    # Start from the shared column transformers and layer on this check's
    # submission methods (wrapped as transformer factories).
    column_transformers = COLUMN_TRANSFORMERS.copy()  # type: Dict[str, Transformer]
    column_transformers.update(
        {
            transformer_name: create_submission_transformer(getattr(self.check, submission_method))
            for submission_method, transformer_name in SUBMISSION_METHODS.items()
        }
    )

    for query in self.queries:
        query.compile(column_transformers, EXTRA_TRANSFORMERS.copy())

execute(self, extra_tags=None)

This method is what you call every check run.

Source code in
def execute(self, extra_tags=None):
    """
    This method is what you call every check run.

    - **extra_tags** (_List[str]_) - tags applied to this run's submissions in
      addition to the manager-level tags

    Failed queries are logged (sanitized through `error_handler` when provided)
    and skipped; they do not abort the remaining queries.
    """
    # This needs to stay here b/c when we construct a QueryManager in a check's __init__
    # there is no check ID at that point
    self.logger = self.check.log

    global_tags = list(self.tags)
    if extra_tags:
        global_tags.extend(list(extra_tags))

    for query in self.queries:
        query_name = query.name
        query_columns = query.column_transformers
        extra_transformers = query.extra_transformers
        query_tags = query.base_tags

        try:
            rows = self.execute_query(query.query)
        except Exception as e:
            if self.error_handler:
                self.logger.error('Error querying %s: %s', query_name, self.error_handler(str(e)))
            else:
                self.logger.error('Error querying %s: %s', query_name, e)

            continue

        for row in rows:
            if not self._is_row_valid(query, row):
                continue

            # It holds the query results
            sources = {}  # type: Dict[str, str]
            # It holds the transformers defined in query_columns along with the column value
            submission_queue = []  # type: List[Tuple[Transformer, Any]]
            tags = global_tags + query_tags

            for (column_name, type_transformer), column_value in zip(query_columns, row):
                # Columns can be ignored via configuration
                if not column_name:
                    continue

                sources[column_name] = column_value
                column_type, transformer = type_transformer

                # The transformer can be None for `source` types. Those such columns do not submit
                # anything but are collected into the row values for other columns to reference.
                if transformer is None:
                    continue
                elif column_type == 'tag':
                    tags.append(transformer(None, column_value))  # get_tag transformer
                elif column_type == 'tag_list':
                    tags.extend(transformer(None, column_value))  # get_tag_list transformer
                else:
                    # Defer submission until every tag column of the row has been seen
                    submission_queue.append((transformer, column_value))

            for transformer, value in submission_queue:
                transformer(sources, value, tags=tags, hostname=self.hostname)

            # Extras run last so they can reference any column value from this row
            for name, transformer in extra_transformers:
                try:
                    result = transformer(sources, tags=tags, hostname=self.hostname)
                except Exception as e:
                    # One extra failing must not prevent the remaining ones from running
                    self.logger.error('Error transforming %s: %s', name, e)
                    continue
                else:
                    if result is not None:
                        # Expose the result so later extras can reference it
                        sources[name] = result

def execute_query(self, query):
    """
    Called by `execute`, this triggers query execution to check for errors immediately in a way that is compatible
    with any library. If there are no errors, this is guaranteed to return an iterator over the result set.
    """
    result = self.executor(query)
    if result is None:
        return iter(())

    result = iter(result)

    # Pull the first row eagerly so lazy executors raise their errors here
    try:
        head = next(result)
    except StopIteration:
        return iter(())

    # Re-attach the consumed row to the front of the stream
    return chain((head,), result)

Transformers


datadog_checks.base.utils.db.transform.ColumnTransformers

match(transformers, column_name, **modifiers)

This is used for querying unstructured data.

For example, say you want to collect the fields named foo and bar. Typically, they would be stored like:

| foo | bar |
| --- | --- |
| 4   | 2   |

and would be queried like:

SELECT foo, bar FROM ...

Often, you will instead find data stored in the following format:

| metric | value |
| ------ | ----- |
| foo    | 4     |
| bar    | 2     |

and would be queried like:

SELECT metric, value FROM ...

In this case, the metric column stores the name with which to match on and its value is stored in a separate column.

The required items modifier is a mapping of matched names to column data values. Consider the values to be exactly the same as the entries in the columns top level field. You must also define a source modifier either for this transformer itself or in the values of items (which will take precedence). The source will be treated as the value of the match.

Say this is your configuration:

query: SELECT source1, source2, metric FROM TABLE
columns:
  - name: value1
    type: source
  - name: value2
    type: source
  - name: metric_name
    type: match
    source: value1
    items:
      foo:
        name: test.foo
        type: gauge
        source: value2
      bar:
        name: test.bar
        type: monotonic_gauge

and the result set is:

| source1 | source2 | metric |
| ------- | ------- | ------ |
| 1       | 2       | foo    |
| 3       | 4       | baz    |
| 5       | 6       | bar    |

Here's what would be submitted:

  • foo - test.foo as a gauge with a value of 2
  • bar - test.bar.total as a gauge and test.bar.count as a monotonic_count, both with a value of 5
  • baz - nothing since it was not defined as a match item
Source code in
def get_match(transformers, column_name, **modifiers):
    # type: (Dict[str, Transformer], str, Any) -> Transformer
    """
    This is used for querying unstructured data.

    For example, say you want to collect the fields named `foo` and `bar`. Typically, they would be stored like:

    | foo | bar |
    | --- | --- |
    | 4   | 2   |

    and would be queried like:

    ```sql
    SELECT foo, bar FROM ...
    ```

    Often, you will instead find data stored in the following format:

    | metric | value |
    | ------ | ----- |
    | foo    | 4     |
    | bar    | 2     |

    and would be queried like:

    ```sql
    SELECT metric, value FROM ...
    ```

    In this case, the `metric` column stores the name with which to match on and its `value` is
    stored in a separate column.

    The required `items` modifier is a mapping of matched names to column data values. Consider the values
    to be exactly the same as the entries in the `columns` top level field. You must also define a `source`
    modifier either for this transformer itself or in the values of `items` (which will take precedence).
    The source will be treated as the value of the match.

    Say this is your configuration:

    ```yaml
    query: SELECT source1, source2, metric FROM TABLE
    columns:
      - name: value1
        type: source
      - name: value2
        type: source
      - name: metric_name
        type: match
        source: value1
        items:
          foo:
            name: test.foo
            type: gauge
            source: value2
          bar:
            name: test.bar
            type: monotonic_gauge
    ```

    and the result set is:

    | source1 | source2 | metric |
    | ------- | ------- | ------ |
    | 1       | 2       | foo    |
    | 3       | 4       | baz    |
    | 5       | 6       | bar    |

    Here's what would be submitted:

    - `foo` - `test.foo` as a `gauge` with a value of `2`
    - `bar` - `test.bar.total` as a `gauge` and `test.bar.count` as a `monotonic_count`, both with a value of `5`
    - `baz` - nothing since it was not defined as a match item
    """
    # Compilation happens in a helper so this scope stays free of temporaries
    compiled_items = _compile_match_items(transformers, modifiers)  # type: Dict[str, Tuple[str, Transformer]]

    def match(sources, value, **kwargs):
        # type: (Dict[str, Any], str, Dict[str, Any]) -> None
        item = compiled_items.get(value)
        # Values with no configured match item are silently ignored
        if item is not None:
            source, transformer = item  # type: str, Transformer
            transformer(sources, sources[source], **kwargs)

    return match

monotonic_gauge(transformers, column_name, **modifiers)

Send the result as both a gauge suffixed by .total and a monotonic_count suffixed by .count.

Source code in
def get_monotonic_gauge(transformers, column_name, **modifiers):
    # type: (Dict[str, Transformer], str, Any) -> Transformer
    """
    Send the result as both a `gauge` suffixed by `.total` and a `monotonic_count` suffixed by `.count`.
    """
    submit_gauge = transformers['gauge'](transformers, column_name + '.total', **modifiers)  # type: Callable
    submit_count = transformers['monotonic_count'](
        transformers, column_name + '.count', **modifiers
    )  # type: Callable

    def monotonic_gauge(_, value, **kwargs):
        # type: (List, str, Dict[str, Any]) -> None
        # The same value is emitted through both underlying transformers
        submit_gauge(_, value, **kwargs)
        submit_count(_, value, **kwargs)

    return monotonic_gauge

service_check(transformers, column_name, **modifiers)

Submit a service check.

The required modifier status_map is a mapping of values to statuses. Valid statuses include:

  • OK
  • WARNING
  • CRITICAL
  • UNKNOWN

Any encountered values that are not defined will be sent as UNKNOWN.

Source code in
def get_service_check(transformers, column_name, **modifiers):
    # type: (Dict[str, Transformer], str, Any) -> Transformer
    """
    Submit a service check.

    The required modifier `status_map` is a mapping of values to statuses. Valid statuses include:

    - `OK`
    - `WARNING`
    - `CRITICAL`
    - `UNKNOWN`

    Any encountered values that are not defined will be sent as `UNKNOWN`.
    """
    # Pre-compute the value -> status lookup in a helper to keep this scope clean
    status_map = _compile_service_check_statuses(modifiers)

    submit = transformers['__service_check'](transformers, column_name, **modifiers)  # type: Callable

    def service_check(_, value, **kwargs):
        # type: (List, str, Dict[str, Any]) -> None
        # Unmapped values deliberately fall back to UNKNOWN
        submit(_, status_map.get(value, ServiceCheck.UNKNOWN), **kwargs)

    return service_check

tag(transformers, column_name, **modifiers)

Convert a column to a tag that will be used in every subsequent submission.

For example, if you named the column env and the column returned the value prod1, all submissions from that row will be tagged by env:prod1.

This also accepts an optional modifier called boolean that when set to true will transform the result to the string true or false. So for example if you named the column alive and the result was the number 0 the tag will be alive:false.

Source code in
def get_tag(transformers, column_name, **modifiers):
    # type: (Dict[str, Transformer], str, Any) -> Transformer
    """
    Convert a column to a tag that will be used in every subsequent submission.

    For example, if you named the column `env` and the column returned the value `prod1`, all submissions
    from that row will be tagged by `env:prod1`.

    This also accepts an optional modifier called `boolean` that when set to `true` will transform the result
    to the string `true` or `false`. So for example if you named the column `alive` and the result was the
    number `0` the tag will be `alive:false`.
    """
    template = column_name + ':{}'
    coerce_to_boolean = is_affirmative(modifiers.pop('boolean', None))

    def tag(_, value, **kwargs):
        # type: (List, str, Dict[str, Any]) -> str
        if coerce_to_boolean:
            # Normalize any truthy/falsy representation to the strings true/false
            value = str(is_affirmative(value)).lower()

        return template.format(value)

    return tag

tag_list(transformers, column_name, **modifiers)

Convert a column to a list of tags that will be used in every submission.

Tag name is determined by column_name. The column value represents a list of values. It is expected to be either a list of strings, or a comma-separated string.

For example, if the column is named server_tag and the column returned the value 'us,primary', then all submissions for that row will be tagged by server_tag:us and server_tag:primary.

Source code in
def get_tag_list(transformers, column_name, **modifiers):
    # type: (Dict[str, Transformer], str, Any) -> Transformer
    """
    Convert a column to a list of tags that will be used in every submission.

    Tag name is determined by `column_name`. The column value represents a list of values. It is expected to be either
    a list of strings, or a comma-separated string.

    For example, if the column is named `server_tag` and the column returned the value `'us,primary'`, then all
    submissions for that row will be tagged by `server_tag:us` and `server_tag:primary`.
    """
    template = '{}:{{}}'.format(column_name)

    def tag_list(_, value, **kwargs):
        # type: (List, str, Dict[str, Any]) -> List[str]
        # Comma-separated strings are split and whitespace-trimmed first
        if isinstance(value, str):
            value = [part.strip() for part in value.split(',')]

        return [template.format(part) for part in value]

    return tag_list

temporal_percent(transformers, column_name, **modifiers)

Send the result as percentage of time since the last check run as a rate.

For example, say the result is a forever increasing counter representing the total time spent pausing for garbage collection since start up. That number by itself is quite useless, but as a percentage of time spent pausing since the previous collection interval it becomes a useful metric.

There is one required parameter called scale that indicates what unit of time the result should be considered. Valid values are:

  • second
  • millisecond
  • microsecond
  • nanosecond

You may also define the unit as an integer number of parts compared to seconds e.g. millisecond is equivalent to 1000.

Source code in
def get_temporal_percent(transformers, column_name, **modifiers):
    # type: (Dict[str, Transformer], str, Any) -> Transformer
    """
    Send the result as percentage of time since the last check run as a `rate`.

    For example, say the result is a forever increasing counter representing the total time spent pausing for
    garbage collection since start up. That number by itself is quite useless, but as a percentage of time spent
    pausing since the previous collection interval it becomes a useful metric.

    There is one required parameter called `scale` that indicates what unit of time the result should be considered.
    Valid values are:

    - `second`
    - `millisecond`
    - `microsecond`
    - `nanosecond`

    You may also define the unit as an integer number of parts compared to seconds e.g. `millisecond` is
    equivalent to `1000`.
    """
    scale = modifiers.pop('scale', None)
    if scale is None:
        raise ValueError('the `scale` parameter is required')

    if isinstance(scale, str):
        # Named units resolve through the shared lookup table
        scale = constants.TIME_UNITS.get(scale.lower())
        if scale is None:
            raise ValueError(
                'the `scale` parameter must be one of: {}'.format(' | '.join(sorted(constants.TIME_UNITS)))
            )
    elif not isinstance(scale, int):
        raise ValueError(
            'the `scale` parameter must be an integer representing parts of a second e.g. 1000 for millisecond'
        )

    submit_rate = transformers['rate'](transformers, column_name, **modifiers)  # type: Callable

    def temporal_percent(_, value, **kwargs):
        # type: (List, str, Dict[str, Any]) -> None
        submit_rate(_, total_time_to_temporal_percent(float(value), scale=scale), **kwargs)

    return temporal_percent

time_elapsed(transformers, column_name, **modifiers)

Send the number of seconds elapsed from a time in the past as a gauge.

For example, if the result is an instance of datetime.datetime representing 5 seconds ago, then this would submit with a value of 5.

The optional modifier format indicates what format the result is in. By default it is native, assuming the underlying library provides timestamps as datetime objects.

If the value is a UNIX timestamp you can set the format modifier to unix_time.

If the value is a string representation of a date, you must provide the expected timestamp format using the supported codes.

Examples:

columns:
  - name: time_since_x
    type: time_elapsed
    format: native  # default value and can be omitted
  - name: time_since_y
    type: time_elapsed
    format: unix_time
  - name: time_since_z
    type: time_elapsed
    format: "%d/%m/%Y %H:%M:%S"

Note

The code %z (lower case) is not supported on Windows.

Source code in
def get_time_elapsed(transformers, column_name, **modifiers):
    # type: (Dict[str, Transformer], str, Any) -> Transformer
    """
    Send the number of seconds elapsed from a time in the past as a `gauge`.

    For example, if the result is an instance of
    [datetime.datetime](https://docs.python.org/3/library/datetime.html#datetime.datetime) representing 5 seconds ago,
    then this would submit with a value of `5`.

    The optional modifier `format` indicates what format the result is in. By default it is `native`, assuming the
    underlying library provides timestamps as `datetime` objects.

    If the value is a UNIX timestamp you can set the `format` modifier to `unix_time`.

    If the value is a string representation of a date, you must provide the expected timestamp format using the
    [supported codes](https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes).

    Example:

    ```yaml
        columns:
      - name: time_since_x
        type: time_elapsed
        format: native  # default value and can be omitted
      - name: time_since_y
        type: time_elapsed
        format: unix_time
      - name: time_since_z
        type: time_elapsed
        format: "%d/%m/%Y %H:%M:%S"
    ```
    !!! note
        The code `%z` (lower case) is not supported on Windows.
    """
    time_format = modifiers.pop('format', 'native')
    if not isinstance(time_format, str):
        raise ValueError('the `format` parameter must be a string')

    # The remaining modifiers are forwarded to the underlying `gauge` transformer
    gauge = transformers['gauge'](transformers, column_name, **modifiers)

    if time_format == 'native':

        def time_elapsed(_, value, **kwargs):
            # type: (List, str, Dict[str, Any]) -> None
            value = ensure_aware_datetime(value)
            gauge(_, (datetime.now(value.tzinfo) - value).total_seconds(), **kwargs)

    elif time_format == 'unix_time':

        def time_elapsed(_, value, **kwargs):
            # type: (List, str, Dict[str, Any]) -> None
            # UNIX timestamps are already epoch-relative seconds, so no parsing is needed
            gauge(_, time.time() - value, **kwargs)

    else:

        def time_elapsed(_, value, **kwargs):
            # type: (List, str, Dict[str, Any]) -> None
            value = ensure_aware_datetime(datetime.strptime(value, time_format))
            gauge(_, (datetime.now(value.tzinfo) - value).total_seconds(), **kwargs)

    return time_elapsed

datadog_checks.base.utils.db.transform.ExtraTransformers

Every column transformer (except tag) is supported at this level, the only difference being one must set a source to retrieve the desired value.

So for example here:

columns:
  - name: foo.bar
    type: rate
extras:
  - name: foo.current
    type: gauge
    source: foo.bar

the metric foo.current will be sent as a gauge with the value of foo.bar.

expression(transformers, name, **modifiers)

This allows the evaluation of a limited subset of Python syntax and built-in functions.

columns:
  - name: disk.total
    type: gauge
  - name: disk.used
    type: gauge
extras:
  - name: disk.free
    expression: disk.total - disk.used
    submit_type: gauge

For brevity, if the expression attribute exists and type does not then it is assumed the type is expression. The submit_type can be any transformer and any extra options are passed down to it.

The result of every expression is stored, so in lieu of a submit_type the above example could also be written as:

columns:
  - name: disk.total
    type: gauge
  - name: disk.used
    type: gauge
extras:
  - name: free
    expression: disk.total - disk.used
  - name: disk.free
    type: gauge
    source: free

The order matters though, so for example the following will fail:

columns:
  - name: disk.total
    type: gauge
  - name: disk.used
    type: gauge
extras:
  - name: disk.free
    type: gauge
    source: free
  - name: free
    expression: disk.total - disk.used

since the source free does not yet exist.

Source code in
def get_expression(transformers, name, **modifiers):
    # type: (Dict[str, Transformer], str, Dict[str, Any]) -> Transformer
    """
    This allows the evaluation of a limited subset of Python syntax and built-in functions.

    ```yaml
    columns:
      - name: disk.total
        type: gauge
      - name: disk.used
        type: gauge
    extras:
      - name: disk.free
        expression: disk.total - disk.used
        submit_type: gauge
    ```

    For brevity, if the `expression` attribute exists and `type` does not then it is assumed the type is
    `expression`. The `submit_type` can be any transformer and any extra options are passed down to it.

    The result of every expression is stored, so in lieu of a `submit_type` the above example could also be written as:

    ```yaml
    columns:
      - name: disk.total
        type: gauge
      - name: disk.used
        type: gauge
    extras:
      - name: free
        expression: disk.total - disk.used
      - name: disk.free
        type: gauge
        source: free
    ```

    The order matters though, so for example the following will fail:

    ```yaml
    columns:
      - name: disk.total
        type: gauge
      - name: disk.used
        type: gauge
    extras:
      - name: disk.free
        type: gauge
        source: free
      - name: free
        expression: disk.total - disk.used
    ```

    since the source `free` does not yet exist.
    """
    available_sources = modifiers.pop('sources')

    # Guard clauses: the expression must be present, a string, and non-empty
    expression = modifiers.pop('expression', None)
    if expression is None:
        raise ValueError('the `expression` parameter is required')
    if not isinstance(expression, str):
        raise ValueError('the `expression` parameter must be a string')
    if not expression:
        raise ValueError('the `expression` parameter must not be empty')

    if not modifiers.pop('verbose', False):
        # Longest names first so a source that is a prefix of another cannot be matched greedily
        ordered_sources = sorted(available_sources, key=len, reverse=True)

        # Escape each name (metric names commonly contain dots, a regex metacharacter) and rely on
        # the alternation operator trying alternatives in order to preserve the length ordering
        alternation = '|'.join(re.escape(source) for source in ordered_sources)

        # Rewrite every source reference as a lookup into the SOURCES mapping
        expression = re.sub(
            SOURCE_PATTERN.format(alternation),
            lambda match: 'SOURCES["{}"]'.format(match.group(1)),
            expression,
        )

    # Pre-compile once so each evaluation skips the parse step
    expression = compile(expression, filename=name, mode='eval')

    del available_sources

    if 'submit_type' not in modifiers:

        def execute_expression(sources, **kwargs):
            # type: (Dict[str, Any], Dict[str, Any]) -> Any
            return eval(expression, ALLOWED_GLOBALS, {'SOURCES': sources})

    else:
        if modifiers['submit_type'] not in transformers:
            raise ValueError('unknown submit_type `{}`'.format(modifiers['submit_type']))

        submitter = transformers[modifiers.pop('submit_type')](transformers, name, **modifiers)  # type: Transformer
        submitter = create_extra_transformer(submitter)  # type: Callable

        def execute_expression(sources, **kwargs):
            # type: (Dict[str, Any], Dict[str, Any]) -> float
            result = eval(expression, ALLOWED_GLOBALS, {'SOURCES': sources})  # type: float
            submitter(sources, result, **kwargs)
            return result

    return execute_expression

percent(transformers, name, **modifiers)

Send a percentage based on 2 sources as a gauge.

The required modifiers are part and total.

For example, if you have this configuration:

columns:
  - name: disk.total
    type: gauge
  - name: disk.used
    type: gauge
extras:
  - name: disk.utilized
    type: percent
    part: disk.used
    total: disk.total

then the extra metric disk.utilized would be sent as a gauge calculated as disk.used / disk.total * 100.

If the source of total is 0, then the submitted value will always be sent as 0 too.

Source code in
def _pop_required_source(modifiers, parameter, available_sources):
    # type: (Dict[str, Any], str, Any) -> str
    # Pop `parameter` from `modifiers`, validating it is a string naming an available source.
    # Error messages intentionally match the historical per-parameter messages.
    value = modifiers.pop(parameter, None)
    if value is None:
        raise ValueError('the `{}` parameter is required'.format(parameter))
    elif not isinstance(value, str):
        raise ValueError('the `{}` parameter must be a string'.format(parameter))
    elif value not in available_sources:
        raise ValueError('the `{}` parameter `{}` is not an available source'.format(parameter, value))

    return value


def get_percent(transformers, name, **modifiers):
    # type: (Dict[str, Callable], str, Any) -> Transformer
    """
    Send a percentage based on 2 sources as a `gauge`.

    The required modifiers are `part` and `total`.

    For example, if you have this configuration:

    ```yaml
    columns:
      - name: disk.total
        type: gauge
      - name: disk.used
        type: gauge
    extras:
      - name: disk.utilized
        type: percent
        part: disk.used
        total: disk.total
    ```

    then the extra metric `disk.utilized` would be sent as a `gauge` calculated as `disk.used / disk.total * 100`.

    If the source of `total` is `0`, then the submitted value will always be sent as `0` too.
    """
    available_sources = modifiers.pop('sources')

    # Both operands of the percentage must name previously collected sources
    part = _pop_required_source(modifiers, 'part', available_sources)
    total = _pop_required_source(modifiers, 'total', available_sources)

    del available_sources
    # The remaining modifiers are forwarded to the underlying `gauge` transformer
    gauge = transformers['gauge'](transformers, name, **modifiers)
    gauge = create_extra_transformer(gauge)

    def percent(sources, **kwargs):
        # type: (Dict[str, Any], Dict[str, Any]) -> None
        gauge(sources, compute_percent(sources[part], sources[total]), **kwargs)

    return percent

Last update: May 15, 2020