Source code for seqlog.structured_logging

# -*- coding: utf-8 -*-

import logging
import os
import socket
from datetime import datetime
from dateutil.tz import tzlocal
from queue import Queue
import requests

from .consumer import QueueConsumer

# Keyword arguments that belong to the logging system itself; the structured
# loggers skip these when copying keyword arguments into the log properties.
_well_known_logger_kwargs = {"extra", "exc_info", "func", "sinfo"}

# Default global log properties (evaluated once, when this module is imported).
_default_global_log_props = {
    "MachineName": socket.gethostname(),
    "ProcessId": os.getpid()
}

# Global properties attached to all log entries.
#
# This starts out as a *copy* of the defaults so the two names never refer to
# the same dictionary; otherwise an in-place change to the current properties
# would silently alter the defaults as well.
_global_log_props = dict(_default_global_log_props)


def get_global_log_properties(logger_name=None):
    """
    Build the properties that are added to every structured log entry.

    :param logger_name: An optional logger name; when supplied (and non-empty) it is
                        included in the result as the ``LoggerName`` property.
    :type logger_name: str
    :return: A new dictionary containing a copy of the current global log properties.
    :rtype: dict
    """

    # Hand back a fresh dictionary so callers can add to it freely.
    props = dict(_global_log_props)
    if not logger_name:
        return props

    props["LoggerName"] = logger_name

    return props
def set_global_log_properties(**properties):
    """
    Replace the properties that are added to all structured log entries.

    Any previously configured global properties are discarded.

    :param properties: Keyword arguments representing the properties.
    :type properties: dict
    """

    global _global_log_props

    _global_log_props = dict(properties)
def reset_global_log_properties():
    """
    Restore the global log properties to their default values.

    The defaults are copied rather than assigned directly, so that the current
    properties and the defaults never refer to the same dictionary (an in-place
    change to one must not leak into the other).
    """

    global _global_log_props

    _global_log_props = dict(_default_global_log_props)
def clear_global_log_properties():
    """
    Discard every global log property, leaving an empty set of properties.
    """

    global _global_log_props

    _global_log_props = dict()
class StructuredLogRecord(logging.LogRecord):
    """
    An extended LogRecord with custom properties to be logged to Seq.
    """

    def __init__(self, name, level, pathname, lineno, msg, args, exc_info,
                 func=None, sinfo=None, log_props=None, **kwargs):
        """
        Create a new StructuredLogRecord.

        :param name: The name of the logger that produced the log record.
        :param level: The logging level (severity) associated with the logging record.
        :param pathname: The name of the file (if known) where the log entry was created.
        :param lineno: The line number (if known) in the file where the log entry was created.
        :param msg: The log message (or message template).
        :param args: Ordinal message format arguments (if any).
        :param exc_info: Exception information to be included in the log entry.
        :param func: The function (if known) where the log entry was created.
        :param sinfo: Stack trace information (if known) for the log entry.
        :param log_props: Named message format arguments (if any). The dictionary is copied;
                          the caller's instance is never modified.
        :param kwargs: Additional keyword arguments, passed through to ``logging.LogRecord``.
        """

        super().__init__(name, level, pathname, lineno, msg, args, exc_info, func, sinfo, **kwargs)

        # Keep a private copy: the thread properties added below must not show
        # up in the dictionary the caller passed in.
        self.log_props = dict(log_props) if log_props else {}

        if self.thread and "ThreadId" not in self.log_props:
            self.log_props["ThreadId"] = self.thread

        if self.threadName and "ThreadName" not in self.log_props:
            self.log_props["ThreadName"] = self.threadName

    def getMessage(self):
        """
        Get a formatted message representing the log record (with arguments replaced by values as appropriate).

        Ordinal arguments (``args``) are applied with ``%`` formatting; otherwise the named
        properties (``log_props``) are applied with ``str.format``.

        :return: The formatted message.
        :rtype: str
        """

        # Like the standard LogRecord, accept any object as the message.
        message = str(self.msg)

        if self.args:
            return message % self.args

        if self.log_props:
            try:
                return message.format(**self.log_props)
            except (AttributeError, IndexError, KeyError, TypeError, ValueError):
                # The text is not a valid template for the available properties
                # (for example it contains literal braces, such as a dict repr
                # or a JSON document), so fall back to the unformatted text
                # instead of failing the logging call.
                return message

        return message
