利用Python+Tensorflow实现对MNIST数据集分类

环境:

python3.7

tensorflow 1.14

window

首先对MNIST数据集进行介绍:

MINST数据库是由Yann提供的手写数字数据库文件,其官方下载地址http://yann.lecun.com/exdb/mnist/,这个数据库主要包含了60000张的训练图像和10000张的测试图像,主要是下面的四个文件:

Training set images: train-images-idx3-ubyte.gz (9.9 MB, 解压后 47 MB, 包含 60,000 个样本)
Training set labels: train-labels-idx1-ubyte.gz (29 KB, 解压后 60 KB, 包含 60,000 个标签)
Test set images: t10k-images-idx3-ubyte.gz (1.6 MB, 解压后 7.8 MB, 包含 10,000 个样本)
Test set labels: t10k-labels-idx1-ubyte.gz (5KB, 解压后 10 KB, 包含 10,000 个标签)

数据集中图像都是28*28大小的灰度图像,每个像素的是一个八位字节(0~255),如下图:

该数据集是0~9的灰度图像数据。本次实验利用tensorflow来实现对该手写数字进行识别。

代码文件组织形式:

数据预处理

data_processing.py 该文件主要用来进行数据预处理:

# -*- coding:utf-8 -*-

import os
import gzip
import numpy as np
import matplotlib.pyplot as plt
from six.moves import urllib
import tensorflow as tf

PIXEL_SIZE = 255
IMAGE_SIZE = 28
NUM_CHANNEL = 1
WORK_DIRECTORY = "data"
SOURCE_URL = "http://yann.lecun.com/exdb/mnist/"

def get_data(data_path,num_images):
    """从压缩包中读取数据"""
	
    with gzip.open(data_path) as bytestream:
        bytestream.read(16)
	buf = bytestream.read(IMAGE_SIZE * IMAGE_SIZE * num_images * NUM_CHANNEL)
	data = np.frombuffer(buf,dtype = np.uint8).astype(np.float32)
	
        # 将数据缩放到[-0.5,0.5]
	data = (data - (PIXEL_SIZE / 2.0)) / PIXEL_SIZE
	data = data.reshape([num_images,IMAGE_SIZE,IMAGE_SIZE,NUM_CHANNEL])
	return data

def get_label(label_path,num_images):
    """从压缩包中获取标签数据"""
	
    with gzip.open(label_path) as bytestream:
        bytestream.read(8)
        buf = bytestream.read(num_images*1)
        labels = np.frombuffer(buf,dtype = np.uint8).astype(np.int64)
        return labels

def maybe_download(filename):
    """如果数据集不存在,则进行下载"""

    if not os.path.isdir(WORK_DIRECTORY):
        os.mkdir(WORK_DIRECTORY)
	file_path = os.path.join(WORK_DIRECTORY,filename)

	if not os.path.isfile(file_path):
            file_path, _ = urllib.request.urlretrieve(SOURCE_URL+filename,file_path)
            with tf.gfile.GFile(file_path) as f:
                size = f.size()
		print("Successfully download",filename,size,"bytes.")
	return file_path

def min_batch(data,labels,batch_size,start):
    """获取batch_size数据"""

    data_size = len(data)
    end = start + batch_size
    if end > data_size and start < data_size:
        start = data_size - batch_size
        end = data_size
    else:
        start = 0
        end = batch_size
    batch_data = data[start:end,:]
    batch_labels = labels[start:end]
	
    return batch_data,batch_labels

模型的搭建

本次实验模型结构如下图:

model.py 用来构建网络模型

# -*- coding:utf-8 -*-
import tensorflow as tf

SEED = 66478

#类别数
NUM_LABELS = 10

