import os
import tarfile
import zipfile
from .basefile import BaseFile
from ...errors.fe import FileContainerFileNotFound as CNF
from ...errors.fe import FileContainerFailedExtraction as CFE
class ContainerFile(BaseFile):
    """
    The **ContainerFile** :py:class:`pynion.Multiton` is a file management
    object created directly through the :py:class:`pynion.File` factory.

    Extends :py:class:`pynion.filesystem._filetypes.BaseFile`

    It specifically manages compacted or compressed files with multiple files
    within (tar and zip bundles).
    """

    def __init__(self, file_name, action, ctype):
        """
        :param file_name: Forwarded unchanged to the ``BaseFile`` initializer.
        :param action: Forwarded to the ``BaseFile`` initializer; for tar
            containers ``'|*'`` is appended to ``self.action`` afterwards.
        :param str ctype: Container type descriptor. The properties below
            test it with ``startswith('tar')``, ``== 'zip'``,
            ``endswith('gzip')`` and ``endswith('bzip')``.
        """
        super(ContainerFile, self).__init__(file_name, action)
        self.ctype = ctype
        if self.is_tarfile:
            # NOTE(review): presumably builds a tarfile stream mode such as
            # 'r|*'; open() below does not pass self.action to tarfile.open,
            # so confirm where (if anywhere) this value is consumed.
            self.action = self.action + '|*'

    ############
    # BOOLEANS #
    ############
    @property
    def is_gzipped(self):
        """
        :return: Check if compression is gzip
        :rtype: bool
        """
        return self.ctype.endswith('gzip')

    @property
    def is_bzipped(self):
        """
        :return: Check if compression is bzip
        :rtype: bool
        """
        return self.ctype.endswith('bzip')

    @property
    def is_zipped(self):
        """
        :return: Check if compression is zip
        :rtype: bool
        """
        return self.ctype == 'zip'

    @property
    def is_tarfile(self):
        """
        :return: Check if the container is a tar bundle
        :rtype: bool
        """
        return self.ctype.startswith('tar')

    ####################
    # METHODS: ON FILE #
    ####################
    def open(self):
        """
        Open the container and keep the handle in ``self._fd``.

        Does nothing when ``self.is_open`` is already true. The archive is
        opened with the default mode of :py:func:`tarfile.open` or
        :py:class:`zipfile.ZipFile`; ``self.action`` is not consulted here.

        :rtype: self
        """
        if self.is_open:
            return self
        if self.is_tarfile:
            self._fd = tarfile.open(self.full)
        if self.is_zipped:
            self._fd = zipfile.ZipFile(self.full)
        return self

    def read_file(self, file_name):
        """
        Get the content of a specific file from the file bundle.

        For zip bundles the member content is split on newline characters
        (line terminators removed); for tar bundles the lines are returned as
        produced by ``readlines()`` (line terminators kept).

        :param str file_name: Name of the query internal file.
        :raise: :py:class:`pynion.errors.fe.FileContainerFileNotFound` if query
            file is not in the bundle.
        :return: Lines of the requested file, or ``None`` when the container
            is neither a zip nor a tar bundle.
        :rtype: list
        """
        if self.is_zipped:
            if file_name not in self._fd.namelist():
                raise CNF(self.full, file_name)
            # ZipFile.read returns bytes: a bytes separator is required on
            # Python 3 and is equivalent to '\n' on Python 2.
            return self._fd.read(file_name).split(b'\n')
        if self.is_tarfile:
            if file_name not in self._fd.getnames():
                raise CNF(self.full, file_name)
            member = self._fd.getmember(file_name)
            return self._fd.extractfile(member).readlines()
        return None

    def has_file(self, file_name):
        """
        Query if file exists in the bundle.

        :param str file_name: Name of the query internal file.
        :return: ``None`` when the container is neither zip nor tar.
        :rtype: bool
        """
        if self.is_zipped:
            return file_name in self._fd.namelist()
        if self.is_tarfile:
            return file_name in self._fd.getnames()
        return None

    def list_files(self):
        """
        Name of all files in the bundle.

        :return: ``None`` when the container is neither zip nor tar.
        :rtype: list
        """
        if self.is_tarfile:
            return self._fd.getnames()
        if self.is_zipped:
            return self._fd.namelist()
        return None

    def length(self):
        """
        Number of files in the bundle.

        :rtype: int
        """
        # list_files is a method: it has to be called before measuring it.
        return len(self.list_files())

    # NOTE(review): defining __new__ this way makes any call to
    # ContainerFile.__new__ raise (TypeError with extra arguments,
    # NotImplementedError without). Possibly another special method was
    # intended -- confirm against BaseFile and its metaclass before changing.
    def __new__(self):
        raise NotImplementedError

    def __exit__(self, type, value, traceback):
        # Leaving a ``with`` block on a container file is explicitly
        # unsupported.
        raise NotImplementedError