import numpy as np
from sklearn.cluster import SpectralClustering, AgglomerativeClustering, KMeans
from sklearn.metrics.pairwise import cosine_similarity


class VideoClusterer:
    """Group per-frame feature vectors into ``n_clusters`` lists of frame indices.

    Supported ``clustering_method`` values:

    * ``'uniform'``       -- split the frame sequence into contiguous,
      near-equal chunks (the features themselves are not inspected).
    * ``'spectral'``      -- ``SpectralClustering`` on a precomputed cosine
      similarity matrix.
    * ``'agglomerative'`` -- Ward-linkage ``AgglomerativeClustering``.
    * ``'kmeans'``        -- ``KMeans`` with a single initialisation.

    For the non-uniform methods, frames whose average similarity to the other
    frames is unusual are treated as outliers and left out of every cluster
    (see ``get_clusters``).
    """

    def __init__(self, clustering_method='uniform', n_clusters=2, similarity_threshold=0.8):
        """Configure the clusterer.

        Args:
            clustering_method: One of ``'uniform'``, ``'spectral'``,
                ``'agglomerative'`` or ``'kmeans'``.
            n_clusters: Number of clusters to produce.
            similarity_threshold: Stored on the instance; no method of this
                class reads it.

        Raises:
            ValueError: If ``clustering_method`` is not a supported name.
        """
        self.n_clusters = n_clusters
        self.similarity_threshold = similarity_threshold
        self.clustering_method = clustering_method

        # Decide on the clustering method to use.
        if clustering_method == 'uniform':
            self.clusterer = self.uniform_clustering
        elif clustering_method == 'spectral':
            self.clusterer = SpectralClustering(n_clusters=n_clusters, affinity='precomputed')
        elif clustering_method == 'agglomerative':
            self.clusterer = AgglomerativeClustering(
                n_clusters=n_clusters, metric='euclidean', linkage='ward'
            )
        elif clustering_method == 'kmeans':
            self.clusterer = KMeans(n_clusters=n_clusters, n_init=1)
        else:
            raise ValueError(f"Invalid clustering method: {clustering_method}")

    @staticmethod
    def _to_numpy(features):
        """Return ``features`` as a NumPy array.

        Tensor-like inputs (anything exposing ``detach``) are detached and
        moved to the CPU first, so tensors that require grad or live on an
        accelerator are handled; other array-likes go through ``np.asarray``.
        """
        if hasattr(features, 'detach'):
            features = features.detach().cpu().numpy()
        return np.asarray(features)

    def uniform_clustering(self, features):
        """Split ``range(len(features))`` into ``n_clusters`` contiguous chunks.

        The first ``len(features) % n_clusters`` chunks receive one extra
        index so that every index is assigned exactly once.

        Returns:
            A list of ``n_clusters`` lists of consecutive integer indices.
        """
        base_size, remainder = divmod(len(features), self.n_clusters)
        clusters = []
        start = 0
        for i in range(self.n_clusters):
            end = start + base_size + (1 if i < remainder else 0)
            clusters.append(list(range(start, end)))
            start = end
        return clusters

    def detect_outliers(self, features, z_threshold=1.5):
        """Flag rows whose mean dot-product similarity is atypical.

        Each row's dot product with every row is averaged, the averages are
        standardised (z-scored), and a row is flagged when the absolute
        z-score exceeds ``z_threshold``.

        Args:
            features: 2-D NumPy array, one row per frame.
            z_threshold: Absolute z-score above which a row is an outlier.

        Returns:
            Boolean NumPy array with one entry per row; ``True`` marks an
            outlier.
        """
        dot_product_matrix = features.dot(features.T)
        average_similarities = np.mean(dot_product_matrix, axis=0)
        # Small constant added to the standard deviation to prevent division
        # by zero when all averages are identical.
        epsilon = 1e-8
        normal = (
            (average_similarities - np.mean(average_similarities))
            / (np.std(average_similarities) + epsilon)
        )
        return np.abs(normal) > z_threshold

    def get_clusters(self, features):
        """Cluster frames and return the frame indices belonging to each cluster.

        Args:
            features: Per-frame feature matrix (tensor-like with
                ``detach``/``cpu``/``numpy``, or any array-like), one row per
                frame.

        Returns:
            A list of ``n_clusters`` lists. Every entry is an index into the
            rows of the *input* ``features``. For non-uniform methods, rows
            flagged as outliers are omitted from all clusters, provided more
            than ``n_clusters`` rows remain after removing them.
        """
        features = self._to_numpy(features)

        if self.clustering_method == 'uniform':
            return self.uniform_clustering(features)

        outlier_mask = self.detect_outliers(features)
        inlier_indices = np.flatnonzero(~outlier_mask)
        if len(inlier_indices) > self.n_clusters:
            # Remember which original rows survive so the labels can be
            # mapped back to positions in the caller's input.
            frame_indices = inlier_indices
            features = features[inlier_indices]
        else:
            # Too few inliers to form the requested clusters: keep all rows.
            frame_indices = np.arange(len(features))

        if self.clustering_method == 'spectral':
            # Spectral clustering was configured with affinity='precomputed',
            # so it is given the pairwise cosine similarity matrix.
            # NOTE(review): cosine similarity can be negative; confirm the
            # features make that acceptable for a precomputed affinity.
            similarity_matrix = cosine_similarity(features)
            labels = self.clusterer.fit_predict(similarity_matrix)
        else:
            # Agglomerative and k-means operate on the raw feature rows.
            labels = self.clusterer.fit_predict(features)

        # Organize frames into clusters, using original frame indices rather
        # than positions within the outlier-filtered array.
        clusters = [[] for _ in range(self.n_clusters)]
        for frame_idx, label in zip(frame_indices, labels):
            clusters[label].append(int(frame_idx))
        return clusters