Skip to content

Log Crawlers

Overview

Some systems expose their logs through HTTP endpoints rather than writing them to files that the Logs Agent can tail. In such cases, you can create an Agent integration that crawls the endpoints and submits the logs.

The following diagram illustrates how crawling logs integrates into the Datadog Agent.

graph LR
    subgraph "Agent Integration (you write this)"
    A[Log Stream] -->|Log Records| B(Log Crawler Check)
    end
    subgraph Agent
    B -->|Save Logs| C[(Log File)]
    D(Logs Agent) -->|Tail Logs| C
    end
    D -->|Submit Logs| E(Logs Intake)

Interface

datadog_checks.base.checks.logs.crawler.base.LogCrawlerCheck

Source code in datadog_checks_base/datadog_checks/base/checks/logs/crawler/base.py
class LogCrawlerCheck(AgentCheck, ABC):
    """
    Abstract check that crawls log streams and forwards every record through `send_log`.
    """

    @abstractmethod
    def get_log_streams(self) -> Iterable[LogStream]:
        """
        Provide the log streams that this check crawls.
        """

    def process_streams(self) -> None:
        """
        Crawl every log stream and send the records it produces.

        For each stream, the cursor returned by `get_log_cursor` for the stream's name is handed
        to `records`, and each resulting record is passed to `send_log` together with its own
        cursor and the stream's name.

        Checks that need additional behaviour can define their own `check` method and invoke
        this method from it.
        """
        for log_stream in self.get_log_streams():
            resume_cursor = self.get_log_cursor(log_stream.name)
            for log_record in log_stream.records(cursor=resume_cursor):
                self.send_log(log_record.data, cursor=log_record.cursor, stream=log_stream.name)

    def check(self, _) -> None:
        """
        Run the check by delegating to `process_streams`; the positional argument is ignored.
        """
        self.process_streams()

get_log_streams() abstractmethod

Yields the log streams associated with this check.

Source code in datadog_checks_base/datadog_checks/base/checks/logs/crawler/base.py
@abstractmethod
def get_log_streams(self) -> Iterable[LogStream]:
    """
    Provide the log streams that this check crawls.

    Abstract: concrete crawler checks must supply the implementation.
    """

process_streams()

Process the log streams and send the collected logs.

Crawler checks that need more functionality can implement the `check` method and call `process_streams()` directly.

Source code in datadog_checks_base/datadog_checks/base/checks/logs/crawler/base.py
def process_streams(self) -> None:
    """
    Crawl every log stream and send the records it produces.

    For each stream, the cursor returned by `get_log_cursor` for the stream's name is handed
    to `records`, and each resulting record is passed to `send_log` together with its own
    cursor and the stream's name.

    Checks that need additional behaviour can define their own `check` method and invoke
    this method from it.
    """
    for log_stream in self.get_log_streams():
        resume_cursor = self.get_log_cursor(log_stream.name)
        for log_record in log_stream.records(cursor=resume_cursor):
            self.send_log(log_record.data, cursor=log_record.cursor, stream=log_stream.name)

check(_)

Source code in datadog_checks_base/datadog_checks/base/checks/logs/crawler/base.py
def check(self, _) -> None:
    """
    Run the check by delegating to `process_streams`; the positional argument is ignored.
    """
    self.process_streams()

datadog_checks.base.checks.logs.crawler.stream.LogStream

Source code in datadog_checks_base/datadog_checks/base/checks/logs/crawler/stream.py
class LogStream(ABC):
    """
    A named source of log records bound to the check that crawls it.

    Subclasses implement `records` to yield the stream's log records.
    """

    def __init__(self, *, check: AgentCheck, name: str):
        self.__check = check
        self.__name = name

    @property
    def check(self) -> AgentCheck:
        """
        The AgentCheck instance associated with this LogStream.
        """
        return self.__check

    @property
    def name(self) -> str:
        """
        The name of this LogStream.
        """
        return self.__name

    def construct_tags(self, tags: list[str]) -> str:
        """
        Returns a comma-separated string of tags which may be used directly as the `ddtags` field of logs.
        This will include the `tags` from the integration instance config.

        The check's `formatted_tags` (when non-empty) come first, followed by the given `tags`.
        An empty string is returned when there are no tags at all.
        """
        # Join only the non-empty parts so that an empty `tags` list or a check without
        # tags never leaves a leading or trailing comma in the result.
        parts = (self.check.formatted_tags, ','.join(tags))
        return ','.join(part for part in parts if part)

    @abstractmethod
    def records(self, *, cursor: dict[str, Any] | None = None) -> Iterable[LogRecord]:
        """
        Yields log records as they are received.

        `cursor` is keyword-only and defaults to `None`.
        """

records(*, cursor=None) abstractmethod

Yields log records as they are received.

Source code in datadog_checks_base/datadog_checks/base/checks/logs/crawler/stream.py
@abstractmethod
def records(self, *, cursor: dict[str, Any] | None = None) -> Iterable[LogRecord]:
    """
    Produce this stream's log records as they arrive.

    Abstract: concrete streams must supply the implementation. `cursor` is keyword-only
    and defaults to `None`.
    """

__init__(*, check, name)

Source code in datadog_checks_base/datadog_checks/base/checks/logs/crawler/stream.py
def __init__(self, *, check: AgentCheck, name: str):
    """
    Store the owning check and the stream name in private attributes.

    Both arguments are keyword-only.
    """
    self.__check, self.__name = check, name

datadog_checks.base.checks.logs.crawler.stream.LogRecord

Source code in datadog_checks_base/datadog_checks/base/checks/logs/crawler/stream.py
class LogRecord:
    """
    A single log entry (`data`) paired with the `cursor` it was created with.
    """

    __slots__ = ('cursor', 'data')

    def __init__(self, data: dict[str, str], *, cursor: dict[str, Any] | None):
        # `cursor` is keyword-only and has no default, so callers must pass it
        # explicitly (it may be None).
        self.cursor = cursor
        self.data = data

Last update: September 4, 2024