# -*- coding: utf-8 -*-

# Copyright 2018, IBM.
# This source code is licensed under the Apache License, Version 2.0 found in
# the LICENSE.txt file in the root directory of this source tree.

# pylint: disable=invalid-name

Ignis Logging

import logging
import logging.handlers
import logging.config
from logging import Logger
import os
import glob
from datetime import datetime
import re
from typing import List, Union, Optional

[docs]class IgnisLogger(logging.getLoggerClass()): """ A logger class for Ignis IgnisLogger is a like any other :class:`logging.Logger` object except it has an additional method, :meth:`log_to_file`, used to log data in the form of key:value pairs to a log file. Logging configuration is performed via a configuration file and is handled by IgnisLogging. Refer to Python's logging documentation for more details on how to use logging in Python """ def __init__(self, name: str, level: Optional[int] = logging.NOTSET): """ Initialize the IgnisLogger object Args: name: name of the logger. Usually set to package name using __name__ level(logging.NOTSET): Verbosity level (use logging package enums) """ Logger.__init__(self, name, level) self._file_logging_enabled = False self._file_handler = None self._stream_handler = None self._conf_file_exists = False self._warning_omitted = False
[docs] def configure(self, sh: logging.StreamHandler, conf_file_exists: bool): """ Internal configuration method of IgnisLogger. Should only be called by IgnisLogger Args: sh: StreamHandler object conf_file_exists: Whether or not a file config exists """ self._stream_handler = sh self.addHandler(sh) self._conf_file_exists = conf_file_exists
[docs] def log_to_file(self, **kwargs): """ Log key:value pairs to a log file. Note: Logger name in the log file is fixed (ignis_logging) Args: kwargs: key/value pairs to be logged, e.g t1=0.02, qubits=[1,2,4] """ if not self._file_logging_enabled: if not self._warning_omitted: # Omitting this warning only once msg = "File logging is disabled" if not self._conf_file_exists: msg += ": no config file" logger = logging.getLogger(__name__) logger.warning(msg) self._warning_omitted = True return # We defer setting up the file handler, since its __init__ method # has the side effect of creating the file if self._file_handler is None: self._file_handler = IgnisLogging().get_file_handler() assert(self._file_handler is not None), "file_handler is not set" Logger.removeHandler(self, self._stream_handler) Logger.addHandler(self, self._file_handler) logstr = "" for k, v in kwargs.items(): logstr += "'{}':'{}' ".format(k, v) Logger.log(self, 100, logstr) Logger.removeHandler(self, self._file_handler) Logger.addHandler(self, self._stream_handler)
[docs] def enable_file_logging(self): """ Enable file logging for this logger object (note there is a single object for a given logger name """ self._file_logging_enabled = True
[docs] def disable_file_logging(self): """ Disable file logging for this logger object (note there is a single object for a given logger name """ self._file_logging_enabled = False
[docs]class IgnisLogging: """Singleton class to configure file logging via IgnisLogger Logging to file is enabled only if there is a config file present. Otherwise IgnisLogger will behave as a regular logger. Config file is assumed to be in <user home>/.qiskit/logging.yaml **Config file fields:** file_logging: {true/false} - Specifies whether file logging is enabled log_file: <path> - path to the log file. If not specified, ignis.log will be used max_size: <# bytes> - maximum size limit for a given log file. If not specified file size is unlimited max_rotations: <count> - maximum number of log files to rotate (oldest file is deleted in case count is reached) """ # TODO: Should we allow to override file settings programmatically ? # (e.g. enable logging) _instance = None _file_logging_enabled = False _log_file = None _max_bytes = 0 _max_rotations = 0 _log_label = "ignis_logging" _default_datefmt = '%Y/%m/%d %H:%M:%S' _config_file_exists = False # Making the class a Singleton def __new__(cls): if IgnisLogging._instance is None: IgnisLogging._instance = object.__new__(cls) IgnisLogging._initialize() return IgnisLogging._instance @staticmethod def _load_config_file(): """ Load and parse the config file Returns: dict: A dictionary containing all the settings """ config_file_path = os.path.join(os.path.expanduser('~'), ".qiskit", "logging.yaml") config = dict() if os.path.exists(config_file_path): with open(config_file_path, "r") as log_file: for line in log_file: # removing comments line = line[:line.find('#') if "#" in line else None] line = line.split(':') # Splitting to key value if len(line) < 2: continue config[line[0].strip().lower()] = line[1].strip().lower() IgnisLogging._config_file_exists = True return config @staticmethod def _initialize(): """ Initialize the logging facility for Ignis """ logging.setLoggerClass(IgnisLogger) log_config = IgnisLogging._load_config_file() # Reading the config file content IgnisLogging._file_logging_enabled = \ log_config.get('file_logging') == "true" IgnisLogging._log_file = log_config.get('log_file') if \ log_config.get('log_file') is not None else "ignis.log" max_size = log_config.get('max_size') IgnisLogging._max_bytes = int(max_size) if \ max_size is not None and max_size.isdigit() else 0 max_rotations = log_config.get('max_rotations') IgnisLogging._max_rotations = int(max_rotations) if \ max_rotations is not None and max_rotations.isdigit() else 0
[docs] def get_logger(self, name: str) -> IgnisLogger: """ Return an IgnisLogger object To be used in by the code which needs logging. Args: name: Name of the module being logged Returns: An IgnisLogger object """ logger = logging.getLogger(name) assert(isinstance(logger, IgnisLogger)), \ "IgnisLogger class was not registered" self._configure_logger(logger) return logger
[docs] def get_file_handler(self): """ Configure and retrieve the RotatingFileHandler object. Called on demand the first time IgnisLoggers needs to write to a file Returns: RotatingFileHandler: The configured RotatingFileHandler object """ # Configuring the file handling aspect fh = logging.handlers.RotatingFileHandler( IgnisLogging._log_file, maxBytes=IgnisLogging._max_bytes, backupCount=IgnisLogging._max_rotations) # Formatting formatter = logging.Formatter( '%(asctime)s {} %(message)s'.format(IgnisLogging._log_label), datefmt=IgnisLogging._default_datefmt) fh.setFormatter(formatter) return fh
def _configure_logger(self, logger): """ Configure the stream handler of the logger Args: logger (Logger): the logger to be configured """ # Configuring the stream handler sh = logging.StreamHandler() sh.setLevel(logging.NOTSET) stream_fmt = logging.Formatter('%(levelname)s: %(name)s - %(message)s') sh.setFormatter(stream_fmt) # This will enable limiting file size and rotating once file size # is exhausted logger.configure(sh, IgnisLogging._config_file_exists) if IgnisLogging._file_logging_enabled: logger.enable_file_logging()
[docs] def get_log_file(self) -> str: """ Get the name of the log file Returns: Name of the log file """ return IgnisLogging._log_file
[docs] def default_datetime_fmt(self) -> str: """ Get the default date time format used for writing log entries Returns: Default date time format """ return IgnisLogging._default_datefmt
[docs]class IgnisLogReader: """ Class to read from Ignis log files Reads and constructs tabular representation of logged data based on date/time and key criteria """
[docs] def get_log_files(self) -> List[str]: """ Get Names of all log files (several may be present due to logging file rotation). File names are sorted by modification time. Returns: list: list of all log file names """ file_name = IgnisLogging().get_log_file() search_path = os.path.abspath(file_name + "*") files = sorted(glob.glob(search_path), key=os.path.getmtime) result = list() m = re.compile( os.path.abspath(file_name).replace('\\', r'\\') + r"$|" + os.path.abspath(file_name).replace('\\', r'\\') + r".\d+$") for f in files: if m.match(f): result.append(f) return result
[docs] def read_values(self, log_files: Optional[List[str]] = None, keys: Optional[List[str]] = None, from_datetime: Optional[str] = None, from_datetime_format: Optional[Union[str, datetime]] = None, to_datetime: Optional[str] = None, to_datetime_format: Optional[Union[str, datetime]] = None)\ -> List[List[str]]: """ Retrieve log lines using key and date/time filtering criteria Params: log_files: List of log files to read from keys: Retrieve only key value pairs of corresponding to keys A row with no matching keys will not be retrieved. If not specified, all keys are retrieved (optional) from_datetime(None): Retrieve only rows newer than the given date and time from_datetime_format(None): datetime format string. If not specified will assume "%Y/%m/%d %H:%M:%S" to_datetime(None): Retrieve only rows older than the given date and time to_datetime_format(None): datetime format string. If not specified will assume "%Y/%m/%d %H:%M:%S" Returns: list: A list containing the retrieved rows of key pair values """ if log_files is not None: files = [log_files] if isinstance(log_files, str) else log_files else: files = self.get_log_files() retrieved_date = list() for file in files: with open(file, "r") as f: for line in f: terms = line.split() date_time = terms[0:2] dt_filterd = self._filter_by_datetime(date_time, from_datetime, from_datetime_format, to_datetime, to_datetime_format) if dt_filterd: continue key_values = terms[3:] if keys is not None: key_values = self._filter_keys(key_values, keys) if not key_values: continue retrieved_date.append(date_time + key_values) return retrieved_date
def _filter_keys(self, key_values, keys): """ Retrieve key value pairs matching the given keys Params: key_values (list): list of key value pairs keys (list): list of keys to retrieve key value pair of Returns: list: A list of key value pairs according to keys """ result = list() assert(isinstance(key_values, list)), "key_values is not a list" for kv in key_values: if kv.split(":")[0].strip("'") in keys: result.append(kv) return result def _filter_by_datetime(self, row_datetime, from_dt, from_dt_fmt, to_dt, to_dt_fmt): """ Determine whether the given datetime should be filtered Params: row_datetime: the date/time in question from_dt: starting date/time from_dt_fmt: format of the starting date/time to_dt: ending date/time to_dt_fmt: format of the ending date/time Returns: bool: True if the row should be filtered out Raises: ValueError: if invalid date time fields are passed in """ if from_dt is not None and not isinstance(from_dt, datetime): try: if from_dt_fmt is None: from_dt_fmt = IgnisLogging().default_datetime_fmt() from_dt = datetime.strptime(from_dt, from_dt_fmt) except ValueError as ve: raise ve if to_dt is not None and not isinstance(to_dt, datetime): try: if to_dt_fmt is None: to_dt_fmt = IgnisLogging().default_datetime_fmt() to_dt = datetime.strptime(to_dt, to_dt_fmt) except ValueError as ve: raise ve if from_dt is None and to_dt is None: return False row_datetime = datetime.strptime("%s %s" % (row_datetime[0], row_datetime[1]), IgnisLogging().default_datetime_fmt()) if from_dt is not None: if row_datetime < from_dt: return True if to_dt is not None: if row_datetime > to_dt: return True return False