基于卷积神经网络的手写数字识别theano

http://www.lxway.com/4464045452.htm

这里先简单介绍一下系统功能,本文实现了通过图形界面输入手写数字,然后传输到服务器端,由服务器得到输出结果并显示。

下面直接上代码吧,这一段是手写字输入代码,输入之后图片保存为tmp.jpg然后通过socket传输到服务器。这里代码我是在windows下实现的,需要有opencv支持,PIL有点用不惯。这里还用到了pygame。

 

import os
from socket import *
import struct
import pygame
from pygame.locals import *
import math
import cv2
import numpy
import pickle
import Image

#传送图片
def sendPicture(pic='tmp.jpg',address=('192.168.1.200',8003)):
    """Send an image file to the recognition server over TCP.

    The wire format is a fixed-size header followed by the raw file bytes.
    The header uses the struct layout ``'128s32sI8s'``: a 128-byte
    NUL-padded file name, 32 unused bytes, the payload size as an unsigned
    int and 8 more unused bytes.

    :param pic: path of the image file to transmit.
    :param address: ``(host, port)`` tuple of the listening server.
    """
    BUFSIZE = 1024
    HEADER_FORMAT = '128s32sI8s'
    # struct's 's' format needs a byte string; a Python 2 ``str`` already is one.
    name_field = pic if isinstance(pic, bytes) else pic.encode('utf-8')
    with open(pic, 'rb') as fp:
        # Same bytes as the previous '128s11I' packing (eight zero ints, the
        # size, two zero ints), but spelled with the layout receive() unpacks.
        fhead = struct.pack(HEADER_FORMAT, name_field, b'',
                            os.stat(pic).st_size, b'')
        sendSock = socket(AF_INET, SOCK_STREAM)
        try:
            sendSock.connect(address)
            # sendall() retries until everything is written; plain send() may
            # transmit only part of the buffer and silently drop the rest.
            sendSock.sendall(fhead)
            while 1:
                filedata = fp.read(BUFSIZE)
                if not filedata:
                    break
                sendSock.sendall(filedata)
        finally:
            # Release the socket even when connecting or sending fails.
            sendSock.close()
    print("传输完毕,连接已关闭...")

class Brush():
    """Free-hand brush that paints mouse strokes onto a pygame surface.

    Two drawing styles are supported: a solid circle of ``self.color``
    (``style`` is False) or a stamp cut out of ``brush.png`` (``style`` is
    True).
    """

    def __init__(self, screen):
        """Create a brush drawing on ``screen``; loads ``brush.png``."""
        self.screen = screen
        self.color = (0, 0, 0)
        self.size  = 1
        self.drawing = False
        self.last_pos = None
        self.space = 1
        # False selects the solid circle branch of draw(), True the png stamp
        self.style = False
        # load brush style png
        self.brush = pygame.image.load("brush.png").convert_alpha()
        # the stamp currently in use is a size-dependent crop of the png
        self.brush_now = self.brush.subsurface((0,0), (1, 1))

    def start_draw(self, pos):
        """Begin a stroke at ``pos``."""
        self.drawing = True
        self.last_pos = pos

    def end_draw(self):
        """Finish the current stroke."""
        self.drawing = False

    def set_brush_style(self, style):
        """Select the solid (False) or png (True) brush and log the choice."""
        # Single-argument print() emits the same text on Python 2 and 3.
        print("* set brush style to %s" % (style,))
        self.style = style

    def get_brush_style(self):
        """Return the current brush style flag."""
        return self.style

    def set_size(self, size):
        """Set the brush radius, clamped to the range [0.5, 50]."""
        if size < 0.5: size = 0.5
        elif size > 50: size = 50
        print("* set brush size to %s" % (size,))
        self.size = size
        # the png stamp is cropped to a square of twice the radius
        self.brush_now = self.brush.subsurface((0,0), (size*2, size*2))

    def get_size(self):
        """Return the current brush radius."""
        return self.size

    def draw(self, pos):
        """Paint from the previous position to ``pos`` while a stroke is active."""
        if self.drawing:
            for p in self._get_points(pos):
                # draw every point between the two positions
                if self.style == False:
                    pygame.draw.circle(self.screen,
                            self.color, p, self.size)
                else:
                    self.screen.blit(self.brush_now, p)

            self.last_pos = pos

    def _get_points(self, pos):
        """Return the distinct integer points from ``last_pos`` towards ``pos``.

        The segment is walked in unit-length steps and each sample is rounded
        to the nearest pixel. The order of the returned list is unspecified.
        """
        start_x, start_y = self.last_pos[0], self.last_pos[1]
        len_x = pos[0] - start_x
        len_y = pos[1] - start_y
        length = math.sqrt(len_x ** 2 + len_y ** 2)
        if length == 0:
            # No movement: only the start point is covered, and the step
            # computation below would divide by zero.
            return [(int(0.5 + start_x), int(0.5 + start_y))]
        step_x = len_x / length
        step_y = len_y / length
        points = [(start_x, start_y)]
        for _ in range(int(length)):
            points.append(
                    (points[-1][0] + step_x, points[-1][1] + step_y))
        # round to pixels and drop duplicates
        return list(set((int(0.5 + x), int(0.5 + y)) for x, y in points))
 
