Several Ways to Build and Train Models with TensorFlow 2 and Keras
TensorFlow 2 and Keras provide a rich set of APIs for building and training deep neural networks.
This article walks through several of these approaches to both model construction and model training.
1. Loading TensorFlow
We import tensorflow together with time, which is used later to time the training loop; the tensorflow version is 2.0 or later.
import tensorflow as tf
import time
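A couple of APIs used later (the tape argument to optimizer.minimize, metric.reset_state) only appeared in later 2.x releases, roughly TF 2.4/2.5, so a quick sanity check of the installed version is worthwhile:
print(tf.__version__)  # this article assumes a 2.x release, ideally 2.5 or newer
assert tf.__version__.startswith('2')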
2. Loading the Data
The experiments use the CIFAR10 dataset, loaded directly through tensorflow.
The images have shape (None, 32, 32, 3) and the labels have shape (None, 1).
Before training, the data needs to be preprocessed, mainly by:
- normalizing the images to [0, 1]
- flattening the labels from (None, 1) to (None,)
- one-hot encoding the labels
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.cifar10.load_data()
train_images, test_images = train_images / 255.0, test_images / 255.0
train_labels = tf.reshape(train_labels, shape=[train_labels.shape[0]])
test_labels = tf.reshape(test_labels, shape=[test_labels.shape[0]])
train_labels, test_labels = tf.one_hot(train_labels, depth=10), tf.one_hot(test_labels, depth=10)
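As a quick sanity check, the shapes after preprocessing should look as follows (CIFAR10 has 50000 training and 10000 test samples):
print(train_images.shape)  # (50000, 32, 32, 3)
print(train_labels.shape)  # (50000, 10) after one-hot encoding
print(test_images.shape)   # (10000, 32, 32, 3)
print(test_labels.shape)   # (10000, 10)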
After preprocessing, we wrap the data in tf.data.Dataset instances for later training.
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels))
test_dataset = test_dataset.batch(64)  # no shuffling needed for evaluation
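Optionally, prefetching lets tf.data prepare the next batch while the current one is being consumed; a minimal sketch (tf.data.AUTOTUNE requires TF 2.4+, older releases spell it tf.data.experimental.AUTOTUNE):
train_dataset = train_dataset.prefetch(tf.data.AUTOTUNE)  # overlap the input pipeline with training
test_dataset = test_dataset.prefetch(tf.data.AUTOTUNE)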
3. Building the Model
The experiments use a simple ResNet-style model containing 2 residual blocks; see the code for the exact architecture.
Below, the same model is built in several ways:
- functional API
- model subclassing
- Sequential model
In practice, pick whichever construction style suits the scenario.
3.1 Functional API
inputs = tf.keras.Input(shape=(32, 32, 3))
x = tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu')(inputs)
x = tf.keras.layers.MaxPooling2D()(x)
temp = x
for filters in [64, 128]:
    # main branch: two 3x3 convolutions with batch normalization
    x = tf.keras.layers.Conv2D(filters, 3, padding='same')(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.ReLU()(x)
    x = tf.keras.layers.Conv2D(filters, 3, padding='same')(x)
    x = tf.keras.layers.BatchNormalization()(x)
    # shortcut branch: 1x1 convolution to match the channel count
    res = tf.keras.layers.Conv2D(filters, 1)(temp)
    x = tf.keras.layers.add([x, res])
    x = tf.keras.layers.ReLU()(x)
    x = tf.keras.layers.MaxPooling2D()(x)
    temp = x
x = tf.keras.layers.GlobalAvgPool2D()(x)
x = tf.keras.layers.Dense(128)(x)
outputs = tf.keras.layers.Dense(10, activation='softmax')(x)
model = tf.keras.Model(inputs=inputs, outputs=outputs)
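Because the functional API records the whole layer graph up front, the architecture can be inspected right away:
model.summary()  # prints every layer with its output shape and parameter count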
3.2 Model Subclassing
class ResidualBlock(tf.keras.layers.Layer):
    def __init__(self, filters, **kwargs):
        super(ResidualBlock, self).__init__(**kwargs)
        self.filters = filters
        self.conv2D_1 = tf.keras.layers.Conv2D(filters, 3, padding='same')
        self.conv2D_2 = tf.keras.layers.Conv2D(filters, 3, padding='same')
        self.conv2D_3 = tf.keras.layers.Conv2D(filters, 1)
        self.activation_1 = tf.keras.layers.ReLU()
        self.activation_2 = tf.keras.layers.ReLU()
        self.normalization_1 = tf.keras.layers.BatchNormalization()
        self.normalization_2 = tf.keras.layers.BatchNormalization()
        self.maxpooling = tf.keras.layers.MaxPooling2D()

    def call(self, inputs, *args, **kwargs):
        x = self.conv2D_1(inputs)
        x = self.normalization_1(x)
        x = self.activation_1(x)
        x = self.conv2D_2(x)
        x = self.normalization_2(x)
        res = self.conv2D_3(inputs)  # 1x1 convolution on the shortcut branch
        x = tf.keras.layers.add([x, res])
        x = self.activation_2(x)
        return self.maxpooling(x)

class ResNet(tf.keras.Model):
    def __init__(self, outputs_shape=10):
        super(ResNet, self).__init__()
        self.conv2d_1 = tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu')
        self.block_1 = ResidualBlock(64)
        self.block_2 = ResidualBlock(128)
        self.maxpooling = tf.keras.layers.MaxPooling2D()
        self.globalpooling = tf.keras.layers.GlobalAvgPool2D()
        self.dense = tf.keras.layers.Dense(128)
        self.classifier = tf.keras.layers.Dense(outputs_shape, activation='softmax')
        # A subclassed model has no static graph, so summary() cannot show
        # output shapes until the model has been called on a known input;
        # calling it on a symbolic Input here works around that.
        self.inputs = tf.keras.Input(shape=(32, 32, 3))
        self.outputs = self.call(self.inputs)

    def build_model(self):
        # Wraps the layers into a functional Model so summary() works.
        return tf.keras.Model(inputs=self.inputs, outputs=self.call(self.inputs))

    def call(self, inputs):
        x = self.conv2d_1(inputs)
        x = self.maxpooling(x)
        x = self.block_1(x)
        x = self.block_2(x)
        x = self.globalpooling(x)
        x = self.dense(x)
        return self.classifier(x)

model = ResNet()
model.build_model().summary()
3.3 Sequential Model
A Sequential model is built from the tf.keras.Sequential class, appending layers one by one through its add method.
Because a Sequential model only supports a plain linear stack of layers, expressing the residual connections this way is awkward at best, so this approach is not covered in detail.
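Purely for illustration, here is a minimal Sequential sketch; note it is a plain linear stack without the residual shortcuts, so it is not equivalent to the model above:
seq_model = tf.keras.Sequential()
seq_model.add(tf.keras.Input(shape=(32, 32, 3)))
seq_model.add(tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu'))
seq_model.add(tf.keras.layers.MaxPooling2D())
seq_model.add(tf.keras.layers.GlobalAvgPool2D())
seq_model.add(tf.keras.layers.Dense(10, activation='softmax'))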
4. Training the Model
There are three common ways to train the model:
- the built-in Keras methods, such as compile, fit, and predict
- a custom training loop written directly in tensorflow
- overriding Keras's train_step and test_step
4.1 Built-in Keras Methods
A model built by any of the approaches in 3.1, 3.2, and 3.3 can be trained directly with the built-in Keras methods.
model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=tf.keras.losses.CategoricalCrossentropy(from_logits=False),
    metrics=[
        tf.keras.metrics.CategoricalAccuracy(),
        tf.keras.metrics.TopKCategoricalAccuracy(k=3),
    ]
)
# The dataset is already batched, so batch_size must not be passed to fit.
model.fit(train_dataset, epochs=100, validation_data=test_dataset)
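The remaining built-in methods work the same way; as a minimal sketch, evaluation and prediction on the test set look like this:
results = model.evaluate(test_dataset)       # loss followed by each compiled metric
probabilities = model.predict(test_dataset)  # softmax outputs, shape (10000, 10)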
4.2 Custom Training Loop
When the built-in Keras training methods cannot meet a particular need, a training loop can be written by hand.
# optimizer
optimizer = tf.keras.optimizers.Adam()
# loss function, plus a Mean metric to track the average loss
loss = tf.keras.losses.CategoricalCrossentropy(from_logits=False)
loss_metrics = tf.keras.metrics.Mean()
# evaluation metrics
acc = tf.keras.metrics.CategoricalAccuracy()
topacc = tf.keras.metrics.TopKCategoricalAccuracy(k=3)
epochs = 100
for epoch in range(epochs):
    print("\nepoch {}".format(epoch))
    start_time = time.time()
    # training step
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            y_batch_pre = model(x_batch_train, training=True)  # forward pass
            loss_value = loss(y_batch_train, y_batch_pre)
        optimizer.minimize(loss_value, model.trainable_weights, tape=tape)  # backward pass, update the weights
        loss_metrics.update_state(loss_value)
        acc.update_state(y_batch_train, y_batch_pre)
        topacc.update_state(y_batch_train, y_batch_pre)
    train_time = time.time()
    print(' - train - time:{:.4f}\t loss:{:.4f}\t acc:{:.4f}\t top_acc:{:.4f}'
          .format(train_time-start_time, loss_metrics.result(), acc.result(), topacc.result()))
    loss_metrics.reset_state()
    acc.reset_state()
    topacc.reset_state()
    # test step
    for x_batch_test, y_batch_test in test_dataset:
        y_batch_pre = model(x_batch_test, training=False)
        loss_value = loss(y_batch_test, y_batch_pre)
        loss_metrics.update_state(loss_value)
        acc.update_state(y_batch_test, y_batch_pre)
        topacc.update_state(y_batch_test, y_batch_pre)
    end_time = time.time()
    print(' - valid - time:{:.4f}\t loss:{:.4f}\t acc:{:.4f}\t top_acc:{:.4f}'
          .format(end_time-train_time, loss_metrics.result(), acc.result(), topacc.result()))
    loss_metrics.reset_state()
    acc.reset_state()
    topacc.reset_state()
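The loop above runs eagerly, which is convenient for debugging but slow. A common refinement, not part of the original demo, is to compile the per-batch work with tf.function; a sketch of the training step:
@tf.function  # traces the step into a graph once, then reuses it
def train_step(x, y):
    with tf.GradientTape() as tape:
        y_pre = model(x, training=True)
        loss_value = loss(y, y_pre)
    optimizer.minimize(loss_value, model.trainable_weights, tape=tape)
    loss_metrics.update_state(loss_value)
    acc.update_state(y, y_pre)
    topacc.update_state(y, y_pre)
The epoch loop then simply calls train_step(x_batch_train, y_batch_train) for each batch.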
4.3 Overriding train_step and test_step
Similarly, custom behavior can be achieved by overriding the train_step and test_step methods of the tf.keras.Model class.
Depending on the situation, methods such as __init__, call, and compile may also need to be overridden; only the simple case is demonstrated here.
Combined with the functional API approach from 3.1, it can be implemented as follows.
class MyModel(tf.keras.Model):
    def train_step(self, data):
        x, y = data
        with tf.GradientTape() as tape:
            y_pre = self(x, training=True)
            loss_value = self.compiled_loss(y, y_pre)
        self.optimizer.minimize(loss_value, self.trainable_weights, tape=tape)
        self.compiled_metrics.update_state(y, y_pre)
        return {metric.name: metric.result() for metric in self.metrics}

    def test_step(self, data):
        x, y = data
        y_pre = self(x, training=False)
        self.compiled_loss(y, y_pre)  # updates the compiled loss tracker
        self.compiled_metrics.update_state(y, y_pre)
        return {metric.name: metric.result() for metric in self.metrics}

# Passing the inputs/outputs from 3.1 builds a functional model with the custom steps.
model = MyModel(inputs=inputs, outputs=outputs)
model.summary()
model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=tf.keras.losses.CategoricalCrossentropy(from_logits=False),
    metrics=[
        tf.keras.metrics.CategoricalAccuracy(),
        tf.keras.metrics.TopKCategoricalAccuracy(k=3),
    ]
)
model.fit(train_dataset, epochs=100, validation_data=test_dataset)
Combined with the subclassed-model approach from 3.2, it can be implemented as follows.
class ResNet(tf.keras.Model):
    def __init__(self, outputs_shape=10):
        super(ResNet, self).__init__()
        self.conv2d_1 = tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu')
        self.block_1 = ResidualBlock(64)
        self.block_2 = ResidualBlock(128)
        self.maxpooling = tf.keras.layers.MaxPooling2D()
        self.globalpooling = tf.keras.layers.GlobalAvgPool2D()
        self.dense = tf.keras.layers.Dense(128)
        self.classifier = tf.keras.layers.Dense(outputs_shape, activation='softmax')
        self.inputs = tf.keras.Input(shape=(32, 32, 3))
        self.outputs = self.call(self.inputs)

    def build_model(self):
        return tf.keras.Model(inputs=self.inputs, outputs=self.call(self.inputs))

    def call(self, inputs):
        x = self.conv2d_1(inputs)
        x = self.maxpooling(x)
        x = self.block_1(x)
        x = self.block_2(x)
        x = self.globalpooling(x)
        x = self.dense(x)
        return self.classifier(x)

    def train_step(self, data):
        x, y = data
        with tf.GradientTape() as tape:
            y_pre = self(x, training=True)
            loss_value = self.compiled_loss(y, y_pre)
        self.optimizer.minimize(loss_value, self.trainable_weights, tape=tape)
        self.compiled_metrics.update_state(y, y_pre)
        return {metric.name: metric.result() for metric in self.metrics}

    def test_step(self, data):
        x, y = data
        y_pre = self(x, training=False)
        self.compiled_loss(y, y_pre)  # updates the compiled loss tracker
        self.compiled_metrics.update_state(y, y_pre)
        return {metric.name: metric.result() for metric in self.metrics}

model = ResNet()
model.build_model().summary()
model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=tf.keras.losses.CategoricalCrossentropy(from_logits=False),
    metrics=[
        tf.keras.metrics.CategoricalAccuracy(),
        tf.keras.metrics.TopKCategoricalAccuracy(k=3),
    ]
)
model.fit(train_dataset, epochs=100, validation_data=test_dataset)
This article is a demo written up from personal experience.
If you have questions or spot problems, feel free to leave a comment!