数据库事务浅析

数据库事务介绍 在我们使用数据库时,Android为我们提供了事务操作,简而言之就是在事务中,我们执行各种sql语句时,要么都执行成功,但只要其中一个sql语句失败时,那么会触发回滚,我们之前执行完成的sql语句也会失败。在下面代码中,手动抛出一个异常后,第二个sql语句执行失败,那么已经执行完成的第一个sql语句就会回滚。

/** * 使用事务来进行数据库操作,两种操作要么都完成,要么都失败(事务) */ SQLiteDatabase db = dbHelper.getWritableDatabase(); db.beginTransaction(); //开启事物 try { db.delete("book",null,null); if (true){ //这里手动抛出一个异常,让事物失败 //throw new NullPointerException(); //由于我们手动抛出了一个异常,这样添加数据的代码就无法执行了,但是由于事物的存在,此时旧数据也无法删除 } db.execSQL("insert into book(name,author,pages,price) values(?,?,?,?)", new String[]{ "android ", "tonycheng", "550", "79" }); db.setTransactionSuccessful(); }finally { db.endTransaction(); }

源码: 一.db.beginTransaction():
/** * Begins a transaction in EXCLUSIVE mode. * * Transactions can be nested. * When the outer transaction is ended all of * the work done in that transaction and all of the nested transactions will be committed or * rolled back. The changes will be rolled back if any transaction is ended without being * marked as clean (by calling setTransactionSuccessful). Otherwise they will be committed. *
* Here is the standard idiom for transactions: * *
*db.beginTransaction(); *try { *... *db.setTransactionSuccessful(); *} finally { *db.endTransaction(); *} *

*/ public void beginTransaction() { beginTransaction(null /* transactionStatusCallback */, true); }/**transactionListener listener that should be notified when the *transaction begins, commits, or is rolled back, either *explicitly or by a call to {@link #yieldIfContendedSafely}. */ private void beginTransaction(SQLiteTransactionListener transactionListener, boolean exclusive) { acquireReference(); try { getThreadSession().beginTransaction( exclusive ? SQLiteSession.TRANSACTION_MODE_EXCLUSIVE : SQLiteSession.TRANSACTION_MODE_IMMEDIATE, transactionListener, getThreadDefaultConnectionFlags(false /*readOnly*/), null); } finally { releaseReference(); } }

