Foreword
This article continues the previous two, both of which dealt with tracking down a connection leak in the c3p0 database connection pool in production.
The second article mentioned that two parameters can be configured to find out where code borrowed a connection and did not return it. In my case, the borrower's stack trace was indeed printed to the log for each unreturned connection, but when I reproduced those scenarios locally, the connections were in fact returned. So I began to suspect that the code was not at fault.
If the business code is not at fault, what could be? Let’s first look at how a connection is returned to the connection pool.
Actual type of connection
I debugged locally and found that when getting a connection, the code is as follows:
com.mchange.v2.c3p0.impl.AbstractPoolBackedDataSource#getConnection()

    public Connection getConnection() throws SQLException {
        // javax.sql.PooledConnection; the actual type is
        // com.mchange.v2.c3p0.impl.NewPooledConnection
        PooledConnection pc = getPoolManager().getPool().checkoutPooledConnection();
        return pc.getConnection();
    }
To be honest, I had never noticed the class javax.sql.PooledConnection in the JDBC API before. Here, a com.mchange.v2.c3p0.impl.NewPooledConnection object is first obtained from the c3p0 connection pool and assigned to a variable of type javax.sql.PooledConnection.
Then, calling javax.sql.PooledConnection#getConnection returns an object whose actual type is com.mchange.v2.c3p0.impl.NewProxyConnection.
com.mchange.v2.c3p0.impl.NewPooledConnection#getConnection

    public synchronized Connection getConnection() throws SQLException {
        if ( exposedProxy == null ) {
            exposedProxy = new NewProxyConnection( physicalConnection, this );
        }
        return exposedProxy;
    }
NewProxyConnection mainly contains the following fields:
- inner: the actual underlying connection; here its type is oracle.jdbc.driver.T4CConnection
- parentPooledConnection: the pooled connection, of type javax.sql.PooledConnection
- cel: a listener, of type ConnectionEventListener
connection.close method logic
com.mchange.v2.c3p0.impl.NewProxyConnection

    public synchronized void close() throws SQLException {
        // 0
        if (!this.isDetached()) {
            // 1
            NewPooledConnection npc = this.parentPooledConnection;
            this.detach();
            // 2
            npc.markClosedProxyConnection(this, this.txn_known_resolved);
            this.inner = null;
        }
    }
At 0, check whether the object has already been detached from the underlying pooled connection:
    boolean isDetached() {
        return this.parentPooledConnection == null;
    }
At 1, obtain the pooled connection, of type NewPooledConnection, through parentPooledConnection, and then detach from it:
    private void detach() {
        this.parentPooledConnection.removeConnectionEventListener(this.cel);
        this.parentPooledConnection = null;
    }
At 2, call the pooled connection's method to clean up:
    void markClosedProxyConnection( NewProxyConnection npc, boolean txn_known_resolved ) {
        // 2.1
        List closeExceptions = new LinkedList();

        // 2.2
        cleanupResultSets( closeExceptions );
        cleanupUncachedStatements( closeExceptions );
        checkinAllCachedStatements( closeExceptions );

        // 2.3
        if ( closeExceptions.size() > 0 ) {
            ... // print exception
        }
        reset(txn_known_resolved);
        exposedProxy = null; // volatile

        // 2.4
        fireConnectionClosed();
    }
At 2.1, build a list to collect any exceptions raised during cleanup.
At 2.2, clean up the ResultSets, Statements, and so on.
At 2.3, print the collected exceptions, if any.
At 2.4, notify the listeners:
    private void fireConnectionClosed() {
        ces.fireConnectionClosed();
    }
Then enter:
    ConnectionEvent evt = new ConnectionEvent(source);
    for (Iterator i = mlCopy.iterator(); i.hasNext();) {
        ConnectionEventListener cl = (ConnectionEventListener) i.next();
        // 1 call the listener's method
        cl.connectionClosed(evt);
    }

    // com.mchange.v2.c3p0.impl.C3P0PooledConnectionPool.ConnectionEventListenerImpl#connectionClosed
    public void connectionClosed(final ConnectionEvent evt) {
        doCheckinResource( evt );
    }

Then the following method is called:

    private void doCheckinResource(ConnectionEvent evt) {
        // rp: com.mchange.v2.resourcepool.BasicResourcePool
        rp.checkinResource( evt.getSource() );
    }
Here rp is the resource pool, and the connection will be returned to the resource pool here.
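To make the chain above easier to follow, here is a minimal sketch of the same idea: the object handed to business code is only a proxy, and its close() detaches from the pooled connection and notifies the pool instead of closing anything physical. All class names (SketchPool, SketchPooledConnection, SketchProxyConnection) are hypothetical stand-ins rather than c3p0 classes, and the check-in in this sketch is synchronous, which is a simplification.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for the resource pool.
class SketchPool {
    private final Deque<SketchPooledConnection> unused = new ArrayDeque<>();

    // Plays the role of the listener -> checkinResource path.
    synchronized void checkin(SketchPooledConnection pc) {
        unused.addFirst(pc);
    }

    synchronized SketchProxyConnection checkout() {
        SketchPooledConnection pc =
                unused.isEmpty() ? new SketchPooledConnection(this) : unused.removeFirst();
        return pc.getConnection();
    }

    synchronized int idleCount() {
        return unused.size();
    }
}

// Hypothetical stand-in for the pooled connection that owns the physical resource.
class SketchPooledConnection {
    private final SketchPool owner;
    private SketchProxyConnection exposedProxy;

    SketchPooledConnection(SketchPool owner) {
        this.owner = owner;
    }

    synchronized SketchProxyConnection getConnection() {
        if (exposedProxy == null) {
            exposedProxy = new SketchProxyConnection(this);
        }
        return exposedProxy;
    }

    // Called by the proxy on close(): forget the proxy, then notify the pool.
    void markClosedProxyConnection() {
        synchronized (this) {
            exposedProxy = null;
        }
        owner.checkin(this);
    }
}

// Hypothetical stand-in for the proxy handed to business code.
class SketchProxyConnection {
    private SketchPooledConnection parent;

    SketchProxyConnection(SketchPooledConnection parent) {
        this.parent = parent;
    }

    synchronized boolean isDetached() {
        return parent == null;
    }

    synchronized void close() {
        if (!isDetached()) {
            SketchPooledConnection pc = parent;
            parent = null;                  // detach first
            pc.markClosedProxyConnection(); // then let the pool take it back
        }
    }
}

class CloseChainDemo {
    static int idleAfterDoubleClose() {
        SketchPool pool = new SketchPool();
        SketchProxyConnection c = pool.checkout();
        c.close();
        c.close(); // no-op: the proxy is already detached
        return pool.idleCount();
    }

    public static void main(String[] args) {
        System.out.println("idle connections: " + idleAfterDoubleClose());
    }
}
```

Calling close() twice checks the connection in only once, because the second call finds the proxy already detached.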
Internally, an inner class RefurbishCheckinResourceTask is defined. It implements Runnable, and a new instance is handed to taskRunner so that the connection is returned asynchronously.
The logic of this task:
    class RefurbishCheckinResourceTask implements Runnable {
        public void run() {
            // 1 Check if the resource is ok
            boolean resc_okay = attemptRefurbishResourceOnCheckin( resc );
            synchronized( BasicResourcePool.this ) {
                PunchCard card = (PunchCard) managed.get( resc );
                // 2 If the resource is ok, return it to the unused free list and update the card
                if (resc_okay && card != null) {
                    // 2.1 Return to the unused free list
                    unused.add(0, resc );
                    // 2.2 Set the card's check-in time to now and its checkout time to -1,
                    //     meaning it is not currently lent out
                    card.last_checkin_time = System.currentTimeMillis();
                    card.checkout_time = -1;
                } else {
                    if (card != null)
                        card.checkout_time = -1;
                    // The connection is bad, so destroy it
                    removeResource( resc );
                    ensureMinResources();
                }
                BasicResourcePool.this.notifyAll();
            }
        }
    }
The connection is returned here. As you can see, a new Runnable is handed to the thread pool for asynchronous execution. Asynchronous execution is not entirely reliable, though: if every thread in the pool is stuck at that moment, no thread is left to process this task.
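The risk can be reproduced in miniature. The sketch below uses a plain java.util.concurrent fixed thread pool as a stand-in for c3p0's helper threads (an assumption for illustration; c3p0 has its own runner), occupies every thread with a task that never finishes on its own, and then queues a "check-in" task behind them.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class StarvedCheckinDemo {
    // Returns {ranWhileHelpersBlocked, ranAfterHelpersFreed}.
    static boolean[] run() {
        int helperThreads = 3; // assumed pool size for the demo
        ExecutorService helpers = Executors.newFixedThreadPool(helperThreads);
        CountDownLatch allBlocked = new CountDownLatch(helperThreads);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch checkedIn = new CountDownLatch(1);
        try {
            for (int i = 0; i < helperThreads; i++) {
                helpers.execute(() -> {
                    allBlocked.countDown();
                    try {
                        release.await(); // stands in for a call that does not return
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            allBlocked.await();                    // every helper thread is now occupied
            helpers.execute(checkedIn::countDown); // the queued "check-in" task
            boolean ranWhileBlocked = checkedIn.await(200, TimeUnit.MILLISECONDS);
            release.countDown();                   // free the helper threads
            boolean ranAfterRelease = checkedIn.await(5, TimeUnit.SECONDS);
            return new boolean[] { ranWhileBlocked, ranAfterRelease };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            helpers.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("checked in while helpers blocked: " + r[0]);
        System.out.println("checked in after helpers freed:   " + r[1]);
    }
}
```

While the helpers are blocked the check-in task only sits in the queue, so from the pool's point of view the connection is still checked out.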
APPARENT DEADLOCK appears in the production log
Problem Description
If you search for APPARENT DEADLOCK in a search engine, you will find a lot of results, which suggests that many people have been troubled by this problem for a long time.
On our side, every time this connection leak problem occurs, it seems to be accompanied by this log. The log looks like this:
    06-08 17:00:30,119[Timer-5][][c.ThreadPoolAsynchronousRunner:608][WARN]-com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector@3cf46c2 -- APPARENT DEADLOCK!!! emergency threads for unassigned pending tasks!
    06-08 17:00:30,121[Timer-5][][c.ThreadPoolAsynchronousRunner:624][WARN]-com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector@3cf46c2 -- APPARENT DEADLOCK!!! Complete Status:
        Managed Threads: 3
        Active Threads: 3
        Active Tasks:
            com.mchange.v2.resourcepool.BasicResourcePool$AcquireTask@b451b27 (com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#0)
            com.mchange.v2.resourcepool.BasicResourcePool$AcquireTask@65f9a338 (com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#1)
            com.mchange.v2.resourcepool.BasicResourcePool$AcquireTask@684ae5d5 (com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#2)
        Pending Tasks:
            com.mchange.v2.resourcepool.BasicResourcePool$AcquireTask@d373871
            com.mchange.v2.resourcepool.BasicResourcePool$AcquireTask@245a897e
            com.mchange.v2.resourcepool.BasicResourcePool$DestroyResourceTask@33f8c1d7
            com.mchange.v2.resourcepool.BasicResourcePool$AcquireTask@107e24e9
    Pool thread stack traces:
        Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#0,5,main]
            java.net.SocketInputStream.socketRead0(Native Method)
            java.net.SocketInputStream.read(SocketInputStream.java:152)
            java.net.SocketInputStream.read(SocketInputStream.java:122)
            oracle.net.ns.Packet.receive(Packet.java:300)
            oracle.net.ns.DataPacket.receive(DataPacket.java:106)
            oracle.net.ns.NetInputStream.getNextPacket(NetInputStream.java:315)
            oracle.net.ns.NetInputStream.read(NetInputStream.java:260)
            oracle.net.ns.NetInputStream.read(NetInputStream.java:185)
            oracle.net.ns.NetInputStream.read(NetInputStream.java:102)
            oracle.jdbc.driver.T4CSocketInputStreamWrapper.readNextPacket(T4CSocketInputStreamWrapper.java:124)
            oracle.jdbc.driver.T4CSocketInputStreamWrapper.read(T4CSocketInputStreamWrapper.java:80)
            oracle.jdbc.driver.T4CMAREngine.unmarshalUB1(T4CMAREngine.java:1137)
            oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:290)
            oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:192)
            oracle.jdbc.driver.T4CTTIoauthenticate.doOSESSKEY(T4CTTIoauthenticate.java:404)
            oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:385)
            oracle.jdbc.driver.PhysicalConnection.<init>(PhysicalConnection.java:546)
            oracle.jdbc.driver.T4CConnection.<init>(T4CConnection.java:236)
            oracle.jdbc.driver.T4CDriverExtension.getConnection(T4CDriverExtension.java:32)
            oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:521)
            com.mchange.v2.c3p0.DriverManagerDataSource.getConnection(DriverManagerDataSource.java:134)
            com.mchange.v2.c3p0.WrapperConnectionPoolDataSource.getPooledConnection(WrapperConnectionPoolDataSource.java:182)
            com.mchange.v2.c3p0.WrapperConnectionPoolDataSource.getPooledConnection(WrapperConnectionPoolDataSource.java:171)
            com.mchange.v2.c3p0.impl.C3P0PooledConnectionPool$1PooledConnectionResourcePoolManager.acquireResource(C3P0PooledConnectionPool.java:137)
            com.mchange.v2.resourcepool.BasicResourcePool.doAcquire(BasicResourcePool.java:1014)
            com.mchange.v2.resourcepool.BasicResourcePool.access$800(BasicResourcePool.java:32)
            com.mchange.v2.resourcepool.BasicResourcePool$AcquireTask.run(BasicResourcePool.java:1810)
            com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:547)
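The log comes from a class named DeadlockDetector running on a timer thread, and it reports three active tasks plus a queue of pending ones. The detector's logic is not shown in this article, so the following is only a guess at the general idea, under the assumption that the pool is flagged when the pending queue is non-empty and unchanged between two consecutive checks. The class name StallDetectorSketch and that rule are both illustrative, not c3p0's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class StallDetectorSketch {
    // Snapshot of the pending queue taken at the previous check (null if it was empty).
    private List<String> last;

    // Assumed rule: "stalled" means the queue is non-empty and identical to the last snapshot.
    synchronized boolean check(List<String> pendingTasks) {
        if (pendingTasks.isEmpty()) {
            last = null;
            return false;
        }
        List<String> current = new ArrayList<>(pendingTasks);
        boolean stalled = current.equals(last);
        last = current;
        return stalled;
    }

    public static void main(String[] args) {
        StallDetectorSketch detector = new StallDetectorSketch();
        List<String> pending = Arrays.asList("AcquireTask-1", "DestroyResourceTask-1");
        System.out.println(detector.check(pending)); // first look, nothing to compare with
        System.out.println(detector.check(pending)); // same queue again, no progress
        System.out.println(detector.check(Arrays.asList("DestroyResourceTask-1"))); // progress
    }
}
```

Under that assumed rule, a warning of this kind says only that queued tasks made no progress during one interval; it does not prove a true lock-ordering deadlock, which fits the word "apparent" in the message.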
Task types in the thread pool
As mentioned, many things are handed to the thread pool for asynchronous execution. For example, when the main thread initializes the connections, it does not create them itself; instead it creates several tasks, hands them to the thread pool to run in parallel, and then waits.
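That hand-off can be pictured as follows. This is a minimal sketch and not c3p0's code: the class AsyncInitSketch, the pool size of 3, and the use of a plain Object as a "connection" are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncInitSketch {
    private final List<Object> unused = new ArrayList<>();

    // Submits one acquire task per wanted connection, then blocks until all have arrived.
    int init(int wanted) {
        ExecutorService helpers = Executors.newFixedThreadPool(3); // assumed helper count
        try {
            for (int i = 0; i < wanted; i++) {
                helpers.execute(() -> {
                    Object conn = new Object(); // placeholder for opening a real connection
                    synchronized (this) {
                        unused.add(conn);
                        notifyAll();            // wake the waiting caller
                    }
                });
            }
            synchronized (this) {
                while (unused.size() < wanted) {
                    wait();                     // the calling thread waits here
                }
                return unused.size();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            helpers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("connections ready: " + new AsyncInitSketch().init(5));
    }
}
```

The caller makes no progress until the helper threads do, so anything that stalls the helpers also stalls whoever is waiting on the pool.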
The main task types are:
- com.mchange.v2.resourcepool.BasicResourcePool.AcquireTask
  Acquires a database connection by talking to the underlying database driver, such as the MySQL or Oracle driver.
- DestroyResourceTask, at com/mchange/v2/resourcepool/BasicResourcePool.java:959
  An inner class defined inside this method; it is used to destroy the underlying connection:

      private void destroyResource(final Object resc, boolean synchronous) {
          class DestroyResourceTask implements Runnable {

- RefurbishCheckinResourceTask, an inner class in com.mchange.v2.resourcepool.BasicResourcePool#doCheckinManaged:

      class RefurbishCheckinResourceTask implements Runnable

  This class is very important. As mentioned earlier, when a connection is returned, this task is created and executed asynchronously.
- com.mchange.v2.resourcepool.BasicResourcePool.AsyncTestIdleResourceTask#AsyncTestIdleResourceTask
  Tests resources that have been idle for too long to see whether they are still usable; if not, they are destroyed promptly.
- com.mchange.v2.resourcepool.BasicResourcePool.RemoveTask
  Needed when the connection pool shrinks. For example, if there are currently 20 connections and min is configured as 10, the extra 10 connections are destroyed.
Several of these need to communicate with the database, namely AcquireTask, DestroyResourceTask, and AsyncTestIdleResourceTask. That communication may time out, and if the timeout is long, the thread running the task can stay blocked for a long time. Next, let’s see whether these threads can really be blocked.
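One common way to keep a caller from waiting forever on such a task is to put a time limit on it. The sketch below shows that idea with java.util.concurrent only; the helper name finishesWithin is hypothetical, and this is not how c3p0 itself enforces limits.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedTaskSketch {
    // Runs the task on a helper thread; returns false if it is still running after the limit.
    static boolean finishesWithin(Runnable task, long limitMillis) {
        ExecutorService helper = Executors.newSingleThreadExecutor();
        Future<?> future = helper.submit(task);
        try {
            future.get(limitMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // Interrupts the helper thread; this only frees it if the task reacts to interrupts.
            future.cancel(true);
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            helper.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Runnable quick = () -> { };
        Runnable slow = () -> {
            try {
                Thread.sleep(10_000); // stands in for a slow database round trip
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        System.out.println("quick task finished in time: " + finishesWithin(quick, 1_000));
        System.out.println("slow task finished in time:  " + finishesWithin(slow, 100));
    }
}
```

The limit protects the caller, but the helper thread is only released if the blocked call responds to the interrupt.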
How does the thread pool execute the task
The thread pool is created as follows:
    private ThreadPoolAsynchronousRunner(int num_threads,
                                         boolean daemon,
                                         int max_individual_task_time,
                                         int deadlock_detector_interval,
                                         int interrupt_delay_after_apparent_deadlock,
                                         Timer myTimer,
                                         boolean should_cancel_timer ) {
        this.num_threads = num_threads;
        this.daemon = daemon;
        this.max_individual_task_time = max_individual_task_time;
        this.deadlock_detector_interval = deadlock_detector_interval;
        this.interrupt_delay_after_apparent_deadlock = interrupt_delay_after_apparent_deadlock;
        this.myTimer = myTimer;
        this.should_cancel_timer = should_cancel_timer;

        // create thread pool
        recreateThreadsAndTasks();

        myTimer.schedule( deadlockDetector, deadlock_detector_interval, deadlock_detector_interval );
    }
    private void recreateThreadsAndTasks() {
        // If the thread pool already exists, destroy it first
        if ( this.managed != null) {
            Date aboutNow = new Date();
            for (Iterator ii = managed.iterator(); ii.hasNext(); ) {
                PoolThread pt = (PoolThread) ii.next();
                pt.gentleStop();
                stoppedThreadsToStopDates.put( pt, aboutNow );
                ensureReplacedThreadsProcessing();
            }
        }

        // create thread pool
        this.managed = new HashSet();
        this.available = new HashSet();
        this.pendingTasks = new LinkedList();
        for (int i = 0; i < num_threads; ++i) {
            // The thread type is com.mchange.v2.async.ThreadPoolAsynchronousRunner.PoolThread
            Thread t = new PoolThread(i, daemon);
            managed.add( t );
            available.add( t );
            t.start();
        }
    }
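Stripped of the bookkeeping for replaced threads, the setup above amounts to a fixed number of threads sharing one pending-task list. The following is a minimal sketch of that pattern with hypothetical names (TinyRunnerSketch, post, stop); it leaves out the deadlock detector, the available set, and per-task time limits.

```java
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class TinyRunnerSketch {
    private final LinkedList<Runnable> pendingTasks = new LinkedList<>();
    private boolean shouldStop = false;

    private TinyRunnerSketch() { }

    // Creates the runner and starts numThreads daemon workers that share one task list.
    static TinyRunnerSketch start(int numThreads) {
        TinyRunnerSketch runner = new TinyRunnerSketch();
        for (int i = 0; i < numThreads; i++) {
            Thread t = new Thread(runner::workerLoop, "sketch-worker-" + i);
            t.setDaemon(true);
            t.start();
        }
        return runner;
    }

    synchronized void post(Runnable task) {
        pendingTasks.add(task);
        notifyAll(); // wake a worker waiting for work
    }

    synchronized void stop() {
        shouldStop = true;
        notifyAll();
    }

    private void workerLoop() {
        while (true) {
            Runnable task;
            synchronized (this) {
                while (!shouldStop && pendingTasks.isEmpty()) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        return;
                    }
                }
                if (shouldStop) {
                    return;
                }
                task = pendingTasks.removeFirst();
            }
            task.run(); // run outside the lock so other workers can keep pulling tasks
        }
    }

    // Posts the given number of tasks and reports whether all of them ran within 5 seconds.
    static boolean runsAll(int threads, int tasks) {
        TinyRunnerSketch runner = start(threads);
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            runner.post(done::countDown);
        }
        try {
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            runner.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println("all tasks ran: " + runsAll(3, 10));
    }
}
```

With a design like this, a task that never returns removes one worker from circulation for good, so a small thread count leaves little headroom.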
The execution logic of the thread:
    // 1
    boolean should_stop;
    LinkedList pendingTasks;

    thread_loop:
    while (true) {
        Runnable myTask;
        synchronized ( ThreadPoolAsynchronousRunner.this ) {
            while ( !should_stop && pendingTasks.size() == 0 )
                ThreadPoolAsynchronousRunner.this.wait( POLL_FOR_STOP_INTERVAL );
            // 2
            if (should_stop)
                break thread_loop;
            // 3
            myTask = (Runnable) pendingTasks.remove(0);
            currentTask = myTask;
        }
        try {
            // 4
            if (max_individual_task_time > 0)
                setMaxIndividualTaskTimeEnforcer();
            // 5
            myTask.run();
        }
        ...
    }
At 1, a flag is defined in the thread; when the thread sees that the flag is true, it stops running.
At 2, the flag is checked.
At 3, a task is taken from the task list.