Commit 6be5f8a3 authored by unknown's avatar unknown
Browse files

ndb - Fix problematic handling of TUP_ALLOC/DEALLOC REQ wrt varsize

    same row -
      prepare delete, prepare insert (with different size), commit delete, commit insert would lead to assertion
        as prepare insert didnt call handle_size_change_after_update

parent ee740746
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -2103,6 +2103,7 @@ private:
  }

  void prepare_initial_insert(KeyReqStruct*, Operationrec*, Tablerec*);
  void fix_disk_insert_no_mem_insert(KeyReqStruct*, Operationrec*, Tablerec*);
  void setup_fixed_part(KeyReqStruct* req_struct,
			Operationrec* const regOperPtr,
			Tablerec* const regTabPtr);
+146 −97
Original line number Diff line number Diff line
@@ -1298,12 +1298,56 @@ Dbtup::prepare_initial_insert(KeyReqStruct *req_struct,
    disk_undo ? (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE) : 0;
}

void
Dbtup::fix_disk_insert_no_mem_insert(KeyReqStruct *req_struct, 
				     Operationrec* regOperPtr,
				     Tablerec* regTabPtr)
{
  regOperPtr->m_undo_buffer_space= sizeof(Dbtup::Disk_undo::Alloc);
  req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
  
  const Uint32 cnt1= regTabPtr->m_attributes[MM].m_no_of_varsize;
  const Uint32 cnt2= regTabPtr->m_attributes[DD].m_no_of_varsize;
  Uint32 *ptr= req_struct->m_tuple_ptr->get_var_part_ptr(regTabPtr);

  if(cnt1)
  {
    // Disk part is 32-bit aligned
    char *varptr = req_struct->m_var_data[MM].m_data_ptr;
    ptr= ALIGN_WORD(varptr + regTabPtr->m_offsets[MM].m_max_var_offset);
  } 
  else
  {
    ptr -= Tuple_header::HeaderSize;
  }

  req_struct->m_disk_ptr= (Tuple_header*)ptr;
  
  if(cnt2)
  {
    KeyReqStruct::Var_data *dst= &req_struct->m_var_data[DD];
    ptr=((Tuple_header*)ptr)->m_data+regTabPtr->m_offsets[DD].m_varpart_offset;
    dst->m_data_ptr= (char*)(((Uint16*)ptr)+cnt2+1);
    dst->m_offset_array_ptr= req_struct->var_pos_array + (cnt1 << 1);
    dst->m_var_len_offset= cnt2;
    dst->m_max_var_offset= regTabPtr->m_offsets[DD].m_max_var_offset;
  }
  
  // Set all null bits
  memset(req_struct->m_disk_ptr->m_null_bits+
	 regTabPtr->m_offsets[DD].m_null_offset, 0xFF, 
	 4*regTabPtr->m_offsets[DD].m_null_words);
  req_struct->m_tuple_ptr->m_header_bits = 
    (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE);
}

