pipeline_titanic.py [download]
#!/usr/bin/env python3
import sys
import argparse
import logging
import os.path
import pandas as pd
import numpy as np
import sklearn.tree
import sklearn.preprocessing
import sklearn.pipeline
import sklearn.base
import sklearn.metrics
import sklearn.impute
import joblib
import matplotlib.pyplot as plt
class PipelineNoop(sklearn.base.BaseEstimator, sklearn.base.TransformerMixin):
    """Identity transformer: a pipeline placeholder that leaves the data untouched."""

    def __init__(self):
        # No hyper-parameters to store.
        pass

    def fit(self, X, y=None):
        """Nothing is learned from the data; return the transformer itself."""
        return self

    def transform(self, X, y=None):
        """Hand ``X`` back exactly as it was received."""
        return X
class Printer(sklearn.base.BaseEstimator, sklearn.base.TransformerMixin):
    """Pipeline member that displays the data at this stage of the transformation.

    The data passes through unmodified; ``title`` prefixes every printed line
    so output from several printers can be told apart.
    """

    def __init__(self, title):
        self.title = title

    def fit(self, X, y=None):
        """Nothing to fit; return the transformer itself."""
        return self

    def transform(self, X, y=None):
        """Print the type, shape and contents of ``X``, then return it unchanged."""
        label = self.title
        print("{}::type(X)".format(label), type(X))
        print("{}::X.shape".format(label), X.shape)
        is_frame = isinstance(X, pd.DataFrame)
        if not is_frame:
            # Anything that is not a DataFrame also gets its first row shown.
            print("{}::X[0]".format(label), X[0])
        print("{}::X".format(label), X)
        return X
class DataFrameSelector(sklearn.base.BaseEstimator, sklearn.base.TransformerMixin):
    """Keep only one fixed group of Titanic columns from a DataFrame.

    The Titanic fields are:
    Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked

    Column groups defined here:
      * categorical predictors: Sex, Pclass, Embarked
      * numerical predictors:   Age, SibSp, Parch, Fare
      * labels:                 Survived
    Name, Ticket and Cabin belong to no group and are never selected.

    Parameters:
      do_predictors: when true select a predictor group, otherwise the labels.
      do_numerical:  when selecting predictors, true picks the numerical
                     group and false picks the categorical group.
    """

    def __init__(self, do_predictors=True, do_numerical=True):
        self.mCategoricalPredictors = ["Sex", "Pclass", "Embarked"]
        self.mNumericalPredictors = ["Age", "SibSp", "Parch", "Fare"]
        self.mLabels = ["Survived"]

        self.do_numerical = do_numerical
        self.do_predictors = do_predictors

        # Decide once, at construction time, which group transform() keeps.
        if not do_predictors:
            self.mAttributes = self.mLabels
        elif do_numerical:
            self.mAttributes = self.mNumericalPredictors
        else:
            self.mAttributes = self.mCategoricalPredictors

    def fit(self, X, y=None):
        """Selection needs no training; return the transformer itself."""
        return self

    def transform(self, X, y=None):
        """Return ``X`` restricted to the selected columns."""
        return X[self.mAttributes]
def get_test_filename(test_file, filename):
    """Return the test-data file name.

    An explicitly supplied ``test_file`` is returned as is.  When it is the
    empty string the name is built as ``<basename>-test.csv``, where the
    basename is derived from ``filename`` by ``get_basename``.
    """
    if test_file != "":
        return test_file
    return "{}-test.csv".format(get_basename(filename))
def get_basename(filename):
    """Reduce a path to its bare stem, dropping a trailing ``-train``.

    The directory part and the extension are removed; if what remains ends
    in ``-train`` that suffix is removed as well.
    """
    root, ext = os.path.splitext(filename)
    dirname, basename = os.path.split(root)
    logging.info("root: {} ext: {} dirname: {} basename: {}".format(root, ext, dirname, basename))
    stub = "-train"
    if basename.endswith(stub):
        return basename[:-len(stub)]
    return basename
def get_model_filename(model_file, filename):
    """Return the model file name.

    An explicitly supplied ``model_file`` is returned as is.  When it is the
    empty string the name is built as ``<basename>-model.joblib``, where the
    basename is derived from ``filename`` by ``get_basename``.
    """
    if model_file != "":
        return model_file
    return "{}-model.joblib".format(get_basename(filename))