class Painter():
    """56x56 pygame canvas for drawing a digit and sending it to the server."""

    def __init__(self):
        """Open the window and attach a Brush to it."""
        self.screen = pygame.display.set_mode((56, 56))
        pygame.display.set_caption("Painter")
        self.clock = pygame.time.Clock()
        self.brush = Brush(self.screen)
        self.image = pygame.image

    def _handle_event(self, event):
        """Process one pygame event; return False when the loop must stop."""
        kind = event.type
        if kind == QUIT:
            return False
        if kind == KEYDOWN:
            if event.key == K_ESCAPE:
                # ESC: save the drawing, send it for recognition, then clear
                self.image.save(self.screen,"tmp.jpg")
                sendPicture()
                self.screen.fill((255, 255, 255))
        elif kind == MOUSEBUTTONDOWN:
            self.brush.start_draw(event.pos)
        elif kind == MOUSEMOTION:
            self.brush.draw(event.pos)
        elif kind == MOUSEBUTTONUP:
            self.brush.end_draw()
        return True

    def run(self):
        """Run the event loop until the window is closed."""
        self.screen.fill((255, 255, 255))
        while True:
            # cap the loop at 30 frames per second
            self.clock.tick(30)
            for event in pygame.event.get():
                if not self._handle_event(event):
                    return
            pygame.display.update()
 
if __name__ == '__main__':
    # Open the drawing window and stay in its event loop until it is closed.
    Painter().run()

下面的代码是关键,这里实现了接收图片信息,识别之后打印识别结果的功能。

import os
import sys
from socket import *
import struct
import time
import numpy
import cPickle
import cv2
import theano
import theano.tensor as T
from theano.tensor.signal import downsample
from theano.tensor.nnet import conv
from logistic import LogisticRegression, load_data
from mlp import HiddenLayer
rng = numpy.random.RandomState(1234)

def _recv_exact(conn, size):
    """Read ``size`` bytes from ``conn``; returns fewer only if the peer closes."""
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = conn.recv(remaining)
        if not chunk:
            break
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def receive(address = ('192.168.1.200',8003)):
    """Accept one connection and store the transmitted image as ``tmp.jpg``.

    The peer sends a header with the struct layout ``'128s32sI8s'`` (file
    name, padding, payload size, padding) followed by the payload bytes.
    If the peer closes early, whatever arrived is still written.

    :param address: ``(host, port)`` tuple to listen on.
    :raises struct.error: if the connection closes before a full header
        has arrived.
    """
    BUFSIZE = 1024
    HEADER_FORMAT = '128s32sI8s'
    FILEINFO_SIZE = struct.calcsize(HEADER_FORMAT)
    recvSock = socket(AF_INET,SOCK_STREAM)
    try:
        # This function is called once per image and binds the same port
        # every time; without SO_REUSEADDR the bind can fail while the
        # previous connection lingers in TIME_WAIT.
        recvSock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        recvSock.bind(address)
        recvSock.listen(1)
        conn,addr = recvSock.accept()
        try:
            # A single recv() may deliver only part of the header.
            fhead = _recv_exact(conn, FILEINFO_SIZE)
            _name, _pad1, filesize, _pad2 = struct.unpack(HEADER_FORMAT, fhead)
            with open('tmp.jpg','wb') as fp:
                restsize = filesize
                while restsize > 0:
                    filedata = conn.recv(min(BUFSIZE, restsize))
                    if not filedata:
                        break
                    fp.write(filedata)
                    restsize -= len(filedata)
        finally:
            conn.close()
    finally:
        recvSock.close()
    print("接收完毕,连接已关闭...")

