# -*- coding: utf-8 -*-
# =============================================================================
# COPYRIGHT NOTICE
# =============================================================================
#
# Copyright (c) 2022 Steven Spector
#
# The pyspssio python package is distributed under the MIT license,
# EXCLUDING files from the IBM I/O Modules for SPSS Statistics
# which are covered under a different license.
#
# License information pertaining to the IBM I/O Modules for SPSS Statistics
# is available in the LICENSE document.
# =============================================================================
import os
import re
import platform
import warnings
import locale as lc
from ctypes import *
from .errors import warn_or_raise
from . import config
from .constants import SPSS_MAX_ENCODING
[docs]
class SPSSFile(object):
"""Base class for opening and closing SPSS files"""
[docs]
def __init__(self, spss_file: str, mode: str = "rb", unicode: bool = True, locale: str = None):
    """Load the I/O module, negotiate locale/encoding and open the file

    Parameters
    ----------
    spss_file
        Path of the SPSS file to open.
    mode
        Only the first character is used; the file is always opened and
        closed in byte mode ("rb", "wb" or "ab").
    unicode
        Initial interface mode of the I/O module (True = UTF-8, False = code page).
        Forced to False when ``locale`` is given. For read/append modes the
        interface mode is subsequently matched to the encoding reported by the file.
    locale
        Locale to apply to the I/O module. The system locale is used when omitted.

    Raises
    ------
    ValueError
        If ``pyspssio.config.spssio_module`` has not been set.

    Warns
    -----
    UnicodeWarning
        If the I/O module reports that the file encoding is not compatible
        with the interface encoding.
    """
    if config.spssio_module is None:
        raise ValueError(
            "Missing spssio module. Set location of module by changing pyspssio.config.spssio_module = path/to/module.ext"
        )

    # basic settings
    self.filename = spss_file
    self.mode = mode[0] + "b"  # always open/close in byte mode

    # choose the loader and shared library pattern for the current platform
    pf = platform.system().lower()
    if pf.startswith("win"):
        loader = WinDLL
        lib_pat = r".*\.dll.*"
    elif pf.startswith("darwin"):
        loader = CDLL
        lib_pat = r".*\.dylib.*"
    else:
        loader = CDLL
        lib_pat = r".*\.so.*"

    # load every shared library that sits next to the I/O module, then the module itself
    # (the load sequence is the same on every platform)
    path = os.path.dirname(config.spssio_module)
    libs = [os.path.join(path, lib) for lib in sorted(os.listdir(path))]
    libs = [lib for lib in libs if re.fullmatch(lib_pat, lib, re.I)]
    self._load_libs(libs, loader)
    self.spssio = loader(config.spssio_module)

    # functions for opening and closing (always open with utf-8 encoded filenames)
    self._modes = {
        "rb": {"open": self.spssio.spssOpenReadU8, "close": self.spssio.spssCloseRead},
        "wb": {"open": self.spssio.spssOpenWriteU8, "close": self.spssio.spssCloseWrite},
        "ab": {"open": self.spssio.spssOpenAppendU8, "close": self.spssio.spssCloseAppend},
    }

    # get current locale information and set initial encoding
    self.system_locale = lc.setlocale(lc.LC_ALL, "")
    language_code, encoding_category = lc.getlocale()
    self.encoding = "utf-8" if unicode else encoding_category

    # test setting initial locale to obtain encoding
    if locale:
        # force unicode off if locale is specified
        unicode = False

        # set system locale to get locale encoding information
        locale = lc.setlocale(lc.LC_ALL, locale)
        language_code, encoding_category = lc.getlocale()

        # set encoding
        self.encoding = encoding_category

        # reset system locale after getting locale information
        lc.setlocale(lc.LC_ALL, self.system_locale)

    # initialize I/O module in unicode or codepage mode
    self.interface_encoding = unicode

    # set I/O locale and initial encoding
    self.locale = self.set_locale(self.system_locale if not locale else locale)

    # match I/O encoding based on file information for read/append modes
    if self.mode in ["rb", "ab"]:
        self.mode = "rb"
        self.fh = self.open()
        self.encoding = self.file_encoding
        self.close()
        self.interface_encoding = self.encoding.lower() in ["utf-8", "utf8"]

    # open file with proper interface encoding and specified mode
    self.mode = mode[0] + "b"
    self.fh = self.open()
    self.encoding = self.file_encoding

    # test encoding compatibility; the warning has to be issued through
    # warnings.warn -- merely instantiating UnicodeWarning does nothing
    if not self.is_compatible_encoding:
        warnings.warn(
            "File encoding may not be compatible with SPSS I/O interface encoding",
            UnicodeWarning,
            stacklevel=2,
        )

    # system missing value for reference to replace with null types
    self.sysmis = self._host_sysmis_val

    # lowest and highest values for missing value ranges
    self.low_value, self.high_value = self._low_high_val
