I am using the latest TensorFlow NGC container, nvcr.io/nvidia/tensorflow:20.09-tf2-py3.
When running my deep learning model on 1 GPU (Tesla T4) within the NGC container 20.09, I don't get reproducible results between consecutive runs (the predictions differ), although all the seeds are set correctly. Running the same code on CPU gives reproducible results over consecutive runs.
On a small dataset (138k samples), I managed to get reproducible results within the NGC container by setting the default float type to float64 (instead of float32).
But with a bigger dataset (2.6M samples), the predictions start to differ between runs after about 1 million samples.
The more I increase the dataset size, the bigger the difference between the predictions of consecutive runs.
My datasets are TFRecord files of hashed values, serialized in batches of 10,000 samples.
Here is the small_dataset (size 13M):
small_dataset
and the big_dataset (size 251M). Could you send me your email address so that I can share it with you via Google Drive and you can reproduce the issue?
You need to unzip them before running the script below.
import tensorflow as tf
import sys
import string
import os
from tensorflow.python.keras.initializers import RandomNormal, GlorotUniform, Zeros, glorot_normal
from tensorflow.python.keras.regularizers import l2
from tensorflow.compat.v1.keras.layers import Layer, Embedding, Input, Dense, Flatten, Add
from tensorflow.python.keras import backend as K
import numpy as np
import logging
# Only let TensorFlow log messages at ERROR level or above.
tf.get_logger().setLevel(logging.ERROR)
# Set the Keras backend default float type to float64.
K.set_floatx('float64')
# Name given to the prediction tensor built by the script.
prediction_output_name = "pred"
# Single seed value reused for Python hashing, NumPy and TensorFlow.
SEED = 1024
# NOTE(review): PYTHONHASHSEED is normally read when the interpreter starts,
# so setting it here presumably only affects child processes -- confirm.
os.environ['PYTHONHASHSEED'] = str(SEED)
np.random.seed(SEED)
# NOTE(review): this call runs before the script builds its own tf.Graph();
# the seed set here presumably does not carry over to that graph -- confirm.
tf.compat.v1.set_random_seed(SEED)
class Linear(Layer):
    """Weight-free layer that sums its input over axis 1."""

    def call(self, x):
        """Return ``x`` reduced by summation over axis 1."""
        return tf.reduce_sum(x, axis=[1])

    def compute_output_shape(self, input_shape=None):
        """Return the static output shape ``(None, 1)``.

        ``input_shape`` is accepted (and ignored) because Keras passes it
        when it calls this hook; the default keeps argument-less calls working.
        """
        return None, 1
class FM(Layer):
    """Pairwise-interaction term computed from a 3-D tensor.

    For an input ``x`` the layer returns
    ``0.5 * sum_over_axis_2((sum_over_axis_1 x)^2 - sum_over_axis_1 (x^2))``,
    which leaves one value per entry of axis 0.
    """

    def __init__(self, **kwargs):
        super(FM, self).__init__(**kwargs)

    def build(self, input_shape):
        """Validate that the input is 3-dimensional.

        Raises:
            ValueError: if ``input_shape`` does not have exactly 3 dimensions.
        """
        if len(input_shape) != 3:
            # Same wording as the check in call(); the original literal was
            # split with a backslash continuation and used a stray "% d".
            raise ValueError(
                "Unexpected inputs dimensions %d, expect to be 3 dimensions"
                % (len(input_shape)))
        super(FM, self).build(input_shape)

    def call(self, inputs):
        """Compute the interaction term.

        Raises:
            ValueError: if ``inputs`` is not a 3-D tensor.
        """
        if K.ndim(inputs) != 3:
            raise ValueError(
                "Unexpected inputs dimensions %d, expect to be 3 dimensions"
                % (K.ndim(inputs)))
        # keepdims=True keeps axis 1 as a size-1 axis so that axis 2 is still
        # the axis reduced by the final sum.
        square_of_sum = tf.square(tf.reduce_sum(inputs, axis=1, keepdims=True))
        sum_of_square = tf.reduce_sum(inputs * inputs, axis=1, keepdims=True)
        cross_term = square_of_sum - sum_of_square
        return 0.5 * tf.reduce_sum(cross_term, axis=2, keepdims=False)

    def compute_output_shape(self, input_shape=None):
        """Return the static output shape ``(None, 1)``.

        ``input_shape`` is accepted (and ignored) because Keras passes it
        when it calls this hook; the default keeps argument-less calls working.
        """
        return None, 1