def get_data(filename):
    """Read the CSV file into a pandas DataFrame.

    The column named "Cabin" is forced to be interpreted as a string,
    because pandas can't figure that out on its own.

    Options that are deliberately NOT passed to ``read_csv`` here:
      * ``index_col=0``           - would treat column 0 as the instance index.
      * ``keep_default_na=False`` - would leave blank cells as empty strings.

    https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html
    """
    column_types = {"Cabin": str}
    return pd.read_csv(filename, dtype=column_types)
def load_data(my_args, filename):
    """Load ``filename`` and split it into features and label.

    The column names come from ``get_feature_and_label_names``.  Returns the
    pair ``(X, y)``: ``X`` holds the feature columns and ``y`` the label
    column.
    """
    frame = get_data(filename)
    feature_names, label_name = get_feature_and_label_names(my_args, frame)
    return frame[feature_names], frame[label_name]
def get_feature_and_label_names(my_args, data):
    """Work out which columns of ``data`` are features and which is the label.

    ``my_args.label`` is used as the label when it is a column of ``data``;
    otherwise the label is the empty string.  Of the names in
    ``my_args.features`` only those present in ``data`` are kept.  If that
    leaves nothing, every column except the label becomes a feature.

    Returns ``(features, label)``: a list of column names and one name.
    """
    requested_label = my_args.label
    requested_features = my_args.features

    label = requested_label if requested_label in data.columns else ""

    features = []
    if requested_features is not None:
        features = [name for name in requested_features if name in data.columns]

    # no features specified, so add all non-labels
    if not features:
        features = [name for name in data.columns if name != label]

    return features, label
def make_numerical_feature_pipeline(my_args):
    """Build the preprocessing pipeline for the numerical predictors.

    The pipeline always starts by selecting the numerical columns.  The
    remaining stages are added only when the matching option is truthy:
    missing-value imputation (``numerical_missing_strategy``), polynomial
    features (``use_polynomial_features`` is the degree), standard scaling
    (``use_scaler``) and a debugging printer (``print_preprocessed_data``).
    """
    steps = [
        ("numerical-features-only", DataFrameSelector(do_predictors=True, do_numerical=True)),
    ]

    strategy = my_args.numerical_missing_strategy
    if strategy:
        steps.append(("missing-data", sklearn.impute.SimpleImputer(strategy=strategy)))

    degree = my_args.use_polynomial_features
    if degree:
        steps.append(("polynomial-features", sklearn.preprocessing.PolynomialFeatures(degree=degree)))

    if my_args.use_scaler:
        steps.append(("scaler", sklearn.preprocessing.StandardScaler()))

    if my_args.print_preprocessed_data:
        steps.append(("printer", Printer("Numerical Preprocessing")))

    return sklearn.pipeline.Pipeline(steps)
def make_categorical_feature_pipeline(my_args):
    """Build the preprocessing pipeline for the categorical predictors.

    Stages: select the categorical columns, optionally impute missing values
    (``categorical_missing_strategy``), one-hot encode, and optionally print
    the result (``print_preprocessed_data``).
    """
    steps = [
        ("categorical-features-only", DataFrameSelector(do_predictors=True, do_numerical=False)),
    ]

    strategy = my_args.categorical_missing_strategy
    if strategy:
        steps.append(("missing-data", sklearn.impute.SimpleImputer(strategy=strategy)))

    # sklearn's decision tree classifier requires all input features to be
    # numerical; one hot encoding accomplishes this.
    encoder = sklearn.preprocessing.OneHotEncoder(categories='auto')
    steps.append(("encode-category-bits", encoder))

    if my_args.print_preprocessed_data:
        steps.append(("printer", Printer("Categorial Preprocessing")))

    categorical_pipeline = sklearn.pipeline.Pipeline(steps)
    return categorical_pipeline
def make_feature_pipeline(my_args):
    """Combine the numerical and categorical preprocessing into one transformer.

    Numerical features and categorical features are usually preprocessed
    differently.  Each group gets its own pipeline, and a FeatureUnion merges
    the preprocessed features into one group again.
    """
    branches = [
        ("numerical", make_numerical_feature_pipeline(my_args)),
        ("categorical", make_categorical_feature_pipeline(my_args)),
    ]
    return sklearn.pipeline.FeatureUnion(transformer_list=branches)
