Source code for acton.acton

"""Main processing script for Acton."""

import logging
import time
from typing import Iterable, List, TypeVar

import acton.database
import acton.labellers
import acton.predictors
import acton.proto.io
import acton.proto.wrappers
import acton.recommenders
import numpy
import pandas
import sklearn.linear_model
import sklearn.metrics
import sklearn.model_selection
import sklearn.preprocessing

T = TypeVar('T')


def draw(n: int, lst: List[T], replace: bool=True) -> List[T]:
    """Draws n random elements from a list.

    Parameters
    ----------
    n
        Number of elements to draw.
    lst
        List of elements to draw from.
    replace
        Draw with replacement.

    Returns
    -------
    List[T]
        n random elements.
    """
    # Most call sites in this codebase pass replace=False, but NumPy's own
    # default is replace=True, so that default is mirrored here.
    sample = numpy.random.choice(lst, size=n, replace=replace)
    return list(sample)
def validate_predictor(predictor: str):
    """Raises an exception if the predictor is not valid.

    Parameters
    ----------
    predictor
        Name of predictor.

    Raises
    ------
    ValueError
        If `predictor` is not a key of acton.predictors.PREDICTORS.
    """
    if predictor not in acton.predictors.PREDICTORS:
        # 'Predictors' capitalised for consistency with the corresponding
        # message in validate_recommender ('Recommenders are one of ...').
        raise ValueError('Unknown predictor: {}. Predictors are one of '
                         '{}.'.format(predictor,
                                      acton.predictors.PREDICTORS.keys()))
def validate_recommender(recommender: str):
    """Raises an exception if the recommender is not valid.

    Parameters
    ----------
    recommender
        Name of recommender.

    Raises
    ------
    ValueError
        If `recommender` is not a key of acton.recommenders.RECOMMENDERS.
    """
    known = acton.recommenders.RECOMMENDERS
    if recommender in known:
        return

    raise ValueError('Unknown recommender: {}. Recommenders are one of '
                     '{}.'.format(recommender, known.keys()))
def simulate_active_learning(
        ids: Iterable[int],
        db: acton.database.Database,
        db_kwargs: dict,
        output_path: str,
        n_initial_labels: int=10,
        n_epochs: int=10,
        test_size: float=0.2,
        recommender: str='RandomRecommender',
        predictor: str='LogisticRegression',
        n_recommendations: int=1):
    """Simulates an active learning task.

    Runs a label -> fit -> predict -> recommend loop for `n_epochs`, writing
    predictions for the held-out test set to `output_path` after every epoch.

    Parameters
    ----------
    ids
        IDs of instances in the unlabelled pool.
    db
        Database with features and labels.
    db_kwargs
        Keyword arguments for the database constructor.
        NOTE(review): not referenced anywhere in this function body —
        possibly vestigial; confirm against callers.
    output_path
        Path to output intermediate predictions to. Will be overwritten.
    n_initial_labels
        Number of initial labels to draw.
    n_epochs
        Number of epochs.
    test_size
        Fraction of instances held out for testing (passed through to
        sklearn's train_test_split).
    recommender
        Name of recommender to make recommendations.
    predictor
        Name of predictor to make predictions.
    n_recommendations
        Number of recommendations to make at once.
    """
    # Fail fast on bad component names before doing any work.
    validate_recommender(recommender)
    validate_predictor(predictor)

    # Seed RNG so simulation runs are reproducible.
    numpy.random.seed(0)

    # Bytestring describing this run; stored alongside the output protobufs.
    metadata = '{} | {}'.format(recommender, predictor).encode('ascii')

    # Split into training and testing sets.
    logging.debug('Found {} instances.'.format(len(ids)))
    logging.debug('Splitting into training/testing sets.')
    train_ids, test_ids = sklearn.model_selection.train_test_split(
        ids, test_size=test_size)
    test_ids.sort()

    # Set up predictor, labeller, and recommender.
    # TODO(MatthewJA): Handle multiple labellers better than just averaging.
    predictor_name = predictor  # For saving.
    # Note: `predictor` and `recommender` are rebound from name strings to
    # constructed instances from here on.
    predictor = acton.predictors.PREDICTORS[predictor](db=db, n_jobs=-1)
    labeller = acton.labellers.DatabaseLabeller(db)
    recommender = acton.recommenders.RECOMMENDERS[recommender](db=db)

    # Draw some initial labels (without replacement, from the training set
    # only — the test set must stay unlabelled).
    logging.debug('Drawing initial labels.')
    recommendations = draw(n_initial_labels, train_ids, replace=False)
    logging.debug('Recommending: {}'.format(recommendations))

    # This will store all IDs of things we have already labelled.
    labelled_ids = []
    # This will store all the corresponding labels (grown each epoch).
    labels = numpy.zeros((0, 1))

    # Simulation loop.
    logging.debug('Writing protobufs to {}.'.format(output_path))
    writer = acton.proto.io.write_protos(output_path, metadata=metadata)
    next(writer)  # Prime the coroutine.

    for epoch in range(n_epochs):
        logging.info('Epoch {}/{}'.format(epoch + 1, n_epochs))

        # Label the recommendations.
        logging.debug('Labelling recommendations.')
        new_labels = numpy.array([
            labeller.query(id_) for id_ in recommendations]).reshape((-1, 1))
        labelled_ids.extend(recommendations)
        logging.debug('Sorting label IDs.')
        labelled_ids.sort()
        labels = numpy.concatenate([labels, new_labels], axis=0)
        # Here, we would write the labels to the database, but they're already
        # there since we're just reading them from there anyway.
        pass

        # Pass the labels to the predictor.
        logging.debug('Fitting predictor.')
        then = time.time()
        predictor.fit(labelled_ids)
        logging.debug('(Took {:.02} s.)'.format(time.time() - then))

        # Evaluate the predictor on the held-out test set.
        logging.debug(
            'Making predictions (reference, n = {}).'.format(len(test_ids)))
        then = time.time()
        test_pred, _test_var = predictor.reference_predict(test_ids)
        logging.debug('(Took {:.02} s.)'.format(time.time() - then))
        logging.debug(test_pred)

        # Construct a protobuf for outputting predictions.
        proto = acton.proto.wrappers.Predictions.make(
            test_ids, labelled_ids,
            test_pred.transpose([1, 0, 2]),  # T x N x C -> N x T x C
            predictor=predictor_name,
            db=db)

        # Then write them to a file.
        logging.debug('Writing predictions.')
        writer.send(proto.proto)

        # Pass the predictions to the recommender. Only unlabelled instances
        # are candidates for the next round of recommendations.
        unlabelled_ids = list(set(ids) - set(labelled_ids))
        if not unlabelled_ids:
            logging.info('Labelled all instances.')
            break

        unlabelled_ids.sort()
        logging.debug(
            'Making predictions (unlabelled, n = {}).'.format(
                len(unlabelled_ids)))
        then = time.time()
        predictions, _variances = predictor.predict(unlabelled_ids)
        logging.debug('(Took {:.02} s.)'.format(time.time() - then))
        logging.debug('Making recommendations.')
        recommendations = recommender.recommend(
            unlabelled_ids, predictions, n=n_recommendations)
        logging.debug('Recommending: {}'.format(recommendations))

    return 0