class LeNetConvPoolLayer(object):
    """Convolution followed by max-pooling and a tanh non-linearity."""

    def __init__(self, rng, input, filter_shape, image_shape, poolsize=(2, 2)):
        """Build the symbolic expression of the layer.

        :param rng: random generator; its ``uniform`` method draws the
                    initial filter weights
        :param input: symbolic input passed to ``conv.conv2d``
        :param filter_shape: (number of filters, num input feature maps,
                              filter height, filter width)
        :param image_shape: (batch size, num input feature maps,
                             image height, image width)
        :param poolsize: pooling factor passed to ``max_pool_2d`` as ``ds``
        """
        # both shapes must agree on the number of input feature maps
        assert image_shape[1] == filter_shape[1]
        self.input = input

        n_filters = filter_shape[0]
        # each hidden unit sees "input feature maps * filter height * width"
        fan_in = numpy.prod(filter_shape[1:])
        # each lower-layer unit receives gradient from
        # "output feature maps * filter height * width" / pooling size
        fan_out = (n_filters * numpy.prod(filter_shape[2:]) /
                   numpy.prod(poolsize))

        # uniform initialisation in [-W_bound, W_bound]
        W_bound = numpy.sqrt(6. / (fan_in + fan_out))
        initial_W = rng.uniform(low=-W_bound, high=W_bound, size=filter_shape)
        self.W = theano.shared(
            numpy.asarray(initial_W, dtype=theano.config.floatX),
            borrow=True
        )

        # one bias per output feature map, initialised to zero
        initial_b = numpy.zeros((n_filters,), dtype=theano.config.floatX)
        self.b = theano.shared(value=initial_b, borrow=True)

        # convolve the input feature maps with the filters
        feature_maps = conv.conv2d(
            input=input,
            filters=self.W,
            filter_shape=filter_shape,
            image_shape=image_shape
        )
        # max-pool every feature map individually
        pooled = downsample.max_pool_2d(
            input=feature_maps,
            ds=poolsize,
            ignore_border=True
        )
        # dimshuffle turns the bias vector into a (1, n_filters, 1, 1) tensor
        # so it broadcasts over the batch and both spatial axes; see
        # http://deeplearning.net/software/theano/library/tensor/basic.html
        broadcast_bias = self.b.dimshuffle('x', 0, 'x', 'x')
        self.output = T.tanh(pooled + broadcast_bias)

        # parameters of this layer
        self.params = [self.W, self.b]