def make_decision_tree_fit_pipeline(my_args):
    """Build the full pipeline: feature preprocessing followed by the tree.

    Steps, in order:
      * "features" - the combined preprocessing from make_feature_pipeline
      * "printer"  - only when ``my_args.print_preprocessed_data`` is truthy
      * "model"    - a DecisionTreeClassifier with max_depth=3 and
                     max_leaf_nodes=20

    The classifier is seeded from ``my_args.random_seed`` so that repeated
    fits on the same data give the same tree.  A negative seed (the command
    line documents -1) or a missing attribute leaves ``random_state`` as
    None, i.e. unseeded.
    """
    seed = getattr(my_args, "random_seed", -1)
    random_state = None if seed is None or seed < 0 else seed

    items = []
    items.append(("features", make_feature_pipeline(my_args)))
    if my_args.print_preprocessed_data:
        items.append(("printer", Printer("Final Preprocessing")))
    items.append(("model", sklearn.tree.DecisionTreeClassifier(max_depth=3, max_leaf_nodes=20, random_state=random_state)))
    return sklearn.pipeline.Pipeline(items)
def do_fit(my_args):
    """Fit the decision-tree pipeline on the training file and save it.

    Raises Exception when ``my_args.train_file`` does not exist.  The fitted
    pipeline is written with joblib to the name given by get_model_filename.
    """
    train_path = my_args.train_file
    if not os.path.exists(train_path):
        raise Exception("training data file: {} does not exist.".format(train_path))

    features, labels = load_data(my_args, train_path)
    model = make_decision_tree_fit_pipeline(my_args)
    model.fit(features, labels)

    joblib.dump(model, get_model_filename(my_args.model_file, train_path))
def get_feature_names(pipeline, X):
    """Return display names for the features the model sees.

    Without a 'polynomial-features' step the names are the columns of ``X``.
    With one, each row of its ``powers_`` becomes a product term such as
    ``a*a*b``: every column name is repeated as many times as its exponent.
    A row of all zeros gives the empty string.

    NOTE(review): this looks up ``named_steps`` on ``pipeline['features']``;
    the pipelines built in this file use a FeatureUnion for that step -
    confirm it offers that attribute before relying on this function.
    """
    base_names = list(X.columns[:])
    features_step = pipeline['features']

    if 'polynomial-features' not in features_step.named_steps:
        logging.info("polynomial-features not in features: {}".format(features_step.named_steps))
        return base_names

    names = []
    for exponents in features_step['polynomial-features'].powers_:
        term = ""
        for index, exponent in enumerate(exponents):
            for _ in range(exponent):
                if term:
                    term += "*"
                term += base_names[index]
        names.append(term)
        logging.info("powers: {} s: {}".format(exponents, term))
    return names
def get_scale_offset(pipeline, count):
    """Return ``(theta_scale, intercept_offset)`` for undoing the scaler.

    With a 'scaler' step the values are ``1 / scale_`` and
    ``mean_ / scale_`` of that scaler.  Without one they are ``count`` ones
    and ``count`` zeros.

    NOTE(review): this looks up ``named_steps`` on ``pipeline['features']``;
    the pipelines built in this file use a FeatureUnion for that step -
    confirm it offers that attribute before relying on this function.
    """
    features_step = pipeline['features']

    if 'scaler' not in features_step.named_steps:
        theta_scale = np.ones(count)
        intercept_offset = np.zeros(count)
        logging.info("scaler not in features: {}".format(features_step.named_steps))
        return theta_scale, intercept_offset

    scaler = features_step['scaler']
    logging.info("scaler: {}".format(scaler))
    logging.info("scale: {} mean: {} var: {}".format(scaler.scale_, scaler.mean_, scaler.var_))
    return 1.0 / scaler.scale_, scaler.mean_ / scaler.scale_