class DNN(Layer):
    """Stack of fully connected layers built from explicit kernels and biases.

    Each hidden layer applies ``tensordot(x, kernel) + bias``, then optional
    batch normalization, then the activation, then dropout.
    """

    def __init__(self, hidden_units, activation='relu', l2_reg=0, dropout_rate=0, use_bn=False, seed=1024, **kwargs):
        self.hidden_units = hidden_units
        self.activation = activation
        self.dropout_rate = dropout_rate
        self.seed = seed
        self.l2_reg = l2_reg
        self.use_bn = use_bn
        super(DNN, self).__init__(**kwargs)

    def build(self, input_shape):
        """Create one kernel, bias, dropout and activation per hidden layer."""
        depth = len(self.hidden_units)
        # Layer widths including the input width, so consecutive pairs give
        # each kernel's (rows, columns).
        widths = [int(input_shape[-1])] + list(self.hidden_units)

        # NOTE(review): every kernel initializer receives the same seed, so
        # equally-shaped kernels presumably start from identical values --
        # confirm whether that is intended.
        kernels = []
        for idx, (fan_in, fan_out) in enumerate(zip(widths[:-1], widths[1:])):
            kernels.append(self.add_weight(
                name='kernel' + str(idx),
                shape=(fan_in, fan_out),
                initializer=glorot_normal(seed=self.seed),
                regularizer=l2(self.l2_reg),
                trainable=True))
        self.kernels = kernels

        biases = []
        for idx, width in enumerate(self.hidden_units):
            biases.append(self.add_weight(
                name='bias' + str(idx),
                shape=(width,),
                initializer=Zeros(),
                trainable=True))
        self.bias = biases

        if self.use_bn:
            self.bn_layers = [tf.keras.layers.BatchNormalization() for _ in range(depth)]

        # Dropout seeds are offset by the layer index.
        self.dropout_layers = [
            tf.keras.layers.Dropout(self.dropout_rate, seed=self.seed + idx)
            for idx in range(depth)
        ]
        self.activation_layers = [
            tf.keras.layers.Activation(self.activation) for _ in range(depth)
        ]
        super(DNN, self).build(input_shape)

    def call(self, inputs, training=None):
        """Run ``inputs`` through every hidden layer and return the result."""
        hidden = inputs
        for idx, (kernel, bias) in enumerate(zip(self.kernels, self.bias)):
            hidden = tf.math.add(
                tf.tensordot(hidden, kernel, axes=(-1, 0)), bias)
            if self.use_bn:
                hidden = self.bn_layers[idx](hidden, training=training)
            hidden = self.activation_layers[idx](hidden)
            hidden = self.dropout_layers[idx](hidden, training=training)
        return hidden

    def compute_output_shape(self, input_shape):
        """Return ``input_shape`` with its last axis set to the last hidden width."""
        if len(self.hidden_units) == 0:
            return tuple(input_shape)
        return tuple(input_shape[:-1] + (self.hidden_units[-1],))

    def get_config(self, ):
        """Return the base layer config extended with this layer's settings."""
        merged = dict(super(DNN, self).get_config())
        merged.update({
            'activation': self.activation,
            'hidden_units': self.hidden_units,
            'l2_reg': self.l2_reg,
            'use_bn': self.use_bn,
            'dropout_rate': self.dropout_rate,
            'seed': self.seed,
        })
        return merged
class PredictionLayer(Layer):
    """Final layer: optional global bias, then a sigmoid for binary tasks.

    Args:
        task: one of ``"binary"``, ``"multiclass"`` or ``"regression"``.
            Only ``"binary"`` applies a sigmoid; the other tasks return the
            (optionally biased) input unchanged.
        use_bias: whether to add a single trainable bias to the input.

    Raises:
        ValueError: if ``task`` is not one of the accepted values.
    """

    def __init__(self, task='binary', use_bias=True, **kwargs):
        if task not in ["binary", "multiclass", "regression"]:
            raise ValueError("task must be binary,multiclass or regression")
        self.task = task
        self.use_bias = use_bias
        super(PredictionLayer, self).__init__(**kwargs)

    def build(self, input_shape):
        """Create the zero-initialized global bias when ``use_bias`` is set."""
        if self.use_bias:
            self.global_bias = self.add_weight(
                shape=(1,), initializer=Zeros(), name="global_bias")
        super(PredictionLayer, self).build(input_shape)

    def call(self, inputs):
        """Return the prediction reshaped to ``(-1, 1)`` and cast to float32."""
        x = inputs
        if self.use_bias:
            x = tf.nn.bias_add(x, self.global_bias, data_format='NHWC')
        if self.task == "binary":
            x = tf.sigmoid(x)
        # The output is always float32, whatever the Keras float type is.
        output = tf.cast(tf.reshape(x, (-1, 1)), tf.float32)
        return output

    def compute_output_shape(self, input_shape=None):
        """Return the static output shape ``(None, 1)``.

        ``input_shape`` is accepted (and ignored) because Keras passes it
        when it calls this hook; the default keeps argument-less calls working.
        """
        return None, 1

    def get_config(self, ):
        """Return the base layer config extended with ``task`` and ``use_bias``."""
        config = {'task': self.task, 'use_bias': self.use_bias}
        base_config = super(PredictionLayer, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))
