Source code for pynion.main.executable

"""

@author: Jaume Bonet
@mail:   jaume.bonet@gmail.com
@date:   2013

@ [oliva's lab](http://sbi.imim.es)

"""

import os
import subprocess
import copy

from . import Manager
from ..errors.xe import ExecutableNoExistsError         as ENE
from ..errors.xe import ExecutableNotInPathError        as ENP
from ..errors.xe import ExecutablePermissionDeniedError as EPD

m = Manager()


[docs]class Executable(object): """ Manages the execution of external programs. """ def __init__(self, executable, path=None): """ :param str executable: name of the executable program :param str path: path to the executable. Not needed if the executable is in the ``$PATH`` environment variable. Default is :py:data:`None` :raise: :py:class:`pynion.errors.xe.ExecutableNoExistsError` if the executable does not exist :raise: :py:class:`pynion.errors.xe.ExecutableNotInPathError` if the path is :py:data:`None` and the executable is not in the ``$PATH`` environment variable. :raise: :py:class:`pynion.errors.xe.ExecutablePermissionDeniedrror` if executable has no execution permission """ self._exec = executable self._path = path if path is not None: self._path = os.path.abspath(path) else: found = self._load_executable_path() if not found: raise ENP(self._exec) self._check_executable() self._command = [] self._command.append(self.full_executable) self._backup_command = [] m.detail('Executable for {0} created.'.format(self.full_executable)) ############## # ATTRIBUTES # ############## @property def executable(self): """ Name of the executable. :rtype: str """ return self._exec @property def path(self): """ Path to the executable. :rtype: str """ return self._path @property def command(self): """ Full command to execute. :rtype: list """ return self._command @property def full_executable(self): """ Full executable with path. :rtype: str """ return os.path.join(self._path, self._exec) ########### # METHODS # ###########
[docs] def add_attribute(self, attribute_value, attribute_id=None): """ Adds a new parameter to the command. Specifically for parameters with 'tags' like '-i'. :param str attribute_value: value of the attribute to add :param str attribute_id: label of the attribute to add. By default is :py:data:`None`, which makes the function identical to :py:class:`pynion.Executable.add_parameter`. """ if attribute_id is None: self.add_parameter(attribute_value) else: self._command.append(attribute_id) self._command.append(str(attribute_value))
[docs] def add_parameter(self, parameter): """ Adds a new stand alone parameter to the command. :param str parameter: value of the parameter to add """ self._command.append(str(parameter))
[docs] def clean_command(self): """ Removes all attributes and parameters added to the command. """ self._command = [] self._command.append(self.full_executable)
[docs] def backup_command(self): """ Store a copy of the command up to that point to retrieve import afterwards. """ self._backup_command = copy.deepcopy(self._command)
[docs] def restore_command(self): """ Retrieve the backup command into the working command. The backup command is emptied. """ self._command = self._backup_command self._backup_command = []
[docs] def execute(self, silent=False): """ Executes the commands. :param bool silent: If :py:data:`True`, external program STDERR is shown through STDERR :raises: :py:class:`SystemError` if an error occurs in the external program. """ outPIPE = subprocess.PIPE if m._debug else open('/dev/null', 'w') errPIPE = open('/dev/null', 'w') if silent else subprocess.PIPE command = " ".join(self.command) m.info('\tExecuting:\t{0}'.format(command)) try: p = subprocess.Popen(self.command, stdout = outPIPE, stderr = errPIPE) if m._debug: for out in iter(p.stdout.readline, b''): m.info(out.strip()) p.communicate() except: raise SystemError()
################### # PRIVATE METHODS # ################### def _load_executable_path(self): """ Retrieves the executable path in case self._path is None. :return: :py:data:`True` if the path is found, :py:Data:`False` otherwise. :rtype: bool """ search = ["which", self.executable] p = subprocess.Popen(search, stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate() if out != '': self._path = os.path.split(out.strip())[0] return True else: return False def _check_executable(self): if not os.path.isfile(self.full_executable): raise ENE(self.full_executable) if not os.access(self.full_executable, os.X_OK): raise EPD(self.full_executable) ################# # CLASS METHODS # ################# def __repr__(self): return " ".join(self._command) def __str__(self): return repr(self)