Package tf2-som
tf2-som is a fast Tensorflow 2 implementation of the Self Organizing Maps (SOM).
Installing tf2-som
pip3 install tf2-som
Importing tf2-som
import tf_som
Demo
Authors
Expand source code
# -*- coding: utf-8 -*-
########################################################################################################################
# TF_SOM
#
# Copyright © 2022 CNRS/LPSC
#
# Author: Jérôme ODIER (jerome.odier@lpsc.in2p3.fr)
# Nora ACHBAK (nora.achbak@lpsc.in2p3.fr)
#
# Repositories: https://gitlab.in2p3.fr/jodier/tf2_som/
# https://github.com/odier-xyz/tf2_som/
#
# This software is a computer program whose purpose is to provide an
# implementation of the Self Organizing Maps (SOM).
#
# This software is governed by the CeCILL-C license under French law and
# abiding by the rules of distribution of free software. You can use,
# modify and/or redistribute the software under the terms of the CeCILL-C
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info".
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL-C license and that you accept its terms.
########################################################################################################################
""".. include:: ../docs/header.md
## Package **tf2-som**
**tf2-som** is a fast Tensorflow 2 implementation of the Self Organizing Maps (SOM).
<img src="som.png" alt="som" height="250" />
### Installing **tf2-som**
```sh
pip3 install tf2-som
```
### Importing **tf2-som**
```python
import tf_som
```
### Demo
[🔗 Click there](https://github.com/odier-io/tf2-som/blob/master/demo/demo.ipynb)
### Authors
- [Jérôme ODIER](https://annuaire.in2p3.fr/4121-4467/jerome-odier) ([CNRS/LPSC](https://lpsc.in2p3.fr/))
- [Nora ACHBAK](https://annuaire.in2p3.fr/7591-10426/nora-achbak) ([CNRS/LPSC](https://lpsc.in2p3.fr/))
"""
########################################################################################################################
import os
import json
import tqdm
import typing
########################################################################################################################
import numpy as np
import tensorflow as tf
########################################################################################################################
with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'metadata.json'), 'r') as f:
metadata = json.load(f)
__credits__ = metadata['credits']
__version__ = metadata['version']
__author__ = ', '.join(['{} ({})'.format(x[0], x[1]) for x in zip(metadata['author_names'], metadata['author_emails'])])
########################################################################################################################
def setup_tensorflow_for_cpus(num_threads: int = None) -> None:
"""Setups Tensorflow 2 for CPU parallelization.
Arguments
---------
num_threads : int
Number of threads or **None** to let the system choose (default: **None**).
"""
####################################################################################################################
# noinspection PyBroadException
try:
################################################################################################################
if num_threads is None:
import multiprocessing
num_threads = multiprocessing.cpu_count()
################################################################################################################
tf.config.threading.set_inter_op_parallelism_threads(
num_threads
)
tf.config.threading.set_intra_op_parallelism_threads(
num_threads
)
tf.config.set_soft_device_placement(True)
################################################################################################################
except:
pass
########################################################################################################################
def normalize(df: typing.Any, dtype: type = np.float32) -> typing.Any:
"""Returns the normalized Pandas data frame.
Arguments
---------
df : pandas.DataFrame
Pandas data frame.
dtype : type
Neural network data type (default: **numpy.float32**).
"""
result = df.copy()
for i in df.columns:
max_value = df[i].max()
min_value = df[i].min()
result[i] = (df[i] - min_value) / (max_value - min_value)
return result.astype(dtype)
########################################################################################################################
def _asymptotic_decay(epoch: int, epochs: int) -> float:
return 1.0 / (1.0 + 2.0 * epoch / epochs)
########################################################################################################################
class BMUs(object):
__pdoc__ = False
"""Best Matching Units"""
def __init__(self, indices: np.ndarray, locations: np.ndarray):
#: Array of indices
self.indices: np.ndarray = indices
#: Array of locations
self.locations: np.ndarray = locations
########################################################################################################################
class SOM(object):
"""Tensorflow 2 implementation of the Self Organizing Maps (SOM)."""
####################################################################################################################
def __init__(self,
m: int, n: int, dim: int,
seed: float = None, dtype: type = np.float32,
learning_rate: float = None, sigma: float = None, epochs: int = 100, decay_function = _asymptotic_decay):
"""Initializes a Self Organizing Maps.
A rule of thumb to set the size of the grid for a dimensionality reduction
task is that it should contain \\( 5\\sqrt{N} \\) neurons where N is the
number of samples in the dataset to analyze.
Arguments
---------
m : int
Number of neuron rows.
n : int
Number of neuron columns.
dim : int
Dimensionality of the input data.
seed : int
Seed of the random generators (default: **None**).
dtype : type
Neural network data type (default: **np.float32**).
learning_rate : float
Starting value of the learning rate (default: 0.3).
sigma : float
Starting value of the neighborhood radius (default: \\( \\mathrm{max}(m,n)/2 \\)).
epochs : int
Number of epochs to train for (default: 100).
decay_function : function
Function that reduces learning_rate and sigma at each iteration (default: \\( 1/\\left(1+2\\frac{epoch}{epochs}\\right) \\)).
"""
################################################################################################################
self._m = m
self._n = n
self._dim = dim
self._seed = seed
self._dtype = dtype
self._decay_function = decay_function
################################################################################################################
self._rebuild_topography()
################################################################################################################
self._learning_rate = 0.3 if learning_rate is None else dtype(learning_rate)
self._sigma = max(m, n) / 2.0 if sigma is None else dtype(sigma)
################################################################################################################
self._epochs = abs(int(epochs))
################################################################################################################
self._two = tf.constant(2.00e+00, dtype = dtype)
self._epsilon = tf.constant(1.00e-20, dtype = dtype)
self._sqrt_two = tf.constant(1.42e+00, dtype = dtype)
################################################################################################################
self._weights = np.empty(shape = (self._m * self._n, self._dim), dtype = self._dtype)
self._quantization_errors = np.empty(shape = (self._epochs, ), dtype = self._dtype)
self._topographic_errors = np.empty(shape = (self._epochs, ), dtype = self._dtype)
####################################################################################################################
def _rebuild_topography(self):
self._topography = tf.constant(np.array(list(self._neuron_locations(self._m, self._n))))
####################################################################################################################
@staticmethod
def _neuron_locations(m: int, n: int) -> typing.Iterator[typing.List[int]]:
for i in range(m):
for j in range(n):
yield [i, j]
####################################################################################################################
@staticmethod
def _argsort_n(x: tf.Tensor, n: int) -> tf.Tensor:
if n > 1:
return tf.nn.top_k(tf.negative(x), k = n).indices
else:
return tf.expand_dims(tf.argmin(x, axis = 1), axis = 1)
####################################################################################################################
@staticmethod
def __find_bmus(weights: typing.Union[tf.Variable, tf.Tensor], input_vectors: tf.Tensor, n: int = 1) -> tf.Tensor:
################################################################################################################
# COMPUTE DISTANCE SQUARES #
################################################################################################################
distance_squares = tf.reduce_sum(
tf.square(
tf.subtract(
tf.expand_dims(input_vectors, axis = 1),
tf.expand_dims(weights, axis = 0)
)
),
axis = 2
)
################################################################################################################
# COMPUTE INDICES #
################################################################################################################
return tf.transpose(SOM._argsort_n(distance_squares, n))
####################################################################################################################
def _find_bmus(self, weights: typing.Union[tf.Variable, tf.Tensor], input_vectors: tf.Tensor, n: int = 1) -> typing.List[BMUs]:
result = []
for bmu_indices in self.__find_bmus(weights, input_vectors, n):
bmu_locations = tf.gather(self._topography, bmu_indices)
result.append(BMUs(bmu_indices, bmu_locations))
return result
####################################################################################################################
def _train(self, weights: tf.Variable, input_vectors: tf.Tensor, epoch: int) -> None:
################################################################################################################
# BEST MATCHING UNITS #
################################################################################################################
bmus = self._find_bmus(weights, input_vectors, n = 2)
################################################################################################################
# LEARNING OPERATOR #
################################################################################################################
decay_function = self._decay_function(epoch, self._epochs)
current_learning_rate = tf.cast(self._learning_rate * decay_function, dtype = self._dtype)
current_radius = tf.cast(self._sigma * decay_function, dtype = self._dtype)
################################################################################################################
bmu_distance_squares = tf.reduce_sum(
tf.square(
tf.subtract(
tf.expand_dims(self._topography, axis = 0),
tf.expand_dims(bmus[0].locations, axis = 1)
)
),
axis = 2
)
################################################################################################################
neighbourhood_func = tf.exp(tf.divide(
tf.negative(tf.cast(bmu_distance_squares, self._dtype)),
tf.multiply(self._two, tf.square(current_radius))
))
################################################################################################################
learning_rate_op = tf.multiply(neighbourhood_func, current_learning_rate)
################################################################################################################
# WEIGHT(EPOCH + 1) #
################################################################################################################
numerator = tf.reduce_sum(
tf.multiply(
tf.expand_dims(learning_rate_op, axis = -1),
tf.expand_dims(input_vectors, axis = +1)
),
axis = 0
)
denominator = tf.expand_dims(tf.reduce_sum(learning_rate_op, axis = 0), axis = -1) + self._epsilon
################################################################################################################
weights.assign(tf.divide(numerator, denominator))
################################################################################################################
# QUANTIZATION ERROR #
################################################################################################################
self._quantization_errors[epoch] = tf.reduce_mean(
tf.norm(
tf.subtract(
input_vectors,
tf.gather(weights, bmus[0].indices)
),
axis = 1
),
axis = 0
)
################################################################################################################
# TOPOGRAPHIC ERROR #
################################################################################################################
t = tf.greater(
tf.norm(
tf.subtract(
tf.cast(bmus[1].locations, dtype = self._dtype),
tf.cast(bmus[0].locations, dtype = self._dtype)
),
axis = 1
),
self._sqrt_two
)
self._topographic_errors[epoch] = tf.divide(
tf.reduce_sum(tf.cast(t, dtype = self._dtype)),
tf.cast(tf.shape(input_vectors)[0], dtype = self._dtype)
)
####################################################################################################################
@staticmethod
def _dataset_to_generator(dataset):
return dataset if callable(dataset) else lambda: (dataset, )
####################################################################################################################
def train(self, dataset: typing.Union[np.ndarray, typing.Callable], chunk_size: int = None, reset_weights: bool = True, show_progress_bar: bool = True) -> None:
"""Trains the neural network. A batch formulation of updating weights is implemented: $$ \\mathrm{bmu}(x)=\\underset{i}{\\mathrm{arg\\,min}}\\lVert x-w_i\\rVert $$ $$ n_j=\\sum_{x\\in\\mathcal{D}}\\left\\{\\begin{array}{ll}1&\\mathrm{bmu}(x)=j\\\\0&\\mathrm{otherwise}\\end{array}\\right. $$ $$ \\Theta_{ji}(e)=\\alpha(e)\\cdot\\exp\\left(-\\frac{\\lVert j-i\\rVert}{2\\sigma^2(e)}\\right) $$ $$ \\boxed{w_{i\\,\\mathrm{new}}=\\frac{\\sum_{j=1}^{n}n_j\\Theta_{ji}(e)x_j}{\\sum_{j=1}^{n}n_j\\Theta_{ji}(e)}} $$ where, at epoch \\( e \\), \\( \\alpha(e)=\\alpha_0\\mathrm{decay\\,function}(e) \\) is the learning rate and \\( \\sigma(e)=\\sigma_0\\mathrm{decay\\,function}(e) \\) is the neighborhood radius.
Parameters
----------
dataset : typing.Union[numpy.ndarray, typing.Callable]
Training dataset array or generator.
chunk_size : int
Chunk size or **None** to disable batch (default: **None**).
reset_weights : bool
Specifying whether the weights have to be reset (default: **True**).
show_progress_bar : bool
Specifying whether a progress bar have to be shown (default: **True**).
.. NOTE::
A dataset generator is a **yield** generator returning numpy arrays.
.. NOTE::
Setting the **reset_weights** variable to **False** allows to update a pre-trained model (= progressive training).
"""
################################################################################################################
weights = None
################################################################################################################
if reset_weights and self._seed is not None:
np.random.seed(self._seed + 0x000000000000)
tf.random.set_seed(self._seed + 0x000000000000)
################################################################################################################
dataset_generator = SOM._dataset_to_generator(dataset)
################################################################################################################
for epoch in tqdm.tqdm(range(self._epochs), disable = not show_progress_bar):
for dataset_part_number, dataset_array_np in enumerate(dataset_generator()):
########################################################################################################
# DEFAULT CHUNK SIZE #
########################################################################################################
if chunk_size is None:
chunk_size = dataset_array_np.shape[0]
########################################################################################################
# INITIALIZE WEIGHTS #
########################################################################################################
if epoch == 0 and dataset_part_number == 0:
if reset_weights:
rand_weights_np = np.empty(shape = (self._m * self._n, self._dim), dtype = self._dtype)
l1 = rand_weights_np.shape[0]
l2 = dataset_array_np.shape[0]
for i in range(l1):
j = np.random.randint(l2)
rand_weights_np[i] = dataset_array_np[j]
weights = tf.Variable(rand_weights_np, dtype = self._dtype)
else:
weights = tf.Variable(self._weights, dtype = self._dtype)
########################################################################################################
# TRAIN THE SELF ORGANIZING MAP #
########################################################################################################
dataset = tf.data.Dataset.from_tensor_slices(tf.constant(dataset_array_np, dtype = self._dtype))
for chunk in dataset.shuffle(dataset_array_np.shape[0]).batch(chunk_size):
self._train(weights, chunk, epoch)
################################################################################################################
self._weights = weights.numpy()
####################################################################################################################
# noinspection PyTypeChecker
def save(self, filename: str, file_format: str = 'fits', extra: typing.Dict[str, np.ndarray] = None) -> None:
"""Saves the trained neural network to a file.
Parameters
----------
filename : str
Filename.
file_format : str
File format (supported formats: (fits, hdf5), default: fits).
extra : dict
Dictionary of extra (name, array) entries.
"""
if extra is None:
extra = {}
################################################################################################################
# FITS FORMAT #
################################################################################################################
if file_format == 'fits':
from astropy.io import fits
hdu0 = fits.PrimaryHDU()
hdu1 = fits.ImageHDU(data = self.get_centroids())
hdu2 = fits.BinTableHDU.from_columns(fits.ColDefs([
fits.Column(name = 'quantization_errors', format = 'D', array = self._quantization_errors),
fits.Column(name = 'topographic_errors', format = 'D', array = self._topographic_errors),
]))
hdu0.header['lrate'] = self._learning_rate
hdu0.header['sigma'] = self._sigma
hdu2.header['epochs'] = self._epochs
fits.HDUList([hdu0, hdu1, hdu2] + [fits.ImageHDU(name = name, data = data) for name, data in extra.items()]).writeto(filename, overwrite = True)
################################################################################################################
# HDF5 FORMAT #
################################################################################################################
elif file_format == 'hdf5':
import h5py
with h5py.File(filename, 'w') as file:
file.attrs['lrate'] = self._learning_rate
file.attrs['sigma'] = self._sigma
file.attrs['epochs'] = self._epochs
file.create_dataset('weights', data = self.get_centroids())
file.create_dataset('quantization_errors', data = self._quantization_errors)
file.create_dataset('topographic_errors', data = self._topographic_errors)
for name, data in extra.items():
file.create_dataset(name, data = data)
################################################################################################################
else:
raise ValueError('invalid format `{}` (fits, hdf5)'.format(file_format))
####################################################################################################################
def load(self, filename: str, file_format: str = 'fits') -> None:
"""Loads the trained neural network from a file.
Parameters
----------
filename : str
Filename.
file_format : str
File format (supported formats: (fits, hdf5), default: fits).
"""
################################################################################################################
# FITS FORMAT #
################################################################################################################
if file_format == 'fits':
from astropy.io import fits
with fits.open(filename) as hdus:
self._m, self._n, self._dim = hdus[1].data.shape
self._learning_rate = hdus[0].header['lrate']
self._sigma = hdus[0].header['sigma']
self._epochs = hdus[2].header['epochs']
self._weights = hdus[1].data.reshape((self._m * self._n, self._dim)).astype(self._dtype)
self._quantization_errors = hdus[2].data['quantization_errors'].astype(self._dtype)
self._topographic_errors = hdus[2].data['topographic_errors'].astype(self._dtype)
################################################################################################################
# HDF5 FORMAT #
################################################################################################################
elif file_format == 'hdf5':
import h5py
with h5py.File(filename, 'r') as file:
self._m, self._n, self._dim = file['weights'].shape
self._learning_rate = file.attrs['lrate']
self._sigma = file.attrs['sigma']
self._epochs = file.attrs['epochs']
self._weights = np.array(file['weights']).reshape((self._m * self._n, self._dim)).astype(self._dtype)
self._quantization_errors = np.array(file['quantization_errors']).astype(self._dtype)
self._topographic_errors = np.array(file['topographic_errors']).astype(self._dtype)
################################################################################################################
else:
raise ValueError('invalid format `{}` (fits, hdf5)'.format(file_format))
################################################################################################################
self._rebuild_topography()
####################################################################################################################
def distance_map(self) -> np.ndarray:
"""Returns the distance map of the neural network weights."""
################################################################################################################
centroids = self.get_centroids()
################################################################################################################
result = np.full(shape = (centroids.shape[0], centroids.shape[1], 8), fill_value = np.nan, dtype = self._dtype)
ii = 2 * [[0, -1, -1, -1, 0, +1, +1, +1]]
jj = 2 * [[-1, -1, 0, +1, +1, +1, 0, -1]]
for x in range(centroids.shape[0]):
for y in range(centroids.shape[1]):
w_2 = centroids[x, y]
e = y % 2 == 0
for k, (i, j) in enumerate(zip(ii[e], jj[e])):
if 0 <= x + i < centroids.shape[0]\
and \
0 <= y + j < centroids.shape[1]:
diff_w_2_w_1 = w_2 - centroids[x + i, y + j]
result[x, y, k] = np.sqrt(np.dot(diff_w_2_w_1, diff_w_2_w_1.T))
result = np.nansum(result, axis = 2)
return result / result.max()
####################################################################################################################
def get_weights(self) -> np.ndarray:
"""Returns the neural network weights (shape = [m * n, dim])."""
return self._weights.reshape((self._m * self._n, self._dim))
####################################################################################################################
def get_centroids(self) -> np.ndarray:
"""Returns of the neural network weights (shape = [m, n, dim])."""
return self._weights.reshape((self._m, self._n, self._dim))
####################################################################################################################
def get_quantization_errors(self) -> np.ndarray:
"""Returns the quantization errors (one value per epoch). $$ c_1=\\mathrm{1^\\mathrm{st}\\,bmu}=\\underset{i}{\\mathrm{arg\\,min}_1}\\lVert x-w_i\\rVert $$ $$ \\boxed{e_Q=\\frac{1}{\\mathcal{D}}\\sum_{x\\in\\mathcal{D}}\\lVert x-w_{c_1}\\rVert^2} $$"""
return self._quantization_errors
####################################################################################################################
def get_topographic_errors(self) -> np.ndarray:
"""Returns the topographic errors (one value per epoch). $$ c_n=\\mathrm{n^\\mathrm{th}\\,bmu}=\\underset{i}{\\mathrm{arg\\,min}_n}\\lVert x-w_i\\rVert $$ $$ t(x)=\\left\\{\\begin{array}{ll}1&\\lVert c_1-c_2\\rVert>\\sqrt{2}\\\\0&\\mathrm{otherwise}\\end{array}\\right. $$ $$ \\boxed{e_t=\\frac{1}{\\mathcal{D}}\\sum_{x\\in\\mathcal{D}}t(x)} $$"""
return self._topographic_errors
####################################################################################################################
def winners(self, dataset_array: np.ndarray, chunk_size: int = None, locations: bool = False, show_progress_bar: bool = False) -> np.ndarray:
"""Returns a vector of best matching unit indices or locations for the given input.
Parameters
----------
dataset_array : numpy.ndarray
Dataset array.
chunk_size : int
Chunk size or **None** to disable batch (default: **None**).
locations : bool
Get winner locations instead of indices (default: **False**).
show_progress_bar : bool
Specifying whether a progress bar have to be shown (default: **False**).
"""
################################################################################################################
if chunk_size is None:
chunk_size = dataset_array.shape[0]
################################################################################################################
weights = tf.constant(self._weights, dtype = self._dtype)
################################################################################################################
i = 0
if locations:
result = np.empty((dataset_array.shape[0], 2), dtype = np.int64)
for chunk in tqdm.tqdm(tf.data.Dataset.from_tensor_slices(tf.constant(dataset_array, dtype = self._dtype)).batch(chunk_size), disable = not show_progress_bar):
result[(i + 0) * chunk_size: (i + 1) * chunk_size] = tf.gather(self._topography, self.__find_bmus(weights, chunk, 1)[0])
i += 1
else:
result = np.empty((dataset_array.shape[0], ), dtype = np.int64)
for chunk in tqdm.tqdm(tf.data.Dataset.from_tensor_slices(tf.constant(dataset_array, dtype = self._dtype)).batch(chunk_size), disable = not show_progress_bar):
result[(i + 0) * chunk_size: (i + 1) * chunk_size] = self.__find_bmus(weights, chunk, 1)[0]
i += 1
################################################################################################################
return result
####################################################################################################################
def activation_map(self, dataset: typing.Union[np.ndarray, typing.Callable], chunk_size: int = None, show_progress_bar: bool = False) -> np.ndarray:
"""Returns a matrix containing the number of times the neuron (i, j) have been winner for the given input.
Parameters
----------
dataset : typing.Union[numpy.ndarray, typing.Callable]
Dataset array or generator.
chunk_size : int
Chunk size or **None** to disable batch (default: **None**).
show_progress_bar : bool
Specifying whether a progress bar have to be shown (default: **False**).
.. NOTE::
A dataset generator is a **yield** generator returning numpy arrays.
"""
################################################################################################################
dataset_generator = SOM._dataset_to_generator(dataset)
weights = tf.constant(self._weights, dtype = self._dtype)
result = np.zeros(shape = (self._m * self._n), dtype = np.int64)
################################################################################################################
for dataset_part, dataset_array_np in enumerate(dataset_generator()):
############################################################################################################
if chunk_size is None:
chunk_size = dataset_array_np.shape[0]
############################################################################################################
for chunk in tqdm.tqdm(tf.data.Dataset.from_tensor_slices(tf.constant(dataset_array_np, dtype = self._dtype)).batch(chunk_size), disable = not show_progress_bar):
for bmu_index in self.__find_bmus(weights, chunk, n = 1)[0]:
result[bmu_index] += 1
################################################################################################################
return result.reshape(self._m, self._n)
########################################################################################################################
Functions
def setup_tensorflow_for_cpus(num_threads: int = None) ‑> None
Setups Tensorflow 2 for CPU parallelization.
Arguments
num_threads
:int
- Number of threads or None to let the system choose (default: None).
Expand source code
def setup_tensorflow_for_cpus(num_threads: int = None) -> None: """Setups Tensorflow 2 for CPU parallelization. Arguments --------- num_threads : int Number of threads or **None** to let the system choose (default: **None**). """ #################################################################################################################### # noinspection PyBroadException try: ################################################################################################################ if num_threads is None: import multiprocessing num_threads = multiprocessing.cpu_count() ################################################################################################################ tf.config.threading.set_inter_op_parallelism_threads( num_threads ) tf.config.threading.set_intra_op_parallelism_threads( num_threads ) tf.config.set_soft_device_placement(True) ################################################################################################################ except: pass
def normalize(df: Any, dtype: type = numpy.float32) ‑> Any
Returns the normalized Pandas data frame.
Arguments
df
:pandas.DataFrame
- Pandas data frame.
dtype
:type
- Neural network data type (default: numpy.float32).
Expand source code
def normalize(df: typing.Any, dtype: type = np.float32) -> typing.Any: """Returns the normalized Pandas data frame. Arguments --------- df : pandas.DataFrame Pandas data frame. dtype : type Neural network data type (default: **numpy.float32**). """ result = df.copy() for i in df.columns: max_value = df[i].max() min_value = df[i].min() result[i] = (df[i] - min_value) / (max_value - min_value) return result.astype(dtype)
Classes
class BMUs (indices: numpy.ndarray, locations: numpy.ndarray)
Expand source code
class BMUs(object): __pdoc__ = False """Best Matching Units""" def __init__(self, indices: np.ndarray, locations: np.ndarray): #: Array of indices self.indices: np.ndarray = indices #: Array of locations self.locations: np.ndarray = locations
Instance variables
var indices
Array of indices
var locations
Array of locations
class SOM (m: int, n: int, dim: int, seed: float = None, dtype: type = numpy.float32, learning_rate: float = None, sigma: float = None, epochs: int = 100, decay_function=<function _asymptotic_decay>)
Tensorflow 2 implementation of the Self Organizing Maps (SOM).
Initializes a Self Organizing Maps.
A rule of thumb to set the size of the grid for a dimensionality reduction task is that it should contain 5\sqrt{N}neurons where N is the number of samples in the dataset to analyze.
Arguments
m
:int
- Number of neuron rows.
n
:int
- Number of neuron columns.
dim
:int
- Dimensionality of the input data.
seed
:int
- Seed of the random generators (default: None).
dtype
:type
- Neural network data type (default: np.float32).
learning_rate
:float
- Starting value of the learning rate (default: 0.3).
sigma
:float
- Starting value of the neighborhood radius (default: \mathrm{max}(m,n)/2).
epochs
:int
- Number of epochs to train for (default: 100).
decay_function
:function
- Function that reduces learning_rate and sigma at each iteration (default: 1/\left(1+2\frac{epoch}{epochs}\right)).
Expand source code
class SOM(object): """Tensorflow 2 implementation of the Self Organizing Maps (SOM).""" #################################################################################################################### def __init__(self, m: int, n: int, dim: int, seed: float = None, dtype: type = np.float32, learning_rate: float = None, sigma: float = None, epochs: int = 100, decay_function = _asymptotic_decay): """Initializes a Self Organizing Maps. A rule of thumb to set the size of the grid for a dimensionality reduction task is that it should contain \\( 5\\sqrt{N} \\) neurons where N is the number of samples in the dataset to analyze. Arguments --------- m : int Number of neuron rows. n : int Number of neuron columns. dim : int Dimensionality of the input data. seed : int Seed of the random generators (default: **None**). dtype : type Neural network data type (default: **np.float32**). learning_rate : float Starting value of the learning rate (default: 0.3). sigma : float Starting value of the neighborhood radius (default: \\( \\mathrm{max}(m,n)/2 \\)). epochs : int Number of epochs to train for (default: 100). decay_function : function Function that reduces learning_rate and sigma at each iteration (default: \\( 1/\\left(1+2\\frac{epoch}{epochs}\\right) \\)). """ ################################################################################################################ self._m = m self._n = n self._dim = dim self._seed = seed self._dtype = dtype self._decay_function = decay_function ################################################################################################################ self._rebuild_topography() ################################################################################################################ self._learning_rate = 0.3 if learning_rate is None else dtype(learning_rate) self._sigma = max(m, n) / 2.0 if sigma is None else dtype(sigma) ################################################################################################################ self._epochs = abs(int(epochs)) ################################################################################################################ self._two = tf.constant(2.00e+00, dtype = dtype) self._epsilon = tf.constant(1.00e-20, dtype = dtype) self._sqrt_two = tf.constant(1.42e+00, dtype = dtype) ################################################################################################################ self._weights = np.empty(shape = (self._m * self._n, self._dim), dtype = self._dtype) self._quantization_errors = np.empty(shape = (self._epochs, ), dtype = self._dtype) self._topographic_errors = np.empty(shape = (self._epochs, ), dtype = self._dtype) #################################################################################################################### def _rebuild_topography(self): self._topography = tf.constant(np.array(list(self._neuron_locations(self._m, self._n)))) #################################################################################################################### @staticmethod def _neuron_locations(m: int, n: int) -> typing.Iterator[typing.List[int]]: for i in range(m): for j in range(n): yield [i, j] #################################################################################################################### @staticmethod def _argsort_n(x: tf.Tensor, n: int) -> tf.Tensor: if n > 1: return tf.nn.top_k(tf.negative(x), k = n).indices else: return tf.expand_dims(tf.argmin(x, axis = 1), axis = 1) #################################################################################################################### @staticmethod def __find_bmus(weights: typing.Union[tf.Variable, tf.Tensor], input_vectors: tf.Tensor, n: int = 1) -> tf.Tensor: ################################################################################################################ # COMPUTE DISTANCE SQUARES # ################################################################################################################ distance_squares = tf.reduce_sum( tf.square( tf.subtract( tf.expand_dims(input_vectors, axis = 1), tf.expand_dims(weights, axis = 0) ) ), axis = 2 ) ################################################################################################################ # COMPUTE INDICES # ################################################################################################################ return tf.transpose(SOM._argsort_n(distance_squares, n)) #################################################################################################################### def _find_bmus(self, weights: typing.Union[tf.Variable, tf.Tensor], input_vectors: tf.Tensor, n: int = 1) -> typing.List[BMUs]: result = [] for bmu_indices in self.__find_bmus(weights, input_vectors, n): bmu_locations = tf.gather(self._topography, bmu_indices) result.append(BMUs(bmu_indices, bmu_locations)) return result #################################################################################################################### def _train(self, weights: tf.Variable, input_vectors: tf.Tensor, epoch: int) -> None: ################################################################################################################ # BEST MATCHING UNITS # ################################################################################################################ bmus = self._find_bmus(weights, input_vectors, n = 2) ################################################################################################################ # LEARNING OPERATOR # ################################################################################################################ decay_function = self._decay_function(epoch, self._epochs) current_learning_rate = tf.cast(self._learning_rate * decay_function, dtype = self._dtype) current_radius = tf.cast(self._sigma * decay_function, dtype = self._dtype) ################################################################################################################ bmu_distance_squares = tf.reduce_sum( tf.square( tf.subtract( tf.expand_dims(self._topography, axis = 0), tf.expand_dims(bmus[0].locations, axis = 1) ) ), axis = 2 ) ################################################################################################################ neighbourhood_func = tf.exp(tf.divide( tf.negative(tf.cast(bmu_distance_squares, self._dtype)), tf.multiply(self._two, tf.square(current_radius)) )) ################################################################################################################ learning_rate_op = tf.multiply(neighbourhood_func, current_learning_rate) ################################################################################################################ # WEIGHT(EPOCH + 1) # ################################################################################################################ numerator = tf.reduce_sum( tf.multiply( tf.expand_dims(learning_rate_op, axis = -1), tf.expand_dims(input_vectors, axis = +1) ), axis = 0 ) denominator = tf.expand_dims(tf.reduce_sum(learning_rate_op, axis = 0), axis = -1) + self._epsilon ################################################################################################################ weights.assign(tf.divide(numerator, denominator)) ################################################################################################################ # QUANTIZATION ERROR # ################################################################################################################ self._quantization_errors[epoch] = tf.reduce_mean( tf.norm( tf.subtract( input_vectors, tf.gather(weights, bmus[0].indices) ), axis = 1 ), axis = 0 ) ################################################################################################################ # TOPOGRAPHIC ERROR # ################################################################################################################ t = tf.greater( tf.norm( tf.subtract( tf.cast(bmus[1].locations, dtype = self._dtype), tf.cast(bmus[0].locations, dtype = self._dtype) ), axis = 1 ), self._sqrt_two ) self._topographic_errors[epoch] = tf.divide( tf.reduce_sum(tf.cast(t, dtype = self._dtype)), tf.cast(tf.shape(input_vectors)[0], dtype = self._dtype) ) #################################################################################################################### @staticmethod def _dataset_to_generator(dataset): return dataset if callable(dataset) else lambda: (dataset, ) #################################################################################################################### def train(self, dataset: typing.Union[np.ndarray, typing.Callable], chunk_size: int = None, reset_weights: bool = True, show_progress_bar: bool = True) -> None: """Trains the neural network. A batch formulation of updating weights is implemented: $$ \\mathrm{bmu}(x)=\\underset{i}{\\mathrm{arg\\,min}}\\lVert x-w_i\\rVert $$ $$ n_j=\\sum_{x\\in\\mathcal{D}}\\left\\{\\begin{array}{ll}1&\\mathrm{bmu}(x)=j\\\\0&\\mathrm{otherwise}\\end{array}\\right. $$ $$ \\Theta_{ji}(e)=\\alpha(e)\\cdot\\exp\\left(-\\frac{\\lVert j-i\\rVert}{2\\sigma^2(e)}\\right) $$ $$ \\boxed{w_{i\\,\\mathrm{new}}=\\frac{\\sum_{j=1}^{n}n_j\\Theta_{ji}(e)x_j}{\\sum_{j=1}^{n}n_j\\Theta_{ji}(e)}} $$ where, at epoch \\( e \\), \\( \\alpha(e)=\\alpha_0\\mathrm{decay\\,function}(e) \\) is the learning rate and \\( \\sigma(e)=\\sigma_0\\mathrm{decay\\,function}(e) \\) is the neighborhood radius. Parameters ---------- dataset : typing.Union[numpy.ndarray, typing.Callable] Training dataset array or generator. chunk_size : int Chunk size or **None** to disable batch (default: **None**). reset_weights : bool Specifying whether the weights have to be reset (default: **True**). show_progress_bar : bool Specifying whether a progress bar have to be shown (default: **True**). .. NOTE:: A dataset generator is a **yield** generator returning numpy arrays. .. NOTE:: Setting the **reset_weights** variable to **False** allows to update a pre-trained model (= progressive training). """ ################################################################################################################ weights = None ################################################################################################################ if reset_weights and self._seed is not None: np.random.seed(self._seed + 0x000000000000) tf.random.set_seed(self._seed + 0x000000000000) ################################################################################################################ dataset_generator = SOM._dataset_to_generator(dataset) ################################################################################################################ for epoch in tqdm.tqdm(range(self._epochs), disable = not show_progress_bar): for dataset_part_number, dataset_array_np in enumerate(dataset_generator()): ######################################################################################################## # DEFAULT CHUNK SIZE # ######################################################################################################## if chunk_size is None: chunk_size = dataset_array_np.shape[0] ######################################################################################################## # INITIALIZE WEIGHTS # ######################################################################################################## if epoch == 0 and dataset_part_number == 0: if reset_weights: rand_weights_np = np.empty(shape = (self._m * self._n, self._dim), dtype = self._dtype) l1 = rand_weights_np.shape[0] l2 = dataset_array_np.shape[0] for i in range(l1): j = np.random.randint(l2) rand_weights_np[i] = dataset_array_np[j] weights = tf.Variable(rand_weights_np, dtype = self._dtype) else: weights = tf.Variable(self._weights, dtype = self._dtype) ######################################################################################################## # TRAIN THE SELF ORGANIZING MAP # ######################################################################################################## dataset = tf.data.Dataset.from_tensor_slices(tf.constant(dataset_array_np, dtype = self._dtype)) for chunk in dataset.shuffle(dataset_array_np.shape[0]).batch(chunk_size): self._train(weights, chunk, epoch) ################################################################################################################ self._weights = weights.numpy() #################################################################################################################### # noinspection PyTypeChecker def save(self, filename: str, file_format: str = 'fits', extra: typing.Dict[str, np.ndarray] = None) -> None: """Saves the trained neural network to a file. Parameters ---------- filename : str Filename. file_format : str File format (supported formats: (fits, hdf5), default: fits). extra : dict Dictionary of extra (name, array) entries. """ if extra is None: extra = {} ################################################################################################################ # FITS FORMAT # ################################################################################################################ if file_format == 'fits': from astropy.io import fits hdu0 = fits.PrimaryHDU() hdu1 = fits.ImageHDU(data = self.get_centroids()) hdu2 = fits.BinTableHDU.from_columns(fits.ColDefs([ fits.Column(name = 'quantization_errors', format = 'D', array = self._quantization_errors), fits.Column(name = 'topographic_errors', format = 'D', array = self._topographic_errors), ])) hdu0.header['lrate'] = self._learning_rate hdu0.header['sigma'] = self._sigma hdu2.header['epochs'] = self._epochs fits.HDUList([hdu0, hdu1, hdu2] + [fits.ImageHDU(name = name, data = data) for name, data in extra.items()]).writeto(filename, overwrite = True) ################################################################################################################ # HDF5 FORMAT # ################################################################################################################ elif file_format == 'hdf5': import h5py with h5py.File(filename, 'w') as file: file.attrs['lrate'] = self._learning_rate file.attrs['sigma'] = self._sigma file.attrs['epochs'] = self._epochs file.create_dataset('weights', data = self.get_centroids()) file.create_dataset('quantization_errors', data = self._quantization_errors) file.create_dataset('topographic_errors', data = self._topographic_errors) for name, data in extra.items(): file.create_dataset(name, data = data) ################################################################################################################ else: raise ValueError('invalid format `{}` (fits, hdf5)'.format(file_format)) #################################################################################################################### def load(self, filename: str, file_format: str = 'fits') -> None: """Loads the trained neural network from a file. Parameters ---------- filename : str Filename. file_format : str File format (supported formats: (fits, hdf5), default: fits). """ ################################################################################################################ # FITS FORMAT # ################################################################################################################ if file_format == 'fits': from astropy.io import fits with fits.open(filename) as hdus: self._m, self._n, self._dim = hdus[1].data.shape self._learning_rate = hdus[0].header['lrate'] self._sigma = hdus[0].header['sigma'] self._epochs = hdus[2].header['epochs'] self._weights = hdus[1].data.reshape((self._m * self._n, self._dim)).astype(self._dtype) self._quantization_errors = hdus[2].data['quantization_errors'].astype(self._dtype) self._topographic_errors = hdus[2].data['topographic_errors'].astype(self._dtype) ################################################################################################################ # HDF5 FORMAT # ################################################################################################################ elif file_format == 'hdf5': import h5py with h5py.File(filename, 'r') as file: self._m, self._n, self._dim = file['weights'].shape self._learning_rate = file.attrs['lrate'] self._sigma = file.attrs['sigma'] self._epochs = file.attrs['epochs'] self._weights = np.array(file['weights']).reshape((self._m * self._n, self._dim)).astype(self._dtype) self._quantization_errors = np.array(file['quantization_errors']).astype(self._dtype) self._topographic_errors = np.array(file['topographic_errors']).astype(self._dtype) ################################################################################################################ else: raise ValueError('invalid format `{}` (fits, hdf5)'.format(file_format)) ################################################################################################################ self._rebuild_topography() #################################################################################################################### def distance_map(self) -> np.ndarray: """Returns the distance map of the neural network weights.""" ################################################################################################################ centroids = self.get_centroids() ################################################################################################################ result = np.full(shape = (centroids.shape[0], centroids.shape[1], 8), fill_value = np.nan, dtype = self._dtype) ii = 2 * [[0, -1, -1, -1, 0, +1, +1, +1]] jj = 2 * [[-1, -1, 0, +1, +1, +1, 0, -1]] for x in range(centroids.shape[0]): for y in range(centroids.shape[1]): w_2 = centroids[x, y] e = y % 2 == 0 for k, (i, j) in enumerate(zip(ii[e], jj[e])): if 0 <= x + i < centroids.shape[0]\ and \ 0 <= y + j < centroids.shape[1]: diff_w_2_w_1 = w_2 - centroids[x + i, y + j] result[x, y, k] = np.sqrt(np.dot(diff_w_2_w_1, diff_w_2_w_1.T)) result = np.nansum(result, axis = 2) return result / result.max() #################################################################################################################### def get_weights(self) -> np.ndarray: """Returns the neural network weights (shape = [m * n, dim]).""" return self._weights.reshape((self._m * self._n, self._dim)) #################################################################################################################### def get_centroids(self) -> np.ndarray: """Returns of the neural network weights (shape = [m, n, dim]).""" return self._weights.reshape((self._m, self._n, self._dim)) #################################################################################################################### def get_quantization_errors(self) -> np.ndarray: """Returns the quantization errors (one value per epoch). $$ c_1=\\mathrm{1^\\mathrm{st}\\,bmu}=\\underset{i}{\\mathrm{arg\\,min}_1}\\lVert x-w_i\\rVert $$ $$ \\boxed{e_Q=\\frac{1}{\\mathcal{D}}\\sum_{x\\in\\mathcal{D}}\\lVert x-w_{c_1}\\rVert^2} $$""" return self._quantization_errors #################################################################################################################### def get_topographic_errors(self) -> np.ndarray: """Returns the topographic errors (one value per epoch). $$ c_n=\\mathrm{n^\\mathrm{th}\\,bmu}=\\underset{i}{\\mathrm{arg\\,min}_n}\\lVert x-w_i\\rVert $$ $$ t(x)=\\left\\{\\begin{array}{ll}1&\\lVert c_1-c_2\\rVert>\\sqrt{2}\\\\0&\\mathrm{otherwise}\\end{array}\\right. $$ $$ \\boxed{e_t=\\frac{1}{\\mathcal{D}}\\sum_{x\\in\\mathcal{D}}t(x)} $$""" return self._topographic_errors #################################################################################################################### def winners(self, dataset_array: np.ndarray, chunk_size: int = None, locations: bool = False, show_progress_bar: bool = False) -> np.ndarray: """Returns a vector of best matching unit indices or locations for the given input. Parameters ---------- dataset_array : numpy.ndarray Dataset array. chunk_size : int Chunk size or **None** to disable batch (default: **None**). locations : bool Get winner locations instead of indices (default: **False**). show_progress_bar : bool Specifying whether a progress bar have to be shown (default: **False**). """ ################################################################################################################ if chunk_size is None: chunk_size = dataset_array.shape[0] ################################################################################################################ weights = tf.constant(self._weights, dtype = self._dtype) ################################################################################################################ i = 0 if locations: result = np.empty((dataset_array.shape[0], 2), dtype = np.int64) for chunk in tqdm.tqdm(tf.data.Dataset.from_tensor_slices(tf.constant(dataset_array, dtype = self._dtype)).batch(chunk_size), disable = not show_progress_bar): result[(i + 0) * chunk_size: (i + 1) * chunk_size] = tf.gather(self._topography, self.__find_bmus(weights, chunk, 1)[0]) i += 1 else: result = np.empty((dataset_array.shape[0], ), dtype = np.int64) for chunk in tqdm.tqdm(tf.data.Dataset.from_tensor_slices(tf.constant(dataset_array, dtype = self._dtype)).batch(chunk_size), disable = not show_progress_bar): result[(i + 0) * chunk_size: (i + 1) * chunk_size] = self.__find_bmus(weights, chunk, 1)[0] i += 1 ################################################################################################################ return result #################################################################################################################### def activation_map(self, dataset: typing.Union[np.ndarray, typing.Callable], chunk_size: int = None, show_progress_bar: bool = False) -> np.ndarray: """Returns a matrix containing the number of times the neuron (i, j) have been winner for the given input. Parameters ---------- dataset : typing.Union[numpy.ndarray, typing.Callable] Dataset array or generator. chunk_size : int Chunk size or **None** to disable batch (default: **None**). show_progress_bar : bool Specifying whether a progress bar have to be shown (default: **False**). .. NOTE:: A dataset generator is a **yield** generator returning numpy arrays. """ ################################################################################################################ dataset_generator = SOM._dataset_to_generator(dataset) weights = tf.constant(self._weights, dtype = self._dtype) result = np.zeros(shape = (self._m * self._n), dtype = np.int64) ################################################################################################################ for dataset_part, dataset_array_np in enumerate(dataset_generator()): ############################################################################################################ if chunk_size is None: chunk_size = dataset_array_np.shape[0] ############################################################################################################ for chunk in tqdm.tqdm(tf.data.Dataset.from_tensor_slices(tf.constant(dataset_array_np, dtype = self._dtype)).batch(chunk_size), disable = not show_progress_bar): for bmu_index in self.__find_bmus(weights, chunk, n = 1)[0]: result[bmu_index] += 1 ################################################################################################################ return result.reshape(self._m, self._n)
Methods
def train(self, dataset: Union[numpy.ndarray, Callable], chunk_size: int = None, reset_weights: bool = True, show_progress_bar: bool = True) ‑> None
Trains the neural network. A batch formulation of updating weights is implemented: \mathrm{bmu}(x)=\underset{i}{\mathrm{arg\,min}}\lVert x-w_i\rVertn_j=\sum_{x\in\mathcal{D}}\left\{\begin{array}{ll}1&\mathrm{bmu}(x)=j\\0&\mathrm{otherwise}\end{array}\right.\Theta_{ji}(e)=\alpha(e)\cdot\exp\left(-\frac{\lVert j-i\rVert}{2\sigma^2(e)}\right)\boxed{w_{i\,\mathrm{new}}=\frac{\sum_{j=1}^{n}n_j\Theta_{ji}(e)x_j}{\sum_{j=1}^{n}n_j\Theta_{ji}(e)}}where, at epoch e, \alpha(e)=\alpha_0\mathrm{decay\,function}(e)is the learning rate and \sigma(e)=\sigma_0\mathrm{decay\,function}(e)is the neighborhood radius.
Parameters
dataset
:typing.Union[numpy.ndarray, typing.Callable]
- Training dataset array or generator.
chunk_size
:int
- Chunk size or None to disable batch (default: None).
reset_weights
:bool
- Specifying whether the weights have to be reset (default: True).
show_progress_bar
:bool
- Specifying whether a progress bar have to be shown (default: True).
Note
A dataset generator is a yield generator returning numpy arrays.
Note
Setting the reset_weights variable to False allows to update a pre-trained model (= progressive training).
Expand source code
def train(self, dataset: typing.Union[np.ndarray, typing.Callable], chunk_size: int = None, reset_weights: bool = True, show_progress_bar: bool = True) -> None: """Trains the neural network. A batch formulation of updating weights is implemented: $$ \\mathrm{bmu}(x)=\\underset{i}{\\mathrm{arg\\,min}}\\lVert x-w_i\\rVert $$ $$ n_j=\\sum_{x\\in\\mathcal{D}}\\left\\{\\begin{array}{ll}1&\\mathrm{bmu}(x)=j\\\\0&\\mathrm{otherwise}\\end{array}\\right. $$ $$ \\Theta_{ji}(e)=\\alpha(e)\\cdot\\exp\\left(-\\frac{\\lVert j-i\\rVert}{2\\sigma^2(e)}\\right) $$ $$ \\boxed{w_{i\\,\\mathrm{new}}=\\frac{\\sum_{j=1}^{n}n_j\\Theta_{ji}(e)x_j}{\\sum_{j=1}^{n}n_j\\Theta_{ji}(e)}} $$ where, at epoch \\( e \\), \\( \\alpha(e)=\\alpha_0\\mathrm{decay\\,function}(e) \\) is the learning rate and \\( \\sigma(e)=\\sigma_0\\mathrm{decay\\,function}(e) \\) is the neighborhood radius. Parameters ---------- dataset : typing.Union[numpy.ndarray, typing.Callable] Training dataset array or generator. chunk_size : int Chunk size or **None** to disable batch (default: **None**). reset_weights : bool Specifying whether the weights have to be reset (default: **True**). show_progress_bar : bool Specifying whether a progress bar have to be shown (default: **True**). .. NOTE:: A dataset generator is a **yield** generator returning numpy arrays. .. NOTE:: Setting the **reset_weights** variable to **False** allows to update a pre-trained model (= progressive training). """ ################################################################################################################ weights = None ################################################################################################################ if reset_weights and self._seed is not None: np.random.seed(self._seed + 0x000000000000) tf.random.set_seed(self._seed + 0x000000000000) ################################################################################################################ dataset_generator = SOM._dataset_to_generator(dataset) ################################################################################################################ for epoch in tqdm.tqdm(range(self._epochs), disable = not show_progress_bar): for dataset_part_number, dataset_array_np in enumerate(dataset_generator()): ######################################################################################################## # DEFAULT CHUNK SIZE # ######################################################################################################## if chunk_size is None: chunk_size = dataset_array_np.shape[0] ######################################################################################################## # INITIALIZE WEIGHTS # ######################################################################################################## if epoch == 0 and dataset_part_number == 0: if reset_weights: rand_weights_np = np.empty(shape = (self._m * self._n, self._dim), dtype = self._dtype) l1 = rand_weights_np.shape[0] l2 = dataset_array_np.shape[0] for i in range(l1): j = np.random.randint(l2) rand_weights_np[i] = dataset_array_np[j] weights = tf.Variable(rand_weights_np, dtype = self._dtype) else: weights = tf.Variable(self._weights, dtype = self._dtype) ######################################################################################################## # TRAIN THE SELF ORGANIZING MAP # ######################################################################################################## dataset = tf.data.Dataset.from_tensor_slices(tf.constant(dataset_array_np, dtype = self._dtype)) for chunk in dataset.shuffle(dataset_array_np.shape[0]).batch(chunk_size): self._train(weights, chunk, epoch) ################################################################################################################ self._weights = weights.numpy()
def save(self, filename: str, file_format: str = 'fits', extra: Dict[str, numpy.ndarray] = None) ‑> None
Saves the trained neural network to a file.
Parameters
filename
:str
- Filename.
file_format
:str
- File format (supported formats: (fits, hdf5), default: fits).
extra
:dict
- Dictionary of extra (name, array) entries.
Expand source code
def save(self, filename: str, file_format: str = 'fits', extra: typing.Dict[str, np.ndarray] = None) -> None: """Saves the trained neural network to a file. Parameters ---------- filename : str Filename. file_format : str File format (supported formats: (fits, hdf5), default: fits). extra : dict Dictionary of extra (name, array) entries. """ if extra is None: extra = {} ################################################################################################################ # FITS FORMAT # ################################################################################################################ if file_format == 'fits': from astropy.io import fits hdu0 = fits.PrimaryHDU() hdu1 = fits.ImageHDU(data = self.get_centroids()) hdu2 = fits.BinTableHDU.from_columns(fits.ColDefs([ fits.Column(name = 'quantization_errors', format = 'D', array = self._quantization_errors), fits.Column(name = 'topographic_errors', format = 'D', array = self._topographic_errors), ])) hdu0.header['lrate'] = self._learning_rate hdu0.header['sigma'] = self._sigma hdu2.header['epochs'] = self._epochs fits.HDUList([hdu0, hdu1, hdu2] + [fits.ImageHDU(name = name, data = data) for name, data in extra.items()]).writeto(filename, overwrite = True) ################################################################################################################ # HDF5 FORMAT # ################################################################################################################ elif file_format == 'hdf5': import h5py with h5py.File(filename, 'w') as file: file.attrs['lrate'] = self._learning_rate file.attrs['sigma'] = self._sigma file.attrs['epochs'] = self._epochs file.create_dataset('weights', data = self.get_centroids()) file.create_dataset('quantization_errors', data = self._quantization_errors) file.create_dataset('topographic_errors', data = self._topographic_errors) for name, data in extra.items(): file.create_dataset(name, data = data) ################################################################################################################ else: raise ValueError('invalid format `{}` (fits, hdf5)'.format(file_format))
def load(self, filename: str, file_format: str = 'fits') ‑> None
Loads the trained neural network from a file.
Parameters
filename
:str
- Filename.
file_format
:str
- File format (supported formats: (fits, hdf5), default: fits).
Expand source code
def load(self, filename: str, file_format: str = 'fits') -> None: """Loads the trained neural network from a file. Parameters ---------- filename : str Filename. file_format : str File format (supported formats: (fits, hdf5), default: fits). """ ################################################################################################################ # FITS FORMAT # ################################################################################################################ if file_format == 'fits': from astropy.io import fits with fits.open(filename) as hdus: self._m, self._n, self._dim = hdus[1].data.shape self._learning_rate = hdus[0].header['lrate'] self._sigma = hdus[0].header['sigma'] self._epochs = hdus[2].header['epochs'] self._weights = hdus[1].data.reshape((self._m * self._n, self._dim)).astype(self._dtype) self._quantization_errors = hdus[2].data['quantization_errors'].astype(self._dtype) self._topographic_errors = hdus[2].data['topographic_errors'].astype(self._dtype) ################################################################################################################ # HDF5 FORMAT # ################################################################################################################ elif file_format == 'hdf5': import h5py with h5py.File(filename, 'r') as file: self._m, self._n, self._dim = file['weights'].shape self._learning_rate = file.attrs['lrate'] self._sigma = file.attrs['sigma'] self._epochs = file.attrs['epochs'] self._weights = np.array(file['weights']).reshape((self._m * self._n, self._dim)).astype(self._dtype) self._quantization_errors = np.array(file['quantization_errors']).astype(self._dtype) self._topographic_errors = np.array(file['topographic_errors']).astype(self._dtype) ################################################################################################################ else: raise ValueError('invalid format `{}` (fits, hdf5)'.format(file_format)) ################################################################################################################ self._rebuild_topography()
def distance_map(self) ‑> numpy.ndarray
Returns the distance map of the neural network weights.
Expand source code
def distance_map(self) -> np.ndarray: """Returns the distance map of the neural network weights.""" ################################################################################################################ centroids = self.get_centroids() ################################################################################################################ result = np.full(shape = (centroids.shape[0], centroids.shape[1], 8), fill_value = np.nan, dtype = self._dtype) ii = 2 * [[0, -1, -1, -1, 0, +1, +1, +1]] jj = 2 * [[-1, -1, 0, +1, +1, +1, 0, -1]] for x in range(centroids.shape[0]): for y in range(centroids.shape[1]): w_2 = centroids[x, y] e = y % 2 == 0 for k, (i, j) in enumerate(zip(ii[e], jj[e])): if 0 <= x + i < centroids.shape[0]\ and \ 0 <= y + j < centroids.shape[1]: diff_w_2_w_1 = w_2 - centroids[x + i, y + j] result[x, y, k] = np.sqrt(np.dot(diff_w_2_w_1, diff_w_2_w_1.T)) result = np.nansum(result, axis = 2) return result / result.max()
def get_weights(self) ‑> numpy.ndarray
Returns the neural network weights (shape = [m * n, dim]).
Expand source code
def get_weights(self) -> np.ndarray: """Returns the neural network weights (shape = [m * n, dim]).""" return self._weights.reshape((self._m * self._n, self._dim))
def get_centroids(self) ‑> numpy.ndarray
Returns of the neural network weights (shape = [m, n, dim]).
Expand source code
def get_centroids(self) -> np.ndarray: """Returns of the neural network weights (shape = [m, n, dim]).""" return self._weights.reshape((self._m, self._n, self._dim))
def get_quantization_errors(self) ‑> numpy.ndarray
Returns the quantization errors (one value per epoch). c_1=\mathrm{1^\mathrm{st}\,bmu}=\underset{i}{\mathrm{arg\,min}_1}\lVert x-w_i\rVert\boxed{e_Q=\frac{1}{\mathcal{D}}\sum_{x\in\mathcal{D}}\lVert x-w_{c_1}\rVert^2}
Expand source code
def get_quantization_errors(self) -> np.ndarray: """Returns the quantization errors (one value per epoch). $$ c_1=\\mathrm{1^\\mathrm{st}\\,bmu}=\\underset{i}{\\mathrm{arg\\,min}_1}\\lVert x-w_i\\rVert $$ $$ \\boxed{e_Q=\\frac{1}{\\mathcal{D}}\\sum_{x\\in\\mathcal{D}}\\lVert x-w_{c_1}\\rVert^2} $$""" return self._quantization_errors
def get_topographic_errors(self) ‑> numpy.ndarray
Returns the topographic errors (one value per epoch). c_n=\mathrm{n^\mathrm{th}\,bmu}=\underset{i}{\mathrm{arg\,min}_n}\lVert x-w_i\rVertt(x)=\left\{\begin{array}{ll}1&\lVert c_1-c_2\rVert>\sqrt{2}\\0&\mathrm{otherwise}\end{array}\right.\boxed{e_t=\frac{1}{\mathcal{D}}\sum_{x\in\mathcal{D}}t(x)}
Expand source code
def get_topographic_errors(self) -> np.ndarray: """Returns the topographic errors (one value per epoch). $$ c_n=\\mathrm{n^\\mathrm{th}\\,bmu}=\\underset{i}{\\mathrm{arg\\,min}_n}\\lVert x-w_i\\rVert $$ $$ t(x)=\\left\\{\\begin{array}{ll}1&\\lVert c_1-c_2\\rVert>\\sqrt{2}\\\\0&\\mathrm{otherwise}\\end{array}\\right. $$ $$ \\boxed{e_t=\\frac{1}{\\mathcal{D}}\\sum_{x\\in\\mathcal{D}}t(x)} $$""" return self._topographic_errors
def winners(self, dataset_array: numpy.ndarray, chunk_size: int = None, locations: bool = False, show_progress_bar: bool = False) ‑> numpy.ndarray
Returns a vector of best matching unit indices or locations for the given input.
Parameters
dataset_array
:numpy.ndarray
- Dataset array.
chunk_size
:int
- Chunk size or None to disable batch (default: None).
locations
:bool
- Get winner locations instead of indices (default: False).
show_progress_bar
:bool
- Specifying whether a progress bar have to be shown (default: False).
Expand source code
def winners(self, dataset_array: np.ndarray, chunk_size: int = None, locations: bool = False, show_progress_bar: bool = False) -> np.ndarray: """Returns a vector of best matching unit indices or locations for the given input. Parameters ---------- dataset_array : numpy.ndarray Dataset array. chunk_size : int Chunk size or **None** to disable batch (default: **None**). locations : bool Get winner locations instead of indices (default: **False**). show_progress_bar : bool Specifying whether a progress bar have to be shown (default: **False**). """ ################################################################################################################ if chunk_size is None: chunk_size = dataset_array.shape[0] ################################################################################################################ weights = tf.constant(self._weights, dtype = self._dtype) ################################################################################################################ i = 0 if locations: result = np.empty((dataset_array.shape[0], 2), dtype = np.int64) for chunk in tqdm.tqdm(tf.data.Dataset.from_tensor_slices(tf.constant(dataset_array, dtype = self._dtype)).batch(chunk_size), disable = not show_progress_bar): result[(i + 0) * chunk_size: (i + 1) * chunk_size] = tf.gather(self._topography, self.__find_bmus(weights, chunk, 1)[0]) i += 1 else: result = np.empty((dataset_array.shape[0], ), dtype = np.int64) for chunk in tqdm.tqdm(tf.data.Dataset.from_tensor_slices(tf.constant(dataset_array, dtype = self._dtype)).batch(chunk_size), disable = not show_progress_bar): result[(i + 0) * chunk_size: (i + 1) * chunk_size] = self.__find_bmus(weights, chunk, 1)[0] i += 1 ################################################################################################################ return result
def activation_map(self, dataset: Union[numpy.ndarray, Callable], chunk_size: int = None, show_progress_bar: bool = False) ‑> numpy.ndarray
Returns a matrix containing the number of times the neuron (i, j) have been winner for the given input.
Parameters
dataset
:typing.Union[numpy.ndarray, typing.Callable]
- Dataset array or generator.
chunk_size
:int
- Chunk size or None to disable batch (default: None).
show_progress_bar
:bool
- Specifying whether a progress bar have to be shown (default: False).
Note
A dataset generator is a yield generator returning numpy arrays.
Expand source code
def activation_map(self, dataset: typing.Union[np.ndarray, typing.Callable], chunk_size: int = None, show_progress_bar: bool = False) -> np.ndarray: """Returns a matrix containing the number of times the neuron (i, j) have been winner for the given input. Parameters ---------- dataset : typing.Union[numpy.ndarray, typing.Callable] Dataset array or generator. chunk_size : int Chunk size or **None** to disable batch (default: **None**). show_progress_bar : bool Specifying whether a progress bar have to be shown (default: **False**). .. NOTE:: A dataset generator is a **yield** generator returning numpy arrays. """ ################################################################################################################ dataset_generator = SOM._dataset_to_generator(dataset) weights = tf.constant(self._weights, dtype = self._dtype) result = np.zeros(shape = (self._m * self._n), dtype = np.int64) ################################################################################################################ for dataset_part, dataset_array_np in enumerate(dataset_generator()): ############################################################################################################ if chunk_size is None: chunk_size = dataset_array_np.shape[0] ############################################################################################################ for chunk in tqdm.tqdm(tf.data.Dataset.from_tensor_slices(tf.constant(dataset_array_np, dtype = self._dtype)).batch(chunk_size), disable = not show_progress_bar): for bmu_index in self.__find_bmus(weights, chunk, n = 1)[0]: result[bmu_index] += 1 ################################################################################################################ return result.reshape(self._m, self._n)