Spectral clustering is a complex form of machine learning data clustering. Briefly, the source data is transformed into a reduced-dimension form and then standard k-means clustering is applied to the transformed data. Spectral clustering can sometimes uncover patterns in data that simpler techniques cannot.
Even though the scikit-learn library has a SpectralClustering module, I decided to implement spectral clustering from scratch using Python. It was a big challenge and took me much longer to complete than expected.
I verified my from-scratch implementation of spectral clustering by comparing with the scikit-learn SpectralClustering module. Results are identical.
In high-level pseudo-code, there are five steps in spectral clustering:
1. read source data into memory as a matrix X
2. use X to compute an affinity matrix A
3. use A to compute a Laplacian matrix L
4. use L to compute an eigenvector embedding matrix E
5. perform standard clustering on E (k-means)
The Python code that corresponds to the five steps is:
X = np.loadtxt(".\\Data\\dummy_data_10.txt",
  usecols = (0,1), delimiter = ",", comments="#",
  dtype=np.float64)

def cluster(self, X):
  A = self.make_affinity(X)
  L = self.make_laplacian(A)
  E = self.make_embedding(L)
  result = self.process_embedding(E)
  return result
Each of these steps is a significant problem in its own right. The affinity matrix holds the similarity between all pairs of data items, measured by an RBF (radial basis function) kernel. The Laplacian matrix is, loosely speaking, a graph representation of the affinity matrix. The embedding consists of the k eigenvectors of the Laplacian that correspond to the k smallest eigenvalues. When standard k-means clustering is applied to the eigenvector embedding, the source data is indirectly clustered. Whew!
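As a quick sanity check on the affinity step, the RBF affinity matrix can also be computed in a vectorized way using NumPy broadcasting. This is just a minimal sketch for comparison purposes (the rbf_affinity() name and the example gamma value are illustrative), not the loop-based make_affinity() used in the demo:

import numpy as np

def rbf_affinity(X, gamma):
  # pairwise squared Euclidean distances via broadcasting
  diffs = X[:, None, :] - X[None, :, :]   # shape (n, n, dim)
  sq_dists = np.sum(diffs ** 2, axis=2)   # shape (n, n)
  return np.exp(-gamma * sq_dists)        # RBF similarity

# example: A = rbf_affinity(X, gamma=0.10)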
I used a 10-item dummy dataset. Here is the data before and after clustering.
By far the most difficult part of the implementation was the code to compute the eigenvalues and eigenvectors of the normalized Laplacian matrix. Computing eigenvalues and eigenvectors required code to compute the QR decomposition of a matrix — another extremely difficult problem.
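One practical way to gain confidence in from-scratch QR and eigendecomposition code is to compare against the built-in NumPy routines. A minimal sketch follows; the small matrix L here is just a hypothetical stand-in for a normalized Laplacian, and the built-in results can legitimately differ from from-scratch results by the signs of the eigenvectors:

import numpy as np

# hypothetical small symmetric matrix standing in for a
# normalized Laplacian produced by make_laplacian()
L = np.array([[ 1.0, -0.5, -0.3],
              [-0.5,  1.0, -0.2],
              [-0.3, -0.2,  1.0]])

Q, R = np.linalg.qr(L)                  # built-in QR decomposition
print(np.allclose(np.matmul(Q, R), L))  # expected: True

evals, evecs = np.linalg.eigh(L)  # eigh() is for symmetric matrices
print(evals)                      # eigenvalues sorted smallest-first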
As it turned out, another one of the surprisingly difficult parts of the from-scratch spectral clustering implementation was normalizing the Laplacian matrix (computing the un-normalized Laplacian matrix was fairly easy).
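For reference, the normalization that the demo code performs with explicit loops should produce the same matrix as the standard formula L_sym = I - D^(-1/2) W D^(-1/2), where W is the affinity matrix with its diagonal (self-affinity) entries zeroed and D is the diagonal degree matrix of W. A minimal vectorized sketch, assuming no data item ends up with a zero degree:

import numpy as np

def normalized_laplacian(A):
  # W = affinity matrix with self-affinities removed
  W = np.copy(A)
  np.fill_diagonal(W, 0.0)
  deg = np.sum(W, axis=1)              # degree of each item
  inv_sqrt = 1.0 / np.sqrt(deg)        # assumes no zero degrees
  L = -(W * inv_sqrt[:, None]) * inv_sqrt[None, :]
  np.fill_diagonal(L, 1.0)             # L_sym = I - D^(-1/2) W D^(-1/2)
  return L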
The code for k-means clustering isn’t terribly complicated but it is long. I put a KMeans class inside the main Spectral class.
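If you only want to check the final clustering stage, the eigenvector embedding can also be fed to the scikit-learn KMeans class. A minimal sketch, assuming E is the n-by-k embedding matrix produced by make_embedding(); note that the cluster IDs may come back permuted (0/1 swapped) relative to the from-scratch version:

from sklearn.cluster import KMeans

# E is assumed to be the n-by-k eigenvector embedding
km = KMeans(n_clusters=2, n_init=10, random_state=0)
labels = km.fit_predict(E)   # cluster IDs, possibly permuted
print(labels)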
I used a tiny dummy dataset with just 10 items to test my spectral clustering code. To verify my results, I ran the scikit SpectralClustering module:
import numpy as np
from sklearn.cluster import SpectralClustering

X = np.array([[0.20, 0.20], [0.20, 0.70], [0.30, 0.90],
  [0.50, 0.50], [0.50, 0.60], [0.60, 0.50], [0.70, 0.90],
  [0.40, 0.10], [0.90, 0.70], [0.80, 0.20]])
print("Data: ")
print(X)

sc = SpectralClustering(n_clusters=2, affinity='rbf',
  gamma=0.10, assign_labels="kmeans", random_state=0)
clustering = sc.fit(X)

print("Affinity matrix: ")
print(sc.affinity_matrix_)
print("Spectral clustering: ")
print(clustering.labels_)
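As a further check, the stored affinity_matrix_ can be compared with a direct RBF kernel computation. As far as I know, SpectralClustering with affinity='rbf' uses the same RBF kernel with the specified gamma, so the two should agree (this relies on an assumption about the library internals):

from sklearn.metrics.pairwise import rbf_kernel

A_check = rbf_kernel(X, gamma=0.10)               # direct RBF affinity
print(np.allclose(A_check, sc.affinity_matrix_))  # expected: True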
The module has a built-in affinity_matrix_ property but no way to access the Laplacian matrix, so I inserted a print statement inside the _spectral_embedding.py scikit source code on my machine.
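An alternative that avoids editing library source code is to compute the normalized Laplacian directly from the stored affinity matrix using the SciPy csgraph routine, which is, as far as I know, what the scikit spectral embedding code relies on internally:

from scipy.sparse.csgraph import laplacian

# normalized graph Laplacian of the RBF affinity matrix
L_normed = laplacian(sc.affinity_matrix_, normed=True)
print("Normalized Laplacian: ")
print(L_normed)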
Implementing spectral clustering from scratch using Python was a lot of fun (in my strange world anyway).
I sometimes mentally cluster songs. Here are three songs used in TV advertising in the 1960s that I think have similarities. Left: The song “The Disadvantages of You” was used in Benson and Hedges cigarette ads. Bad product. Good song. Center: “No Matter What Shape (Your Stomach’s In)” was used in Alka-Seltzer ads. Good song but I prefer Tums. Right: “Calcutta” was a big hit and was used in several regional TV ads for things like furniture stores. Catchy tune even today.
Complete demo code below.
# spectral_scratch.py
import numpy as np
import matplotlib.pyplot as plt

# -----------------------------------------------------------

class Spectral:
  def __init__(self, k, gamma):
    self.k = k
    self.gamma = gamma

  def cluster(self, X):
    A = self.make_affinity(X)
    L = self.make_laplacian(A)
    E = self.make_embedding(L)
    result = self.process_embedding(E)
    print("\nLaplacian: "); print(L)
    print("\nAffinity: "); print(A)
    return result

  def make_affinity(self, X):
    n = len(X)
    result = np.zeros((n,n), dtype=np.float64)
    for i in range(n):
      for j in range(i,n):
        rbf = self.my_rbf(X[i], X[j], self.gamma)
        result[i][j] = rbf
        result[j][i] = rbf
    return result

  def my_rbf(self, v1, v2, gamma):
    dim = len(v1)
    sum = 0.0
    for i in range(dim):
      sum += (v1[i] - v2[i])**2
    return np.exp(-gamma * sum)

  def make_laplacian(self, A):
    n = len(A)
    D = np.zeros((n,n), dtype=np.float64)
    for i in range(n):
      row_sum = 0.0
      for j in range(n):
        row_sum += A[i][j]
      D[i][i] = row_sum
    result = D - A
    return self.normalize_laplacian(result)

  def normalize_laplacian(self, L):
    n = len(L)
    result = np.copy(L)
    for i in range(n):
      result[i][i] = 0.0
    w = np.zeros(n, dtype=np.float64)
    for j in range(n):
      col_sum = 0.0  # in-degree version
      for i in range(n):
        col_sum += np.abs(result[i][j])
      w[j] = np.sqrt(col_sum)
    for i in range(n):
      for j in range(n):
        result[i][j] /= (w[j] * w[i])
    for i in range(n):
      result[i][i] = 1.0
    return result

  def make_embedding(self, L):
    eig_vals, eig_vecs = self.my_eigen(L)
    all_indices = np.argsort(eig_vals)
    indices = np.zeros(self.k, dtype=np.int64)
    for i in range(self.k):
      indices[i] = all_indices[i]
    extracted = eig_vecs[:,indices]
    # print(extracted); input()
    return extracted

  def process_embedding(self, E):
    km = self.KMeans(E, self.k)
    result = km.cluster()
    return result

  # ---------------------------------------------------------
  # eigen functions
  # ---------------------------------------------------------

  def my_eigen(self, A):
    # A must be square, symmetric
    # return (eigenvalues, eigenvectors) tuple
    n = len(A)
    X = np.copy(A)
    pq = np.eye(n)
    max_ct = 10000
    ct = 0
    while ct < max_ct:
      Q, R = self.my_qr(X)
      pq = np.matmul(pq, Q)  # accum Q
      X = np.matmul(R, Q)    # note order
      ct += 1
      if self.is_upper_tri(X, 1.0e-8) == True:
        break
    if ct == max_ct:
      print("WARN: no converge ")
    # eigenvalues are diag elements of X
    e_vals = np.zeros(n, dtype=np.float64)
    for i in range(n):
      e_vals[i] = X[i][i]
    # eigenvectors are columns of pq
    e_vecs = np.copy(pq)
    return (e_vals, e_vecs)

  def my_qr(self, A, standard=False):
    # QR decomposition. standard: make R diag positive
    # Householder algorithm, i.e., not Gram-Schmidt or Givens
    # if not square, verify m > n ("tall")
    # if standard==True verify m == n
    m, n = A.shape
    Q = np.eye(m)  # or my_eye(m) -- see below
    R = np.copy(A)
    if m == n: end = n-1
    else: end = n
    for i in range(0, end):
      H = np.eye(m)
      # -------------------
      a = R[i:, i]  # partial column vector
      norm_a = np.linalg.norm(a)
      # if a[0] < 0.0: norm_a = -norm_a
      v = a / (a[0] + norm_a)
      v[0] = 1.0
      h = np.eye(len(a))  # H reflection
      h -= (2 / np.dot(v, v)) * \
        np.matmul(v[:,None], v[None,:])
      # -------------------
      H[i:, i:] = h  # copy h into H
      Q = np.matmul(Q, H)
      R = np.matmul(H, R)
    if standard == True:  # A must be square
      S = np.zeros((n,n))  # signs of diagonal
      for i in range(n):
        if R[i][i] < 0.0: S[i][i] = -1.0
        else: S[i][i] = +1.0
      Q = np.matmul(Q, S)
      R = np.matmul(S, R)
    return Q, R

  def is_upper_tri(self, A, tol):
    n = len(A)
    for i in range(0,n):
      for j in range(0,i):  # lower
        if np.abs(A[i][j]) > tol:
          return False
    return True

  # ---------------------------------------------------------
  #
  # nested k-means class
  #
  # ----------------------------------------------------------

  class KMeans:
    def __init__(self, data, k):
      self.data = data         # copy by ref
      self.k = k               # number clusters
      self.N = len(data)       # number of data items
      self.dim = len(data[0])  # data item dim
      self.trials = self.N     # to seek best clustering
      self.max_iter = self.N * 2  # internal updates loop
      self.initialize(seed=0)

    # ---------------------------------------------------------

    def initialize(self, seed):  # called several times
      self.rnd = np.random.RandomState(seed)
      self.clustering = np.zeros(shape=(self.N), dtype=np.int64)
      self.means = np.zeros(shape=(self.k,self.dim),
        dtype=np.float64)
      # "Random Partition" init (i.e., not Forgy init)
      indices = np.arange(self.N)
      self.rnd.shuffle(indices)
      for i in range(self.k):  # first k items
        self.clustering[indices[i]] = i
      for i in range(self.k, self.N):
        self.clustering[indices[i]] = \
          self.rnd.randint(0, self.k)  # remaining items
      self.update_means()  # compute means

    # ---------------------------------------------------------

    def distance(self, item, mean):
      # Euclidean distance from a data item to a mean
      # alternative: dist = np.linalg.norm(item-mean)
      # allows easy swap to different distance function
      sum = 0.0
      dim = len(item)
      for j in range(dim):
        sum += (item[j] - mean[j]) ** 2
      return np.sqrt(sum)

    # ---------------------------------------------------------

    def update_means(self):
      # assumes no 0-count clusters
      counts = np.zeros(shape=(self.k), dtype=np.int64)
      for i in range(self.N):  # walk thru each data item
        c_id = self.clustering[i]
        counts[c_id] += 1
      for kk in range(self.k):  # each mean
        if counts[kk] == 0:
          print("FATAL: zero-count enter update_means() ")
          input()
      counts = \
        np.zeros(shape=(self.k), dtype=np.int64)  # reset
      new_means = \
        np.zeros(shape=self.means.shape, dtype=np.float64)
      for i in range(self.N):  # walk thru each data item
        c_id = self.clustering[i]
        counts[c_id] += 1
        for j in range(self.dim):
          new_means[c_id,j] += self.data[i,j]
      for kk in range(self.k):  # each mean
        if counts[kk] == 0:
          return False  # bad attempt, no change to means
      for kk in range(self.k):  # each mean
        for j in range(self.dim):
          new_means[kk,j] /= counts[kk]  # not zero
      for kk in range(self.k):  # each mean
        for j in range(self.dim):
          self.means[kk,j] = new_means[kk,j]  # by value
      return True

    # ---------------------------------------------------------

    def update_clustering(self):
      # return False if bad clustering; don't update
      # verify no zero-count clusters pre-condition
      counts = np.zeros(shape=(self.k), dtype=np.int64)
      for i in range(self.N):  # walk thru each data item
        c_id = self.clustering[i]
        counts[c_id] += 1
      for kk in range(self.k):  # each cluster_ID
        if counts[kk] == 0:
          print("FATAL: zero-count in update_clustering() ")
          input()
      new_clustering = np.copy(self.clustering)  # proposed
      # from curr item to each mean
      distances = np.zeros(shape=(self.k), dtype=np.float64)
      for i in range(self.N):  # walk thru each data item
        for kk in range(self.k):  # distance to each mean
          distances[kk] = \
            self.distance(self.data[i], self.means[kk])
        new_id = np.argmin(distances)  # new cluster ID
        new_clustering[i] = new_id
      if np.array_equal(self.clustering, new_clustering):  # no change
        return False  # short-circuit out
      # make sure that no new clustering counts zero
      counts = np.zeros(shape=(self.k), dtype=np.int64)
      for i in range(self.N):
        c_id = new_clustering[i]
        counts[c_id] += 1
      for kk in range(self.k):  # could np.count_nonzero
        if counts[kk] == 0:  # proposed clustering is bad
          return False  # don't update clustering
      # no counts have gone 0, so update the clustering
      for i in range(self.N):
        self.clustering[i] = new_clustering[i]  # by val
      return True

    # ----------------------------------------------------------

    def cluster_once(self):
      ok = True  # good clustering update, good means update
      sanity_ct = 1
      while sanity_ct <= self.max_iter:
        ok = self.update_clustering()
        if ok == False:  # no update to clustering
          break  # done
        ok = self.update_means()
        if ok == False:  # no update to means
          break  # done
        sanity_ct += 1
      return self.clustering

    # ---------------------------------------------------------

    def sum_squared_dists(self):
      # sum squared distances between all data points
      # and their cluster mean; WCSS
      sum = 0.0
      for i in range(self.N):  # examine each point
        c_id = self.clustering[i]  # get cluster ID
        m = self.means[c_id]       # get mean
        ss = np.linalg.norm(self.data[i] - m)**2
        sum += ss
      return sum

    # ---------------------------------------------------------

    def cluster(self):
      best_ssd = self.sum_squared_dists()  # init clustering
      best_clustering = np.copy(self.clustering)
      for i in range(self.trials):
        # print("-----")
        self.initialize(seed=i)
        clustering = self.cluster_once()
        ssd = self.sum_squared_dists()
        # print(clustering)
        # print(ssd)
        if ssd < best_ssd:
          best_ssd = ssd
          best_clustering = np.copy(clustering)
        # print(best_ssd)
        # print("-----")
      # print("\nbest clustering = ")
      # print(best_clustering)
      return best_clustering

  # ---------------------------------------------------------
  # end KMeans class
  # ---------------------------------------------------------

def main():
  print("\nBegin spectral clustering from scratch ")
  np.set_printoptions(precision=4, suppress=True,
    floatmode="fixed", linewidth=200)

  # X = np.loadtxt(".\\Data\\dummy_data_10.txt",
  #   usecols = (0,1), delimiter = ",", comments="#",
  #   dtype=np.float64)

  X = np.array([[0.20, 0.20], [0.20, 0.70], [0.30, 0.90],
    [0.50, 0.50], [0.50, 0.60], [0.60, 0.50], [0.70, 0.90],
    [0.40, 0.10], [0.90, 0.70], [0.80, 0.20]])

  print("\nData: ")
  print(X)

  k = 2
  gamma = 0.10
  print("\nClustering with k=2, gamma=0.10 ")
  sp = Spectral(k, gamma)
  clustering = sp.cluster(X)
  print("\nSpectral clustering: ")
  print(clustering)

  print("\nEnd demo ")

  colormap = np.array(['green', 'green'])
  plt.scatter(X[:, 0], X[:, 1],
    c=colormap[clustering], s=100)
  plt.xlim(0, 1); plt.ylim(0, 1)
  plt.xticks(np.arange(0.0, 1.1, 0.10))
  plt.yticks(np.arange(0.0, 1.1, 0.10))
  plt.grid()
  plt.show()

  colormap = np.array(['red', 'blue'])
  plt.scatter(X[:, 0], X[:, 1],
    c=colormap[clustering], s=100)
  plt.xlim(0, 1); plt.ylim(0, 1)
  plt.xticks(np.arange(0.0, 1.1, 0.10))
  plt.yticks(np.arange(0.0, 1.1, 0.10))
  plt.grid()
  plt.show()

if __name__ == "__main__":
  main()