"""
@author: Jaume Bonet
@mail: jaume.bonet@gmail.com
@date: 2014
@ [oliva's lab](http://sbi.imim.es)
"""
import atexit
import ConfigParser
import datetime
import inspect
import json
import logging
import os
import sys
import time
import traceback
import warnings
from ..metaclass import Singleton
from ._inner import Project
from ._inner import Experiment
[docs]class Manager(object):
"""The Manager class is a :py:class:`Singleton <pynion.Singleton>` that crosses
through the entire library. It is the main controller of the user's preferences.
"""
__metaclass__ = Singleton
_GENERAL_FORMAT = '%(asctime)s - %(levelname)-7.7s - %(message)s'
_TIME_FORMAT = '%Y-%m-%d %H:%M'
_FRMT = logging.Formatter(_GENERAL_FORMAT, _TIME_FORMAT)
_MSSG = '[ {0} ] - {1}'
def __init__(self):
""".. py:method:: __init__()
The class is instantiated at some point during the load of the library's
code. Thus, no parameters are passed to it. As any new instantiation will
only call the initial instance, parameters cannot be passed afterward.
Although any attribute can be accessed through its given *setter*, default
values can be assigned to the first initialization of **Manager** through
a configuration file. The configuration file should have the following
parameters:
.. literalinclude:: /../../pynion/config/default.settings
The user's configuration file must then be linked to a system variable
named ``PYNION_CONFIG_PY``. Thus, the configuration file can be setted
globally for all executions directly from bash, ::
export PYNION_CONFIG_PY=user.settings
Or specifically to a script by ::
import os
os.environ["PYNION_CONFIG_PY"] = 'user.settings'
**BEFORE** importing the **Pynion** library.
Regarding the different execution parameters in the configuration file:
:param bool stdout: Create a :py:class:`logging.StreamHandler` for standard
output.
:param bool verbose: Activate level *verbose* of logging report.
:param bool debug: Activate level *debug* of logging report. It also
forces the activation of *verbose*.
:param bool detail: Activate level *detail* of logging report. It also
forces the activation of *debug*, *verbose* and *unclean*.
:param bool overwrite: When :py:data:`True`, allows overwriting existing files.
:param bool unclean: When :py:data:`True`, avoids the deletion of
temporary and empty files at the end of the execution.
:param bool logfile: When defined, it creates a
:py:class:`logging.StreamHandler` to a file with the provided name.
If ``logfile = default`` or a directory, it creates a logfile with
a predefined name that includes the name of the execution and the
pid of the process.
Regarding the project parameters in the configuration file:
**(TODO)**
"""
# Logger parameters: Level of detail
self._verbose = False # Requires setter
self._debug = False # Requires setter
self._detail = False # Requires setter
# Management parameters:
self._tempfiles = set() # Requires setter
self._clean = True # Clean the temporary files
# IO conditions:
self._overwrite = False # Requires setter
# Create a logger.
# Null handler is added so that if no handler is active
# warnings and errors will not display a 'handler not found' message
self._stdout = False
self._fd = logging.getLogger(__name__)
self._fd.setLevel(logging.DEBUG)
self._fd.addHandler(logging.NullHandler())
# Project and Experiment
rfname, cfname, pfname = self._configuration()
self.project = Project(rfname, cfname, pfname)
try:
self.experiment = Experiment()
except:
self.exception(['Bash command could not be imported.',
'System needs to be UNIX based'])
# Citation Manager
self.citations = set()
# Register function to execute at exit
atexit.register(self.shutdown)
atexit.register(self.cleanup)
####################
# METHODS: SETTERS #
####################
[docs] def set_verbose(self):
"""Activate level *verbose* of logging report"""
self._verbose = True
[docs] def set_debug(self):
"""Activate level *debug* of logging report. It also forces the
activation of *verbose*."""
self.set_verbose()
self._debug = True
[docs] def set_detail(self):
"""Activate level *detail* of logging report. It also forces the
activation of *debug*, *verbose* and *unclean*."""
self._detail = True
self.set_unclean()
self.set_debug()
[docs] def set_unclean(self):
"""Avoids the deletion of temporary and empty
files at the end of the execution."""
self._clean = False
[docs] def set_stdout(self):
"""Create a :py:class:`logging.StreamHandler` for standard output."""
if self._stdout: return
handler = logging.StreamHandler()
handler.setFormatter(self._FRMT)
self._fd.addHandler(handler)
self.info('Active STDOUT')
self._stdout = True
[docs] def set_overwrite(self):
"""Allows overwriting existing files."""
self._overwrite = True
[docs] def set_logfile(self, logname = os.getcwd()):
"""Creates a :py:class:`logging.StreamHandler` to a file.
:param str logname: Name of the output logging file. If logname is a
directory, it will create a logging file in that directory named by
the name of the execution and its pid. Default value is current
working directory.
"""
if os.path.isdir(logname):
script_name = os.path.split(os.path.splitext(sys.argv[0])[0])[1]
if script_name == '__main__':
script_name = os.path.split(os.path.split(sys.argv[0])[0])[1]
log_file = ".".join([script_name, str(os.getpid()), 'log'])
logname = os.path.join(logname, log_file)
self.info('LOGfile: {0}'.format(logname))
handler = logging.FileHandler(filename = logname)
handler.setFormatter(self._FRMT)
self._fd.addHandler(handler)
############
# BOOLEANS #
############
[docs] def is_verbose(self):
"""Assess the status of the *verbose* logging level"""
return self._verbose
[docs] def is_debug(self):
"""Assess the status of the *debug* logging level"""
return self._debug
[docs] def is_detail(self):
"""Assess the status of the *detail* logging level"""
return self._detail
###########
# METHODS #
###########
[docs] def add_temporary_file(self, tempfile):
"""Register a new temporary file.
:param str tempfile: Name of the temporary file.
"""
self.info('Registering temporary file {0}'.format(tempfile))
self._tempfiles.add(tempfile)
[docs] def add_experiment_file(self, filename, action):
"""Register a new experiment file.
:param str filename: Name of the experiment file.
:param str action: Open mode of the registered file ('r', 'w', 'a')
"""
self.experiment.add_file(filename, action)
[docs] def add_citation(self, citation):
"""Adds a new citation from some code or other to be printed at the
end of the execution.
:param str citation: Citation to strore
"""
self.citations.add(citation)
[docs] def countdown(self, max_time):
"""Generate a STDERR printed countdown when needed to wait for something.
:param int max_time: Time to wait, in seconds.
"""
t = str(datetime.timedelta(seconds=max_time))
n = time.localtime()
s1 = 'Waiting for: {0} hours'.format(t)
s2 = 'Wait started at {0}'.format(time.strftime('%X', n))
s3 = 'on {0}'.format(time.strftime('%Y-%m-%d', n))
sys.stderr.write('{0}\t{1} {2}\n\n'.format(s1, s2, s3))
while max_time > 0:
t = str(datetime.timedelta(seconds=max_time))
sys.stderr.write('Remaining: {0} hours'.format(t))
time.sleep(1)
max_time -= 1
if bool(max_time):
sys.stderr.write('\r')
else:
sys.stderr.write('\r')
t = str(datetime.timedelta(seconds=max_time))
sys.stderr.write('Remaining: {0} hours'.format(t))
sys.stderr.write('\n')
[docs] def evaluate_overwrite(self, overwrite = None):
"""Given a overwrite command, it evaluates it with the global
overwrite configuration.
:param bool overwrite: Particular overwrite status. Default is :py:data:`None`
:return: Final overwrite status
:rtype: bool
"""
return self._overwrite if overwrite is None else overwrite
####################
# METHODS: LOGGING #
####################
[docs] def info(self, mssg):
"""Print *verbose* level information.
:param str mssg: Message to relay through logging. If it is :py:data:`list`,
each position is treated as a new line.
"""
if not self._verbose:
return
callerID = self._caller(inspect.stack()[1][0])
for line in self._message_to_array(callerID, mssg):
self._fd.info(line)
[docs] def debug(self, mssg):
"""Print *debug* level information.
:param str mssg: Message to relay through logging. If it is :py:data:`list`,
each position is treated as a new line.
"""
if not self._debug:
return
callerID = self._caller(inspect.stack()[1][0])
for line in self._message_to_array(callerID, mssg):
self._fd.debug(line)
[docs] def detail(self, mssg):
"""Print *detail* level information.
:param str mssg: Message to relay through logging. If it is :py:data:`list`,
each position is treated as a new line.
"""
if not self._detail:
return
callerID = self._caller(inspect.stack()[1][0])
for line in self._message_to_array(callerID, mssg):
self._fd.debug(line)
[docs] def warning(self, mssg):
"""Print *warning*.
:param str mssg: Message to relay through logging. If it is :py:data:`list`,
each position is treated as a new line.
"""
callerID = self._caller(inspect.stack()[1][0])
for line in self._message_to_array(callerID, mssg):
# If we have no handler added, we warn through the warnings module
if len(self._fd.handlers) > 1:
self._fd.warning(line)
else:
warnings.warn(line + '\n')
[docs] def exception(self, mssg):
"""Print *exceptions* and quit.
:param str mssg: Message to relay through logging. If it is :py:data:`list`,
each position is treated as a new line.
"""
callerID = self._caller(inspect.stack()[1][0])
for line in self._message_to_array(callerID, mssg):
if len(self._fd.handlers) > 1:
self._fd.exception(line)
else:
sys.stderr.write(line + '\n')
traceback.print_tb(sys.exc_info()[2])
os._exit(0) # This exit avoids atexit calls.
####################
# METHODS: AT EXIT #
####################
def cleanup(self):
if self._clean:
for tfile in self._tempfiles:
if os.path.isfile(tfile):
os.unlink(tfile)
self.info('Temporary file {0} removed.'.format(tfile))
for efile in self.experiment.clean_empty_files():
self.info('Empty file {0} removed.'.format(efile))
self._tempfiles = set()
def shutdown(self):
self.experiment.end = time.time()
self.experiment.calculate_duration()
self._write_to_pipeline()
info = 'Elapsed time: {0}'.format(self.experiment.duration)
if not self._verbose: self.set_verbose()
if not self._stdout: self.set_stdout()
for _ in self.citations:
self._fd.info('[ REFERENCE!! ]: -- {0}'.format(_))
self._fd.info('[ SUCCESS!! ]: -- {0}'.format(info))
self._fd.info('[ SUCCESS!! ]: -- Program ended as expected.')
logging.shutdown()
###################
# PRIVATE METHODS #
###################
def _caller(self, caller):
if inspect.getmodule(caller) is not None:
callerID = inspect.getmodule(caller).__name__
else:
callerID = 'Terminal'
if callerID is '__main__':
callerID = inspect.getmodule(caller).__file__
callerID = os.path.split(os.path.split(callerID)[0])[-1]
return callerID.upper()
def _message_to_array(self, callerID, mssg):
if isinstance(mssg, basestring):
mssg = [mssg, ]
for line in mssg:
yield self._MSSG.format(callerID, str(line))
def _write_to_pipeline(self):
if self.project.is_active:
data = []
with open(self.project.pipeline_file, 'r') as line:
l = line.read().strip()
if len(l) > 0:
data = json.loads(l)
data.append(self.experiment.to_dict())
with open(self.project.pipeline_file, 'w') as fd:
fd.write(json.dumps(data))
def _configuration(self):
dfile = '../config/default.settings'
ufile = '../config/user.settings'
default = os.path.join(os.path.dirname(__file__), dfile)
default = os.path.normpath(default)
user = os.path.join(os.path.dirname(__file__), ufile)
user = os.path.normpath(user)
user = os.getenv('PYNION_CONFIG_PY', user)
parse = ConfigParser.RawConfigParser(allow_no_value=True)
parse.readfp(open(default))
parse.read(user)
manager_opt = ['stdout', 'verbose',
'debug', 'detail',
'overwrite', 'unclean']
for opt in manager_opt:
func = getattr(self, 'set_' + opt)
if parse.getboolean('manager', opt):
self.info('Setting up {0} mode'.format(opt))
func()
logfile = parse.get('manager', 'logfile')
if logfile is not None and logfile != '':
if logfile.lower() == 'default':
self.set_logfile()
else:
self.set_logfile(logfile)
return [parse.get('project', 'name'),
parse.get('project', 'config'),
parse.get('project', 'pipeline')]