Commit fd7106a4 authored by heikki@hundin.mysql.fi's avatar heikki@hundin.mysql.fi
Browse files

lock0lock.c:

  Eliminate a spurious deadlock when an insert waits for an update and a second update comes to wait after the insert
parent a9c1508d
Loading
Loading
Loading
Loading
+142 −62
Original line number Diff line number Diff line
@@ -53,10 +53,9 @@ bump into an x-lock set there.

Different transaction can have conflicting locks set on the gap at the
same time. The locks on the gap are purely inhibitive: an insert cannot
be made, or a select cursor may have to wait, if a different transaction
be made, or a select cursor may have to wait if a different transaction
has a conflicting lock on the gap. An x-lock on the gap does not give
the right to insert into the gap if there are conflicting locks granted
on the gap at the same time.
the right to insert into the gap.

An explicit lock can be placed on a user record or the supremum record of
a page. The locks on the supremum record are always thought to be of the gap
@@ -88,7 +87,7 @@ locks.
RULE 4: If there is a waiting lock request in a queue, no lock request,
-------
gap or not, can be inserted ahead of it in the queue. In record deletes
and page splits, new gap type locks can be created by the database manager
and page splits new gap type locks can be created by the database manager
for a transaction, and without rule 4, the waits-for graph of transactions
might become cyclic without the database noticing it, as the deadlock check
is only performed when a transaction itself requests a lock!
@@ -839,40 +838,93 @@ lock_mode_compatible(
}

/*************************************************************************
Returns LOCK_X if mode is LOCK_S, and vice versa. */
Checks if a lock request for a new lock has to wait for request lock2. */
UNIV_INLINE
ulint
lock_get_confl_mode(
/*================*/
			/* out: conflicting basic lock mode */
	ulint	mode)	/* in: LOCK_S or LOCK_X */
