Many applications (dataset cleaning being a key example) require deciding whether a new sample belongs to the same distribution as the existing observations (an inlier) or not (an outlier).
outlier detection: the training data contains outliers (samples that are far from the others). Outlier detectors fit the regions where the training data is most concentrated, ignoring deviant observations.
novelty detection: the training data is not contaminated by outliers - we want to detect whether a new observation is an outlier. Such an observation is called a novelty.
Both are used for anomaly detection. Outliers/anomalies cannot form a dense cluster - the available estimators assume they are located in low-density regions.
Learning method: estimator.fit(X_train)
Testing new observations: estimator.predict(X_test)
- Inliers are labeled 1; outliers are labeled -1.
Predict applies a threshold on a scoring function (the score_samples method).
The decision_function method is defined from the scoring function: negative values are outliers and non-negative ones are inliers: estimator.decision_function(X_test)
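A minimal sketch of this shared API, using IsolationForest as the concrete estimator (toy data; the other estimators follow the same convention):
import numpy as np
from sklearn.ensemble import IsolationForest
rng = np.random.RandomState(0)
X_train = 0.3*rng.randn(100, 2)
X_test = np.r_[0.3*rng.randn(10, 2), [[4.0, 4.0]]]  # last row is an obvious outlier
est = IsolationForest(random_state=0).fit(X_train)
labels = est.predict(X_test)         # 1 = inlier, -1 = outlier
scores = est.score_samples(X_test)   # raw scores: the lower, the more abnormal
dec = est.decision_function(X_test)  # thresholded scores: negative = outlier
print(np.all((dec < 0) == (labels == -1)))  # the two views agree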
LOF does not support predict, decision_function or score_samples by default - only a fit_predict method. (This estimator was originally meant for outlier detection.) The abnormality scores of the training samples are accessible via the negative_outlier_factor_ attribute.
If you really want to use LocalOutlierFactor for novelty detection (predicting labels or computing abnormality scores of new data), instantiate the estimator with novelty=True before fitting. In this case, fit_predict is not available.
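A short sketch of the two LOF modes side by side (toy data; n_neighbors is illustrative):
import numpy as np
from sklearn.neighbors import LocalOutlierFactor
rng = np.random.RandomState(0)
X_train = 0.3*rng.randn(100, 2)
# outlier detection (default): fit_predict on the training data only
lof = LocalOutlierFactor(n_neighbors=20)
labels = lof.fit_predict(X_train)      # 1 = inlier, -1 = outlier
scores = lof.negative_outlier_factor_  # abnormality of the training samples
# novelty detection: novelty=True disables fit_predict; predict,
# decision_function and score_samples then apply to new data only
nov = LocalOutlierFactor(n_neighbors=20, novelty=True).fit(X_train)
print(nov.predict(np.array([[0.0, 0.0], [4.0, 4.0]])))  # e.g. [ 1 -1]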
Each dataset has 15% of samples generated with random uniform noise.
Decision boundaries between inliers and outliers are displayed in black, except for Local Outlier Factor (LOF), which has no predict method for new data when used for outlier detection.
OneClassSVM is sensitive to outliers, so it does not perform well for outlier detection. It is best suited for novelty detection, where the training set is not contaminated by outliers.
That said, outlier detection in high-dimension, or without any assumptions on the distribution of the inlying data is very challenging. One-class SVM might give useful results in these situations.
EllipticEnvelope assumes the data is Gaussian and learns an ellipse, so it degrades when the data is not unimodal. Note, however, that this estimator is robust to outliers.
IsolationForest and LocalOutlierFactor perform reasonably well for multi-modal datasets. The advantage of LOF over the other estimators is shown in the third dataset, where the two modes have different densities.
This is explained by the local aspect of LOF - it only compares the abnormality score of one sample with the scores of its neighbors.
The last dataset is uniformly distributed in a hypercube. Except for the OneClassSVM which overfits a little, all estimators present decent solutions. Look closely at the abnormality scores - a good estimator should assign similar scores to all the samples.
Note: the model parameters are handpicked - in practice they need to be adjusted. In the absence of labelled data, the problem is completely unsupervised so model selection can be a challenge.
import time
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from sklearn import svm
from sklearn.datasets import make_moons, make_blobs
from sklearn.covariance import EllipticEnvelope
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
matplotlib.rcParams['contour.negative_linestyle'] = 'solid'
# Example settings
n_samples = 300
outliers_fraction = 0.15
n_outliers = int(outliers_fraction * n_samples)
n_inliers = n_samples - n_outliers
anomaly_algorithms = [
("Robust covariance",
EllipticEnvelope(contamination=outliers_fraction)),
("One-Class SVM",
svm.OneClassSVM(nu=outliers_fraction,
kernel="rbf",
gamma=0.1)),
("Isolation Forest",
IsolationForest(contamination=outliers_fraction,
random_state=42)),
("Local Outlier Factor", LocalOutlierFactor(
n_neighbors=35,
contamination=outliers_fraction))]
# Define datasets
blobs_params = dict(random_state=0, n_samples=n_inliers, n_features=2)
datasets = [
make_blobs(centers=[[0, 0], [0, 0]],
cluster_std=0.5,
**blobs_params)[0],
make_blobs(centers=[[2, 2], [-2, -2]],
cluster_std=[0.5, 0.5],
**blobs_params)[0],
make_blobs(centers=[[2, 2], [-2, -2]],
cluster_std=[1.5, .3],
**blobs_params)[0],
4.0*(make_moons(n_samples=n_samples,
noise=.05,
random_state=0)[0] - np.array([0.5, 0.25])),
14.0*(np.random.RandomState(42).rand(n_samples, 2) - 0.5)]
xx, yy = np.meshgrid(np.linspace(-7, 7, 150),
np.linspace(-7, 7, 150))
plt.figure(figsize=(len(anomaly_algorithms)*2+3, 12.5))
plt.subplots_adjust(left=.02, right=.98, bottom=.001, top=.96,
wspace=.05, hspace=.01)
plot_num = 1
rng = np.random.RandomState(42)
for i_dataset, X in enumerate(datasets):
X = np.concatenate([X,
rng.uniform(low=-6, high=6,
size=(n_outliers, 2))],
axis=0)
for name, algorithm in anomaly_algorithms:
t0 = time.time()
algorithm.fit(X)
t1 = time.time()
plt.subplot(len(datasets), len(anomaly_algorithms), plot_num)
if i_dataset == 0:
plt.title(name, size=18)
if name == "Local Outlier Factor":
y_pred = algorithm.fit_predict(X)
else:
y_pred = algorithm.fit(X).predict(X)
if name != "Local Outlier Factor": # LOF does not implement predict
Z = algorithm.predict(np.c_[xx.ravel(),
yy.ravel()])
Z = Z.reshape(xx.shape)
plt.contour(xx, yy, Z, levels=[0],
linewidths=2, colors='black')
colors = np.array(['#377eb8', '#ff7f00'])
plt.scatter(X[:, 0],
X[:, 1],
s=10, color=colors[(y_pred + 1) // 2])
        plt.xlim(-7, 7)
plt.ylim(-7, 7)
plt.xticks(())
plt.yticks(())
plt.text(.99, .01, ('%.2fs' % (t1 - t0)).lstrip('0'),
transform=plt.gca().transAxes, size=15,
horizontalalignment='right')
plot_num += 1
Consider a dataset from a single distribution. Are new observations so different that we can doubt they come from the same distribution?
One-Class SVM requires the choice of a kernel (typically RBF) and a scalar parameter to define a frontier. nu (aka the margin) corresponds to the probability of finding a new, but regular, observation outside the frontier.
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.font_manager
from sklearn import svm
xx, yy = np.meshgrid(np.linspace(-5, 5, 500),
np.linspace(-5, 5, 500))
# numpy.r_: Translates slice objects to concatenation along the first axis.
X = 0.3*np.random.randn(100, 2); X_train = np.r_[X+2, X-2]
X = 0.3*np.random.randn( 20, 2); X_test = np.r_[X+2, X-2]
X_outliers = np.random.uniform(low=-4, high=4, size=(20, 2))
clf = svm.OneClassSVM(nu=0.1, kernel="rbf", gamma=0.1).fit(X_train)
y_pred_train = clf.predict(X_train)
y_pred_test = clf.predict(X_test)
y_pred_outliers = clf.predict(X_outliers)
n_error_train = y_pred_train[ y_pred_train == -1].size
n_error_test = y_pred_test[ y_pred_test == -1].size
n_error_outliers = y_pred_outliers[y_pred_outliers == 1].size
# plot the line, the points, and the nearest vectors to the plane
Z = clf.decision_function(np.c_[xx.ravel(),
yy.ravel()])
Z = Z.reshape(xx.shape)
plt.title("Novelty Detection - OneClassSVM")
plt.contourf(xx, yy, Z, levels=np.linspace(Z.min(), 0, 7), cmap=plt.cm.PuBu)
plt.contourf(xx, yy, Z, levels=[0, Z.max()], colors='palevioletred')
a = plt.contour(xx, yy, Z, levels=[0], linewidths=2, colors='darkred')
s = 40
b1 = plt.scatter(X_train[ :, 0], X_train[ :, 1], c='white', s=s, edgecolors='k')
b2 = plt.scatter(X_test[ :, 0], X_test[ :, 1], c='blueviolet', s=s, edgecolors='k')
c = plt.scatter(X_outliers[:, 0], X_outliers[:, 1], c='gold', s=s, edgecolors='k')
plt.axis('tight')
plt.xlim((-5, 5))
plt.ylim((-5, 5))
plt.legend([a.collections[0], b1, b2, c],
["learned frontier", "training data",
"new regular data", "new abnormal data"],
loc="upper left",
prop=matplotlib.font_manager.FontProperties(size=11))
plt.xlabel(
"error train: %d/200 ; errors novel regular: %d/40 ; "
"errors novel abnormal: %d/40"
% (n_error_train, n_error_test, n_error_outliers))
Text(0.5, 0, 'error train: 18/200 ; errors novel regular: 8/40 ; errors novel abnormal: 0/40')
Two South American mammals, with past observation records and 14 environmental variables. We only have positive samples (there are no unsuccessful observations), so we treat the problem as density estimation and use OneClassSVM. We use basemap, if available, to plot the geographic outlines.
from time import time
import numpy as np
import matplotlib.pyplot as plt
from sklearn.utils import Bunch
from sklearn.datasets import fetch_species_distributions
from sklearn import svm, metrics
# if basemap is available, we'll use it. otherwise, we'll improvise later...
try:
from mpl_toolkits.basemap import Basemap
basemap = True
except ImportError:
basemap = False
# Construct the map grid from the batch object returned by
# fetch_species_distributions. Returns (xgrid, ygrid): 1-D coordinate
# arrays corresponding to the values in batch.coverages.
def construct_grids(batch):
xmin = batch.x_left_lower_corner + batch.grid_size
ymin = batch.y_left_lower_corner + batch.grid_size
xmax = xmin + (batch.Nx * batch.grid_size)
ymax = ymin + (batch.Ny * batch.grid_size)
xgrid = np.arange(xmin, xmax, batch.grid_size)
ygrid = np.arange(ymin, ymax, batch.grid_size)
return (xgrid, ygrid)
# Create bunch with info about specified organism
# Uses test/train record arrays to extract species data
def create_species_bunch(species_name, train, test, coverages, xgrid, ygrid):
bunch = Bunch(name=' '.join(species_name.split("_")[:2]))
species_name = species_name.encode('ascii')
points = dict(test=test, train=train)
for label, pts in points.items():
# choose points associated with the desired species
pts = pts[pts['species'] == species_name]
bunch['pts_%s' % label] = pts
# determine coverage values for each of the training & testing points
ix = np.searchsorted(xgrid, pts['dd long'])
iy = np.searchsorted(ygrid, pts['dd lat'])
bunch['cov_%s' % label] = coverages[:, -iy, ix].T
return bunch
def plot_species_distribution(species=("bradypus_variegatus_0",
"microryzomys_minutus_0")):
if len(species) > 2:
print("When >2 species are provided, only 1st 2 will be used")
t0 = time()
data = fetch_species_distributions()
xgrid, ygrid = construct_grids(data)
X, Y = np.meshgrid(xgrid, ygrid[::-1])
BV_bunch = create_species_bunch(species[0],
data.train, data.test,
data.coverages, xgrid, ygrid)
MM_bunch = create_species_bunch(species[1],
data.train, data.test,
data.coverages, xgrid, ygrid)
np.random.seed(13)
background_points = np.c_[np.random.randint(low=0, high=data.Ny,
size=10000),
np.random.randint(low=0, high=data.Nx,
size=10000)].T
# We'll make use of the fact that coverages[6] has measurements at all
# land points. This will help us decide between land and water.
land_reference = data.coverages[6]
for i, species in enumerate([BV_bunch, MM_bunch]):
mean = species.cov_train.mean(axis=0)
std = species.cov_train.std(axis=0)
train_cover_std = (species.cov_train - mean) / std
# Fit OneClassSVM
clf = svm.OneClassSVM(nu=0.1, kernel="rbf", gamma=0.5)
clf.fit(train_cover_std)
# Plot map of South America
plt.subplot(1, 2, i + 1)
if basemap:
print("using basemap")
m = Basemap(projection='cyl', llcrnrlat=Y.min(),
urcrnrlat=Y.max(), llcrnrlon=X.min(),
urcrnrlon=X.max(), resolution='c')
m.drawcoastlines()
m.drawcountries()
else:
print("from coverage")
plt.contour(X, Y, land_reference,
levels=[-9998], colors="k",
linestyles="solid")
plt.xticks([])
plt.yticks([])
Z = np.ones((data.Ny, data.Nx), dtype=np.float64)
# We'll predict only for the land points.
idx = np.where(land_reference > -9999)
coverages_land = data.coverages[:, idx[0], idx[1]].T
pred = clf.decision_function((coverages_land - mean) / std)
Z *= pred.min()
Z[idx[0], idx[1]] = pred
levels = np.linspace(Z.min(), Z.max(), 25)
Z[land_reference == -9999] = -9999
# plot contours of the prediction
plt.contourf(X, Y, Z, levels=levels, cmap=plt.cm.Reds)
plt.colorbar(format='%.2f')
# scatter training/testing points
plt.scatter(species.pts_train['dd long'], species.pts_train['dd lat'],
s=2 ** 2, c='black',
marker='^', label='train')
plt.scatter(species.pts_test['dd long'], species.pts_test['dd lat'],
s=2 ** 2, c='black',
marker='x', label='test')
plt.legend()
plt.title(species.name)
plt.axis('equal')
# Compute AUC with regards to background points
pred_background = Z[background_points[0], background_points[1]]
pred_test = clf.decision_function((species.cov_test - mean) / std)
scores = np.r_[pred_test, pred_background]
y = np.r_[np.ones(pred_test.shape), np.zeros(pred_background.shape)]
fpr, tpr, thresholds = metrics.roc_curve(y, scores)
roc_auc = metrics.auc(fpr, tpr)
plt.text(-35, -70, "AUC: %.3f" % roc_auc, ha="right")
print("\n Area under the ROC curve : %f" % roc_auc)
print("\ntime elapsed: %.2fs" % (time() - t0))
plot_species_distribution()
from coverage
Area under the ROC curve : 0.868443
from coverage
Area under the ROC curve : 0.993919
time elapsed: 4.33s
Outlier detection is similar to novelty detection - the goal is to separate "good" observations from polluting ones. Here, however, we don't have a clean dataset to train on.
One approach is to assume the good data comes from a known distribution (e.g. data are Gaussian distributed). We can use this to define the “shape” of the data - and can define data sufficiently far from the shape to be outliers.
EllipticEnvelope fits a robust covariance estimate to the data - thereby fitting an ellipse to the central data points.
This method uses the Mahalanobis distances obtained from the fitted covariance to derive a measure of outlyingness.
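A small sketch of that measure: the fitted estimator exposes squared Mahalanobis distances directly, and a planted far-away point should get the largest one (toy data; the contamination value is illustrative):
import numpy as np
from sklearn.covariance import EllipticEnvelope
rng = np.random.RandomState(0)
X = np.r_[0.3*rng.randn(100, 2), [[3.0, 3.0]]]  # last point is far from the cluster
ee = EllipticEnvelope(contamination=0.05, random_state=0).fit(X)
d2 = ee.mahalanobis(X)         # squared Mahalanobis distances under the robust fit
print(d2[-1] > d2[:-1].max())  # True: the planted point is the most outlying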
import numpy as np
from sklearn.covariance import EllipticEnvelope
from sklearn.svm import OneClassSVM
import matplotlib.pyplot as plt
import matplotlib.font_manager
from sklearn.datasets import load_wine
classifiers = {
"Empirical": EllipticEnvelope(contamination=0.25, support_fraction=1.0),
"Robust (MCD)": EllipticEnvelope(contamination=0.25),
"OCSVM": OneClassSVM(nu=0.25, gamma=0.35)}
colors = ['m', 'g', 'b']
legend1 = {}
X1 = load_wine()['data'][:, [1, 2]] # two clusters
# Learn a frontier for outlier detection with several classifiers
xx1, yy1 = np.meshgrid(np.linspace(0, 6, 500),
np.linspace(1, 4.5, 500))
for i, (clf_name, clf) in enumerate(classifiers.items()):
plt.figure(1)
clf.fit(X1)
Z1 = clf.decision_function(np.c_[xx1.ravel(),
yy1.ravel()])
Z1 = Z1.reshape(xx1.shape)
legend1[clf_name] = plt.contour(
xx1, yy1, Z1,
levels=[0], linewidths=2, colors=colors[i])
legend1_values_list = list(legend1.values())
legend1_keys_list = list(legend1.keys())
plt.figure(1) # two clusters
plt.title("Outlier detection / wine dataset)")
plt.scatter(X1[:, 0], X1[:, 1], color='black')
bbox_args = dict(boxstyle="round", fc="0.8")
arrow_args = dict(arrowstyle="->")
plt.annotate("outlying points", xy=(4, 2),
xycoords="data", textcoords="data",
xytext=(3, 1.25), bbox=bbox_args, arrowprops=arrow_args)
plt.xlim((xx1.min(), xx1.max()))
plt.ylim((yy1.min(), yy1.max()))
plt.legend((legend1_values_list[0].collections[0],
legend1_values_list[1].collections[0],
legend1_values_list[2].collections[0]),
(legend1_keys_list[0], legend1_keys_list[1], legend1_keys_list[2]),
loc="upper center",
prop=matplotlib.font_manager.FontProperties(size=11))
plt.ylabel("ash")
plt.xlabel("malic_acid")
Text(0.5, 0, 'malic_acid')
IsolationForest is especially useful in high-dimensional problems. It 'isolates' observations by randomly selecting a feature, then randomly selecting a split value between the minimum and maximum values of that feature.
Recursive partitioning can be represented with a tree structure - the #splits required to isolate a sample equals the path length from the root to the terminating node. This value, averaged over a forest of such trees, is a measure of normality and our decision function.
Random partitioning produces noticeably shorter paths for anomalies. Hence, when a forest of random trees returns shorter path lengths for particular samples, they are highly likely to be anomalies.
This implementation is based on ExtraTreeRegressor. The max depth of each tree is set to ceil(log2(n)), where n is the #samples used to build the tree (see Liu et al., 2008 for details).
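A quick sketch of how this score surfaces in the API (separate from the full example below): decision_function is score_samples shifted by the fitted offset_ attribute, and the easy-to-isolate point gets the lowest score (toy data):
import numpy as np
from sklearn.ensemble import IsolationForest
rng = np.random.RandomState(0)
X = np.r_[0.3*rng.randn(100, 2), [[4.0, 4.0]]]  # last point is easy to isolate
iso = IsolationForest(random_state=0).fit(X)
print(np.allclose(iso.decision_function(X), iso.score_samples(X) - iso.offset_))
print(iso.score_samples(X).argmin() == len(X) - 1)  # isolated point scores lowest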
import numpy as np
import matplotlib.pyplot as plt
from sklearn.ensemble import IsolationForest
rng = np.random.RandomState(42)
X = 0.3*rng.randn(100, 2); X_train = np.r_[X+2, X-2]
X = 0.3*rng.randn( 20, 2); X_test = np.r_[X+2, X-2]
X_outliers = rng.uniform(low=-4, high=4, size=(20, 2))
clf = IsolationForest(max_samples=100, random_state=rng).fit(X_train)
y_pred_train = clf.predict(X_train)
y_pred_test = clf.predict(X_test)
y_pred_outliers = clf.predict(X_outliers)
xx, yy = np.meshgrid(np.linspace(-5, 5, 50),
np.linspace(-5, 5, 50))
Z = clf.decision_function(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)
plt.title("IsolationForest")
plt.contourf(xx, yy, Z, cmap=plt.cm.Blues_r)
b1 = plt.scatter(X_train[ :, 0], X_train[ :, 1], c='white', s=20, edgecolor='k')
b2 = plt.scatter(X_test[ :, 0], X_test[ :, 1], c='green', s=20, edgecolor='k')
c = plt.scatter( X_outliers[:, 0], X_outliers[:, 1], c='red', s=20, edgecolor='k')
plt.axis('tight')
plt.xlim((-5, 5))
plt.ylim((-5, 5))
plt.legend([b1, b2, c],
["training", "new (normal)", "new (abnormal)"],
loc="upper left")
<matplotlib.legend.Legend at 0x7fc7d6eedee0>
Another efficient way to perform outlier detection on moderately high dimensional datasets is to use the Local Outlier Factor (LOF) algorithm.
It computes a score reflecting the degree of abnormality of each observation, measuring the local density deviation of a data point with respect to its neighbors. The idea is to detect samples with a substantially lower density than their neighbors.
In practice the local density is obtained from the k-nearest neighbors. The LOF score of an observation equals the ratio of the average local density of its k nearest neighbors to its own local density: a normal instance should have a local density similar to that of its neighbors, while abnormal data should have a much smaller local density.
n_neighbors is chosen 1) greater than the minimum #objects a cluster has to contain (so that other objects can be local outliers relative to this cluster), and 2) smaller than the maximum #nearby objects that can potentially be local outliers. This info is typically not available. Using n_neighbors=20 is a good start. If the proportion of outliers is high (maybe >10%), n_neighbors should be greater (maybe >35).
LOF considers both the local and global properties of datasets - it can perform well even when abnormal samples have different underlying densities. The question is not how isolated the sample is - but how isolated it is with respect to the surrounding neighborhood.
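A simplified density-ratio sketch of this idea (not the exact LOF formulation, which uses reachability distances, but the crude ratio behaves the same way on an obvious outlier):
import numpy as np
from sklearn.neighbors import NearestNeighbors, LocalOutlierFactor
rng = np.random.RandomState(0)
X = np.r_[0.3*rng.randn(30, 2), [[4.0, 4.0]]]  # last point is an obvious outlier
k = 5
dist, idx = NearestNeighbors(n_neighbors=k + 1).fit(X).kneighbors(X)
dist, idx = dist[:, 1:], idx[:, 1:]        # drop each point's self-match
density = 1.0/dist.mean(axis=1)            # crude local density estimate
ratio = density[idx].mean(axis=1)/density  # neighbors' density vs own density
lof = LocalOutlierFactor(n_neighbors=k).fit(X)
print(ratio[-1], -lof.negative_outlier_factor_[-1])  # both are well above 1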
LOF has no predict, decision_function or score_samples methods - only a fit_predict method. Training data abnormality scores are available via negative_outlier_factor_.
predict, decision_function and score_samples can be used on new unseen data when LOF is applied for novelty detection (i.e. when novelty=True).
import numpy as np
import matplotlib.pyplot as plt
from sklearn.neighbors import LocalOutlierFactor as LOF
np.random.seed(42)
X_inliers = 0.3*np.random.randn(100, 2)
X_inliers = np.r_[X_inliers+2,
X_inliers-2]
X_outliers = np.random.uniform(low=-4, high=4, size=(20, 2))
X = np.r_[X_inliers,
X_outliers]
n_outliers = len(X_outliers)
ground_truth = np.ones(len(X), dtype=int)
ground_truth[-n_outliers:] = -1
clf = LOF(n_neighbors=20, contamination=0.1)
# use fit_predict to compute predicted labels of training samples
# (when LOF is used for outlier detection, the estimator has no predict,
# decision_function and score_samples methods).
y_pred = clf.fit_predict(X)
n_errors = (y_pred != ground_truth).sum()
X_scores = clf.negative_outlier_factor_
plt.title("Local Outlier Factor (LOF)")
plt.scatter(X[:, 0],
X[:, 1],
color='k', s=3., label='Data points')
# plot circles with radius proportional to the outlier scores
radius = (X_scores.max() - X_scores) / (X_scores.max() - X_scores.min())
plt.scatter(X[:, 0],
X[:, 1],
s=1000 * radius, edgecolors='r',
facecolors='none', label='Outlier scores')
plt.axis('tight')
plt.xlim((-5, 5))
plt.ylim((-5, 5))
plt.xlabel("prediction errors: %d" % (n_errors))
legend = plt.legend(loc='upper left')
legend.legendHandles[0]._sizes = [10]
legend.legendHandles[1]._sizes = [20]
To use LOF for novelty detection (predicting labels or abnormality scores of new unseen data), build the estimator with novelty=True before fitting. Only use predict, decision_function and score_samples on new, unseen data - NOT on the training samples, as this would lead to wrong results. Abnormality scores of the training samples remain available via negative_outlier_factor_.
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from sklearn.neighbors import LocalOutlierFactor as LOF
np.random.seed(42)
xx, yy = np.meshgrid(np.linspace(-5, 5, 500),
np.linspace(-5, 5, 500))
X = 0.3*np.random.randn(100, 2); X_train = np.r_[X+2, X-2]
X = 0.3*np.random.randn( 20, 2); X_test = np.r_[X+2, X-2]
X_outliers = np.random.uniform(low=-4, high=4, size=(20, 2))
clf = LOF(n_neighbors=20,
novelty=True,
contamination=0.1).fit(X_train)
# DO NOT use predict, decision_function or score_samples on X_train, as this
# would give wrong results; use them only on new unseen data (not in X_train),
# e.g. X_test, X_outliers or the meshgrid
y_pred_test = clf.predict(X_test)
y_pred_outliers = clf.predict(X_outliers)
n_error_test = y_pred_test[y_pred_test == -1].size
n_error_outliers = y_pred_outliers[y_pred_outliers == 1].size
Z = clf.decision_function(np.c_[xx.ravel(),
yy.ravel()])
Z = Z.reshape(xx.shape)
plt.title("Novelty Detection with LOF")
plt.contourf(xx, yy, Z,
levels=np.linspace(Z.min(), 0, 7),
cmap=plt.cm.PuBu)
a = plt.contour(xx, yy, Z, levels=[0], linewidths=2, colors='darkred')
plt.contourf( xx, yy, Z, levels=[0, Z.max()], colors='palevioletred')
s = 40
b1 = plt.scatter(X_train[:, 0], X_train[:, 1], c='white', s=s, edgecolors='k')
b2 = plt.scatter(X_test[ :, 0], X_test[:, 1], c='blueviolet', s=s, edgecolors='k')
c = plt.scatter(X_outliers[:, 0], X_outliers[:, 1], c='gold', s=s, edgecolors='k')
plt.axis('tight')
plt.xlim((-5, 5))
plt.ylim((-5, 5))
plt.legend([a.collections[0], b1, b2, c],
["learned frontier", "training data",
"new data (normal)", "new data (abnormal)"],
loc="upper left",
prop=matplotlib.font_manager.FontProperties(size=11))
plt.xlabel(
"errors novel regular: %d/40 ; errors novel abnormal: %d/40"
% (n_error_test, n_error_outliers))
Text(0.5, 0, 'errors novel regular: 8/40 ; errors novel abnormal: 0/40')