class StructuredLogger(logging.Logger):
    """
    Custom (dummy) logger that understands named log arguments.
    """

    def __init__(self, name, level=logging.NOTSET):
        """
        Create a new StructuredLogger

        :param name: The logger name.
        :param level: The logger minimum level (severity).
        """

        super().__init__(name, level)

    def _log(self, level, msg, args, exc_info=None, extra=None, stack_info=False, **kwargs):
        """
        Called by public logger methods to generate a log entry.

        :param level: The level (severity) for the log entry.
        :param msg: The log message or message template.
        :param args: Ordinal arguments for the message format template.
        :param exc_info: Exception information to be included in the log entry.
        :param extra: Extra information to be included in the log entry. The supplied
                      dictionary is copied, never modified.
        :param stack_info: Include stack-trace information in the log entry?
        :param kwargs: Keyword arguments (if any) passed to the public logger method that called _log.
        """

        # Slightly hacky:
        #
        # We take keyword arguments provided to public logger methods (except
        # well-known ones used by the logging system itself) and move them
        # into the `extra` argument as a sub-dictionary.

        # Start off with a copy of the global log properties.
        log_props = get_global_log_properties(self.name)

        # Add supplied keyword arguments.
        log_props.update(
            (prop, value)
            for (prop, value) in kwargs.items()
            if prop not in _well_known_logger_kwargs
        )

        # Copy `extra` so the 'log_props' entry does not leak into the
        # dictionary owned by the caller.
        extra = dict(extra) if extra else {}
        extra['log_props'] = log_props

        super()._log(level, msg, args, exc_info, extra, stack_info)

    def makeRecord(self, name, level, fn, lno, msg, args, exc_info, func=None, extra=None, sinfo=None):
        """
        Create a LogRecord.

        :param name: The name of the logger that produced the log record.
        :param level: The logging level (severity) associated with the logging record.
        :param fn: The name of the file (if known) where the log entry was created.
        :param lno: The line number (if known) in the file where the log entry was created.
        :param msg: The log message (or message template).
        :param args: Ordinal message format arguments (if any).
        :param exc_info: Exception information to be included in the log entry.
        :param func: The function (if known) where the log entry was created.
        :param extra: Extra information (if any) to add to the log record.
        :param sinfo: Stack trace information (if known) for the log entry.
        :return: A StructuredLogRecord when `extra` carries 'log_props'; otherwise whatever
                 the base class creates.
        :raises KeyError: If `extra` would overwrite an existing attribute of the record
                          (the same rule the standard ``Logger.makeRecord`` applies).
        """

        # Do we have named format arguments?
        if not extra or 'log_props' not in extra:
            return super().makeRecord(name, level, fn, lno, msg, args, exc_info, func, extra, sinfo)

        record = StructuredLogRecord(name, level, fn, lno, msg, args, exc_info, func, sinfo, extra['log_props'])

        # Attach the remaining `extra` entries as record attributes, as the
        # standard logger does; without this they would be silently discarded.
        for (key, value) in extra.items():
            if key == 'log_props':
                continue

            if key in ("message", "asctime") or key in record.__dict__:
                raise KeyError("Attempt to overwrite %r in LogRecord" % key)

            record.__dict__[key] = value

        return record