ibool
lock_rec_has_to_wait(
/*=================*/
			/* out: TRUE if new lock has to wait for lock2 to be
			removed */
	trx_t*	trx,	/* in: trx of new lock */
	ulint	mode,	/* in: LOCK_S or LOCK_X */
	ulint	gap,	/* in: LOCK_GAP or 0 */
	ulint	insert_intention,
			/* in: LOCK_INSERT_INTENTION or 0 */
	lock_t*	lock2)	/* in: another record lock; NOTE that it is assumed
			that this has a lock bit set on the same record as
			in lock1 */
{
	ut_ad(mode == LOCK_X || mode == LOCK_S);
	ut_ad(trx && lock2);
	ut_ad(lock_get_type(lock2) == LOCK_REC);
	ut_ad(mode == LOCK_S || mode == LOCK_X);
	ut_ad(gap == LOCK_GAP || gap == 0);
	ut_ad(insert_intention == LOCK_INSERT_INTENTION
	      				|| insert_intention == 0);

	if (trx != lock2->trx && !lock_mode_compatible(mode,
				     		lock_get_mode(lock2))) {

		/* We have somewhat complex rules when gap type
		record locks cause waits */

		if (!gap && lock_rec_get_insert_intention(lock2)) {

	if (mode == LOCK_S) {
			/* Request of a full next-key record does not
			need to wait for an insert intention lock to be
			removed. This is ok since our rules allow conflicting
			locks on gaps. This eliminates a spurious deadlock
			caused by a next-key lock waiting for an insert
			intention lock; when the insert intention lock was
			granted, the insert deadlocked on the waiting
			next-key lock. */
				
			return(FALSE);
		}

		if (insert_intention && lock_rec_get_insert_intention(lock2)) {

			/* An insert intention is not disturbed by another
			insert intention; this removes a spurious deadlock
			caused by inserts which had to wait for a next-key
			lock to be removed */

			return(FALSE);
		}

		return(LOCK_X);
		return(TRUE);
	}

	return(LOCK_S);
	return(FALSE);
}

/*************************************************************************
Checks if a lock request lock1 has to wait for request lock2. NOTE that we,
for simplicity, ignore the gap bits in locks, and treat gap type lock
requests like non-gap lock requests. */
UNIV_INLINE
Checks if a lock request lock1 has to wait for request lock2. */
static
ibool
lock_has_to_wait(
/*=============*/
			/* out: TRUE if lock1 has to wait lock2 to be removed */
	lock_t*	lock1,	/* in: waiting record lock */
			/* out: TRUE if lock1 has to wait for lock2 to be
			removed */
	lock_t*	lock1,	/* in: waiting lock */
	lock_t*	lock2)	/* in: another lock; NOTE that it is assumed that this
			has a lock bit set on the same record as in lock1 */
			has a lock bit set on the same record as in lock1 if
			the locks are record locks */
{
	ut_ad(lock1 && lock2);

	if (lock1->trx != lock2->trx
			&& !lock_mode_compatible(lock_get_mode(lock1),
				     		lock_get_mode(lock2))) {
		if (lock_get_type(lock1) == LOCK_REC) {
			ut_ad(lock_get_type(lock2) == LOCK_REC);
				
			return(lock_rec_has_to_wait(lock1->trx,
				lock_get_mode(lock1),
				lock_rec_get_gap(lock1),
				lock_rec_get_insert_intention(lock1),
				lock2));
		}

		return(TRUE);
	}

@@ -1021,6 +1073,7 @@ lock_rec_get_next_on_page(
	ulint	page_no;

	ut_ad(mutex_own(&kernel_mutex));
	ut_ad(lock_get_type(lock) == LOCK_REC);

	space = lock->un_member.rec_lock.space;
	page_no = lock->un_member.rec_lock.page_no;
@@ -1147,6 +1200,7 @@ lock_rec_get_next(
	lock_t*	lock)	/* in: lock */
{
	ut_ad(mutex_own(&kernel_mutex));
	ut_ad(lock_get_type(lock) == LOCK_REC);

	for (;;) {
		lock = lock_rec_get_next_on_page(lock);
@@ -1204,6 +1258,8 @@ lock_rec_bitmap_reset(
	ulint	n_bytes;
	ulint	i;

	ut_ad(lock_get_type(lock) == LOCK_REC);

	/* Reset to zero the bitmap which resides immediately after the lock
	struct */

@@ -1233,6 +1289,8 @@ lock_rec_copy(
	lock_t*	dupl_lock;
	ulint	size;

	ut_ad(lock_get_type(lock) == LOCK_REC);

	size = sizeof(lock_t) + lock_rec_get_n_bits(lock) / 8;	

	dupl_lock = mem_heap_alloc(heap, size);
@@ -1348,6 +1406,7 @@ lock_rec_has_expl(
		if (lock->trx == trx
		    && lock_mode_stronger_or_eq(lock_get_mode(lock), mode)
		    && !lock_get_wait(lock)
		    && !lock_rec_get_insert_intention(lock) /* we play safe */
		    && !(lock_rec_get_gap(lock)
					|| page_rec_is_supremum(rec))) {
		    	return(lock);
@@ -1360,9 +1419,8 @@ lock_rec_has_expl(
}
			
/*************************************************************************
Checks if some other transaction has an explicit lock request stronger or
equal to mode on rec or gap, waiting or granted, in the lock queue. */
UNIV_INLINE
Checks if some other transaction has a lock request in the queue. */
static
lock_t*
lock_rec_other_has_expl_req(
/*========================*/
@@ -1372,15 +1430,9 @@ lock_rec_other_has_expl_req(
			into account, or 0 if not */
	ulint	wait,	/* in: LOCK_WAIT if also waiting locks are
			taken into account, or 0 if not */
	ibool	ignore_insert_intention,/* in: FALSE if also locks where the
			LOCK_INSERT_INTENTION
			flag is set should be taken into account; we can
			use this option to prevent unnecessary deadlocks
			between inserts which have needed to set a wait
			lock to wait for some next-key lock to be released */
	rec_t*	rec,	/* in: record to look at */	
	trx_t*	trx)	/* in: transaction, or NULL if requests
			by any transaction are wanted */
	trx_t*	trx)	/* in: transaction, or NULL if requests by all
			transactions are taken into account */
{
	lock_t*	lock;
	
@@ -1388,8 +1440,7 @@ lock_rec_other_has_expl_req(
	ut_ad(mode == LOCK_X || mode == LOCK_S);
	ut_ad(gap == 0 || gap == LOCK_GAP);
	ut_ad(wait == 0 || wait == LOCK_WAIT);
	ut_ad(ignore_insert_intention == TRUE
					|| ignore_insert_intention == FALSE);

	lock = lock_rec_get_first(rec);

	while (lock) {
@@ -1399,14 +1450,45 @@ lock_rec_other_has_expl_req(
		    && (wait || !lock_get_wait(lock))
		    && lock_mode_stronger_or_eq(lock_get_mode(lock), mode)) {

		    	if (ignore_insert_intention
				&& lock_rec_get_insert_intention(lock)) {
		    	return(lock);
		}

				/* We can ignore this lock */
			} else {
		lock = lock_rec_get_next(rec, lock);
	}

		    		return(lock);
	return(NULL);
}

/*************************************************************************
Checks if some other transaction has a conflicting explicit lock request
in the queue, so that we have to wait. */
static
lock_t*
lock_rec_other_has_conflicting(
/*===========================*/
			/* out: lock or NULL */
	ulint	mode,	/* in: lock mode of the lock we are going to reserve */
	ulint	gap,	/* in: LOCK_GAP if we are going to reserve a gap type
			lock, else 0 */
	ulint	insert_intention,
			/* in: LOCK_INSERT_INTENTION if we are going to
			reserve an insert intention lock */
	rec_t*	rec,	/* in: record to look at */	
	trx_t*	trx)	/* in: our transaction */
{
	lock_t*	lock;
	
	ut_ad(mutex_own(&kernel_mutex));
	ut_ad(mode == LOCK_X || mode == LOCK_S);
	ut_ad(gap == 0 || gap == LOCK_GAP);
	ut_ad(insert_intention == LOCK_INSERT_INTENTION
						|| insert_intention == 0);
	lock = lock_rec_get_first(rec);

	while (lock) {
		if (lock_rec_has_to_wait(trx, mode, gap, insert_intention,
								lock)) {
			return(lock);
		}
		
		lock = lock_rec_get_next(rec, lock);
@@ -1584,7 +1666,7 @@ lock_rec_enqueue_waiting(
	ulint		type_mode,/* in: lock mode this transaction is
				requesting: LOCK_S or LOCK_X, ORed with
				LOCK_GAP if a gap lock is requested, ORed
				with LOCK_INSERT_INTENTION if the waiting
				with LOCK_INSERT_INTENTION if this waiting
				lock request is set when performing an
				insert of an index record */
	rec_t*		rec,	/* in: record */
@@ -1674,11 +1756,11 @@ lock_rec_add_to_queue(
	ut_ad((type_mode & (LOCK_WAIT | LOCK_GAP))
	      || ((type_mode & LOCK_MODE_MASK) != LOCK_S)
	      || !lock_rec_other_has_expl_req(LOCK_X, 0, LOCK_WAIT,
						FALSE, rec, trx));
						rec, trx));
	ut_ad((type_mode & (LOCK_WAIT | LOCK_GAP))
	      || ((type_mode & LOCK_MODE_MASK) != LOCK_X)
	      || !lock_rec_other_has_expl_req(LOCK_S, 0, LOCK_WAIT,
						FALSE,rec, trx));
						rec, trx));
	type_mode = type_mode | LOCK_REC;

	page = buf_frame_align(rec);
@@ -1794,7 +1876,6 @@ lock_rec_lock_slow(
	dict_index_t*	index,	/* in: index of record */
	que_thr_t* 	thr)	/* in: query thread */
{
	ulint	confl_mode;
	trx_t*	trx;
	ulint	err;

@@ -1802,7 +1883,6 @@ lock_rec_lock_slow(
	ut_ad((mode == LOCK_X) || (mode == LOCK_S));

	trx = thr_get_trx(thr);
	confl_mode = lock_get_confl_mode(mode);

	ut_ad((mode != LOCK_S) || lock_table_has(trx, index->table,
								LOCK_IS));
@@ -1813,8 +1893,8 @@ lock_rec_lock_slow(
		nothing */

		err = DB_SUCCESS;
	} else if (lock_rec_other_has_expl_req(confl_mode, 0, LOCK_WAIT,
							FALSE, rec, trx)) {
	} else if (lock_rec_other_has_conflicting(mode, 0, 0, rec, trx)) {

		/* If another transaction has a non-gap conflicting request in
		the queue, as this transaction does not have a lock strong
		enough already granted on the record, we have to wait. */
@@ -1875,9 +1955,7 @@ lock_rec_lock(
}

/*************************************************************************
Checks if a waiting record lock request still has to wait in a queue.
NOTE that we, for simplicity, ignore the gap bits in locks, and treat
gap type lock requests like non-gap lock requests. */
Checks if a waiting record lock request still has to wait in a queue. */
static
ibool
lock_rec_has_to_wait_in_queue(
@@ -1892,6 +1970,7 @@ lock_rec_has_to_wait_in_queue(

	ut_ad(mutex_own(&kernel_mutex));
 	ut_ad(lock_get_wait(wait_lock));
	ut_ad(lock_get_type(wait_lock) == LOCK_REC);
 	
	space = wait_lock->un_member.rec_lock.space;
	page_no = wait_lock->un_member.rec_lock.page_no;
@@ -1901,8 +1980,8 @@ lock_rec_has_to_wait_in_queue(

	while (lock != wait_lock) {

		if (lock_has_to_wait(wait_lock, lock)
				&& lock_rec_get_nth_bit(lock, heap_no)) {
		if (lock_rec_get_nth_bit(lock, heap_no)
		    && lock_has_to_wait(wait_lock, lock)) {

		    	return(TRUE);
		}
@@ -1945,6 +2024,7 @@ lock_rec_cancel(
	lock_t*	lock)	/* in: waiting record lock request */
{
	ut_ad(mutex_own(&kernel_mutex));
	ut_ad(lock_get_type(lock) == LOCK_REC);

	/* Reset the bit (there can be only one set bit) in the lock bitmap */
	lock_rec_reset_nth_bit(lock, lock_rec_find_set_bit(lock));
@@ -3944,7 +4024,7 @@ lock_rec_queue_validate(
		impl_trx = lock_clust_rec_some_has_impl(rec, index);

		if (impl_trx && lock_rec_other_has_expl_req(LOCK_S, 0,
				LOCK_WAIT, FALSE, rec, impl_trx)) {
				LOCK_WAIT, rec, impl_trx)) {

			ut_a(lock_rec_has_expl(LOCK_X, rec, impl_trx));
		}
@@ -3959,7 +4039,7 @@ lock_rec_queue_validate(
		impl_trx = lock_sec_rec_some_has_impl_off_kernel(rec, index);

		if (impl_trx && lock_rec_other_has_expl_req(LOCK_S, 0,
				LOCK_WAIT, FALSE, rec, impl_trx)) {
				LOCK_WAIT, rec, impl_trx)) {

			ut_a(lock_rec_has_expl(LOCK_X, rec, impl_trx));
		}
@@ -3984,10 +4064,10 @@ lock_rec_queue_validate(
		
			if (lock_get_mode(lock) == LOCK_S) {
				ut_a(!lock_rec_other_has_expl_req(LOCK_X,
						0, 0, FALSE, rec, lock->trx));
						0, 0, rec, lock->trx));
			} else {
				ut_a(!lock_rec_other_has_expl_req(LOCK_S,
						0, 0, FALSE, rec, lock->trx));
						0, 0, rec, lock->trx));
			}

		} else if (lock_get_wait(lock) && !lock_rec_get_gap(lock)) {
@@ -4235,11 +4315,11 @@ lock_rec_insert_check_and_lock(
	had to wait for their insert. Both had waiting gap type lock requests
	on the successor, which produced an unnecessary deadlock. */

	if (lock_rec_other_has_expl_req(LOCK_S, LOCK_GAP, LOCK_WAIT,
						TRUE, next_rec, trx)) {
	if (lock_rec_other_has_conflicting(LOCK_X, LOCK_GAP,
				LOCK_INSERT_INTENTION, next_rec, trx)) {
		err = lock_rec_enqueue_waiting(LOCK_X | LOCK_GAP
					| LOCK_INSERT_INTENTION, next_rec,
								index, thr);
						| LOCK_INSERT_INTENTION,
						next_rec, index, thr);
	} else {
		err = DB_SUCCESS;
	}