AutoRegressive flows and RealNVP
In this post, we take a look at autoregressive flows and RealNVP. This is a summary of the lecture "Probabilistic Deep Learning with TensorFlow 2" from Imperial College London.
import tensorflow as tf
import tensorflow_probability as tfp
import numpy as np
import matplotlib.pyplot as plt
tfd = tfp.distributions
tfpl = tfp.layers
tfb = tfp.bijectors
plt.rcParams['figure.figsize'] = (10, 6)
print("Tensorflow Version: ", tf.__version__)
print("Tensorflow Probability Version: ", tfp.__version__)
Masked Autoregressive Flows
If a flow has the autoregressive property, its log-det-Jacobian is cheap to compute, since the Jacobian matrix is lower triangular and the log-determinant reduces to a sum over the diagonal. TensorFlow Probability provides the AutoregressiveNetwork class in tfp.bijectors for this purpose (note that it is not itself a bijector). It is an implementation of the MADE architecture, which stands for Masked Autoencoder for Distribution Estimation. If you are interested, take a look at the MADE paper, published in ICML 2015.
# x[i] = z[i] * scale(x[0:i-1]) + loc(x[0:i-1]), i=0,...,D-1
# event_shape : the shape of a single event (the input dimension)
# hidden_units : the number of nodes in each hidden layer; here two hidden layers with 16 nodes each
# params : the number of parameters produced per input dimension (adds an extra rightmost output dimension of this size)
made = tfb.AutoregressiveNetwork(
    params=2, event_shape=[3], hidden_units=[16, 16], activation='sigmoid'
)
made(tf.random.normal([2, 3]))
In this case, the input has shape (2, 3) and the output has shape (2, 3, 2): a batch of 2 arrays of shape 3 x 2. Note that the rightmost dimension (2) comes from the params argument. Using this network, we can define a masked autoregressive flow bijector.
maf_bijector = tfb.MaskedAutoregressiveFlow(shift_and_log_scale_fn=made)
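As a quick sanity check (not part of the original lecture), we can look at the Jacobian of the network's shift output with respect to its input: the autoregressive property means output i depends only on inputs 0, ..., i-1, so this Jacobian should be strictly lower triangular. This is an illustrative sketch using tf.GradientTape.batch_jacobian.
x = tf.random.normal([1, 3])
with tf.GradientTape() as tape:
    tape.watch(x)
    shift = made(x)[..., 0]          # use the first output parameter as the shift
jac = tape.batch_jacobian(shift, x)  # shape (1, 3, 3)
print(jac[0])                        # strictly lower triangular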
Then we can write pseudocode for the forward transformation like this:
def forward(z):
    x = tf.zeros_like(z)
    for _ in range(D):
        shift, log_scale = shift_and_log_scale_fn(x)
        x = z * tf.math.exp(log_scale) + shift
    return x
Given a sample z from the base distribution, the shift and log-scale are recomputed from the current x on every pass. Because output i of the network depends only on x[0:i], after D passes every feature of x has been updated correctly.
def inverse(x):
    shift, log_scale = shift_and_log_scale_fn(x)
    return (x - shift) / tf.math.exp(log_scale)
The inverse operation is much simpler: all shifts and log-scales can be computed from x in a single pass.
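As an illustrative check of bijectivity, applying forward and then inverse should recover the original input up to numerical precision:
z = tf.random.normal([2, 3])
x = maf_bijector.forward(z)                                # D sequential passes
print(tf.reduce_max(tf.abs(maf_bijector.inverse(x) - z)))  # close to zero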
normal = tfd.Normal(loc=0, scale=1)
maf = tfd.TransformedDistribution(tfd.Sample(normal, sample_shape=[3]), maf_bijector)
maf
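A quick usage sketch: sampling pushes base samples through the sequential forward pass, while log_prob only needs the single-pass inverse.
xs = maf.sample(5)
print(xs.shape)          # (5, 3)
print(maf.log_prob(xs))  # one value per sample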
Alternatively, we can implement a shift-only version like this:
maf_bijector = tfb.MaskedAutoregressiveFlow(
    lambda y: (made(y)[..., 0], None),
    is_constant_jacobian=True
)
maf = tfd.TransformedDistribution(tfd.Sample(normal, sample_shape=[3]), maf_bijector)
maf
In this case, the masked autoregressive bijector has no scale term, so the Jacobian of the transformation is constant (the identity matrix). We therefore set the is_constant_jacobian
argument to True
for efficiency. For reference, if you want TensorFlow to track all the variables of the MADE network, you need to attach the network to the bijector:
maf_bijector._made = made
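Since this bijector only shifts, its Jacobian is the identity and the forward log-det-Jacobian should be zero for any input; a quick illustrative check:
print(maf_bijector.forward_log_det_jacobian(tf.random.normal([2, 3]), event_ndims=1))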
iaf_bijector = tfb.Invert(tfb.MaskedAutoregressiveFlow(shift_and_log_scale_fn=made))
iaf = tfd.TransformedDistribution(tfd.Sample(normal, sample_shape=[3]), iaf_bijector)
iaf
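Inverting the masked autoregressive flow in this way gives an inverse autoregressive flow (IAF), which swaps the computational trade-off: sampling now needs only a single pass through the network, while log_prob runs the sequential loop. An illustrative usage sketch:
xs = iaf.sample(5)       # single pass through the MADE network
print(iaf.log_prob(xs))  # sequential loop over the 3 features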
RealNVP
# x[0: d] = z[0: d]
# x[d: D] = z[d: D] * scale(z[0: d]) + loc(z[0: d])
shift_and_log_scale_fn = tfb.real_nvp_default_template(
    hidden_layers=[32, 32], activation='relu'
)
shift_and_log_scale_fn(tf.random.normal([2]), 1)
We can build a shift-only flow using the shift_only
keyword. In this case, the scale is fixed to 1.
shift_and_log_scale_fn = tfb.real_nvp_default_template(hidden_layers=[32, 32], activation='relu', shift_only=True)
shift_and_log_scale_fn(tf.random.normal([2]), 1)
This kind of model is called NICE, which stands for Non-linear Independent Components Estimation. In the shift-only version the Jacobian matrix is the identity, so the bijective transformation is volume preserving. RealNVP, in contrast, is an acronym for real-valued Non-Volume Preserving.
realnvp_bijector = tfb.RealNVP(
    num_masked=2, shift_and_log_scale_fn=shift_and_log_scale_fn
)
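To connect this to the volume-preserving claim above, the forward log-det-Jacobian of this shift-only coupling layer should be zero for any input. An illustrative check, assuming 3-dimensional events (num_masked=2 plus one transformed dimension):
print(realnvp_bijector.forward_log_det_jacobian(tf.random.normal([2, 3]), event_ndims=1))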
# Pseudocode for the RealNVP forward and inverse transformations:
def forward(z):
    x = tf.zeros_like(z)
    x[0: d] = z[0: d]
    shift, log_scale = shift_and_log_scale_fn(z[0: d])
    x[d: D] = z[d: D] * tf.math.exp(log_scale) + shift
    return x

def inverse(x):
    z = tf.zeros_like(x)
    z[0: d] = x[0: d]
    shift, log_scale = shift_and_log_scale_fn(x[0: d])
    z[d: D] = (x[d: D] - shift) * tf.math.exp(-log_scale)
    return z
mvn = tfd.MultivariateNormalDiag(loc=[0., 0., 0.])
realnvp = tfd.TransformedDistribution(distribution=mvn, bijector=realnvp_bijector)
We can specify the fraction of masked dimensions with the fraction_masked
keyword instead of num_masked
.
realnvp_bijector = tfb.RealNVP(
    fraction_masked=0.5, shift_and_log_scale_fn=shift_and_log_scale_fn
)
permute = tfb.Permute(permutation=[1, 2, 0])
realnvp1 = tfb.RealNVP(fraction_masked=0.5, shift_and_log_scale_fn=tfb.real_nvp_default_template(hidden_layers=[32, 32]))
realnvp2 = tfb.RealNVP(fraction_masked=0.5, shift_and_log_scale_fn=tfb.real_nvp_default_template(hidden_layers=[32, 32]))
realnvp3 = tfb.RealNVP(fraction_masked=0.5, shift_and_log_scale_fn=tfb.real_nvp_default_template(hidden_layers=[32, 32]))
chained_bijector = tfb.Chain([realnvp3, permute, realnvp2, permute, realnvp1])
realnvp = tfd.TransformedDistribution(distribution=mvn, bijector=chained_bijector)
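Each coupling layer leaves its masked dimensions untouched, so a Permute bijector is interleaved between the RealNVP layers to make sure every dimension gets transformed somewhere in the chain. A quick illustrative usage check of the resulting distribution:
xs = realnvp.sample(4)
print(xs.shape)              # (4, 3)
print(realnvp.log_prob(xs))  # one value per sample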
from sklearn import datasets
from sklearn.preprocessing import StandardScaler
n_samples = 1000
noisy_moons = datasets.make_moons(n_samples=n_samples, noise=0.05)
X, y = noisy_moons
X_data = StandardScaler().fit_transform(X)
xlim, ylim = [-2, 2], [-2, 2]
y_label = y.astype(np.bool_)
# Split the standardised data into its two coordinates for plotting
X_train, y_train = X_data[..., 0], X_data[..., 1]
plt.scatter(X_train[y_label], y_train[y_label], s=10, color='blue')
plt.scatter(X_train[~y_label], y_train[~y_label], s=10, color='red')
plt.legend(['label: 1', 'label: 0'])
plt.xlim(xlim)
plt.ylim(ylim)
plt.show()
base_dist = tfd.Normal(loc=0, scale=1)
def make_masked_autoregressive_flow(hidden_units=[16, 16], activation='relu'):
    made = tfb.AutoregressiveNetwork(params=2, event_shape=[2], hidden_units=hidden_units, activation=activation)
    return tfb.MaskedAutoregressiveFlow(shift_and_log_scale_fn=made)
trainable_dist = tfd.TransformedDistribution(distribution=tfd.Sample(base_dist, sample_shape=[2]), bijector=make_masked_autoregressive_flow())
trainable_dist
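Even before training, the flow defines a valid density on the plane; as an illustrative check, we can evaluate it on a few data points (cast to float32 to match the distribution's dtype):
print(trainable_dist.log_prob(X_data[:5].astype(np.float32)))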
from mpl_toolkits.axes_grid1 import make_axes_locatable
from tensorflow.compat.v1 import logging
logging.set_verbosity(logging.ERROR)
def plot_contour_prob(dist, rows=1, title=[''], scale_fig=4):
    cols = int(len(dist) / rows)
    xx = np.linspace(-5.0, 5.0, 100)
    yy = np.linspace(-5.0, 5.0, 100)
    X, Y = np.meshgrid(xx, yy)

    fig, ax = plt.subplots(rows, cols, figsize=(scale_fig * cols, scale_fig * rows))
    fig.tight_layout(pad=4.5)

    i = 0
    for r in range(rows):
        for c in range(cols):
            Z = dist[i].prob(np.dstack((X, Y)))
            if len(dist) == 1:
                axi = ax
            elif rows == 1:
                axi = ax[c]
            else:
                axi = ax[r, c]

            # Plot contour
            p = axi.contourf(X, Y, Z)

            # Add a colorbar
            divider = make_axes_locatable(axi)
            cax = divider.append_axes("right", size="5%", pad=0.1)
            cbar = fig.colorbar(p, cax=cax)

            # Set title and labels
            axi.set_title('Filled Contours Plot: ' + str(title[i]))
            axi.set_xlabel('x')
            axi.set_ylabel('y')

            i += 1
    plt.show()
activation='relu'
maf = tfd.TransformedDistribution(tfd.Sample(base_dist, sample_shape=[2]), make_masked_autoregressive_flow(activation=activation))
plot_contour_prob([maf], scale_fig=6, title=[activation])
activation='sigmoid'
maf = tfd.TransformedDistribution(tfd.Sample(base_dist, sample_shape=[2]), make_masked_autoregressive_flow(activation=activation))
plot_contour_prob([maf], scale_fig=6, title=[activation])
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input
x = base_dist.sample((1000, 2))
names = [base_dist.name, trainable_dist.bijector.name]
samples = [x, trainable_dist.bijector.forward(x)]
def _plot(results, rows=1, legend=False):
    cols = int(len(results) / rows)
    f, arr = plt.subplots(rows, cols, figsize=(4 * cols, 4 * rows))

    i = 0
    for r in range(rows):
        for c in range(cols):
            res = results[i]
            X, Y = res[..., 0].numpy(), res[..., 1].numpy()
            if rows == 1:
                p = arr[c]
            else:
                p = arr[r, c]
            p.scatter(X, Y, s=10, color='red')
            p.set_xlim([-5, 5])
            p.set_ylim([-5, 5])
            p.set_title(names[i])

            i += 1
    plt.show()
_plot(samples)
from tensorflow.keras.callbacks import LambdaCallback
def train_dist_routine(trainable_distribution, n_epochs=200, batch_size=None, n_disp=100):
    # Wrap the distribution's log_prob in a Keras model and minimise the
    # negative log-likelihood of the training data.
    x_ = Input(shape=(2,), dtype=tf.float32)
    log_prob_ = trainable_distribution.log_prob(x_)
    model = Model(x_, log_prob_)

    model.compile(optimizer=tf.optimizers.Adam(),
                  loss=lambda _, log_prob: -log_prob)

    ns = X_data.shape[0]
    if batch_size is None:
        batch_size = ns

    # Display the loss every n_disp epochs
    epoch_callback = LambdaCallback(
        on_epoch_end=lambda epoch, logs:
            print('\n Epoch {}/{}'.format(epoch + 1, n_epochs),
                  '\n\t ' + (': {:.4f}, '.join(logs.keys()) + ': {:.4f}').format(*logs.values()))
            if epoch % n_disp == 0 else False
    )

    history = model.fit(x=X_data,
                        y=np.zeros((ns, 0), dtype=np.float32),
                        batch_size=batch_size,
                        epochs=n_epochs,
                        validation_split=0.2,
                        shuffle=True,
                        verbose=False,
                        callbacks=[epoch_callback])

    return history
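For reference, here is a minimal sketch of what the Keras wrapper above amounts to: a manual gradient step that minimises the negative log-likelihood directly. This is illustrative only; it assumes trainable_dist and X_data as defined above and is not used in the rest of the post.
optimizer = tf.optimizers.Adam()

def manual_train_step(batch):
    with tf.GradientTape() as tape:
        # Negative log-likelihood of the batch under the flow
        nll = -tf.reduce_mean(trainable_dist.log_prob(batch))
    grads = tape.gradient(nll, trainable_dist.trainable_variables)
    optimizer.apply_gradients(zip(grads, trainable_dist.trainable_variables))
    return nll

# e.g. manual_train_step(X_data.astype(np.float32))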
history = train_dist_routine(trainable_dist, n_epochs=600, n_disp=50)
train_losses = history.history['loss']
val_losses = history.history['val_loss']
plt.plot(train_losses, label='train')
plt.plot(val_losses, label='valid')
plt.legend(loc='best')
plt.xlabel('Epochs')
plt.ylabel('Negative log likelihood')
plt.title('Training and validation loss curves')
plt.show()
x = base_dist.sample((1000, 2))
names = [base_dist.name, trainable_dist.bijector.name]
samples = [x, trainable_dist.bijector.forward(x)]
_plot(samples)
def visualize_training_data(samples):
    f, arr = plt.subplots(1, 2, figsize=(15, 6))
    names = ['Data', 'Trainable']
    samples = [tf.constant(X_data), samples[-1]]

    for i in range(2):
        res = samples[i]
        X, Y = res[..., 0].numpy(), res[..., 1].numpy()
        arr[i].scatter(X, Y, s=10, color='red')
        arr[i].set_xlim([-2, 2])
        arr[i].set_ylim([-2, 2])
        arr[i].set_title(names[i])
visualize_training_data(samples)
plot_contour_prob([trainable_dist], scale_fig=6)
num_bijectors = 6
bijectors = []
for i in range(num_bijectors):
    masked_auto_i = make_masked_autoregressive_flow(hidden_units=[256, 256], activation='relu')
    bijectors.append(masked_auto_i)
    bijectors.append(tfb.Permute(permutation=[1, 0]))
flow_bijector = tfb.Chain(list(reversed(bijectors[:-1])))
trainable_dist = tfd.TransformedDistribution(distribution=tfd.Sample(base_dist, sample_shape=[2]), bijector=flow_bijector)
trainable_dist
def make_samples():
    x = base_dist.sample((1000, 2))
    samples = [x]
    names = [base_dist.name]
    for bijector in reversed(trainable_dist.bijector.bijectors):
        x = bijector.forward(x)
        samples.append(x)
        names.append(bijector.name)
    return names, samples
names, samples = make_samples()
_plot(samples, 3)
visualize_training_data(samples)
history = train_dist_routine(trainable_dist, n_epochs=600, n_disp=50)
train_losses = history.history['loss']
val_losses = history.history['val_loss']
plt.plot(train_losses, label='train')
plt.plot(val_losses, label='valid')
plt.legend(loc='best')
plt.xlabel('Epochs')
plt.ylabel('Negative log likelihood')
plt.title('Training and validation loss curves')
plt.show()
names, samples = make_samples()
_plot(samples, 3)
visualize_training_data(samples)
plot_contour_prob([trainable_dist], scale_fig=6)