def show_function(my_args):
    """Print the fitted model as a formula: intercept plus coefficient terms.

    Raises Exception when the training file or the model file is missing.

    NOTE(review): main() has no action that calls this function, and it
    reads ``coef_`` and ``intercept_`` from the 'model' step.  The model
    built in this file is a DecisionTreeClassifier - confirm it provides
    those attributes before wiring this function to an action.
    """
    train_file = my_args.train_file
    if not os.path.exists(train_file):
        raise Exception("training data file: {} does not exist.".format(train_file))

    model_file = get_model_filename(my_args.model_file, train_file)
    if not os.path.exists(model_file):
        raise Exception("Model file, '{}', does not exist.".format(model_file))

    X, y = load_data(my_args, train_file)
    pipeline = joblib.load(model_file)

    feature_names = get_feature_names(pipeline, X)
    scale, offset = get_scale_offset(pipeline, len(feature_names))

    # The transformed data is not used below; the call is kept as is.
    features = pipeline['features']
    X = features.transform(X)

    regressor = pipeline['model']

    # Total shift of the intercept caused by the scaler offsets.
    intercept_offset = 0.0
    for i, coefficient in enumerate(regressor.coef_):
        intercept_offset += coefficient * offset[i]

    pieces = ["{}".format(regressor.intercept_[0]-intercept_offset)]
    for i, coefficient in enumerate(regressor.coef_):
        name = feature_names[i]
        if len(name) > 0:
            pieces.append("({}*{})".format(coefficient*scale[i], name))
        else:
            # A term without a feature name is shown as the bare coefficient.
            pieces.append("({})".format(coefficient))

    basename = get_basename(train_file)
    print("{}: {}".format(basename, " + ".join(pieces)))
def sklearn_metric(y, yhat):
    """Print a 2x2 confusion table, then precision, recall and f1.

    ``y`` holds the true labels and ``yhat`` the predictions.  The table
    prints the confusion matrix transposed: the first row shows
    ``cm[0][0]`` and ``cm[1][0]``, the second ``cm[0][1]`` and ``cm[1][1]``.
    """
    cm = sklearn.metrics.confusion_matrix(y, yhat)

    border = "+-----+-----+\n"
    row = "|{:4d} |{:4d} |\n"
    layout = border + row + border + row + border
    print(layout.format(cm[0][0], cm[1][0], cm[0][1], cm[1][1]))
    print()

    # All three scores are computed before any of them is printed.
    scorers = (
        ("precision", sklearn.metrics.precision_score),
        ("recall", sklearn.metrics.recall_score),
        ("f1", sklearn.metrics.f1_score),
    )
    results = [(name, scorer(y, yhat)) for name, scorer in scorers]
    for name, value in results:
        print("{}: {}".format(name, value))
def show_score(my_args):
    """Print classification metrics for the saved model.

    Metrics on the training data are always shown.  Metrics on the test data
    are shown as well when ``my_args.show_test`` is truthy.

    Raises Exception when the training file, the test file or the model file
    does not exist.
    """
    train_file = my_args.train_file
    if not os.path.exists(train_file):
        raise Exception("training data file: {} does not exist.".format(train_file))
    test_file = get_test_filename(my_args.test_file, train_file)
    if not os.path.exists(test_file):
        raise Exception("testing data file, '{}', does not exist.".format(test_file))
    model_file = get_model_filename(my_args.model_file, train_file)
    if not os.path.exists(model_file):
        raise Exception("Model file, '{}', does not exist.".format(model_file))

    X_train, y_train = load_data(my_args, train_file)
    pipeline = joblib.load(model_file)
    basename = get_basename(train_file)

    yhat_train = pipeline.predict(X_train)
    print()
    print("{}: train: ".format(basename))
    print()
    sklearn_metric(y_train, yhat_train)
    print()

    if my_args.show_test:
        # Load the test data only when it is going to be reported.
        X_test, y_test = load_data(my_args, test_file)
        yhat_test = pipeline.predict(X_test)
        print()
        # This section reports the test data, so it is labelled "test".
        print("{}: test: ".format(basename))
        print()
        print()
        sklearn_metric(y_test, yhat_test)
        print()
    return
def show_model(my_args):
    """Draw the fitted decision tree and save the picture as ``tree.png``.

    Only the saved model is needed.  The training file name is used to
    derive the default model file name.  No test file is read, so none is
    required to exist.

    Raises Exception when the training file or the model file does not exist.
    """
    train_file = my_args.train_file
    if not os.path.exists(train_file):
        raise Exception("training data file: {} does not exist.".format(train_file))
    model_file = get_model_filename(my_args.model_file, train_file)
    if not os.path.exists(model_file):
        raise Exception("Model file, '{}', does not exist.".format(model_file))

    pipeline = joblib.load(model_file)
    tree = pipeline['model']

    fig = plt.figure(figsize=(6, 6))
    ax = fig.add_subplot(1, 1, 1)
    sklearn.tree.plot_tree(tree, ax=ax)
    fig.tight_layout()
    fig.savefig("tree.png", dpi=300)
    # Close the figure so it does not stay registered with pyplot.
    plt.close(fig)
    return
