Databases


No matter the database you wish to monitor, the base package provides a standard way to define and collect data from arbitrary queries.

The core premise is that you define a function that accepts a query (usually a str) and returns a sequence of equal-length rows.
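
For example, a minimal executor for a DB-API 2.0 compliant driver might look like the following sketch (the `connection` object is a placeholder for however your check establishes connections):

```python
def execute_query(query):
    # `connection` is a hypothetical, already-established DB-API 2.0 connection.
    cursor = connection.cursor()
    cursor.execute(query)
    # Every row in the returned sequence has the same length.
    return cursor.fetchall()
```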

Interface

All the functionality is exposed by the Query and QueryManager classes.

datadog_checks.base.utils.db.query.Query

This class accepts a single dict argument which is necessary to run the query. The representation is based on our custom_queries format originally designed and implemented in #1528.

It is now part of all our database integrations and other products have since adopted this format.
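
For illustration, a query definition in this format might look like the following (the query, metric names, and tags are hypothetical):

```python
query = {
    'name': 'disk usage',
    'query': 'SELECT used, total, tablespace_name FROM disk_usage',
    'columns': [
        {'name': 'disk.used', 'type': 'gauge'},
        {'name': 'disk.total', 'type': 'gauge'},
        {'name': 'tablespace', 'type': 'tag'},
    ],
    'tags': ['storage:primary'],
    # Optional: run at most once every 60 seconds
    'collection_interval': 60,
}
```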

Source code in datadog_checks_base/datadog_checks/base/utils/db/query.py
class Query(object):
    """
    This class accepts a single `dict` argument which is necessary to run the query. The representation
    is based on our `custom_queries` format originally designed and implemented in !1528.

    It is now part of all our database integrations and
    [other](https://cloud.google.com/solutions/sap/docs/sap-hana-monitoring-agent-planning-guide#defining_custom_queries)
    products have since adopted this format.
    """

    def __init__(self, query_data):
        '''
        Parameters:
            query_data (Dict[str, Any]): The query data to run the query. It should contain the following fields:
                - name (str): The name of the query.
                - query (str): The query to run.
                - columns (List[Dict[str, Any]]): Each column should contain the following fields:
                    - name (str): The name of the column.
                    - type (str): The type of the column.
                    - (Optional) Any other field that the column transformer for the type requires.
                - (Optional) extras (List[Dict[str, Any]]): Each extra transformer should contain the following fields:
                    - name (str): The name of the extra transformer.
                    - type (str): The type of the extra transformer.
                    - (Optional) Any other field that the extra transformer for the type requires.
                - (Optional) tags (List[str]): The tags to add to the query result.
                - (Optional) collection_interval (int): The collection interval (in seconds) of the query.
                    Note:
                        If collection_interval is None, the query will be run every check run.
                        If the collection interval is less than check collection interval,
                        the query will be run every check run.
                        If the collection interval is greater than check collection interval,
                        the query will NOT BE RUN exactly at the collection interval.
                        The query will be run at the next check run after the collection interval has passed.
                - (Optional) metric_prefix (str): The prefix to add to the metric name.
                    Note: If the metric prefix is None, the default metric prefix `<INTEGRATION>.` will be used.
        '''
        # Contains the data to fill the rest of the attributes
        self.query_data = deepcopy(query_data or {})  # type: Dict[str, Any]
        self.name = None  # type: str
        # The actual query
        self.query = None  # type: str
        # Contains a mapping of column_name -> column_type, transformer
        self.column_transformers = None  # type: Tuple[Tuple[str, Tuple[str, Transformer]]]
        # These transformers are used to collect extra metrics calculated from the query result
        self.extra_transformers = None  # type: List[Tuple[str, Transformer]]
        # Contains the tags defined in query_data, more tags can be added later from the query result
        self.base_tags = None  # type: List[str]
        # The collection interval (in seconds) of the query. If None, the query will be run every check run.
        self.collection_interval = None  # type: int
        # The last time the query was executed. If None, the query has never been executed.
        # This is only used when the collection_interval is not None.
        self.__last_execution_time = None  # type: float
        # whether to ignore any defined namespace prefix. True when `metric_prefix` is defined.
        self.metric_name_raw = False  # type: bool

    def compile(
        self,
        column_transformers,  # type: Dict[str, TransformerFactory]
        extra_transformers,  # type: Dict[str, TransformerFactory]
    ):
        # type: (...) -> None

        """
        This idempotent method will be called by `QueryManager.compile_queries` so you
        should never need to call it directly.
        """
        # Check for previous compilation
        if self.name is not None:
            return

        query_name = self.query_data.get('name')
        if not query_name:
            raise ValueError('query field `name` is required')
        elif not isinstance(query_name, str):
            raise ValueError('query field `name` must be a string')

        metric_prefix = self.query_data.get('metric_prefix')
        if metric_prefix is not None:
            if not isinstance(metric_prefix, str):
                raise ValueError('field `metric_prefix` for {} must be a string'.format(query_name))
            elif not metric_prefix:
                raise ValueError('field `metric_prefix` for {} must not be empty'.format(query_name))

        query = self.query_data.get('query')
        if not query:
            raise ValueError('field `query` for {} is required'.format(query_name))
        elif query_name.startswith('custom query #') and not isinstance(query, str):
            raise ValueError('field `query` for {} must be a string'.format(query_name))

        columns = self.query_data.get('columns')
        if not columns:
            raise ValueError('field `columns` for {} is required'.format(query_name))
        elif not isinstance(columns, list):
            raise ValueError('field `columns` for {} must be a list'.format(query_name))

        tags = self.query_data.get('tags', [])
        if tags is not None and not isinstance(tags, list):
            raise ValueError('field `tags` for {} must be a list'.format(query_name))

        # Keep track of all defined names
        sources = {}

        column_data = []
        for i, column in enumerate(columns, 1):
            # Columns can be ignored via configuration.
            if not column:
                column_data.append((None, None))
                continue
            elif not isinstance(column, dict):
                raise ValueError('column #{} of {} is not a mapping'.format(i, query_name))

            column_name = column.get('name')
            if not column_name:
                raise ValueError('field `name` for column #{} of {} is required'.format(i, query_name))
            elif not isinstance(column_name, str):
                raise ValueError('field `name` for column #{} of {} must be a string'.format(i, query_name))
            elif column_name in sources:
                raise ValueError(
                    'the name {} of {} was already defined in {} #{}'.format(
                        column_name, query_name, sources[column_name]['type'], sources[column_name]['index']
                    )
                )

            sources[column_name] = {'type': 'column', 'index': i}

            column_type = column.get('type')
            if not column_type:
                raise ValueError('field `type` for column {} of {} is required'.format(column_name, query_name))
            elif not isinstance(column_type, str):
                raise ValueError('field `type` for column {} of {} must be a string'.format(column_name, query_name))
            elif column_type == 'source':
                column_data.append((column_name, (None, None)))
                continue
            elif column_type not in column_transformers:
                raise ValueError('unknown type `{}` for column {} of {}'.format(column_type, column_name, query_name))

            __column_type_is_tag = column_type in ('tag', 'tag_list', 'tag_not_null')
            modifiers = {key: value for key, value in column.items() if key not in ('name', 'type')}

            try:
                if not __column_type_is_tag and metric_prefix:
                    # if metric_prefix is defined, we prepend it to the column name
                    column_name = "{}.{}".format(metric_prefix, column_name)
                transformer = column_transformers[column_type](column_transformers, column_name, **modifiers)
            except Exception as e:
                error = 'error compiling type `{}` for column {} of {}: {}'.format(
                    column_type, column_name, query_name, e
                )

                # Prepend helpful error text.
                #
                # When an exception is raised in the context of another one, both will be printed. To avoid
                # this we set the context to None. https://www.python.org/dev/peps/pep-0409/
                raise_from(type(e)(error), None)
            else:
                if __column_type_is_tag:
                    column_data.append((column_name, (column_type, transformer)))
                else:
                    # All these would actually submit data. As that is the default case, we represent it as
                    # a reference to None since if we use e.g. `value` it would never be checked anyway.
                    column_data.append((column_name, (None, transformer)))

        submission_transformers = column_transformers.copy()  # type: Dict[str, Transformer]
        submission_transformers.pop('tag')
        submission_transformers.pop('tag_list')
        submission_transformers.pop('tag_not_null')

        extras = self.query_data.get('extras', [])  # type: List[Dict[str, Any]]
        if not isinstance(extras, list):
            raise ValueError('field `extras` for {} must be a list'.format(query_name))

        extra_data = []  # type: List[Tuple[str, Transformer]]
        for i, extra in enumerate(extras, 1):
            if not isinstance(extra, dict):
                raise ValueError('extra #{} of {} is not a mapping'.format(i, query_name))

            extra_name = extra.get('name')  # type: str
            if not extra_name:
                raise ValueError('field `name` for extra #{} of {} is required'.format(i, query_name))
            elif not isinstance(extra_name, str):
                raise ValueError('field `name` for extra #{} of {} must be a string'.format(i, query_name))
            elif extra_name in sources:
                raise ValueError(
                    'the name {} of {} was already defined in {} #{}'.format(
                        extra_name, query_name, sources[extra_name]['type'], sources[extra_name]['index']
                    )
                )

            sources[extra_name] = {'type': 'extra', 'index': i}

            extra_type = extra.get('type')  # type: str  # Is the key in a transformers dict
            if not extra_type:
                if 'expression' in extra:
                    extra_type = 'expression'
                else:
                    raise ValueError('field `type` for extra {} of {} is required'.format(extra_name, query_name))
            elif not isinstance(extra_type, str):
                raise ValueError('field `type` for extra {} of {} must be a string'.format(extra_name, query_name))
            elif extra_type not in extra_transformers and extra_type not in submission_transformers:
                raise ValueError('unknown type `{}` for extra {} of {}'.format(extra_type, extra_name, query_name))

            transformer_factory = extra_transformers.get(
                extra_type, submission_transformers.get(extra_type)
            )  # type: TransformerFactory

            extra_source = extra.get('source')
            if extra_type in submission_transformers:
                if not extra_source:
                    raise ValueError('field `source` for extra {} of {} is required'.format(extra_name, query_name))

                modifiers = {key: value for key, value in extra.items() if key not in ('name', 'type', 'source')}
            else:
                modifiers = {key: value for key, value in extra.items() if key not in ('name', 'type')}
                modifiers['sources'] = sources

            try:
                transformer = transformer_factory(submission_transformers, extra_name, **modifiers)
            except Exception as e:
                error = 'error compiling type `{}` for extra {} of {}: {}'.format(extra_type, extra_name, query_name, e)

                raise_from(type(e)(error), None)
            else:
                if extra_type in submission_transformers:
                    transformer = create_extra_transformer(transformer, extra_source)

                extra_data.append((extra_name, transformer))

        collection_interval = self.query_data.get('collection_interval')
        if collection_interval is not None:
            if not isinstance(collection_interval, (int, float)):
                raise ValueError('field `collection_interval` for {} must be a number'.format(query_name))
            elif int(collection_interval) <= 0:
                raise ValueError(
                    'field `collection_interval` for {} must be a positive number after rounding'.format(query_name)
                )
            collection_interval = int(collection_interval)

        self.name = query_name
        self.query = query
        self.column_transformers = tuple(column_data)
        self.extra_transformers = tuple(extra_data)
        self.base_tags = tags
        self.collection_interval = collection_interval
        self.metric_name_raw = metric_prefix is not None
        del self.query_data

    def should_execute(self):
        '''
        Check if the query should be executed based on the collection interval.

        :return: True if the query should be executed, False otherwise.
        '''
        if self.collection_interval is None:
            # if the collection interval is None, the query should always be executed.
            return True

        now = get_timestamp()
        if self.__last_execution_time is None or now - self.__last_execution_time >= self.collection_interval:
            # if the last execution time is None (the query has never been executed),
            # if the time since the last execution is greater than or equal to the collection interval,
            # the query should be executed.
            self.__last_execution_time = now
            return True

        return False

__init__(query_data)

Parameters:

query_data (Dict[str, Any]), required - the query data used to run the query. It should contain the following fields:

  • name (str): The name of the query.
  • query (str): The query to run.
  • columns (List[Dict[str, Any]]): Each column should contain the following fields:
      • name (str): The name of the column.
      • type (str): The type of the column.
      • (Optional) Any other field that the column transformer for the type requires.
  • (Optional) extras (List[Dict[str, Any]]): Each extra transformer should contain the following fields:
      • name (str): The name of the extra transformer.
      • type (str): The type of the extra transformer.
      • (Optional) Any other field that the extra transformer for the type requires.
  • (Optional) tags (List[str]): The tags to add to the query result.
  • (Optional) collection_interval (int): The collection interval (in seconds) of the query. If collection_interval is None, the query will be run every check run. If the collection interval is less than the check collection interval, the query will be run every check run. If the collection interval is greater than the check collection interval, the query will not be run exactly at the collection interval; it will be run at the next check run after the collection interval has passed.
  • (Optional) metric_prefix (str): The prefix to add to the metric name. If the metric prefix is None, the default metric prefix <INTEGRATION>. will be used.
Source code in datadog_checks_base/datadog_checks/base/utils/db/query.py
def __init__(self, query_data):
    '''
    Parameters:
        query_data (Dict[str, Any]): The query data to run the query. It should contain the following fields:
            - name (str): The name of the query.
            - query (str): The query to run.
            - columns (List[Dict[str, Any]]): Each column should contain the following fields:
                - name (str): The name of the column.
                - type (str): The type of the column.
                - (Optional) Any other field that the column transformer for the type requires.
            - (Optional) extras (List[Dict[str, Any]]): Each extra transformer should contain the following fields:
                - name (str): The name of the extra transformer.
                - type (str): The type of the extra transformer.
                - (Optional) Any other field that the extra transformer for the type requires.
            - (Optional) tags (List[str]): The tags to add to the query result.
            - (Optional) collection_interval (int): The collection interval (in seconds) of the query.
                Note:
                    If collection_interval is None, the query will be run every check run.
                    If the collection interval is less than check collection interval,
                    the query will be run every check run.
                    If the collection interval is greater than check collection interval,
                    the query will NOT BE RUN exactly at the collection interval.
                    The query will be run at the next check run after the collection interval has passed.
            - (Optional) metric_prefix (str): The prefix to add to the metric name.
                Note: If the metric prefix is None, the default metric prefix `<INTEGRATION>.` will be used.
    '''
    # Contains the data to fill the rest of the attributes
    self.query_data = deepcopy(query_data or {})  # type: Dict[str, Any]
    self.name = None  # type: str
    # The actual query
    self.query = None  # type: str
    # Contains a mapping of column_name -> column_type, transformer
    self.column_transformers = None  # type: Tuple[Tuple[str, Tuple[str, Transformer]]]
    # These transformers are used to collect extra metrics calculated from the query result
    self.extra_transformers = None  # type: List[Tuple[str, Transformer]]
    # Contains the tags defined in query_data, more tags can be added later from the query result
    self.base_tags = None  # type: List[str]
    # The collection interval (in seconds) of the query. If None, the query will be run every check run.
    self.collection_interval = None  # type: int
    # The last time the query was executed. If None, the query has never been executed.
    # This is only used when the collection_interval is not None.
    self.__last_execution_time = None  # type: float
    # whether to ignore any defined namespace prefix. True when `metric_prefix` is defined.
    self.metric_name_raw = False  # type: bool

compile(column_transformers, extra_transformers)

This idempotent method will be called by QueryManager.compile_queries so you should never need to call it directly.

Source code in datadog_checks_base/datadog_checks/base/utils/db/query.py
def compile(
    self,
    column_transformers,  # type: Dict[str, TransformerFactory]
    extra_transformers,  # type: Dict[str, TransformerFactory]
):
    # type: (...) -> None

    """
    This idempotent method will be called by `QueryManager.compile_queries` so you
    should never need to call it directly.
    """
    # Check for previous compilation
    if self.name is not None:
        return

    query_name = self.query_data.get('name')
    if not query_name:
        raise ValueError('query field `name` is required')
    elif not isinstance(query_name, str):
        raise ValueError('query field `name` must be a string')

    metric_prefix = self.query_data.get('metric_prefix')
    if metric_prefix is not None:
        if not isinstance(metric_prefix, str):
            raise ValueError('field `metric_prefix` for {} must be a string'.format(query_name))
        elif not metric_prefix:
            raise ValueError('field `metric_prefix` for {} must not be empty'.format(query_name))

    query = self.query_data.get('query')
    if not query:
        raise ValueError('field `query` for {} is required'.format(query_name))
    elif query_name.startswith('custom query #') and not isinstance(query, str):
        raise ValueError('field `query` for {} must be a string'.format(query_name))

    columns = self.query_data.get('columns')
    if not columns:
        raise ValueError('field `columns` for {} is required'.format(query_name))
    elif not isinstance(columns, list):
        raise ValueError('field `columns` for {} must be a list'.format(query_name))

    tags = self.query_data.get('tags', [])
    if tags is not None and not isinstance(tags, list):
        raise ValueError('field `tags` for {} must be a list'.format(query_name))

    # Keep track of all defined names
    sources = {}

    column_data = []
    for i, column in enumerate(columns, 1):
        # Columns can be ignored via configuration.
        if not column:
            column_data.append((None, None))
            continue
        elif not isinstance(column, dict):
            raise ValueError('column #{} of {} is not a mapping'.format(i, query_name))

        column_name = column.get('name')
        if not column_name:
            raise ValueError('field `name` for column #{} of {} is required'.format(i, query_name))
        elif not isinstance(column_name, str):
            raise ValueError('field `name` for column #{} of {} must be a string'.format(i, query_name))
        elif column_name in sources:
            raise ValueError(
                'the name {} of {} was already defined in {} #{}'.format(
                    column_name, query_name, sources[column_name]['type'], sources[column_name]['index']
                )
            )

        sources[column_name] = {'type': 'column', 'index': i}

        column_type = column.get('type')
        if not column_type:
            raise ValueError('field `type` for column {} of {} is required'.format(column_name, query_name))
        elif not isinstance(column_type, str):
            raise ValueError('field `type` for column {} of {} must be a string'.format(column_name, query_name))
        elif column_type == 'source':
            column_data.append((column_name, (None, None)))
            continue
        elif column_type not in column_transformers:
            raise ValueError('unknown type `{}` for column {} of {}'.format(column_type, column_name, query_name))

        __column_type_is_tag = column_type in ('tag', 'tag_list', 'tag_not_null')
        modifiers = {key: value for key, value in column.items() if key not in ('name', 'type')}

        try:
            if not __column_type_is_tag and metric_prefix:
                # if metric_prefix is defined, we prepend it to the column name
                column_name = "{}.{}".format(metric_prefix, column_name)
            transformer = column_transformers[column_type](column_transformers, column_name, **modifiers)
        except Exception as e:
            error = 'error compiling type `{}` for column {} of {}: {}'.format(
                column_type, column_name, query_name, e
            )

            # Prepend helpful error text.
            #
            # When an exception is raised in the context of another one, both will be printed. To avoid
            # this we set the context to None. https://www.python.org/dev/peps/pep-0409/
            raise_from(type(e)(error), None)
        else:
            if __column_type_is_tag:
                column_data.append((column_name, (column_type, transformer)))
            else:
                # All these would actually submit data. As that is the default case, we represent it as
                # a reference to None since if we use e.g. `value` it would never be checked anyway.
                column_data.append((column_name, (None, transformer)))

    submission_transformers = column_transformers.copy()  # type: Dict[str, Transformer]
    submission_transformers.pop('tag')
    submission_transformers.pop('tag_list')
    submission_transformers.pop('tag_not_null')

    extras = self.query_data.get('extras', [])  # type: List[Dict[str, Any]]
    if not isinstance(extras, list):
        raise ValueError('field `extras` for {} must be a list'.format(query_name))

    extra_data = []  # type: List[Tuple[str, Transformer]]
    for i, extra in enumerate(extras, 1):
        if not isinstance(extra, dict):
            raise ValueError('extra #{} of {} is not a mapping'.format(i, query_name))

        extra_name = extra.get('name')  # type: str
        if not extra_name:
            raise ValueError('field `name` for extra #{} of {} is required'.format(i, query_name))
        elif not isinstance(extra_name, str):
            raise ValueError('field `name` for extra #{} of {} must be a string'.format(i, query_name))
        elif extra_name in sources:
            raise ValueError(
                'the name {} of {} was already defined in {} #{}'.format(
                    extra_name, query_name, sources[extra_name]['type'], sources[extra_name]['index']
                )
            )

        sources[extra_name] = {'type': 'extra', 'index': i}

        extra_type = extra.get('type')  # type: str  # Is the key in a transformers dict
        if not extra_type:
            if 'expression' in extra:
                extra_type = 'expression'
            else:
                raise ValueError('field `type` for extra {} of {} is required'.format(extra_name, query_name))
        elif not isinstance(extra_type, str):
            raise ValueError('field `type` for extra {} of {} must be a string'.format(extra_name, query_name))
        elif extra_type not in extra_transformers and extra_type not in submission_transformers:
            raise ValueError('unknown type `{}` for extra {} of {}'.format(extra_type, extra_name, query_name))

        transformer_factory = extra_transformers.get(
            extra_type, submission_transformers.get(extra_type)
        )  # type: TransformerFactory

        extra_source = extra.get('source')
        if extra_type in submission_transformers:
            if not extra_source:
                raise ValueError('field `source` for extra {} of {} is required'.format(extra_name, query_name))

            modifiers = {key: value for key, value in extra.items() if key not in ('name', 'type', 'source')}
        else:
            modifiers = {key: value for key, value in extra.items() if key not in ('name', 'type')}
            modifiers['sources'] = sources

        try:
            transformer = transformer_factory(submission_transformers, extra_name, **modifiers)
        except Exception as e:
            error = 'error compiling type `{}` for extra {} of {}: {}'.format(extra_type, extra_name, query_name, e)

            raise_from(type(e)(error), None)
        else:
            if extra_type in submission_transformers:
                transformer = create_extra_transformer(transformer, extra_source)

            extra_data.append((extra_name, transformer))

    collection_interval = self.query_data.get('collection_interval')
    if collection_interval is not None:
        if not isinstance(collection_interval, (int, float)):
            raise ValueError('field `collection_interval` for {} must be a number'.format(query_name))
        elif int(collection_interval) <= 0:
            raise ValueError(
                'field `collection_interval` for {} must be a positive number after rounding'.format(query_name)
            )
        collection_interval = int(collection_interval)

    self.name = query_name
    self.query = query
    self.column_transformers = tuple(column_data)
    self.extra_transformers = tuple(extra_data)
    self.base_tags = tags
    self.collection_interval = collection_interval
    self.metric_name_raw = metric_prefix is not None
    del self.query_data

datadog_checks.base.utils.db.core.QueryManager

This class is in charge of running any number of Query instances for a single Check instance.

You will most often see it created during Check initialization like this:

self._query_manager = QueryManager(
    self,
    self.execute_query,
    queries=[
        queries.SomeQuery1,
        queries.SomeQuery2,
        queries.SomeQuery3,
        queries.SomeQuery4,
        queries.SomeQuery5,
    ],
    tags=self.instance.get('tags', []),
    error_handler=self._error_sanitizer,
)
self.check_initializations.append(self._query_manager.compile_queries)

Note: This class is not in charge of opening or closing connections, just running queries.
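
A hedged sketch of how a check might wire everything together, assuming a DB-API style connection that the check opens and closes elsewhere (`MyCheck` and `self._connection` are hypothetical):

```python
from datadog_checks.base import AgentCheck
from datadog_checks.base.utils.db import QueryManager


class MyCheck(AgentCheck):
    def __init__(self, name, init_config, instances):
        super(MyCheck, self).__init__(name, init_config, instances)
        # Real checks would pass their pre-defined query dicts here.
        self._query_manager = QueryManager(self, self.execute_query, queries=[])
        self.check_initializations.append(self._query_manager.compile_queries)

    def execute_query(self, query):
        # QueryManager only runs queries; the connection lifecycle
        # is the check's responsibility.
        cursor = self._connection.cursor()
        cursor.execute(query)
        return cursor.fetchall()

    def check(self, _):
        self._query_manager.execute()
```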

Source code in datadog_checks_base/datadog_checks/base/utils/db/core.py
class QueryManager(QueryExecutor):
    """
    This class is in charge of running any number of `Query` instances for a single Check instance.

    You will most often see it created during Check initialization like this:

    ```python
    self._query_manager = QueryManager(
        self,
        self.execute_query,
        queries=[
            queries.SomeQuery1,
            queries.SomeQuery2,
            queries.SomeQuery3,
            queries.SomeQuery4,
            queries.SomeQuery5,
        ],
        tags=self.instance.get('tags', []),
        error_handler=self._error_sanitizer,
    )
    self.check_initializations.append(self._query_manager.compile_queries)
    ```

    Note: This class is not in charge of opening or closing connections, just running queries.
    """

    def __init__(
        self,
        check,  # type: AgentCheck
        executor,  # type:  QueriesExecutor
        queries=None,  # type: List[Dict[str, Any]]
        tags=None,  # type: List[str]
        error_handler=None,  # type: Callable[[str], str]
        hostname=None,  # type: str
    ):  # type: (...) -> QueryManager
        """
        - **check** (_AgentCheck_) - an instance of a Check
        - **executor** (_callable_) - a callable accepting a `str` query as its sole argument and returning
          a sequence representing either the full result set or an iterator over the result set
        - **queries** (_List[Dict]_) - a list of queries in dict format
        - **tags** (_List[str]_) - a list of tags to associate with every submission
        - **error_handler** (_callable_) - a callable accepting a `str` error as its sole argument and returning
          a sanitized string, useful for scrubbing potentially sensitive information libraries emit
        """
        super(QueryManager, self).__init__(
            executor=executor,
            submitter=check,
            queries=queries,
            tags=tags,
            error_handler=error_handler,
            hostname=hostname,
            logger=check.log,
        )
        self.check = check  # type: AgentCheck

        only_custom_queries = is_affirmative(self.check.instance.get('only_custom_queries', False))  # type: bool
        custom_queries = list(self.check.instance.get('custom_queries', []))  # type: List[str]
        use_global_custom_queries = self.check.instance.get('use_global_custom_queries', True)  # type: str

        # Handle overrides
        if use_global_custom_queries == 'extend':
            custom_queries.extend(self.check.init_config.get('global_custom_queries', []))
        elif (
            not custom_queries
            and 'global_custom_queries' in self.check.init_config
            and is_affirmative(use_global_custom_queries)
        ):
            custom_queries = self.check.init_config.get('global_custom_queries', [])

        # Override statement queries if only running custom queries
        if only_custom_queries:
            self.queries = []

        # Deduplicate
        for i, custom_query in enumerate(iter_unique(custom_queries), 1):
            query = Query(custom_query)
            query.query_data.setdefault('name', 'custom query #{}'.format(i))
            self.queries.append(query)

        if len(self.queries) == 0:
            self.logger.warning('QueryManager initialized with no query')

    def execute(self, extra_tags=None):
        # This needs to stay here b/c when we construct a QueryManager in a check's __init__
        # there is no check ID at that point
        self.logger = self.check.log

        return super(QueryManager, self).execute(extra_tags)

__init__(check, executor, queries=None, tags=None, error_handler=None, hostname=None)

  • check (AgentCheck) - an instance of a Check
  • executor (callable) - a callable accepting a str query as its sole argument and returning a sequence representing either the full result set or an iterator over the result set
  • queries (List[Dict]) - a list of queries in dict format
  • tags (List[str]) - a list of tags to associate with every submission
  • error_handler (callable) - a callable accepting a str error as its sole argument and returning a sanitized string, useful for scrubbing potentially sensitive information libraries emit
Source code in datadog_checks_base/datadog_checks/base/utils/db/core.py
def __init__(
    self,
    check,  # type: AgentCheck
    executor,  # type:  QueriesExecutor
    queries=None,  # type: List[Dict[str, Any]]
    tags=None,  # type: List[str]
    error_handler=None,  # type: Callable[[str], str]
    hostname=None,  # type: str
):  # type: (...) -> QueryManager
    """
    - **check** (_AgentCheck_) - an instance of a Check
    - **executor** (_callable_) - a callable accepting a `str` query as its sole argument and returning
      a sequence representing either the full result set or an iterator over the result set
    - **queries** (_List[Dict]_) - a list of queries in dict format
    - **tags** (_List[str]_) - a list of tags to associate with every submission
    - **error_handler** (_callable_) - a callable accepting a `str` error as its sole argument and returning
      a sanitized string, useful for scrubbing potentially sensitive information libraries emit
    """
    super(QueryManager, self).__init__(
        executor=executor,
        submitter=check,
        queries=queries,
        tags=tags,
        error_handler=error_handler,
        hostname=hostname,
        logger=check.log,
    )
    self.check = check  # type: AgentCheck

    only_custom_queries = is_affirmative(self.check.instance.get('only_custom_queries', False))  # type: bool
    custom_queries = list(self.check.instance.get('custom_queries', []))  # type: List[str]
    use_global_custom_queries = self.check.instance.get('use_global_custom_queries', True)  # type: str

    # Handle overrides
    if use_global_custom_queries == 'extend':
        custom_queries.extend(self.check.init_config.get('global_custom_queries', []))
    elif (
        not custom_queries
        and 'global_custom_queries' in self.check.init_config
        and is_affirmative(use_global_custom_queries)
    ):
        custom_queries = self.check.init_config.get('global_custom_queries', [])

    # Override statement queries if only running custom queries
    if only_custom_queries:
        self.queries = []

    # Deduplicate
    for i, custom_query in enumerate(iter_unique(custom_queries), 1):
        query = Query(custom_query)
        query.query_data.setdefault('name', 'custom query #{}'.format(i))
        self.queries.append(query)

    if len(self.queries) == 0:
        self.logger.warning('QueryManager initialized with no query')
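
Given the override handling above, a hypothetical configuration combining global and instance-level custom queries might look like:

```yaml
init_config:
  global_custom_queries:
    - query: SELECT version FROM build_info
      columns:
        - name: build_version
          type: tag

instances:
  - use_global_custom_queries: extend  # merge global queries with those below
    custom_queries:
      - query: SELECT COUNT(*) FROM sessions
        columns:
          - name: sessions.active
            type: gauge
```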

execute(extra_tags=None)

Source code in datadog_checks_base/datadog_checks/base/utils/db/core.py
def execute(self, extra_tags=None):
    # This needs to stay here b/c when we construct a QueryManager in a check's __init__
    # there is no check ID at that point
    self.logger = self.check.log

    return super(QueryManager, self).execute(extra_tags)

Transformers


Column

match

This is used for querying unstructured data.

For example, say you want to collect the fields named foo and bar. Typically, they would be stored like:

| foo | bar |
| --- | --- |
| 4   | 2   |

and would be queried like:

SELECT foo, bar FROM ...

Often, you will instead find data stored in the following format:

| metric | value |
| ------ | ----- |
| foo    | 4     |
| bar    | 2     |

and would be queried like:

SELECT metric, value FROM ...

In this case, the metric column stores the name on which to match, and its value is stored in a separate column.

The required items modifier is a mapping of matched names to column data values. Consider the values to be exactly the same as the entries in the columns top level field. You must also define a source modifier either for this transformer itself or in the values of items (which will take precedence). The source will be treated as the value of the match.

Say this is your configuration:

query: SELECT source1, source2, metric FROM TABLE
columns:
  - name: value1
    type: source
  - name: value2
    type: source
  - name: metric_name
    type: match
    source: value1
    items:
      foo:
        name: test.foo
        type: gauge
        source: value2
      bar:
        name: test.bar
        type: monotonic_gauge

and the result set is:

| source1 | source2 | metric |
| ------- | ------- | ------ |
| 1       | 2       | foo    |
| 3       | 4       | baz    |
| 5       | 6       | bar    |

Here's what would be submitted:

  • foo - test.foo as a gauge with a value of 2
  • bar - test.bar.total as a gauge and test.bar.count as a monotonic_count, both with a value of 5
  • baz - nothing since it was not defined as a match item
Source code in datadog_checks_base/datadog_checks/base/utils/db/transform.py
def get_match(transformers, column_name, **modifiers):
    # type: (Dict[str, Transformer], str, Any) -> Transformer
    """
    This is used for querying unstructured data.

    For example, say you want to collect the fields named `foo` and `bar`. Typically, they would be stored like:

    | foo | bar |
    | --- | --- |
    | 4   | 2   |

    and would be queried like:

    ```sql
    SELECT foo, bar FROM ...
    ```

    Often, you will instead find data stored in the following format:

    | metric | value |
    | ------ | ----- |
    | foo    | 4     |
    | bar    | 2     |

    and would be queried like:

    ```sql
    SELECT metric, value FROM ...
    ```

    In this case, the `metric` column stores the name with which to match on and its `value` is
    stored in a separate column.

    The required `items` modifier is a mapping of matched names to column data values. Consider the values
    to be exactly the same as the entries in the `columns` top level field. You must also define a `source`
    modifier either for this transformer itself or in the values of `items` (which will take precedence).
    The source will be treated as the value of the match.

    Say this is your configuration:

    ```yaml
    query: SELECT source1, source2, metric FROM TABLE
    columns:
      - name: value1
        type: source
      - name: value2
        type: source
      - name: metric_name
        type: match
        source: value1
        items:
          foo:
            name: test.foo
            type: gauge
            source: value2
          bar:
            name: test.bar
            type: monotonic_gauge
    ```

    and the result set is:

    | source1 | source2 | metric |
    | ------- | ------- | ------ |
    | 1       | 2       | foo    |
    | 3       | 4       | baz    |
    | 5       | 6       | bar    |

    Here's what would be submitted:

    - `foo` - `test.foo` as a `gauge` with a value of `2`
    - `bar` - `test.bar.total` as a `gauge` and `test.bar.count` as a `monotonic_count`, both with a value of `5`
    - `baz` - nothing since it was not defined as a match item
    """
    # Do work in a separate function to avoid having to `del` a bunch of variables
    compiled_items = _compile_match_items(transformers, modifiers)  # type: Dict[str, Tuple[str, Transformer]]

    def match(sources, value, **kwargs):
        # type: (Dict[str, Any], str, Dict[str, Any]) -> None
        if value in compiled_items:
            source, transformer = compiled_items[value]  # type: str, Transformer
            transformer(sources, sources[source], **kwargs)

    return match

temporal_percent

Send the result as percentage of time since the last check run as a rate.

For example, say the result is a forever increasing counter representing the total time spent pausing for garbage collection since start up. That number by itself is quite useless, but as a percentage of time spent pausing since the previous collection interval it becomes a useful metric.

There is one required parameter called scale that indicates what unit of time the result should be considered. Valid values are:

  • second
  • millisecond
  • microsecond
  • nanosecond

You may also define the unit as an integer number of parts compared to seconds e.g. millisecond is equivalent to 1000.
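
For example, a column configuration using this transformer might look like (the column name is hypothetical):

```yaml
columns:
  - name: gc.pause_time_percent
    type: temporal_percent
    scale: millisecond
```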

Source code in datadog_checks_base/datadog_checks/base/utils/db/transform.py
def get_temporal_percent(transformers, column_name, **modifiers):
    # type: (Dict[str, Transformer], str, Any) -> Transformer
    """
    Send the result as percentage of time since the last check run as a `rate`.

    For example, say the result is a forever increasing counter representing the total time spent pausing for
    garbage collection since start up. That number by itself is quite useless, but as a percentage of time spent
    pausing since the previous collection interval it becomes a useful metric.

    There is one required parameter called `scale` that indicates what unit of time the result should be considered.
    Valid values are:

    - `second`
    - `millisecond`
    - `microsecond`
    - `nanosecond`

    You may also define the unit as an integer number of parts compared to seconds e.g. `millisecond` is
    equivalent to `1000`.
    """
    scale = modifiers.pop('scale', None)
    if scale is None:
        raise ValueError('the `scale` parameter is required')

    if isinstance(scale, str):
        scale = constants.TIME_UNITS.get(scale.lower())
        if scale is None:
            raise ValueError(
                'the `scale` parameter must be one of: {}'.format(' | '.join(sorted(constants.TIME_UNITS)))
            )
    elif not isinstance(scale, int):
        raise ValueError(
            'the `scale` parameter must be an integer representing parts of a second e.g. 1000 for millisecond'
        )

    rate = transformers['rate'](transformers, column_name, **modifiers)  # type: Callable

    def temporal_percent(_, value, **kwargs):
        # type: (List, str, Dict[str, Any]) -> None
        rate(_, total_time_to_temporal_percent(float(value), scale=scale), **kwargs)

    return temporal_percent

time_elapsed

Send the number of seconds elapsed from a time in the past as a gauge.

For example, if the result is an instance of datetime.datetime representing 5 seconds ago, then this would submit with a value of 5.

The optional modifier format indicates what format the result is in. By default it is native, assuming the underlying library provides timestamps as datetime objects.

If the value is a UNIX timestamp you can set the format modifier to unix_time.

If the value is a string representation of a date, you must provide the expected timestamp format using the supported codes.

Example:

columns:
  - name: time_since_x
    type: time_elapsed
    format: native  # default value and can be omitted
  - name: time_since_y
    type: time_elapsed
    format: unix_time
  - name: time_since_z
    type: time_elapsed
    format: "%d/%m/%Y %H:%M:%S"

Note

The code %z (lower case) is not supported on Windows.

Source code in datadog_checks_base/datadog_checks/base/utils/db/transform.py
def get_time_elapsed(transformers, column_name, **modifiers):
    # type: (Dict[str, Transformer], str, Any) -> Transformer
    """
    Send the number of seconds elapsed from a time in the past as a `gauge`.

    For example, if the result is an instance of
    [datetime.datetime](https://docs.python.org/3/library/datetime.html#datetime.datetime) representing 5 seconds ago,
    then this would submit with a value of `5`.

    The optional modifier `format` indicates what format the result is in. By default it is `native`, assuming the
    underlying library provides timestamps as `datetime` objects.

    If the value is a UNIX timestamp you can set the `format` modifier to `unix_time`.

    If the value is a string representation of a date, you must provide the expected timestamp format using the
    [supported codes](https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes).

    Example:

    ```yaml
    columns:
      - name: time_since_x
        type: time_elapsed
        format: native  # default value and can be omitted
      - name: time_since_y
        type: time_elapsed
        format: unix_time
      - name: time_since_z
        type: time_elapsed
        format: "%d/%m/%Y %H:%M:%S"
    ```
    !!! note
        The code `%z` (lower case) is not supported on Windows.
    """
    time_format = modifiers.pop('format', 'native')
    if not isinstance(time_format, str):
        raise ValueError('the `format` parameter must be a string')

    gauge = transformers['gauge'](transformers, column_name, **modifiers)

    if time_format == 'native':

        def time_elapsed(_, value, **kwargs):
            # type: (List, str, Dict[str, Any]) -> None
            value = ensure_aware_datetime(value)
            gauge(_, (datetime.now(value.tzinfo) - value).total_seconds(), **kwargs)

    elif time_format == 'unix_time':

        def time_elapsed(_, value, **kwargs):
            gauge(_, time.time() - value, **kwargs)

    else:

        def time_elapsed(_, value, **kwargs):
            # type: (List, str, Dict[str, Any]) -> None
            value = ensure_aware_datetime(datetime.strptime(value, time_format))
            gauge(_, (datetime.now(value.tzinfo) - value).total_seconds(), **kwargs)

    return time_elapsed

monotonic_gauge

Send the result as both a gauge suffixed by .total and a monotonic_count suffixed by .count.
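
For example, with a hypothetical column named rows_processed:

```yaml
columns:
  - name: rows_processed
    type: monotonic_gauge
```

This would submit rows_processed.total as a gauge and rows_processed.count as a monotonic_count.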

Source code in datadog_checks_base/datadog_checks/base/utils/db/transform.py
def get_monotonic_gauge(transformers, column_name, **modifiers):
    # type: (Dict[str, Transformer], str, Any) -> Transformer
    """
    Send the result as both a `gauge` suffixed by `.total` and a `monotonic_count` suffixed by `.count`.
    """
    gauge = transformers['gauge'](transformers, '{}.total'.format(column_name), **modifiers)  # type: Callable
    monotonic_count = transformers['monotonic_count'](
        transformers, '{}.count'.format(column_name), **modifiers
    )  # type: Callable

    def monotonic_gauge(_, value, **kwargs):
        # type: (List, str, Dict[str, Any]) -> None
        gauge(_, value, **kwargs)
        monotonic_count(_, value, **kwargs)

    return monotonic_gauge

service_check

Submit a service check.

The required modifier status_map is a mapping of values to statuses. Valid statuses include:

  • OK
  • WARNING
  • CRITICAL
  • UNKNOWN

Any encountered values that are not defined will be sent as UNKNOWN.

In addition, a message modifier can be passed which can contain placeholders (based on Python's str.format) for other column names from the same query to add a message dynamically to the service_check.
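
For example, a hypothetical configuration where state_detail is assumed to be another column of the same query (e.g. defined with type source):

```yaml
columns:
  - name: db.connection_status
    type: service_check
    status_map:
      up: OK
      degraded: WARNING
      down: CRITICAL
    message: "Instance state: {state_detail}"
```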

Source code in datadog_checks_base/datadog_checks/base/utils/db/transform.py
def get_service_check(transformers, column_name, **modifiers):
    # type: (Dict[str, Transformer], str, Any) -> Transformer
    """
    Submit a service check.

    The required modifier `status_map` is a mapping of values to statuses. Valid statuses include:

    - `OK`
    - `WARNING`
    - `CRITICAL`
    - `UNKNOWN`

    Any encountered values that are not defined will be sent as `UNKNOWN`.

    In addition, a `message` modifier can be passed which can contain placeholders
    (based on Python's str.format) for other column names from the same query to add a message
    dynamically to the service_check.
    """
    # Do work in a separate function to avoid having to `del` a bunch of variables
    status_map = _compile_service_check_statuses(modifiers)
    message_field = modifiers.pop('message', None)

    service_check_method = transformers['__service_check'](transformers, column_name, **modifiers)  # type: Callable

    def service_check(sources, value, **kwargs):
        # type: (List, str, Dict[str, Any]) -> None
        check_status = status_map.get(value, ServiceCheck.UNKNOWN)
        if not message_field or check_status == ServiceCheck.OK:
            message = None
        else:
            message = message_field.format(**sources)

        service_check_method(sources, check_status, message=message, **kwargs)

    return service_check

tag

Convert a column to a tag that will be used in every subsequent submission.

For example, if you named the column env and the column returned the value prod1, all submissions from that row will be tagged by env:prod1.

This also accepts an optional modifier called boolean that, when set to true, transforms the result into the string true or false. So, for example, if you named the column alive and the result was the number 0, the tag would be alive:false.
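
Both behaviors from the examples above, as configuration:

```yaml
columns:
  - name: env
    type: tag
  - name: alive
    type: tag
    boolean: true
```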

Source code in datadog_checks_base/datadog_checks/base/utils/db/transform.py
def get_tag(transformers, column_name, **modifiers):
    # type: (Dict[str, Transformer], str, Any) -> Transformer
    """
    Convert a column to a tag that will be used in every subsequent submission.

    For example, if you named the column `env` and the column returned the value `prod1`, all submissions
    from that row will be tagged by `env:prod1`.

    This also accepts an optional modifier called `boolean` that when set to `true` will transform the result
    to the string `true` or `false`. So for example if you named the column `alive` and the result was the
    number `0` the tag will be `alive:false`.
    """
    template = '{}:{{}}'.format(column_name)
    boolean = is_affirmative(modifiers.pop('boolean', None))

    def tag(_, value, **kwargs):
        # type: (List, str, Dict[str, Any]) -> str
        if boolean:
            value = str(is_affirmative(value)).lower()

        return template.format(value)

    return tag

tag_list

Convert a column to a list of tags that will be used in every submission.

Tag name is determined by column_name. The column value represents a list of values. It is expected to be either a list of strings, or a comma-separated string.

For example, if the column is named server_tag and the column returned the value us,primary, then all submissions for that row will be tagged by server_tag:us and server_tag:primary.
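
The corresponding column definition:

```yaml
columns:
  - name: server_tag
    type: tag_list
```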

Source code in datadog_checks_base/datadog_checks/base/utils/db/transform.py
def get_tag_list(transformers, column_name, **modifiers):
    # type: (Dict[str, Transformer], str, Any) -> Transformer
    """
    Convert a column to a list of tags that will be used in every submission.

    Tag name is determined by `column_name`. The column value represents a list of values. It is expected to be either
    a list of strings, or a comma-separated string.

    For example, if the column is named `server_tag` and the column returned the value `us,primary`, then all
    submissions for that row will be tagged by `server_tag:us` and `server_tag:primary`.
    """
    template = '%s:{}' % column_name

    def tag_list(_, value, **kwargs):
        # type: (List, str, Dict[str, Any]) -> List[str]
        if isinstance(value, str):
            value = [v.strip() for v in value.split(',')]

        return [template.format(v) for v in value]

    return tag_list

Extra

Every column transformer (except the tag transformers: tag, tag_list, and tag_not_null) is supported at this level; the only difference is that you must set a source to retrieve the desired value.

So for example here:

columns:
  - name: foo.bar
    type: rate
extras:
  - name: foo.current
    type: gauge
    source: foo.bar

the metric foo.current will be sent as a gauge with the value of foo.bar.

percent

Send a percentage based on 2 sources as a gauge.

The required modifiers are part and total.

For example, if you have this configuration:

columns:
  - name: disk.total
    type: gauge
  - name: disk.used
    type: gauge
extras:
  - name: disk.utilized
    type: percent
    part: disk.used
    total: disk.total

then the extra metric disk.utilized would be sent as a gauge calculated as disk.used / disk.total * 100.

If the source of total is 0, then the submitted value will always be sent as 0 too.

Source code in datadog_checks_base/datadog_checks/base/utils/db/transform.py
def get_percent(transformers, name, **modifiers):
    # type: (Dict[str, Callable], str, Any) -> Transformer
    """
    Send a percentage based on 2 sources as a `gauge`.

    The required modifiers are `part` and `total`.

    For example, if you have this configuration:

    ```yaml
    columns:
      - name: disk.total
        type: gauge
      - name: disk.used
        type: gauge
    extras:
      - name: disk.utilized
        type: percent
        part: disk.used
        total: disk.total
    ```

    then the extra metric `disk.utilized` would be sent as a `gauge` calculated as `disk.used / disk.total * 100`.

    If the source of `total` is `0`, then the submitted value will always be sent as `0` too.
    """
    available_sources = modifiers.pop('sources')

    part = modifiers.pop('part', None)
    if part is None:
        raise ValueError('the `part` parameter is required')
    elif not isinstance(part, str):
        raise ValueError('the `part` parameter must be a string')
    elif part not in available_sources:
        raise ValueError('the `part` parameter `{}` is not an available source'.format(part))

    total = modifiers.pop('total', None)
    if total is None:
        raise ValueError('the `total` parameter is required')
    elif not isinstance(total, str):
        raise ValueError('the `total` parameter must be a string')
    elif total not in available_sources:
        raise ValueError('the `total` parameter `{}` is not an available source'.format(total))

    del available_sources
    gauge = transformers['gauge'](transformers, name, **modifiers)
    gauge = create_extra_transformer(gauge)

    def percent(sources, **kwargs):
        gauge(sources, compute_percent(sources[part], sources[total]), **kwargs)

    return percent

expression

This allows the evaluation of a limited subset of Python syntax and built-in functions.

columns:
  - name: disk.total
    type: gauge
  - name: disk.used
    type: gauge
extras:
  - name: disk.free
    expression: disk.total - disk.used
    submit_type: gauge

For brevity, if the expression attribute exists and type does not then it is assumed the type is expression. The submit_type can be any transformer and any extra options are passed down to it.

The result of every expression is stored, so in lieu of a submit_type the above example could also be written as:

columns:
  - name: disk.total
    type: gauge
  - name: disk.used
    type: gauge
extras:
  - name: free
    expression: disk.total - disk.used
  - name: disk.free
    type: gauge
    source: free

The order matters though, so for example the following will fail:

columns:
  - name: disk.total
    type: gauge
  - name: disk.used
    type: gauge
extras:
  - name: disk.free
    type: gauge
    source: free
  - name: free
    expression: disk.total - disk.used

since the source free does not yet exist.

Source code in datadog_checks_base/datadog_checks/base/utils/db/transform.py
def get_expression(transformers, name, **modifiers):
    # type: (Dict[str, Transformer], str, Dict[str, Any]) -> Transformer
    """
    This allows the evaluation of a limited subset of Python syntax and built-in functions.

    ```yaml
    columns:
      - name: disk.total
        type: gauge
      - name: disk.used
        type: gauge
    extras:
      - name: disk.free
        expression: disk.total - disk.used
        submit_type: gauge
    ```

    For brevity, if the `expression` attribute exists and `type` does not then it is assumed the type is
    `expression`. The `submit_type` can be any transformer and any extra options are passed down to it.

    The result of every expression is stored, so in lieu of a `submit_type` the above example could also be written as:

    ```yaml
    columns:
      - name: disk.total
        type: gauge
      - name: disk.used
        type: gauge
    extras:
      - name: free
        expression: disk.total - disk.used
      - name: disk.free
        type: gauge
        source: free
    ```

    The order matters though, so for example the following will fail:

    ```yaml
    columns:
      - name: disk.total
        type: gauge
      - name: disk.used
        type: gauge
    extras:
      - name: disk.free
        type: gauge
        source: free
      - name: free
        expression: disk.total - disk.used
    ```

    since the source `free` does not yet exist.
    """
    available_sources = modifiers.pop('sources')

    expression = modifiers.pop('expression', None)
    if expression is None:
        raise ValueError('the `expression` parameter is required')
    elif not isinstance(expression, str):
        raise ValueError('the `expression` parameter must be a string')
    elif not expression:
        raise ValueError('the `expression` parameter must not be empty')

    if not modifiers.pop('verbose', False):
        # Sort the sources in reverse order of length to prevent greedy matching
        available_sources = sorted(available_sources, key=lambda s: -len(s))

        # Escape special characters, mostly for the possible dots in metric names
        available_sources = list(map(re.escape, available_sources))

        # Finally, utilize the order by relying on the guarantees provided by the alternation operator
        available_sources = '|'.join(available_sources)

        expression = re.sub(
            SOURCE_PATTERN.format(available_sources),
            # Replace by the particular source that matched
            lambda match_obj: 'SOURCES["{}"]'.format(match_obj.group(1)),
            expression,
        )

    expression = compile(expression, filename=name, mode='eval')

    del available_sources

    if 'submit_type' in modifiers:
        if modifiers['submit_type'] not in transformers:
            raise ValueError('unknown submit_type `{}`'.format(modifiers['submit_type']))

        submit_method = transformers[modifiers.pop('submit_type')](transformers, name, **modifiers)  # type: Transformer
        submit_method = create_extra_transformer(submit_method)  # type: Callable

        def execute_expression(sources, **kwargs):
            # type: (Dict[str, Any], Dict[str, Any]) -> float
            result = eval(expression, ALLOWED_GLOBALS, {'SOURCES': sources})  # type: float
            submit_method(sources, result, **kwargs)
            return result

    else:

        def execute_expression(sources, **kwargs):
            # type: (Dict[str, Any], Dict[str, Any]) -> Any
            return eval(expression, ALLOWED_GLOBALS, {'SOURCES': sources})

    return execute_expression
