#!/usr/bin/env python3
import sys
import argparse
import logging
import os.path
import pandas as pd
import numpy as np
import sklearn.linear_model
import sklearn.preprocessing
import sklearn.pipeline
import sklearn.base
import sklearn.metrics
import sklearn.impute
import sklearn.svm
import sklearn.ensemble
import sklearn.model_selection
import joblib
import pprint
import matplotlib.pyplot as plt
from pipeline_elements import *
from data_overhead import *
from make_pipeline import *
from cm_display import print_cm
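
# Helper functions such as load_data, get_model_filename, get_test_filename,
# get_basename, make_fit_pipeline, and make_fit_params are expected to come
# from the project modules star-imported above.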

def do_fit(my_args):
    """
    fit pipeline to training data
    """
    train_file = my_args.train_file
    if not os.path.exists(train_file):
        raise Exception("training data file: {} does not exist.".format(train_file))

    X, y = load_data(my_args, train_file)
    pipeline = make_fit_pipeline(my_args)
    pipeline.fit(X, y)

    model_file = get_model_filename(my_args.model_file, train_file)
    joblib.dump(pipeline, model_file)
    return

def do_cross(my_args):
    """
    do cross validation with training data
    """
    train_file = my_args.train_file
    if not os.path.exists(train_file):
        raise Exception("training data file: {} does not exist.".format(train_file))

    X, y = load_data(my_args, train_file)
    pipeline = make_fit_pipeline(my_args)
    cv_results = sklearn.model_selection.cross_validate(
        pipeline, X, y, cv=my_args.cv_count, n_jobs=-1, verbose=3,
        scoring=('r2', 'neg_mean_squared_error', 'neg_mean_absolute_error'))
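    # Note: scikit-learn negates error metrics so that greater is always
    # better; the neg_* scores printed below are <= 0, and values closer
    # to 0 are better.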
    # print(cv_results.keys())
    print("R2:", cv_results['test_r2'], cv_results['test_r2'].mean())
    print("MSE:", cv_results['test_neg_mean_squared_error'], cv_results['test_neg_mean_squared_error'].mean())
    print("MAE:", cv_results['test_neg_mean_absolute_error'], cv_results['test_neg_mean_absolute_error'].mean())

    # pipeline.fit(X, y)
    # model_file = get_model_filename(my_args.model_file, train_file)
    # joblib.dump(pipeline, model_file)
    return

def show_score(my_args):
    """
    Show the already trained model's score on the training data,
    and also on the test data if --show-test 1.
    """
    train_file = my_args.train_file
    if not os.path.exists(train_file):
        raise Exception("training data file: {} does not exist.".format(train_file))
    test_file = get_test_filename(my_args.test_file, train_file)
    if not os.path.exists(test_file):
        raise Exception("testing data file, '{}', does not exist.".format(test_file))
    model_file = get_model_filename(my_args.model_file, train_file)
    if not os.path.exists(model_file):
        raise Exception("Model file, '{}', does not exist.".format(model_file))

    X_train, y_train = load_data(my_args, train_file)
    X_test, y_test = load_data(my_args, test_file)
    pipeline = joblib.load(model_file)
    regressor = pipeline['model']
    basename = get_basename(train_file)

    score_train = regressor.score(pipeline['features'].transform(X_train), y_train)
    if my_args.show_test:
        score_test = regressor.score(pipeline['features'].transform(X_test), y_test)
        print("{}: train_score: {} test_score: {}".format(basename, score_train, score_test))
    else:
        print("{}: train_score: {}".format(basename, score_train))
    return

def show_loss(my_args):
    """
    Show the already trained model's loss on the training data.
    (The test-data reporting is commented out below for Kaggle data.)
    """
    train_file = my_args.train_file
    if not os.path.exists(train_file):
        raise Exception("training data file: {} does not exist.".format(train_file))
    # test_file = get_test_filename(my_args.test_file, train_file)
    # if not os.path.exists(test_file):
    #     raise Exception("testing data file, '{}', does not exist.".format(test_file))
    model_file = get_model_filename(my_args.model_file, train_file)
    if not os.path.exists(model_file):
        raise Exception("Model file, '{}', does not exist.".format(model_file))

    X_train, y_train = load_data(my_args, train_file)
    # X_test, y_test = load_data(my_args, test_file)
    pipeline = joblib.load(model_file)
    y_train_predicted = pipeline.predict(X_train)
    # y_test_predicted = pipeline.predict(X_test)
    basename = get_basename(train_file)

    loss_train = sklearn.metrics.mean_squared_error(y_train, y_train_predicted)
    # if my_args.show_test:
    #     loss_test = sklearn.metrics.mean_squared_error(y_test, y_test_predicted)
    #     print("{}: L2(MSE) train_loss: {} test_loss: {}".format(basename, loss_train, loss_test))
    # else:
    print("{}: L2(MSE) train_loss: {}".format(basename, loss_train))

    loss_train = sklearn.metrics.mean_absolute_error(y_train, y_train_predicted)
    # if my_args.show_test:
    #     loss_test = sklearn.metrics.mean_absolute_error(y_test, y_test_predicted)
    #     print("{}: L1(MAE) train_loss: {} test_loss: {}".format(basename, loss_train, loss_test))
    # else:
    print("{}: L1(MAE) train_loss: {}".format(basename, loss_train))

    loss_train = sklearn.metrics.r2_score(y_train, y_train_predicted)
    # if my_args.show_test:
    #     loss_test = sklearn.metrics.r2_score(y_test, y_test_predicted)
    #     print("{}: R2 train_loss: {} test_loss: {}".format(basename, loss_train, loss_test))
    # else:
    print("{}: R2 train_loss: {}".format(basename, loss_train))
    return

def do_predict(my_args):
    """
    Do predictions on the test data using the already trained model.
    Writes the result to file. Designed for use with Kaggle competitions.
    """
    test_file = my_args.test_file
    if not os.path.exists(test_file):
        raise Exception("testing data file: {} does not exist.".format(test_file))
    model_file = get_model_filename(my_args.model_file, test_file)
    if not os.path.exists(model_file):
        raise Exception("Model file, '{}', does not exist.".format(model_file))

    X_test, y_test = load_data(my_args, test_file)
    pipeline = joblib.load(model_file)
    y_test_predicted = pipeline.predict(X_test)

    merged = X_test.index.to_frame()
    merged[my_args.label] = y_test_predicted
    merged.to_csv("predictions.csv", index=False)
    return

def do_proba(my_args):
    """
    Predict class probabilities on the test data using the already trained
    model. Writes the positive-class probability to file. Designed for use
    with Kaggle competitions.
    """
    test_file = my_args.test_file
    if not os.path.exists(test_file):
        raise Exception("testing data file: {} does not exist.".format(test_file))
    model_file = get_model_filename(my_args.model_file, test_file)
    if not os.path.exists(model_file):
        raise Exception("Model file, '{}', does not exist.".format(model_file))

    X_test, y_test = load_data(my_args, test_file)
    pipeline = joblib.load(model_file)
    y_test_predicted = pipeline.predict_proba(X_test)
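    # predict_proba returns one column per class, ordered as in
    # pipeline.classes_; for a 0/1 label, column 1 is the positive class.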
    merged = X_test.index.to_frame()
    merged[my_args.label] = y_test_predicted[:, 1]
    merged.to_csv("predictions_proba.csv", index=False)
    return

def do_grid_search(my_args):
    train_file = my_args.train_file
    if not os.path.exists(train_file):
        raise Exception("training data file: {} does not exist.".format(train_file))

    X, y = load_data(my_args, train_file)
    pipeline = make_fit_pipeline(my_args)
    fit_params = make_fit_params(my_args)
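    # make_fit_params (from the project modules) supplies the parameter grid.
    # A grid keyed by the pipeline's step names would look something like this
    # (hypothetical values; the real grid depends on the chosen model type):
    #   {"model__alpha": [1e-4, 1e-3, 1e-2], "model__penalty": ["l2", "l1"]}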
    search_grid = sklearn.model_selection.GridSearchCV(
        pipeline, fit_params, scoring="f1_micro",
        cv=my_args.cv_count, n_jobs=-1, verbose=1)
    search_grid.fit(X, y)

    search_grid_file = get_search_grid_filename(my_args.search_grid_file, train_file)
    joblib.dump(search_grid, search_grid_file)
    model_file = get_model_filename(my_args.model_file, train_file)
    joblib.dump(search_grid.best_estimator_, model_file)
    return

def do_random_search(my_args):
    train_file = my_args.train_file
    if not os.path.exists(train_file):
        raise Exception("training data file: {} does not exist.".format(train_file))

    X, y = load_data(my_args, train_file)
    pipeline = make_fit_pipeline(my_args)
    fit_params = make_fit_params(my_args)
    search_grid = sklearn.model_selection.RandomizedSearchCV(
        pipeline, fit_params, scoring="f1_micro",
        cv=my_args.cv_count, n_jobs=-1, verbose=1,
        n_iter=my_args.n_search_iterations)
    search_grid.fit(X, y)

    search_grid_file = get_search_grid_filename(my_args.search_grid_file, train_file)
    joblib.dump(search_grid, search_grid_file)
    model_file = get_model_filename(my_args.model_file, train_file)
    joblib.dump(search_grid.best_estimator_, model_file)
    return

def show_best_params(my_args):
    train_file = my_args.train_file
    if not os.path.exists(train_file):
        raise Exception("training data file: {} does not exist.".format(train_file))
    search_grid_file = get_search_grid_filename(my_args.search_grid_file, train_file)
    if not os.path.exists(search_grid_file):
        raise Exception("Search grid file, '{}', does not exist.".format(search_grid_file))

    search_grid = joblib.load(search_grid_file)
    pp = pprint.PrettyPrinter(indent=4)
    print("Best Score:", search_grid.best_score_)
    print("Best Params:")
    pp.pprint(search_grid.best_params_)
    return

def do_cross_score(my_args):
    """
    do cross validation scoring with training data
    """
    train_file = my_args.train_file
    if not os.path.exists(train_file):
        raise Exception("training data file: {} does not exist.".format(train_file))

    X, y = load_data(my_args, train_file)
    pipeline = make_fit_pipeline(my_args)
    # scoring="accuracy" is a classification metric.
    # scoring="r2" is a regression metric.
    # https://scikit-learn.org/stable/modules/model_evaluation.html#scoring-parameter
    score = sklearn.model_selection.cross_val_score(pipeline, X, y, cv=my_args.cv_count, n_jobs=-1, scoring="accuracy")
    print("Cross Validation Score: {:.3f} : {}".format(score.mean(), ["{:.3f}".format(x) for x in score]))
    return

def do_confusion_matrix(my_args):
    """
    do cross validation and show confusion matrix
    """
    train_file = my_args.train_file
    if not os.path.exists(train_file):
        raise Exception("training data file: {} does not exist.".format(train_file))

    X, y = load_data(my_args, train_file)
    pipeline = make_fit_pipeline(my_args)
    y_pred = sklearn.model_selection.cross_val_predict(pipeline, X, y, cv=my_args.cv_count, n_jobs=-1)
    cm = sklearn.metrics.confusion_matrix(y, y_pred)
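    # confusion_matrix orders classes by sorted label value, so for a 0/1
    # (binary) label the rows/columns line up with F(=0) then T(=1).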
    labels = ["F", "T"]
    print()
    print()
    print_cm(cm, labels)
    print()
    print()

    pscore = sklearn.metrics.precision_score(y, y_pred)
    rscore = sklearn.metrics.recall_score(y, y_pred)
    f1score = sklearn.metrics.f1_score(y, y_pred)
    print("Precision: {:.3f}".format(pscore))
    print("Recall: {:.3f}".format(rscore))
    print("F1: {:.3f}".format(f1score))
    return

def do_precision_recall_plot(my_args):
    """
    plot precision and recall vs. decision threshold
    """
    train_file = my_args.train_file
    if not os.path.exists(train_file):
        raise Exception("training data file: {} does not exist.".format(train_file))

    X, y = load_data(my_args, train_file)
    pipeline = make_fit_pipeline(my_args)
    y_pred = sklearn.model_selection.cross_val_predict(pipeline, X, y, cv=my_args.cv_count, n_jobs=-1, method=my_args.cross_val_predict_method)
    if my_args.cross_val_predict_method == "predict_proba":
        y_pred = y_pred[:, 1]
    precisions, recalls, thresholds = sklearn.metrics.precision_recall_curve(y, y_pred)
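    # F1 = 2*P*R / (P+R); np.divide with where=(denom != 0) computes the
    # division only where P+R is nonzero and leaves the zero-initialized
    # out array untouched elsewhere, avoiding division by zero.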
    # compute maximum f1 score, and its threshold
    numerator = 2 * recalls * precisions
    denom = recalls + precisions
    f1_scores = np.divide(numerator, denom, out=np.zeros_like(denom), where=(denom != 0))
    max_f1 = np.max(f1_scores)
    max_f1_thresh = thresholds[np.argmax(f1_scores)]

    threshold = max_f1_thresh
    plt.plot(thresholds, precisions[:-1], "b--", label="Precision", linewidth=2)
    plt.plot(thresholds, recalls[:-1], "g-", label="Recall", linewidth=2)
    plt.vlines(threshold, 0, 1.0, "k", "dotted", label="max f1 {:.3f}".format(max_f1))
    plt.title(my_args.model_type + " Precision+Recall")
    plt.xlabel("Threshold")
    plt.grid(True)
    plt.legend()
    plt.savefig(my_args.image_file)
    plt.clf()
    return

def do_precision_recall_curve(my_args):
    """
    plot the precision-recall curve
    """
    train_file = my_args.train_file
    if not os.path.exists(train_file):
        raise Exception("training data file: {} does not exist.".format(train_file))

    X, y = load_data(my_args, train_file)
    pipeline = make_fit_pipeline(my_args)
    y_pred = sklearn.model_selection.cross_val_predict(pipeline, X, y, cv=my_args.cv_count, n_jobs=-1, method=my_args.cross_val_predict_method)
    if my_args.cross_val_predict_method == "predict_proba":
        y_pred = y_pred[:, 1]
    precisions, recalls, thresholds = sklearn.metrics.precision_recall_curve(y, y_pred)

    # compute maximum f1 score, and the precision and recall at that point
    numerator = 2 * recalls * precisions
    denom = recalls + precisions
    f1_scores = np.divide(numerator, denom, out=np.zeros_like(denom), where=(denom != 0))
    max_f1 = np.max(f1_scores)
    max_f1_thresh = thresholds[np.argmax(f1_scores)]
    max_f1_precision = precisions[np.argmax(f1_scores)]
    max_f1_recall = recalls[np.argmax(f1_scores)]

    plt.plot(recalls, precisions, linewidth=2, label="Precision/Recall curve")
    plt.title(my_args.model_type + " Precision/Recall")
    plt.vlines(max_f1_recall, 0, max_f1_precision, "k", "dotted", label="max f1 {:.3f}".format(max_f1))
    plt.hlines(max_f1_precision, 0, max_f1_recall, "k", "dotted")
    plt.ylabel("Precision")
    plt.xlabel("Recall")
    plt.grid(True)
    plt.legend()
    plt.savefig(my_args.image_file)
    plt.clf()
    return

def parse_args(argv):
    parser = argparse.ArgumentParser(prog=argv[0], description='Fit Data Using Pipeline',
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('action', default='fit',
                        choices=["fit", "score", "loss", "cross", "predict", "proba",
                                 "grid-search", "show-best-params", "random-search",
                                 "cross-score", "confusion-matrix", "precision-recall-plot", "pr-curve"],
                        nargs='?', help="desired action")
    parser.add_argument('--model-type', '-M', default="SGD", type=str, choices=["SGD", "linear", "SVM", "boost", "forest", "tree"], help="Model type")
    parser.add_argument('--train-file', '-t', default="", type=str, help="name of file with training data")
    parser.add_argument('--test-file', '-T', default="", type=str, help="name of file with test data (default is constructed from train file name)")
    parser.add_argument('--model-file', '-m', default="", type=str, help="name of file for the model (default is constructed from train file name when fitting)")
    parser.add_argument('--search-grid-file', '-g', default="", type=str, help="name of file for the search grid (default is constructed from train file name when fitting)")
    parser.add_argument('--random-seed', '-R', default=314159265, type=int, help="random number seed (-1 to use OS entropy)")
    parser.add_argument('--features', '-f', default=None, action="extend", nargs="+", type=str,
                        help="column names for features")
    parser.add_argument('--label', '-l', default="label", type=str, help="column name for label")
    parser.add_argument('--use-polynomial-features', '-p', default=0, type=int, help="degree of polynomial features. 0 = don't use (default=0)")
    parser.add_argument('--use-scaler', '-s', default=0, type=int, help="0 = don't use scaler, 1 = do use scaler (default=0)")
    parser.add_argument('--categorical-missing-strategy', default="", type=str, choices=("", "most_frequent"), help="strategy for missing categorical information")
    parser.add_argument('--numerical-missing-strategy', default="", type=str, choices=("", "mean", "median", "most_frequent"), help="strategy for missing numerical information")
    parser.add_argument('--show-test', '-S', default=0, type=int, help="0 = don't show test loss, 1 = do show test loss (default=0)")
    parser.add_argument('--n-search-iterations', default=10, type=int, help="number of random iterations in randomized grid search.")
    parser.add_argument('--cv-count', default=3, type=int, help="number of partitions for cross validation.")
    parser.add_argument('--image-file', default="image.png", type=str, help="name of file to store output images")
    parser.add_argument('--cross-val-predict-method', default="", type=str, help="method argument for cross_val_predict, will be determined by model-type")

    my_args = parser.parse_args(argv[1:])

    if my_args.model_type in ("SGD", "linear"):
        my_args.cross_val_predict_method = "decision_function"
    elif my_args.model_type in ("SVM", "boost", "forest", "tree"):
        my_args.cross_val_predict_method = "predict_proba"
    else:
        raise Exception("Unknown model type: {}".format(my_args.model_type))

    return my_args
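
# Example invocations (illustrative; the file and column names here are
# assumptions, not files shipped with this script):
#   python3 pipeline.py fit -t train.csv -l SalePrice --numerical-missing-strategy median
#   python3 pipeline.py cross-score -t train.csv --cv-count 5
#   python3 pipeline.py predict -T test.csv -l SalePrice
#   python3 pipeline.py pr-curve -t train.csv -M forest --image-file pr.png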

def main(argv):
    my_args = parse_args(argv)
    # logging.basicConfig(level=logging.INFO)
    logging.basicConfig(level=logging.WARN)

    if my_args.action == 'fit':
        do_fit(my_args)
    elif my_args.action == "score":
        show_score(my_args)
    elif my_args.action == "loss":
        show_loss(my_args)
    elif my_args.action == "cross":
        do_cross(my_args)
    elif my_args.action == "predict":
        do_predict(my_args)
    elif my_args.action == "proba":
        do_proba(my_args)
    elif my_args.action == 'grid-search':
        do_grid_search(my_args)
    elif my_args.action == 'random-search':
        do_random_search(my_args)
    elif my_args.action == "show-best-params":
        show_best_params(my_args)
    elif my_args.action == "cross-score":
        do_cross_score(my_args)
    elif my_args.action == "confusion-matrix":
        do_confusion_matrix(my_args)
    elif my_args.action == "precision-recall-plot":
        do_precision_recall_plot(my_args)
    elif my_args.action == "pr-curve":
        do_precision_recall_curve(my_args)
    else:
        raise Exception("Action: {} is not known.".format(my_args.action))
    return

if __name__ == "__main__":
    main(sys.argv)