"""
Visual Bag of Words model for image classification and retrieval.
"""
from collections import Counter
from typing import Any
import cv2
import numpy as np
from machinevisiontoolbox.ImagePointFeatures import BaseFeature2D
from machinevisiontoolbox import Image
# TODO: remove top N% and bottom M% of words by frequency
class BagOfWords:
def __init__(
self,
images,
k: int = 2_000,
nstopwords: int = 0,
attempts: int = 1,
seed: int | None = None,
) -> None:
r"""
Bag of words class
:param images: a sequence of images or set of image features
:type images: :class:`~machinevisiontoolbox.Image` iterable, :class:`~machinevisiontoolbox.PointFeatures.BaseFeature2D`
:param k: number of visual words, defaults to 2000
:type k: int, optional
:param nstopwords: number of stop words, defaults to 50
:type nstopwords: int, optional
:param attempts: number of k-means attempts, defaults to 1
:type attempts: int, optional
Bag of words is a powerful feature-based method for matching images
from widely different viewpoints.
This class creates a bag of words from a sequence of images or a set of
point features. In the former case, the features will have an ``.id``
equal to the index of the image in the sequence. For the latter case,
features must have a valid ``.id`` attribute indicating which image in
the bag they belong to.
k-means clustering is performed to assign a word label to every feature.
The cluster centroids are retained as a :math:`k \times N` array
``.centroids`` with one row per word centroid and each row is a feature
descriptor, 128 elements long in the case of SIFT.
``.words`` is an array of word labels that corresponds to the array of
image features ``.features``. The word labels are integers, initially
in the range [0, ``k``).
Stop words are those visual words that occur most often and we can
remove ``nstopwords`` of them. The centroids are reordered so that the
last ``nstopwords`` rows correspond to the stop words. When a new set
of image features is assigned labels from the ``.centroids`` any with a
label greater that ``.nstopwords`` is a stop word and can be discarded.
:reference:
- Video Google: a text retrieval approach to object matching in videos
J.Sivic and A.Zisserman,
in Proc. Ninth IEEE Int. Conf. on Computer Vision,
pp.1470-1477, Oct. 2003.
- Robotics, Vision & Control for Python, Section 12.4.2,
P. Corke, Springer 2023.
:seealso: :meth:`recall` :meth:`~machinevisiontoolbox.ImagePointFeatures.BaseFeature2D`
:meth:`~machinevisiontoolbox.ImagePointFeatures.SIFT`
`cv2.kmeans <https://docs.opencv.org/4.x/d5/d38/group__core__cluster.html#ga9a34dc06c6ec9460e90860f15bcd2f88>`_
"""
if images is None:
return
if isinstance(images, BaseFeature2D):
# passed the features
features = images
else:
# passed images, compute the features
features = []
for image in images:
features += image.SIFT()
features.sort(by="scale", inplace=True)
self._images = images
# save the image id's
self._image_id = np.r_[features.id]
self._nimages = self._image_id.max() + 1
self._features = features
# do the clustering
# NO IDEA WHAT EPSILON ACTUALLY MEANS, NORM OF THE SHIFT IN CENTROIDS?
# NO IDEA HOW TO TELL WHAT CRITERIA IT TERMINATES ON
criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 10, 1.0)
if seed is not None:
cv2.setRNGSeed(seed)
ret, labels, centroids = cv2.kmeans(
data=features._descriptor,
K=k,
bestLabels=None,
criteria=criteria,
attempts=attempts,
flags=cv2.KMEANS_RANDOM_CENTERS,
)
self._k = k
self._words = labels.ravel()
self._labels = labels.ravel()
self._centroids = centroids
self._word_freq_vectors = None
self._nstopwords = nstopwords
if nstopwords > 0:
self._remove_stopwords()
# compute word frequency vectors
maxwords = self.k - self.nstopwords
W = []
id = np.array(self._features.id)
for i in range(self.nimages):
# get the words associated with image i
words = self.words[id == i]
# create columns of the W
v = BagOfWords._word_freq_vector(words, maxwords)
W.append(v)
W = np.column_stack(W)
N = self.nimages
# total number of occurrences of word i
# multiple occurrences in the one image count only as one
ni = (W > 0).sum(axis=1)
idf = np.log(N / ni)
M = []
for i in range(self.nimages):
# number of words in this image
nd = W[:, i].sum()
# word occurrence frequency
nid = W[:, i]
with np.errstate(divide="ignore", invalid="ignore"):
v = nid / nd * idf
v[~np.isfinite(v)] = 0
M.append(v)
self._word_freq_vectors = np.column_stack(M)
self._idf = idf
[docs]
def wwfv(self, i: int | None = None) -> np.ndarray:
"""
Weighted word frequency vector for image
:param i: image within bag, defaults to all images
:type i: int, optional
:return: word frequency vector or vectors
:rtype: ndarray(K), ndarray(N,K)
This is the word-frequency vector for the ``i``'th image in the bag. The
angle between any two WFVs is an indication of image similarity.
If ``i`` is None then the word-frequency matrix is returned, where the
columns are the word-frequency vectors for the images in the bag.
.. note:: The word vector is expensive to compute so a lazy evaluation
is performed on the first call to this method.
"""
if i is not None:
v = self._word_freq_vectors[:, i]
if v.ndim == 1:
return np.c_[v]
else:
return self._word_freq_vectors
@property
def nimages(self) -> int:
"""
Number of images associated in the bag
:return: number of images
:rtype: int
"""
return self._nimages
@property
def images(self) -> Any:
"""
Images associated with this bag
:return: images associated with this bag
:rtype: :class:`~machinevisiontoolbox.Image` iterable
.. note:: Only valid if the bag was constructed from images rather than features.
"""
return self._images
@property
def k(self) -> int:
"""
Number of words in the visual vocabulary
:return: number of words
:rtype: int
:seealso: :meth:`nstopwords`
"""
return self._k
@property
def words(self) -> np.ndarray:
"""
Word labels for every feature
:return: word labels
:rtype: ndarray(N)
Word labels are arranged such that the top ``nstopwords`` labels are
stop words.
:seealso: :meth:`nstopwords`
"""
return self._words
# TODO better name for above
[docs]
def word(self, f: int) -> int:
"""
Word labels for original feature
:return: word labels
:rtype: ndarray(N)
Word labels are arranged such that the top ``nstopwords`` labels
"""
return self._labels[f]
@property
def nwords(self) -> int:
"""
Number of usable words
:return: number of usable words
:rtype: int
This is ``k`` - ``nstopwords``.
:seealso: :meth:`k` :meth:`nstopwords`
"""
return self._k - self._nstopwords
@property
def nstopwords(self) -> int:
"""
Number of stop words
:return: Number of stop words
:rtype: int
:seealso: :meth:`k` :meth:`nwords`
"""
return self._nstopwords
@property
def firststop(self) -> int:
"""
First stop word
:return: word index of first stop word
:rtype: int
"""
return self.k - self._nstopwords
@property
def centroids(self) -> np.ndarray:
"""
Word feature centroids
:return: centroids of visual word features
:rtype: ndarray(k,N)
Is an array with one row per visual word, and the row is the feature
descriptor vector. eg. for SIFT features it is 128 elements.
Centroids are arranged such that the last ``nstopwords`` rows correspond
to the stop words. After clustering against the centroids, any word
with a label ``>= nstopwords`` is a stop word.
.. note:: The stop words are kept in the centroid array for the recall process.
:seealso: :meth:`similarity`
"""
return self._centroids
def __repr__(self) -> str:
return str(self)
def __str__(self) -> str:
s = f"BagOfWords: {len(self.words)} features from {self.nimages} images"
s += f", {self.nwords} words, {self.nstopwords} stop words"
return s
def _remove_stopwords(self, verbose: bool = True) -> None:
# BagOfWords.remove_stop Remove stop words
#
# B.remove_stop(N) removes the N most frequent words (the stop words)
# from the self. All remaining words are renumbered so that the word
# labels are consecutive.
# words, freq = self.wordfreq()
# index = np.argsort(-freq) # sort descending order
# # top ``nstopwords`` most frequent are the stop words
# stopwords = words[index[:self._nstopwords]]
unique_words, freq = self.wordfreq()
# unique_words will be [0,k)
index = np.argsort(-freq) # sort descending order
stopwords = unique_words[index[: self.nstopwords]] # array of stop words
stopfeatures = freq[stopwords].sum()
print(
f"Removing {stopfeatures} features ({stopfeatures/len(self.words) * 100:.1f}%) associated with {self.nstopwords} most frequent words"
)
k = np.full(index.shape, False, dtype=bool)
k[stopwords] = True
# k = freq > stop_cut # index of all the stop words
# indices of all non-stop words, followed by all stop words
map = np.hstack((unique_words[~k], unique_words[k]))
# create a dictionary from old label to new label
# now all stop words have an index in the range [k-nstopwords, k)
mapdict = {}
for w in unique_words:
mapdict[map[w]] = w
# map the word labels
words = np.array([mapdict[w] for w in self.words])
self._labels = words
# only retain the non stop words
keep = words < self.nstopwords
self._words = words[keep]
self._image_id = self._image_id[keep]
self._features = self._features[keep]
# rearrange the cluster centroids
self._centroids = self._centroids[map]
[docs]
def similarity(self, query) -> np.ndarray:
"""
Compute similarity between bag of words and query
:param query: bag of words or image features
:type query: BagOfWords or ndarray
:return: similarity matrix
:rtype: ndarray(M,N)
The array has rows corresponding to the images in ``self`` and
columns corresponding to the queries in ``query``.
``query`` can be:
- a single image, a list of images, or an Image iterator (like VideoFile,
ZipArchive etc.) for which the visual words are computed using the same
dictionary of visual words as the bag.
- a set of image features, in which case the similarity is computed between the
bag and the query features, or
:seealso: :meth:`.closest`
"""
if isinstance(query, np.ndarray):
wwfv = query
sim = np.empty((wwfv.shape[1], self.nimages))
for j, vj in enumerate(wwfv.T):
for i in range(self.nimages):
vi = self.wwfv(i)
with np.errstate(divide="ignore", invalid="ignore"):
sim[j, i] = np.dot(vi.ravel(), vj) / (
np.linalg.norm(vi) * np.linalg.norm(vj)
)
else:
images = query
if isinstance(images, Image):
images = [images]
# similarity has bag index as column, query index as row
sim = np.empty((len(images), self.nimages))
for j, image in enumerate(images):
features = image.SIFT(id="image")
# assign features to given cluster centroids
# the elements of matches are:
# queryIdx: new feature index
# trainingIdx: cluster centre index
bfm = cv2.BFMatcher(normType=cv2.NORM_L2, crossCheck=False)
matches = bfm.match(features._descriptor, self._centroids)
words = np.array([m.trainIdx for m in matches])
keep = words < self.nstopwords
words = words[keep]
# word occurrence frequency
nid = BagOfWords._word_freq_vector(words, self.k - self.nstopwords)
# number of words in this image
nd = nid.sum()
with np.errstate(divide="ignore", invalid="ignore"):
v2 = nid / nd * self._idf
v2[~np.isfinite(v2)] = 0
for i in range(self.nimages):
v1 = self.wwfv(i).ravel()
with np.errstate(divide="ignore", invalid="ignore"):
sim[j, i] = np.dot(v1, v2) / (
np.linalg.norm(v1) * np.linalg.norm(v2)
)
if sim.shape[0] == 1:
sim = sim[0, :]
return sim
def retrieve(self, images) -> tuple[int, float]:
S = self.similarity(images).ravel()
k = np.argmax(S)
return k, S[k]
[docs]
def features(self, word: int) -> BaseFeature2D:
"""
Get features corresponding to word
:param word: visual word label
:type word: int
:return: features corresponding to this label
:rtype: :class:`~machinevisiontoolbox.PointFeatures.BaseFeature2D`
Return a slice of the image features corresponding to this word label.
The ``.id`` attribute of each feature indicates which image in the bag
it belongs to.
"""
return self._features[self.words == word]
[docs]
def occurrence(self, word: int) -> int:
"""
Number of occurrences of specified word
:param word: visual word label
:type word: int
:return: total number of times that visual ``word`` appears in this bag
:rtype: int
"""
return np.sum(self.words == word)
@staticmethod
def _word_freq_vector(words, maxwords: int) -> np.ndarray:
# create columns of the W
unique, unique_counts = np.unique(words, return_counts=True)
# [w,f] = count_unique(words)
v = np.zeros((maxwords,))
v[unique] = unique_counts
return v
[docs]
def wordfreq(self) -> tuple[np.ndarray, np.ndarray]:
"""
Get visual word frequency
:return: visual words, visual word frequency
:rtype: ndarray, ndarray
Returns two arrays, one containing all visual words, the other containing
the frequency of the corresponding word across all images.
"""
# BagOfWords.wordfreq Word frequency statistics
#
# [W,N] = B.wordfreq[] is a vector of word labels W and the corresponding
# elements of N are the number of occurrences of that word.
return np.unique(self.words, return_counts=True)
[docs]
def closest(self, S: np.ndarray, i: int) -> tuple[np.ndarray, np.ndarray]:
"""
Find closest image
:param S: bag similarity matrix
:type S: ndarray(N,M)
:param i: the query image index
:type i: int
:return: index of the recalled image and similarity
:rtype: int, float
:seealso: :meth:`similarity`
"""
s = S[:, i]
index = np.argsort(-s)
return index, s[index]
[docs]
def contains(self, word: int) -> np.ndarray:
"""
Images that contain specified word
:param word: visual word label
:type word: int
:return: list of images containing this word
:rtype: list
:seealso: :meth:`exemplars`
"""
return np.unique(self._image_id[self.words == word])
[docs]
def exemplars(
self,
word: int,
images=None,
maxperimage: int = 2,
columns: int = 10,
max: int | None = None,
width: int = 50,
**kwargs,
):
"""
Composite image containing exemplars of specified word
:param word: visual word label
:type word: int
:param images: the set of images corresponding to this bag, only
required if the bag was constructed from features not images.
:param maxperimage: maximum number of exemplars drawn from any one image, defaults to 2
:type maxperimage: int, optional
:param columns: number of exemplar images in each row, defaults to 10
:type columns: int, optional
:param max: maximum number of exemplar images, defaults to None
:type max: int, optional
:param width: width of image thumbnail, defaults to 50
:type width: int, optional
:return: composite image
:rtype: :class:`~machinevisiontoolbox.Image`
Produces a grid of examples of a particular visual word.
:seealso: :meth:`contains`
:meth:`~machinevisiontoolbox.ImagePointFeatures.BaseFeature2D.support`
:meth:`~machinevisiontoolbox.Image.Tile`
"""
from machinevisiontoolbox import Image
exemplars = []
count = Counter()
if images is None:
images = self._images
for feature in self.features(word):
count[feature.id] += 1
if count[feature.id] > maxperimage:
continue
exemplars.append(feature.support(images, width))
if max is not None and len(exemplars) >= max:
break
return Image.Tile(exemplars, columns=columns, **kwargs)
if __name__ == "__main__":
    # This module is a library; there is nothing to run as a script and
    # no dedicated test file exists for it.
    pass