def conv2d(input_data,#卷积核输入
    kernel_x,#卷积核的长
    kernel_y,#卷积核的宽
    input_channels,#卷核输入通道数
    output_channels,#卷积核输出通道数
    name,
    strides=[1,1,1,1],#卷积核步长
    padding = "SAME"):
    """定义卷层输入"""

    conv_weights = tf.Variable(tf.truncated_normal(
	[kernel_x,kernel_y,input_channels,output_channels],
	stddev = 0.1,
	seed = SEED,
	dtype=tf.float32))
    conv_bias = tf.Variable(tf.zeros([output_channels],dtype = tf.float32))
    conv = tf.nn.conv2d(input_data,
	filter = conv_weights,
	strides = strides,
	padding = padding,
	name = name)
    return tf.nn.relu(tf.nn.bias_add(conv,conv_bias))

def fc(input_data,output_size):
    """定义全连接层"""

    input_shape = input_data.get_shape().as_list()

    fc_weights = tf.Variable(tf.truncated_normal(
	[input_shape[1],output_size],
	stddev = 0.1,
	seed = SEED,
	dtype = tf.float32))
    fc_bias = tf.Variable(tf.constant(
	0.1,
	shape=[output_size],
	dtype = tf.float32))

    return tf.matmul(input_data,fc_weights) + fc_bias,fc_weights,fc_bias

def model(batch_size,data_size,train = True):
    """搭建网络模型(Lenet)"""
	
    model_data = []

    x = tf.placeholder(tf.float32,shape = (batch_size,28,28,1))
    y = tf.placeholder(tf.int64,shape = (batch_size,))
    model_data.append((x,y))
    conv1 = conv2d(x,5,5,1,32,name = "conv1")
    pool1 = tf.nn.max_pool(conv1,
	ksize = [1,2,2,1],
	strides = [1,2,2,1],
	padding = 'SAME',
	name = 'pool1')

    conv2 = conv2d(pool1,5,5,32,64,name = "conv2")
    pool2 = tf.nn.max_pool(conv2,
	ksize = [1,2,2,1],
	strides = [1,2,2,1],
	padding = 'SAME',
	name = 'pool2')
    flatten = tf.contrib.layers.flatten(pool2)
    fc1_mul,fc1_weights,fc1_biases = fc(flatten,512)
    fc1 = tf.nn.relu(fc1_mul)
    if train:
        fc1 = tf.nn.dropout(fc1,0.5,seed = SEED)

    logits,fc2_weights,fc2_biases = fc(fc1,NUM_LABELS)

    with tf.name_scope("metrics"):
	loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(
		labels = y,
		logits = logits))
	regularizers = (tf.nn.l2_loss(fc1_weights) + tf.nn.l2_loss(fc1_biases) +
		tf.nn.l2_loss(fc2_weights) + tf.nn.l2_loss(fc2_biases))
	loss += 5e-4 * regularizers
		
	y_ = tf.argmax(tf.nn.softmax(logits),axis = 1)
	accuracy = tf.reduce_mean(tf.cast(tf.equal(y,y_),tf.float32))
	model_data.append((loss,accuracy))

    batch = tf.Variable(0,tf.float32)
    if train:
	# 指数衰减学习率
	learning_rate = tf.train.exponential_decay(0.01, 
	batch * batch_size,
	data_size,
	0.95,
	staircase = True)

    optimizer = tf.train.MomentumOptimizer(learning_rate,
	0.9).minimize(loss,global_step = batch)
	model_data.append((optimizer))

    return model_data

模型训练

train.py用来对模型进行训练

# -*- coding:utf-8 -*-
from data_processing import get_data,get_label,maybe_download,min_batch
from model import model
from six.moves import xrange
import tensorflow as tf
import numpy as np
import time

# 从远程服务器获取数据集
train_data_path = maybe_download("train-images-idx3-ubyte.gz")
train_label_path= maybe_download("train-labels-idx1-ubyte.gz")

# 获取数据集
train_data = get_data(train_data_path,60000)
train_labels = get_label(train_label_path,60000)

# 训练轮数
NUM_EPOCHS = 10

# min-batch大小
BATCH_SIZE = 64

