宝剑锋从磨砺出,梅花香自苦寒来。这篇文章主要讲述Android数据库源码分析-连接缓存池SQLiteConnectionPool相关的知识,希望能为你提供帮助。
本系列主要关注安卓数据库的线程行为,分为四个部分:
(1)SQLiteOpenHelper的getReadableDatabase和getWritableDatabase
(2)SQLiteDatabase的实现以及多线程行为
(3)连接缓存池SQLiteConnectionPool
(4)SQLiteDatabase多线程实践
本篇主要关注SQLiteConnectionPool
(连接池)在并发下的行为。
上文提到,SQLiteDatabase
会在每个线程中使用一个SQLiteSession
,而SQLiteSession
会共用一个SQLiteConnectionPool
对象,并通过SQLiteConnectionPool
的acquireConnection
和releaseConnection
方法来获取和释放数据库连接(一个SQLiteConnection
对象)。
public SQLiteConnection acquireConnection(String sql, int connectionFlags,
CancellationSignal cancellationSignal) {
return waitForConnection(sql, connectionFlags, cancellationSignal);
//看名字是要等待什么锁了
}
connectionFlags
是用SQLiteDatabase.getThreadDefaultConnectionFlags
的返回值一路传下来的,这个方法在前文讨论过这个方法,会记录两件事:1.数据库是只读还是可写;2.当前是否主线程。waitForConnection
方法比较长,我们一段一段地看。1 尝试立即获取连接
//是否可写连接。可写的连接同一时间只能存在一个。
final boolean wantPrimaryConnection =
(connectionFlags &
CONNECTION_FLAG_PRIMARY_CONNECTION_AFFINITY) != 0;
final ConnectionWaiter waiter;
final int nonce;
synchronized (mLock) {//加锁。留意这一段代码中加锁部分并未结束。
throwIfClosedLocked();
// Abort if canceled.
if (cancellationSignal != null) {
cancellationSignal.throwIfCanceled();
}// Try to acquire a connection.
SQLiteConnection connection = null;
if (!wantPrimaryConnection) {
//尝试获取只读连接
connection = tryAcquireNonPrimaryConnectionLocked(
sql, connectionFlags);
// might throw
}
if (connection == null) {
//尝试获取可写连接
connection = tryAcquirePrimaryConnectionLocked(connectionFlags);
// might throw
}
if (connection != null) {
return connection;
}
到这里是尝试直接获取连接。尝试的方法有
tryAcquireNonPrimaryConnectionLocked
和tryAcquirePrimaryConnectionLocked
。只读时只需要 non primary connection,而需要写时要primary connection。先看
tryAcquirePrimaryConnectionLocked
:// Might throw.
private SQLiteConnection tryAcquirePrimaryConnectionLocked(int connectionFlags) {
// If the primary connection is available, acquire it now.
SQLiteConnection connection = mAvailablePrimaryConnection;
//同时只能存在一个可写连接,用一个成员变量mAvailablePrimaryConnection缓存空闲连接
if (connection != null) {//有缓存返回即可。finishAcquirePrimaryConnection会把connection放到mAcquiredConnections中。mAcquiredConnections存储正在使用的连接。
mAvailablePrimaryConnection = null;
//不再空闲
finishAcquireConnectionLocked(connection, connectionFlags);
// might throw
return connection;
}// Make sure that the primary connection actually exists and has just been acquired.
//如果上一个if造成了不再空闲,则mAcquiredConnections中就会有一个primary connection,这里就会返回null。上一层的waitForConnection接到null会进入等待状态,这个后面讨论。
for (SQLiteConnection acquiredConnection : mAcquiredConnections.keySet()) {
if (acquiredConnection.isPrimaryConnection()) {
return null;
}
}//如果没有在上面返回null,那么这一定是第一次请求primary connnection,或者有一个连接泄露了(未recycle的情况下finalize),这时候就需要用openConnectionLocked去新开一个连接。
// Uhoh.No primary connection!Either this is the first time we asked
// for it, or maybe it leaked?
connection = openConnectionLocked(mConfiguration,
true /*primaryConnection*/);
// might throw
finishAcquireConnectionLocked(connection, connectionFlags);
// might throw
return connection;
}
然后看
tryAcquireNonPrimaryConnectionLocked
:// Might throw.
private SQLiteConnection tryAcquireNonPrimaryConnectionLocked(
String sql, int connectionFlags) {
// Try to acquire the next connection in the queue.
SQLiteConnection connection;
//只读连接可以有多个,用一个ArrayList缓存了所有空闲连接
final int availableCount = mAvailableNonPrimaryConnections.size();
if (availableCount >
1 &
&
sql != null) {
// If we have a choice, then prefer a connection that has the
// prepared statement in its cache.
// 如上面的英文注释说的,如果有不止一个连接可选,那么挑选缓存了相同sql语句的那个。可能SQLiteConnection对此有优化?
for (int i = 0;
i <
availableCount;
i++) {
connection = mAvailableNonPrimaryConnections.get(i);
if (connection.isPreparedStatementInCache(sql)) {//如果有相同sql,返回
mAvailableNonPrimaryConnections.remove(i);
finishAcquireConnectionLocked(connection, connectionFlags);
// might throw
return connection;
}
}
}
if (availableCount >
0) {
// Otherwise, just grab the next one.
//没有挑到,随便给一个
connection = mAvailableNonPrimaryConnections.remove(availableCount - 1);
finishAcquireConnectionLocked(connection, connectionFlags);
// might throw
return connection;
}// 一个空闲连接都没有。
// Expand the pool if needed.
int openConnections = mAcquiredConnections.size();
if (mAvailablePrimaryConnection != null) {
openConnections += 1;
}
// 上面在计算有多少已打开连接(空闲+使用中)。这里肯定没有空闲non primary连接了,而如果有空闲primary连接,则要 += 1。
if (openConnections >
= mMaxConnectionPoolSize) {
// 超过数据库连接限制,放弃治疗。连接限制与数据库底层实现有关。
return null;
}
// 没超限,还能再开一个连接。所以开连接并返回。
connection = openConnectionLocked(mConfiguration,
false /*primaryConnection*/);
// might throw
finishAcquireConnectionLocked(connection, connectionFlags);
// might throw
return connection;
}
在这一步中,进行了从缓存中取得连接的尝试;而如果无法取得连接,也进行了打开连接的尝试。如果再无法打开的话,就会拿到一个null了。后续就需要进行等待。
2 等待获取连接
// 留意这里还在上一个锁mLock中
// No connections available.Enqueue a waiter in priority order.
final int priority = getPriority(connectionFlags);
//主线程中的连接优先级更高,记得吗?
final long startTime = SystemClock.uptimeMillis();
// waiter是一个ConnectionWaiter对象。它同时也是一个链表,有一个同类的mNext成员变量。
// obtainConnectionWaiterLocked会去复用(取链表头)或者新建一个对象。
waiter = obtainConnectionWaiterLocked(Thread.currentThread(), startTime,
priority, wantPrimaryConnection, sql, connectionFlags);
ConnectionWaiter predecessor = null;
// 按照优先级向mConnectionWaiterQueue添加waitor对象。mConnectionWaiterQueue不是复用池,而是有效的等待队列(也是链表)。
ConnectionWaiter successor = mConnectionWaiterQueue;
while (successor != null) {
if (priority >
successor.mPriority) {
waiter.mNext = successor;
break;
}
predecessor = successor;
successor = successor.mNext;
}
if (predecessor != null) {
predecessor.mNext = waiter;
} else {
mConnectionWaiterQueue = waiter;
}nonce = waiter.mNonce;
//观察recycleConnectionWaiterLocked方法,mNonce在waiter每次被复用完成回收时自增1
}//锁mLock结束
到这里就是把需要等待的连接信息封装到
ConnectionWaiter
中,并将ConnectionWaiter
对象放到一个链表里。那么什么时候会结束等待并返回呢?继续看代码:// Set up the cancellation listener.
if (cancellationSignal != null) {
cancellationSignal.setOnCancelListener(new CancellationSignal.OnCancelListener() {
@Override
public void onCancel() {
synchronized (mLock) {
if (waiter.mNonce == nonce) {//nonce的作用在这里体现。防止waiter对象复用造成误取消。
cancelConnectionWaiterLocked(waiter);
}
}
}
});
}
这一段用于额外处理取消信号的。在等待连接过程中取消,就可以把这一个waiter去除了。
【Android数据库源码分析-连接缓存池SQLiteConnectionPool】接下来:
try {
// Park the thread until a connection is assigned or the pool is closed.
// Rethrow an exception from the wait, if we got one.
long busyTimeoutMillis = CONNECTION_POOL_BUSY_MILLIS;
long nextBusyTimeoutTime = waiter.mStartTime + busyTimeoutMillis;
for (;
;
) {//循环开始
// Detect and recover from connection leaks.
// 管理泄露连接的。如果一个SQLiteConnection在finalize时还未关闭,则会置泄露状态。
// mConnectionLeaked是一个AtomicBoolean。
if (mConnectionLeaked.compareAndSet(true, false)) {
synchronized (mLock) {
wakeConnectionWaitersLocked();
//有泄露连接被关闭的话,最大连接限制下就可能有位置空出来,这时候就可以尝试分配一个连接
}
}// Wait to be unparked (may already have happened), a timeout, or interruption.
// 等待。那么unpark在哪里?在wakeConnectionWaitersLocked中。这个方法在上面泄露测试时调用过。
// 还有cancelConnectionWaiterLocked中,取消等待自然要唤醒线程处理一下。
LockSupport.parkNanos(this, busyTimeoutMillis * 1000000L);
// Clear the interrupted flag, just in case.
Thread.interrupted();
// Check whether we are done waiting yet.
synchronized (mLock) {
throwIfClosedLocked();
//等到了一个Connection。这个mAssignedConnection是何时赋值的呢?
//也是在wakeConnectionWaitersLocked中赋值的。
final SQLiteConnection connection = waiter.mAssignedConnection;
final RuntimeException ex = waiter.mException;
if (connection != null || ex != null) {
recycleConnectionWaiterLocked(waiter);
//回收waiter,会造成mNonce自增1
if (connection != null) {
return connection;
}
throw ex;
// rethrow!
}//没拿到连接,继续等。
final long now = SystemClock.uptimeMillis();
if (now <
nextBusyTimeoutTime) {
busyTimeoutMillis = now - nextBusyTimeoutTime;
} else {
logConnectionPoolBusyLocked(now - waiter.mStartTime, connectionFlags);
busyTimeoutMillis = CONNECTION_POOL_BUSY_MILLIS;
nextBusyTimeoutTime = now + busyTimeoutMillis;
}
}
}//循环结束
} finally {
// Remove the cancellation listener.
if (cancellationSignal != null) {
cancellationSignal.setOnCancelListener(null);
}
}
}
在这一步中,用
ConnectionWaiter
来封装等待中的连接信息,并按优先级放入一个链表,随后进入等待状态。获取到连接后,等待状态结束,返回连接。3 连接的释放这里我们可以先预估以下:释放连接时需要把被释放的连接放回到空闲连接集合,并进行unpark操作,通知正在等待连接的线程。
代码如下:
public void releaseConnection(SQLiteConnection connection) {
synchronized (mLock) {
AcquiredConnectionStatus status = mAcquiredConnections.remove(connection);
//从活跃连接池中移除
if (status == null) {
throw new IllegalStateException("Cannot perform this operation "
+ "because the specified connection was not acquired "
+ "from this pool or has already been released.");
}if (!mIsOpen) {
closeConnectionAndLogExceptionsLocked(connection);
} else if (connection.isPrimaryConnection()) {
if (recycleConnectionLocked(connection, status)) {
assert mAvailablePrimaryConnection == null;
mAvailablePrimaryConnection = connection;
//放回可写连接mAvailablePrimaryConnection
}
wakeConnectionWaitersLocked();
//通知其它线程
} else if (mAvailableNonPrimaryConnections.size() >
= mMaxConnectionPoolSize - 1) {
closeConnectionAndLogExceptionsLocked(connection);
} else {
if (recycleConnectionLocked(connection, status)) {
mAvailableNonPrimaryConnections.add(connection);
//放回空闲只读连接池
}
wakeConnectionWaitersLocked();
//通知其它线程
}
}
}
4 总结综上所述,
SQLiteConnectionPool
提供数据库连接的流程如下:(1)从缓存中获取一个空闲的连接。若有多个空闲连接,优先挑选执行过相同SQL的那个。注意如果是写操作的话,则会返回一个primary connection,并将其它尝试获得primary connection的线程阻塞,直到当前线程结束使用连接。而只读的操作则可以同时存在多个,并可以和写操作的连接共存。
(2)如果缓存中没有连接,检查底层数据库是否可以容纳更多连接。如果可以,新建一个连接并返回。
(3)如果底层数据库不再允许增加连接,则进入等待。到超时或者有其它连接被释放结束等待。如果此时可以获取连接,则返回连接。如果不能,进入新一轮等待。
5 多线程下的transaction了解了以上的特性之后,transaction的多线程行为就比较好理解了。
以下是
SQLiteDatabase
的beginTransaction
方法:private void beginTransaction(SQLiteTransactionListener transactionListener,
boolean exclusive) {
acquireReference();
try {
getThreadSession().beginTransaction(
exclusive ? SQLiteSession.TRANSACTION_MODE_EXCLUSIVE :
SQLiteSession.TRANSACTION_MODE_IMMEDIATE,
transactionListener,
getThreadDefaultConnectionFlags(false /*readOnly*/), null);
} finally {
releaseReference();
}
}
留意flags参数传入的readOnly为false,所以
SQLiteSession
会从SQLiteConnectionPool
中获取一个独占的连接。并且在SQLiteSession
执行其它SQL语句的情况下,执行完成会将连接释放回连接池,而beginTransaction
操作则不会,而是持有这一个连接直至同一线程内调用endTransaction
。这里再贴一遍SQLiteSession.execute
源码:public void execute(String sql, Object[] bindArgs, int connectionFlags,
CancellationSignal cancellationSignal) {
if (sql == null) {
throw new IllegalArgumentException("sql must not be null.");
}if (executeSpecial(sql, bindArgs, connectionFlags, cancellationSignal)) {
return;
}acquireConnection(sql, connectionFlags, cancellationSignal);
// might throw
try {
mConnection.execute(sql, bindArgs, cancellationSignal);
// might throw
} finally {
//这里释放了连接(其实是交还给连接池)
releaseConnection();
// might throw
}
}
所以,当有一个线程在transaction过程中时,其它线程的写操作和
beginTransaction
操作都会被阻塞住,直至当前线程的transaction完成才会按照优先级挑选一个线程继续。推荐阅读
- Android Studio的第一次运行
- spring boot 工厂+策略模式实现APP版本控制
- Android Init进程分析番外篇(9.0的init进程)
- idea将web项目打成war包放在tomcat/webapps上运行
- cordova打包app前置条件
- App的布局管理
- app常见性能测试点
- Android基础——常用布局管理layout
- Android APK文件的逆向反编译