ReentrantLock implements PV operation – simulates multi-threaded competition for database connection pool resource scenarios

Use ReentrantLock + Condition to simulate PV operation, realize multi-threaded competition for database connection pool resources, block waiting after resources are exhausted, and wake up blocked threads after returning resources (in the code, 10 threads compete for 5 database connection resources)

    • ConnectionPool.class (connection pool)
    • Connection.class (connection object)
    • ConnectionState.class (connection state enumeration)
    • Application.class (program entry)
    • operation result

ConnectionPool.class (connection pool)

package demo.lock.db;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import org.apache.commons.collections4.CollectionUtils;

/**
 * Simulate database connection pool
 */
public class ConnectionPool {<!-- -->
    /** The maximum number of connections in the connection pool */
    private static final int POOL_SIZE = 5;
    public static List<Connection> connections;
    /** signal */
    public static volatile int signal = POOL_SIZE;
    /** Lock */
    public static ReentrantLock lock = new ReentrantLock();
    /** Wait for condition */
    public static Condition condition = lock. newCondition();

    /**
     * <p>
     * Initialize the database connection pool
     *</p>
     */
    public static void init() {<!-- -->
        connections = new ArrayList<>(POOL_SIZE);
        for (int i = 0; i < POOL_SIZE; i ++ ) {<!-- -->
            connections.add(new Connection("connection-" + i));
        }
    }

    /**
     * <p>
     * Use ReentrantLock to wrap the acquisition data connection logic into a P operation primitive
     * <p>
     * After preempting the lock, if it detects that the semaphore is not greater than 0, add itself to the blocking queue of the Condition object and release the lock
     *</p>
     * <p>
     * When continuing to hold the lock or re-holding the lock after being woken up, try to spin to obtain the database connection
     *</p>
     *
     * @return obtained database connection object
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {<!-- -->
        // Perform P operation, first acquire the lock object
        lock. lock();
        try {<!-- -->
            // At this time, the lock object is preempted, but since the semaphore is less than or equal to 0, that is, no resources are available, it blocks itself and releases the lock at the same time
            if (signal <= 0) {<!-- -->
                condition. await();
            }
            // When holding or re-holding the lock object, perform a spin operation to try to obtain a database connection
            while (true) {<!-- -->
                if (signal > 0) {<!-- -->
                    // Get an idle connection from the connection pool
                    List<Connection> freeConnections = connections. stream()
                        .filter(x -> x.state.equals(ConnectionState.FREE)).collect(Collectors.toList());
                    Connection currConnection =
                        CollectionUtils.isEmpty(freeConnections) ? null : freeConnections.get(0);
                    if (null == currConnection) {<!-- -->
                        return null;
                    }
                    currConnection.state = ConnectionState.BUSY;
                    // Get the connection successfully, the semaphore will be decremented by 1
                    signal--;
                    System.out.println("Current thread:" + Thread.currentThread().getName() + " Grab the database connection:"
                         + currConnection.getName() + " Number of idle connections in the current connection pool: " + signal);
                    return currConnection;
                }
            }
        } finally {<!-- -->
            // Release the lock object
            lock. unlock();
        }

    }

    /**
     * <p>
     * Use ReentrantLock to wrap the return data connection logic into a V primitive
     *</p>
     * <p>
     * After returning the connection successfully and incrementing the semaphore, wake up a thread in the waiting queue of Condition
     *</p>
     *
     * @param connection database connection object
     */
    public static void repayConnection(Connection connection) {<!-- -->
        if (null == connection) {<!-- -->
            return;
        }
        // Perform V operation, first acquire the lock object
        lock. lock();
        try {<!-- -->
            connections.forEach(x -> x.state = connection.equals(x) ? ConnectionState.FREE : x.state);
            // The semaphore is incremented by 1, which means returning 1 resource
            signal + + ;
            System.out.println("Current thread:" + Thread.currentThread().getName() + "Return database connection:" + connection.getName()
                 + " Number of idle connections in the current connection pool: " + signal);
            // wake up a thread in the waiting queue of the condition
            condition. signal();
        } finally {<!-- -->
            // Release the lock resource
            lock. unlock();
        }
    }
}

Connection.class (connection object)

package demo.lock.db;

/**
 * Database connection object
 */
public class Connection {<!-- -->
    /**
     * connection name
     */
    String name;
    /**
     * current status
     */
    ConnectionState state = ConnectionState. FREE;

    public Connection(String name) {<!-- -->
        this.name = name;
    }

    public String getName() {<!-- -->
        return name;
    }

    public void setName(String name) {<!-- -->
        this.name = name;
    }

    @Override
    public String toString() {<!-- -->
        return "Current connection: " + ", state: " + state.state;
    }
}

ConnectionState.class (connection state enumeration)

package demo.lock.db;

/**
 * Database connection current status enumeration
 */
public enum ConnectionState {<!-- -->

    FREE(0, "Free"), BUSY(1, "Busy");

    /**
     * Status code 0-idle 1-busy
     */
    int code;
    /**
     * state name
     */
    String state;

    ConnectionState(int code, String state) {<!-- -->
        this.code = code;
        this.state = state;
    }
}

Application.class (program entry)

package demo.lock;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import demo.lock.db.Connection;
import demo.lock.db.ConnectionPool;

public class Application {<!-- -->

    ThreadPoolExecutor executor =
        new ThreadPoolExecutor(10, 10, 30, TimeUnit. MILLISECONDS, new LinkedBlockingDeque<>());

    public void execute() {<!-- -->
        for (int i = 0; i < 10; i ++ ) {<!-- -->
            executor. submit(new Runnable() {<!-- -->
                @Override
                public void run() {<!-- -->
                    try {<!-- -->
                        Connection connection = ConnectionPool. getConnection();
                        // Randomly occupy 0-1 seconds and return the connection
                        Thread. sleep((long)(Math. random() * 1000));
                        ConnectionPool. repayConnection(connection);
                    } catch (Exception e) {<!-- -->
                        e.printStackTrace();
                    }
                }
            });
        }

    }

    public void shutdown() {<!-- -->
        if (null == executor) {<!-- -->
            return;
        }
        executor. shutdown();
    }

    public static void main(String[] args) throws InterruptedException {<!-- -->
        ConnectionPool.init();
        Application application = new Application();
        application. execute();
        Thread. sleep(10000L);
        application. shutdown();
    }
}

Run result