如何设计数据库连接池

如何设计数据库连接池

曾经在面试的时候,就被问到如何设计连接池。当时就没有回答上来,就与offer擦肩而过。今天就来动手写一个简易版数据库连接池。

为什么需要连接池?数据库连接是一种比较宝贵资源,其创建或销毁都需要系统的开销。如果频繁创建、销毁,那势必会造成系统性能低下。为了提高资源的复用性,我们可以把创建的连接放入池中,需要的时候从池中获取,不需要的时候归还到池。这样我们就大大节省系统资源开销,还提高系统的性能。

难点

设计数据库连接池的难点在于:如何去归还到池中。

所用技术

动态代理:动态代理也可以理解为拦截器,可以在调用方法之前,去做一些事情。
JDK的代理基于接口,需要实现 InvocationHandler 接口
这里给出一个简单的例子。

public class Test {

    public static void main(String[] args) {
        A proxy = (A)Proxy.newProxyInstance(A.class.getClassLoader(), new Class[]{A.class}, new B());
        proxy.close();  // 打印方法名
    }

    interface A {
        void close();
    }

    static class B implements InvocationHandler {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            System.out.println(method.getName());
            return null;
        }
    }
}

动态代理其实是JVM在运行期动态帮我们创建接口 A 的实例。动态创建的代理类大概是这个样子的。

public class AProxy implements A {
	InvocationHandler handler;
	public AProxy(InvocationHandler handler) {
		this.handler = handler;
	}
	@Override
	public void close() {
		handler.invoke(this, A.class.getMethod("close"), new Object[]{});
	}
}
数据库连接池

mybatis 中就实现数据库连接池,这里根据mybatis源码去掉一些统计功能,写一个简易的数据库连接池,便于我们能够理解其中设计原理。
数据库连接池有几个参数:最大活跃数、最大空闲数、最大空闲时间。

PoolState

PoolState 类是用来存放活跃的连接和空闲连接。

public class PoolState {
    private final List<PoolConnection> activeConnections = new ArrayList<>();
    private final List<PoolConnection> idleConnections = new ArrayList<>();

    public List<PoolConnection> getActiveConnections() {
        return activeConnections;
    }

    public List<PoolConnection> getIdleConnections() {
        return idleConnections;
    }
}
PoolConnection

PoolConnection 类对 Connection 进行一层包装。它实现 InvocationHandler 主要用来在调用 close 方法时,并不是真正去关闭数据库,而是把连接放回到连接池中(poolDataSource)。PoolConnection 类有两个Connection对象,realConnection是数据库连接实例,proxyConnection是JVM动态地创建Connection的对象。客户端拿到的是proxyConnection 对象。

public class PoolConnection implements InvocationHandler {
    private final static String CLOSE = "close";
    private static final Class<?>[] IFACES = new Class<?>[] { Connection.class };

    private final PoolDataSource dataSource;
    private final Connection realConnection;
    private final Connection proxyConnection;
    private long keepAliveTime;

    public PoolConnection(Connection conn, PoolDataSource dataSource) {
        this.dataSource = dataSource;
        this.realConnection = conn;
        this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        if (CLOSE.equals(methodName)) {  // 对close方法进行拦截
            dataSource.pushConnection(this); // 放回数据库连接池中
            return null;
        }
        return method.invoke(realConnection, args);
    }

    public Connection getRealConnection() {
        return realConnection;
    }

    public Connection getProxyConnection() {
        return proxyConnection;
    }

    public long getKeepAliveTime() {
        return System.currentTimeMillis() - keepAliveTime;
    }

    public void setKeepAliveTime(long keepAliveTime) {
        this.keepAliveTime = keepAliveTime;
    }
}
PoolDataSource

PoolDataSource 类实现了DataSource接口,由我们自己来维护连接池以及连接池的状态。PoolDataSource类有几个参数:

  • poolMaximumActiveConnections:最大活跃数
  • poolMaximumIdleConnections:最大空闲数
  • poolKeepAliveTime:存活时间
  • poolTimeToWait:池等待时间。

怎么把连接放回连接池?
连接都是从活跃队列(activeConnections)中取出的。首先把连接从活跃队列中移除,然后判断此时空闲队列是达到上限,如果没有放入空闲队列中;否则关闭连接。

protected void pushConnection(PoolConnection conn) throws SQLException {
        synchronized (state) {
            state.getActiveConnections().remove(conn);
            if (state.getIdleConnections().size() < poolMaximumIdleConnections) {
                if (!conn.getRealConnection().getAutoCommit()) {
                    conn.getRealConnection().rollback();
                }
                state.getIdleConnections().add(conn);
                state.notifyAll();
            } else {
                if (!conn.getRealConnection().getAutoCommit()) {
                    conn.getRealConnection().rollback();
                }
                conn.getRealConnection().close();
            }
        }
    }

怎么从数据库获取连接?

获取连接的逻辑比较复杂,这里我画了一个逻辑图来帮助理解。
在这里插入图片描述

