环境:
python3.7
tensorflow 1.14
window
首先对MNIST数据集进行介绍:
MINST数据库是由Yann提供的手写数字数据库文件,其官方下载地址http://yann.lecun.com/exdb/mnist/,这个数据库主要包含了60000张的训练图像和10000张的测试图像,主要是下面的四个文件:
Training set images: train-images-idx3-ubyte.gz (9.9 MB, 解压后 47 MB, 包含 60,000 个样本)
Training set labels: train-labels-idx1-ubyte.gz (29 KB, 解压后 60 KB, 包含 60,000 个标签)
Test set images: t10k-images-idx3-ubyte.gz (1.6 MB, 解压后 7.8 MB, 包含 10,000 个样本)
Test set labels: t10k-labels-idx1-ubyte.gz (5KB, 解压后 10 KB, 包含 10,000 个标签)
数据集中图像都是28*28大小的灰度图像,每个像素的是一个八位字节(0~255),如下图:
该数据集是0~9的灰度图像数据。本次实验利用tensorflow来实现对该手写数字进行识别。
代码文件组织形式:
数据预处理
data_processing.py 该文件主要用来进行数据预处理:
# -*- coding:utf-8 -*-
import os
import gzip
import numpy as np
import matplotlib.pyplot as plt
from six.moves import urllib
import tensorflow as tf
PIXEL_SIZE = 255
IMAGE_SIZE = 28
NUM_CHANNEL = 1
WORK_DIRECTORY = "data"
SOURCE_URL = "http://yann.lecun.com/exdb/mnist/"
def get_data(data_path,num_images):
"""从压缩包中读取数据"""
with gzip.open(data_path) as bytestream:
bytestream.read(16)
buf = bytestream.read(IMAGE_SIZE * IMAGE_SIZE * num_images * NUM_CHANNEL)
data = np.frombuffer(buf,dtype = np.uint8).astype(np.float32)
# 将数据缩放到[-0.5,0.5]
data = (data - (PIXEL_SIZE / 2.0)) / PIXEL_SIZE
data = data.reshape([num_images,IMAGE_SIZE,IMAGE_SIZE,NUM_CHANNEL])
return data
def get_label(label_path,num_images):
"""从压缩包中获取标签数据"""
with gzip.open(label_path) as bytestream:
bytestream.read(8)
buf = bytestream.read(num_images*1)
labels = np.frombuffer(buf,dtype = np.uint8).astype(np.int64)
return labels
def maybe_download(filename):
"""如果数据集不存在,则进行下载"""
if not os.path.isdir(WORK_DIRECTORY):
os.mkdir(WORK_DIRECTORY)
file_path = os.path.join(WORK_DIRECTORY,filename)
if not os.path.isfile(file_path):
file_path, _ = urllib.request.urlretrieve(SOURCE_URL+filename,file_path)
with tf.gfile.GFile(file_path) as f:
size = f.size()
print("Successfully download",filename,size,"bytes.")
return file_path
def min_batch(data,labels,batch_size,start):
"""获取batch_size数据"""
data_size = len(data)
end = start + batch_size
if end > data_size and start < data_size:
start = data_size - batch_size
end = data_size
else:
start = 0
end = batch_size
batch_data = data[start:end,:]
batch_labels = labels[start:end]
return batch_data,batch_labels
模型的搭建
本次实验模型结构如下图:
model.py 用来构建网络模型
# -*- coding:utf-8 -*-
import tensorflow as tf
SEED = 66478
#类别数
NUM_LABELS = 10
def conv2d(input_data,#卷积核输入
kernel_x,#卷积核的长
kernel_y,#卷积核的宽
input_channels,#卷核输入通道数
output_channels,#卷积核输出通道数
name,
strides=[1,1,1,1],#卷积核步长
padding = "SAME"):
"""定义卷层输入"""
conv_weights = tf.Variable(tf.truncated_normal(
[kernel_x,kernel_y,input_channels,output_channels],
stddev = 0.1,
seed = SEED,
dtype=tf.float32))
conv_bias = tf.Variable(tf.zeros([output_channels],dtype = tf.float32))
conv = tf.nn.conv2d(input_data,
filter = conv_weights,
strides = strides,
padding = padding,
name = name)
return tf.nn.relu(tf.nn.bias_add(conv,conv_bias))
def fc(input_data,output_size):
"""定义全连接层"""
input_shape = input_data.get_shape().as_list()
fc_weights = tf.Variable(tf.truncated_normal(
[input_shape[1],output_size],
stddev = 0.1,
seed = SEED,
dtype = tf.float32))
fc_bias = tf.Variable(tf.constant(
0.1,
shape=[output_size],
dtype = tf.float32))
return tf.matmul(input_data,fc_weights) + fc_bias,fc_weights,fc_bias
def model(batch_size,data_size,train = True):
"""搭建网络模型(Lenet)"""
model_data = []
x = tf.placeholder(tf.float32,shape = (batch_size,28,28,1))
y = tf.placeholder(tf.int64,shape = (batch_size,))
model_data.append((x,y))
conv1 = conv2d(x,5,5,1,32,name = "conv1")
pool1 = tf.nn.max_pool(conv1,
ksize = [1,2,2,1],
strides = [1,2,2,1],
padding = 'SAME',
name = 'pool1')
conv2 = conv2d(pool1,5,5,32,64,name = "conv2")
pool2 = tf.nn.max_pool(conv2,
ksize = [1,2,2,1],
strides = [1,2,2,1],
padding = 'SAME',
name = 'pool2')
flatten = tf.contrib.layers.flatten(pool2)
fc1_mul,fc1_weights,fc1_biases = fc(flatten,512)
fc1 = tf.nn.relu(fc1_mul)
if train:
fc1 = tf.nn.dropout(fc1,0.5,seed = SEED)
logits,fc2_weights,fc2_biases = fc(fc1,NUM_LABELS)
with tf.name_scope("metrics"):
loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(
labels = y,
logits = logits))
regularizers = (tf.nn.l2_loss(fc1_weights) + tf.nn.l2_loss(fc1_biases) +
tf.nn.l2_loss(fc2_weights) + tf.nn.l2_loss(fc2_biases))
loss += 5e-4 * regularizers
y_ = tf.argmax(tf.nn.softmax(logits),axis = 1)
accuracy = tf.reduce_mean(tf.cast(tf.equal(y,y_),tf.float32))
model_data.append((loss,accuracy))
batch = tf.Variable(0,tf.float32)
if train:
# 指数衰减学习率
learning_rate = tf.train.exponential_decay(0.01,
batch * batch_size,
data_size,
0.95,
staircase = True)
optimizer = tf.train.MomentumOptimizer(learning_rate,
0.9).minimize(loss,global_step = batch)
model_data.append((optimizer))
return model_data
模型训练
train.py用来对模型进行训练
# -*- coding:utf-8 -*-
from data_processing import get_data,get_label,maybe_download,min_batch
from model import model
from six.moves import xrange
import tensorflow as tf
import numpy as np
import time
# 从远程服务器获取数据集
train_data_path = maybe_download("train-images-idx3-ubyte.gz")
train_label_path= maybe_download("train-labels-idx1-ubyte.gz")
# 获取数据集
train_data = get_data(train_data_path,60000)
train_labels = get_label(train_label_path,60000)
# 训练轮数
NUM_EPOCHS = 10
# min-batch大小
BATCH_SIZE = 64
TRAIN_CHECK_POINT = "check_point/"
# 验证集数据
VALIDATION_SIZE = 5000
validation_data = train_data[:VALIDATION_SIZE,...]
validation_labels = train_labels[:VALIDATION_SIZE]
train_data = train_data[VALIDATION_SIZE:,...]
train_labels = train_labels[VALIDATION_SIZE:]
train_size = len(train_data)
data = model(BATCH_SIZE,train_size)
x,y = data[0]
loss,accuracy = data[1]
optimizer = data[2]
start_time = time.time()
# 启用模型参数文件保存
saver = tf.train.Saver()
with tf.Session() as sess:
# 变量参数初始化
tf.global_variables_initializer().run()
for epoch in range(NUM_EPOCHS):
p = np.random.permutation(train_size)
train_data = train_data[p]
train_labels = train_labels[p]
start = 0
for step in range(train_size // BATCH_SIZE):
batch_data,batch_labels = min_batch(train_data,train_labels,BATCH_SIZE,start)
train_loss,train_acc,_ =sess.run([loss,accuracy,optimizer],
feed_dict={x:batch_data,y:batch_labels})
start = (start+1) % train_size
elapsed_time = time.time() - start_time
print("epoch:{},train_loss:{:.2f},train_acc:{:.2f},time:{:.1f} ms".format(epoch,
train_loss,train_acc,elapsed_time))
start_time = time.time()
validation_acc = []
v_start = 0
for num in range(VALIDATION_SIZE // BATCH_SIZE):
v_batch_data,v_batch_labels = min_batch(validation_data,
validation_labels,BATCH_SIZE,v_start)
valid_loss,valid_acc = sess.run([loss,accuracy],feed_dict={x:v_batch_data,
y:v_batch_labels})
v_start = (v_start+1) % VALIDATION_SIZE
validation_acc.append(valid_acc)
avg_valid_acc = np.mean(validation_acc)
print("epoch:{},validation_avg_acc:{:.2f}".format(epoch,avg_valid_acc))
# 每个epoch保存一次模型参数
saver.save(sess,TRAIN_CHECK_POINT + "train-ckpt",global_step = epoch)
print("第{}个epoch模型参数文件已经被保存{}".format(epoch,TRAIN_CHECK_POINT+"train-ckpt"+"-"+str(epoch)))
模型测试
test.py 对模型进行测试
# -*- coding:utf-8 -*-
from data_processing import get_data,get_label,maybe_download,min_batch
from model import model
from six.moves import xrange
import tensorflow as tf
import numpy as np
# 从远程服务器获取数据集
test_data_path = maybe_download("t10k-images-idx3-ubyte.gz")
test_label_path= maybe_download("t10k-labels-idx1-ubyte.gz")
test_data = get_data(test_data_path,10000)
test_labels = get_label(test_label_path,10000)
# min-batch大小
BATCH_SIZE = 1
TRAIN_CHECK_POINT = "check_point/train-ckpt-9"
test_size = len(test_labels)
data = model(1,test_size,False)
x,y = data[0]
loss,accuracy = data[1]
# 从模型文件读取训练好的参数
restorer = tf.train.Saver()
with tf.Session() as sess:
# 变量参数初始化
tf.global_variables_initializer().run()
restorer.restore(sess,TRAIN_CHECK_POINT)
test_acc = []
for i in range(test_size):
batch_data = test_data[i,...].reshape([1,28,28,1])
batch_labels = np.array([test_labels[i]])
test_accuarcy = sess.run(accuracy,feed_dict={x:batch_data,y:batch_labels})
test_acc.append(test_accuarcy)
test_avg_acc = np.mean(test_acc)
print("Test_avg_acc={:.2f}".format(test_avg_acc))
至此我们完成了模型搭建,训练,测试,过程。
版权声明:本文为weixin_44402973原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。