def try_pandas(data_path: str) -> bool:
    """Guesses if a file is a pandas file.

    Parameters
    ----------
    data_path
        Path to file.

    Returns
    -------
    bool
        True if the file is pandas.
    """
    try:
        pandas.read_hdf(data_path)
    except ValueError:
        # read_hdf couldn't interpret the file as a pandas HDF store.
        return False
    else:
        return True
def get_DB(
        data_path: str,
        pandas_key: str=None) -> (acton.database.Database, dict):
    """Gets a Database that will handle the given data table.

    Parameters
    ----------
    data_path
        Path to file.
    pandas_key
        Key for pandas HDF5. Specify iff using pandas.

    Returns
    -------
    Database
        Database that will handle the given data table.
    dict
        Keyword arguments for the Database constructor.
    """
    db_kwargs = {}

    # Dispatch on the file extension: FITS first, then anything that isn't
    # HDF5 is treated as ASCII, and .h5 files are pandas iff a key was given.
    if data_path.endswith('.fits'):
        logging.debug('Reading {} as FITS.'.format(data_path))
        return acton.database.FITSReader, db_kwargs

    if not data_path.endswith('.h5'):
        logging.debug('Reading {} as ASCII.'.format(data_path))
        return acton.database.ASCIIReader, db_kwargs

    # Assume HDF5.
    if pandas_key:
        logging.debug('Reading {} as pandas.'.format(data_path))
        db_kwargs['key'] = pandas_key
        return acton.database.PandasReader, db_kwargs

    logging.debug('Reading {} as HDF5.'.format(data_path))
    return acton.database.HDF5Reader, db_kwargs
def main(data_path: str, feature_cols: List[str], label_col: str,
         output_path: str, n_epochs: int=10, initial_count: int=10,
         recommender: str='RandomRecommender',
         predictor: str='LogisticRegression', pandas_key: str='',
         n_recommendations: int=1):
    """Simulate an active learning experiment.

    Parameters
    ----------
    data_path
        Path to data file.
    feature_cols
        List of column names of the features. If empty, all non-label and
        non-ID columns will be used.
    label_col
        Column name of the labels.
    output_path
        Path to output file. Will be overwritten.
    n_epochs
        Number of epochs to run.
    initial_count
        Number of random instances to label initially.
    recommender
        Name of recommender to make recommendations.
    predictor
        Name of predictor to make predictions.
    pandas_key
        Key for pandas HDF5. Specify iff using pandas.
    n_recommendations
        Number of recommendations to make at once.
    """
    # Resolve the reader class for this file type, then hand everything
    # over to the simulation loop.
    DB, db_kwargs = get_DB(data_path, pandas_key=pandas_key)
    db_kwargs['feature_cols'] = feature_cols
    db_kwargs['label_col'] = label_col

    with DB(data_path, **db_kwargs) as reader:
        pool_ids = reader.get_known_instance_ids()
        return simulate_active_learning(
            pool_ids, reader, db_kwargs, output_path,
            n_epochs=n_epochs,
            n_initial_labels=initial_count,
            recommender=recommender,
            predictor=predictor,
            n_recommendations=n_recommendations)
