diff options
Diffstat (limited to 'cffi/verifier.py')
-rw-r--r-- | cffi/verifier.py | 307 |
1 files changed, 0 insertions, 307 deletions
diff --git a/cffi/verifier.py b/cffi/verifier.py deleted file mode 100644 index a500c78..0000000 --- a/cffi/verifier.py +++ /dev/null @@ -1,307 +0,0 @@ -# -# DEPRECATED: implementation for ffi.verify() -# -import sys, os, binascii, shutil, io -from . import __version_verifier_modules__ -from . import ffiplatform -from .error import VerificationError - -if sys.version_info >= (3, 3): - import importlib.machinery - def _extension_suffixes(): - return importlib.machinery.EXTENSION_SUFFIXES[:] -else: - import imp - def _extension_suffixes(): - return [suffix for suffix, _, type in imp.get_suffixes() - if type == imp.C_EXTENSION] - - -if sys.version_info >= (3,): - NativeIO = io.StringIO -else: - class NativeIO(io.BytesIO): - def write(self, s): - if isinstance(s, unicode): - s = s.encode('ascii') - super(NativeIO, self).write(s) - - -class Verifier(object): - - def __init__(self, ffi, preamble, tmpdir=None, modulename=None, - ext_package=None, tag='', force_generic_engine=False, - source_extension='.c', flags=None, relative_to=None, **kwds): - if ffi._parser._uses_new_feature: - raise VerificationError( - "feature not supported with ffi.verify(), but only " - "with ffi.set_source(): %s" % (ffi._parser._uses_new_feature,)) - self.ffi = ffi - self.preamble = preamble - if not modulename: - flattened_kwds = ffiplatform.flatten(kwds) - vengine_class = _locate_engine_class(ffi, force_generic_engine) - self._vengine = vengine_class(self) - self._vengine.patch_extension_kwds(kwds) - self.flags = flags - self.kwds = self.make_relative_to(kwds, relative_to) - # - if modulename: - if tag: - raise TypeError("can't specify both 'modulename' and 'tag'") - else: - key = '\x00'.join(['%d.%d' % sys.version_info[:2], - __version_verifier_modules__, - preamble, flattened_kwds] + - ffi._cdefsources) - if sys.version_info >= (3,): - key = key.encode('utf-8') - k1 = hex(binascii.crc32(key[0::2]) & 0xffffffff) - k1 = k1.lstrip('0x').rstrip('L') - k2 = hex(binascii.crc32(key[1::2]) & 0xffffffff) - k2 = k2.lstrip('0').rstrip('L') - modulename = '_cffi_%s_%s%s%s' % (tag, self._vengine._class_key, - k1, k2) - suffix = _get_so_suffixes()[0] - self.tmpdir = tmpdir or _caller_dir_pycache() - self.sourcefilename = os.path.join(self.tmpdir, modulename + source_extension) - self.modulefilename = os.path.join(self.tmpdir, modulename + suffix) - self.ext_package = ext_package - self._has_source = False - self._has_module = False - - def write_source(self, file=None): - """Write the C source code. It is produced in 'self.sourcefilename', - which can be tweaked beforehand.""" - with self.ffi._lock: - if self._has_source and file is None: - raise VerificationError( - "source code already written") - self._write_source(file) - - def compile_module(self): - """Write the C source code (if not done already) and compile it. - This produces a dynamic link library in 'self.modulefilename'.""" - with self.ffi._lock: - if self._has_module: - raise VerificationError("module already compiled") - if not self._has_source: - self._write_source() - self._compile_module() - - def load_library(self): - """Get a C module from this Verifier instance. - Returns an instance of a FFILibrary class that behaves like the - objects returned by ffi.dlopen(), but that delegates all - operations to the C module. If necessary, the C code is written - and compiled first. - """ - with self.ffi._lock: - if not self._has_module: - self._locate_module() - if not self._has_module: - if not self._has_source: - self._write_source() - self._compile_module() - return self._load_library() - - def get_module_name(self): - basename = os.path.basename(self.modulefilename) - # kill both the .so extension and the other .'s, as introduced - # by Python 3: 'basename.cpython-33m.so' - basename = basename.split('.', 1)[0] - # and the _d added in Python 2 debug builds --- but try to be - # conservative and not kill a legitimate _d - if basename.endswith('_d') and hasattr(sys, 'gettotalrefcount'): - basename = basename[:-2] - return basename - - def get_extension(self): - ffiplatform._hack_at_distutils() # backward compatibility hack - if not self._has_source: - with self.ffi._lock: - if not self._has_source: - self._write_source() - sourcename = ffiplatform.maybe_relative_path(self.sourcefilename) - modname = self.get_module_name() - return ffiplatform.get_extension(sourcename, modname, **self.kwds) - - def generates_python_module(self): - return self._vengine._gen_python_module - - def make_relative_to(self, kwds, relative_to): - if relative_to and os.path.dirname(relative_to): - dirname = os.path.dirname(relative_to) - kwds = kwds.copy() - for key in ffiplatform.LIST_OF_FILE_NAMES: - if key in kwds: - lst = kwds[key] - if not isinstance(lst, (list, tuple)): - raise TypeError("keyword '%s' should be a list or tuple" - % (key,)) - lst = [os.path.join(dirname, fn) for fn in lst] - kwds[key] = lst - return kwds - - # ---------- - - def _locate_module(self): - if not os.path.isfile(self.modulefilename): - if self.ext_package: - try: - pkg = __import__(self.ext_package, None, None, ['__doc__']) - except ImportError: - return # cannot import the package itself, give up - # (e.g. it might be called differently before installation) - path = pkg.__path__ - else: - path = None - filename = self._vengine.find_module(self.get_module_name(), path, - _get_so_suffixes()) - if filename is None: - return - self.modulefilename = filename - self._vengine.collect_types() - self._has_module = True - - def _write_source_to(self, file): - self._vengine._f = file - try: - self._vengine.write_source_to_f() - finally: - del self._vengine._f - - def _write_source(self, file=None): - if file is not None: - self._write_source_to(file) - else: - # Write our source file to an in memory file. - f = NativeIO() - self._write_source_to(f) - source_data = f.getvalue() - - # Determine if this matches the current file - if os.path.exists(self.sourcefilename): - with open(self.sourcefilename, "r") as fp: - needs_written = not (fp.read() == source_data) - else: - needs_written = True - - # Actually write the file out if it doesn't match - if needs_written: - _ensure_dir(self.sourcefilename) - with open(self.sourcefilename, "w") as fp: - fp.write(source_data) - - # Set this flag - self._has_source = True - - def _compile_module(self): - # compile this C source - tmpdir = os.path.dirname(self.sourcefilename) - outputfilename = ffiplatform.compile(tmpdir, self.get_extension()) - try: - same = ffiplatform.samefile(outputfilename, self.modulefilename) - except OSError: - same = False - if not same: - _ensure_dir(self.modulefilename) - shutil.move(outputfilename, self.modulefilename) - self._has_module = True - - def _load_library(self): - assert self._has_module - if self.flags is not None: - return self._vengine.load_library(self.flags) - else: - return self._vengine.load_library() - -# ____________________________________________________________ - -_FORCE_GENERIC_ENGINE = False # for tests - -def _locate_engine_class(ffi, force_generic_engine): - if _FORCE_GENERIC_ENGINE: - force_generic_engine = True - if not force_generic_engine: - if '__pypy__' in sys.builtin_module_names: - force_generic_engine = True - else: - try: - import _cffi_backend - except ImportError: - _cffi_backend = '?' - if ffi._backend is not _cffi_backend: - force_generic_engine = True - if force_generic_engine: - from . import vengine_gen - return vengine_gen.VGenericEngine - else: - from . import vengine_cpy - return vengine_cpy.VCPythonEngine - -# ____________________________________________________________ - -_TMPDIR = None - -def _caller_dir_pycache(): - if _TMPDIR: - return _TMPDIR - result = os.environ.get('CFFI_TMPDIR') - if result: - return result - filename = sys._getframe(2).f_code.co_filename - return os.path.abspath(os.path.join(os.path.dirname(filename), - '__pycache__')) - -def set_tmpdir(dirname): - """Set the temporary directory to use instead of __pycache__.""" - global _TMPDIR - _TMPDIR = dirname - -def cleanup_tmpdir(tmpdir=None, keep_so=False): - """Clean up the temporary directory by removing all files in it - called `_cffi_*.{c,so}` as well as the `build` subdirectory.""" - tmpdir = tmpdir or _caller_dir_pycache() - try: - filelist = os.listdir(tmpdir) - except OSError: - return - if keep_so: - suffix = '.c' # only remove .c files - else: - suffix = _get_so_suffixes()[0].lower() - for fn in filelist: - if fn.lower().startswith('_cffi_') and ( - fn.lower().endswith(suffix) or fn.lower().endswith('.c')): - try: - os.unlink(os.path.join(tmpdir, fn)) - except OSError: - pass - clean_dir = [os.path.join(tmpdir, 'build')] - for dir in clean_dir: - try: - for fn in os.listdir(dir): - fn = os.path.join(dir, fn) - if os.path.isdir(fn): - clean_dir.append(fn) - else: - os.unlink(fn) - except OSError: - pass - -def _get_so_suffixes(): - suffixes = _extension_suffixes() - if not suffixes: - # bah, no C_EXTENSION available. Occurs on pypy without cpyext - if sys.platform == 'win32': - suffixes = [".pyd"] - else: - suffixes = [".so"] - - return suffixes - -def _ensure_dir(filename): - dirname = os.path.dirname(filename) - if dirname and not os.path.isdir(dirname): - os.makedirs(dirname) |