def evaluate_lenet5(learning_rate=0.1, n_epochs=3,
                    dataset='tmp.jpg',
                    nkerns=(20, 50), batch_size=500):
    """Classify one handwritten-digit image with the stored CNN weights.

    The network (two convolution/pooling layers, one tanh hidden layer and a
    softmax output) is rebuilt, its parameters are read from the file
    ``weightCNN``, and the image at ``dataset`` is inverted, scaled to
    28x28 and fed through it. A copy of the preprocessed image is written
    to ``img.jpg`` for inspection.

    :param learning_rate: unused; kept for interface compatibility.
    :param n_epochs: unused; kept for interface compatibility.
    :param dataset: path of the grayscale image to classify.
    :param nkerns: number of filters in the first and second conv layer.
    :param batch_size: batch dimension the graph is built for; must be 1
        for the single image evaluated here.
    :returns: ``(result, probability)`` -- the values of ``layer3.y_pred``
        and ``layer3.p_y_given_x`` for the image.
    :raises IOError: if the image cannot be read.
    """
    ######################
    # BUILD ACTUAL MODEL #
    ######################
    print('... building the model')

    x = T.matrix('x')   # the data is presented as rasterized images

    # Reshape the (batch_size, 28 * 28) matrix to the 4D tensor expected by
    # LeNetConvPoolLayer.
    layer0_input = x.reshape((batch_size, 1, 28, 28))
    # First conv/pool layer:
    # filtering gives (28-5+1, 28-5+1) = (24, 24), pooling gives (12, 12),
    # so the output is (batch_size, nkerns[0], 12, 12).
    layer0 = LeNetConvPoolLayer(
        rng,
        input=layer0_input,
        image_shape=(batch_size, 1, 28, 28),
        filter_shape=(nkerns[0], 1, 5, 5),
        poolsize=(2, 2)
    )
    # Second conv/pool layer:
    # filtering gives (12-5+1, 12-5+1) = (8, 8), pooling gives (4, 4),
    # so the output is (batch_size, nkerns[1], 4, 4).
    layer1 = LeNetConvPoolLayer(
        rng,
        input=layer0.output,
        image_shape=(batch_size, nkerns[0], 12, 12),
        filter_shape=(nkerns[1], nkerns[0], 5, 5),
        poolsize=(2, 2)
    )
    # The hidden layer is fully connected and works on 2D matrices, so the
    # feature maps are flattened to (batch_size, nkerns[1] * 4 * 4).
    layer2_input = layer1.output.flatten(2)
    layer2 = HiddenLayer(
        rng,
        input=layer2_input,
        n_in=nkerns[1] * 4 * 4,
        n_out=500,
        activation=T.tanh
    )
    # classify the values of the fully-connected layer
    layer3 = LogisticRegression(input=layer2.output, n_in=500, n_out=10)
    layers = [layer0, layer1, layer2, layer3]

    ########################
    # LOAD TRAINED WEIGHTS #
    ########################
    # Pickle data must be read in binary mode (text mode corrupts it on
    # Windows); ``with`` closes the file even if loading fails.
    # NOTE: unpickling executes arbitrary code -- only load a weight file
    # that comes from a trusted source.
    with open('weightCNN', 'rb') as save_file:
        for layer in layers:
            layer.W.set_value(cPickle.load(save_file),borrow=True)
            layer.b.set_value(cPickle.load(save_file),borrow=True)

    img = cv2.imread(dataset,0)
    if img is None:
        # cv2.imread signals failure by returning None instead of raising
        raise IOError('cannot read image: %s' % dataset)
    # invert the grayscale values before feeding the network
    img = 255-img
    # Resize to the 28x28 network input; for the 56x56 client canvas this is
    # the same as the former fx=fy=0.5 scaling.
    img = cv2.resize(img,(28,28),interpolation=cv2.INTER_NEAREST)
    # keep a copy of the preprocessed image for inspection
    cv2.imwrite('img.jpg',img)

    img = img.reshape(1, 28*28)
    imgs = theano.shared(numpy.asarray(img,dtype=theano.config.floatX),borrow=True)

    tests = theano.function(
            inputs = [],
            outputs =  (layer3.y_pred,layer3.p_y_given_x),
            # scale gray values from [0, 255] to [0, 1]
            givens = [(x,imgs/255)]
        )

    result, probability = tests()
    return result, probability
    #print probability
    
    
if __name__ == '__main__':
    # Serve forever: wait for one image, classify it, print the outcome.
    while True:
        receive()
        prediction, confidence = evaluate_lenet5(batch_size=1)
        print(prediction)
        print(confidence)

下面是logistic中的几个函数

 