def predict(
        labels: acton.proto.wrappers.LabelPool,
        predictor: str) -> acton.proto.wrappers.Predictions:
    """Train a predictor and predict labels.

    Parameters
    ----------
    labels
        IDs of labelled instances.
    predictor
        Name of predictor to make predictions.

    Returns
    -------
    acton.proto.wrappers.Predictions
        Predictions over all known instances.
    """
    validate_predictor(predictor)

    with labels.DB() as db:
        all_ids = db.get_known_instance_ids()
        train_ids = labels.ids

        # Build and train the predictor on the labelled instances.
        model = acton.predictors.PREDICTORS[predictor](db=db, n_jobs=-1)
        logging.debug('Training predictor with IDs: {}'.format(train_ids))
        model.fit(train_ids)

        # Predict over every known instance.
        predicted, _variances = model.reference_predict(all_ids)
        reordered = predicted.transpose([1, 0, 2])  # T x N x C -> N x T x C

        # Construct a protobuf for outputting predictions.
        proto = acton.proto.wrappers.Predictions.make(
            all_ids, train_ids, reordered,
            predictor=predictor, db=db)

    return proto
def recommend(
        predictions: acton.proto.wrappers.Predictions,
        recommender: str='RandomRecommender',
        n_recommendations: int=1) -> acton.proto.wrappers.Recommendations:
    """Recommends instances to label based on predictions.

    Parameters
    ----------
    recommender
        Name of recommender to make recommendations.
    n_recommendations
        Number of recommendations to make at once. Default 1.

    Returns
    -------
    acton.proto.wrappers.Recommendations
    """
    validate_recommender(recommender)

    # Collect the IDs that do not yet have labels, together with the column
    # indices of their predictions.
    labelled = set(predictions.labelled_ids)
    unlabelled_ids = []
    unlabelled_indices = []
    for index, id_ in enumerate(predictions.predicted_ids):
        if id_ in labelled:
            continue
        unlabelled_ids.append(id_)
        unlabelled_indices.append(index)

    # Array of predictions restricted to the unlabelled instances.
    predictions_array = predictions.predictions[:, unlabelled_indices]

    with predictions.DB() as db:
        rec = acton.recommenders.RECOMMENDERS[recommender](db=db)
        recommendations = rec.recommend(
            unlabelled_ids, predictions_array, n=n_recommendations)
        logging.debug('Recommending: {}'.format(list(recommendations)))

        # Construct a protobuf for outputting recommendations.
        proto = acton.proto.wrappers.Recommendations.make(
            [int(r) for r in recommendations],
            predictions.labelled_ids,
            recommender=recommender, db=db)

    return proto
def label(recommendations: acton.proto.wrappers.Recommendations
          ) -> acton.proto.wrappers.LabelPool:
    """Simulates a labelling task.

    Marks the recommended instances as labelled by merging them into the
    pool of already-labelled IDs. No actual labels are fetched here —
    they are read from the database on demand elsewhere.

    Parameters
    ----------
    recommendations
        Recommendations wrapper holding the IDs to label and the IDs that
        have already been labelled.

    Returns
    -------
    acton.proto.wrappers.LabelPool
    """
    # We'd store the labels here, except that we just read them from the DB.
    # Instead, we'll record that we've labelled them.
    #
    # labeller = acton.labellers.DatabaseLabeller(db)
    # labels = [labeller.query(id_) for id_ in ids]

    # TODO(MatthewJA): Consider optimising this (doesn't really need a sort).
    ids_to_label = recommendations.recommendations
    labelled_ids = recommendations.labelled_ids
    logging.debug('Recommended IDs: {}'.format(ids_to_label))
    logging.debug('Already labelled IDs: {}'.format(labelled_ids))
    # Union of old and new labelled IDs, deduplicated and sorted.
    ids = sorted(set(ids_to_label) | set(labelled_ids))
    logging.debug('Now labelled IDs: {}'.format(ids))

    # Return a protobuf recording the new labelled pool.
    with recommendations.DB() as db:
        proto = acton.proto.wrappers.LabelPool.make(ids=ids, db=db)

    return proto