def parse_args(argv):
    """Parse the command line and validate the missing-value strategies.

    ``argv`` is the full argument vector: ``argv[0]`` is the program name and
    the remaining items are the arguments.

    Returns the argparse namespace.

    Raises Exception when a non-empty ``--categorical-missing-strategy`` or
    ``--numerical-missing-strategy`` is not one of the allowed values.
    """
    parser = argparse.ArgumentParser(prog=argv[0], description='Fit Data With Linear Regression Using Pipeline')
    parser.add_argument('action', default='DT',
                        choices=[ "DT", "score", "show-model" ],
                        nargs='?', help="desired action")
    parser.add_argument('--train-file', '-t', default="", type=str, help="name of file with training data")
    parser.add_argument('--test-file', '-T', default="", type=str, help="name of file with test data (default is constructed from train file name)")
    parser.add_argument('--model-file', '-m', default="", type=str, help="name of file for the model (default is constructed from train file name when fitting)")
    parser.add_argument('--random-seed', '-R', default=314159265,type=int,help="random number seed (-1 to use OS entropy)")
    parser.add_argument('--features', '-f', default=None, action="extend", nargs="+", type=str,
                        help="column names for features")
    parser.add_argument('--label', '-l', default="label", type=str, help="column name for label")
    parser.add_argument('--use-polynomial-features', '-p', default=0, type=int, help="degree of polynomial features. 0 = don't use (default=0)")
    parser.add_argument('--use-scaler', '-s', default=0, type=int, help="0 = don't use scaler, 1 = do use scaler (default=0)")
    parser.add_argument('--show-test', '-S', default=0, type=int, help="0 = don't show test loss, 1 = do show test loss (default=0)")
    parser.add_argument('--categorical-missing-strategy', default="", type=str, help="strategy for missing categorical information")
    parser.add_argument('--numerical-missing-strategy', default="", type=str, help="strategy for missing numerical information")
    parser.add_argument('--print-preprocessed-data', default=0, type=int, help="0 = don't do the debugging print, 1 = do print (default=0)")

    my_args = parser.parse_args(argv[1:])

    #
    # Do any special fixes/checks here
    #
    # The trailing comma makes this a one-element tuple.  Without it the
    # value is a plain string and "not in" becomes a substring test, which
    # would accept values such as "most".
    allowed_categorical_missing_strategies = ("most_frequent",)
    if my_args.categorical_missing_strategy != "":
        if my_args.categorical_missing_strategy not in allowed_categorical_missing_strategies:
            raise Exception("Missing categorical strategy {} is not in the allowed list {}.".format(my_args.categorical_missing_strategy, allowed_categorical_missing_strategies))

    allowed_numerical_missing_strategies = ("mean", "median", "most_frequent")
    if my_args.numerical_missing_strategy != "":
        if my_args.numerical_missing_strategy not in allowed_numerical_missing_strategies:
            raise Exception("Missing numerical strategy {} is not in the allowed list {}.".format(my_args.numerical_missing_strategy, allowed_numerical_missing_strategies))

    return my_args
def main(argv):
    """Parse ``argv`` and run the requested action.

    Actions: 'DT' fits and saves the model, 'score' prints metrics and
    'show-model' draws the tree.  Raises Exception for any other action.
    """
    my_args = parse_args(argv)

    # Use level=logging.INFO here instead to see the diagnostic messages.
    logging.basicConfig(level=logging.WARN)

    handlers = {
        'DT': do_fit,
        "score": show_score,
        "show-model": show_model,
    }
    handler = handlers.get(my_args.action)
    if handler is None:
        raise Exception("Action: {} is not known.".format(my_args.action))
    handler(my_args)
# Run the command-line interface only when this file is executed as a script;
# the full argument vector (program name included) is handed to main().
if __name__ == "__main__":
    main(sys.argv)
Last Updated 02/06/2024