'''
Description: various functions for deep learning built on-top of tensorflow. The high-level modules in the package
call these functions.
- File name: network_ops.py
- Author: Lloyd Windrim
- Date created: June 2019
- Python package: deephyp
'''
import tensorflow as tf
import math
import numpy as np
from os.path import join, exists, basename, split
import json
def create_variable(shape, method='gaussian', wd=False):
    """ Build a trainable tensorflow variable with the requested shape.

    The parameter values are drawn by :func:`init_weight` using the chosen
    initialisation scheme, and optionally registered for weight decay.

    Args:
        shape (list): Shape of the variable to create.
        method (str): Initialisation scheme passed through to init_weight.
        wd (boolean): If True, add an L2 penalty for this variable to the 'wd' collection.
    Returns:
        (tensor): Trainable variable of the given shape.
    """
    initial_value = init_weight(method, shape, wd=wd)
    return tf.Variable(initial_value)
def layer_fullyConn(input, W, b):
    """ Apply a fully-connected (dense) layer: an affine transform of the input.

    Args:
        input (tensor): Batch of input vectors. Shape [numSamples x numInputNeurons].
        W (tensor): Layer weights. Shape [numInputNeurons x numOutputNeurons].
        b (tensor): Layer biases. Shape [numOutputNeurons].
    Returns:
        (tensor): input @ W + b. Shape [numSamples x numOutputNeurons].
    """
    projection = tf.matmul(input, W)
    return projection + b
def layer_conv1d(input, W, b, stride=1, padding='SAME'):
    """ Define a 1 dimensional convolution layer operation.

    Args:
        input (tensor): Data input into the layer. Shape [numSamples x numInputNeurons x numFiltersIn].
        W (tensor): Weight parameters of the filters/kernels. Shape [filterSize x numFiltersIn x numFiltersOut].
        b (tensor): Bias parameters for the layer. Shape [numFiltersOut].
        stride (int): Stride at which to convolve (must be >= 1).
        padding (str): Type of padding to use ('SAME' or 'VALID').
    Returns:
        (tensor): Computes layer output. Shape [numSamples x numOutputNeurons x numFiltersOut].
    Raises:
        ValueError: If padding is not 'SAME'/'VALID', or stride < 1.
    """
    # membership test instead of bitwise '&' on comparisons: '&' does not
    # short-circuit and binds tighter than '!=' without the parentheses
    if padding not in ('SAME', 'VALID'):
        raise ValueError('unknown padding type: %s. Use SAME or VALID' % padding)
    if stride < 1:
        raise ValueError('stride must be greater than 0. Stride = %d found in conv layer.' % stride)
    return tf.nn.conv1d(input, W, stride=stride, padding=padding) + b
def layer_deconv1d(input, W, b, outputShape, stride=1, padding='SAME'):
    """ Define a 1 dimensional deconvolution layer operation. Also called convolutional transpose or upsampling layer.

    Args:
        input (tensor): Data input into the layer. Shape [numSamples x numInputNeurons x numFiltersIn].
        W (tensor): Weight parameters of the filters/kernels. Shape [filterSize x numFiltersOut x numFiltersIn].
        b (tensor): Bias parameters for the layer. Shape [numFiltersOut].
        outputShape (list): Expected shape of the layer output. Shape [numSamples x numOutputNeurons x numFiltersOut].
        stride (int): Stride at which to convolve (must be >= 1).
        padding (str): Type of padding to use ('SAME' or 'VALID').
    Returns:
        (tensor): Computes layer output. Shape [numSamples x numOutputNeurons x numFiltersOut].
    Raises:
        ValueError: If padding is not 'SAME'/'VALID', or stride < 1.
    """
    # membership test instead of bitwise '&' on comparisons: '&' does not
    # short-circuit and binds tighter than '!=' without the parentheses
    if padding not in ('SAME', 'VALID'):
        raise ValueError('unknown padding type: %s. Use SAME or VALID' % padding)
    if stride < 1:
        raise ValueError('stride must be greater than 0. Stride = %d found in deconv layer.' % stride)
    return tf.nn.conv1d_transpose(input, W, outputShape, strides=stride, padding=padding) + b
