mirror of
https://github.com/TheAlgorithms/Python.git
synced 2025-07-05 09:21:13 +08:00
psf/black code formatting (#1277)
This commit is contained in:

committed by
Christian Clauss

parent
07f04a2e55
commit
9eac17a408
@ -41,11 +41,20 @@ import pandas as pd
|
||||
from sklearn.datasets import make_blobs, make_circles
|
||||
from sklearn.preprocessing import StandardScaler
|
||||
|
||||
CANCER_DATASET_URL = 'http://archive.ics.uci.edu/ml/machine-learning-databases/breast-cancer-wisconsin/wdbc.data'
|
||||
CANCER_DATASET_URL = "http://archive.ics.uci.edu/ml/machine-learning-databases/breast-cancer-wisconsin/wdbc.data"
|
||||
|
||||
|
||||
class SmoSVM(object):
|
||||
def __init__(self, train, kernel_func, alpha_list=None, cost=0.4, b=0.0, tolerance=0.001, auto_norm=True):
|
||||
def __init__(
|
||||
self,
|
||||
train,
|
||||
kernel_func,
|
||||
alpha_list=None,
|
||||
cost=0.4,
|
||||
b=0.0,
|
||||
tolerance=0.001,
|
||||
auto_norm=True,
|
||||
):
|
||||
self._init = True
|
||||
self._auto_norm = auto_norm
|
||||
self._c = np.float64(cost)
|
||||
@ -91,13 +100,25 @@ class SmoSVM(object):
|
||||
self.alphas[i1], self.alphas[i2] = a1_new, a2_new
|
||||
|
||||
# 3: update threshold(b)
|
||||
b1_new = np.float64(-e1 - y1 * K(i1, i1) * (a1_new - a1) - y2 * K(i2, i1) * (a2_new - a2) + self._b)
|
||||
b2_new = np.float64(-e2 - y2 * K(i2, i2) * (a2_new - a2) - y1 * K(i1, i2) * (a1_new - a1) + self._b)
|
||||
b1_new = np.float64(
|
||||
-e1
|
||||
- y1 * K(i1, i1) * (a1_new - a1)
|
||||
- y2 * K(i2, i1) * (a2_new - a2)
|
||||
+ self._b
|
||||
)
|
||||
b2_new = np.float64(
|
||||
-e2
|
||||
- y2 * K(i2, i2) * (a2_new - a2)
|
||||
- y1 * K(i1, i2) * (a1_new - a1)
|
||||
+ self._b
|
||||
)
|
||||
if 0.0 < a1_new < self._c:
|
||||
b = b1_new
|
||||
if 0.0 < a2_new < self._c:
|
||||
b = b2_new
|
||||
if not (np.float64(0) < a2_new < self._c) and not (np.float64(0) < a1_new < self._c):
|
||||
if not (np.float64(0) < a2_new < self._c) and not (
|
||||
np.float64(0) < a1_new < self._c
|
||||
):
|
||||
b = (b1_new + b2_new) / 2.0
|
||||
b_old = self._b
|
||||
self._b = b
|
||||
@ -107,7 +128,11 @@ class SmoSVM(object):
|
||||
for s in self.unbound:
|
||||
if s == i1 or s == i2:
|
||||
continue
|
||||
self._error[s] += y1 * (a1_new - a1) * K(i1, s) + y2 * (a2_new - a2) * K(i2, s) + (self._b - b_old)
|
||||
self._error[s] += (
|
||||
y1 * (a1_new - a1) * K(i1, s)
|
||||
+ y2 * (a2_new - a2) * K(i2, s)
|
||||
+ (self._b - b_old)
|
||||
)
|
||||
|
||||
# if i1 or i2 is non-bound, update their error value to zero
|
||||
if self._is_unbound(i1):
|
||||
@ -119,7 +144,9 @@ class SmoSVM(object):
|
||||
def predict(self, test_samples, classify=True):
|
||||
|
||||
if test_samples.shape[1] > self.samples.shape[1]:
|
||||
raise ValueError("Test samples' feature length does not equal to that of train samples")
|
||||
raise ValueError(
|
||||
"Test samples' feature length does not equal to that of train samples"
|
||||
)
|
||||
|
||||
if self._auto_norm:
|
||||
test_samples = self._norm(test_samples)
|
||||
@ -173,14 +200,23 @@ class SmoSVM(object):
|
||||
k_matrix = np.zeros([self.length, self.length])
|
||||
for i in self._all_samples:
|
||||
for j in self._all_samples:
|
||||
k_matrix[i, j] = np.float64(self.Kernel(self.samples[i, :], self.samples[j, :]))
|
||||
k_matrix[i, j] = np.float64(
|
||||
self.Kernel(self.samples[i, :], self.samples[j, :])
|
||||
)
|
||||
return k_matrix
|
||||
|
||||
# Predict test sample's tag
|
||||
def _predict(self, sample):
|
||||
k = self._k
|
||||
predicted_value = np.sum(
|
||||
[self.alphas[i1] * self.tags[i1] * k(i1, sample) for i1 in self._all_samples]) + self._b
|
||||
predicted_value = (
|
||||
np.sum(
|
||||
[
|
||||
self.alphas[i1] * self.tags[i1] * k(i1, sample)
|
||||
for i1 in self._all_samples
|
||||
]
|
||||
)
|
||||
+ self._b
|
||||
)
|
||||
return predicted_value
|
||||
|
||||
# Choose alpha1 and alpha2
|
||||
@ -200,23 +236,27 @@ class SmoSVM(object):
|
||||
while True:
|
||||
all_not_obey = True
|
||||
# all sample
|
||||
print('scanning all sample!')
|
||||
print("scanning all sample!")
|
||||
for i1 in [i for i in self._all_samples if self._check_obey_kkt(i)]:
|
||||
all_not_obey = False
|
||||
yield from self._choose_a2(i1)
|
||||
|
||||
# non-bound sample
|
||||
print('scanning non-bound sample!')
|
||||
print("scanning non-bound sample!")
|
||||
while True:
|
||||
not_obey = True
|
||||
for i1 in [i for i in self._all_samples if self._check_obey_kkt(i) and self._is_unbound(i)]:
|
||||
for i1 in [
|
||||
i
|
||||
for i in self._all_samples
|
||||
if self._check_obey_kkt(i) and self._is_unbound(i)
|
||||
]:
|
||||
not_obey = False
|
||||
yield from self._choose_a2(i1)
|
||||
if not_obey:
|
||||
print('all non-bound samples fit the KKT condition!')
|
||||
print("all non-bound samples fit the KKT condition!")
|
||||
break
|
||||
if all_not_obey:
|
||||
print('all samples fit the KKT condition! Optimization done!')
|
||||
print("all samples fit the KKT condition! Optimization done!")
|
||||
break
|
||||
return False
|
||||
|
||||
@ -231,7 +271,11 @@ class SmoSVM(object):
|
||||
|
||||
if len(self.unbound) > 0:
|
||||
tmp_error = self._error.copy().tolist()
|
||||
tmp_error_dict = {index: value for index, value in enumerate(tmp_error) if self._is_unbound(index)}
|
||||
tmp_error_dict = {
|
||||
index: value
|
||||
for index, value in enumerate(tmp_error)
|
||||
if self._is_unbound(index)
|
||||
}
|
||||
if self._e(i1) >= 0:
|
||||
i2 = min(tmp_error_dict, key=lambda index: tmp_error_dict[index])
|
||||
else:
|
||||
@ -289,8 +333,20 @@ class SmoSVM(object):
|
||||
# way 1
|
||||
f1 = y1 * (e1 + b) - a1 * K(i1, i1) - s * a2 * K(i1, i2)
|
||||
f2 = y2 * (e2 + b) - a2 * K(i2, i2) - s * a1 * K(i1, i2)
|
||||
ol = l1 * f1 + L * f2 + 1 / 2 * l1 ** 2 * K(i1, i1) + 1 / 2 * L ** 2 * K(i2, i2) + s * L * l1 * K(i1, i2)
|
||||
oh = h1 * f1 + H * f2 + 1 / 2 * h1 ** 2 * K(i1, i1) + 1 / 2 * H ** 2 * K(i2, i2) + s * H * h1 * K(i1, i2)
|
||||
ol = (
|
||||
l1 * f1
|
||||
+ L * f2
|
||||
+ 1 / 2 * l1 ** 2 * K(i1, i1)
|
||||
+ 1 / 2 * L ** 2 * K(i2, i2)
|
||||
+ s * L * l1 * K(i1, i2)
|
||||
)
|
||||
oh = (
|
||||
h1 * f1
|
||||
+ H * f2
|
||||
+ 1 / 2 * h1 ** 2 * K(i1, i1)
|
||||
+ 1 / 2 * H ** 2 * K(i2, i2)
|
||||
+ s * H * h1 * K(i1, i2)
|
||||
)
|
||||
"""
|
||||
# way 2
|
||||
Use objective function check which alpha2 new could get the minimal objectives
|
||||
@ -370,14 +426,10 @@ class Kernel(object):
|
||||
def _check(self):
|
||||
if self._kernel == self._rbf:
|
||||
if self.gamma < 0:
|
||||
raise ValueError('gamma value must greater than 0')
|
||||
raise ValueError("gamma value must greater than 0")
|
||||
|
||||
def _get_kernel(self, kernel_name):
|
||||
maps = {
|
||||
'linear': self._linear,
|
||||
'poly': self._polynomial,
|
||||
'rbf': self._rbf
|
||||
}
|
||||
maps = {"linear": self._linear, "poly": self._polynomial, "rbf": self._rbf}
|
||||
return maps[kernel_name]
|
||||
|
||||
def __call__(self, v1, v2):
|
||||
@ -390,34 +442,35 @@ class Kernel(object):
|
||||
def count_time(func):
|
||||
def call_func(*args, **kwargs):
|
||||
import time
|
||||
|
||||
start_time = time.time()
|
||||
func(*args, **kwargs)
|
||||
end_time = time.time()
|
||||
print('smo algorithm cost {} seconds'.format(end_time - start_time))
|
||||
print("smo algorithm cost {} seconds".format(end_time - start_time))
|
||||
|
||||
return call_func
|
||||
|
||||
|
||||
@count_time
|
||||
def test_cancel_data():
|
||||
print('Hello!\r\nStart test svm by smo algorithm!')
|
||||
print("Hello!\r\nStart test svm by smo algorithm!")
|
||||
# 0: download dataset and load into pandas' dataframe
|
||||
if not os.path.exists(r'cancel_data.csv'):
|
||||
if not os.path.exists(r"cancel_data.csv"):
|
||||
request = urllib.request.Request(
|
||||
CANCER_DATASET_URL,
|
||||
headers={'User-Agent': 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'}
|
||||
headers={"User-Agent": "Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)"},
|
||||
)
|
||||
response = urllib.request.urlopen(request)
|
||||
content = response.read().decode('utf-8')
|
||||
with open(r'cancel_data.csv', 'w') as f:
|
||||
content = response.read().decode("utf-8")
|
||||
with open(r"cancel_data.csv", "w") as f:
|
||||
f.write(content)
|
||||
|
||||
data = pd.read_csv(r'cancel_data.csv', header=None)
|
||||
data = pd.read_csv(r"cancel_data.csv", header=None)
|
||||
|
||||
# 1: pre-processing data
|
||||
del data[data.columns.tolist()[0]]
|
||||
data = data.dropna(axis=0)
|
||||
data = data.replace({'M': np.float64(1), 'B': np.float64(-1)})
|
||||
data = data.replace({"M": np.float64(1), "B": np.float64(-1)})
|
||||
samples = np.array(data)[:, :]
|
||||
|
||||
# 2: dividing data into train_data and test_data
|
||||
@ -425,11 +478,18 @@ def test_cancel_data():
|
||||
test_tags, test_samples = test_data[:, 0], test_data[:, 1:]
|
||||
|
||||
# 3: choose kernel function, and set initial alphas to zero (optional)
|
||||
mykernel = Kernel(kernel='rbf', degree=5, coef0=1, gamma=0.5)
|
||||
mykernel = Kernel(kernel="rbf", degree=5, coef0=1, gamma=0.5)
|
||||
al = np.zeros(train_data.shape[0])
|
||||
|
||||
# 4: calculating best alphas using SMO algorithm and predict test_data samples
|
||||
mysvm = SmoSVM(train=train_data, kernel_func=mykernel, alpha_list=al, cost=0.4, b=0.0, tolerance=0.001)
|
||||
mysvm = SmoSVM(
|
||||
train=train_data,
|
||||
kernel_func=mykernel,
|
||||
alpha_list=al,
|
||||
cost=0.4,
|
||||
b=0.0,
|
||||
tolerance=0.001,
|
||||
)
|
||||
mysvm.fit()
|
||||
predict = mysvm.predict(test_samples)
|
||||
|
||||
@ -439,14 +499,18 @@ def test_cancel_data():
|
||||
for i in range(test_tags.shape[0]):
|
||||
if test_tags[i] == predict[i]:
|
||||
score += 1
|
||||
print('\r\nall: {}\r\nright: {}\r\nfalse: {}'.format(test_num, score, test_num - score))
|
||||
print(
|
||||
"\r\nall: {}\r\nright: {}\r\nfalse: {}".format(
|
||||
test_num, score, test_num - score
|
||||
)
|
||||
)
|
||||
print("Rough Accuracy: {}".format(score / test_tags.shape[0]))
|
||||
|
||||
|
||||
def test_demonstration():
|
||||
# change stdout
|
||||
print('\r\nStart plot,please wait!!!')
|
||||
sys.stdout = open(os.devnull, 'w')
|
||||
print("\r\nStart plot,please wait!!!")
|
||||
sys.stdout = open(os.devnull, "w")
|
||||
|
||||
ax1 = plt.subplot2grid((2, 2), (0, 0))
|
||||
ax2 = plt.subplot2grid((2, 2), (0, 1))
|
||||
@ -464,32 +528,50 @@ def test_demonstration():
|
||||
sys.stdout = sys.__stdout__
|
||||
print("Plot done!!!")
|
||||
|
||||
|
||||
def test_linear_kernel(ax, cost):
|
||||
train_x, train_y = make_blobs(n_samples=500, centers=2,
|
||||
n_features=2, random_state=1)
|
||||
train_x, train_y = make_blobs(
|
||||
n_samples=500, centers=2, n_features=2, random_state=1
|
||||
)
|
||||
train_y[train_y == 0] = -1
|
||||
scaler = StandardScaler()
|
||||
train_x_scaled = scaler.fit_transform(train_x, train_y)
|
||||
train_data = np.hstack((train_y.reshape(500, 1), train_x_scaled))
|
||||
mykernel = Kernel(kernel='linear', degree=5, coef0=1, gamma=0.5)
|
||||
mysvm = SmoSVM(train=train_data, kernel_func=mykernel, cost=cost, tolerance=0.001, auto_norm=False)
|
||||
mykernel = Kernel(kernel="linear", degree=5, coef0=1, gamma=0.5)
|
||||
mysvm = SmoSVM(
|
||||
train=train_data,
|
||||
kernel_func=mykernel,
|
||||
cost=cost,
|
||||
tolerance=0.001,
|
||||
auto_norm=False,
|
||||
)
|
||||
mysvm.fit()
|
||||
plot_partition_boundary(mysvm, train_data, ax=ax)
|
||||
|
||||
|
||||
def test_rbf_kernel(ax, cost):
|
||||
train_x, train_y = make_circles(n_samples=500, noise=0.1, factor=0.1, random_state=1)
|
||||
train_x, train_y = make_circles(
|
||||
n_samples=500, noise=0.1, factor=0.1, random_state=1
|
||||
)
|
||||
train_y[train_y == 0] = -1
|
||||
scaler = StandardScaler()
|
||||
train_x_scaled = scaler.fit_transform(train_x, train_y)
|
||||
train_data = np.hstack((train_y.reshape(500, 1), train_x_scaled))
|
||||
mykernel = Kernel(kernel='rbf', degree=5, coef0=1, gamma=0.5)
|
||||
mysvm = SmoSVM(train=train_data, kernel_func=mykernel, cost=cost, tolerance=0.001, auto_norm=False)
|
||||
mykernel = Kernel(kernel="rbf", degree=5, coef0=1, gamma=0.5)
|
||||
mysvm = SmoSVM(
|
||||
train=train_data,
|
||||
kernel_func=mykernel,
|
||||
cost=cost,
|
||||
tolerance=0.001,
|
||||
auto_norm=False,
|
||||
)
|
||||
mysvm.fit()
|
||||
plot_partition_boundary(mysvm, train_data, ax=ax)
|
||||
|
||||
|
||||
def plot_partition_boundary(model, train_data, ax, resolution=100, colors=('b', 'k', 'r')):
|
||||
def plot_partition_boundary(
|
||||
model, train_data, ax, resolution=100, colors=("b", "k", "r")
|
||||
):
|
||||
"""
|
||||
We cannot get the optimum w of our kernel SVM model, which is different from a linear SVM.
|
||||
For this reason, we generate randomly distributed points with high density, and the predicted values of these points are
|
||||
@ -502,25 +584,44 @@ def plot_partition_boundary(model, train_data, ax, resolution=100, colors=('b',
|
||||
train_data_tags = train_data[:, 0]
|
||||
xrange = np.linspace(train_data_x.min(), train_data_x.max(), resolution)
|
||||
yrange = np.linspace(train_data_y.min(), train_data_y.max(), resolution)
|
||||
test_samples = np.array([(x, y) for x in xrange for y in yrange]).reshape(resolution * resolution, 2)
|
||||
test_samples = np.array([(x, y) for x in xrange for y in yrange]).reshape(
|
||||
resolution * resolution, 2
|
||||
)
|
||||
|
||||
test_tags = model.predict(test_samples, classify=False)
|
||||
grid = test_tags.reshape((len(xrange), len(yrange)))
|
||||
|
||||
# Plot contour map which represents the partition boundary
|
||||
ax.contour(xrange, yrange, np.mat(grid).T, levels=(-1, 0, 1), linestyles=('--', '-', '--'),
|
||||
linewidths=(1, 1, 1),
|
||||
colors=colors)
|
||||
ax.contour(
|
||||
xrange,
|
||||
yrange,
|
||||
np.mat(grid).T,
|
||||
levels=(-1, 0, 1),
|
||||
linestyles=("--", "-", "--"),
|
||||
linewidths=(1, 1, 1),
|
||||
colors=colors,
|
||||
)
|
||||
# Plot all train samples
|
||||
ax.scatter(train_data_x, train_data_y, c=train_data_tags, cmap=plt.cm.Dark2, lw=0, alpha=0.5)
|
||||
ax.scatter(
|
||||
train_data_x,
|
||||
train_data_y,
|
||||
c=train_data_tags,
|
||||
cmap=plt.cm.Dark2,
|
||||
lw=0,
|
||||
alpha=0.5,
|
||||
)
|
||||
|
||||
# Plot support vectors
|
||||
support = model.support
|
||||
ax.scatter(train_data_x[support], train_data_y[support], c=train_data_tags[support], cmap=plt.cm.Dark2)
|
||||
ax.scatter(
|
||||
train_data_x[support],
|
||||
train_data_y[support],
|
||||
c=train_data_tags[support],
|
||||
cmap=plt.cm.Dark2,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if __name__ == "__main__":
|
||||
test_cancel_data()
|
||||
test_demonstration()
|
||||
plt.show()
|
||||
|
||||
|
Reference in New Issue
Block a user