
#!/usr/bin/env python3

from collections import Counter
from machinevisiontoolbox.ImagePointFeatures import BaseFeature2D
import numpy as np
import cv2 as cv

# TODO: remove top N% and bottom M% of words by frequency
class BagOfWords:
    def __init__(self, images, k=2_000, nstopwords=0, attempts=1, seed=None):
        r"""
        Bag of words class

        :param images: a sequence of images or a set of image features
        :type images: :class:`~machinevisiontoolbox.Image` iterable, :class:`~machinevisiontoolbox.PointFeatures.BaseFeature2D`
        :param k: number of visual words, defaults to 2000
        :type k: int, optional
        :param nstopwords: number of stop words, defaults to 0
        :type nstopwords: int, optional
        :param attempts: number of k-means attempts, defaults to 1
        :type attempts: int, optional
        :param seed: random number seed for k-means, defaults to None
        :type seed: int, optional

        Bag of words is a powerful feature-based method for matching images
        from widely different viewpoints.  This class creates a bag of words
        from a sequence of images or a set of point features.  In the former
        case, each feature has an ``.id`` equal to the index of the image in
        the sequence.  In the latter case, features must have a valid ``.id``
        attribute indicating which image in the bag they belong to.

        k-means clustering is performed to assign a word label to every
        feature.  The cluster centroids are retained as a :math:`k \times N`
        array ``.centroids`` with one row per word centroid, where each row
        is a feature descriptor, 128 elements long in the case of SIFT.

        ``.words`` is an array of word labels that corresponds to the array
        of image features ``.features``.  The word labels are integers,
        initially in the range [0, ``k``).

        Stop words are those visual words that occur most often, and
        ``nstopwords`` of them can be removed.  The centroids are reordered
        so that the last ``nstopwords`` rows correspond to the stop words.
        When a new set of image features is assigned labels from
        ``.centroids``, any feature with a label greater than or equal to
        ``k - nstopwords`` is a stop word and can be discarded.

        :reference:
            - Video Google: a text retrieval approach to object matching in
              videos, J. Sivic and A. Zisserman, in Proc. Ninth IEEE Int.
              Conf. on Computer Vision, pp. 1470-1477, Oct. 2003.
            - Robotics, Vision & Control for Python, Section 12.4.2,
              P. Corke, Springer 2023.

        :seealso: :meth:`retrieve`
            :meth:`~machinevisiontoolbox.ImagePointFeatures.BaseFeature2D`
            :meth:`~machinevisiontoolbox.ImagePointFeatures.SIFT`
            `cv2.kmeans <https://docs.opencv.org/master/d5/d38/group__core__cluster.html#ga9a34dc06c6ec9460e90860f15bcd2f88>`_
        """
        if images is None:
            return

        if isinstance(images, BaseFeature2D):
            # passed the features
            features = images
        else:
            # passed images, compute the features
            features = []
            for image in images:
                features += image.SIFT()
            features.sort(by="scale", inplace=True)
        self._images = images

        # save the image ids
        self._image_id = np.r_[features.id]
        self._nimages = self._image_id.max() + 1
        self._features = features

        # do the clustering
        # TODO: clarify the termination criteria: epsilon is presumably the
        # norm of the shift in the centroids, but it is not obvious which
        # criterion actually ends the iteration
        criteria = (cv.TERM_CRITERIA_EPS + cv.TERM_CRITERIA_MAX_ITER, 10, 1.0)
        if seed is not None:
            cv.setRNGSeed(seed)
        ret, labels, centroids = cv.kmeans(
            data=features._descriptor,
            K=k,
            bestLabels=None,
            criteria=criteria,
            attempts=attempts,
            flags=cv.KMEANS_RANDOM_CENTERS,
        )
        self._k = k
        self._words = labels.ravel()
        self._labels = labels.ravel()
        self._centroids = centroids
        self._word_freq_vectors = None

        self._nstopwords = nstopwords
        if nstopwords > 0:
            self._remove_stopwords()

        # compute the word frequency vectors
        maxwords = self.k - self.nstopwords
        W = []
        id = np.array(self._features.id)
        for i in range(self.nimages):
            # get the words associated with image i
            words = self.words[id == i]
            # create the columns of W
            v = BagOfWords._word_freq_vector(words, maxwords)
            W.append(v)
        W = np.column_stack(W)

        N = self.nimages
        # total number of occurrences of word i; multiple occurrences in
        # the one image count only as one
        ni = (W > 0).sum(axis=1)

        idf = np.log(N / ni)

        M = []
        for i in range(self.nimages):
            # number of words in this image
            nd = W[:, i].sum()
            # word occurrence frequency
            nid = W[:, i]
            with np.errstate(divide="ignore", invalid="ignore"):
                v = nid / nd * idf
            v[~np.isfinite(v)] = 0
            M.append(v)
        self._word_freq_vectors = np.column_stack(M)
        self._idf = idf
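    # A minimal construction sketch (a hedged example, not part of the class;
    # "campus" is the image set used in the __main__ block below):
    #
    #     images = ImageCollection("campus/*.png", mono=True)
    #     bag = BagOfWords(images, k=2_000, nstopwords=50, seed=0)
    #
    # Each column of the word-frequency matrix computed above is the TF-IDF
    # weighted vector
    #     v_i = (n_id / n_d) * log(N / n_i)
    # where n_id is the count of word i in image d, n_d the total number of
    # words in image d, N the number of images, and n_i the number of images
    # containing word i at least once.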
    def wwfv(self, i=None):
        """
        Weighted word-frequency vector for image

        :param i: image within bag, defaults to all images
        :type i: int, optional
        :return: word-frequency vector or vectors
        :rtype: ndarray(K,1), ndarray(K,N)

        This is the weighted word-frequency vector for the ``i``'th image in
        the bag.  The angle between any two WWFVs is an indication of image
        similarity.

        If ``i`` is None then the word-frequency matrix is returned, where
        the columns are the word-frequency vectors for the images in the
        bag.

        .. note:: The word-frequency vectors are computed by the
            constructor.
        """
        if i is not None:
            v = self._word_freq_vectors[:, i]
            if v.ndim == 1:
                # ensure a column vector is returned for a scalar index
                v = np.c_[v]
            return v
        else:
            return self._word_freq_vectors
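    # Sketch of what the WWFV is for (hedged; this mirrors the
    # cosine-similarity computation inside similarity() below):
    #
    #     v1 = bag.wwfv(0).ravel()
    #     v2 = bag.wwfv(1).ravel()
    #     cos_sim = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))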
    @property
    def nimages(self):
        """
        Number of images in the bag

        :return: number of images
        :rtype: int
        """
        return self._nimages

    @property
    def images(self):
        """
        Images associated with this bag

        :return: images associated with this bag
        :rtype: :class:`~machinevisiontoolbox.Image` iterable

        .. note:: Only valid if the bag was constructed from images rather
            than features.
        """
        return self._images

    @property
    def k(self):
        """
        Number of words in the visual vocabulary

        :return: number of words
        :rtype: int

        :seealso: :meth:`nstopwords`
        """
        return self._k

    @property
    def words(self):
        """
        Word labels for every feature

        :return: word labels
        :rtype: ndarray(N)

        Word labels are arranged such that the highest ``nstopwords`` labels
        correspond to the stop words.

        :seealso: :meth:`nstopwords`
        """
        return self._words

    # TODO better name for above
    def word(self, f):
        """
        Word label for original feature

        :param f: index of the original feature
        :type f: int
        :return: word label
        :rtype: int

        Word labels are arranged such that the highest ``nstopwords`` labels
        correspond to the stop words.
        """
        return self._labels[f]
    @property
    def nwords(self):
        """
        Number of usable words

        :return: number of usable words
        :rtype: int

        This is ``k`` - ``nstopwords``.

        :seealso: :meth:`k` :meth:`nstopwords`
        """
        return self._k - self._nstopwords

    @property
    def nstopwords(self):
        """
        Number of stop words

        :return: number of stop words
        :rtype: int

        :seealso: :meth:`k` :meth:`nwords`
        """
        return self._nstopwords

    @property
    def firststop(self):
        """
        First stop word

        :return: word label of the first stop word
        :rtype: int
        """
        return self.k - self._nstopwords

    @property
    def centroids(self):
        """
        Word feature centroids

        :return: centroids of visual word features
        :rtype: ndarray(k,N)

        An array with one row per visual word, where each row is the feature
        descriptor vector, eg. 128 elements for SIFT features.

        Centroids are arranged such that the last ``nstopwords`` rows
        correspond to the stop words.  After clustering against the
        centroids, any word with a label ``>= k - nstopwords`` is a stop
        word.

        .. note:: The stop words are kept in the centroid array for the
            recall process.

        :seealso: :meth:`similarity`
        """
        return self._centroids

    def __repr__(self):
        return str(self)

    def __str__(self):
        s = f"BagOfWords: {len(self.words)} features from {self.nimages} images"
        s += f", {self.nwords} words, {self.nstopwords} stop words"
        return s

    def _remove_stopwords(self, verbose=True):
        # Remove the nstopwords most frequent words (the stop words) from
        # the bag.  All remaining words are renumbered so that the word
        # labels are consecutive and the stop words have the highest labels.

        unique_words, freq = self.wordfreq()  # unique_words will be [0, k)
        index = np.argsort(-freq)  # sort in descending order

        # the nstopwords most frequent words are the stop words
        stopwords = unique_words[index[: self.nstopwords]]  # array of stop words

        stopfeatures = freq[stopwords].sum()
        if verbose:
            print(
                f"Removing {stopfeatures} features ({stopfeatures/len(self.words) * 100:.1f}%)"
                f" associated with {self.nstopwords} most frequent words"
            )

        k = np.full(index.shape, False, dtype=bool)
        k[stopwords] = True  # mask of all the stop words

        # indices of all non-stop words, followed by all stop words
        map = np.hstack((unique_words[~k], unique_words[k]))

        # create a dictionary from old label to new label; all stop words
        # now have a label in the range [k - nstopwords, k)
        mapdict = {}
        for w in unique_words:
            mapdict[map[w]] = w

        # map the word labels
        words = np.array([mapdict[w] for w in self.words])
        self._labels = words

        # only retain the non-stop words
        keep = words < self.k - self.nstopwords
        self._words = words[keep]
        self._image_id = self._image_id[keep]
        self._features = self._features[keep]

        # rearrange the cluster centroids
        self._centroids = self._centroids[map]
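    # Toy illustration of the relabelling performed above (a hedged sketch,
    # not part of the class): with k = 5 and nstopwords = 2, if the stop
    # words are old labels 1 and 3 then map = [0, 2, 4, 1, 3]; old labels
    # 0, 2, 4 are renumbered 0, 1, 2 while the stop words 1 and 3 become
    # 3 and 4, so every stop word ends up with a label >= k - nstopwords.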
    def similarity(self, arg):
        """
        Compute similarity between bag and query images

        :param arg: weighted word-frequency vectors or query images
        :type arg: ndarray, :class:`~machinevisiontoolbox.Image` or iterable
        :return: confusion matrix
        :rtype: ndarray(M,N)

        The array has one row per query image and one column per image in
        the bag.

        :seealso: :meth:`closest`
        """
        if isinstance(arg, np.ndarray):
            # passed weighted word-frequency vectors, one per column
            wwfv = arg
            sim = np.empty((wwfv.shape[1], self.nimages))
            for j, vj in enumerate(wwfv.T):
                for i in range(self.nimages):
                    vi = self.wwfv(i)
                    with np.errstate(divide="ignore", invalid="ignore"):
                        sim[j, i] = np.dot(vi.ravel(), vj) / (
                            np.linalg.norm(vi) * np.linalg.norm(vj)
                        )
        else:
            images = arg
            if not hasattr(images, "__iter__"):
                # if not iterable like a FileCollection or VideoFile, turn
                # the image into a list of one
                images = [images]

            # similarity has bag index as column, query index as row
            sim = np.empty((len(images), self.nimages))
            for j, image in enumerate(images):
                features = image.SIFT(id="image")

                # assign features to the given cluster centroids
                # the elements of matches are:
                #   queryIdx: new feature index
                #   trainIdx: cluster centre index
                bfm = cv.BFMatcher(cv.NORM_L2, crossCheck=False)
                matches = bfm.match(features._descriptor, self._centroids)
                words = np.array([m.trainIdx for m in matches])

                # discard the stop words
                keep = words < self.k - self.nstopwords
                words = words[keep]

                # word occurrence frequency
                nid = BagOfWords._word_freq_vector(words, self.k - self.nstopwords)
                # number of words in this image
                nd = nid.sum()
                with np.errstate(divide="ignore", invalid="ignore"):
                    v2 = nid / nd * self._idf
                v2[~np.isfinite(v2)] = 0

                for i in range(self.nimages):
                    v1 = self.wwfv(i).ravel()
                    with np.errstate(divide="ignore", invalid="ignore"):
                        sim[j, i] = np.dot(v1, v2) / (
                            np.linalg.norm(v1) * np.linalg.norm(v2)
                        )
        if sim.shape[0] == 1:
            sim = sim[0, :]
        return sim
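    # Usage sketch for similarity() (hedged; "holdout" is the query image
    # set used in the __main__ block below):
    #
    #     sim = bag.similarity(holdout[2])   # 1-D, one entry per bag image
    #     best = np.argmax(sim)              # most similar image in the bag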
    def retrieve(self, images):
        """
        Find the bag image most similar to the query

        :param images: query image or images
        :return: index of the most similar bag image and its similarity
        :rtype: int, float

        :seealso: :meth:`similarity`
        """
        S = self.similarity(images).ravel()
        k = np.argmax(S)
        return k, S[k]
    def features(self, word):
        """
        Get features corresponding to word

        :param word: visual word label
        :type word: int
        :return: features corresponding to this label
        :rtype: :class:`~machinevisiontoolbox.PointFeatures.BaseFeature2D`

        Returns a slice of the image features corresponding to this word
        label.  The ``.id`` attribute of each feature indicates which image
        in the bag it belongs to.
        """
        return self._features[self.words == word]
    def occurrence(self, word):
        """
        Number of occurrences of specified word

        :param word: visual word label
        :type word: int
        :return: total number of times that visual ``word`` appears in this
            bag
        :rtype: int
        """
        return np.sum(self.words == word)
    @staticmethod
    def _word_freq_vector(words, maxwords):
        # histogram of word labels: v[w] is the number of times that word w
        # occurs in ``words``
        unique, unique_counts = np.unique(words, return_counts=True)
        v = np.zeros((maxwords,))
        v[unique] = unique_counts
        return v
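    # Worked example of the histogram above (hedged sketch): with
    # words = np.array([0, 2, 2, 4]) and maxwords = 5,
    # _word_freq_vector returns array([1., 0., 2., 0., 1.]).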
    def wordfreq(self):
        """
        Get visual word frequency

        :return: visual words, visual word frequency
        :rtype: ndarray, ndarray

        Returns two arrays: one containing all visual words, the other
        containing the frequency of the corresponding word across all
        images.
        """
        return np.unique(self.words, return_counts=True)
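    # Sketch (hedged): find the most frequent visual words, as done inside
    # _remove_stopwords() above:
    #
    #     words, freq = bag.wordfreq()
    #     most_frequent = words[np.argsort(-freq)][:10]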
    def closest(self, S, i):
        """
        Find closest images

        :param S: bag similarity matrix, with one column per query image
        :type S: ndarray(N,M)
        :param i: the query image index
        :type i: int
        :return: indices of images in decreasing order of similarity, and
            the corresponding similarity values
        :rtype: ndarray(N), ndarray(N)

        :seealso: :meth:`similarity`
        """
        s = S[:, i]
        index = np.argsort(-s)
        return index, s[index]
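    # Usage sketch for closest() (hedged): given a similarity matrix S whose
    # i'th column holds the scores for one query,
    #
    #     index, s = bag.closest(S, i)
    #     index[0], s[0]   # most similar image and its score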
    def contains(self, word):
        """
        Images that contain specified word

        :param word: visual word label
        :type word: int
        :return: indices of images containing this word
        :rtype: ndarray

        :seealso: :meth:`exemplars`
        """
        return np.unique(self._image_id[self.words == word])
    def exemplars(
        self, word, images=None, maxperimage=2, columns=10, max=None, width=50,
        **kwargs
    ):
        """
        Composite image containing exemplars of specified word

        :param word: visual word label
        :type word: int
        :param images: the set of images corresponding to this bag, only
            required if the bag was constructed from features rather than
            images
        :type images: :class:`~machinevisiontoolbox.Image` iterable, optional
        :param maxperimage: maximum number of exemplars drawn from any one
            image, defaults to 2
        :type maxperimage: int, optional
        :param columns: number of exemplar images in each row, defaults to 10
        :type columns: int, optional
        :param max: maximum number of exemplar images, defaults to None
        :type max: int, optional
        :param width: width of image thumbnail, defaults to 50
        :type width: int, optional
        :return: composite image
        :rtype: :class:`~machinevisiontoolbox.Image`

        Produces a grid of examples of a particular visual word.

        :seealso: :meth:`contains`
            :meth:`~machinevisiontoolbox.ImagePointFeatures.BaseFeature2D.support`
            :meth:`~machinevisiontoolbox.Image.Tile`
        """
        from machinevisiontoolbox import Image

        exemplars = []
        count = Counter()
        if images is None:
            images = self._images

        for feature in self.features(word):
            count[feature.id] += 1
            if count[feature.id] > maxperimage:
                continue
            exemplars.append(feature.support(images, width))
            if max is not None and len(exemplars) >= max:
                break

        return Image.Tile(exemplars, columns=columns, **kwargs)
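# Usage sketch (hedged; mirrors the __main__ block below): show a grid of
# image patches that map to one visual word:
#
#     w = bag.word(108)            # word label of feature 108
#     bag.exemplars(w, images).disp()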
if __name__ == "__main__":

    import numpy as np
    import matplotlib.pyplot as plt
    from machinevisiontoolbox import *
    import cv2 as cv

    cv.setRNGSeed(0)

    images = ImageCollection("campus/*.png", mono=True)

    features = []
    for image in images:
        features += image.SIFT()

    # sort the features in descending order by scale
    features.sort(by="scale", inplace=True)

    features[:10].table()

    ex = []
    for i in range(400):
        ex.append(features[i].support(images))
    Image.Tile(ex, columns=20).disp(plain=True)

    feature = features[108]
    print(feature)

    bag = BagOfWords(features, 2_000)

    w = bag.word(108)
    print(w)

    print(bag.occurrence(w))
    print(bag.contains(w))
    bag.exemplars(w, images)

    bag = BagOfWords(images, 2_000)
    print(bag)

    w, f = bag.wordfreq()
    print(len(w))

    bag = BagOfWords(images, 2_000, nstopwords=50)
    print(bag)

    print(bag.wwfv(0).shape)
    print(bag.wwfv().shape)

    print(bag.similarity(bag.wwfv(3)))
    print(bag.similarity(images[:5]))

    sim_8 = bag.similarity(images[8]).ravel()
    print(sim_8)
    k = np.argsort(-sim_8)
    print(np.c_[sim_8[k], k])

    ss = []
    for i in range(4):
        ss.append(images[k[i]])
    Image.Tile(ss, columns=2).disp()

    holdout = ImageCollection("campus/holdout/*.png", mono=True)
    sim = bag.similarity(holdout)

    sim_2 = bag.similarity(holdout[2]).ravel()
    print(sim_2)
    k = np.argsort(-sim_2)
    print(np.c_[sim_2[k], k])

    ss = [holdout[2]]
    for i in range(3):
        ss.append(images[k[i]])
    Image.Tile(ss, columns=2).disp()

    Image(sim).disp(block=True)