Several Ways to Build and Train Models with TensorFlow 2 and Keras

For deep neural networks, TensorFlow 2 and Keras provide a rich set of APIs for building and training models.

This article walks through several of these approaches for building and training a model.

1. Loading TensorFlow

We import tensorflow and the time module; the tensorflow version should be 2.0 or later.

import tensorflow as tf
import time

2. Loading the data

The experiments in this article use the CIFAR-10 dataset, loaded through tensorflow.

The images have shape (None, 32, 32, 3) and the labels have shape (None, 1).

Before training, the data needs some preprocessing:

  • normalize the images
  • flatten the labels from (None, 1) to (None,)
  • one-hot encode the labels

(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.cifar10.load_data()

train_images, test_images = train_images / 255.0, test_images / 255.0

train_labels = tf.reshape(train_labels, shape=[train_labels.shape[0]])
test_labels = tf.reshape(test_labels, shape=[test_labels.shape[0]])
train_labels, test_labels = tf.one_hot(train_labels, depth=10), tf.one_hot(test_labels, depth=10)
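To make the reshape + one_hot step concrete, here is a toy run on hand-made labels (a sketch, independent of CIFAR-10):

```python
import tensorflow as tf

labels = tf.constant([[3], [0], [7]])               # shape (3, 1), like raw CIFAR-10 labels
flat = tf.reshape(labels, shape=[labels.shape[0]])  # -> shape (3,)
onehot = tf.one_hot(flat, depth=10)                 # -> shape (3, 10)
print(onehot.shape)        # (3, 10)
print(float(onehot[0, 3])) # 1.0
```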

With preprocessing done, we build tf.data.Dataset instances to make training convenient later on.

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels))
test_dataset = test_dataset.batch(64)  # no need to shuffle the test set
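As a quick sanity check of this pipeline, here is a sketch using small zero-filled stand-in tensors with the same structure as the CIFAR-10 data above:

```python
import tensorflow as tf

# Stand-in tensors shaped like the preprocessed CIFAR-10 data
images = tf.zeros([100, 32, 32, 3])
labels = tf.one_hot(tf.zeros([100], dtype=tf.int32), depth=10)

ds = tf.data.Dataset.from_tensor_slices((images, labels)).shuffle(buffer_size=1024).batch(64)
x_batch, y_batch = next(iter(ds))
print(x_batch.shape, y_batch.shape)  # (64, 32, 32, 3) (64, 10)
```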

3. Building the model

The experiments use a small ResNet-style model containing two residual blocks; the exact architecture is given in the code.

Below, the same model is built in several different ways:

  • the functional API
  • model subclassing
  • the Sequential API

In practice, simply pick whichever style fits the scenario at hand.

3.1 Functional API

inputs = tf.keras.Input(shape=(32, 32, 3))
x = tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu')(inputs)
x = tf.keras.layers.MaxPooling2D()(x)

temp = x
for filters in [64, 128]:
    x = tf.keras.layers.Conv2D(filters, 3, padding='same')(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.ReLU()(x)

    x = tf.keras.layers.Conv2D(filters, 3, padding='same')(x)
    x = tf.keras.layers.BatchNormalization()(x)

    # 1x1 convolution on the shortcut so the channel counts match for the add
    res = tf.keras.layers.Conv2D(filters, 1)(temp)
    x = tf.keras.layers.add([x, res])

    x = tf.keras.layers.ReLU()(x)
    x = tf.keras.layers.MaxPooling2D()(x)

    temp = x

x = tf.keras.layers.GlobalAvgPool2D()(x)
x = tf.keras.layers.Dense(128)(x)
outputs = tf.keras.layers.Dense(10, activation='softmax')(x)

model = tf.keras.Model(inputs=inputs, outputs=outputs)

3.2 Model subclassing

class ResidualBlock(tf.keras.layers.Layer):
    def __init__(self, filter, **kwargs):
        super(ResidualBlock, self).__init__(**kwargs)
        self.filter = filter
        self.conv2D_1 = tf.keras.layers.Conv2D(filter, 3, padding='same')
        self.conv2D_2 = tf.keras.layers.Conv2D(filter, 3, padding='same')
        self.conv2D_3 = tf.keras.layers.Conv2D(filter, 1)
        self.activation_1 = tf.keras.layers.ReLU()
        self.activation_2 = tf.keras.layers.ReLU()
        self.normalization_1 = tf.keras.layers.BatchNormalization()
        self.normalization_2 = tf.keras.layers.BatchNormalization()
        self.maxpooling = tf.keras.layers.MaxPooling2D()

    def call(self, inputs, *args, **kwargs):
        x = self.conv2D_1(inputs)
        x = self.normalization_1(x)
        x = self.activation_1(x)

        x = self.conv2D_2(x)
        x = self.normalization_2(x)
        
        res = self.conv2D_3(inputs)
        x = tf.keras.layers.add([x, res])
        
        x = self.activation_2(x)
        return self.maxpooling(x)


class ResNet(tf.keras.Model):
    def __init__(self, outputs_shape=10):
        super(ResNet, self).__init__()

        self.conv2d_1 = tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu')
        self.block_1 = ResidualBlock(64)
        self.block_2 = ResidualBlock(128)
        self.maxpooling = tf.keras.layers.MaxPooling2D()
        self.globalpooling = tf.keras.layers.GlobalAvgPool2D()
        self.dense = tf.keras.layers.Dense(128)
        self.classifier = tf.keras.layers.Dense(outputs_shape, activation='softmax')

        self.inputs = tf.keras.Input(shape=(32, 32, 3))
        self.outputs = self.call(self.inputs)

    def build_model(self):
        return tf.keras.Model(inputs=self.inputs, outputs=self.call(self.inputs))

    def call(self, inputs):
        x = self.conv2d_1(inputs)
        x = self.maxpooling(x)
        x = self.block_1(x)
        x = self.block_2(x)
        x = self.globalpooling(x)
        x = self.dense(x)
        return self.classifier(x)
        
model = ResNet()
model.build_model().summary()

3.3 Sequential API

A Sequential model is created by instantiating the tf.keras.Sequential class and appending layers with its add method.

Because the skip connections of the residual blocks cannot be expressed as a plain linear stack of layers, the Sequential style is a poor fit for this model and is not elaborated here.
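For completeness, here is a minimal sketch of the Sequential style on a plain (non-residual) stack; the layer sizes are illustrative only:

```python
import tensorflow as tf

model = tf.keras.Sequential()
model.add(tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu'))
model.add(tf.keras.layers.MaxPooling2D())
model.add(tf.keras.layers.GlobalAvgPool2D())
model.add(tf.keras.layers.Dense(10, activation='softmax'))

# The model builds its weights on the first call
y = model(tf.zeros([2, 32, 32, 3]))
print(y.shape)  # (2, 10)
```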

4. Training the model

There are three common ways to train a model:

  • Keras built-in methods such as compile, fit, and predict
  • a custom training loop written with low-level TensorFlow
  • overriding Keras's train_step and test_step methods

4.1 Keras built-in methods

Models built with any of the approaches in 3.1, 3.2, and 3.3 can be trained directly with the built-in Keras methods.

model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=tf.keras.losses.CategoricalCrossentropy(from_logits=False),
    metrics=[
        tf.keras.metrics.CategoricalAccuracy(),
        tf.keras.metrics.TopKCategoricalAccuracy(k=3),
    ]
)

model.fit(train_dataset, epochs=100, validation_data=test_dataset)  # batch size comes from the Dataset; fit's batch_size must not be passed

4.2 Custom training loop

The built-in Keras training methods sometimes cannot meet our needs; in that case we can write a custom training loop.

# optimizer
optimizer = tf.keras.optimizers.Adam()

# loss function
loss = tf.keras.losses.CategoricalCrossentropy(from_logits=False)
loss_metrics = tf.keras.metrics.Mean()

# metrics
acc = tf.keras.metrics.CategoricalAccuracy()
topacc = tf.keras.metrics.TopKCategoricalAccuracy(k=3)

epochs = 100
for epoch in range(epochs):
    print("\nepoch {}".format(epoch))
    start_time = time.time()

    # train_step
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            y_batch_pre = model(x_batch_train, training=True) # forward pass
            loss_value = loss(y_batch_train, y_batch_pre)
        optimizer.minimize(loss_value, model.trainable_weights, tape=tape) # backward pass: update the parameters
        loss_metrics.update_state(loss_value)

        acc.update_state(y_batch_train, y_batch_pre)
        topacc.update_state(y_batch_train, y_batch_pre)

    train_time = time.time()
    print(' - train - time:{:.4f}\t loss:{:.4f}\t acc:{:.4f}\t top_acc:{:.4f}'
          .format(train_time-start_time, loss_metrics.result(), acc.result(), topacc.result()))

    loss_metrics.reset_state()
    acc.reset_state()
    topacc.reset_state()

    # test_step
    for x_batch_test, y_batch_test in test_dataset:
        y_batch_pre = model(x_batch_test, training=False)
        loss_value = loss(y_batch_test, y_batch_pre)
        loss_metrics.update_state(loss_value)

        acc.update_state(y_batch_test, y_batch_pre)
        topacc.update_state(y_batch_test, y_batch_pre)

    end_time = time.time()
    print(' - valid - time:{:.4f}\t loss:{:.4f}\t acc:{:.4f}\t top_acc:{:.4f}'
          .format(end_time-train_time, loss_metrics.result(), acc.result(), topacc.result()))

    loss_metrics.reset_state()
    acc.reset_state()
    topacc.reset_state()
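The loop above runs eagerly; decorating the per-batch work with tf.function usually makes it noticeably faster by tracing it into a graph. A minimal sketch with a hypothetical toy model (the names mirror the loop above):

```python
import tensorflow as tf

# Toy stand-ins for the model/optimizer/loss used in the loop above
toy_model = tf.keras.Sequential([tf.keras.layers.Dense(10, activation='softmax')])
optimizer = tf.keras.optimizers.Adam()
loss = tf.keras.losses.CategoricalCrossentropy(from_logits=False)

@tf.function  # traced once per input signature, then runs as a graph
def train_step(x, y):
    with tf.GradientTape() as tape:
        y_pre = toy_model(x, training=True)
        loss_value = loss(y, y_pre)
    grads = tape.gradient(loss_value, toy_model.trainable_weights)
    optimizer.apply_gradients(zip(grads, toy_model.trainable_weights))
    return loss_value

x = tf.random.normal([8, 4])
y = tf.one_hot(tf.zeros([8], dtype=tf.int32), depth=10)
loss_value = train_step(x, y)
print(float(loss_value) > 0)  # True
```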

4.3 Overriding train_step and test_step

Similarly, custom needs can be met by overriding the train_step and test_step methods of the tf.keras.Model class.

Depending on the situation, the init, call, compile, and other methods may also need to be overridden; here we only demonstrate the simple case.

Combined with the functional API approach from 3.1, this can be implemented as follows:

class mymodel(tf.keras.Model):
    def train_step(self, data):
        x, y = data
        with tf.GradientTape() as tape:
            y_pre = self(x, training=True)
            loss_value = self.compiled_loss(y, y_pre)
        self.optimizer.minimize(loss_value, self.trainable_weights, tape=tape)
        self.compiled_metrics.update_state(y, y_pre)

        return_metrics = {}
        for metric in self.metrics:
            result = metric.result()
            return_metrics[metric.name] = result
        return return_metrics

    def test_step(self, data):
        x, y = data
        y_pre = self(x, training=False)
        loss_value = self.compiled_loss(y, y_pre)
        self.compiled_metrics.update_state(y, y_pre)

        return_metrics = {}
        for metric in self.metrics:
            result = metric.result()
            return_metrics[metric.name] = result
        return return_metrics
    
model = mymodel(inputs=inputs, outputs=outputs)
model.summary()

model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=tf.keras.losses.CategoricalCrossentropy(from_logits=False),
    metrics=[
        tf.keras.metrics.CategoricalAccuracy(),
        tf.keras.metrics.TopKCategoricalAccuracy(k=3),
    ]
)
model.fit(train_dataset, epochs=100, validation_data=test_dataset)  # batch size comes from the Dataset; fit's batch_size must not be passed

Combined with the model subclassing approach from 3.2, it can be implemented as follows:

class ResNet(tf.keras.Model):
    def __init__(self, outputs_shape=10):
        super(ResNet, self).__init__()

        self.conv2d_1 = tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu')
        self.block_1 = ResidualBlock(64)
        self.block_2 = ResidualBlock(128)
        self.maxpooling = tf.keras.layers.MaxPooling2D()
        self.globalpooling = tf.keras.layers.GlobalAvgPool2D()
        self.dense = tf.keras.layers.Dense(128)
        self.classifier = tf.keras.layers.Dense(outputs_shape, activation='softmax')

        self.inputs = tf.keras.Input(shape=(32, 32, 3))
        self.outputs = self.call(self.inputs)

    def build_model(self):
        return tf.keras.Model(inputs=self.inputs, outputs=self.call(self.inputs))

    def call(self, inputs):
        x = self.conv2d_1(inputs)
        x = self.maxpooling(x)
        x = self.block_1(x)
        x = self.block_2(x)
        x = self.globalpooling(x)
        x = self.dense(x)
        return self.classifier(x)

    def train_step(self, data):
        x, y = data
        with tf.GradientTape() as tape:
            y_pre = self(x, training=True)
            loss_value = self.compiled_loss(y, y_pre)
        self.optimizer.minimize(loss_value, self.trainable_weights, tape=tape)
        self.compiled_metrics.update_state(y, y_pre)

        return_metrics = {}
        for metric in self.metrics:
            result = metric.result()
            return_metrics[metric.name] = result
        return return_metrics

    def test_step(self, data):
        x, y = data
        y_pre = self(x, training=False)
        loss_value = self.compiled_loss(y, y_pre)
        self.compiled_metrics.update_state(y, y_pre)

        return_metrics = {}
        for metric in self.metrics:
            result = metric.result()
            return_metrics[metric.name] = result
        return return_metrics
    
model = ResNet()
model.build_model().summary()

model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=tf.keras.losses.CategoricalCrossentropy(from_logits=False),
    metrics=[
        tf.keras.metrics.CategoricalAccuracy(),
        tf.keras.metrics.TopKCategoricalAccuracy(k=3),
    ]
)
model.fit(train_dataset, epochs=100, validation_data=test_dataset)  # batch size comes from the Dataset; fit's batch_size must not be passed

This article is a demo written up from some personal experience.

If you have questions or spot problems, feel free to leave a comment!


Copyright notice: this is an original article by For_learning, licensed under CC 4.0 BY-SA; please include a link to the original source and this notice when reposting.