int Dbtup::handleInsertReq(Signal* signal,
                           Ptr<Operationrec> regOperPtr,
                           Ptr<Fragrecord> fragPtr,
                           Tablerec* const regTabPtr,
                           KeyReqStruct *req_struct)
{
  Uint32 tup_version = 1;
  Fragrecord* regFragPtr = fragPtr.p;
  Uint32 *dst, *ptr= 0;
  Tuple_header *base= req_struct->m_tuple_ptr, *org= base;
@@ -1320,29 +1364,24 @@ int Dbtup::handleInsertReq(Signal* signal,
  ndbout << "dst: " << hex << UintPtr(dst) << " - " 
	 << regOperPtr.p->m_copy_tuple_location << endl;
  
  bool disk = regTabPtr->m_no_of_disk_attributes > 0;
  bool mem_insert = get_tuple_state(regOperPtr.p) == TUPLE_INITIAL_INSERT;
  bool disk_insert = regOperPtr.p->is_first_operation() && disk;

  Uint32 tup_version;
  union {
    Uint32 sizes[4];
    Uint64 cmp[2];
  };
  if(regOperPtr.p->is_first_operation())

  if(mem_insert)
  {
    tup_version= 1;
    jam();
    ndbassert(regOperPtr.p->is_first_operation()); // disk insert
    prepare_initial_insert(req_struct, regOperPtr.p, regTabPtr);
    if(regTabPtr->m_no_of_disk_attributes)
    {
      int res;
      if((res= c_lgman->alloc_log_space(regFragPtr->m_logfile_group_id,
					regOperPtr.p->m_undo_buffer_space)))
      {
	terrorCode= res;
	regOperPtr.p->m_undo_buffer_space= 0;
	goto log_space_error;
      }
    }
  }
  else
  {
    if (!regOperPtr.p->is_first_operation())
    {
      Operationrec* prevOp= req_struct->prevOpPtr.p;
      ndbassert(prevOp->op_struct.op_type == ZDELETE);
@@ -1350,12 +1389,31 @@ int Dbtup::handleInsertReq(Signal* signal,
      
      if(!prevOp->is_first_operation())
	org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
    }

    if (regTabPtr->need_expand())
      expand_tuple(req_struct, sizes, org, regTabPtr, true);
      expand_tuple(req_struct, sizes, org, regTabPtr, !disk_insert);
    else
      memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
  }
  
  if (disk_insert)
  {
    int res;
    if (unlikely(!mem_insert))
    {
      sizes[DD] = sizes[DD+2] = regTabPtr->m_offsets[DD].m_fix_header_size;
      fix_disk_insert_no_mem_insert(req_struct, regOperPtr.p, regTabPtr);
    }
    if((res= c_lgman->alloc_log_space(regFragPtr->m_logfile_group_id,
				      regOperPtr.p->m_undo_buffer_space)))
    {
      terrorCode= res;
      regOperPtr.p->m_undo_buffer_space= 0;
      goto log_space_error;
    }
  }
  
  regOperPtr.p->tupVersion= tup_version & ZTUP_VERSION_MASK;
  tuple_ptr->set_tuple_version(tup_version);
  if(updateAttributes(req_struct, &cinBuffer[0], 
@@ -1364,7 +1422,6 @@ int Dbtup::handleInsertReq(Signal* signal,

  if (checkNullAttributes(req_struct, regTabPtr) == false)
  {

    goto null_check_error;
  }
  
@@ -1376,12 +1433,9 @@ int Dbtup::handleInsertReq(Signal* signal,
  /**
   * Alloc memory
   */
  if(regOperPtr.p->is_first_operation())
  {
  Uint32 frag_page_id = req_struct->frag_page_id;
  Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
    regOperPtr.p->m_tuple_location.m_page_no = frag_page_id;
    if (likely(get_tuple_state(regOperPtr.p) == TUPLE_INITIAL_INSERT))
  if(mem_insert)
  {
    if (!regTabPtr->m_attributes[MM].m_no_of_varsize)
    {
@@ -1413,9 +1467,24 @@ int Dbtup::handleInsertReq(Signal* signal,
    
    ((Tuple_header*)ptr)->m_operation_ptr_i= regOperPtr.i;
    ((Tuple_header*)ptr)->m_header_bits= Tuple_header::ALLOC;
    regOperPtr.p->m_tuple_location.m_page_no = real_page_id;
  }
  else
  {
    int ret;
    if (regTabPtr->need_shrink() && cmp[0] != cmp[1] &&
	(ret = handle_size_change_after_update(req_struct,
					       base,
					       regOperPtr.p,
					       regFragPtr,
					       regTabPtr,
					       sizes)))
    {
      return ret;
    }
  }
  
    if (regTabPtr->m_no_of_disk_attributes)
  if (disk_insert)
  {
    Local_key tmp;
    Uint32 size= regTabPtr->m_attributes[DD].m_no_of_varsize == 0 ? 
@@ -1431,41 +1500,21 @@ int Dbtup::handleInsertReq(Signal* signal,
    /**
     * Set ref from disk to mm
     */
    Local_key ref = regOperPtr.p->m_tuple_location;
    ref.m_page_no = frag_page_id;
    
    Tuple_header* disk_ptr= req_struct->m_disk_ptr;
    disk_ptr->m_header_bits = 0;
      disk_ptr->m_base_record_ref= regOperPtr.p->m_tuple_location.ref();
    disk_ptr->m_base_record_ref= ref.ref();
  }
  
    regOperPtr.p->m_tuple_location.m_page_no = real_page_id;
    tuple_ptr->m_header_bits |= Tuple_header::ALLOC;
    
    if (regTabPtr->checksumIndicator) {
      jam();
      setChecksum(req_struct->m_tuple_ptr, regTabPtr);
    }
  
    return 0;
  }
  else
  if (regTabPtr->checksumIndicator) 
  {
    if (regTabPtr->checksumIndicator) {
    jam();
    setChecksum(req_struct->m_tuple_ptr, regTabPtr);
  }
    
    if (!regTabPtr->need_shrink() || cmp[0] == cmp[1])
  return 0;
  
    return handle_size_change_after_update(req_struct,
					   base,
					   regOperPtr.p,
					   regFragPtr,
					   regTabPtr,
					   sizes);
  }


  
mem_error:
  terrorCode= ZMEM_NOMEM_ERROR;
  goto error;