def layer_activation(input, func='sigmoid'):
    """ Apply a pointwise activation function to a tensor.

    Args:
        input (tensor): Data input into the function.
        func (str): Which activation to apply: 'relu', 'sigmoid' or 'linear'.
    Returns:
        (tensor): Activated output; same shape as input ('linear' returns the input unchanged).
    Raises:
        ValueError: If func is not one of the supported names.
    """
    if func == 'linear':
        return input
    if func == 'relu':
        return tf.nn.relu(input)
    if func == 'sigmoid':
        return tf.nn.sigmoid(input)
    raise ValueError('unknown activation function: %s. Use relu, sigmoid or linear.' % func)
def conv_output_shape(inputShape, filterSize, padding, stride):
    """ Compute the output length (along the convolving axis) of a 1D convolution.

    Args:
        inputShape (int): Length of the convolving axis of the input.
        filterSize (int): Size of the filter/kernel.
        padding (str): Type of padding to use ('SAME' or 'VALID').
        stride (int): Stride at which to convolve (must be >= 1).
    Returns:
        (int): Length of the convolving axis of the output.
    Raises:
        ValueError: If padding is not 'SAME' or 'VALID'.
    """
    if padding == 'SAME':
        # SAME keeps length/stride (rounded up) regardless of filter size
        return int(np.ceil(inputShape / stride))
    if padding == 'VALID':
        # VALID: no padding, so (filterSize - 1) positions are lost
        return int(np.ceil((inputShape - (filterSize - 1)) / stride))
    raise ValueError('unknown padding type: %s. Use SAME or VALID' % padding)
def train_step(loss, learning_rate=1e-3, decay_steps=None, decay_rate=None, piecewise_bounds=None, piecewise_values=None,
               method='Adam'):
    """ Operation for training the weights of the network by optimising them to minimise the loss function. Note that \
        the default is a constant learning rate (no decay).

    Args:
        loss (tensor): Output of network loss function.
        learning_rate: (float) Controls the degree to which the weights are updated during training.
        decay_steps (int): Epoch frequency at which to decay the learning rate. Required if decay_rate is given.
        decay_rate (float): Fraction at which to decay the learning rate.
        piecewise_bounds (int list): Epoch step intervals for decaying the learning rate. Alternative to decay steps.
        piecewise_values (float list): Rate at which to decay the learning rate at the piecewise_bounds.
        method (str): Optimisation method. (Adam, SGD).
    Returns:
        (op) A train op.
    Raises:
        ValueError: If method is not 'Adam' or 'SGD'.
    """
    # counts optimiser updates; incremented by minimize() below
    global_step = tf.Variable(0, trainable=False, name='global_step')
    # select learning-rate schedule for the current step
    # (use 'is not None' - '!= None' is unreliable for non-scalar arguments)
    if decay_rate is not None:
        lr = tf.train.exponential_decay(learning_rate,
                                        global_step,
                                        decay_steps,
                                        decay_rate, staircase=True)
    elif piecewise_bounds is not None:
        # learning_rate applies before the first bound, then piecewise_values take over
        lr = tf.train.piecewise_constant(global_step, piecewise_bounds, [learning_rate] + piecewise_values)
    else:
        lr = learning_rate
    if method == 'Adam':
        optimizer = tf.train.AdamOptimizer(lr)
    elif method == 'SGD':
        optimizer = tf.train.GradientDescentOptimizer(lr)
    else:
        raise ValueError('unknown optimisation method: %s. Use Adam or SGD.' % method)
    train_op = optimizer.minimize(loss, global_step=global_step)
    return train_op