class StructuredRootLogger(logging.RootLogger):
    """
    Custom root logger that understands named log arguments.
    """

    def __init__(self, level=logging.NOTSET):
        """
        Create a `StructuredRootLogger`.

        :param level: The logger minimum level (severity).
        """

        super().__init__(level)

    def _log(self, level, msg, args, exc_info=None, extra=None, stack_info=False, **kwargs):
        """
        Called by public logger methods to generate a log entry.

        :param level: The level (severity) for the log entry.
        :param msg: The log message or message template.
        :param args: Ordinal arguments for the message format template.
        :param exc_info: Exception information to be included in the log entry.
        :param extra: Extra information to be included in the log entry. The supplied
                      dictionary is copied, never modified.
        :param stack_info: Include stack-trace information in the log entry?
        :param kwargs: Keyword arguments (if any) passed to the public logger method that called _log.
        """

        # Slightly hacky:
        #
        # We take keyword arguments provided to public logger methods (except
        # well-known ones used by the logging system itself) and move them
        # into the `extra` argument as a sub-dictionary.

        # Start off with a copy of the global log properties.
        log_props = get_global_log_properties(self.name)

        # Add supplied keyword arguments.
        log_props.update(
            (prop, value)
            for (prop, value) in kwargs.items()
            if prop not in _well_known_logger_kwargs
        )

        # Copy `extra` so the 'log_props' entry does not leak into the
        # dictionary owned by the caller.
        extra = dict(extra) if extra else {}
        extra['log_props'] = log_props

        super()._log(level, msg, args, exc_info, extra, stack_info)

    def makeRecord(self, name, level, fn, lno, msg, args, exc_info, func=None, extra=None, sinfo=None):
        """
        Create a `LogRecord`.

        :param name: The name of the logger that produced the log record.
        :param level: The logging level (severity) associated with the logging record.
        :param fn: The name of the file (if known) where the log entry was created.
        :param lno: The line number (if known) in the file where the log entry was created.
        :param msg: The log message (or message template).
        :param args: Ordinal message format arguments (if any).
        :param exc_info: Exception information to be included in the log entry.
        :param func: The function (if known) where the log entry was created.
        :param extra: Extra information (if any) to add to the log record.
        :param sinfo: Stack trace information (if known) for the log entry.
        :return: A StructuredLogRecord when `extra` carries 'log_props'; otherwise whatever
                 the base class creates.
        :raises KeyError: If `extra` would overwrite an existing attribute of the record
                          (the same rule the standard ``Logger.makeRecord`` applies).
        """

        # Do we have named format arguments?
        if not extra or 'log_props' not in extra:
            return super().makeRecord(name, level, fn, lno, msg, args, exc_info, func, extra, sinfo)

        record = StructuredLogRecord(name, level, fn, lno, msg, args, exc_info, func, sinfo, extra['log_props'])

        # Attach the remaining `extra` entries as record attributes, as the
        # standard logger does; without this they would be silently discarded.
        for (key, value) in extra.items():
            if key == 'log_props':
                continue

            if key in ("message", "asctime") or key in record.__dict__:
                raise KeyError("Attempt to overwrite %r in LogRecord" % key)

            record.__dict__[key] = value

        return record
class ConsoleStructuredLogHandler(logging.Handler):
    """
    Log handler that prints each formatted record to standard output, followed by
    the record's structured properties when it carries any.
    """

    def __init__(self):
        """
        Create a new `ConsoleStructuredLogHandler`.
        """

        super().__init__()

    def emit(self, record):
        """
        Print the formatted record and, if present, its structured properties.

        Properties are read from the record's `log_props` attribute (which is where
        StructuredLogRecord stores them); a `kwargs` attribute is still honoured for
        records that only provide that.

        :param record: The LogRecord.
        """

        try:
            msg = self.format(record)

            if hasattr(record, 'log_props'):
                has_props, props = True, record.log_props
            elif hasattr(record, 'kwargs'):
                has_props, props = True, record.kwargs
            else:
                has_props, props = False, None

            print(msg)
            if has_props:
                print("\tLog entry properties: {}".format(repr(props)))
        except Exception:
            # Standard handler convention: a failure to format or print a
            # record is reported through handleError rather than raised into
            # the code that made the logging call.
            self.handleError(record)