def load_data(dataset):      #'mnist.pkl.gz'
    """Load a gzipped pickle of (train, valid, test) sets into shared variables.

    Each set is a tuple ``(input, target)``: ``input`` is a 2D
    numpy.ndarray with one example per row and ``target`` a 1D
    numpy.ndarray of the same length holding the label of each row.

    :param dataset: path of the gzipped pickle file.
    :returns: ``[(train_x, train_y), (valid_x, valid_y), (test_x, test_y)]``
        where every ``x`` is a Theano shared variable and every ``y`` is a
        shared variable cast to int32.
    """
    print('... loading data')

    # Read the three splits from the archive
    archive = gzip.open(dataset, 'rb')
    train_set, valid_set, test_set = cPickle.load(archive)
    _, test_labels = test_set
    print(test_labels[0:10])
    archive.close()

    def shared_dataset(data_xy, borrow=True):
        """Wrap one (input, target) pair in Theano shared variables.

        Keeping the data in shared variables lets Theano copy it to GPU
        memory once instead of transferring every minibatch separately.
        """
        features, labels = data_xy

        def to_shared(values):
            return theano.shared(
                numpy.asarray(values, dtype=theano.config.floatX),
                borrow=borrow)

        shared_x = to_shared(features)
        shared_y = to_shared(labels)
        # GPU storage requires floats, so the labels are stored as floatX;
        # they are used as indices later, hence the cast back to int.
        return shared_x, T.cast(shared_y, 'int32')

    test_pair = shared_dataset(test_set)
    valid_pair = shared_dataset(valid_set)
    train_pair = shared_dataset(train_set)

    return [train_pair, valid_pair, test_pair]

class LogisticRegression(object):
    """Multi-class logistic regression.

    The model consists of a weight matrix ``W`` and a bias vector ``b``.
    Inputs are projected onto one hyperplane per class and a softmax turns
    the resulting scores into class-membership probabilities.
    """

    def __init__(self, input, n_in, n_out):
        """Create zero-initialised parameters and the symbolic outputs.

        :type input: theano.tensor.TensorType
        :param input: symbolic variable describing one minibatch of inputs

        :type n_in: int
        :param n_in: number of input units (dimension of the data points)

        :type n_out: int
        :param n_out: number of output units (number of classes)
        """
        floatX = theano.config.floatX

        # weights: (n_in, n_out) matrix of zeros; column k is the separating
        # hyperplane of class k
        self.W = theano.shared(
            value=numpy.zeros((n_in, n_out), dtype=floatX),
            name='W',
            borrow=True
        )
        # biases: vector of n_out zeros; element k is the offset of class k
        self.b = theano.shared(
            value=numpy.zeros((n_out,), dtype=floatX),
            name='b',
            borrow=True
        )

        # softmax over the class scores gives the class probabilities
        class_scores = T.dot(input, self.W) + self.b
        self.p_y_given_x = T.nnet.softmax(class_scores)

        # the prediction is the class with the highest probability
        self.y_pred = T.argmax(self.p_y_given_x, axis=1)

        # parameters of the model
        self.params = [self.W, self.b]

    def negative_log_likelihood(self, y):
        r"""Return the mean negative log-likelihood of the labels ``y``.

        .. math::

            \frac{1}{|\mathcal{D}|} \mathcal{L} (\theta=\{W,b\}, \mathcal{D}) =
            \frac{1}{|\mathcal{D}|} \sum_{i=0}^{|\mathcal{D}|}
                \log(P(Y=y^{(i)}|x^{(i)}, W,b)) \\
            \ell (\theta=\{W,b\}, \mathcal{D})

        :type y: theano.tensor.TensorType
        :param y: vector holding the correct label of each example

        Note: the mean is used instead of the sum so that the learning
              rate depends less on the batch size.
        """
        # log_probs has one row per example and one column per class.
        # Indexing it with (0..n-1, y) picks, for each example, the
        # log-probability assigned to its correct label.
        log_probs = T.log(self.p_y_given_x)
        example_rows = T.arange(y.shape[0])
        return -T.mean(log_probs[example_rows, y])

    def errors(self, y):
        """Return the fraction of misclassified examples in the minibatch.

        :type y: theano.tensor.TensorType
        :param y: vector holding the correct label of each example
        :raises TypeError: if ``y`` and ``y_pred`` differ in ``ndim``
        :raises NotImplementedError: if ``y`` does not have an int dtype
        """
        # labels and predictions must have the same number of dimensions
        if y.ndim != self.y_pred.ndim:
            raise TypeError(
                'y should have the same shape as self.y_pred',
                ('y', y.type, 'y_pred', self.y_pred.type)
            )
        # only integer labels are supported
        if not y.dtype.startswith('int'):
            raise NotImplementedError()
        # T.neq yields 1 for every wrong prediction and 0 otherwise
        return T.mean(T.neq(self.y_pred, y))