def input_pipeline(dataset_file, serialized_features, epochs):
    """Build the input dataset and return its iterator and initializer op.

    Args:
        dataset_file: TFRecord file path(s) passed on to ``create_dataset``.
        serialized_features: names of the features stored in each record.
        epochs: number of times the dataset is repeated.

    Returns:
        A ``(iterator, training_init_op)`` tuple.
    """
    dataset = create_dataset(dataset_file, serialized_features, epochs)
    # Dataset transformations return a new dataset; the original code dropped
    # the result, so prefetching was never actually applied.
    dataset = dataset.prefetch(1)
    iterator = tf.compat.v1.data.make_one_shot_iterator(dataset)
    training_init_op = iterator.make_initializer(dataset)
    return iterator, training_init_op
def create_dataset(dataset_file, serialized_features, epochs):
    """Return a dataset of decoded records, batched by 1 and repeated.

    Each element of ``dataset_file`` is parsed with the decoder built from
    ``serialized_features``; the result is batched with size 1 and repeated
    ``epochs`` times.
    """
    records = tf.data.TFRecordDataset(dataset_file)
    parsed = records.map(make_decoder(serialized_features))
    batched = parsed.batch(1)
    return batched.repeat(epochs)
def make_decoder(serialized_features):
    """Return a function that parses one serialized example.

    The parsing spec declares every name in ``serialized_features``, plus
    ``'label'``, as a variable-length int64 feature.
    """
    spec = {}
    for name in serialized_features:
        spec[name] = tf.io.VarLenFeature(dtype=tf.int64)
    spec['label'] = tf.io.VarLenFeature(dtype=tf.int64)

    def _parse_record(serialized_example):
        # parse_example expects a batch, hence the single-element list.
        return tf.io.parse_example([serialized_example], spec)

    return _parse_record
def select_features(features, hash_size, model_features):
    """Return one dense, hashed tensor per entry of ``model_features``.

    For each entry, the feature looked up under ``entry[0]`` is cast to int32,
    densified, transposed and reduced modulo ``hash_size``.
    """

    def _hashed_column(feature_name):
        # The lookup key is the first element of the entry, not the entry itself.
        sparse_ids = tf.cast(features.get(feature_name[0]), dtype=tf.int32)
        dense_ids = tf.transpose(tf.sparse.to_dense(sparse_ids))
        return tf.math.mod(dense_ids, hash_size)

    return [_hashed_column(feature_name) for feature_name in model_features]
def build_optimizer(learning_rate, optimizer):
    """Return the compat.v1 optimizer selected by the ``optimizer`` name.

    Recognized names are ``'RMSProp'``, ``'adagrad'`` and ``'adam'``; any
    other value falls back to plain gradient descent.
    """
    train = tf.compat.v1.train
    named_optimizers = (
        ('RMSProp', train.RMSPropOptimizer),
        ('adagrad', train.AdagradOptimizer),
        ('adam', train.AdamOptimizer),
    )
    for name, optimizer_cls in named_optimizers:
        if optimizer == name:
            return optimizer_cls(learning_rate=learning_rate)
    return train.GradientDescentOptimizer(learning_rate=learning_rate)
def predictions_saver(prediction_path, predictions):
    """Append ``predictions`` to ``prediction_path``, one value per line.

    Args:
        prediction_path: path of the text file to append to (created if missing).
        predictions: iterable of values; each is written as ``str(value)``.

    An empty ``predictions`` writes nothing. The previous implementation
    appended a lone newline in that case, leaving a blank line that no longer
    corresponds to any sample.
    """
    lines = [str(value) + '\n' for value in predictions]
    with open(prediction_path, "a", encoding="utf-8") as output_file:
        output_file.writelines(lines)
def summarize_weights(session):
    """Print the sum of all trainable weights of the session's graph.

    Args:
        session: a session object; if it exposes ``raw_session()``, the
            session returned by that method is used instead.
    """
    if hasattr(session, 'raw_session'):
        session = session.raw_session()
    # Read the variables from the session's own graph rather than from the
    # ambient default graph, so the summary is right even when this is called
    # outside the ``graph.as_default()`` block.
    variables = session.graph.get_collection(
        tf.compat.v1.GraphKeys.TRAINABLE_VARIABLES)
    weights = session.run(variables)
    summary = sum(weight.sum() for weight in weights)
    print("Summary of weights: %.20f" % summary)