TRAIN_CHECK_POINT = "check_point/"
# 验证集数据
VALIDATION_SIZE = 5000

validation_data = train_data[:VALIDATION_SIZE,...]
validation_labels = train_labels[:VALIDATION_SIZE]

train_data = train_data[VALIDATION_SIZE:,...]
train_labels = train_labels[VALIDATION_SIZE:]
train_size = len(train_data)


data = model(BATCH_SIZE,train_size)
x,y = data[0]
loss,accuracy = data[1]
optimizer = data[2]
start_time = time.time()

# 启用模型参数文件保存
saver = tf.train.Saver()

with tf.Session() as sess:
    # 变量参数初始化
    tf.global_variables_initializer().run()

    for epoch in range(NUM_EPOCHS):
	p = np.random.permutation(train_size)
	train_data = train_data[p]
	train_labels = train_labels[p]
	start = 0
	for step in range(train_size // BATCH_SIZE):
	    batch_data,batch_labels = min_batch(train_data,train_labels,BATCH_SIZE,start)
	    train_loss,train_acc,_ =sess.run([loss,accuracy,optimizer],
                            feed_dict={x:batch_data,y:batch_labels})
	    start = (start+1) % train_size
        elapsed_time = time.time() - start_time
	print("epoch:{},train_loss:{:.2f},train_acc:{:.2f},time:{:.1f} ms".format(epoch,
                    train_loss,train_acc,elapsed_time))
	start_time = time.time()
	validation_acc = []
	v_start = 0
	for num in range(VALIDATION_SIZE // BATCH_SIZE):
            v_batch_data,v_batch_labels = min_batch(validation_data,
                            validation_labels,BATCH_SIZE,v_start)
            valid_loss,valid_acc = sess.run([loss,accuracy],feed_dict={x:v_batch_data,
                            y:v_batch_labels})
	    v_start = (v_start+1) % VALIDATION_SIZE
	    validation_acc.append(valid_acc)
	avg_valid_acc = np.mean(validation_acc)
	print("epoch:{},validation_avg_acc:{:.2f}".format(epoch,avg_valid_acc))

	# 每个epoch保存一次模型参数
	saver.save(sess,TRAIN_CHECK_POINT + "train-ckpt",global_step = epoch)
	print("第{}个epoch模型参数文件已经被保存{}".format(epoch,TRAIN_CHECK_POINT+"train-ckpt"+"-"+str(epoch)))

模型测试

test.py 对模型进行测试

# -*- coding:utf-8 -*-
from data_processing import get_data,get_label,maybe_download,min_batch
from model import model
from six.moves import xrange
import tensorflow as tf
import numpy as np

# 从远程服务器获取数据集
test_data_path = maybe_download("t10k-images-idx3-ubyte.gz")
test_label_path= maybe_download("t10k-labels-idx1-ubyte.gz")

test_data = get_data(test_data_path,10000)
test_labels = get_label(test_label_path,10000)

# min-batch大小
BATCH_SIZE = 1

TRAIN_CHECK_POINT = "check_point/train-ckpt-9"

test_size = len(test_labels)


data = model(1,test_size,False)
x,y = data[0]
loss,accuracy = data[1]

# 从模型文件读取训练好的参数
restorer = tf.train.Saver()

with tf.Session() as sess:
    # 变量参数初始化
    tf.global_variables_initializer().run()
    restorer.restore(sess,TRAIN_CHECK_POINT)
    test_acc = []
    for i in range(test_size):
	batch_data = test_data[i,...].reshape([1,28,28,1])
	batch_labels = np.array([test_labels[i]])
	test_accuarcy = sess.run(accuracy,feed_dict={x:batch_data,y:batch_labels})
		test_acc.append(test_accuarcy)
    test_avg_acc = np.mean(test_acc)
    print("Test_avg_acc={:.2f}".format(test_avg_acc))

至此我们完成了模型搭建,训练,测试,过程。


版权声明:本文为weixin_44402973原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。