Databases¶
No matter the database you wish to monitor, the base package provides a standard way to define and collect data from arbitrary queries.
The core premise is that you define a function that accepts a query (usually a `str`) and returns a sequence of equal-length results.
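For example, a minimal executor for a DB-API 2.0 driver might look like the following sketch; `self._connection` is an assumed attribute holding an open connection, and the cursor is assumed to support the context manager protocol:

```python
def execute_query(self, query):
    # Assumed attribute: self._connection is an open DB-API 2.0 connection.
    # Yield rows so large result sets are not all held in memory; each row is a
    # sequence whose length matches the number of `columns` defined for the query.
    with self._connection.cursor() as cursor:
        cursor.execute(query)
        for row in cursor.fetchall():
            yield row
```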
Interface¶
All the functionality is exposed by the `Query` and `QueryManager` classes.
datadog_checks.base.utils.db.query.Query¶
This class accepts a single `dict` argument which is necessary to run the query. The representation is based on our `custom_queries` format, originally designed and implemented in #1528. It is now part of all our database integrations, and other products have since adopted this format.
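For illustration, a minimal query definition in this format could look like the following sketch; the query name, SQL, and metric names are hypothetical:

```python
# A hypothetical query definition following the documented format.
EXAMPLE_QUERY = {
    'name': 'user_stats',
    'query': 'SELECT role, COUNT(*) FROM users GROUP BY role',
    'columns': [
        {'name': 'role', 'type': 'tag'},               # tags every metric from this row
        {'name': 'app.users.count', 'type': 'gauge'},  # submitted as a gauge
    ],
    'tags': ['service:app'],
}
```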
Source code in datadog_checks_base/datadog_checks/base/utils/db/query.py
class Query(object):
    """
    This class accepts a single `dict` argument which is necessary to run the query. The representation
    is based on our `custom_queries` format originally designed and implemented in !1528.

    It is now part of all our database integrations and
    [other](https://cloud.google.com/solutions/sap/docs/sap-hana-monitoring-agent-planning-guide#defining_custom_queries)
    products have since adopted this format.
    """

    def __init__(self, query_data):
        '''
        Parameters:
            query_data (Dict[str, Any]): The query data to run the query. It should contain the following fields:
                - name (str): The name of the query.
                - query (str): The query to run.
                - columns (List[Dict[str, Any]]): Each column should contain the following fields:
                    - name (str): The name of the column.
                    - type (str): The type of the column.
                    - (Optional) Any other field that the column transformer for the type requires.
                - (Optional) extras (List[Dict[str, Any]]): Each extra transformer should contain the following fields:
                    - name (str): The name of the extra transformer.
                    - type (str): The type of the extra transformer.
                    - (Optional) Any other field that the extra transformer for the type requires.
                - (Optional) tags (List[str]): The tags to add to the query result.
                - (Optional) collection_interval (int): The collection interval (in seconds) of the query.
                    Note:
                        If collection_interval is None, the query will be run every check run.
                        If the collection interval is less than the check collection interval,
                        the query will be run every check run.
                        If the collection interval is greater than the check collection interval,
                        the query will NOT BE RUN exactly at the collection interval.
                        The query will be run at the next check run after the collection interval has passed.
                - (Optional) metric_prefix (str): The prefix to add to the metric name.
                    Note: If the metric prefix is None, the default metric prefix `<INTEGRATION>.` will be used.
        '''
        # Contains the data to fill the rest of the attributes
        self.query_data = deepcopy(query_data or {})  # type: Dict[str, Any]
        self.name = None  # type: str
        # The actual query
        self.query = None  # type: str
        # Contains a mapping of column_name -> column_type, transformer
        self.column_transformers = None  # type: Tuple[Tuple[str, Tuple[str, Transformer]]]
        # These transformers are used to collect extra metrics calculated from the query result
        self.extra_transformers = None  # type: List[Tuple[str, Transformer]]
        # Contains the tags defined in query_data, more tags can be added later from the query result
        self.base_tags = None  # type: List[str]
        # The collection interval (in seconds) of the query. If None, the query will be run every check run.
        self.collection_interval = None  # type: int
        # The last time the query was executed. If None, the query has never been executed.
        # This is only used when the collection_interval is not None.
        self.__last_execution_time = None  # type: float
        # whether to ignore any defined namespace prefix. True when `metric_prefix` is defined.
        self.metric_name_raw = False  # type: bool

    def compile(
        self,
        column_transformers,  # type: Dict[str, TransformerFactory]
        extra_transformers,  # type: Dict[str, TransformerFactory]
    ):
        # type: (...) -> None
        """
        This idempotent method will be called by `QueryManager.compile_queries` so you
        should never need to call it directly.
        """
        # Check for previous compilation
        if self.name is not None:
            return

        query_name = self.query_data.get('name')
        if not query_name:
            raise ValueError('query field `name` is required')
        elif not isinstance(query_name, str):
            raise ValueError('query field `name` must be a string')

        metric_prefix = self.query_data.get('metric_prefix')
        if metric_prefix is not None:
            if not isinstance(metric_prefix, str):
                raise ValueError('field `metric_prefix` for {} must be a string'.format(query_name))
            elif not metric_prefix:
                raise ValueError('field `metric_prefix` for {} must not be empty'.format(query_name))

        query = self.query_data.get('query')
        if not query:
            raise ValueError('field `query` for {} is required'.format(query_name))
        elif query_name.startswith('custom query #') and not isinstance(query, str):
            raise ValueError('field `query` for {} must be a string'.format(query_name))

        columns = self.query_data.get('columns')
        if not columns:
            raise ValueError('field `columns` for {} is required'.format(query_name))
        elif not isinstance(columns, list):
            raise ValueError('field `columns` for {} must be a list'.format(query_name))

        tags = self.query_data.get('tags', [])
        if tags is not None and not isinstance(tags, list):
            raise ValueError('field `tags` for {} must be a list'.format(query_name))

        # Keep track of all defined names
        sources = {}

        column_data = []
        for i, column in enumerate(columns, 1):
            # Columns can be ignored via configuration.
            if not column:
                column_data.append((None, None))
                continue
            elif not isinstance(column, dict):
                raise ValueError('column #{} of {} is not a mapping'.format(i, query_name))

            column_name = column.get('name')
            if not column_name:
                raise ValueError('field `name` for column #{} of {} is required'.format(i, query_name))
            elif not isinstance(column_name, str):
                raise ValueError('field `name` for column #{} of {} must be a string'.format(i, query_name))
            elif column_name in sources:
                raise ValueError(
                    'the name {} of {} was already defined in {} #{}'.format(
                        column_name, query_name, sources[column_name]['type'], sources[column_name]['index']
                    )
                )

            sources[column_name] = {'type': 'column', 'index': i}

            column_type = column.get('type')
            if not column_type:
                raise ValueError('field `type` for column {} of {} is required'.format(column_name, query_name))
            elif not isinstance(column_type, str):
                raise ValueError('field `type` for column {} of {} must be a string'.format(column_name, query_name))
            elif column_type == 'source':
                column_data.append((column_name, (None, None)))
                continue
            elif column_type not in column_transformers:
                raise ValueError('unknown type `{}` for column {} of {}'.format(column_type, column_name, query_name))

            __column_type_is_tag = column_type in ('tag', 'tag_list', 'tag_not_null')
            modifiers = {key: value for key, value in column.items() if key not in ('name', 'type')}

            try:
                if not __column_type_is_tag and metric_prefix:
                    # if metric_prefix is defined, we prepend it to the column name
                    column_name = "{}.{}".format(metric_prefix, column_name)
                transformer = column_transformers[column_type](column_transformers, column_name, **modifiers)
            except Exception as e:
                error = 'error compiling type `{}` for column {} of {}: {}'.format(
                    column_type, column_name, query_name, e
                )

                # Prepend helpful error text.
                #
                # When an exception is raised in the context of another one, both will be printed. To avoid
                # this we set the context to None. https://www.python.org/dev/peps/pep-0409/
                raise type(e)(error) from None
            else:
                if __column_type_is_tag:
                    column_data.append((column_name, (column_type, transformer)))
                else:
                    # All these would actually submit data. As that is the default case, we represent it as
                    # a reference to None since if we use e.g. `value` it would never be checked anyway.
                    column_data.append((column_name, (None, transformer)))

        submission_transformers = column_transformers.copy()  # type: Dict[str, Transformer]
        submission_transformers.pop('tag')
        submission_transformers.pop('tag_list')
        submission_transformers.pop('tag_not_null')

        extras = self.query_data.get('extras', [])  # type: List[Dict[str, Any]]
        if not isinstance(extras, list):
            raise ValueError('field `extras` for {} must be a list'.format(query_name))

        extra_data = []  # type: List[Tuple[str, Transformer]]
        for i, extra in enumerate(extras, 1):
            if not isinstance(extra, dict):
                raise ValueError('extra #{} of {} is not a mapping'.format(i, query_name))

            extra_type = extra.get('type')  # type: str
            extra_name = extra.get('name')  # type: str
            if extra_type == 'log':
                # The name is unused
                extra_name = 'log'
            elif not extra_name:
                raise ValueError('field `name` for extra #{} of {} is required'.format(i, query_name))
            elif not isinstance(extra_name, str):
                raise ValueError('field `name` for extra #{} of {} must be a string'.format(i, query_name))
            elif extra_name in sources:
                raise ValueError(
                    'the name {} of {} was already defined in {} #{}'.format(
                        extra_name, query_name, sources[extra_name]['type'], sources[extra_name]['index']
                    )
                )

            sources[extra_name] = {'type': 'extra', 'index': i}

            if not extra_type:
                if 'expression' in extra:
                    extra_type = 'expression'
                else:
                    raise ValueError('field `type` for extra {} of {} is required'.format(extra_name, query_name))
            elif not isinstance(extra_type, str):
                raise ValueError('field `type` for extra {} of {} must be a string'.format(extra_name, query_name))
            elif extra_type not in extra_transformers and extra_type not in submission_transformers:
                raise ValueError('unknown type `{}` for extra {} of {}'.format(extra_type, extra_name, query_name))

            transformer_factory = extra_transformers.get(
                extra_type, submission_transformers.get(extra_type)
            )  # type: TransformerFactory

            extra_source = extra.get('source')
            if extra_type in submission_transformers:
                if not extra_source:
                    raise ValueError('field `source` for extra {} of {} is required'.format(extra_name, query_name))

                modifiers = {key: value for key, value in extra.items() if key not in ('name', 'type', 'source')}
            else:
                modifiers = {key: value for key, value in extra.items() if key not in ('name', 'type')}
                modifiers['sources'] = sources

            try:
                transformer = transformer_factory(submission_transformers, extra_name, **modifiers)
            except Exception as e:
                error = 'error compiling type `{}` for extra {} of {}: {}'.format(extra_type, extra_name, query_name, e)
                raise type(e)(error) from None
            else:
                if extra_type in submission_transformers:
                    transformer = create_extra_transformer(transformer, extra_source)

                extra_data.append((extra_name, transformer))

        collection_interval = self.query_data.get('collection_interval')
        if collection_interval is not None:
            if not isinstance(collection_interval, (int, float)):
                raise ValueError('field `collection_interval` for {} must be a number'.format(query_name))
            elif int(collection_interval) <= 0:
                raise ValueError(
                    'field `collection_interval` for {} must be a positive number after rounding'.format(query_name)
                )
            collection_interval = int(collection_interval)

        self.name = query_name
        self.query = query
        self.column_transformers = tuple(column_data)
        self.extra_transformers = tuple(extra_data)
        self.base_tags = tags
        self.collection_interval = collection_interval
        self.metric_name_raw = metric_prefix is not None
        del self.query_data

    def should_execute(self):
        '''
        Check if the query should be executed based on the collection interval.

        :return: True if the query should be executed, False otherwise.
        '''
        if self.collection_interval is None:
            # if the collection interval is None, the query should always be executed.
            return True

        now = get_timestamp()
        if self.__last_execution_time is None or now - self.__last_execution_time >= self.collection_interval:
            # if the last execution time is None (the query has never been executed),
            # or if the time since the last execution is greater than or equal to the collection interval,
            # the query should be executed.
            self.__last_execution_time = now
            return True

        return False
__init__(query_data)¶
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `query_data` | `Dict[str, Any]` | The query data used to run the query. The supported fields (`name`, `query`, `columns`, and the optional `extras`, `tags`, `collection_interval`, and `metric_prefix`) are documented in full in the source below. | *required* |
Source code in datadog_checks_base/datadog_checks/base/utils/db/query.py
def __init__(self, query_data):
    '''
    Parameters:
        query_data (Dict[str, Any]): The query data to run the query. It should contain the following fields:
            - name (str): The name of the query.
            - query (str): The query to run.
            - columns (List[Dict[str, Any]]): Each column should contain the following fields:
                - name (str): The name of the column.
                - type (str): The type of the column.
                - (Optional) Any other field that the column transformer for the type requires.
            - (Optional) extras (List[Dict[str, Any]]): Each extra transformer should contain the following fields:
                - name (str): The name of the extra transformer.
                - type (str): The type of the extra transformer.
                - (Optional) Any other field that the extra transformer for the type requires.
            - (Optional) tags (List[str]): The tags to add to the query result.
            - (Optional) collection_interval (int): The collection interval (in seconds) of the query.
                Note:
                    If collection_interval is None, the query will be run every check run.
                    If the collection interval is less than the check collection interval,
                    the query will be run every check run.
                    If the collection interval is greater than the check collection interval,
                    the query will NOT BE RUN exactly at the collection interval.
                    The query will be run at the next check run after the collection interval has passed.
            - (Optional) metric_prefix (str): The prefix to add to the metric name.
                Note: If the metric prefix is None, the default metric prefix `<INTEGRATION>.` will be used.
    '''
    # Contains the data to fill the rest of the attributes
    self.query_data = deepcopy(query_data or {})  # type: Dict[str, Any]
    self.name = None  # type: str
    # The actual query
    self.query = None  # type: str
    # Contains a mapping of column_name -> column_type, transformer
    self.column_transformers = None  # type: Tuple[Tuple[str, Tuple[str, Transformer]]]
    # These transformers are used to collect extra metrics calculated from the query result
    self.extra_transformers = None  # type: List[Tuple[str, Transformer]]
    # Contains the tags defined in query_data, more tags can be added later from the query result
    self.base_tags = None  # type: List[str]
    # The collection interval (in seconds) of the query. If None, the query will be run every check run.
    self.collection_interval = None  # type: int
    # The last time the query was executed. If None, the query has never been executed.
    # This is only used when the collection_interval is not None.
    self.__last_execution_time = None  # type: float
    # whether to ignore any defined namespace prefix. True when `metric_prefix` is defined.
    self.metric_name_raw = False  # type: bool
compile(column_transformers, extra_transformers)¶
This idempotent method will be called by `QueryManager.compile_queries` so you should never need to call it directly.
Source code in datadog_checks_base/datadog_checks/base/utils/db/query.py
def compile(
    self,
    column_transformers,  # type: Dict[str, TransformerFactory]
    extra_transformers,  # type: Dict[str, TransformerFactory]
):
    # type: (...) -> None
    """
    This idempotent method will be called by `QueryManager.compile_queries` so you
    should never need to call it directly.
    """
    # Check for previous compilation
    if self.name is not None:
        return

    query_name = self.query_data.get('name')
    if not query_name:
        raise ValueError('query field `name` is required')
    elif not isinstance(query_name, str):
        raise ValueError('query field `name` must be a string')

    metric_prefix = self.query_data.get('metric_prefix')
    if metric_prefix is not None:
        if not isinstance(metric_prefix, str):
            raise ValueError('field `metric_prefix` for {} must be a string'.format(query_name))
        elif not metric_prefix:
            raise ValueError('field `metric_prefix` for {} must not be empty'.format(query_name))

    query = self.query_data.get('query')
    if not query:
        raise ValueError('field `query` for {} is required'.format(query_name))
    elif query_name.startswith('custom query #') and not isinstance(query, str):
        raise ValueError('field `query` for {} must be a string'.format(query_name))

    columns = self.query_data.get('columns')
    if not columns:
        raise ValueError('field `columns` for {} is required'.format(query_name))
    elif not isinstance(columns, list):
        raise ValueError('field `columns` for {} must be a list'.format(query_name))

    tags = self.query_data.get('tags', [])
    if tags is not None and not isinstance(tags, list):
        raise ValueError('field `tags` for {} must be a list'.format(query_name))

    # Keep track of all defined names
    sources = {}

    column_data = []
    for i, column in enumerate(columns, 1):
        # Columns can be ignored via configuration.
        if not column:
            column_data.append((None, None))
            continue
        elif not isinstance(column, dict):
            raise ValueError('column #{} of {} is not a mapping'.format(i, query_name))

        column_name = column.get('name')
        if not column_name:
            raise ValueError('field `name` for column #{} of {} is required'.format(i, query_name))
        elif not isinstance(column_name, str):
            raise ValueError('field `name` for column #{} of {} must be a string'.format(i, query_name))
        elif column_name in sources:
            raise ValueError(
                'the name {} of {} was already defined in {} #{}'.format(
                    column_name, query_name, sources[column_name]['type'], sources[column_name]['index']
                )
            )

        sources[column_name] = {'type': 'column', 'index': i}

        column_type = column.get('type')
        if not column_type:
            raise ValueError('field `type` for column {} of {} is required'.format(column_name, query_name))
        elif not isinstance(column_type, str):
            raise ValueError('field `type` for column {} of {} must be a string'.format(column_name, query_name))
        elif column_type == 'source':
            column_data.append((column_name, (None, None)))
            continue
        elif column_type not in column_transformers:
            raise ValueError('unknown type `{}` for column {} of {}'.format(column_type, column_name, query_name))

        __column_type_is_tag = column_type in ('tag', 'tag_list', 'tag_not_null')
        modifiers = {key: value for key, value in column.items() if key not in ('name', 'type')}

        try:
            if not __column_type_is_tag and metric_prefix:
                # if metric_prefix is defined, we prepend it to the column name
                column_name = "{}.{}".format(metric_prefix, column_name)
            transformer = column_transformers[column_type](column_transformers, column_name, **modifiers)
        except Exception as e:
            error = 'error compiling type `{}` for column {} of {}: {}'.format(
                column_type, column_name, query_name, e
            )

            # Prepend helpful error text.
            #
            # When an exception is raised in the context of another one, both will be printed. To avoid
            # this we set the context to None. https://www.python.org/dev/peps/pep-0409/
            raise type(e)(error) from None
        else:
            if __column_type_is_tag:
                column_data.append((column_name, (column_type, transformer)))
            else:
                # All these would actually submit data. As that is the default case, we represent it as
                # a reference to None since if we use e.g. `value` it would never be checked anyway.
                column_data.append((column_name, (None, transformer)))

    submission_transformers = column_transformers.copy()  # type: Dict[str, Transformer]
    submission_transformers.pop('tag')
    submission_transformers.pop('tag_list')
    submission_transformers.pop('tag_not_null')

    extras = self.query_data.get('extras', [])  # type: List[Dict[str, Any]]
    if not isinstance(extras, list):
        raise ValueError('field `extras` for {} must be a list'.format(query_name))

    extra_data = []  # type: List[Tuple[str, Transformer]]
    for i, extra in enumerate(extras, 1):
        if not isinstance(extra, dict):
            raise ValueError('extra #{} of {} is not a mapping'.format(i, query_name))

        extra_type = extra.get('type')  # type: str
        extra_name = extra.get('name')  # type: str
        if extra_type == 'log':
            # The name is unused
            extra_name = 'log'
        elif not extra_name:
            raise ValueError('field `name` for extra #{} of {} is required'.format(i, query_name))
        elif not isinstance(extra_name, str):
            raise ValueError('field `name` for extra #{} of {} must be a string'.format(i, query_name))
        elif extra_name in sources:
            raise ValueError(
                'the name {} of {} was already defined in {} #{}'.format(
                    extra_name, query_name, sources[extra_name]['type'], sources[extra_name]['index']
                )
            )

        sources[extra_name] = {'type': 'extra', 'index': i}

        if not extra_type:
            if 'expression' in extra:
                extra_type = 'expression'
            else:
                raise ValueError('field `type` for extra {} of {} is required'.format(extra_name, query_name))
        elif not isinstance(extra_type, str):
            raise ValueError('field `type` for extra {} of {} must be a string'.format(extra_name, query_name))
        elif extra_type not in extra_transformers and extra_type not in submission_transformers:
            raise ValueError('unknown type `{}` for extra {} of {}'.format(extra_type, extra_name, query_name))

        transformer_factory = extra_transformers.get(
            extra_type, submission_transformers.get(extra_type)
        )  # type: TransformerFactory

        extra_source = extra.get('source')
        if extra_type in submission_transformers:
            if not extra_source:
                raise ValueError('field `source` for extra {} of {} is required'.format(extra_name, query_name))

            modifiers = {key: value for key, value in extra.items() if key not in ('name', 'type', 'source')}
        else:
            modifiers = {key: value for key, value in extra.items() if key not in ('name', 'type')}
            modifiers['sources'] = sources

        try:
            transformer = transformer_factory(submission_transformers, extra_name, **modifiers)
        except Exception as e:
            error = 'error compiling type `{}` for extra {} of {}: {}'.format(extra_type, extra_name, query_name, e)
            raise type(e)(error) from None
        else:
            if extra_type in submission_transformers:
                transformer = create_extra_transformer(transformer, extra_source)

            extra_data.append((extra_name, transformer))

    collection_interval = self.query_data.get('collection_interval')
    if collection_interval is not None:
        if not isinstance(collection_interval, (int, float)):
            raise ValueError('field `collection_interval` for {} must be a number'.format(query_name))
        elif int(collection_interval) <= 0:
            raise ValueError(
                'field `collection_interval` for {} must be a positive number after rounding'.format(query_name)
            )
        collection_interval = int(collection_interval)

    self.name = query_name
    self.query = query
    self.column_transformers = tuple(column_data)
    self.extra_transformers = tuple(extra_data)
    self.base_tags = tags
    self.collection_interval = collection_interval
    self.metric_name_raw = metric_prefix is not None
    del self.query_data
datadog_checks.base.utils.db.core.QueryManager¶
This class is in charge of running any number of `Query` instances for a single Check instance. You will most often see it created during Check initialization like this:
```python
self._query_manager = QueryManager(
    self,
    self.execute_query,
    queries=[
        queries.SomeQuery1,
        queries.SomeQuery2,
        queries.SomeQuery3,
        queries.SomeQuery4,
        queries.SomeQuery5,
    ],
    tags=self.instance.get('tags', []),
    error_handler=self._error_sanitizer,
)
self.check_initializations.append(self._query_manager.compile_queries)
```
Note: This class is not in charge of opening or closing connections, just running queries.
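On the configuration side, the same format powers user-defined queries. A sketch of a `custom_queries` entry in an instance config; the connection settings and SQL are illustrative and vary per integration:

```yaml
instances:
  - host: localhost  # integration-specific connection settings
    custom_queries:
      - query: SELECT state, COUNT(*) FROM connections GROUP BY state
        columns:
          - name: state
            type: tag
          - name: app.connections
            type: gauge
        tags:
          - service:app
```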
Source code in datadog_checks_base/datadog_checks/base/utils/db/core.py
class QueryManager(QueryExecutor):
    """
    This class is in charge of running any number of `Query` instances for a single Check instance.

    You will most often see it created during Check initialization like this:

    ```python
    self._query_manager = QueryManager(
        self,
        self.execute_query,
        queries=[
            queries.SomeQuery1,
            queries.SomeQuery2,
            queries.SomeQuery3,
            queries.SomeQuery4,
            queries.SomeQuery5,
        ],
        tags=self.instance.get('tags', []),
        error_handler=self._error_sanitizer,
    )
    self.check_initializations.append(self._query_manager.compile_queries)
    ```

    Note: This class is not in charge of opening or closing connections, just running queries.
    """

    def __init__(
        self,
        check,  # type: AgentCheck
        executor,  # type: QueriesExecutor
        queries=None,  # type: List[Dict[str, Any]]
        tags=None,  # type: List[str]
        error_handler=None,  # type: Callable[[str], str]
        hostname=None,  # type: str
    ):  # type: (...) -> QueryManager
        """
        - **check** (_AgentCheck_) - an instance of a Check
        - **executor** (_callable_) - a callable accepting a `str` query as its sole argument and returning
          a sequence representing either the full result set or an iterator over the result set
        - **queries** (_List[Dict]_) - a list of queries in dict format
        - **tags** (_List[str]_) - a list of tags to associate with every submission
        - **error_handler** (_callable_) - a callable accepting a `str` error as its sole argument and returning
          a sanitized string, useful for scrubbing potentially sensitive information libraries emit
        """
        super(QueryManager, self).__init__(
            executor=executor,
            submitter=check,
            queries=queries,
            tags=tags,
            error_handler=error_handler,
            hostname=hostname,
            logger=check.log,
        )
        self.check = check  # type: AgentCheck

        only_custom_queries = is_affirmative(self.check.instance.get('only_custom_queries', False))  # type: bool
        custom_queries = list(self.check.instance.get('custom_queries', []))  # type: List[str]
        use_global_custom_queries = self.check.instance.get('use_global_custom_queries', True)  # type: str

        # Handle overrides
        if use_global_custom_queries == 'extend':
            custom_queries.extend(self.check.init_config.get('global_custom_queries', []))
        elif (
            not custom_queries
            and 'global_custom_queries' in self.check.init_config
            and is_affirmative(use_global_custom_queries)
        ):
            custom_queries = self.check.init_config.get('global_custom_queries', [])

        # Override statement queries if only running custom queries
        if only_custom_queries:
            self.queries = []

        # Deduplicate
        for i, custom_query in enumerate(iter_unique(custom_queries), 1):
            query = Query(custom_query)
            query.query_data.setdefault('name', 'custom query #{}'.format(i))
            self.queries.append(query)

        if len(self.queries) == 0:
            self.logger.warning('QueryManager initialized with no query')

    def execute(self, extra_tags=None):
        # This needs to stay here b/c when we construct a QueryManager in a check's __init__
        # there is no check ID at that point
        self.logger = self.check.log
        return super(QueryManager, self).execute(extra_tags)
__init__(check, executor, queries=None, tags=None, error_handler=None, hostname=None)¶
- **check** (_AgentCheck_) - an instance of a Check
- **executor** (_callable_) - a callable accepting a `str` query as its sole argument and returning a sequence representing either the full result set or an iterator over the result set
- **queries** (_List[Dict]_) - a list of queries in dict format
- **tags** (_List[str]_) - a list of tags to associate with every submission
- **error_handler** (_callable_) - a callable accepting a `str` error as its sole argument and returning a sanitized string, useful for scrubbing potentially sensitive information libraries emit
Source code in datadog_checks_base/datadog_checks/base/utils/db/core.py
def __init__(
    self,
    check,  # type: AgentCheck
    executor,  # type: QueriesExecutor
    queries=None,  # type: List[Dict[str, Any]]
    tags=None,  # type: List[str]
    error_handler=None,  # type: Callable[[str], str]
    hostname=None,  # type: str
):  # type: (...) -> QueryManager
    """
    - **check** (_AgentCheck_) - an instance of a Check
    - **executor** (_callable_) - a callable accepting a `str` query as its sole argument and returning
      a sequence representing either the full result set or an iterator over the result set
    - **queries** (_List[Dict]_) - a list of queries in dict format
    - **tags** (_List[str]_) - a list of tags to associate with every submission
    - **error_handler** (_callable_) - a callable accepting a `str` error as its sole argument and returning
      a sanitized string, useful for scrubbing potentially sensitive information libraries emit
    """
    super(QueryManager, self).__init__(
        executor=executor,
        submitter=check,
        queries=queries,
        tags=tags,
        error_handler=error_handler,
        hostname=hostname,
        logger=check.log,
    )
    self.check = check  # type: AgentCheck

    only_custom_queries = is_affirmative(self.check.instance.get('only_custom_queries', False))  # type: bool
    custom_queries = list(self.check.instance.get('custom_queries', []))  # type: List[str]
    use_global_custom_queries = self.check.instance.get('use_global_custom_queries', True)  # type: str

    # Handle overrides
    if use_global_custom_queries == 'extend':
        custom_queries.extend(self.check.init_config.get('global_custom_queries', []))
    elif (
        not custom_queries
        and 'global_custom_queries' in self.check.init_config
        and is_affirmative(use_global_custom_queries)
    ):
        custom_queries = self.check.init_config.get('global_custom_queries', [])

    # Override statement queries if only running custom queries
    if only_custom_queries:
        self.queries = []

    # Deduplicate
    for i, custom_query in enumerate(iter_unique(custom_queries), 1):
        query = Query(custom_query)
        query.query_data.setdefault('name', 'custom query #{}'.format(i))
        self.queries.append(query)

    if len(self.queries) == 0:
        self.logger.warning('QueryManager initialized with no query')
execute(extra_tags=None)¶
Source code in datadog_checks_base/datadog_checks/base/utils/db/core.py
def execute(self, extra_tags=None):
    # This needs to stay here b/c when we construct a QueryManager in a check's __init__
    # there is no check ID at that point
    self.logger = self.check.log
    return super(QueryManager, self).execute(extra_tags)
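Putting it together, a check typically triggers execution on every run. A minimal sketch, reusing the attribute names from the example above:

```python
def check(self, _):
    # Connection management happens elsewhere; the manager only runs queries.
    self._query_manager.execute()
```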
Transformers¶
Column¶
match¶
This is used for querying unstructured data.
For example, say you want to collect the fields named `foo` and `bar`. Typically, they would be stored like:
| foo | bar |
| --- | --- |
| 4   | 2   |
and would be queried like:
```sql
SELECT foo, bar FROM ...
```
Often, you will instead find data stored in the following format:
| metric | value |
| ------ | ----- |
| foo    | 4     |
| bar    | 2     |
and would be queried like:
```sql
SELECT metric, value FROM ...
```
In this case, the `metric` column stores the name with which to match on, and its `value` is stored in a separate column.
The required `items` modifier is a mapping of matched names to column data values. Consider the values to be exactly the same as the entries in the `columns` top level field. You must also define a `source` modifier either for this transformer itself or in the values of `items` (which will take precedence). The source will be treated as the value of the match.
Say this is your configuration:
```yaml
query: SELECT source1, source2, metric FROM TABLE
columns:
  - name: value1
    type: source
  - name: value2
    type: source
  - name: metric_name
    type: match
    source: value1
    items:
      foo:
        name: test.foo
        type: gauge
        source: value2
      bar:
        name: test.bar
        type: monotonic_gauge
```
and the result set is:
| source1 | source2 | metric |
| ------- | ------- | ------ |
| 1       | 2       | foo    |
| 3       | 4       | baz    |
| 5       | 6       | bar    |
Here's what would be submitted:
- `foo` - `test.foo` as a `gauge` with a value of `2`
- `bar` - `test.bar.total` as a `gauge` and `test.bar.count` as a `monotonic_count`, both with a value of `5`
- `baz` - nothing since it was not defined as a match item
Source code in datadog_checks_base/datadog_checks/base/utils/db/transform.py
def get_match(transformers, column_name, **modifiers):
    # type: (Dict[str, Transformer], str, Any) -> Transformer
    """
    This is used for querying unstructured data.

    For example, say you want to collect the fields named `foo` and `bar`. Typically, they would be stored like:

    | foo | bar |
    | --- | --- |
    | 4   | 2   |

    and would be queried like:

    ```sql
    SELECT foo, bar FROM ...
    ```

    Often, you will instead find data stored in the following format:

    | metric | value |
    | ------ | ----- |
    | foo    | 4     |
    | bar    | 2     |

    and would be queried like:

    ```sql
    SELECT metric, value FROM ...
    ```

    In this case, the `metric` column stores the name with which to match on and its `value` is
    stored in a separate column.

    The required `items` modifier is a mapping of matched names to column data values. Consider the values
    to be exactly the same as the entries in the `columns` top level field. You must also define a `source`
    modifier either for this transformer itself or in the values of `items` (which will take precedence).
    The source will be treated as the value of the match.

    Say this is your configuration:

    ```yaml
    query: SELECT source1, source2, metric FROM TABLE
    columns:
      - name: value1
        type: source
      - name: value2
        type: source
      - name: metric_name
        type: match
        source: value1
        items:
          foo:
            name: test.foo
            type: gauge
            source: value2
          bar:
            name: test.bar
            type: monotonic_gauge
    ```

    and the result set is:

    | source1 | source2 | metric |
    | ------- | ------- | ------ |
    | 1       | 2       | foo    |
    | 3       | 4       | baz    |
    | 5       | 6       | bar    |

    Here's what would be submitted:

    - `foo` - `test.foo` as a `gauge` with a value of `2`
    - `bar` - `test.bar.total` as a `gauge` and `test.bar.count` as a `monotonic_count`, both with a value of `5`
    - `baz` - nothing since it was not defined as a match item
    """
    # Do work in a separate function to avoid having to `del` a bunch of variables
    compiled_items = _compile_match_items(transformers, modifiers)  # type: Dict[str, Tuple[str, Transformer]]

    def match(sources, value, **kwargs):
        # type: (Dict[str, Any], str, Dict[str, Any]) -> None
        if value in compiled_items:
            source, transformer = compiled_items[value]  # type: str, Transformer
            transformer(sources, sources[source], **kwargs)

    return match
temporal_percent¶
Send the result as a percentage of time since the last check run as a `rate`.
For example, say the result is a forever increasing counter representing the total time spent pausing for garbage collection since start up. That number by itself is quite useless, but as a percentage of time spent pausing since the previous collection interval it becomes a useful metric.
There is one required parameter called `scale` that indicates what unit of time the result should be considered. Valid values are:

- `second`
- `millisecond`
- `microsecond`
- `nanosecond`

You may also define the unit as an integer number of parts compared to seconds, e.g. `millisecond` is equivalent to `1000`.
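As a sketch, a column reporting cumulative GC pause time in milliseconds could be configured like this (the metric name is illustrative):

```yaml
columns:
  - name: gc.pause_time_percent
    type: temporal_percent
    scale: millisecond
```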
Source code in datadog_checks_base/datadog_checks/base/utils/db/transform.py
def get_temporal_percent(transformers, column_name, **modifiers):
    # type: (Dict[str, Transformer], str, Any) -> Transformer
    """
    Send the result as percentage of time since the last check run as a `rate`.

    For example, say the result is a forever increasing counter representing the total time spent pausing for
    garbage collection since start up. That number by itself is quite useless, but as a percentage of time spent
    pausing since the previous collection interval it becomes a useful metric.

    There is one required parameter called `scale` that indicates what unit of time the result should be considered.
    Valid values are:

    - `second`
    - `millisecond`
    - `microsecond`
    - `nanosecond`

    You may also define the unit as an integer number of parts compared to seconds e.g. `millisecond` is
    equivalent to `1000`.
    """
    scale = modifiers.pop('scale', None)
    if scale is None:
        raise ValueError('the `scale` parameter is required')

    if isinstance(scale, str):
        scale = constants.TIME_UNITS.get(scale.lower())
        if scale is None:
            raise ValueError(
                'the `scale` parameter must be one of: {}'.format(' | '.join(sorted(constants.TIME_UNITS)))
            )
    elif not isinstance(scale, int):
        raise ValueError(
            'the `scale` parameter must be an integer representing parts of a second e.g. 1000 for millisecond'
        )

    rate = transformers['rate'](transformers, column_name, **modifiers)  # type: Callable

    def temporal_percent(_, value, **kwargs):
        # type: (List, str, Dict[str, Any]) -> None
        rate(_, total_time_to_temporal_percent(float(value), scale=scale), **kwargs)

    return temporal_percent
time_elapsed¶
Send the number of seconds elapsed from a time in the past as a `gauge`. For example, if the result is an instance of `datetime.datetime` representing 5 seconds ago, then this would submit with a value of `5`.
The optional modifier `format` indicates what format the result is in. By default it is `native`, assuming the underlying library provides timestamps as `datetime` objects.

If the value is a UNIX timestamp you can set the `format` modifier to `unix_time`.

If the value is a string representation of a date, you must provide the expected timestamp format using the supported codes.
Example:
```yaml
columns:
  - name: time_since_x
    type: time_elapsed
    format: native # default value and can be omitted
  - name: time_since_y
    type: time_elapsed
    format: unix_time
  - name: time_since_z
    type: time_elapsed
    format: "%d/%m/%Y %H:%M:%S"
```
Note: The code `%z` (lower case) is not supported on Windows.
Source code in datadog_checks_base/datadog_checks/base/utils/db/transform.py
def get_time_elapsed(transformers, column_name, **modifiers):
    # type: (Dict[str, Transformer], str, Any) -> Transformer
    """
    Send the number of seconds elapsed from a time in the past as a `gauge`.

    For example, if the result is an instance of
    [datetime.datetime](https://docs.python.org/3/library/datetime.html#datetime.datetime) representing 5 seconds ago,
    then this would submit with a value of `5`.

    The optional modifier `format` indicates what format the result is in. By default it is `native`, assuming the
    underlying library provides timestamps as `datetime` objects.

    If the value is a UNIX timestamp you can set the `format` modifier to `unix_time`.

    If the value is a string representation of a date, you must provide the expected timestamp format using the
    [supported codes](https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes).

    Example:

    ```yaml
    columns:
      - name: time_since_x
        type: time_elapsed
        format: native # default value and can be omitted
      - name: time_since_y
        type: time_elapsed
        format: unix_time
      - name: time_since_z
        type: time_elapsed
        format: "%d/%m/%Y %H:%M:%S"
    ```

    !!! note
        The code `%z` (lower case) is not supported on Windows.
    """
    time_format = modifiers.pop('format', 'native')
    if not isinstance(time_format, str):
        raise ValueError('the `format` parameter must be a string')

    gauge = transformers['gauge'](transformers, column_name, **modifiers)

    if time_format == 'native':

        def time_elapsed(_, value, **kwargs):
            # type: (List, str, Dict[str, Any]) -> None
            value = ensure_aware_datetime(value)
            gauge(_, (datetime.now(value.tzinfo) - value).total_seconds(), **kwargs)

    elif time_format == 'unix_time':

        def time_elapsed(_, value, **kwargs):
            gauge(_, time.time() - value, **kwargs)

    else:

        def time_elapsed(_, value, **kwargs):
            # type: (List, str, Dict[str, Any]) -> None
            value = ensure_aware_datetime(datetime.strptime(value, time_format))
            gauge(_, (datetime.now(value.tzinfo) - value).total_seconds(), **kwargs)

    return time_elapsed
monotonic_gauge¶
Send the result as both a `gauge` suffixed by `.total` and a `monotonic_count` suffixed by `.count`.
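A sketch of a column using this transformer, which would submit both `app.requests.total` and `app.requests.count` (the metric name is illustrative):

```yaml
columns:
  - name: app.requests
    type: monotonic_gauge
```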
Source code in datadog_checks_base/datadog_checks/base/utils/db/transform.py
def get_monotonic_gauge(transformers, column_name, **modifiers):
    # type: (Dict[str, Transformer], str, Any) -> Transformer
    """
    Send the result as both a `gauge` suffixed by `.total` and a `monotonic_count` suffixed by `.count`.
    """
    gauge = transformers['gauge'](transformers, '{}.total'.format(column_name), **modifiers)  # type: Callable
    monotonic_count = transformers['monotonic_count'](
        transformers, '{}.count'.format(column_name), **modifiers
    )  # type: Callable

    def monotonic_gauge(_, value, **kwargs):
        # type: (List, str, Dict[str, Any]) -> None
        gauge(_, value, **kwargs)
        monotonic_count(_, value, **kwargs)

    return monotonic_gauge
service_check¶
Submit a service check.

The required modifier `status_map` is a mapping of values to statuses. Valid statuses include:

- `OK`
- `WARNING`
- `CRITICAL`
- `UNKNOWN`

Any encountered values that are not defined will be sent as `UNKNOWN`.

In addition, a `message` modifier can be passed which can contain placeholders (based on Python's `str.format`) for other column names from the same query, to add a message dynamically to the service check.
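A sketch with hypothetical status values and a message built from another column of the same query:

```yaml
columns:
  - name: state
    type: source
  - name: app.can_query
    type: service_check
    status_map:
      ONLINE: OK
      DEGRADED: WARNING
      OFFLINE: CRITICAL
    message: "instance state is {state}"
```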
Source code in datadog_checks_base/datadog_checks/base/utils/db/transform.py
def get_service_check(transformers, column_name, **modifiers):
    # type: (Dict[str, Transformer], str, Any) -> Transformer
    """
    Submit a service check.

    The required modifier `status_map` is a mapping of values to statuses. Valid statuses include:

    - `OK`
    - `WARNING`
    - `CRITICAL`
    - `UNKNOWN`

    Any encountered values that are not defined will be sent as `UNKNOWN`.

    In addition, a `message` modifier can be passed which can contain placeholders
    (based on Python's str.format) for other column names from the same query to add a message
    dynamically to the service_check.
    """
    # Do work in a separate function to avoid having to `del` a bunch of variables
    status_map = _compile_service_check_statuses(modifiers)
    message_field = modifiers.pop('message', None)

    service_check_method = transformers['__service_check'](transformers, column_name, **modifiers)  # type: Callable

    def service_check(sources, value, **kwargs):
        # type: (List, str, Dict[str, Any]) -> None
        check_status = status_map.get(value, ServiceCheck.UNKNOWN)
        if not message_field or check_status == ServiceCheck.OK:
            message = None
        else:
            message = message_field.format(**sources)

        service_check_method(sources, check_status, message=message, **kwargs)

    return service_check
tag¶
Convert a column to a tag that will be used in every subsequent submission.

For example, if you named the column `env` and the column returned the value `prod1`, all submissions from that row will be tagged by `env:prod1`.

This also accepts an optional modifier called `boolean` that, when set to `true`, will transform the result to the string `true` or `false`. So for example, if you named the column `alive` and the result was the number `0`, the tag will be `alive:false`.
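A sketch combining a plain tag with the `boolean` modifier (column names are illustrative):

```yaml
columns:
  - name: env
    type: tag
  - name: alive
    type: tag
    boolean: true
```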
Source code in datadog_checks_base/datadog_checks/base/utils/db/transform.py
def get_tag(transformers, column_name, **modifiers):
    # type: (Dict[str, Transformer], str, Any) -> Transformer
    """
    Convert a column to a tag that will be used in every subsequent submission.

    For example, if you named the column `env` and the column returned the value `prod1`, all submissions
    from that row will be tagged by `env:prod1`.

    This also accepts an optional modifier called `boolean` that when set to `true` will transform the result
    to the string `true` or `false`. So for example if you named the column `alive` and the result was the
    number `0` the tag will be `alive:false`.
    """
    template = '{}:{{}}'.format(column_name)
    boolean = is_affirmative(modifiers.pop('boolean', None))

    def tag(_, value, **kwargs):
        # type: (List, str, Dict[str, Any]) -> str
        if boolean:
            value = str(is_affirmative(value)).lower()

        return template.format(value)

    return tag
tag_list¶
Convert a column to a list of tags that will be used in every submission.

The tag name is determined by `column_name`. The column value represents a list of values. It is expected to be either a list of strings or a comma-separated string.

For example, if the column is named `server_tag` and the column returned the value `us,primary`, then all submissions for that row will be tagged by `server_tag:us` and `server_tag:primary`.
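A sketch; with this configuration, a row value of `us,primary` would yield the tags `server_tag:us` and `server_tag:primary`:

```yaml
columns:
  - name: server_tag
    type: tag_list
```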
Source code in datadog_checks_base/datadog_checks/base/utils/db/transform.py
def get_tag_list(transformers, column_name, **modifiers):
    # type: (Dict[str, Transformer], str, Any) -> Transformer
    """
    Convert a column to a list of tags that will be used in every submission.

    Tag name is determined by `column_name`. The column value represents a list of values. It is expected to be either
    a list of strings, or a comma-separated string.

    For example, if the column is named `server_tag` and the column returned the value `us,primary`, then all
    submissions for that row will be tagged by `server_tag:us` and `server_tag:primary`.
    """
    template = '%s:{}' % column_name

    def tag_list(_, value, **kwargs):
        # type: (List, str, Dict[str, Any]) -> List[str]
        if isinstance(value, str):
            value = [v.strip() for v in value.split(',')]

        return [template.format(v) for v in value]

    return tag_list
Extra¶
Every column transformer (except `tag`) is supported at this level, the only difference being that one must set a `source` to retrieve the desired value.
So for example here:
```yaml
columns:
  - name: foo.bar
    type: rate
extras:
  - name: foo.current
    type: gauge
    source: foo.bar
```
the metric `foo.current` will be sent as a gauge with the value of `foo.bar`.
percent¶
Send a percentage based on 2 sources as a `gauge`. The required modifiers are `part` and `total`.
For example, if you have this configuration:
```yaml
columns:
  - name: disk.total
    type: gauge
  - name: disk.used
    type: gauge
extras:
  - name: disk.utilized
    type: percent
    part: disk.used
    total: disk.total
```
then the extra metric `disk.utilized` would be sent as a `gauge` calculated as `disk.used / disk.total * 100`.

If the source of `total` is `0`, then the submitted value will always be sent as `0` too.
Source code in datadog_checks_base/datadog_checks/base/utils/db/transform.py
def get_percent(transformers, name, **modifiers):
    # type: (Dict[str, Callable], str, Any) -> Transformer
    """
    Send a percentage based on 2 sources as a `gauge`.

    The required modifiers are `part` and `total`.

    For example, if you have this configuration:

    ```yaml
    columns:
      - name: disk.total
        type: gauge
      - name: disk.used
        type: gauge
    extras:
      - name: disk.utilized
        type: percent
        part: disk.used
        total: disk.total
    ```

    then the extra metric `disk.utilized` would be sent as a `gauge` calculated as `disk.used / disk.total * 100`.

    If the source of `total` is `0`, then the submitted value will always be sent as `0` too.
    """
    available_sources = modifiers.pop('sources')

    part = modifiers.pop('part', None)
    if part is None:
        raise ValueError('the `part` parameter is required')
    elif not isinstance(part, str):
        raise ValueError('the `part` parameter must be a string')
    elif part not in available_sources:
        raise ValueError('the `part` parameter `{}` is not an available source'.format(part))

    total = modifiers.pop('total', None)
    if total is None:
        raise ValueError('the `total` parameter is required')
    elif not isinstance(total, str):
        raise ValueError('the `total` parameter must be a string')
    elif total not in available_sources:
        raise ValueError('the `total` parameter `{}` is not an available source'.format(total))

    del available_sources
    gauge = transformers['gauge'](transformers, name, **modifiers)
    gauge = create_extra_transformer(gauge)

    def percent(sources, **kwargs):
        gauge(sources, compute_percent(sources[part], sources[total]), **kwargs)

    return percent
expression¶
This allows the evaluation of a limited subset of Python syntax and built-in functions.
```yaml
columns:
  - name: disk.total
    type: gauge
  - name: disk.used
    type: gauge
extras:
  - name: disk.free
    expression: disk.total - disk.used
    submit_type: gauge
```
For brevity, if the `expression` attribute exists and `type` does not, then it is assumed the type is `expression`. The `submit_type` can be any transformer, and any extra options are passed down to it.

The result of every expression is stored, so in lieu of a `submit_type` the above example could also be written as:
```yaml
columns:
  - name: disk.total
    type: gauge
  - name: disk.used
    type: gauge
extras:
  - name: free
    expression: disk.total - disk.used
  - name: disk.free
    type: gauge
    source: free
```
The order matters though, so for example the following will fail:
```yaml
columns:
  - name: disk.total
    type: gauge
  - name: disk.used
    type: gauge
extras:
  - name: disk.free
    type: gauge
    source: free
  - name: free
    expression: disk.total - disk.used
```
since the source `free` does not yet exist.
Source code in datadog_checks_base/datadog_checks/base/utils/db/transform.py
def get_expression(transformers, name, **modifiers):
    # type: (Dict[str, Transformer], str, Dict[str, Any]) -> Transformer
    """
    This allows the evaluation of a limited subset of Python syntax and built-in functions.

    ```yaml
    columns:
      - name: disk.total
        type: gauge
      - name: disk.used
        type: gauge
    extras:
      - name: disk.free
        expression: disk.total - disk.used
        submit_type: gauge
    ```

    For brevity, if the `expression` attribute exists and `type` does not then it is assumed the type is
    `expression`. The `submit_type` can be any transformer and any extra options are passed down to it.

    The result of every expression is stored, so in lieu of a `submit_type` the above example could also be written as:

    ```yaml
    columns:
      - name: disk.total
        type: gauge
      - name: disk.used
        type: gauge
    extras:
      - name: free
        expression: disk.total - disk.used
      - name: disk.free
        type: gauge
        source: free
    ```

    The order matters though, so for example the following will fail:

    ```yaml
    columns:
      - name: disk.total
        type: gauge
      - name: disk.used
        type: gauge
    extras:
      - name: disk.free
        type: gauge
        source: free
      - name: free
        expression: disk.total - disk.used
    ```

    since the source `free` does not yet exist.
    """
    available_sources = modifiers.pop('sources')

    expression = modifiers.pop('expression', None)
    if expression is None:
        raise ValueError('the `expression` parameter is required')
    elif not isinstance(expression, str):
        raise ValueError('the `expression` parameter must be a string')
    elif not expression:
        raise ValueError('the `expression` parameter must not be empty')

    if not modifiers.pop('verbose', False):
        # Sort the sources in reverse order of length to prevent greedy matching
        available_sources = sorted(available_sources, key=lambda s: -len(s))

        # Escape special characters, mostly for the possible dots in metric names
        available_sources = list(map(re.escape, available_sources))

        # Finally, utilize the order by relying on the guarantees provided by the alternation operator
        available_sources = '|'.join(available_sources)

        expression = re.sub(
            SOURCE_PATTERN.format(available_sources),
            # Replace by the particular source that matched
            lambda match_obj: 'SOURCES["{}"]'.format(match_obj.group(1)),
            expression,
        )

    expression = compile(expression, filename=name, mode='eval')

    del available_sources

    if 'submit_type' in modifiers:
        if modifiers['submit_type'] not in transformers:
            raise ValueError('unknown submit_type `{}`'.format(modifiers['submit_type']))

        submit_method = transformers[modifiers.pop('submit_type')](transformers, name, **modifiers)  # type: Transformer
        submit_method = create_extra_transformer(submit_method)  # type: Callable

        def execute_expression(sources, **kwargs):
            # type: (Dict[str, Any], Dict[str, Any]) -> float
            result = eval(expression, ALLOWED_GLOBALS, {'SOURCES': sources})  # type: float
            submit_method(sources, result, **kwargs)
            return result

    else:

        def execute_expression(sources, **kwargs):
            # type: (Dict[str, Any], Dict[str, Any]) -> Any
            return eval(expression, ALLOWED_GLOBALS, {'SOURCES': sources})

    return execute_expression
log¶
Send a log. The only required modifier is `attributes`.
For example, if you have this configuration:
```yaml
columns:
  - name: msg
    type: source
  - name: level
    type: source
  - name: time
    type: source
  - name: bar
    type: source
extras:
  - type: log
    attributes:
      message: msg
      status: level
      date: time
      foo: bar
```
then a log will be sent with the following attributes:
- `message`: value of the `msg` column
- `status`: value of the `level` column
- `date`: value of the `time` column
- `foo`: value of the `bar` column
Source code in datadog_checks_base/datadog_checks/base/utils/db/transform.py
def get_log(transformers, name, **modifiers):
    # type: (Dict[str, Callable], str, Any) -> Transformer
    """
    Send a log.

    The only required modifier is `attributes`.

    For example, if you have this configuration:

    ```yaml
    columns:
      - name: msg
        type: source
      - name: level
        type: source
      - name: time
        type: source
      - name: bar
        type: source
    extras:
      - type: log
        attributes:
          message: msg
          status: level
          date: time
          foo: bar
    ```

    then a log will be sent with the following attributes:

    - `message`: value of the `msg` column
    - `status`: value of the `level` column
    - `date`: value of the `time` column
    - `foo`: value of the `bar` column
    """
    available_sources = modifiers.pop('sources')

    attributes = _compile_log_attributes(modifiers, available_sources)

    del available_sources
    send_log = transformers['__send_log'](transformers, **modifiers)
    send_log = create_extra_transformer(send_log)

    def log(sources, **kwargs):
        data = {attribute: sources[source] for attribute, source in attributes.items()}
        if kwargs['tags']:
            data['ddtags'] = ','.join(kwargs['tags'])

        send_log(sources, data)

    return log