if __name__ == '__main__':
    # Script entry point: build the model in its own graph, then train for
    # one pass over the dataset while appending each batch's predictions to
    # the output file.
    if len(sys.argv) < 3:
        sys.exit("usage: %s DATASET_FILE PREDICTION_PATH" % sys.argv[0])
    dataset_file = sys.argv[1]
    prediction_path = sys.argv[2]

    # Names of the features serialized in the records: A-Z, a-z and 0-9.
    header = list(string.ascii_uppercase) + list(string.ascii_lowercase) + [str(i) for i in range(10)]
    epochs = 1
    hash_size = 2 ** 25
    model_features = "A-B-C-D".split('-')
    init_std = 0.01
    embedding_size = 4
    l2_reg_nn, l2_reg_emb = 0, 0
    hidden_units_num = 200
    hidden_layers_num = 4
    dnn_activation_function = "relu"
    dnn_dropout = 0
    learning_rate = 0.0004
    optimizer = "adam"

    graph = tf.Graph()
    with graph.as_default():
        # The graph-level seed is stored per graph, so it has to be set on the
        # graph the model is built in; the module-level call does not reach it.
        tf.compat.v1.set_random_seed(SEED)

        # Input pipeline
        iterator, training_init_op = input_pipeline(dataset_file, header, epochs)
        features = iterator.get_next()
        selected_features = select_features(features, hash_size, model_features)
        stacked_features = tf.stack(selected_features, axis=1)
        inputs = tf.reshape(stacked_features, [-1, len(model_features)])
        label = tf.reshape(tf.sparse.to_dense(features['label']), [-1])

        # Linear part
        emb_linear = tf.compat.v1.keras.layers.Embedding(
            hash_size,
            1,
            embeddings_initializer=RandomNormal(mean=0.0, stddev=init_std, seed=SEED),
            embeddings_regularizer=l2(l2_reg_emb),
            embeddings_constraint=None,
            mask_zero=False,
            input_length=len(model_features)
        )(inputs)
        # Linear has no weights and defines no constructor, so the L2 value
        # formerly passed here was received by the base Layer's first
        # positional parameter; it is dropped.
        linear_logit = Linear()(emb_linear)

        # Embeddings for FM / DNN
        emb_fm = tf.compat.v1.keras.layers.Embedding(
            hash_size,
            embedding_size,
            embeddings_initializer=RandomNormal(mean=0.0, stddev=init_std, seed=SEED),
            embeddings_regularizer=l2(l2_reg_emb),
            embeddings_constraint=None,
            mask_zero=False,
            input_length=len(model_features)
        )(inputs)

        # FM part
        fm_logit = FM()(emb_fm)

        # DNN part
        dnn_input = Flatten()(emb_fm)
        dnn_output = DNN([hidden_units_num] * hidden_layers_num, activation=dnn_activation_function,
                         l2_reg=l2_reg_nn, dropout_rate=dnn_dropout, seed=SEED)(dnn_input)
        dnn_logit = Dense(1, use_bias=False, activation=None,
                          kernel_initializer=GlorotUniform(seed=SEED))(dnn_output)

        # Add together
        final_logit = Add()([linear_logit, fm_logit, dnn_logit])

        # Prediction layer
        pred = tf.reshape(PredictionLayer('binary')(final_logit), [-1], name=prediction_output_name)
        loss = tf.identity(tf.compat.v1.losses.log_loss(label, pred), name='LOSS')

        # Keep the optimizer name and the training op in separate variables.
        opt = build_optimizer(learning_rate, optimizer)
        train_op = opt.minimize(loss)
        init_all_vars = tf.compat.v1.global_variables_initializer()

        config = tf.compat.v1.ConfigProto()
        config.gpu_options.allow_growth = True

        # NOTE(review): the remaining GPU run-to-run differences presumably
        # come from floating-point accumulation of the embedding gradients
        # (order-dependent sums over repeated indices), not from seeding.
        # Consider the TF_DETERMINISTIC_OPS=1 environment variable and check
        # which ops it covers in this TensorFlow version -- confirm.

        # The session stays inside graph.as_default() so that helpers reading
        # the default graph see this graph; "with" closes it even on error.
        with tf.compat.v1.Session(graph=graph, config=config) as session:
            session.run(init_all_vars)
            session.run(training_init_op)
            summarize_weights(session)
            while True:
                try:
                    # Predictions are fetched in the same step as the update.
                    predictions, _ = session.run([pred, train_op])
                except tf.errors.OutOfRangeError:
                    # Raised once the dataset is exhausted.
                    break
                summarize_weights(session)
                predictions_saver(prediction_path, predictions)
Could you help me understand why I don't get exactly the same predictions across consecutive runs?