Practice with the TensorFlow 2 Functional API.
In this post, I will demonstrate how to build models with the Functional syntax. You'll build one using the Sequential API and see how you can do the same with the Functional API. Both arrive at the same architecture, and you can train and evaluate the model as usual. This is a summary of the lecture "Custom Models, Layers and Loss functions with Tensorflow" from DeepLearning.AI.
- Packages
- Part 1 - Comparing Functional API with Sequential API
- Part 2 - Build a Multi-output Model
- Part 3 - Implement a Siamese Network
- Application - Multiple Output Models using the Keras Functional API
Packages
import tensorflow as tf
from tensorflow.keras.utils import plot_model
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Flatten, Dense, Dropout, Lambda
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras import backend as K
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from PIL import Image, ImageFont, ImageDraw
import itertools
import random
Part 1 - Comparing Functional API with Sequential API
mnist = tf.keras.datasets.mnist
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train, X_test = X_train / 255.0, X_test / 255.0
def build_model_with_sequential():
# instantiate a Sequential class and linearly stack the layers of your model
seq_model = tf.keras.Sequential([
tf.keras.layers.Flatten(input_shape=(28, 28)),
tf.keras.layers.Dense(128, activation=tf.nn.relu),
tf.keras.layers.Dense(10, activation=tf.nn.softmax)
])
return seq_model
def build_model_with_functional():
# instantiate the input Tensor
input_layer = tf.keras.Input(shape=(28, 28))
# stack the layers using the syntax: new_layer()(previous_layer)
flatten_layer = tf.keras.layers.Flatten()(input_layer)
first_dense = tf.keras.layers.Dense(128, activation=tf.nn.relu)(flatten_layer)
output_layer = tf.keras.layers.Dense(10, activation=tf.nn.softmax)(first_dense)
# declare inputs and outputs
func_model = Model(inputs=input_layer, outputs=output_layer)
return func_model
model = build_model_with_sequential()
plot_model(model, show_shapes=True, show_layer_names=True, to_file='./image/sequential_model.png')
model = build_model_with_functional()
plot_model(model, show_shapes=True, show_layer_names=True, to_file='./image/functional_model.png')
You can see that both models have the same architecture.
model.compile(optimizer=tf.optimizers.Adam(),
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
model.fit(X_train, y_train, epochs=5)
model.evaluate(X_test, y_test)
Part 2 - Build a Multi-output Model
In this section, we'll show how you can build models with more than one output. The dataset we will be working on is available from the UCI Machine Learning Repository. It is an Energy Efficiency dataset which uses the building features (e.g. wall area, roof area) as inputs and has two outputs: Cooling Load and Heating Load.
def format_output(data):
y1 = data.pop('Y1')
y1 = np.array(y1)
y2 = data.pop('Y2')
y2 = np.array(y2)
return y1, y2
def norm(x):
return (x - train_stats['mean']) / train_stats['std']
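The `norm` helper applies standard z-score scaling, using the per-feature mean and standard deviation computed from the training split:

$$x' = \frac{x - \mu_{\text{train}}}{\sigma_{\text{train}}}$$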
def plot_diff(y_true, y_pred, title=''):
plt.scatter(y_true, y_pred)
plt.title(title)
plt.xlabel('True Values')
plt.ylabel('Predictions')
plt.axis('equal')
plt.axis('square')
plt.xlim(plt.xlim())
plt.ylim(plt.ylim())
plt.plot([-100, 100], [-100, 100])
def plot_metrics(history, metric_name, title, ylim=5):
plt.title(title)
plt.ylim(0, ylim)
plt.plot(history.history[metric_name], color='blue', label=metric_name)
plt.plot(history.history['val_' + metric_name], color='green', label='val_' + metric_name)
URI = './dataset/ENB2012_data.xlsx'
# Use pandas excel reader
df = pd.read_excel(URI)
df.dropna(axis=1, inplace=True)
df = df.sample(frac=1).reset_index(drop=True)
# Split the data into train and test with 80 train / 20 test
train, test = train_test_split(df, test_size=0.2)
train_stats = train.describe()
# Get Y1 and Y2 as the 2 outputs and format them as np arrays
train_stats.pop('Y1')
train_stats.pop('Y2')
train_stats = train_stats.transpose()
train_Y = format_output(train)
test_Y = format_output(test)
# Normalize the train and test data
norm_train_X = norm(train)
norm_test_X = norm(test)
df.head()
input_layer = Input(shape=(len(train.columns), ))
first_dense = Dense(units=128, activation='relu')(input_layer)
second_dense = Dense(units=128, activation='relu')(first_dense)
# Y1 output will be fed directly from the second dense
y1_output = Dense(units=1, name='y1_output')(second_dense)
third_dense = Dense(units=64, activation='relu')(second_dense)
# Y2 output will come via the third dense
y2_output = Dense(units=1, name='y2_output')(third_dense)
# Define the model with the input layer and a list of output layers
model = Model(inputs=input_layer, outputs=[y1_output, y2_output])
model.summary()
plot_model(model, show_shapes=True, show_layer_names=True, to_file='./image/multi_output_model.png')
optimizer = tf.keras.optimizers.SGD(learning_rate=0.001)
model.compile(optimizer=optimizer,
loss={
'y1_output':'mse',
'y2_output':'mse'
},
metrics={
'y1_output':tf.keras.metrics.RootMeanSquaredError(),
'y2_output':tf.keras.metrics.RootMeanSquaredError()
})
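Both outputs use mean squared error as the loss, while root mean squared error is tracked as the metric, since RMSE reports the error in the same units as the loads:

$$\text{RMSE} = \sqrt{\frac{1}{n}\sum_{i=1}^{n}\left(y_i - \hat{y}_i\right)^2}$$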
history = model.fit(norm_train_X, train_Y,
epochs=500, batch_size=10, validation_data=(norm_test_X, test_Y), verbose=0)
loss, Y1_loss, Y2_loss, Y1_rmse, Y2_rmse = model.evaluate(x=norm_test_X, y=test_Y)
print("Loss = {}, Y1_loss = {}, Y1_mse = {}, Y2_loss = {}, Y2_mse = {}".format(loss, Y1_loss, Y1_rmse, Y2_loss, Y2_rmse))
Y_pred = model.predict(norm_test_X)
plot_diff(test_Y[0], Y_pred[0], title='Y1')
plot_diff(test_Y[1], Y_pred[1], title='Y2')
plot_metrics(history, metric_name='y1_output_root_mean_squared_error', title='Y1 RMSE', ylim=6)
plot_metrics(history, metric_name='y2_output_root_mean_squared_error', title='Y2 RMSE', ylim=7)
Part 3 - Implement a Siamese Network
In this section, you will implement a Siamese network that learns to measure the similarity between pairs of Fashion-MNIST images. Start with some utilities for creating the image pairs.
def create_pairs(x, digit_indices):
'''
Positive and negative pair creation.
Alternates between positive and negative pairs.
'''
pairs = []
labels = []
n = min([len(digit_indices[d]) for d in range(10)]) - 1
for d in range(10):
for i in range(n):
z1, z2 = digit_indices[d][i], digit_indices[d][i + 1]
pairs += [[x[z1], x[z2]]]
inc = random.randrange(1, 10)
dn = (d + inc) % 10
z1, z2 = digit_indices[d][i], digit_indices[dn][i]
pairs += [[x[z1], x[z2]]]
labels += [1, 0]
return np.array(pairs), np.array(labels)
def create_pairs_on_set(images, labels):
digit_indices = [np.where(labels == i)[0] for i in range(10)]
pairs, y = create_pairs(images, digit_indices)
y = y.astype('float32')
return pairs, y
def show_image(image):
plt.figure()
plt.imshow(image)
plt.colorbar()
plt.grid(False)
plt.show()
You can now download and prepare the train and test sets. You will also create pairs of images that will go into the multi-input model.
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.fashion_mnist.load_data()
# prepare train and test sets
train_images = train_images.astype('float32')
test_images = test_images.astype('float32')
# normalize values
train_images = train_images / 255.0
test_images = test_images / 255.0
# create pairs on train and test sets
train_pairs, train_y = create_pairs_on_set(train_images, train_labels)
test_pairs, test_y = create_pairs_on_set(test_images, test_labels)
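As a quick sanity check (a minimal sketch; the exact pair count depends on the size of the smallest class), you can inspect the generated arrays:
# each entry of train_pairs holds two 28x28 images; labels alternate similar/dissimilar
print(train_pairs.shape)   # e.g. (119980, 2, 28, 28)
print(train_y[:4])         # e.g. [1. 0. 1. 0.]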
You can see a sample pair of images below.
this_pair = 8
# show images at this index
show_image(test_pairs[this_pair][0])
show_image(test_pairs[this_pair][1])
# print the label for this pair
print(test_y[this_pair])
show_image(train_pairs[:,0][0])
show_image(train_pairs[:,0][1])
show_image(train_pairs[:,1][0])
show_image(train_pairs[:,1][1])
def initialize_base_network():
input = Input(shape=(28,28,), name="base_input")
x = Flatten(name="flatten_input")(input)
x = Dense(128, activation='relu', name="first_base_dense")(x)
x = Dropout(0.1, name="first_dropout")(x)
x = Dense(128, activation='relu', name="second_base_dense")(x)
x = Dropout(0.1, name="second_dropout")(x)
x = Dense(128, activation='relu', name="third_base_dense")(x)
return Model(inputs=input, outputs=x)
def euclidean_distance(vects):
x, y = vects
sum_square = K.sum(K.square(x - y), axis=1, keepdims=True)
return K.sqrt(K.maximum(sum_square, K.epsilon()))
def eucl_dist_output_shape(shapes):
shape1, shape2 = shapes
return (shape1[0], 1)
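Written out, the distance computed by `euclidean_distance` for a pair of output vectors $a$ and $b$ is:

$$D(a, b) = \sqrt{\max\left(\sum_i (a_i - b_i)^2,\ \epsilon\right)}$$

where $\epsilon$ is `K.epsilon()`, a small constant that keeps the square root (and its gradient) numerically stable when the two vectors are identical.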
Let's see how our base network looks. This is where the two inputs will pass through to generate an output vector.
base_network = initialize_base_network()
plot_model(base_network, show_shapes=True, show_layer_names=True, to_file='./image/base-siamese-model.png')
Let's now build the Siamese network. The plot will show two inputs going to the base network.
input_a = Input(shape=(28,28,), name="left_input")
vect_output_a = base_network(input_a)
# create the right input and point to the base network
input_b = Input(shape=(28,28,), name="right_input")
vect_output_b = base_network(input_b)
# measure the similarity of the two vector outputs
output = Lambda(euclidean_distance, name="output_layer", output_shape=eucl_dist_output_shape)([vect_output_a, vect_output_b])
# specify the inputs and output of the model
model = Model([input_a, input_b], output)
# plot model graph
plot_model(model, show_shapes=True, show_layer_names=True, to_file='./image/outer-siamese-model.png')
def contrastive_loss_with_margin(margin):
def contrastive_loss(y_true, y_pred):
'''
Contrastive loss from Hadsell-et-al.'06
http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf
'''
square_pred = K.square(y_pred)
margin_square = K.square(K.maximum(margin - y_pred, 0))
return (y_true * square_pred + (1 - y_true) * margin_square)
return contrastive_loss
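Written out, for a pair label $y$ (1 for similar, 0 for dissimilar), predicted distance $D$, and margin $m$, this is the contrastive loss of Hadsell et al. '06:

$$\mathcal{L}(y, D) = y\,D^2 + (1 - y)\,\max(m - D,\ 0)^2$$

Similar pairs are penalized by their squared distance, while dissimilar pairs are penalized only when they fall inside the margin.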
model.compile(loss=contrastive_loss_with_margin(margin=1), optimizer=RMSprop())
history = model.fit([train_pairs[:,0], train_pairs[:,1]], train_y, epochs=20, batch_size=128, validation_data=([test_pairs[:,0], test_pairs[:,1]], test_y))
def compute_accuracy(y_true, y_pred):
'''
Compute classification accuracy with a fixed threshold on distances.
'''
pred = y_pred.ravel() < 0.5
return np.mean(pred == y_true)
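For example (illustrative values only), a predicted distance of 0.3 falls below the 0.5 threshold and is classified as similar, while 0.8 is classified as dissimilar:
# both predictions match the true labels here, so accuracy is 1.0
print(compute_accuracy(np.array([1., 0.]), np.array([0.3, 0.8])))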
loss = model.evaluate(x=[test_pairs[:,0],test_pairs[:,1]], y=test_y)
y_pred_train = model.predict([train_pairs[:,0], train_pairs[:,1]])
train_accuracy = compute_accuracy(train_y, y_pred_train)
y_pred_test = model.predict([test_pairs[:,0], test_pairs[:,1]])
test_accuracy = compute_accuracy(test_y, y_pred_test)
print("Loss = {}, Train Accuracy = {} Test Accuracy = {}".format(loss, train_accuracy, test_accuracy))
def plot_metrics(history, metric_name, title, ylim=5):
plt.title(title)
plt.ylim(0,ylim)
plt.plot(history.history[metric_name],color='blue',label=metric_name)
plt.plot(history.history['val_' + metric_name],color='green',label='val_' + metric_name)
plt.legend()
plot_metrics(history, metric_name='loss', title="Loss", ylim=0.2)
def visualize_images():
plt.rc('image', cmap='gray_r')
plt.rc('grid', linewidth=0)
plt.rc('xtick', top=False, bottom=False, labelsize='large')
plt.rc('ytick', left=False, right=False, labelsize='large')
plt.rc('axes', facecolor='F8F8F8', titlesize="large", edgecolor='white')
plt.rc('text', color='a8151a')
plt.rc('figure', facecolor='F0F0F0')
# utility to display a row of digits with their predictions
def display_images(left, right, predictions, labels, title, n):
plt.figure(figsize=(17,3))
plt.title(title)
plt.yticks([])
plt.xticks([])
plt.grid(None)
left = np.reshape(left, [n, 28, 28])
left = np.swapaxes(left, 0, 1)
left = np.reshape(left, [28, 28*n])
plt.imshow(left)
plt.figure(figsize=(17,3))
plt.yticks([])
plt.xticks([28*x+14 for x in range(n)], predictions)
for i,t in enumerate(plt.gca().xaxis.get_ticklabels()):
if predictions[i] > 0.5: t.set_color('red') # bad predictions in red
plt.grid(None)
right = np.reshape(right, [n, 28, 28])
right = np.swapaxes(right, 0, 1)
right = np.reshape(right, [28, 28*n])
plt.imshow(right)
You can see sample results for 10 pairs of items below.
y_pred_train = np.squeeze(y_pred_train)
indexes = np.random.choice(len(y_pred_train), size=10)
display_images(train_pairs[:, 0][indexes], train_pairs[:, 1][indexes], y_pred_train[indexes], train_y[indexes], "clothes and their dissimilarity", 10)
Application - Multiple Output Models using the Keras Functional API
In this section, we will use the Keras Functional API to train a model that predicts two outputs, using the Wine Quality Dataset from the UCI Machine Learning Repository. It has separate datasets for red wine and white wine.
Normally, the wines are classified into one of the quality ratings specified in the attributes. In this exercise, you will combine the two datasets to predict the wine quality and whether the wine is red or white solely from the attributes.
You will model wine quality estimations as a regression problem and wine type detection as a binary classification problem.
Load Dataset
You will now load the dataset from the UCI Machine Learning Repository.
URI = './dataset/winequality-white.csv'
# load the white wine dataset
white_df = pd.read_csv(URI, sep=";")
# fill the `is_red` column with zeros.
white_df["is_red"] = 0
# keep only the first of duplicate items
white_df = white_df.drop_duplicates(keep='first')
print(white_df.alcohol[0])
print(white_df.alcohol[100])
URI = './dataset/winequality-red.csv'
# load the red wine dataset
red_df = pd.read_csv(URI, sep=";")
# fill the `is_red` column with ones.
red_df["is_red"] = 1
# keep only the first of duplicate items
red_df = red_df.drop_duplicates(keep='first')
print(red_df.alcohol[0])
print(red_df.alcohol[100])
df = pd.concat([red_df, white_df], ignore_index=True)
df.alcohol[0]
df.alcohol[100]
In a real-world scenario, you should shuffle the data.
df = df.iloc[np.random.permutation(len(df))]
This will chart the quality of the wines.
df['quality'].hist(bins=20);
Imbalanced data
You can see from the plot above that the wine quality dataset is imbalanced.
- Since there are very few observations with quality equal to 3, 4, 8 and 9, you can drop these observations from your dataset.
- You can do this by keeping only the rows where quality is greater than 4 and less than 8.
df = df[(df['quality'] > 4) & (df['quality'] < 8 )]
# reset index and drop the old one
df = df.reset_index(drop=True)
print(df.alcohol[0])
print(df.alcohol[100])
You can plot again to see the new range of the data and quality.
df['quality'].hist(bins=20);
Train Test Split
Next, you can split the datasets into training, test and validation datasets.
- The data frame should be split 80:20 into `train` and `test` sets.
- The resulting `train` should then be split 80:20 into `train` and `val` sets.
- The `train_test_split` parameter `test_size` takes a float value between 0 and 1, representing the proportion of the dataset allocated to the test set. The rest of the data is allocated to the training set.
train, test = train_test_split(df, test_size=0.2, random_state=1)
# split train into 80:20 train and val sets
train, val = train_test_split(train, test_size=0.2, random_state=1)
Here you can explore the training stats. Pop the `is_red` and `quality` columns from `train_stats`, since these will be used as labels rather than features.
train_stats = train.describe()
train_stats.pop('is_red')
train_stats.pop('quality')
train_stats = train_stats.transpose()
train_stats
Get the labels
The features and labels are currently in the same dataframe.
- You will want to store the label columns `is_red` and `quality` separately from the feature columns.
- The following function, `format_output`, pops these two columns from the dataframe and formats them as numpy arrays.
- Apply `format_output` to the `train`, `val` and `test` sets to get the labels for each split.
def format_output(data):
is_red = data.pop('is_red')
is_red = np.array(is_red)
quality = data.pop('quality')
quality = np.array(quality)
return (quality, is_red)
train_Y = format_output(train)
# format the output of the val set
val_Y = format_output(val)
# format the output of the test set
test_Y = format_output(test)
Notice that after you get the labels, the `train`, `val` and `test` dataframes no longer contain the label columns, only the feature columns. This is because `format_output` uses `.pop`, which removes the columns in place.
train.head()
def norm(x):
return (x - train_stats['mean']) / train_stats['std']
norm_train_X = norm(train)
# normalize the val set
norm_val_X = norm(val)
# normalize the test set
norm_test_X = norm(test)
Define the Model
Define the model using the Functional API. The base model will be 2 `Dense` layers of 128 neurons each, with the `'relu'` activation.
- Check out the documentation for `tf.keras.layers.Dense`.
def base_model(inputs):
# connect a Dense layer with 128 neurons and a relu activation
x = Dense(units=128, activation='relu')(inputs)
# connect another Dense layer with 128 neurons and a relu activation
x = Dense(units=128, activation='relu')(x)
return x
Define output layers of the model
You will add output layers to the base model.
- The model will need two outputs.
One output layer will predict wine quality, which is a numeric value.
- Define a `Dense` layer with 1 neuron.
- Since this is a regression output, the activation can be left as its default value, `None`.
The other output layer will predict the wine type, which is either red (`1`) or not red (`0`, i.e. white).
- Define a `Dense` layer with 1 neuron.
- Since there are two possible categories, you can use a sigmoid activation for binary classification.
Define the Model
Define the `Model` object, and set the following parameters:
- `inputs`: pass in the inputs to the model.
- `outputs`: pass in a list of the outputs that you just defined: wine quality, then wine type.
- Note: please list the wine quality before wine type in the outputs, as the order determines which loss is applied to which output.
def final_model(inputs):
# get the base model
x = base_model(inputs)
# connect the output Dense layer for regression
wine_quality = Dense(units=1, name='wine_quality')(x)
# connect the output Dense layer for classification. this will use a sigmoid activation.
wine_type = Dense(units=1, activation='sigmoid', name='wine_type')(x)
# define the model using the input and output layers
model = Model(inputs=inputs, outputs=[wine_quality, wine_type])
return model
Compiling the Model
Next, compile the model. When setting the `loss` parameter of `model.compile`, you're setting the loss for each of the two outputs (wine quality and wine type).
To set more than one loss, use a dictionary of key-value pairs.
- You can look at the available losses in the `tf.keras.losses` module.
- Note: for the desired spelling, look at the "Functions" section of the documentation, not the "Classes" section on that same page.
- wine_type: Since you will be performing binary classification on wine type, use the binary crossentropy loss function and pass it in as a string.
- Hint: this should be all lowercase. In the documentation, you'll see this under the "Functions" section, not the "Classes" section.
- wine_quality: Since this is a regression output, use the mean squared error, also passed in as a string, all lowercase.
- Hint: you may notice that there are two aliases for mean squared error. Use the shorter name.
You will also set the metric for each of the two outputs. Again, to set metrics for two or more outputs, use a dictionary with key-value pairs.
- The available metrics are in the `tf.keras.metrics` module.
- For the wine type, set the metric to accuracy as a string, all lowercase.
- For wine quality, use the root mean squared error. Instead of a string, set it to an instance of the `RootMeanSquaredError` class from the `tf.keras.metrics` module.
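For reference, the binary crossentropy loss applied to the wine-type output, for a true label $t \in \{0, 1\}$ and predicted probability $p$, is:

$$\mathcal{L}_{\text{type}} = -\left[t \log(p) + (1 - t)\log(1 - p)\right]$$

With no explicit loss weights, the total loss that Keras optimizes is the sum of the wine-quality MSE and this wine-type crossentropy.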
inputs = tf.keras.layers.Input(shape=(11,))
rms = tf.keras.optimizers.RMSprop(learning_rate=0.0001)
model = final_model(inputs)
model.compile(optimizer=rms,
loss = {'wine_type' : 'binary_crossentropy',
'wine_quality' : 'mean_squared_error'
},
metrics = {'wine_type' : 'accuracy',
'wine_quality': tf.keras.metrics.RootMeanSquaredError()
}
)
Training the Model
Fit the model to the training inputs and outputs.
- Check the documentation for model.fit.
- Remember to use the normalized training set as inputs.
- For the validation data, please use the normalized validation set.
history = model.fit(x=norm_train_X, y=train_Y,
epochs = 40, validation_data=(norm_val_X, val_Y))
loss, wine_quality_loss, wine_type_loss, wine_quality_rmse, wine_type_accuracy = model.evaluate(x=norm_val_X, y=val_Y)
print()
print(f'loss: {loss}')
print(f'wine_quality_loss: {wine_quality_loss}')
print(f'wine_type_loss: {wine_type_loss}')
print(f'wine_quality_rmse: {wine_quality_rmse}')
print(f'wine_type_accuracy: {wine_type_accuracy}')
predictions = model.predict(norm_test_X)
quality_pred = predictions[0]
type_pred = predictions[1]
quality_pred[0]
def plot_metrics(history, metric_name, title, ylim=5):
plt.title(title)
plt.ylim(0,ylim)
plt.plot(history.history[metric_name],color='blue',label=metric_name)
plt.plot(history.history['val_' + metric_name],color='green',label='val_' + metric_name)
def plot_confusion_matrix(y_true, y_pred, title='', labels=[0,1]):
cm = confusion_matrix(y_true, y_pred)
fig = plt.figure()
ax = fig.add_subplot(111)
cax = ax.matshow(cm)
plt.title('Confusion matrix of the classifier')
fig.colorbar(cax)
ax.set_xticklabels([''] + labels)
ax.set_yticklabels([''] + labels)
plt.xlabel('Predicted')
plt.ylabel('True')
fmt = 'd'
thresh = cm.max() / 2.
for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
plt.text(j, i, format(cm[i, j], fmt),
horizontalalignment="center",
color="black" if cm[i, j] > thresh else "white")
plt.show()
def plot_diff(y_true, y_pred, title = '' ):
plt.scatter(y_true, y_pred)
plt.title(title)
plt.xlabel('True Values')
plt.ylabel('Predictions')
plt.axis('equal')
plt.axis('square')
plt.plot([-100, 100], [-100, 100])
return plt
plot_metrics(history, 'wine_quality_root_mean_squared_error', 'RMSE', ylim=2)
plot_metrics(history, 'wine_type_loss', 'Wine Type Loss', ylim=0.2)
plot_confusion_matrix(test_Y[1], np.round(type_pred), title='Wine Type', labels = [0, 1])
scatter_plot = plot_diff(test_Y[0], quality_pred, title='Quality')