Machine Learning
Time Series splits
Example of 5 splits, each one with a train and a test set (test set size limited)
# Time-ordered cross-validation: each fold trains on past samples and
# tests on the window that follows (no shuffling, so no look-ahead leakage).
from sklearn.model_selection import TimeSeriesSplit
# Each test fold holds 5% of the samples.
test_size = int(0.05*len(X))
max_train_size = 300  # optional cap on the training window (currently unused, see below)
ts_cv = TimeSeriesSplit(
n_splits=5,
# leave 2 samples between train and test to reduce leakage from
# autocorrelated neighbours
gap=2,
# max_train_size=max_train_size,
test_size=test_size,
)
# Materialize the splits as (train_idx, test_idx) pairs so they can be
# reused by several searches/plots below with identical folds.
all_splits = list(ts_cv.split(X, y))
Cross validation for model performance
# Fit and score the model on every chronological fold.
cv_results = cross_validate(
model,
X,
y,
cv=all_splits,  # a splitter object would work here as well
scoring=["neg_mean_absolute_error", "neg_root_mean_squared_error", "r2"],
return_estimator=True
)
# sklearn negates loss metrics so that "greater is better"; flip the sign back.
mae = -cv_results["test_neg_mean_absolute_error"]
rmse = -cv_results["test_neg_root_mean_squared_error"]
r2 = cv_results["test_r2"]
# Mean +/- std across the folds.
summary = (
"CV performance: \n"
f"Mean Absolute Error: {mae.mean():.3f} +/- {mae.std():.3f}\n"
f"Root Mean Squared Error: {rmse.mean():.3f} +/- {rmse.std():.3f}\n"
f"R2: {r2.mean():.3f} +/- {r2.std():.3f}\n"
)
print(summary)
Visualize the predictions for that cross validation
The average score of the model across splits should match the cross_validate results above
def plot_predictions(model, X, y, splits, fit=True, normalize=False, good_r2=0.2):
    """Evaluate ``model`` on each (train, test) split and plot test predictions.

    Parameters
    ----------
    model : estimator with ``fit``/``predict``
    X : pd.DataFrame of features
    y : pd.Series target — assumes the series is named "revenue" (TODO confirm)
    splits : iterable of (train_idx, test_idx) positional index arrays
    fit : when True, refit the model on each split's train set
    normalize : when True, scale truth and predictions by ``y.max()`` before plotting
    good_r2 : splits scoring below this R2 are flagged in the output

    Returns
    -------
    The model fitted on the last split when ``fit`` is True, otherwise None.
    """
    # Each split has a train and a test set of positional indices.
    for idx, (train, test) in enumerate(splits):
        if fit:
            model.fit(X.iloc[train], y.iloc[train])
        model_preds = model.predict(X.iloc[test])
        df_test_true_pred = y.iloc[test].to_frame()
        df_test_true_pred["preds"] = model_preds
        # Rescale both series so very different target magnitudes are comparable.
        if normalize:
            df_test_true_pred["preds"] = df_test_true_pred["preds"] / y.max()
            df_test_true_pred["revenue"] = df_test_true_pred["revenue"] / y.max()
        mae = mean_absolute_error(df_test_true_pred["revenue"], df_test_true_pred["preds"])
        # BUG FIX: mean_squared_error returns the MSE; take the square root so
        # the value reported below really is the RMSE.
        rmse = mean_squared_error(df_test_true_pred["revenue"], df_test_true_pred["preds"]) ** 0.5
        r2 = r2_score(df_test_true_pred["revenue"], df_test_true_pred["preds"])
        if r2 < good_r2:
            print(f"❗Low r2 for split {idx}")
        print(
            f"For this split {idx}\n"
            f"MAE: {mae:.3f}\n"
            f"RMSE: {rmse:.3f}\n"
            f"R2: {r2:.3f}"
        )
        fig = px.line(df_test_true_pred, title=f"Predictions for test set {idx}")
        fig.show()
    if fit:
        return model