这里是mlp.py中的函数

 

 

class HiddenLayer(object):
    """Fully-connected layer computing ``activation(dot(input, W) + b)``."""

    def __init__(self, rng, input, n_in, n_out, W=None, b=None,
                 activation=T.tanh):
        """Create the layer, initialising ``W`` and ``b`` when not supplied.

        :param rng: random generator; ``uniform`` draws the initial weights
        :param input: symbolic input of the layer
        :param n_in: number of input units
        :param n_out: number of hidden units
        :param W: optional existing weight variable to reuse
        :param b: optional existing bias variable to reuse
        :param activation: non-linearity applied to the linear output, or
                           None for a purely linear layer
        """
        self.input = input
        floatX = theano.config.floatX

        if W is None:
            # uniform initialisation in [-bound, bound]
            bound = numpy.sqrt(6. / (n_in + n_out))
            W_values = numpy.asarray(
                rng.uniform(low=-bound, high=bound, size=(n_in, n_out)),
                dtype=floatX
            )
            # sigmoid units start with weights four times larger
            if activation == theano.tensor.nnet.sigmoid:
                W_values *= 4
            W = theano.shared(value=W_values, name='W', borrow=True)

        if b is None:
            zero_bias = numpy.zeros((n_out,), dtype=floatX)
            b = theano.shared(value=zero_bias, name='b', borrow=True)

        self.W = W
        self.b = b

        # linear part of the layer
        lin_output = T.dot(input, self.W) + self.b
        if activation is None:
            self.output = lin_output
        else:
            self.output = activation(lin_output)
        self.params = [self.W, self.b]

 

 

在这之前我用过基于pca+svm的识别方式,它在数据库中的识别率可以达到98%,而且具有识别、训练速度快的优势。但是当我真正把它用在这个系统里测试的时候效果非常差,感觉七成的识别率都不一定有。开始我还以为是我的代码有错误,专门写了一些程序测试各个阶段是否有错,因为这个CNN的代码之前也出过一个小问题,造成识别率较低。其实这个数据集有点太规则了,字符的大概位置以及大小都比较有规律,而自己手写输入的字符就大不相同了,这就会把pca的弱点体现出来,当然这里也可以通过字符分割、归一化大小的方式解决。这里想表达的意思是:一个在测试集中表现完美的算法,在真正的工程应用中还是有待考究的。

这里改用卷积神经网络训练的结果还不错,不过距离真正的LeNet-5还是有非常大的差距。首先我们这里训练的网络就和LeNet-5差距很大,毕竟只是一个例程,如果可以直接商用那别人怎么玩。而且我觉得这里的训练数据是不是可以通过旋转、缩放、加入噪点的方式扩充,这里有点瞎猜。然后LeNet-5最精髓的地方我觉得还是能区分粘连手写体,看这里: http://yann.lecun.com/exdb/lenet/a35.html

调试过程中出了一个小问题,造成识别效果非常差,这里稍微写一下:theano训练时输入的数据是灰度值归一化到[0,1]之后的浮点数,而我忘记了这一点,在识别时直接输入了0~255的原始灰度值,造成结果非常差。当时我仔细检查了模型,担心是自己对模型的理解有问题。最后用了一个比较土的方法测试:直接用图片替换测试集的第一个数字,这样就排除了建模过程中的错误造成的影响。

    # Debugging variant of the loading step: optionally overwrite the first
    # test sample with a hand-drawn image read from '0.jpg'.
    # NOTE(review): `insert` is not defined in this fragment -- presumably a
    # flag supplied by the enclosing function; confirm against the full file.
    f = gzip.open(dataset, 'rb')
    train_set, valid_set, test_set = cPickle.load(f)
    xx,yy = test_set
    if(insert):
        # read the image as single-channel and flatten it to 28*28 values
        img = cv2.imread('0.jpg',0)
        img = img.reshape(28*28)
        # replace the first test input with the flattened image
        xx[0]=img
    test_set = xx,yy
    print yy[0:10]
    f.close()

 

 

最后贴出一些测试结果

