# -*- coding: utf-8 -*-
# =============================================================================
# COPYRIGHT NOTICE
# =============================================================================
#
# Copyright (c) 2022 Steven Spector
#
# The pyspssio python package is distributed under the MIT license,
# EXCLUDING files from the IBM I/O Modules for SPSS Statistics
# which are covered under a different license.
#
# License information pertaining to the IBM I/O Modules for SPSS Statistics
# is available in the LICENSE document.
# =============================================================================
from ctypes import *
from typing import Union, Any
import numpy as np
import pandas as pd
from pandas import DataFrame
from .errors import warn_or_raise
from . import config
from .constants import *
from .constants_map import *
from .header import Header
class Reader(Header):
    """Class for reading metadata and data

    Extends ``Header`` with case-level access. Data can be loaded in one
    call with :meth:`read_data`, or the instance can be iterated to
    receive one DataFrame per ``chunksize`` rows.
    """

    def __init__(
        self,
        *args,
        row_offset: int = 0,
        row_limit: Union[int, None] = None,
        usecols: Union[list, tuple, str, callable, None] = None,
        chunksize: Union[int, None] = None,
        convert_datetimes: bool = True,
        include_user_missing: bool = True,
        string_nan: Any = "",
        **kwargs
    ):
        """Initialise the header and prepare the case-reading state

        Parameters
        ----------
        *args, **kwargs
            Passed through unchanged to ``Header.__init__``
        row_offset
            Case number the file is positioned at (via ``spssSeekNextCase``)
            before any data is read; ``None`` is treated as 0
        row_limit
            Maximum number of rows to read;
            ``None`` means every case after ``row_offset``
        usecols
            Variables to load: a list/tuple of names, a comma-separated
            string of names, or a callable that receives a variable name
            and returns a truthy value to keep it. Names that are not in
            the file are ignored. ``None`` selects every variable.
        chunksize
            Number of rows per DataFrame when the reader is iterated
        convert_datetimes
            Default for the ``read_data`` argument of the same name
        include_user_missing
            Default for the ``read_data`` argument of the same name
        string_nan
            Value substituted for empty strings in returned DataFrames
        """
        super().__init__(*args, **kwargs)

        # adjust usecols
        # (var_names is looked up once instead of once per membership test)
        var_names = self.var_names
        if usecols is None:
            usecols = var_names
        elif callable(usecols):
            usecols = [col for col in var_names if usecols(col)]
        else:
            if isinstance(usecols, str):
                usecols = [x.strip() for x in usecols.split(",")]
            usecols = [col for col in usecols if col in var_names]

        self.usecols = usecols
        self.convert_datetimes = convert_datetimes
        self.include_user_missing = include_user_missing

        # reusable buffer that receives one raw case per read
        self.case_record = create_string_buffer(self.case_size)

        (
            self.dtype_double,
            self.numeric_names,
            self.numeric_slices,
            self.datetime_names,
            self.datetime_slices,
            self.time_names,
            self.time_slices,
            self.string_names,
            self.string_slices,
        ) = self._build_struct()

        self.chunksize = chunksize
        self.chunk = 0  # rows already handed out by __next__
        self.string_nan = string_nan

        if row_offset is None:
            row_offset = 0
        self.row_offset = row_offset

        # adjust row_limit
        case_count = self.case_count
        if row_limit is None:
            row_limit = case_count
        # never negative: an offset past the last case (or a negative limit)
        # must give an empty result, not an invalid array shape later on
        self.total_rows = max(min(row_limit, case_count - row_offset), 0)

        # row offset
        if row_offset:
            self._seek_next_case(row_offset)

    def __iter__(self):
        """Return the reader itself; chunks are produced by ``__next__``"""
        return self

    def __next__(self):
        """Return the next chunk of rows as a DataFrame

        The index of each chunk continues from the previous one and starts
        at ``row_offset``. When ``chunksize`` is unset (or 0) all remaining
        rows are returned as a single chunk.

        ``_exit_cleanup`` (defined outside this class) is called before any
        exception propagates, including the ``StopIteration`` that ends
        the iteration.
        """
        try:
            if self.chunk >= self.total_rows:
                raise StopIteration()

            # a falsy chunksize would break min() or never advance
            step = self.chunksize or self.total_rows
            row_limit = min(step, self.total_rows - self.chunk)

            df = self.read_data(
                row_limit=row_limit,
                convert_datetimes=self.convert_datetimes,
                include_user_missing=self.include_user_missing,
            )

            start = self.row_offset + self.chunk
            df.index = pd.RangeIndex(start, start + row_limit)
            self.chunk += step
            return df
        except Exception:
            self._exit_cleanup()
            raise

    def _seek_next_case(self, case_number):
        """Position the file at ``case_number`` using ``spssSeekNextCase``

        The return code is passed to ``warn_or_raise``.
        """
        func = self.spssio.spssSeekNextCase
        retcode = func(self.fh, c_long(case_number))
        warn_or_raise(retcode, func, case_number)

    def _whole_case_in(self, case_record):
        """Fill ``case_record`` using ``spssWholeCaseIn`` and return it

        ``case_record`` is a string buffer of ``case_size`` bytes
        (see ``case_size`` in the Header class). The return code is
        passed to ``warn_or_raise``.
        """
        func = self.spssio.spssWholeCaseIn
        retcode = func(self.fh, case_record)
        warn_or_raise(retcode, func)
        return case_record

    @property
    def metadata(self) -> dict:
        """Metadata object

        This object contains properties/attributes from the Header class
        mostly pertaining to variable information and data structure.

        Per-variable dictionaries and multiple response sets are limited
        to the variables in ``usecols``; ``var_names`` is the full list.
        """
        selected = set(self.usecols)

        # these are dictionaries in the form {var1: attributes, var2: attributes, ...}
        variable_properties = [
            "var_types",
            "var_formats",
            "var_formats_tuple",
            "var_labels",
            "var_alignments",
            "var_column_widths",
            "var_measure_levels",
            "var_roles",
            "var_missing_values",
            "var_value_labels",
            "var_attributes",
            "var_compat_names",
        ]

        # trim mrsets
        # (build copies so the dictionaries returned by self.mrsets are not altered)
        mrsets = {}
        for set_name, set_attr in self.mrsets.items():
            kept = [col for col in set_attr["variable_list"] if col in selected]
            if kept:
                mrsets[set_name] = {**set_attr, "variable_list": kept}

        metadata = {
            "file_attributes": self.file_attributes,
            "encoding": self.encoding,
            "case_count": self.case_count,
            "case_weight_var": self.case_weight_var,
            "mrsets": mrsets,
            "var_names": self.var_names,
        }

        for prop in variable_properties:
            metadata[prop] = {k: v for k, v in getattr(self, prop).items() if k in selected}

        return metadata

    def _build_struct(self):
        """Work out where each selected variable sits inside a case record

        Walks the variables in file order, accumulating a byte offset:
        numeric variables take 8 bytes and string variables take their
        width rounded up to a multiple of 8. Variables outside ``usecols``
        advance the offset but are not recorded.

        Returns
        -------
        tuple
            ``(dtype_double, numeric_names, numeric_slices, datetime_names,
            datetime_slices, time_names, time_slices, string_names,
            string_slices)``. Numeric, datetime and time slices cover runs
            of adjacent variables (so there may be fewer slices than
            names); string slices are one per variable.
        """
        selected = set(self.usecols)

        # get variable info
        var_types = self.var_types
        var_formats = self.var_formats_tuple
        datetime_formats = config.spss_datetime_formats_to_convert
        time_formats = config.spss_time_formats_to_convert

        # get buffer structure
        # each slice list starts with an empty slice so the first real
        # slice always has a predecessor to compare against
        numeric_names, numeric_slices = [], [slice(0, 0)]
        time_names, time_slices = [], [slice(0, 0)]
        datetime_names, datetime_slices = [], [slice(0, 0)]
        string_names, string_slices = [], []

        def add_slice(slices, start, stop):
            # grow the last slice when the new range is adjacent to it
            last = slices[-1]
            if last.stop == start:
                slices[-1] = slice(last.start, stop)
            else:
                slices.append(slice(start, stop))

        offset = 0
        for var_name in self.var_names:
            var_type = var_types[var_name]
            if var_type:
                # string: width rounded up to the next multiple of 8
                nbytes = int(8 * -(var_type // -8))
                if var_name in selected:
                    string_names.append(var_name)
                    string_slices.append(slice(offset, offset + nbytes))
            else:
                nbytes = 8
                if var_name in selected:
                    format_type = var_formats[var_name][0]
                    if format_type in datetime_formats:
                        names, slices = datetime_names, datetime_slices
                    elif format_type in time_formats:
                        names, slices = time_names, time_slices
                    else:
                        names, slices = numeric_names, numeric_slices
                    names.append(var_name)
                    add_slice(slices, offset, offset + nbytes)
            offset += nbytes

        dtype_double = np.dtype("d")

        # endianness adjustments
        endianness = {0: "<", 1: ">"}.get(self.release_info.get("big/little-endian code"))
        if endianness:
            dtype_double = dtype_double.newbyteorder(endianness)

        return (
            dtype_double,
            numeric_names,
            numeric_slices,
            datetime_names,
            datetime_slices,
            time_names,
            time_slices,
            string_names,
            string_slices,
        )

    def read_data(
        self,
        row_limit: Union[int, None] = None,
        convert_datetimes: Union[bool, None] = None,
        include_user_missing: Union[bool, None] = None,
    ) -> DataFrame:
        """Read data

        Parameters
        ----------
        row_limit
            Maximum number of rows to return
        convert_datetimes
            Convert SPSS datetimes to Python/Pandas datetime columns;
            False returns seconds from October 15, 1582 (SPSS start date)
        include_user_missing
            Whether to keep user missing values or
            replace them with NaN (numeric) and "" (strings)

        Returns
        -------
        DataFrame
            One column per variable in ``usecols``. ``None`` for
            ``convert_datetimes`` or ``include_user_missing`` falls back
            to the value given to the constructor.
        """
        if row_limit:
            row_limit = min(row_limit, self.total_rows)
        else:
            row_limit = self.total_rows

        if convert_datetimes is None:
            convert_datetimes = self.convert_datetimes

        if include_user_missing is None:
            include_user_missing = self.include_user_missing

        dtype = self.dtype_double
        encoding = self.encoding
        sysmis = self.sysmis
        numeric_slices = self.numeric_slices
        time_slices = self.time_slices
        datetime_slices = self.datetime_slices
        string_slices = self.string_slices

        def load_strings(case):
            return tuple(str(case[s], encoding).rstrip(" ") for s in string_slices)

        def load_doubles(case, slices):
            # join the selected byte ranges, then view them as doubles
            raw = bytearray()
            for s in slices:
                raw += case[s]
            return np.frombuffer(raw, dtype=dtype)

        def replace_sysmis(arr):
            return np.where(arr == sysmis, np.nan, arr)

        def convert_datetime(arr):
            return ((arr - SPSS_ORIGIN_OFFSET) * S_TO_NS).astype("datetime64[ns]", copy=False)

        def convert_time(arr):
            return (arr * S_TO_NS).astype("timedelta64[ns]", copy=False)

        # create empty arrays
        n_arr = np.empty(shape=(row_limit, len(self.numeric_names)), dtype=dtype)
        t_arr = np.empty(shape=(row_limit, len(self.time_names)), dtype=dtype)
        d_arr = np.empty(shape=(row_limit, len(self.datetime_names)), dtype=dtype)
        s_arr = np.empty(shape=(row_limit, len(self.string_names)), dtype="O")

        # load cases into arrays
        for row in range(row_limit):
            case = memoryview(self._whole_case_in(self.case_record))
            n_arr[row] = load_doubles(case, numeric_slices)
            t_arr[row] = load_doubles(case, time_slices)
            d_arr[row] = load_doubles(case, datetime_slices)
            s_arr[row] = load_strings(case)

        # replace system missing
        n_arr = replace_sysmis(n_arr)
        t_arr = replace_sysmis(t_arr)
        d_arr = replace_sysmis(d_arr)

        # convert datetimes
        # (keyed on the columns, not the row count, so an empty result
        # still gets the converted dtype)
        if convert_datetimes and self.datetime_names:
            d_arr = convert_datetime(d_arr)

        # convert times
        if convert_datetimes and self.time_names:
            t_arr = convert_time(t_arr)

        # create final dataframe
        # (seeding the dict with usecols fixes the column order)
        all_cols = {col: None for col in self.usecols}
        column_groups = (
            (self.datetime_names, d_arr),
            (self.time_names, t_arr),
            (self.string_names, s_arr),
            (self.numeric_names, n_arr),
        )
        for names, arr in column_groups:
            for idx, col in enumerate(names):
                all_cols[col] = arr[:, idx]

        df = pd.DataFrame(all_cols, copy=False)

        # drop user missing values if specified
        if not include_user_missing:
            var_types = self.var_types
            for col, missing in self.var_missing_values.items():
                if col not in df.columns:
                    continue
                df.loc[df[col].isin(missing.get("values", [])), col] = (
                    "" if var_types[col] else np.nan
                )
                high = missing.get("hi")
                low = missing.get("lo")
                if high is not None and low is not None:
                    df.loc[df[col].between(low, high, inclusive="both"), col] = np.nan

        # use user-defined string nan value
        if self.string_nan != "":
            df = df.replace("", self.string_nan, regex=False)

        # adjust index for row_offset
        # (sized by the rows actually read, which may be fewer than total_rows)
        if self.row_offset and not self.chunksize:
            df.index = pd.RangeIndex(self.row_offset, self.row_offset + row_limit)

        return df