def __enter__(self):
return self
def _exit_cleanup(self):
    """Close the file, restore the system locale and drop the I/O module reference

    The locale reset and the release of ``self.spssio`` run even when
    closing the file raises, so a failed close does not leave the I/O
    module on a non-system locale. Any exception still propagates.
    """
    try:
        self.close()
    finally:
        try:
            self.set_locale(self.system_locale)
        finally:
            del self.spssio
def __exit__(self, exception_type, exception_value, exception_traceback):
    """Leave the ``with`` block and run the cleanup routine

    The exception details are ignored and nothing is returned, so an
    exception raised inside the block is not suppressed.
    """
    self._exit_cleanup()
def _load_libs(self, libs, loader):
lib_status = {}
lib_errors = {}
try_num = 0
success = False
while not success and try_num < len(libs):
for lib in libs:
status = True
try:
loader(lib)
except Exception as e:
status = False
lib_errors[lib] = e
finally:
lib_status[lib] = status
success = all(lib_status.values())
try_num += 1
return {
os.path.basename(lib): (status if status else lib_errors[lib])
for lib, status in lib_status.items()
}
@property
def _low_high_val(self):
func = self.spssio.spssLowHighVal
func.argtypes = [POINTER(c_double), POINTER(c_double)]
lowest = c_double()
highest = c_double()
func(lowest, highest)
return lowest.value, highest.value
@property
def _host_sysmis_val(self):
func = self.spssio.spssHostSysmisVal
func.argtypes = [POINTER(c_double)]
sysmis = c_double()
func(sysmis)
return sysmis.value
@property
def interface_encoding(self) -> int:
"""I/O interface mode (Unicode or code page)
- 0 = SPSS_ENCODING_CODEPAGE
- 1 = SPSS_ENCODING_UTF8
"""
return self.spssio.spssGetInterfaceEncoding()
@interface_encoding.setter
def interface_encoding(self, unicode: bool):
    # the flag is passed as an integer: 0 = code page mode, 1 = UTF-8 mode
    set_mode = self.spssio.spssSetInterfaceEncoding
    set_mode.argtypes = [c_int]
    flag = c_int(int(unicode))
    status = set_mode(flag)
    warn_or_raise(status, set_mode)
@property
def file_encoding(self) -> str:
    """Name of the file's encoding as reported by the I/O module"""
    get_encoding = self.spssio.spssGetFileEncoding
    # one byte larger than SPSS_MAX_ENCODING (presumably room for the terminating NUL)
    name_buf = create_string_buffer(SPSS_MAX_ENCODING + 1)
    status = get_encoding(self.fh, name_buf)
    warn_or_raise(status, get_encoding)
    raw_name = name_buf.value
    return raw_name.decode(self.encoding)
[docs]
def set_locale(self, locale: str) -> str:
"""Set I/O module to a specific locale"""
func = self.spssio.spssSetLocale
func.argtypes = [c_int, c_char_p]
func.restype = c_char_p
result = func(lc.LC_ALL, locale.encode(self.encoding))
if result:
return result.decode(self.encoding)
else:
warnings.warn(
"Failed to set locale to: "
+ locale
+ ". "
+ "Current locale is: "
+ ".".join(lc.getlocale()),
stacklevel=2,
)
return ".".join(lc.getlocale())
@property
def is_compatible_encoding(self) -> bool:
    """Whether the file encoding is compatible with the interface encoding

    Per the I/O module documentation, the result is false when

    - a code page file is read in UTF-8 mode,
    - a UTF-8 file is read in code page mode,
    - a code page file is encoded in something other than the current
      locale's code page, or
    - the file stores numbers in reverse bit order.

    With an incompatible encoding, data stored in the file by other
    applications (particularly Data Entry for Windows) may be unreliable.
    """
    compatible_flag = c_int()
    check = self.spssio.spssIsCompatibleEncoding
    check.argtypes = [c_int, POINTER(c_int)]
    status = check(self.fh, compatible_flag)
    warn_or_raise(status, check)
    return compatible_flag.value != 0