beginTransaction的注释大概意思是开启一个事务,事务是可以嵌套的,当一个外层事务的事务结束了,这个事务中所有的工作都将结束,嵌套的事务将被提交或者被回滚。
还有其中回滚的情况就是没有调用setTransactionSuccessful方法,调用了之后,事务将被提交。
TRANSACTION_MODE_EXCLUSIVE: 当一个事务开启时,session获得一个唯一的锁,当持有了这个锁时,只有当前的session可以进行读写。
/**transactionListener listener that should be notified when the *transaction begins, commits, or is rolled back, either *explicitly or by a call to {@link #yieldIfContendedSafely}. */ private void beginTransaction(SQLiteTransactionListener transactionListener, boolean exclusive) { //获取一个参考,mReferenceCount++,当mReferenceCount<=0时,抛出一个尝试去打开已关闭的对象异常 acquireReference(); try { getThreadSession().beginTransaction( exclusive ? SQLiteSession.TRANSACTION_MODE_EXCLUSIVE : SQLiteSession.TRANSACTION_MODE_IMMEDIATE, transactionListener, getThreadDefaultConnectionFlags(false /*readOnly*/), null); } finally { releaseReference(); } }

第三个参数:大概意思是获取一个适当的连接方式的flag
int getThreadDefaultConnectionFlags(boolean readOnly) { int flags = readOnly ? SQLiteConnectionPool.CONNECTION_FLAG_READ_ONLY : SQLiteConnectionPool.CONNECTION_FLAG_PRIMARY_CONNECTION_AFFINITY; if (isMainThread()) { flags |= SQLiteConnectionPool.CONNECTION_FLAG_INTERACTIVE; } return flags; }

【数据库事务浅析】调用了beginTransaction开启事务之后,默认transactionListener为null,这个listener是监听我们的事务的开始,提交,或者回滚状态。exclusive 默认为false。
getThreadSession()通过ThreadLocal获取当前线程的Session,返回SQLiteSession对象,他提供了一种使用database的能力。
SQLiteSession的beginTransaction:
/** *@param transactionMode #TRANSACTION_MODE_EXCLUSIVE * */ public void beginTransaction(int transactionMode, SQLiteTransactionListener transactionListener, int connectionFlags, CancellationSignal cancellationSignal) { //检查事务是否被标记成功,如果成功则抛出异常 throwIfTransactionMarkedSuccessful(); beginTransactionUnchecked(transactionMode, transactionListener, connectionFlags, cancellationSignal); }######beginTransactionUnchecked:开启事务private void beginTransactionUnchecked(int transactionMode, SQLiteTransactionListener transactionListener, int connectionFlags, CancellationSignal cancellationSignal) { if (cancellationSignal != null) { cancellationSignal.throwIfCanceled(); }//1.事务为空 获取连接 if (mTransactionStack == null) { acquireConnection(null, connectionFlags, cancellationSignal); // might throw }try { // Set up the transaction such that we can back out safely // in case we fail part way. if (mTransactionStack == null) { // Execute SQL might throw a runtime exception. switch (transactionMode) { case TRANSACTION_MODE_IMMEDIATE: mConnection.execute("BEGIN IMMEDIATE; ", null, cancellationSignal); // might throw break; case TRANSACTION_MODE_EXCLUSIVE: //2. 开始执行 mConnection.execute("BEGIN EXCLUSIVE; ", null, cancellationSignal); // might throw break; default: mConnection.execute("BEGIN; ", null, cancellationSignal); // might throw break; } }// Listener might throw a runtime exception. if (transactionListener != null) { try { transactionListener.onBegin(); // might throw } catch (RuntimeException ex) { if (mTransactionStack == null) { mConnection.execute("ROLLBACK; ", null, cancellationSignal); // might throw } throw ex; } }// Bookkeeping can't throw, except an OOM, which is just too bad... Transaction transaction = obtainTransaction(transactionMode, transactionListener); transaction.mParent = mTransactionStack; mTransactionStack = transaction; } finally { if (mTransactionStack == null) { releaseConnection(); // might throw } } }

1.事务为空 获取连接
private void acquireConnection(String sql, int connectionFlags, CancellationSignal cancellationSignal) { if (mConnection == null) { assert mConnectionUseCount == 0; //1.1先从SQLiteConnectionPool连接池中获取SQLiteConnection对象: mConnection = mConnectionPool.acquireConnection(sql, connectionFlags, cancellationSignal); // might throw mConnectionFlags = connectionFlags; } //连接数+1 mConnectionUseCount += 1; }

1.1 acquireConnection:
public SQLiteConnection acquireConnection(String sql, int connectionFlags, CancellationSignal cancellationSignal) { //获取连接 SQLiteConnection con = waitForConnection(sql, connectionFlags, cancellationSignal); synchronized (mLock) { if (mIdleConnectionHandler != null) { mIdleConnectionHandler.connectionAcquired(con); } } return con; }

private SQLiteConnection waitForConnection(String sql, int connectionFlags, CancellationSignal cancellationSignal) {//是否可写连接。可写的连接同一时间只能存在一个。默认flag传进来此时不为0 final boolean wantPrimaryConnection = (connectionFlags & CONNECTION_FLAG_PRIMARY_CONNECTION_AFFINITY) != 0; final ConnectionWaiter waiter; final int nonce; synchronized (mLock) { throwIfClosedLocked(); // Abort if canceled. if (cancellationSignal != null) { cancellationSignal.throwIfCanceled(); }// Try to acquire a connection. SQLiteConnection connection = null; if (!wantPrimaryConnection) { //1.1.1 尝试获取只读连接 connection = tryAcquireNonPrimaryConnectionLocked( sql, connectionFlags); // might throw } if (connection == null) { //1.1.2 尝试获取可写连接 connection = tryAcquirePrimaryConnectionLocked(connectionFlags); // might throw }//获取到连接 返回 if (connection != null) { return connection; }//没有可用的连接 则进入等待 //在这一步中,用ConnectionWaiter来封装等待中的连接信息,并按优先级放入一个链表,随后进入等待状态。获取到连接后,等待状态结束,返回连接。 // No connections available.Enqueue a waiter in priority order. //主线程中的连接优先级更高 final int priority = getPriority(connectionFlags); final long startTime = SystemClock.uptimeMillis(); // waiter是一个ConnectionWaiter对象。它同时也是一个链表,有一个同类的mNext成员变量。 waiter = obtainConnectionWaiterLocked(Thread.currentThread(), startTime, priority, wantPrimaryConnection, sql, connectionFlags); ConnectionWaiter predecessor = null; ConnectionWaiter successor = mConnectionWaiterQueue; // 按照优先级向mConnectionWaiterQueue添加waitor对象。mConnectionWaiterQueue不是复用池,而是有效的等待队列(也是链表)。 //将优先级高的排在下一个 while (successor != null) { if (priority > successor.mPriority) { waiter.mNext = successor; break; } predecessor = successor; successor = successor.mNext; } if (predecessor != null) { predecessor.mNext = waiter; } else { mConnectionWaiterQueue = waiter; }//观察recycleConnectionWaiterLocked方法,mNonce在waiter每次被复用完成回收时自增1 nonce = waiter.mNonce; }// Set up the cancellation listener. if (cancellationSignal != null) { cancellationSignal.setOnCancelListener(new CancellationSignal.OnCancelListener() { @Override public void onCancel() { synchronized (mLock) { if (waiter.mNonce == nonce) { //nonce的作用在这里体现。防止waiter对象复用造成误取消。 cancelConnectionWaiterLocked(waiter); } } } }); } try { // Park the thread until a connection is assigned or the pool is closed. // Rethrow an exception from the wait, if we got one. long busyTimeoutMillis = CONNECTION_POOL_BUSY_MILLIS; long nextBusyTimeoutTime = waiter.mStartTime + busyTimeoutMillis; for (; ; ) { // Detect and recover from connection leaks. if (mConnectionLeaked.compareAndSet(true, false)) { synchronized (mLock) { //有泄露连接被关闭的话,最大连接限制下就可能有位置空出来,这时候就可以通过waiter 尝试分配一个连接 wakeConnectionWaitersLocked(); } }// Wait to be unparked (may already have happened), a timeout, or interruption. //等待。那么unpark在哪里?在wakeConnectionWaitersLocked中。这个方法在上面泄露测试时调用过。 LockSupport.parkNanos(this, busyTimeoutMillis * 1000000L); // Clear the interrupted flag, just in case. Thread.interrupted(); // Check whether we are done waiting yet. synchronized (mLock) { throwIfClosedLocked(); //等到了一个Connection。这个mAssignedConnection是何时赋值的呢? //也是在wakeConnectionWaitersLocked中赋值的。 final SQLiteConnection connection = waiter.mAssignedConnection; final RuntimeException ex = waiter.mException; if (connection != null || ex != null) {//回收waiter,会造成mNonce自增1 recycleConnectionWaiterLocked(waiter); if (connection != null) { return connection; } throw ex; // rethrow! }//没拿到连接,继续等。 final long now = SystemClock.uptimeMillis(); if (now < nextBusyTimeoutTime) { busyTimeoutMillis = now - nextBusyTimeoutTime; } else { logConnectionPoolBusyLocked(now - waiter.mStartTime, connectionFlags); busyTimeoutMillis = CONNECTION_POOL_BUSY_MILLIS; nextBusyTimeoutTime = now + busyTimeoutMillis; } } } } finally { // Remove the cancellation listener. if (cancellationSignal != null) { cancellationSignal.setOnCancelListener(null); } } }

1.1.2 先看尝试获取可写连接 tryAcquirePrimaryConnectionLocked:
private SQLiteConnection tryAcquirePrimaryConnectionLocked(int connectionFlags) { // If the primary connection is available, acquire it now. //同时只能存在一个可写连接,用一个成员变量mAvailablePrimaryConnection缓存空闲连接 SQLiteConnection connection = mAvailablePrimaryConnection; if (connection != null) { //如果有可用的可写连接,获取到之后就把mAvailablePrimaryConnection缓存置空 mAvailablePrimaryConnection = null; //保存获取到的可写连接,并通过connectionFlags判断这个连接是否是只读的 finishAcquireConnectionLocked(connection, connectionFlags); // might throw return connection; }// Make sure that the primary connection actually exists and has just been acquired. //如果上一个if造成mAvailablePrimaryConnection缓存置空,mAcquiredConnections中就会有一个primary connection,这里就会返回null。上一层的waitForConnection接到null会进入等待状态,这个后面讨论。 //此处mAcquiredConnections在上一个if中finishAcquireConnectionLocked方法赋值 for (SQLiteConnection acquiredConnection : mAcquiredConnections.keySet()) { if (acquiredConnection.isPrimaryConnection()) { return null; } }// Uhoh.No primary connection!Either this is the first time we asked // for it, or maybe it leaked? //如果没有在上面返回null,那么这一定是第一次请求primary connnection,或者有一个连接泄露了(未recycle的情况下finalize)。 //去创建并打开一个新的SQLiteConnection。 connection = openConnectionLocked(mConfiguration, true /*primaryConnection*/); // might throw //将这个SQLiteConnection保存到mAcquiredConnections这个Map中 finishAcquireConnectionLocked(connection, connectionFlags); // might throw return connection; }

1.1.1 尝试获取只读连接 tryAcquireNonPrimaryConnectionLocked:
private SQLiteConnection tryAcquireNonPrimaryConnectionLocked( String sql, int connectionFlags) { // Try to acquire the next connection in the queue. SQLiteConnection connection; //只读连接可以有多个,用一个ArrayList缓存了所有空闲连接 final int availableCount = mAvailableNonPrimaryConnections.size(); //如果有缓存空闲连接 那么选一个sql语句一样的连接,并保存在mAcquiredConnections中 if (availableCount > 1 && sql != null) { // If we have a choice, then prefer a connection that has the // prepared statement in its cache. for (int i = 0; i < availableCount; i++) { connection = mAvailableNonPrimaryConnections.get(i); if (connection.isPreparedStatementInCache(sql)) { mAvailableNonPrimaryConnections.remove(i); finishAcquireConnectionLocked(connection, connectionFlags); // might throw return connection; } } }//如果有缓存空闲连接但没有sql语句一样的 那么选最后一个,并保存在mAcquiredConnections中 if (availableCount > 0) { // Otherwise, just grab the next one. connection = mAvailableNonPrimaryConnections.remove(availableCount - 1); finishAcquireConnectionLocked(connection, connectionFlags); // might throw return connection; }// 一个空闲连接都没有。 // Expand the pool if needed. // 计算有多少连接(空闲+使用中)。这里肯定没有空闲non primary连接了,而如果有空闲primary连接,则要 += 1。 int openConnections = mAcquiredConnections.size(); if (mAvailablePrimaryConnection != null) { openConnections += 1; } if (openConnections >= mMaxConnectionPoolSize) { // 超过数据库连接限制,放弃治疗。连接限制与数据库底层实现有关。 return null; }// 没超限,还能再开一个连接。所以开连接并返回。 connection = openConnectionLocked(mConfiguration, false /*primaryConnection*/); // might throw // 保存这个连接到mAcquiredConnections中。 finishAcquireConnectionLocked(connection, connectionFlags); // might throw return connection; }

2.获取到连接后,SQLiteConnection.execute ,这里貌似是检查sql语句的,最后调用了native方法,这里不关注。 beginTransaction:开启事务的时候传入的readOnly为false,所以SQLiteSession会从SQLiteConnectionPool中获取一个独占的连接。并且在SQLiteSession执行其它SQL语句的情况下,执行完成会将连接释放回连接池,而beginTransaction操作则不会,而是持有这一个连接直至同一线程内调用endTransaction。
二.db.setTransactionSuccessful():
public void setTransactionSuccessful() { throwIfNoTransaction(); throwIfTransactionMarkedSuccessful(); //主要干了一件事就是设置事务的标记位 mMarkedSuccessful为true mTransactionStack.mMarkedSuccessful = true; }

三.endTransaction():
public void endTransaction(CancellationSignal cancellationSignal) { throwIfNoTransaction(); assert mConnection != null; endTransactionUnchecked(cancellationSignal, false); }

private void endTransactionUnchecked(CancellationSignal cancellationSignal, boolean yielding) { if (cancellationSignal != null) { cancellationSignal.throwIfCanceled(); }final Transaction top = mTransactionStack; //判断是否成功 boolean successful = (top.mMarkedSuccessful || yielding) && !top.mChildFailed; RuntimeException listenerException = null; final SQLiteTransactionListener listener = top.mListener; if (listener != null) { try { if (successful) { //提交 listener.onCommit(); // might throw } else { //回滚 listener.onRollback(); // might throw } } catch (RuntimeException ex) { listenerException = ex; successful = false; } }//创建事务的时候 top.mParent为null mTransactionStack = top.mParent; //回收这个事务 recycleTransaction(top); if (mTransactionStack != null) { if (!successful) { mTransactionStack.mChildFailed = true; } } else { try { if (successful) { //如果成功 那么调用native方法去执行 mConnection.execute("COMMIT; ", null, cancellationSignal); // might throw } else { mConnection.execute("ROLLBACK; ", null, cancellationSignal); // might throw } } finally { //1.释放连接 releaseConnection(); // might throw } }if (listenerException != null) { throw listenerException; } }

1.释放连接 releaseConnection:
private void releaseConnection() { assert mConnection != null; assert mConnectionUseCount > 0; if (--mConnectionUseCount == 0) { try { //mConnection 在我们开启事务的时候获取的连接 默认开启是ReadOnly为false,获取的是primaryConnection 唯一的连接 //这里尝试去释放它 mConnectionPool.releaseConnection(mConnection); // might throw } finally { mConnection = null; } } }

public void releaseConnection(SQLiteConnection connection) { synchronized (mLock) { if (mIdleConnectionHandler != null) { //释放连接 mIdleConnectionHandler.connectionReleased(connection); } //在缓存里移除这个连接 AcquiredConnectionStatus status = mAcquiredConnections.remove(connection); if (status == null) { throw new IllegalStateException("Cannot perform this operation " + "because the specified connection was not acquired " + "from this pool or has already been released."); }if (!mIsOpen) { closeConnectionAndLogExceptionsLocked(connection); } else if (connection.isPrimaryConnection()) {//这里置空我们当时获取并赋值的唯一缓存mAvailablePrimaryConnection if (recycleConnectionLocked(connection, status)) { assert mAvailablePrimaryConnection == null; mAvailablePrimaryConnection = connection; } wakeConnectionWaitersLocked(); } else if (mAvailableNonPrimaryConnections.size() >= mMaxConnectionPoolSize - 1) { closeConnectionAndLogExceptionsLocked(connection); } else { if (recycleConnectionLocked(connection, status)) { mAvailableNonPrimaryConnections.add(connection); } wakeConnectionWaitersLocked(); } } }

总结:在开启一个事务后,SQLiteSession会从SQLiteConnectionPool中获取一个独占的连接。在SQLiteSession执行其它SQL语句的情况下,执行完成会将连接释放回连接池,而beginTransaction操作则不会,而是持有这一个连接直至同一线程内调用endTransaction后释放这个连接。所以,当有一个线程在transaction过程中时,其它线程的写操作和beginTransaction操作都会被阻塞住,直至当前线程的transaction完成才会按照优先级挑选一个线程继续。 参考:Android数据库源码分析(3)-连接缓存池SQLiteConnectionPool

    推荐阅读