
Spectral Clustering From Scratch Using Python


Spectral clustering is a complex form of machine learning data clustering. Briefly, the source data is transformed into a reduced-dimension form and then standard k-means clustering is applied to the transformed data. Spectral clustering can sometimes uncover patterns in data that simpler techniques cannot.

Even though the scikit-learn library has a SpectralClustering module, I decided to implement spectral clustering from scratch using Python. It was a big challenge and took me much longer to complete than expected.


I verified my from-scratch implementation of spectral clustering by comparing its results with those from the scikit-learn SpectralClustering module. The results are identical.

In high-level pseudo-code, there are five steps in spectral clustering:


1. read source data into memory as a matrix X
2. use X to compute an affinity matrix A
3. use A to compute a Laplacian matrix L
4. use L to compute an eigenvector embedding matrix E 
5. perform standard clustering on E (k-means)

The Python code that corresponds to the five steps is:


X = np.loadtxt(".\\Data\\dummy_data_10.txt",
  usecols = (0,1), delimiter = ",", comments="#",
  dtype=np.float64) 

def cluster(self, X):
  A = self.make_affinity(X)
  L = self.make_laplacian(A)
  E = self.make_embedding(L)
  result = self.process_embedding(E) 
  return result

Each of the steps is a significant problem in its own right. The affinity matrix holds the similarity between all pairs of data items, measured by an RBF (radial basis function) kernel. The Laplacian matrix is essentially a graph representation of the affinity matrix. The embedding is the set of k eigenvectors of the Laplacian that correspond to the k smallest eigenvalues. When standard k-means clustering is applied to the eigenvector embedding, the source data is indirectly clustered. Whew!
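
As an illustration of the first step, the affinity matrix can be computed in a vectorized way using NumPy broadcasting. This is just a sketch with a helper name of my own (rbf_affinity), not part of the demo code below, but it produces the same RBF values as the nested-loop make_affinity() method:

import numpy as np

def rbf_affinity(X, gamma):
  # pairwise squared Euclidean distances via broadcasting
  sq_dists = np.sum((X[:, None, :] - X[None, :, :])**2, axis=2)
  # RBF similarity: exp(-gamma * squared distance)
  return np.exp(-gamma * sq_dists)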


I used a 10-item dummy dataset. Here is the data before and after clustering.

By far the most difficult part of the implementation was the code to compute the eigenvalues and eigenvectors of the normalized Laplacian matrix. Computing eigenvalues and eigenvectors required code to compute the QR decomposition of a matrix — another extremely difficult problem.
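
One practical way to sanity-check a from-scratch eigen routine of this kind is to compare it against NumPy's built-in solver for symmetric matrices. This is only a sketch of that idea (np.linalg.eigh is a standard NumPy function; the small test matrix is my own choice): eigenvalues should agree up to ordering, and eigenvectors up to sign.

import numpy as np

M = np.array([[2.0, 1.0, 0.0],
              [1.0, 2.0, 1.0],
              [0.0, 1.0, 2.0]])  # small symmetric test matrix

vals, vecs = np.linalg.eigh(M)   # eigenvalues in ascending order
print(vals)                      # compare with values from QR iteration
print(vecs)                      # columns are eigenvectors (signs may differ)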

Another surprisingly difficult part of the from-scratch implementation was normalizing the Laplacian matrix (computing the un-normalized Laplacian was fairly easy).
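
For reference, a common textbook formulation of the symmetric normalized Laplacian is L_sym = I - D^(-1/2) A D^(-1/2), where A is the affinity matrix and D is its diagonal degree matrix. The sketch below is my own helper, not necessarily numerically identical to the normalize_laplacian() method in the demo code, but it shows that formulation directly:

import numpy as np

def sym_normalized_laplacian(A):
  # L_sym = I - D^(-1/2) A D^(-1/2)
  d = np.sum(A, axis=1)            # degrees (row sums of affinity)
  d_inv_sqrt = 1.0 / np.sqrt(d)    # assumes all degrees are positive
  A_norm = (A * d_inv_sqrt[None, :]) * d_inv_sqrt[:, None]
  return np.eye(len(A)) - A_norm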

The code for k-means clustering isn’t terribly complicated but it is long. I put a KMeans class inside the main Spectral class.

I used a tiny dummy dataset with just 10 items to test my spectral clustering code. To verify my results, I ran the scikit SpectralClustering module:


import numpy as np
from sklearn.cluster import SpectralClustering

X = np.array([[0.20, 0.20],
[0.20, 0.70],
[0.30, 0.90],
[0.50, 0.50],
[0.50, 0.60],
[0.60, 0.50],
[0.70, 0.90],
[0.40, 0.10],
[0.90, 0.70],
[0.80, 0.20]])

print("Data: ")
print(X)

sc = SpectralClustering(n_clusters=2,
  affinity='rbf', gamma=0.10,
  assign_labels="kmeans",
  random_state=0)
clustering = sc.fit(X)

print("Affinity matrix: ")
print(sc.affinity_matrix_)

print("Spectral clustering: ")
print(clustering.labels_)

The module has a built-in affinity_matrix_ property, but there's no way to access the Laplacian matrix, so I inserted a print statement inside the _spectral_embedding.py scikit source code on my machine.
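
An alternative approach (my suggestion, not what I actually did) is to rebuild a normalized Laplacian from the fitted affinity matrix using SciPy's csgraph utilities. Note that scikit-learn's internal spectral embedding applies some additional processing, so the values may not exactly match what gets printed from inside _spectral_embedding.py:

from scipy.sparse.csgraph import laplacian

A = sc.affinity_matrix_              # RBF affinity from the fitted model
L_norm = laplacian(A, normed=True)   # symmetric normalized graph Laplacian
print("Normalized Laplacian: ")
print(L_norm)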

Implementing spectral clustering from scratch using Python was a lot of fun (in my strange world anyway).



I sometimes mentally cluster songs. Here are three songs used in TV advertising in the 1960s that I think have similarities. Left: The song “The Disadvantages of You” was used in Benson and Hedges cigarette ads. Bad product. Good song. Center: “No Matter What Shape (Your Stomach’s In)” was used in Alka Seltzer ads. Good song but I prefer Tums. Right: “Calcutta” was a big hit and was used in several regional TV ads for things like furniture stores. Catchy tune even today.






Demo code.


# spectral_scratch.py

import numpy as np
import matplotlib.pyplot as plt

# -----------------------------------------------------------

class Spectral:
  def __init__(self, k, gamma):
    self.k = k
    self.gamma = gamma

  def cluster(self, X):
    A = self.make_affinity(X)
    L = self.make_laplacian(A)
    E = self.make_embedding(L)
    result = self.process_embedding(E)
    print("\nLaplacian: "); print(L)
    print("\nAffinity: "); print(A)
    return result

  def make_affinity(self, X):
    n = len(X)
    result = np.zeros((n,n), dtype=np.float64)
    for i in range(n):
      for j in range(i,n):
        rbf = self.my_rbf(X[i], X[j], self.gamma)
        result[i][j] = rbf
        result[j][i] = rbf
    return result

  def my_rbf(self, v1, v2, gamma):
    dim = len(v1)
    sum = 0.0
    for i in range(dim):
      sum += (v1[i] - v2[i])**2
    return np.exp(-gamma * sum)

  def make_laplacian(self, A):
    n = len(A)
    D = np.zeros((n,n), dtype=np.float64)
    for i in range(n):
      row_sum = 0.0
      for j in range(n):
        row_sum += A[i][j]
      D[i][i] = row_sum
    result = D - A
    return self.normalize_laplacian(result)

  def normalize_laplacian(self, L):
    n = len(L)
    result = np.copy(L)
    for i in range(n):
      result[i][i] = 0.0

    w = np.zeros(n, dtype=np.float64)
    for j in range(n):
      col_sum = 0.0  # in-degree version
      for i in range(n):
        col_sum += np.abs(result[i][j])
      w[j] = np.sqrt(col_sum)

    for i in range(n):
      for j in range(n):
        result[i][j] /= (w[j] * w[i])

    for i in range(n):
      result[i][i] = 1.0
    return result

  def make_embedding(self, L):
    eig_vals, eig_vecs = self.my_eigen(L)
    all_indices = np.argsort(eig_vals)
    indices = np.zeros(self.k, dtype=np.int64)
    for i in range(self.k):
      indices[i] = all_indices[i]
    extracted = eig_vecs[:,indices]
    # print(extracted); input()
    return extracted

  def process_embedding(self, E):
    km = self.KMeans(E, self.k)
    result = km.cluster()
    return result

  # ---------------------------------------------------------
  # eigen functions
  # ---------------------------------------------------------

  def my_eigen(self, A):
    # A must be square, symmetric
    # return (eigenvalues, eigenvectors) tuple
    n = len(A)
    X = np.copy(A)
    pq = np.eye(n)
    max_ct = 10000

    ct = 0
    while ct < max_ct:
      Q, R = self.my_qr(X)
      pq = np.matmul(pq, Q)  # accum Q
      X = np.matmul(R, Q)  # note order
      ct += 1

      if self.is_upper_tri(X, 1.0e-8) == True:
        break

    if ct == max_ct:
      print("WARN: no converge ")

    # eigenvalues are diag elements of X
    e_vals = np.zeros(n, dtype=np.float64)
    for i in range(n):
      e_vals[i] = X[i][i]

    # eigenvectors are columns of pq
    e_vecs = np.copy(pq)
  
    return (e_vals, e_vecs)

  def my_qr(self, A, standard=False):
    # QR decomposition. standard: make R diag positive
    # Householder algorithm, i.e., not Gram-Schmidt or Givens
    # if not square, verify m greater-than n ("tall")
    # if standard==True verify m == n

    m, n = A.shape  
    Q = np.eye(m)  # m x m identity
    R = np.copy(A)
    if m == n: end = n-1
    else: end = n
    for i in range(0, end):
      H = np.eye(m)

      # -------------------
      a = R[i:, i]  # partial column vector
      norm_a = np.linalg.norm(a)  # Euclidean norm of partial column
      if a[0] < 0.0: norm_a = -norm_a
      v = a / (a[0] + norm_a)
      v[0] = 1.0
      h = np.eye(len(a))  # H reflection
      h -= (2 / np.dot(v, v)) * \
        np.matmul(v[:,None], v[None,:])
      # -------------------

      H[i:, i:] = h  # copy h into H
      Q = np.matmul(Q, H)
      R = np.matmul(H, R)

    if standard == True:  # A must be square
      S = np.zeros((n,n))  # signs of diagonal
      for i in range(n):
        if R[i][i] < 0.0: S[i][i] = -1.0
        else: S[i][i] = +1.0
      Q = np.matmul(Q, S)
      R = np.matmul(S, R)

    return Q, R

  def is_upper_tri(self, A, tol):
    n = len(A)
    for i in range(0,n):
      for j in range(0,i):  # lower
        if np.abs(A[i][j]) > tol:
          return False
    return True

  # ---------------------------------------------------------
  #
  # nested k-means class
  #
  # ----------------------------------------------------------

  class KMeans:
    def __init__(self, data, k):
      self.data = data  # copy by ref
      self.k = k  # number clusters
      self.N = len(data)  # number of data items
      self.dim = len(data[0])  # data item dim 
      self.trials = self.N  # to seek best clustering
      self.max_iter = self.N * 2  # internal updates loop
      self.initialize(seed=0)

  # ---------------------------------------------------------

    def initialize(self, seed):  # called several times
      self.rnd = np.random.RandomState(seed)
      self.clustering = np.zeros(shape=(self.N),
        dtype=np.int64)
      self.means = np.zeros(shape=(self.k,self.dim),
        dtype=np.float64)
      # "Random Partition" init (i.e., not Forgy init)
      indices = np.arange(self.N)
      self.rnd.shuffle(indices)
      for i in range(self.k):  # first k items
        self.clustering[indices[i]] = i
      for i in range(self.k, self.N):
        self.clustering[indices[i]] = \
          self.rnd.randint(0, self.k)  # remaining items
      self.update_means()  # compute means

  # ---------------------------------------------------------
 
    def distance(self, item, mean):
      # Euclidean distance from a data item to a mean
      # alternative: dist = np.linalg.norm(item-mean)
      # allows easy swap to different distance function
      sum = 0.0
      dim = len(item)
      for j in range(dim):
        sum += (item[j] - mean[j]) ** 2
      return np.sqrt(sum)

  # ---------------------------------------------------------

    def update_means(self):
      # assumes no 0-count clusters
      counts = np.zeros(shape=(self.k), dtype=np.int64)
      for i in range(self.N):  # walk thru each data item
        c_id = self.clustering[i]
        counts[c_id] += 1
      for kk in range(self.k):  # each mean
        if counts[kk] == 0:
          print("FATAL: zero-count enter update_means() ")
          input()
      counts = \
        np.zeros(shape=(self.k), dtype=np.int64)  # reset
      new_means = \
        np.zeros(shape=self.means.shape, dtype=np.float64)  
      for i in range(self.N):  # walk thru each data item
        c_id = self.clustering[i]
        counts[c_id] += 1
        for j in range(self.dim):
          new_means[c_id,j] += self.data[i,j]

      for kk in range(self.k):  # each mean
        if counts[kk] == 0:
          return False  # bad attempt, no change to means

      for kk in range(self.k):  # each mean
        for j in range(self.dim):
          new_means[kk,j] /= counts[kk]  # not zero

      for kk in range(self.k):  # each mean
        for j in range(self.dim):
          self.means[kk,j] = new_means[kk,j]  # by value

      return True

  # ---------------------------------------------------------

    def update_clustering(self):
      # return False if bad clustering; don't update

      # verify no zero-count clusters pre-condition
      counts = np.zeros(shape=(self.k), dtype=np.int64)
      for i in range(self.N):  # walk thru each data item
        c_id = self.clustering[i]
        counts[c_id] += 1
      for kk in range(self.k):  # each cluster_ID
        if counts[kk] == 0:
          print("FATAL: zero-count in update_clustering() ")
          input()

      new_clustering = np.copy(self.clustering)  # proposed
      # from curr item to each mean
      distances = np.zeros(shape=(self.k), dtype=np.float64)

      for i in range(self.N):  # walk thru each data item
        for kk in range(self.k):  # distance to each mean
          distances[kk] = \
            self.distance(self.data[i], self.means[kk])  
        new_id = np.argmin(distances)  # new cluster ID
        new_clustering[i] = new_id
  
      if np.array_equal(self.clustering,
        new_clustering):  # no change
        return False  # short-circuit out

      # make sure that no new clustering counts zero
      counts = np.zeros(shape=(self.k), dtype=np.int64)
      for i in range(self.N):
        c_id = new_clustering[i]
        counts[c_id] += 1
  
      for kk in range(self.k):  # could np.count_nonzero
        if counts[kk] == 0:  # proposed clustering is bad
          return False       # don't update clustering

      # no counts have gone 0, so update the clustering
      for i in range(self.N):
        self.clustering[i] = new_clustering[i]  # by val

      return True

  # ----------------------------------------------------------

    def cluster_once(self):
      ok = True  # good clustering update, good means update
      sanity_ct = 1
      while sanity_ct <= self.max_iter:
        ok = self.update_clustering()
        if ok == False:  # no update to clustering
          break          # done
        ok = self.update_means()
        if ok == False:  # no update to means
          break          # done
        sanity_ct += 1

      return self.clustering

  # ---------------------------------------------------------

    def sum_squared_dists(self):
      # sum squared distances between all data points
      # and their cluster mean; WCSS
      sum = 0.0
      for i in range(self.N):  # examine each point
        c_id = self.clustering[i]  # get cluster ID
        m = self.means[c_id]       # get mean
        ss = np.linalg.norm(self.data[i] - m)**2
        sum += ss
      return sum 

  # ---------------------------------------------------------

    def cluster(self):
      best_ssd = self.sum_squared_dists()  # init clustering
      best_clustering = np.copy(self.clustering)
      for i in range(self.trials):
        # print("-----")
        self.initialize(seed=i)
        clustering = self.cluster_once()
        ssd = self.sum_squared_dists()
        # print(clustering)
        # print(ssd)
        if ssd < best_ssd:
          best_ssd = ssd
          best_clustering = np.copy(clustering)
        # print(best_ssd)
        # print("-----")

      # print("\nbest clustering = ")
      # print(best_clustering)
      return best_clustering

  # ---------------------------------------------------------
  # end KMeans class
  # ---------------------------------------------------------

def main():
  print("\nBegin spectral clustering from scratch ")
  np.set_printoptions(precision=4, suppress=True,
    floatmode="fixed", linewidth=200)

  # X = np.loadtxt(".\\Data\\dummy_data_10.txt",
  #   usecols = (0,1), delimiter = ",", comments="#",
  #   dtype=np.float64) 

  X = np.array([[0.20, 0.20],
    [0.20, 0.70],
    [0.30, 0.90],
    [0.50, 0.50],
    [0.50, 0.60],
    [0.60, 0.50],
    [0.70, 0.90],
    [0.40, 0.10],
    [0.90, 0.70],
    [0.80, 0.20]])

  print("\nData: ")
  print(X)

  k = 2
  gamma = 10.0
  print("\nClustering with k = " + str(k) + \
    ", gamma = " + str(gamma) + " ")
  sp = Spectral(k, gamma)
  clustering = sp.cluster(X)
  print("\nSpectral clustering: ")
  print(clustering)

  print("\nEnd demo ")

  colormap = np.array(['green', 'green'])  # single color: data before clustering
  plt.scatter(X[:, 0], X[:, 1],
    c=colormap[clustering], s=100)
  plt.xlim(0, 1); plt.ylim(0, 1)
  plt.xticks(np.arange(0.0, 1.1, 0.10))
  plt.yticks(np.arange(0.0, 1.1, 0.10))
  plt.grid()
  plt.show()

  colormap = np.array(['red', 'blue'])  # one color per cluster ID
  plt.scatter(X[:, 0], X[:, 1],
    c=colormap[clustering], s=100)
  plt.xlim(0, 1); plt.ylim(0, 1)
  plt.xticks(np.arange(0.0, 1.1, 0.10))
  plt.yticks(np.arange(0.0, 1.1, 0.10))
  plt.grid()
  plt.show()

if __name__ == "__main__":
  main()