class SeqLogHandler(logging.Handler):
    """
    Log handler that posts to Seq.
    """

    def __init__(self, server_url, api_key=None, batch_size=10, auto_flush_timeout=None):
        """
        Create a new `SeqLogHandler`.

        :param server_url: The Seq server URL.
        :param api_key: The Seq API key (if any).
        :param batch_size: The number of messages to batch up before posting to Seq.
        :param auto_flush_timeout: If specified, the time (in seconds) before the current batch is automatically flushed.
        """

        super().__init__()

        # Events are posted to "<server_url>/api/events/raw".
        self.server_url = server_url
        if not self.server_url.endswith("/"):
            self.server_url += "/"
        self.server_url += "api/events/raw"

        self.session = requests.Session()
        if api_key:
            self.session.headers["X-Seq-ApiKey"] = api_key

        # Records are queued by emit() and handed to publish_log_batch by the
        # consumer (started immediately).
        self.log_queue = Queue()
        self.consumer = QueueConsumer(
            name="SeqLogHandler",
            queue=self.log_queue,
            callback=self.publish_log_batch,
            batch_size=batch_size,
            auto_flush_timeout=auto_flush_timeout
        )
        self.consumer.start()

    def flush(self):
        """
        Flush the queue consumer, then perform the base handler's flush.
        """

        try:
            self.consumer.flush()
        finally:
            super().flush()

    def emit(self, record):
        """
        Emit a log record.

        The record is only placed on the handler's queue here; posting to Seq
        happens in `publish_log_batch`.

        :param record: The LogRecord.
        """

        self.log_queue.put(record, block=False)

    def close(self):
        """
        Close the log handler.

        Stops the queue consumer (if it is running) and closes the HTTP session.
        """

        try:
            if self.consumer.is_running:
                self.consumer.stop()

            # TODO: Implement QueueConsumer.join() so we can wait
            # for processing to complete before closing the HTTP session
            # self.consumer.join()

            self.session.close()
        finally:
            super().close()

    def publish_log_batch(self, batch):
        """
        Publish a batch of log records.

        A failed request is reported through `handleError` (for the first record of
        the batch only) rather than raised.

        :param batch: A list representing the batch.
        """

        if not batch:
            return

        request_body = {
            "Events": [
                _build_event_data(record) for record in batch
            ]
        }

        response = None
        self.acquire()
        try:
            response = self.session.post(
                self.server_url,
                json=request_body,
                stream=True  # prevent '362' (NOTE(review): meaning of '362' is not evident here - confirm)
            )
            response.raise_for_status()
        except requests.RequestException:
            # Only notify for the first record in the batch, or we'll be generating too much noise.
            self.handleError(batch[0])
        finally:
            try:
                # With stream=True the connection is only handed back to the
                # pool once the body is consumed or the response is closed.
                if response is not None:
                    response.close()
            finally:
                self.release()
def _build_event_data(record):
    """
    Build an event data dictionary from the specified log record for submission to Seq.

    Three cases are handled:

    * The record has format arguments: the rendered message is used and the arguments
      become properties (named by 0-based index, or by key when the arguments are a mapping).
    * The record is a StructuredLogRecord without format arguments: the raw message is used
      as the template and the record's `log_props` become the properties.
    * Anything else: the rendered message is used together with the global log properties.

    :param record: The LogRecord.
    :type record: StructuredLogRecord
    :return: A dictionary containing event data representing the log record.
    :rtype: dict
    """

    from collections.abc import Mapping

    if record.args:
        properties = get_global_log_properties(record.name)
        if isinstance(record.args, Mapping):
            # "%(name)s"-style arguments: logging stores the mapping itself in
            # record.args, so use its keys as the property names.
            properties.update(
                (str(key), value) for (key, value) in record.args.items()
            )
        else:
            # Standard (unnamed) format arguments (use 0-base index as property name).
            properties.update(
                (str(arg_index), arg) for (arg_index, arg) in enumerate(record.args)
            )
        message_template = record.getMessage()
    elif isinstance(record, StructuredLogRecord):
        # Named format arguments (and, therefore, log event properties).
        message_template = record.msg
        properties = record.log_props
    else:
        # No format arguments; interpret message as-is. Use a copy of the
        # global properties (including the logger name), as the first branch does.
        message_template = record.getMessage()
        properties = get_global_log_properties(record.name)

    return {
        "Timestamp": _get_local_timestamp(record),
        "Level": logging.getLevelName(record.levelno),
        "MessageTemplate": message_template,
        "Properties": properties
    }


def _get_local_timestamp(record):
    """
    Get the record's creation time, expressed in the local time zone, as an ISO-formatted date / time string.

    The date and time parts are separated by a single space.

    :param record: The LogRecord.
    :type record: StructuredLogRecord
    :return: The ISO-formatted date / time string.
    :rtype: str
    """

    timestamp = datetime.fromtimestamp(
        timestamp=record.created,
        tz=tzlocal()
    )

    return timestamp.isoformat(sep=' ')