Source code for acton.database

"""Wrapper class for databases."""

from abc import ABC, abstractmethod
from inspect import Traceback
import json
import logging
import os.path
import tempfile
from typing import Iterable, List, Sequence
import warnings

from acton.proto.acton_pb2 import Database as DatabasePB
import astropy.io.ascii as io_ascii
import astropy.io.fits as io_fits
import astropy.table
import h5py
import numpy
import pandas
import sklearn.preprocessing


LabelEncoderPB = DatabasePB.LabelEncoder


def product(seq: Iterable[int]) -> int:
    """Finds the product of an iterable of ints.

    Arguments
    ---------
    seq
        Iterable of ints.

    Returns
    -------
    int
        Product.
    """
    prod = 1
    for i in seq:
        prod *= i
    return prod

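# Example (editor's sketch, not part of the original module):
#
#     >>> product([2, 3, 4])
#     24
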
def serialise_encoder(
        encoder: sklearn.preprocessing.LabelEncoder) -> LabelEncoderPB:
    """Serialises a LabelEncoder as a protobuf.

    Parameters
    ----------
    encoder
        LabelEncoder.

    Returns
    -------
    LabelEncoderPB
        Protobuf representing the LabelEncoder.
    """
    proto = LabelEncoderPB()

    if not hasattr(encoder, 'classes_'):
        return proto

    for i, class_label in enumerate(encoder.classes_):
        encoding = proto.encoding.add()
        encoding.class_label = str(class_label)
        encoding.class_int = i

    return proto

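# Example (editor's sketch, not part of the original module). An unfitted
# encoder serialises to an empty protobuf; a fitted one records each class
# and its integer encoding:
#
#     >>> encoder = sklearn.preprocessing.LabelEncoder()
#     >>> encoder = encoder.fit(['galaxy', 'star', 'galaxy'])
#     >>> proto = serialise_encoder(encoder)
#     >>> (proto.encoding[0].class_label, proto.encoding[0].class_int)
#     ('galaxy', 0)
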
class Database(ABC):
    """Base class for database wrappers."""

    @abstractmethod
    def __enter__(self):
        return self

    @abstractmethod
    def __exit__(self, exc_type: Exception, exc_val: object,
                 exc_tb: Traceback):
        pass

    @abstractmethod
    def read_features(self, ids: Sequence[int]) -> numpy.ndarray:
        """Reads feature vectors from the database.

        Parameters
        ----------
        ids
            Iterable of IDs.

        Returns
        -------
        numpy.ndarray
            N x D array of feature vectors.
        """

    @abstractmethod
    def read_labels(self,
                    labeller_ids: Sequence[int],
                    instance_ids: Sequence[int]) -> numpy.ndarray:
        """Reads label vectors from the database.

        Parameters
        ----------
        labeller_ids
            Iterable of labeller IDs.
        instance_ids
            Iterable of instance IDs.

        Returns
        -------
        numpy.ndarray
            T x N x F array of label vectors.
        """

    @abstractmethod
    def write_features(self, ids: Sequence[int], features: numpy.ndarray):
        """Writes feature vectors to the database.

        Parameters
        ----------
        ids
            Iterable of IDs.
        features
            N x D array of feature vectors. The ith row corresponds to the
            ith ID in `ids`.
        """

    @abstractmethod
    def write_labels(self,
                     labeller_ids: Sequence[int],
                     instance_ids: Sequence[int],
                     labels: numpy.ndarray):
        """Writes label vectors to the database.

        Parameters
        ----------
        labeller_ids
            Iterable of labeller IDs.
        instance_ids
            Iterable of instance IDs.
        labels
            T x N x D array of label vectors. The ith row corresponds to the
            ith labeller ID in `labeller_ids` and the jth column corresponds
            to the jth instance ID in `instance_ids`.
        """

    @abstractmethod
    def get_known_instance_ids(self) -> List[int]:
        """Returns a list of known instance IDs.

        Returns
        -------
        List[int]
            A list of known instance IDs.
        """

    @abstractmethod
    def get_known_labeller_ids(self) -> List[int]:
        """Returns a list of known labeller IDs.

        Returns
        -------
        List[int]
            A list of known labeller IDs.
        """

    @abstractmethod
    def to_proto(self) -> DatabasePB:
        """Serialises this database as a protobuf.

        Returns
        -------
        DatabasePB
            Protobuf representing this database.
        """


class HDF5Database(Database):
    """Database wrapping an HDF5 file as a context manager.

    Attributes
    ----------
    path : str
        Path to HDF5 file.
    _h5_file : h5py.File
        HDF5 file object.
    """

    def __init__(self, path: str):
        self.path = path

    def __enter__(self):
        self._open_hdf5()
        return self

    def __exit__(self, exc_type: Exception, exc_val: object,
                 exc_tb: Traceback):
        self._h5_file.close()
        delattr(self, '_h5_file')

    def _assert_open(self):
        """Asserts that the HDF5 file is ready to be read from/written to.

        Raises
        ------
        AssertionError
        """
        assert hasattr(self, '_h5_file'), ('HDF5 database must be used as a '
                                           'context manager.')

    def _open_hdf5(self):
        """Opens the HDF5 file, creating it if it doesn't exist.

        Notes
        -----
        The HDF5 file will be stored in self._h5_file.
        """
        try:
            self._h5_file = h5py.File(self.path, 'r+')
        except OSError:
            with h5py.File(self.path, 'w') as h5_file:
                self._setup_hdf5(h5_file)
            self._h5_file = h5py.File(self.path, 'r+')


class ManagedHDF5Database(HDF5Database):
    """Database using an HDF5 file.

    Notes
    -----
    This database uses an internal schema. For reading files from disk, use
    another Database.

    Attributes
    ----------
    path : str
        Path to HDF5 file.
    label_dtype : str
        Data type of labels.
    feature_dtype : str
        Data type of features.
    _h5_file : h5py.File
        Opened HDF5 file.
    _sync_attrs : List[str]
        List of instance attributes to sync with the HDF5 file's attributes.
    """

    def __init__(self, path: str, label_dtype: str=None,
                 feature_dtype: str=None):
        """
        Parameters
        ----------
        path
            Path to HDF5 file.
        label_dtype
            Data type of labels. If not provided then it will be read from
            the database file; if the database file does not exist then the
            default type of 'float32' will be used.
        feature_dtype
            Data type of features. If not provided then it will be read from
            the database file; if the database file does not exist then the
            default type of 'float32' will be used.
        """
        super().__init__(path)
        self.label_dtype = label_dtype
        self._default_label_dtype = 'float32'
        self.feature_dtype = feature_dtype
        self._default_feature_dtype = 'float32'

        # List of attributes to keep in sync with the HDF5 file.
        self._sync_attrs = ['label_dtype', 'feature_dtype']

    def to_proto(self) -> DatabasePB:
        """Serialises this database as a protobuf.

        Returns
        -------
        DatabasePB
            Protobuf representing this database.
        """
        proto = DatabasePB()
        proto.path = self.path
        proto.class_name = 'ManagedHDF5Database'
        db_kwargs = {
            'label_dtype': self.label_dtype,
            'feature_dtype': self.feature_dtype}
        for key, value in db_kwargs.items():
            kwarg = proto.kwarg.add()
            kwarg.key = key
            kwarg.value = json.dumps(value)
        # No encoder for a managed DB - assume that labels are encoded
        # already.
        # proto.label_encoder.CopyFrom(serialise_encoder(self.label_encoder))
        return proto

    def _open_hdf5(self):
        """Opens the HDF5 file, creating it if it doesn't exist.

        Notes
        -----
        The HDF5 file will be stored in self._h5_file.
        """
        super()._open_hdf5()

        # Load attrs from the HDF5 file if we haven't specified them.
        for attr in self._sync_attrs:
            if getattr(self, attr) is None:
                setattr(self, attr, self._h5_file.attrs[attr])

        self._validate_hdf5()

    def write_features(self, ids: Sequence[int], features: numpy.ndarray):
        """Writes feature vectors to the database.

        Parameters
        ----------
        ids
            Iterable of IDs.
        features
            N x D array of feature vectors. The ith row corresponds to the
            ith ID in `ids`.
        """
        self._assert_open()

        # Input validation.
        if len(ids) != len(features):
            raise ValueError('Must have same number of IDs and features.')

        if self._h5_file.attrs['n_features'] == -1:
            # This is the first time we've stored features, so make a record
            # of the dimensionality.
            self._h5_file.attrs['n_features'] = features.shape[1]
        elif self._h5_file.attrs['n_features'] != features.shape[1]:
            raise ValueError(
                'Expected features to have dimensionality {}, got {}'.format(
                    self._h5_file.attrs['n_features'], features.shape[1]))

        # Early termination.
        if not ids:
            return

        # Cast the features to the right type.
        if features.dtype != self.feature_dtype:
            warnings.warn('Casting features from type {} to type {}.'.format(
                features.dtype, self.feature_dtype))
            features = features.astype(self.feature_dtype)

        # Resize the feature array if we need to store more IDs than before.
        max_id = max(ids) + 1
        if max_id > self._h5_file['features'].shape[0]:
            self._h5_file['features'].resize(
                (max_id, self._h5_file.attrs['n_features']))

        # Store the feature vectors.
        # TODO(MatthewJA): Vectorise this. This could be tricky as HDF5
        # doesn't fully support NumPy's fancy indexing.
        for id_, feature in zip(ids, features):
            self._h5_file['features'][id_, :] = feature

        # Add any new IDs to the database.
        known_ids = set(self.get_known_instance_ids())
        new_ids = [i for i in ids if i not in known_ids]
        n_new_ids = len(new_ids)
        if n_new_ids:
            n_old_ids = self._h5_file['instance_ids'].shape[0]
            self._h5_file['instance_ids'].resize((n_old_ids + n_new_ids,))
            self._h5_file['instance_ids'][-n_new_ids:] = numpy.array(
                new_ids, dtype=int)

    def read_features(self, ids: Sequence[int]) -> numpy.ndarray:
        """Reads feature vectors from the database.

        Parameters
        ----------
        ids
            Iterable of IDs.

        Returns
        -------
        numpy.ndarray
            N x D array of feature vectors.
        """
        self._assert_open()

        if self._h5_file.attrs['n_features'] == -1 and ids:
            raise KeyError('No features stored in database.')

        # Read the whole dataset and select the requested rows in memory.
        # (Dataset.value is deprecated in h5py, so use [...] instead.)
        features = self._h5_file['features'][...][ids, :]
        features = numpy.asarray(
            features, dtype=self._h5_file.attrs['feature_dtype'])
        return features

    def write_labels(self,
                     labeller_ids: Sequence[int],
                     instance_ids: Sequence[int],
                     labels: numpy.ndarray):
        """Writes label vectors to the database.

        Parameters
        ----------
        labeller_ids
            Iterable of labeller IDs.
        instance_ids
            Iterable of instance IDs.
        labels
            T x N x D array of label vectors. The ith row corresponds to the
            ith labeller ID in `labeller_ids` and the jth column corresponds
            to the jth instance ID in `instance_ids`.
        """
        self._assert_open()

        # Input validation.
        if len(labeller_ids) != labels.shape[0]:
            raise ValueError(
                'labels array has incorrect number of labellers:'
                ' expected {}, got {}.'.format(
                    len(labeller_ids), labels.shape[0]))
        if len(instance_ids) != labels.shape[1]:
            raise ValueError(
                'labels array has incorrect number of instances:'
                ' expected {}, got {}.'.format(
                    len(instance_ids), labels.shape[1]))

        if self._h5_file.attrs['label_dim'] == -1:
            # This is the first time we've stored labels, so make a record of
            # the dimensionality.
            self._h5_file.attrs['label_dim'] = labels.shape[2]
        elif self._h5_file.attrs['label_dim'] != labels.shape[2]:
            raise ValueError(
                'Expected labels to have dimensionality {}, got {}'.format(
                    self._h5_file.attrs['label_dim'], labels.shape[2]))

        # Early termination.
        if not labeller_ids or not instance_ids:
            return

        # Cast the labels to the right type.
        if labels.dtype != self.label_dtype:
            warnings.warn('Casting labels from type {} to type {}.'.format(
                labels.dtype, self.label_dtype))
            labels = labels.astype(self.label_dtype)

        # Resize the label array if necessary.
        max_labeller_id = max(labeller_ids) + 1
        max_instance_id = max(instance_ids) + 1
        if (max_labeller_id > self._h5_file['labels'].shape[0] or
                max_instance_id > self._h5_file['labels'].shape[1]):
            self._h5_file['labels'].resize(
                (max_labeller_id, max_instance_id,
                 self._h5_file.attrs['label_dim']))

        # Store the labels.
        # TODO(MatthewJA): Vectorise this.
        for labeller_idx, labeller_id in enumerate(labeller_ids):
            for instance_idx, instance_id in enumerate(instance_ids):
                label = labels[labeller_idx, instance_idx]
                self._h5_file['labels'][labeller_id, instance_id, :] = label

        logging.debug(
            'New label array size: {}'.format(self._h5_file['labels'].shape))

        # Add the instance IDs to the database.
        known_instance_ids = set(self.get_known_instance_ids())
        new_instance_ids = [i for i in instance_ids
                            if i not in known_instance_ids]
        n_new_instance_ids = len(new_instance_ids)
        n_old_instance_ids = self._h5_file['instance_ids'].shape[0]
        if n_new_instance_ids:
            self._h5_file['instance_ids'].resize(
                (n_old_instance_ids + n_new_instance_ids,))
            self._h5_file['instance_ids'][-n_new_instance_ids:] = numpy.array(
                new_instance_ids, dtype=int)

        # Add the labeller IDs to the database.
        known_labeller_ids = set(self.get_known_labeller_ids())
        new_labeller_ids = [i for i in labeller_ids
                            if i not in known_labeller_ids]
        n_new_labeller_ids = len(new_labeller_ids)
        n_old_labeller_ids = self._h5_file['labeller_ids'].shape[0]
        if n_new_labeller_ids:
            self._h5_file['labeller_ids'].resize(
                (n_old_labeller_ids + n_new_labeller_ids,))
            self._h5_file['labeller_ids'][-n_new_labeller_ids:] = numpy.array(
                new_labeller_ids, dtype=int)

    def read_labels(self,
                    labeller_ids: Sequence[int],
                    instance_ids: Sequence[int]) -> numpy.ndarray:
        """Reads label vectors from the database.

        Parameters
        ----------
        labeller_ids
            Iterable of labeller IDs.
        instance_ids
            Iterable of instance IDs.

        Returns
        -------
        numpy.ndarray
            T x N x F array of label vectors.
        """
        self._assert_open()

        if self._h5_file.attrs['label_dim'] == -1 and (
                labeller_ids or instance_ids):
            raise KeyError('No labels stored in database.')

        # Read the whole dataset and select the requested rows in memory.
        # (Dataset.value is deprecated in h5py, so use [...] instead.)
        labels = self._h5_file['labels'][...][labeller_ids][
            :, instance_ids, :]
        labels = numpy.asarray(
            labels, dtype=self._h5_file.attrs['label_dtype'])
        return labels

    def get_known_instance_ids(self) -> List[int]:
        """Returns a list of known instance IDs.

        Returns
        -------
        List[int]
            A list of known instance IDs.
        """
        self._assert_open()
        return [id_ for id_ in self._h5_file['instance_ids']]

    def get_known_labeller_ids(self) -> List[int]:
        """Returns a list of known labeller IDs.

        Returns
        -------
        List[int]
            A list of known labeller IDs.
        """
        self._assert_open()
        return [id_ for id_ in self._h5_file['labeller_ids']]

    def _setup_hdf5(self, h5_file: h5py.File):
        """Sets up an HDF5 file to work as a database.

        Parameters
        ----------
        h5_file
            HDF5 file to set up. Must be opened in write mode.
        """
        if self.label_dtype is None:
            self.label_dtype = self._default_label_dtype
        if self.feature_dtype is None:
            self.feature_dtype = self._default_feature_dtype
        h5_file.create_dataset('features', shape=(0, 0),
                               dtype=self.feature_dtype,
                               maxshape=(None, None))
        h5_file.create_dataset('labels', shape=(0, 0, 0),
                               dtype=self.label_dtype,
                               maxshape=(None, None, None))
        h5_file.create_dataset('instance_ids', shape=(0,), dtype=int,
                               maxshape=(None,))
        h5_file.create_dataset('labeller_ids', shape=(0,), dtype=int,
                               maxshape=(None,))
        h5_file.attrs['label_dtype'] = self.label_dtype
        h5_file.attrs['feature_dtype'] = self.feature_dtype
        h5_file.attrs['n_features'] = -1
        h5_file.attrs['label_dim'] = -1

    def _validate_hdf5(self):
        """Checks that self._h5_file has the correct schema.

        Raises
        ------
        ValueError
        """
        try:
            assert 'features' in self._h5_file
            assert 'labels' in self._h5_file
            assert 'instance_ids' in self._h5_file
            assert 'labeller_ids' in self._h5_file
            assert len(self._h5_file['features'].shape) == 2
            assert len(self._h5_file['labels'].shape) == 3
            assert len(self._h5_file['instance_ids'].shape) == 1
            assert len(self._h5_file['labeller_ids'].shape) == 1
        except AssertionError:
            raise ValueError(
                'File {} is not a valid database.'.format(self.path))

        for attr in self._sync_attrs:
            assert getattr(self, attr) is not None
            if self._h5_file.attrs[attr] != getattr(self, attr):
                raise ValueError(
                    'Incompatible {}: expected {}, got {}'.format(
                        attr, getattr(self, attr),
                        self._h5_file.attrs[attr]))


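# Example (editor's sketch, not part of the original module): creating a
# managed database, then writing and reading features and labels. The path
# and data are illustrative.
#
#     with ManagedHDF5Database('db.h5', feature_dtype='float32') as db:
#         db.write_features([0, 1], numpy.random.random((2, 5)))
#         db.write_labels([0], [0, 1], numpy.zeros((1, 2, 1)))
#         features = db.read_features([0, 1])  # 2 x 5 array

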
class HDF5Reader(HDF5Database):
    """Reads HDF5 databases.

    Attributes
    ----------
    feature_cols : List[str]
        List of feature datasets.
    label_col : str
        Name of label dataset.
    n_features : int
        Number of features.
    n_instances : int
        Number of instances.
    n_labels : int
        Number of labels per instance.
    path : str
        Path to HDF5 file.
    encode_labels : bool
        Whether to encode labels as integers.
    label_encoder : sklearn.preprocessing.LabelEncoder
        Encodes labels as integers.
    _h5_file : h5py.File
        HDF5 file object.
    _is_multidimensional : bool
        Whether the features are in a multidimensional dataset.
    """

    def __init__(self, path: str, feature_cols: List[str], label_col: str,
                 encode_labels: bool=True,
                 label_encoder: sklearn.preprocessing.LabelEncoder=None):
        """
        Parameters
        ----------
        path
            Path to HDF5 file.
        feature_cols
            List of feature datasets. If only one feature dataset is
            specified, this dataset is allowed to be multidimensional and
            contain multiple features.
        label_col
            Name of label dataset.
        encode_labels
            Whether to encode labels as integers.
        label_encoder
            Encodes labels as integers. If not specified, the label column
            will be read and a label encoding generated.
        """
        super().__init__(path)

        if not feature_cols:
            raise ValueError('Must specify feature columns for HDF5.')

        self.feature_cols = feature_cols
        self.label_col = label_col
        self.encode_labels = encode_labels
        self.label_encoder = label_encoder
        if self.label_encoder and not self.encode_labels:
            raise ValueError('label_encoder specified but encode_labels is '
                             'False')
        if self.label_encoder is None:
            self.label_encoder = sklearn.preprocessing.LabelEncoder()

        with h5py.File(self.path, 'r') as data:
            is_multidimensional = any(
                len(data[f_col].shape) > 1 or
                product(data[f_col].shape[1:]) != 1
                for f_col in feature_cols)
            if is_multidimensional and len(feature_cols) != 1:
                raise ValueError(
                    'Feature arrays and feature columns cannot be mixed. '
                    'To read in features from a multidimensional dataset, '
                    'only specify one feature column name.')
            self._is_multidimensional = is_multidimensional

            self.n_instances = data[label_col].shape[0]
            if len(data[label_col].shape) == 1:
                self.n_labels = 1
            else:
                assert len(data[label_col].shape) == 2
                self.n_labels = data[label_col].shape[1]

            if is_multidimensional:
                self.n_features = data[feature_cols[0]].shape[1]
            else:
                self.n_features = len(feature_cols)

    def to_proto(self) -> DatabasePB:
        """Serialises this database as a protobuf.

        Returns
        -------
        DatabasePB
            Protobuf representing this database.
        """
        proto = DatabasePB()
        proto.path = self.path
        proto.class_name = 'HDF5Reader'
        db_kwargs = {
            'feature_cols': self.feature_cols,
            'label_col': self.label_col,
            'encode_labels': self.encode_labels}
        for key, value in db_kwargs.items():
            kwarg = proto.kwarg.add()
            kwarg.key = key
            kwarg.value = json.dumps(value)
        proto.label_encoder.CopyFrom(serialise_encoder(self.label_encoder))
        return proto

    def read_features(self, ids: Sequence[int]) -> numpy.ndarray:
        """Reads feature vectors from the database.

        Parameters
        ----------
        ids
            Iterable of IDs.

        Returns
        -------
        numpy.ndarray
            N x D array of feature vectors.
        """
        # TODO(MatthewJA): Optimise this.
        self._assert_open()

        # For each ID, get the corresponding features.
        if self._is_multidimensional:
            # If there are duplicates in ids, h5py will crash with an OSError
            # (and a very cryptic error message...). To get around this,
            # we'll first get all the unique IDs.
            unique_ids = []
            unique_ids_set = set()  # For lookups.
            id_to_index = {}  # For reconstructing the features.
            for id_ in ids:
                if id_ not in unique_ids_set:
                    unique_ids.append(id_)
                    unique_ids_set.add(id_)
                    id_to_index[id_] = len(unique_ids) - 1
            # Then index with just the unique IDs.
            features_ = self._h5_file[self.feature_cols[0]][unique_ids]
            # Finally, reconstruct the features array.
            features = numpy.zeros((len(ids), features_.shape[1]))
            for index, id_ in enumerate(ids):
                index_ = id_to_index[id_]
                features[index, :] = features_[index_, :]
            return features
        else:
            # Allocate output array.
            features = numpy.zeros((len(ids), len(self.feature_cols)))
            # Read each feature dataset into the corresponding column. Read
            # each dataset in full first, since h5py's fancy indexing
            # requires sorted, unique indices.
            for feature_idx, feature_name in enumerate(self.feature_cols):
                features[:, feature_idx] = \
                    self._h5_file[feature_name][...][ids]
            return numpy.nan_to_num(features)

    def read_labels(self,
                    labeller_ids: Sequence[int],
                    instance_ids: Sequence[int]) -> numpy.ndarray:
        """Reads label vectors from the database.

        Parameters
        ----------
        labeller_ids
            Iterable of labeller IDs.
        instance_ids
            Iterable of instance IDs.

        Returns
        -------
        numpy.ndarray
            T x N x F array of label vectors.
        """
        self._assert_open()

        if len(labeller_ids) > 1:
            raise NotImplementedError('Multiple labellers not yet supported.')

        # TODO(MatthewJA): Optimise this.
        # For each ID, get the corresponding labels. If there are duplicates
        # in ids, h5py will crash with an OSError (and a very cryptic error
        # message...). To get around this, we'll first get all the unique
        # IDs.
        unique_ids = []
        unique_ids_set = set()  # For lookups.
        id_to_index = {}  # For reconstructing the labels.
        for id_ in instance_ids:
            if id_ not in unique_ids_set:
                unique_ids.append(id_)
                unique_ids_set.add(id_)
                id_to_index[id_] = len(unique_ids) - 1
        # Then index with just the unique IDs.
        labels_ = self._h5_file[self.label_col][unique_ids].reshape(
            (1, len(unique_ids), -1))
        # Finally, reconstruct the labels array.
        labels = numpy.zeros(
            (1, len(instance_ids), labels_.shape[2]), dtype=labels_.dtype)
        for index, id_ in enumerate(instance_ids):
            index_ = id_to_index[id_]
            labels[0, index, :] = labels_[0, index_, :]

        if labels.shape[2] != 1:
            raise NotImplementedError('Multidimensional labels not currently '
                                      'supported.')

        # Encode labels.
        if self.encode_labels:
            labels = numpy.apply_along_axis(
                self.label_encoder.fit_transform,
                axis=1,
                arr=labels.reshape(labels.shape[:2])
            ).reshape(labels.shape)

        return labels

    def write_features(self, ids: Sequence[int], features: numpy.ndarray):
        raise PermissionError('Cannot write to read-only database.')

    def write_labels(self,
                     labeller_ids: Sequence[int],
                     instance_ids: Sequence[int],
                     labels: numpy.ndarray):
        raise PermissionError('Cannot write to read-only database.')

    def get_known_instance_ids(self) -> List[int]:
        """Returns a list of known instance IDs.

        Returns
        -------
        List[int]
            A list of known instance IDs.
        """
        self._assert_open()
        return list(range(self.n_instances))

    def get_known_labeller_ids(self) -> List[int]:
        """Returns a list of known labeller IDs.

        Returns
        -------
        List[int]
            A list of known labeller IDs.
        """
        raise NotImplementedError()


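# Example (editor's sketch, not part of the original module): reading from a
# plain HDF5 file whose 'features' dataset is a 2-D array. Dataset names are
# illustrative.
#
#     with HDF5Reader('data.h5', feature_cols=['features'],
#                     label_col='labels') as reader:
#         features = reader.read_features([0, 1, 2])
#         labels = reader.read_labels([0], [0, 1, 2])

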
class ASCIIReader(Database):
    """Reads ASCII databases.

    Attributes
    ----------
    feature_cols : List[str]
        List of feature columns.
    label_col : str
        Name of label column.
    max_id_length : int
        Maximum length of IDs.
    n_features : int
        Number of features.
    n_instances : int
        Number of instances.
    n_labels : int
        Number of labels per instance.
    path : str
        Path to ASCII file.
    encode_labels : bool
        Whether to encode labels as integers.
    label_encoder : sklearn.preprocessing.LabelEncoder
        Encodes labels as integers.
    _db : Database
        Underlying ManagedHDF5Database.
    _db_filepath : str
        Path of underlying HDF5 database.
    _tempdir : str
        Temporary directory where the underlying HDF5 database is stored.
    """

    def __init__(self, path: str, feature_cols: List[str], label_col: str,
                 encode_labels: bool=True,
                 label_encoder: sklearn.preprocessing.LabelEncoder=None):
        """
        Parameters
        ----------
        path
            Path to ASCII file.
        feature_cols
            List of feature columns.
        label_col
            Name of label column.
        encode_labels
            Whether to encode labels as integers.
        label_encoder
            Encodes labels as integers. If not specified, the label column
            will be read and a label encoding generated.
        """
        self.path = path
        self.feature_cols = feature_cols
        self.label_col = label_col
        self.encode_labels = encode_labels
        self.label_encoder = label_encoder
        if self.label_encoder and not self.encode_labels:
            raise ValueError('label_encoder specified but encode_labels is '
                             'False')
        if self.label_encoder is None:
            self.label_encoder = sklearn.preprocessing.LabelEncoder()

    def to_proto(self) -> DatabasePB:
        """Serialises this database as a protobuf.

        Returns
        -------
        DatabasePB
            Protobuf representing this database.
        """
        proto = DatabasePB()
        proto.path = self.path
        proto.class_name = 'ASCIIReader'
        db_kwargs = {
            'feature_cols': self.feature_cols,
            'label_col': self.label_col,
            'encode_labels': self.encode_labels}
        for key, value in db_kwargs.items():
            kwarg = proto.kwarg.add()
            kwarg.key = key
            kwarg.value = json.dumps(value)
        proto.label_encoder.CopyFrom(serialise_encoder(self.label_encoder))
        return proto

    def _db_from_ascii(self,
                       db: Database,
                       data: astropy.table.Table,
                       feature_cols: Sequence[str],
                       label_col: str,
                       ids: Sequence[int]):
        """Reads an ASCII table into a database.

        Notes
        -----
        The entire file is copied into memory.

        Arguments
        ---------
        db
            Database.
        data
            ASCII table.
        feature_cols
            List of column names of the features. If empty, all non-label and
            non-ID columns will be used.
        label_col
            Column name of the labels.
        ids
            List of instance IDs.
        """
        # Read in features.
        columns = data.keys()
        if not feature_cols:
            # If there are no features given, use all columns.
            feature_cols = [c for c in columns if c != label_col]

        # This converts the features from a table to an array.
        features = data[feature_cols].as_array()
        features = features.view(numpy.float64).reshape(
            features.shape + (-1,))

        # Read in labels.
        labels = numpy.array(data[label_col]).reshape((1, -1, 1))

        # We want to support multiple labellers in the future, but currently
        # don't. So every labeller is the same, ID = 0.
        labeller_ids = [0]

        # Encode labels. (The underlying managed database assumes labels are
        # already encoded, so the encoding is handled here.)
        if self.encode_labels:
            labels = numpy.apply_along_axis(
                self.label_encoder.fit_transform,
                axis=1,
                arr=labels.reshape(labels.shape[:2])
            ).reshape(labels.shape)

        # Write to database.
        db.write_features(ids, features)
        db.write_labels(labeller_ids, ids, labels)

    def __enter__(self):
        self._tempdir = tempfile.TemporaryDirectory(prefix='acton')
        # Read the whole file into a DB.
        self._db_filepath = os.path.join(self._tempdir.name, 'db.h5')

        data = io_ascii.read(self.path)
        ids = list(range(len(data[self.label_col])))
        max_label_len = max(len(str(i)) for i in data[self.label_col])
        label_dtype = '<S{}'.format(max_label_len)
        self._db = ManagedHDF5Database(
            self._db_filepath,
            label_dtype=label_dtype,
            feature_dtype='float64')
        self._db.__enter__()
        self._db_from_ascii(self._db, data, self.feature_cols,
                            self.label_col, ids)
        return self

    def __exit__(self, exc_type: Exception, exc_val: object,
                 exc_tb: Traceback):
        self._db.__exit__(exc_type, exc_val, exc_tb)
        self._tempdir.cleanup()
        delattr(self, '_db')

    def read_features(self, ids: Sequence[int]) -> numpy.ndarray:
        """Reads feature vectors from the database.

        Parameters
        ----------
        ids
            Iterable of IDs.

        Returns
        -------
        numpy.ndarray
            N x D array of feature vectors.
        """
        return self._db.read_features(ids)

    def read_labels(self,
                    labeller_ids: Sequence[int],
                    instance_ids: Sequence[int]) -> numpy.ndarray:
        """Reads label vectors from the database.

        Parameters
        ----------
        labeller_ids
            Iterable of labeller IDs.
        instance_ids
            Iterable of instance IDs.

        Returns
        -------
        numpy.ndarray
            T x N x F array of label vectors.
        """
        # N.B. Labels are encoded in _db_from_ascii.
        return self._db.read_labels(labeller_ids, instance_ids)

    def write_features(self, ids: Sequence[int], features: numpy.ndarray):
        raise PermissionError('Cannot write to read-only database.')

    def write_labels(self,
                     labeller_ids: Sequence[int],
                     instance_ids: Sequence[int],
                     labels: numpy.ndarray):
        raise PermissionError('Cannot write to read-only database.')

    def get_known_instance_ids(self) -> List[int]:
        """Returns a list of known instance IDs.

        Returns
        -------
        List[int]
            A list of known instance IDs.
        """
        return self._db.get_known_instance_ids()

    def get_known_labeller_ids(self) -> List[int]:
        """Returns a list of known labeller IDs.

        Returns
        -------
        List[int]
            A list of known labeller IDs.
        """
        return self._db.get_known_labeller_ids()


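# Example (editor's sketch, not part of the original module): reading a
# whitespace- or comma-separated table. The path and column names are
# illustrative.
#
#     with ASCIIReader('data.csv', feature_cols=['ra', 'dec'],
#                      label_col='class') as reader:
#         features = reader.read_features([0, 1, 2])
#         labels = reader.read_labels([0], [0, 1, 2])

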
class PandasReader(Database):
    """Reads pandas-format HDF5 databases.

    Attributes
    ----------
    feature_cols : List[str]
        List of feature columns.
    label_col : str
        Name of label column.
    n_features : int
        Number of features.
    n_instances : int
        Number of instances.
    n_labels : int
        Number of labels per instance.
    path : str
        Path to HDF5 file.
    encode_labels : bool
        Whether to encode labels as integers.
    label_encoder : sklearn.preprocessing.LabelEncoder
        Encodes labels as integers.
    _df : pandas.DataFrame
        Pandas dataframe.
    """

    def __init__(self, path: str, feature_cols: List[str], label_col: str,
                 key: str, encode_labels: bool=True,
                 label_encoder: sklearn.preprocessing.LabelEncoder=None):
        """
        Parameters
        ----------
        path
            Path to HDF5 file.
        feature_cols
            List of feature columns. If none are specified, then all
            non-label, non-ID columns will be used.
        label_col
            Name of label column.
        key
            Pandas key.
        encode_labels
            Whether to encode labels as integers.
        label_encoder
            Encodes labels as integers. If not specified, the label column
            will be read and a label encoding generated.
        """
        self.path = path
        self.feature_cols = feature_cols
        self.label_col = label_col
        self.key = key
        self._df = pandas.read_hdf(self.path, self.key)
        self.encode_labels = encode_labels
        self.label_encoder = label_encoder
        if self.label_encoder and not self.encode_labels:
            raise ValueError('label_encoder specified but encode_labels is '
                             'False')
        if self.label_encoder is None:
            self.label_encoder = sklearn.preprocessing.LabelEncoder()

        if not self.feature_cols:
            self.feature_cols = [k for k in self._df.keys()
                                 if k != self.label_col]
        self.n_instances = len(self._df[self.label_col])
        self.n_features = len(self.feature_cols)

    def to_proto(self) -> DatabasePB:
        """Serialises this database as a protobuf.

        Returns
        -------
        DatabasePB
            Protobuf representing this database.
        """
        proto = DatabasePB()
        proto.path = self.path
        proto.class_name = 'PandasReader'
        db_kwargs = {
            'feature_cols': self.feature_cols,
            'label_col': self.label_col,
            'key': self.key,
            'encode_labels': self.encode_labels}
        for key, value in db_kwargs.items():
            kwarg = proto.kwarg.add()
            kwarg.key = key
            kwarg.value = json.dumps(value)
        proto.label_encoder.CopyFrom(serialise_encoder(self.label_encoder))
        return proto

    def __enter__(self):
        return self

    def __exit__(self, exc_type: Exception, exc_val: object,
                 exc_tb: Traceback):
        delattr(self, '_df')

    def read_features(self, ids: Sequence[int]) -> numpy.ndarray:
        """Reads feature vectors from the database.

        Parameters
        ----------
        ids
            Iterable of IDs.

        Returns
        -------
        numpy.ndarray
            N x D array of feature vectors.
        """
        # TODO(MatthewJA): Optimise this.
        # Allocate output features array.
        features = numpy.zeros((len(ids), self.n_features))

        # For each ID, get the corresponding features.
        for out_index, id_ in enumerate(ids):
            sel = self._df.iloc[id_]
            for feature_index, feature in enumerate(self.feature_cols):
                features[out_index, feature_index] = sel[feature]

        return features

    def read_labels(self,
                    labeller_ids: Sequence[int],
                    instance_ids: Sequence[int]) -> numpy.ndarray:
        """Reads label vectors from the database.

        Parameters
        ----------
        labeller_ids
            Iterable of labeller IDs.
        instance_ids
            Iterable of instance IDs.

        Returns
        -------
        numpy.ndarray
            T x N x 1 array of label vectors.
        """
        # Draw a label to get the dtype.
        dtype = type(self._df.iloc[0][self.label_col])

        # Allocate output labels array.
        labels = numpy.zeros(
            (len(labeller_ids), len(instance_ids), 1), dtype=dtype)

        if len(labeller_ids) > 1:
            raise NotImplementedError('Multiple labellers not yet supported.')

        # For each ID, get the corresponding labels.
        for out_index, id_ in enumerate(instance_ids):
            sel = self._df.iloc[int(id_)]
            labels[0, out_index, 0] = sel[self.label_col]

        if labels.shape[2] != 1:
            raise NotImplementedError('Multidimensional labels not currently '
                                      'supported.')

        # Encode labels.
        if self.encode_labels:
            labels = numpy.apply_along_axis(
                self.label_encoder.fit_transform,
                axis=1,
                arr=labels.reshape(labels.shape[:2])
            ).reshape(labels.shape)

        return labels

    def write_features(self, ids: Sequence[int], features: numpy.ndarray):
        raise PermissionError('Cannot write to read-only database.')

    def write_labels(self,
                     labeller_ids: Sequence[int],
                     instance_ids: Sequence[int],
                     labels: numpy.ndarray):
        raise PermissionError('Cannot write to read-only database.')

    def get_known_instance_ids(self) -> List[int]:
        """Returns a list of known instance IDs.

        Returns
        -------
        List[int]
            A list of known instance IDs.
        """
        return list(range(self.n_instances))

    def get_known_labeller_ids(self) -> List[int]:
        """Returns a list of known labeller IDs.

        Returns
        -------
        List[int]
            A list of known labeller IDs.
        """
        raise NotImplementedError()


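# Example (editor's sketch, not part of the original module): reading a
# DataFrame stored with DataFrame.to_hdf. The path, key, and column name are
# illustrative; an empty feature_cols list selects all non-label columns.
#
#     with PandasReader('data.h5', feature_cols=[], label_col='class',
#                       key='df') as reader:
#         features = reader.read_features([0, 1, 2])

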
class FITSReader(Database):
    """Reads FITS databases.

    Attributes
    ----------
    hdu_index : int
        Index of HDU in the FITS file.
    feature_cols : List[str]
        List of feature columns.
    label_col : str
        Name of label column.
    n_features : int
        Number of features.
    n_instances : int
        Number of instances.
    n_labels : int
        Number of labels per instance.
    path : str
        Path to FITS file.
    encode_labels : bool
        Whether to encode labels as integers.
    label_encoder : sklearn.preprocessing.LabelEncoder
        Encodes labels as integers.
    _hdulist : astropy.io.fits.HDUList
        FITS HDUList.
    """

    def __init__(self, path: str, feature_cols: List[str], label_col: str,
                 hdu_index: int=1, encode_labels: bool=True,
                 label_encoder: sklearn.preprocessing.LabelEncoder=None):
        """
        Parameters
        ----------
        path
            Path to FITS file.
        feature_cols
            List of feature columns. If none are specified, then all
            non-label, non-ID columns will be used.
        label_col
            Name of label column.
        hdu_index
            Index of HDU in the FITS file. Default is 1, i.e., the first
            extension in the FITS file.
        encode_labels
            Whether to encode labels as integers.
        label_encoder
            Encodes labels as integers. If not specified, the label column
            will be read and a label encoding generated.
        """
        self.path = path
        self.feature_cols = feature_cols
        self.label_col = label_col
        self.hdu_index = hdu_index
        self.encode_labels = encode_labels
        self.label_encoder = label_encoder
        if self.label_encoder and not self.encode_labels:
            raise ValueError('label_encoder specified but encode_labels is '
                             'False')
        if self.label_encoder is None:
            self.label_encoder = sklearn.preprocessing.LabelEncoder()

        # These will be set when the FITS file is opened.
        self.n_instances = None
        self.n_features = None

    def to_proto(self) -> DatabasePB:
        """Serialises this database as a protobuf.

        Returns
        -------
        DatabasePB
            Protobuf representing this database.
        """
        proto = DatabasePB()
        proto.path = self.path
        proto.class_name = 'FITSReader'
        db_kwargs = {
            'feature_cols': self.feature_cols,
            'label_col': self.label_col,
            'hdu_index': self.hdu_index,
            'encode_labels': self.encode_labels}
        for key, value in db_kwargs.items():
            kwarg = proto.kwarg.add()
            kwarg.key = key
            kwarg.value = json.dumps(value)
        proto.label_encoder.CopyFrom(serialise_encoder(self.label_encoder))
        return proto

    def __enter__(self):
        self._hdulist = io_fits.open(self.path)

        # If we haven't specified columns, use all except the label column.
        cols = self._hdulist[self.hdu_index].columns.names
        if not self.feature_cols:
            self.feature_cols = [k for k in cols if k != self.label_col]

        self.n_features = len(self.feature_cols)
        self.n_instances = \
            self._hdulist[self.hdu_index].data[self.label_col].ravel().shape[0]

        return self

    def __exit__(self, exc_type: Exception, exc_val: object,
                 exc_tb: Traceback):
        self._hdulist.close()
        delattr(self, '_hdulist')

    def read_features(self, ids: Sequence[int]) -> numpy.ndarray:
        """Reads feature vectors from the database.

        Parameters
        ----------
        ids
            Iterable of IDs.

        Returns
        -------
        numpy.ndarray
            N x D array of feature vectors.
        """
        # TODO(MatthewJA): Optimise this.
        # Allocate output features array.
        features = numpy.zeros((len(ids), self.n_features))

        for f_index, col in enumerate(self.feature_cols):
            data_col = self._hdulist[self.hdu_index].data[col]
            features[:, f_index] = data_col[ids]

        return features

    def read_labels(self,
                    labeller_ids: Sequence[int],
                    instance_ids: Sequence[int]) -> numpy.ndarray:
        """Reads label vectors from the database.

        Parameters
        ----------
        labeller_ids
            Iterable of labeller IDs.
        instance_ids
            Iterable of instance IDs.

        Returns
        -------
        numpy.ndarray
            T x N x 1 array of label vectors.
        """
        label_col = self._hdulist[self.hdu_index].data[self.label_col]
        labels = label_col[instance_ids].reshape((1, -1, 1))

        # Encode labels.
        if self.encode_labels:
            labels = numpy.apply_along_axis(
                self.label_encoder.fit_transform,
                axis=1,
                arr=labels.reshape(labels.shape[:2])
            ).reshape(labels.shape)

        return labels

    def write_features(self, ids: Sequence[int], features: numpy.ndarray):
        raise PermissionError('Cannot write to read-only database.')

    def write_labels(self,
                     labeller_ids: Sequence[int],
                     instance_ids: Sequence[int],
                     labels: numpy.ndarray):
        raise PermissionError('Cannot write to read-only database.')

    def get_known_instance_ids(self) -> List[int]:
        """Returns a list of known instance IDs.

        Returns
        -------
        List[int]
            A list of known instance IDs.
        """
        return list(range(self.n_instances))

    def get_known_labeller_ids(self) -> List[int]:
        """Returns a list of known labeller IDs.

        Returns
        -------
        List[int]
            A list of known labeller IDs.
        """
        raise NotImplementedError()


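# Example (editor's sketch, not part of the original module): reading a FITS
# binary table from the first extension. The path and column names are
# illustrative.
#
#     with FITSReader('data.fits', feature_cols=['MAG_R', 'MAG_G'],
#                     label_col='CLASS') as reader:
#         features = reader.read_features([0, 1, 2])

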
# For safe string-based access to database classes.
DATABASES = {
    'ASCIIReader': ASCIIReader,
    'HDF5Reader': HDF5Reader,
    'FITSReader': FITSReader,
    'ManagedHDF5Database': ManagedHDF5Database,
    'PandasReader': PandasReader,
}
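
# Example (editor's sketch, not part of the original module): instantiating
# a database from a serialised DatabasePB protobuf, reversing to_proto.
# `proto` is an illustrative DatabasePB instance.
#
#     db_class = DATABASES[proto.class_name]
#     kwargs = {kwarg.key: json.loads(kwarg.value) for kwarg in proto.kwarg}
#     with db_class(proto.path, **kwargs) as db:
#         ids = db.get_known_instance_ids()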