之前运行的一些截图不知道怎么上传没了,这里写点别的吧。虽然卷积神经网络的抗干扰性以及旋转抗性不错,不过要想达到较好的识别效果这样还是不够的。最好能有方法扩大数据集,这里给出一段代码。本来想用opencv做这部分处理,但是opencv里没有较好的函数,这里就借用了别人的一段代码修改了一下,由于还是比较熟悉opencv所以这里代码看着可能有点别扭。

 

# coding:utf-8
import os
import cv2
from random import randint, uniform
import numpy as np

from skimage.io import imshow
from skimage import transform, filters, exposure



def augmentDataset(PIXELS = 64,address = '12345.jpg',mul=49):
    """Generate ``mul`` randomly distorted copies of one grayscale image.

    Every copy gets a random rotation, translation, zoom and shear. A
    random quarter of the copies is then replaced by its Sobel edge map
    and a random half is inverted. The results are written to
    ``./pic/<index>.png`` and returned.

    :param PIXELS: side length the image is resized to before augmenting.
    :param address: path of the source image.
    :param mul: number of augmented copies to produce.
    :returns: float64 array of shape ``(mul, PIXELS, PIXELS)``.
    :raises IOError: if the source image cannot be read.
    """
    img = cv2.imread(address, 0)
    if img is None:
        # cv2.imread signals failure by returning None instead of raising
        raise IOError('cannot read image: %s' % address)
    # honour PIXELS (the size used to be hard-coded to 64x64, which broke
    # the assignment into ``aug`` for any other value)
    img = cv2.resize(img, (PIXELS, PIXELS))
    img = np.array(img)
    # separate buffer so the source image is never overwritten
    aug = np.empty(shape=(mul, PIXELS, PIXELS), dtype='float64')

    # The centring transforms do not depend on the random draws, so they are
    # built once: move the image centre to the origin and back again.
    center_shift = np.array((PIXELS, PIXELS)) / 2. - 0.5
    tform_center = transform.SimilarityTransform(translation=-center_shift)
    tform_uncenter = transform.SimilarityTransform(translation=center_shift)

    for i in range(mul):
        # random rotation in degrees
        dorotate = randint(-10, 10)
        # random translations
        trans_1 = randint(-10, 10)
        trans_2 = randint(-10, 10)
        # random zoom
        zoom = uniform(1, 1.3)
        # random shear in degrees
        shear_deg = uniform(-25, 25)

        tform_aug = transform.AffineTransform(
            rotation=np.deg2rad(dorotate),
            scale=(1 / zoom, 1 / zoom),
            shear=np.deg2rad(shear_deg),
            translation=(trans_1, trans_2))
        tform = tform_center + tform_aug + tform_uncenter

        # NOTE(review): _warps_cy._warp_fast is a private scikit-image
        # function and may change between releases.
        aug[i] = transform._warps_cy._warp_fast(
            img, tform.params, output_shape=(PIXELS, PIXELS), mode='nearest')

    # Sobel edge map for a random quarter of the copies; ``//`` keeps the
    # sample size an integer on Python 3 as well.
    indices_sobel = np.random.choice(mul, mul // 4, replace=False)
    for k in indices_sobel:
        # truncate to whole gray levels before filtering
        tmp = np.array(aug[k], dtype='int64')
        tmp = np.array(tmp, dtype='float64')
        aug[k] = filters.sobel(tmp)

    # invert a random half of the copies around their own maximum
    indices_invert = np.random.choice(mul, mul // 2, replace=False)
    for l in indices_invert:
        aug[l] = np.absolute(aug[l] - np.amax(aug[l]))

    # cv2.imwrite does not create missing directories, so make sure the
    # output folder exists first
    out_dir = './pic/'
    if not os.path.isdir(out_dir):
        os.makedirs(out_dir)
    for p in range(mul):
        name = out_dir + str(p) + '.png'
        cv2.imwrite(name, aug[p])
    return aug



if __name__ == '__main__':
	# Script entry point: run the augmentation with its default arguments.
	augmentDataset()


基于卷积神经网络的手写数字识别theano

 




文中没有解释具体的实现过程,主要是写了一下自己的一些感受吧。代码后面会在评论里贴上链接。


分享到: 微信 更多