#!/usr/bin/env python
# coding=utf-8
#
#
from __future__ import unicode_literals
import os
"""\
Classification models using scikit-learn. The main objects here are Model
and SplitModel.
"""
from alex.components.nlg.tectotpl.core.util import file_stream
from alex.components.nlg.tectotpl.core.log import log_info
# Sklearn renamed zero_one_loss in new version.
try:
from sklearn.metrics import zero_one_loss
except:
from sklearn.metrics import zero_one_loss as zero_one_score
from alex.components.nlg.tectotpl.tool.ml.dataset import DataSet
from sklearn.dummy import DummyClassifier
from alex.components.nlg.tectotpl.core.exception import RuntimeException
from alex.components.nlg.tectotpl.tool.cluster import Job
import numpy as np
import pickle
import marshal
import re
import types
import codecs
import sys
__author__ = "Ondřej Dušek"
__date__ = "2012"
[docs]class AbstractModel(object):
"""\
Abstract ancestor of different model classes
"""
def __init__(self, config):
"""\
Initialize a few attributes from the configuration.
"""
# this will store the training data headers for value ordering
self.data_headers = None
# attribute pre-selection (remove class attribute from select_attr)
self.class_attr = config['class_attr']
self.select_attr = config.get('select_attr', [])
if self.class_attr in self.select_attr:
self.select_attr.remove(self.class_attr)
# part of the training data to be used
self.train_part = config.get('train_part', 1)
[docs] def evaluate(self, test_file, encoding='UTF-8', classif_file=None):
"""\
Evaluate on the given test data file. Return accuracy.
If classif_file is set, save the classification results to this file.
"""
test = DataSet()
test.load_from_arff(test_file, encoding)
values = self.classify(test)
golden = self.get_classes(test, dtype=None)
if classif_file is not None:
classif = DataSet()
classif.load_from_vect(test.get_attrib(self.class_attr), values)
classif.rename_attrib(self.class_attr, self.PREDICTED)
test.merge(classif)
test.save_to_arff(classif_file, encoding)
return zero_one_score(golden, values)
@staticmethod
[docs] def load_from_file(model_file):
"""\
Load the model from a pickle file or stream
(supports GZip compression).
"""
log_info('Loading model from file ' + str(model_file))
fh = file_stream(model_file, mode='rb', encoding=None)
unpickler = pickle.Unpickler(fh)
model = unpickler.load()
fh.close()
log_info('Model loaded successfully.')
return model
[docs] def load_training_set(self, filename, encoding='UTF-8'):
"""\
Load the given training data set into memory and strip it if
configured to via the train_part parameter.
"""
log_info('Loading training data set from ' + str(filename) + '...')
train = DataSet()
train.load_from_arff(filename, encoding)
if self.train_part < 1:
train = train.subset(0, int(round(self.train_part * len(train))),
copy=False)
return train
[docs] def save_to_file(self, model_file):
"""\
Save the model to a pickle file or stream (supports GZip compression).
"""
log_info('Saving model to file ' + str(model_file))
fh = file_stream(model_file, mode='wb', encoding=None)
pickle.Pickler(fh, pickle.HIGHEST_PROTOCOL).dump(self)
fh.close()
log_info('Model successfully saved.')
[docs] def get_classes(self, data, dtype=int):
"""\
Return a vector of class values from the given DataSet.
If dtype is int, the integer values are returned. If dtype is
None, the string values are returned.
"""
return np.array(data.attrib_as_vect(self.class_attr, dtype=dtype))
[docs] def classify(self, instances):
"""\
This must be implemented in derived classes.
"""
raise NotImplementedError()
[docs]class Model(AbstractModel):
# predicted class name
PREDICTED = 'PREDICTED'
def __init__(self, config):
"""\
Initialize the model with the given configuration.
"""
super(Model, self).__init__(config)
# vectorization and filtering settings
self.filter_attr = config.get('filter_attr')
self.vectorizer = config.get('vectorizer')
self.vectorizer_trained = False
self.feature_filter = config.get('feature_filter')
self.feature_filter_trained = False
self.use_weights = config.get('use_weights', False)
# classification settings
self.classifier = self.construct_classifier(config)
self.classifier_trained = False
self.postprocess = config.get('postprocess')
[docs] def construct_classifier(self, cfg):
"""\
Given the config file, construct the classifier (based on the
'classifier' or 'classifier_class'/'classifier_params' settings.
Defaults to DummyClassifier.
"""
if 'classifier' in cfg:
return cfg['classifier']
elif 'classifier_class' in cfg:
if 'classifier_params' in cfg:
return cfg['classifier_class'](**cfg['classifier_params'])
else:
return cfg['classifier_class']()
else:
return DummyClassifier()
@staticmethod
[docs] def create_training_job(config, work_dir, train_file,
name=None, memory=8, encoding='UTF-8'):
"""\
Submit a training process on the cluster which will save the
model to a pickle. Return the submitted job and the future location of
the model pickle.
train_file cannot be a stream, it must be an actual file.
"""
# purge name
if name is None:
name = 'TR-' + re.sub(r'[^A-Za-z0-9_]', '_', train_file)
else:
name = re.sub(r'[^A-Za-z0-9_]', '_', name)
# create working directory, if not existing
if not os.path.isdir(work_dir):
os.mkdir(work_dir)
train_file = os.path.abspath(train_file)
# generate model file name
model_file = os.path.abspath(os.path.join(work_dir,
name + '-model.pickle.gz'))
config_pickle = os.path.abspath(os.path.join(work_dir,
name + '-cfg.pickle.gz'))
# create the configuration pickle
fh = file_stream(config_pickle, mode='wb', encoding=None)
pickle.Pickler(fh, pickle.HIGHEST_PROTOCOL).dump(config)
fh.close()
# create the job
job = Job(name=name, work_dir=work_dir)
job.code = "fh = file_stream('" + config_pickle + \
"', mode='rb', encoding=None)\n" + \
"cfg = pickle.Unpickler(fh).load()\n" + \
"fh.close()\n" + \
"model = Model(cfg)\n" + \
"model.train('" + train_file + "', encoding='" + \
encoding + "')\n" \
"model.save_to_file('" + model_file + "')\n"
job.header += "from alex.components.nlg.tectotpl.tool.ml.model import Model\n" + \
"import pickle\n" + \
"from alex.components.nlg.tectotpl.core.util import file_stream\n"
return job, model_file
[docs] def train_on_data(self, train):
"""\
Train model on the specified training data set (which must be a loaded
DataSet object).
"""
log_info('Preparing data set...')
self.data_headers = train.get_headers()
train_vect = self.__vectorize(train)
train_classes = self.get_classes(train)
# if all the training data have the same class, use a dummy classifier
if train.get_attrib(self.class_attr).num_values == 1:
self.feature_filter = None
self.classifier = DummyClassifier()
# filter features
log_info('Filtering...')
train_filt = self.__filter_features(train_vect, train_classes)
# train the classifier
log_info('Training...')
if self.use_weights:
self.classifier.fit(train_filt, train_classes,
sample_weight=train.inst_weights)
else:
self.classifier.fit(train_filt, train_classes)
self.classifier_trained = True
log_info('Training done.')
[docs] def train(self, train_file, encoding='UTF-8'):
"""\
Train the model on the specified training data file.
"""
self.train_on_data(self.load_training_set(train_file, encoding))
[docs] def classify(self, instances):
"""\
Classify a set of instances (possibly one member).
"""
# prepare for classification
instances, nolist = self.check_classification_input(instances)
if not instances:
return instances
# vectorize and filter the instances
inst_vect = self.__vectorize(instances)
if self.feature_filter is not None:
inst_filt = self.__filter_features(inst_vect)
else:
inst_filt = inst_vect
# classify
values = self.classifier.predict(inst_filt)
# return the result
class_attr = self.data_headers.get_attrib(self.class_attr)
values = [class_attr.value(val) for val in values]
# (optional) post-processing
if self.postprocess:
values = [self.postprocess(inst, val)
for inst, val in zip(instances, values)]
if nolist:
return values[0]
return values
def __vectorize(self, data):
"""\
Train vectorization and subsequently vectorize. Accepts a DataSet
or a list of dictionaries to be vectorized.
"""
# no vectorization performed, only converted to matrix
if self.vectorizer is None:
if not isinstance(data, DataSet):
data_set = DataSet()
data_set.load_from_dict(data)
data = data_set
data.match_headers(self.data_headers, add_values=True)
# TODO pre-filtering here?
return data.as_bunch(target=self.class_attr,
select_attrib=self.select_attr).data
# vectorization needed: converted to dictionary
# and passed to the vectorizer
if isinstance(data, DataSet):
data = data.as_dict(select_attrib=self.select_attr,
mask_attrib=self.class_attr)
else:
data = [{key: val for key, val in inst.items()
if key != self.class_attr and key in self.select_attr}
for inst in data]
# pre-filter attributes if filter_attr is set
if self.filter_attr:
data = [{key: val for key, val in inst.items()
if self.filter_attr(key, val)} for inst in data]
if not self.vectorizer_trained:
self.vectorizer.fit(data)
self.vectorizer_trained = True
return self.vectorizer.transform(data).tocsr()
def __filter_features(self, data, classes=None):
"""\
Filter features according to the pre-selected filter. Return the
same set of features if the filter is not set.
"""
if self.feature_filter is None:
return data
if not self.feature_filter_trained:
if classes is None:
raise RuntimeException('Classes must be given to ' +
'train a feature filter!')
self.feature_filter.fit(data, classes)
self.feature_filter_trained = True
return self.feature_filter.transform(data)
def __marshal_member(self, state, key):
"""\
Check for a key lambda function under the specified key
and marshal it if needed.
"""
if key in state and hasattr(state[key], '__call__'):
try:
code = state[key].func_code
state[key] = marshal.dumps(code)
except (AttributeError, ValueError):
# try to use original version if marshaling fails
pass
def __demarshal_member(self, state, key):
"""\
Check for a key lambda function under the specified key
and de-marshal it if needed.
"""
if key in state:
try:
code = marshal.loads(state[key])
state[key] = types.FunctionType(code, globals())
except (TypeError, ValueError):
# try to use original version if demarshaling fails
pass
def __getstate__(self):
"""\
Check and marshal member lambda functions.
"""
state = self.__dict__
self.__marshal_member(state, 'filter_attr')
self.__marshal_member(state, 'postprocess')
return state
def __setstate__(self, state):
"""\
Check and de-marshal member lambda functions.
"""
self.__demarshal_member(state, 'filter_attr')
self.__demarshal_member(state, 'postprocess')
if 'postprocess' not in state:
state['postprocess'] = None
self.__dict__ = state
[docs]class SplitModel(AbstractModel):
"""\
A model that's actually composed of several Model-s.
"""
def __init__(self, config):
"""\
Just store the configuration, be prepared for training.
"""
super(SplitModel, self).__init__(config)
# create storage for split models
self.divide_func = config['divide_func']
self.config = config
self.models = {}
self.backoff_model = None
self.trained = False
[docs] def train(self, train_file, work_dir, memory=8, encoding='UTF-8'):
"""\
Read training data, split them and train the individual models
(in cluster jobs).
"""
# load the entire data set
train = self.load_training_set(train_file, encoding)
self.data_headers = train.get_headers()
# train a backoff model
log_info('Training a backoff model...')
self.backoff_model = self.__train_backoff_model(train)
# split it
log_info('Split...')
train_split = train.split(eval(self.divide_func), keep_copy=False)
jobs = []
model_files = {}
# save training files and create training jobs
for key, subset in train_split.iteritems():
fn = re.sub(r'(.arff(.gz)?)?$', '-' + key + '.arff.gz', train_file)
fn = os.path.join(work_dir, os.path.basename(fn))
subset.save_to_arff(fn, encoding)
job, model_file = Model.create_training_job(self.config, work_dir,
fn, memory=memory,
encoding=encoding)
jobs.append(job)
model_files[key] = model_file
# submit the training jobs and wait for all of them
log_info('Submitting training jobs...')
for job in jobs:
job.submit()
log_info('Waiting for jobs...')
for job in jobs:
job.wait()
# load all models
log_info('Training complete. Assembling model files...')
for key, model_file in model_files.iteritems():
self.models[key] = Model.load_from_file(model_file)
self.trained = True
log_info('Training done.')
[docs] def classify(self, instances):
"""\
Classify a set of instances.
"""
# prepare for classification
instances, nolist = self.check_classification_input(instances)
if not instances:
return instances
# classify each instance with the respective model
# TODO: bulk classify
results = []
divide_func = eval(self.divide_func)
for instance in instances:
model_key = divide_func(0, instance)
if model_key in self.models:
results.append(self.models[model_key].classify(instance))
else:
results.append(self.backoff_model.classify(instance))
# return the results
if nolist:
return results[0]
return results
def __train_backoff_model(self, train):
"""\
Train a DummyClassifier back-off on the given training data.
"""
config = {'class_attr': self.class_attr, 'select_attr': []}
model = Model(config)
model.train_on_data(train)
return model