Pipelines for column transformation
Apply functions to columns so it will be handled better by the model
# Columns that need categorical encoding before reaching the model.
categorical_columns = [
"season",
"is_covid"
]
# Explicit category order per column (index in this list == encoded integer).
categories = [
["spring", "summer", "autumn", "winter"],
[False, True]
]
# OrdinalEncoder maps each category to its integer index in `categories`,
# producing one numeric column per input column (0/1 indicator columns
# would be OneHotEncoder, not this).
ordinal_encoder = OrdinalEncoder(categories=categories)
xgb_pipeline = make_pipeline(
ColumnTransformer(
transformers=[
("categorical", ordinal_encoder, categorical_columns),
],
# keep all non-categorical columns untouched
remainder="passthrough",
verbose_feature_names_out=False
),
xgb.XGBRegressor() # The model
)
Applying the pipeline transformers to the data matrix
So the transformed data passed to the model is easily accessible.
# Run only the preprocessing step of the pipeline so the transformed matrix
# (and its generated column names) can be inspected directly.
preprocessor = xgb_pipeline.named_steps["columntransformer"]
scaled_values = preprocessor.fit_transform(X)
transformed_columns = preprocessor.get_feature_names_out()
X_transformed = pd.DataFrame(scaled_values, index=X.index, columns=transformed_columns)
Parameter search using a Pipeline
Just use the named step followed by __param
Check the named step using pipeline.named_steps
and select the key of the estimator/s (the last ones).
regr = RandomForestRegressor()
pipeline = make_pipeline(
ColumnTransformer(
transformers=[
("categorical", one_hot_encoder, categorical_columns),
("month_sin", sin_transformer(12), ["month"]),
("month_cos", cos_transformer(12), ["month"]),
],
remainder="passthrough",
),
regr
)
# Candidate values for each forest hyper-parameter.
n_estimators = [int(x) for x in np.linspace(200, 1000, num=10)]
max_depth = [int(x) for x in np.linspace(10, 110, num=11)] + [None]  # None = unlimited depth
min_samples_split = [2, 5, 10]
min_samples_leaf = [1, 2, 4]
bootstrap = [True, False]
# Keys follow the "<pipeline step name>__<param>" convention.
random_grid = {
f"randomforestregressor__{param}": values
for param, values in (
("n_estimators", n_estimators),
("max_depth", max_depth),
("min_samples_split", min_samples_split),
("min_samples_leaf", min_samples_leaf),
("bootstrap", bootstrap),
)
}
search = RandomizedSearchCV(pipeline, random_grid, random_state=0, cv=all_splits)
search.fit(X, y)
search.best_params_
XGBoost
# XGBoost hyper-parameter grid.
# Exhaustive search — try RandomizedSearchCV for shorter computing times.
params = {
"max_depth": [3, 5, 6, 10, 15, 20],
"learning_rate": [0.01, 0.1, 0.2, 0.3],
"subsample": np.arange(0.5, 1.0, 0.1),
"colsample_bytree": np.arange(0.4, 1.0, 0.1),
"colsample_bylevel": np.arange(0.4, 1.0, 0.1),
"n_estimators": [100, 500, 1000],
}
clf = GridSearchCV(
estimator=xgb.XGBRegressor(),
param_grid=params,
scoring='neg_mean_squared_error',
verbose=1,
cv=all_splits
)
clf.fit(X.values, y)
print("Best parameters:", clf.best_params_)
# best_score_ is a negated MSE; negate and take the root to report RMSE.
print("Lowest RMSE: ", (-clf.best_score_)**(1/2.0))
Shap
Machine Learning explainability
Shap with XGBoost
# Fit on the already-transformed matrix: TreeExplainer works on the bare
# booster, so don't wrap the model in a pipeline here.
xgb_model = xgb_model.fit(X_transformed.values, y) # Don't use a pipeline! first transform the data matrix
explainer = shap.TreeExplainer(xgb_model)
# Compute SHAP values for every row up front — only worth the cost if
# you are going to use (most of) them.
shap_values = explainer.shap_values(X_transformed[0:].values)
# NOTE: the SHAP plotting API is awkward and the plots are very finicky
# about input shapes.
def show_waterfall_plot_for(index, max_features=20):
    """Draw a SHAP waterfall plot for the observation at position ``index``.

    Returns the assembled ShapObject so it can be reused, e.g. for a
    force plot of the same observation.
    """
    row = X_transformed.iloc[index:index + 1]
    shap_object = ShapObject(
        base_value=explainer.expected_value,  # model baseline
        values=explainer.shap_values(row.values)[0],
        feature_names=X_transformed.columns,
        data=X_transformed.iloc[index],  # feature values of this observation
    )
    shap.waterfall_plot(shap_object, max_display=max_features)
    return shap_object
# a very sunny day
shap_obj = show_waterfall_plot_for(585)
# BUG FIX: the object is built with the ``values=`` keyword, so the
# per-feature contributions live under ``.values`` — ``.value`` would raise
# AttributeError (TODO confirm against the ShapObject definition).
shap.force_plot(explainer.expected_value, shap_obj.values, features=X_transformed.iloc[585])