def loss_function_reconstruction_1D(y_reconstructed, y_target, func='SSE'):
    """ Build a reconstruction loss comparing a network's 1D output to its 1D target.

    Args:
        y_reconstructed (tensor): Output of network (reconstructed 1D vector). Shape [numSamples x inputSize].
        y_target (tensor): What the network is trying to reconstruct (1D vector). Shape [numSamples x inputSize].
        func (string): Loss to use: 'SSE' sum of squared errors, 'CSA' cosine spectral angle, \
            'SA' spectral angle, 'SID' spectral information divergence.
    Returns:
        (tensor): Scalar reconstruction loss.
    Raises:
        ValueError: If func is not one of the supported names.
    """
    if func == 'SSE':
        # sum of squared errors over the whole batch
        return tf.reduce_sum(tf.square(y_target - y_reconstructed))
    if func == 'CSA':
        # 1 - cosine similarity per sample, summed over the batch
        rec_unit = tf.math.l2_normalize(tf.transpose(y_reconstructed), axis=0)
        tgt_unit = tf.math.l2_normalize(tf.transpose(y_target), axis=0)
        cos_sim = tf.reduce_sum(tf.multiply(rec_unit, tgt_unit), axis=0)
        return tf.reduce_sum(1 - cos_sim)
    if func == 'SA':
        # angle between each reconstructed/target pair, summed over the batch
        rec_unit = tf.math.l2_normalize(tf.transpose(y_reconstructed), axis=0)
        tgt_unit = tf.math.l2_normalize(tf.transpose(y_target), axis=0)
        cos_sim = tf.reduce_sum(tf.multiply(rec_unit, tgt_unit), axis=0)
        return tf.reduce_sum(tf.math.acos(cos_sim))
    if func == 'SID':
        # symmetric KL divergence between per-sample normalised spectra
        tgt_prob = tf.divide(tf.transpose(y_target), tf.reduce_sum(tf.transpose(y_target), axis=0))
        rec_prob = tf.divide(tf.transpose(y_reconstructed), tf.reduce_sum(tf.transpose(y_reconstructed), axis=0))
        kl_tr = tf.reduce_sum(tf.multiply(tgt_prob, tf.log(tf.divide(tgt_prob, rec_prob))), axis=0)
        kl_rt = tf.reduce_sum(tf.multiply(rec_prob, tf.log(tf.divide(rec_prob, tgt_prob))), axis=0)
        return tf.reduce_sum(kl_tr + kl_rt)
    raise ValueError('unknown loss function: %s. Use SSE, CSA, SA or SID.' % func)
def loss_function_crossentropy_1D(y_pred, y_target, class_weights=None, num_classes=None):
    """ Cross entropy loss function op, comparing 1D tensors for network prediction and target. Weights the classes \
        when calculating the loss to balance un-even training batches. If class weights are not provided, then no \
        weighting is done (weight of 1 assigned to each class).

    Args:
        y_pred (tensor): Output of network (1D vector of class scores). Shape [numSamples x numClasses].
        y_target (tensor): One-hot classification labels (1D vector). Shape [numSamples x numClasses].
        class_weights (tensor): Weight for each class. Shape [numClasses]. If None, all classes weighted equally.
        num_classes (int): Number of classes; required when class_weights is None.
    Returns:
        (tensor): Cross-entropy loss.
    """
    # 'is None' rather than '== None': class_weights may be a tf tensor, for
    # which '==' does not perform a reliable None check
    if class_weights is None:
        class_weights = tf.constant(1, shape=[num_classes], dtype=tf.dtypes.float32)
    # per-sample weight = weight of the sample's (one-hot) class
    sample_weights = tf.reduce_sum(tf.multiply(y_target, class_weights), axis=1)
    loss = tf.reduce_mean(tf.losses.softmax_cross_entropy(
        onehot_labels=y_target, logits=y_pred, weights=sample_weights))
    return loss
def loss_weight_decay(wdLambda):
    """ Build the weight-decay regularisation term of the loss.

    Sums every L2 penalty previously registered in the 'wd' graph collection
    (see init_weight) and scales the total by wdLambda.

    Args:
        wdLambda (float): Scalar controlling the weighting of weight decay in the loss.
    Returns:
        (tensor): Weight-decay loss term.
    """
    decay_total = tf.reduce_sum(tf.get_collection('wd'))
    return tf.multiply(wdLambda, decay_total)
def balance_classes(y_target, num_classes):
    """ Compute per-class weights that balance each class's contribution to the loss, \
        based on how many samples of each class appear in the batch.

    Args:
        y_target (tensor): One-hot classification labels. Shape [numSamples x numClasses].
        num_classes (int): Number of classes.
    Returns:
        (tensor): Weight for each class, proportional to max_count / class_count. Shape [1 x numClasses].
    """
    labels = tf.reshape(y_target, [-1, num_classes])
    # add-one smoothing so classes absent from the batch do not divide by zero
    counts = tf.add(tf.reduce_sum(labels, axis=0), tf.constant([1] * num_classes, dtype=tf.float32))
    inv_counts = tf.divide(tf.ones((1, num_classes)), counts)
    return tf.multiply(inv_counts, tf.reduce_max(counts))