private PoolConnection popConnection() throws SQLException {
        PoolConnection conn = null;

        while (conn == null) {
            synchronized (state) {
                if (!state.getIdleConnections().isEmpty()) {
                    conn = state.getIdleConnections().remove(0);
                } else {
                    if (state.getActiveConnections().size() < poolMaximumActiveConnections) {
                        conn = new PoolConnection(getRealConnection(), this);
                    } else {
                        PoolConnection oldConn = state.getActiveConnections().get(0);
                        long keepAliveTime = oldConn.getKeepAliveTime();
                        if (keepAliveTime > poolKeepAliveTime) {
                            state.getActiveConnections().remove(oldConn);
                            if (!oldConn.getRealConnection().getAutoCommit()) {
                            // 把放回连接池的对象需要回滚,防止数据造成异常
                                oldConn.getRealConnection().rollback();
                            }
                            conn = new PoolConnection(oldConn.getRealConnection(),this);
                        } else {
                            try {
                            	// 如果空闲队列的连接数已经达到上限
                            	// 活跃队列的连接数也达到上限并且都还存活
                            	// 此时,连接池需要等待
                                state.wait(poolTimeToWait);
                            } catch (InterruptedException e) {
                                break;
                            }
                        }
                    }
                }

                if (conn != null) {
                    if (!conn.getRealConnection().getAutoCommit()) {
                        conn.getRealConnection().rollback();
                    }
                    conn.setKeepAliveTime(System.currentTimeMillis());
                    state.getActiveConnections().add(conn);
                }
            }
        }

        if (conn == null) {
            throw new SQLException("PoolDataSource get connection fail.");
        }
        return conn;
    }

PoolDataSource

public class PoolDataSource implements DataSource{

    private static Map<String, Driver> registerDrivers = new ConcurrentHashMap<>();
    private String driver;
    private String url;
    private String username;
    private String password;

    private final PoolState state = new PoolState();
    private DataSource dataSource;

    private int poolMaximumActiveConnections = 10;
    private int poolMaximumIdleConnections = 5;
    private int poolKeepAliveTime = 2000;
    private int poolTimeToWait = 2000;

    public PoolDataSource(String driver, String url, String username, String password) {
        this.driver = driver;
        this.url = url;
        this.username = username;
        this.password = password;
    }

    protected void pushConnection(PoolConnection conn) throws SQLException {
        synchronized (state) {
            state.getActiveConnections().remove(conn);
            if (state.getIdleConnections().size() < poolMaximumIdleConnections) {
                if (!conn.getRealConnection().getAutoCommit()) {
                    conn.getRealConnection().rollback();
                }
                state.getIdleConnections().add(conn);
                state.notifyAll();
            } else {
                if (!conn.getRealConnection().getAutoCommit()) {
                    conn.getRealConnection().rollback();
                }
                conn.getRealConnection().close();
            }
        }
    }

    private PoolConnection popConnection() throws SQLException {
        PoolConnection conn = null;

        while (conn == null) {
            synchronized (state) {
                if (!state.getIdleConnections().isEmpty()) {
                    conn = state.getIdleConnections().remove(0);
                } else {
                    if (state.getActiveConnections().size() < poolMaximumActiveConnections) {
                        conn = new PoolConnection(getRealConnection(), this);
                    } else {
                        PoolConnection oldConn = state.getActiveConnections().get(0);
                        long keepAliveTime = oldConn.getKeepAliveTime();
                        if (keepAliveTime > poolKeepAliveTime) {
                            state.getActiveConnections().remove(oldConn);
                            if (!oldConn.getRealConnection().getAutoCommit()) {
                                oldConn.getRealConnection().rollback();
                            }
                            conn = new PoolConnection(oldConn.getRealConnection(),this);
                        } else {
                            try {
                                state.wait(poolTimeToWait);
                            } catch (InterruptedException e) {
                                break;
                            }
                        }
                    }
                }

                if (conn != null) {
                    if (!conn.getRealConnection().getAutoCommit()) {
                        conn.getRealConnection().rollback();
                    }
                    conn.setKeepAliveTime(System.currentTimeMillis());
                    state.getActiveConnections().add(conn);
                }
            }
        }

        if (conn == null) {
            throw new SQLException("PoolDataSource get connection fail.");
        }
        return conn;
    }

    private Connection getRealConnection() throws SQLException {
        return getRealConnection(username, password);
    }

    private Connection getRealConnection(String username, String password) throws SQLException {
        Properties props = new Properties();
        if (username != null) {
            props.put("user", username);
        }
        if (password != null) {
            props.put("password", password);
        }
        return getRealConnection(props);
    }

    private Connection getRealConnection(Properties properties) throws SQLException {
        initializeDriver();
        Connection conn = DriverManager.getConnection(url, properties);
        return conn;
    }

    private void initializeDriver() throws SQLException {
        if (!registerDrivers.containsKey(driver)) {
            try {
                Class.forName(driver);
            } catch (Exception e) {
                throw new SQLException("register Driver fail. Cause:" + e);
            }
        }
    }

    @Override
    public Connection getConnection() throws SQLException {
        return popConnection().getProxyConnection();
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        return popConnection().getProxyConnection();
    }
    // 省去get//set 以及部分接口方法
}
总结

虽然这个简版连接池并不能其他相媲美,但是可以帮我们理解连接池的原理,这里也是借助源码,去除一些复杂功能。尤其是动态代理的技术运用到这里,十分精妙。

参考

版权声明:本文为buffeer原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。