[docs]
def open(self) -> int:
    """Open file

    Returns file handle that is used for most other I/O module functions.
    The handle is a ctypes ``c_int`` whose value is filled in by the I/O module.

    Notes
    -----
    Filenames are always encoded in UTF-8 regardless of interface mode and locale settings.
    This is to avoid issues where a filename uses special characters that aren't available
    in the encoding defined by the file itself. For example, a Windows-1252 .sav file
    which uses Chinese (or other special multibyte characters) in its filename.
    """
    # Expand "~" BEFORE making the path absolute: abspath() on "~/x.sav"
    # yields "<cwd>/~/x.sav", where expanduser() no longer sees the tilde.
    filename_adjusted = os.path.abspath(os.path.expanduser(self.filename))

    # Open the same resolved path through Python first; this also
    # initializes the c_int that the I/O module overwrites below.
    with open(filename_adjusted, self.mode) as f:
        fh = c_int(f.fileno())

    filename_encoded = filename_adjusted.encode("utf-8")
    func = self._modes[self.mode]["open"]
    retcode = func(filename_encoded, byref(fh))
    warn_or_raise(retcode, func)
    return fh
[docs]
def close(self):
    """Close the file with the close function paired to the current mode"""
    closer = self._modes[self.mode]["close"]
    status = closer(self.fh)
    warn_or_raise(status, closer)
@property
def compression(self) -> int:
    """Compression level reported by ``spssGetCompression``

    - 0 = No compression
    - 1 = SAV
    - 2 = ZSAV
    """
    level = c_int()
    get_compression = self.spssio.spssGetCompression
    get_compression.argtypes = [c_int, POINTER(c_int)]
    status = get_compression(self.fh, level)
    warn_or_raise(status, get_compression)
    return level.value
@compression.setter
def compression(self, comp_switch=1):
    # hand the requested compression level to the I/O module for the open file
    set_compression = self.spssio.spssSetCompression
    level = c_int(comp_switch)
    status = set_compression(self.fh, level)
    warn_or_raise(status, set_compression)
@property
def release_info(self) -> dict:
    """Basic file information

    Returns a dict that maps each of the following labels to the integer
    found at the same position of the array filled in by ``spssGetReleaseInfo``:

    - release number
    - release subnumber
    - fixpack number
    - machine code
    - floating-point representation code
    - compression scheme code
    - big/little-endian code
    - character representation code
    """
    labels = [
        "release number",
        "release subnumber",
        "fixpack number",
        "machine code",
        "floating-point representation code",
        "compression scheme code",
        "big/little-endian code",
        "character representation code",
    ]
    # one c_int slot per label
    info_values = (c_int * len(labels))()
    get_info = self.spssio.spssGetReleaseInfo
    status = get_info(self.fh, info_values)
    warn_or_raise(status, get_info)
    return dict(zip(labels, info_values))
@property
def var_count(self) -> int:
    """Number of variables reported by ``spssGetNumberofVariables``"""
    count_out = c_long()
    get_count = self.spssio.spssGetNumberofVariables
    get_count.argtypes = [c_int, POINTER(c_long)]
    status = get_count(self.fh, count_out)
    warn_or_raise(status, get_count)
    return count_out.value
@property
def case_count(self) -> int:
    """Number of cases reported by ``spssGetNumberofCases``"""
    count_out = c_long()
    get_count = self.spssio.spssGetNumberofCases
    get_count.argtypes = [c_int, POINTER(c_long)]
    status = get_count(self.fh, count_out)
    warn_or_raise(status, get_count)
    return count_out.value