def save_model(addr, sess, saver, current_epoch, epochs_to_save):
    """Save a checkpoint if the current epoch is one of the requested save epochs.

    Args:
        addr (str): Directory under which per-epoch checkpoint folders are created.
        sess (obj): Tensorflow session object.
        saver (obj): Tensorflow saver object.
        current_epoch (int): The current epoch.
        epochs_to_save (int list): Epochs at which checkpoints should be written.
    """
    if current_epoch not in epochs_to_save:
        return
    checkpoint_path = join(addr, "epoch_%i" % (current_epoch), "model.ckpt")
    saver.save(sess, checkpoint_path)
def load_model(addr,sess):
    """Loads a model from the address of a checkpoint.

    Restores variable values from '<addr>/model.ckpt' into the current graph.

    Args:
        addr (str): Directory containing the 'model.ckpt' checkpoint files.
        sess (obj): Tensorflow session object.
    """
    # initialise all graph variables first, so anything not stored in the
    # checkpoint still has a defined value before restore overwrites the rest
    sess.run(tf.global_variables_initializer())
    saver = tf.train.Saver()
    saver.restore(sess, join(addr, 'model.ckpt'))
def save_config(net_obj, addr):
    """Write a network config file ('config.json') to the given directory.

    Serialises every attribute named in the network object's net_config list.

    Args:
        net_obj (obj): Network object exposing a net_config list of attribute names.
        addr (obj): Directory in which to store the config.json file.
    """
    data = {key: getattr(net_obj, key) for key in net_obj.net_config}
    with open(join(addr, 'config.json'), 'w') as outfile:
        json.dump(data, outfile)
def load_config(net_obj, addr):
    """Load a network config file, overwriting attributes on the network object.

    Every key/value pair in the JSON file becomes an attribute on net_obj.

    Args:
        net_obj (obj): Network object to populate.
        addr (obj): Path of the config.json file.
    """
    with open(addr, 'r') as infile:
        data = json.load(infile)
    for key, value in data.items():
        setattr(net_obj, key, value)
def train( net_obj , dataTrain, dataVal, train_op_name, n_epochs, save_addr, visualiseRateTrain=0, visualiseRateVal=0,
           save_epochs=[1000] ):
    """ Function for training a network. Updates the network weights through the training op. The function will check \
        the save address for a model checkpoint to load, otherwise it will begin training from scratch.

    Args:
        net_obj (obj): Network object. Must expose inputSize, x, y_target and a train_ops dict containing \
            '<train_op_name>_train' and '<train_op_name>_loss' ops.
        dataTrain (obj): Iterator object for training data (provides dataSamples, batchSize, numSamples, \
            next_batch() and reset_batch()).
        dataVal (obj): Iterator object for validation data.
        train_op_name (string): Name of training op created.
        n_epochs (int): Number of loops through dataset to train for.
        save_addr (str): Address of a directory to save checkpoints for desired epochs, or address of saved checkpoint. \
                        If address is for an epoch and contains a previously saved checkpoint, then the network will \
                        start training from there. Otherwise it will be trained from scratch.
        visualiseRateTrain (int): Epoch rate at which to print training loss in console. 0 disables.
        visualiseRateVal (int): Epoch rate at which to print validation loss in console. 0 disables.
        save_epochs (int list): Epochs to save checkpoints at. NOTE(review): shared mutable default list — \
            not mutated here, but callers should pass their own list.
    """
    if np.shape(dataTrain.dataSamples)[1] != net_obj.inputSize:
        raise Exception('the data dimensionality must match the network input size. '
                        'Data size: %d, network input size: %d'%(np.shape(dataTrain.dataSamples)[1], net_obj.inputSize))
    batchSize = dataTrain.batchSize
    numSamples = dataTrain.numSamples
    # number of batches per epoch; extra partial batch if batchSize does not divide numSamples
    numIters = numSamples // batchSize
    if (numSamples % batchSize)>0:
        numIters+=1
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        saver = tf.train.Saver()
        # check if addr has 'epoch' in name and contains a checkpoint
        if exists(join(save_addr,'checkpoint')) & ('epoch' in basename(save_addr)):
            # load a checkpoint and resume from the epoch after the one encoded in the folder name
            saver.restore(sess,join(save_addr,'model.ckpt'))
            epoch_start = int((basename(save_addr)).split('_')[-1]) + 1
            # subsequent checkpoints go into the parent directory
            save_addr = split(save_addr)[0]
        else:
            # save directory is empty - train from scratch
            epoch_start = 0
            # create network config file in directory
            save_config(net_obj,save_addr)
        for epoch_i in range(epoch_start, n_epochs+1):
            train_error = []
            for batch_i in range(numIters):
                train_batch_x , train_batch_y = dataTrain.next_batch()
                # update weights and biases
                sess.run(net_obj.train_ops['%s_train'%(train_op_name)], feed_dict={net_obj.x: train_batch_x,
                                                                                   net_obj.y_target: train_batch_y})
                # training loss (only evaluated on epochs that will be printed)
                if visualiseRateTrain > 0:
                    if epoch_i % visualiseRateTrain == 0:
                        train_error.append( net_obj.train_ops['%s_loss' % (train_op_name)].eval(
                            {net_obj.x: train_batch_x, net_obj.y_target: train_batch_y}) )
                # rewind the iterator once the last batch of the epoch has been consumed
                if batch_i == numIters - 1:
                    dataTrain.reset_batch()
            # outputs average batch error
            if visualiseRateTrain > 0:
                if epoch_i % visualiseRateTrain == 0:
                    train_error = np.array(train_error)
                    print("epoch: %d, training loss: %g" % (epoch_i, np.mean(train_error)))
            # iterate over validation samples and output loss
            # NOTE(review): integer division drops any final partial validation batch
            if visualiseRateVal > 0:
                if epoch_i % visualiseRateVal == 0:
                    val_error = []
                    for batch_i in range(dataVal.numSamples // dataVal.batchSize):
                        val_batch_x, val_batch_y = dataVal.next_batch()
                        val_error.append( net_obj.train_ops['%s_loss' % (train_op_name)].eval(
                            {net_obj.x: val_batch_x, net_obj.y_target: val_batch_y}) )
                        if batch_i == (dataVal.numSamples // dataVal.batchSize)-1:
                            dataVal.reset_batch()
                    val_error = np.array(val_error)
                    print("epoch: %d, validation loss: %g" % (epoch_i, np.mean(val_error)))
            # writes a checkpoint only when epoch_i is listed in save_epochs
            save_model(save_addr,sess,saver,epoch_i,save_epochs)
def init_weight(opts, shape, stddev=0.1, const=0.1, wd = False, dtype=tf.float32):
    """ Weight initialisation function.

    Args:
        opts (str): Method for initialising variable. ('gaussian','truncated_normal','xavier','xavier_improved', \
                    'constant').
        shape (list): Data shape.
        stddev (int): Standard deviation used by 'gaussian' and 'truncated_normal' variable initialisation methods.
        const (int): Constant value to initialise variable to if using 'constant' method.
        wd (boolean): Whether this variable contributes to weight decay or not.
        dtype (tf.dtype): Data type for variable.
    Returns:
        (tensor): Initial weight values of the requested shape.
    Raises:
        ValueError: If opts is not a recognised initialisation method.
    """
    if opts == 'gaussian':
        weights = tf.random_normal(shape, stddev=stddev, dtype=dtype)
    elif opts == 'truncated_normal':
        # pass dtype through (previously ignored for this method)
        weights = tf.truncated_normal(shape, stddev=stddev, dtype=dtype)
    elif opts == 'xavier':
        h = shape[0]
        w = shape[1]
        try:
            num_in = shape[2]
        except IndexError:  # 2D shape: no fan-in dimension
            num_in = 1
        sc = math.sqrt(3.0 / (h * w * num_in))
        weights = tf.multiply(tf.random_normal(shape, dtype=dtype) * 2 - 1, sc)
    elif opts == 'xavier_improved':
        h = shape[0]
        w = shape[1]
        try:
            num_out = shape[3]
        except IndexError:  # shape shorter than 4: no fan-out dimension
            num_out = 1
        sc = math.sqrt(2.0 / (h * w * num_out))
        weights = tf.multiply(tf.random_normal(shape, dtype=dtype), sc)
    elif opts == 'constant':
        # shape must be keyword: tf.constant's second positional arg is dtype,
        # so tf.constant(const, shape) passed the shape where dtype belongs
        weights = tf.constant(const, shape=shape)
    else:
        raise ValueError('Unknown weight initialization method %s' % opts)
    # set up weight decay on weights
    if wd:
        weight_decay = tf.nn.l2_loss(weights)
        tf.add_to_collection('wd', weight_decay)
    return weights