Commit e0f91f22 authored by unknown's avatar unknown
Browse files

Bug #17095 Cluster RBR in circle does not terminate

- add any value to ndb
- use it to update correct server id in binlog thread


sql/ha_ndbcluster.cc:
  ndb: use "any value" to set correct server_id
sql/ha_ndbcluster_binlog.cc:
  ndb: use "any value" to set correct server_id
storage/ndb/include/kernel/AttributeHeader.hpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/include/kernel/signaldata/FireTrigOrd.hpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/include/kernel/signaldata/SumaImpl.hpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/include/ndbapi/NdbDictionary.hpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/include/ndbapi/NdbEventOperation.hpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/include/ndbapi/NdbOperation.hpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/kernel/blocks/dbtup/DbtupStoredProcDef.cpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/kernel/blocks/suma/Suma.cpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/ndbapi/NdbEventOperation.cpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/ndbapi/NdbOperationDefine.cpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/ndbapi/ndb_cluster_connection.cpp:
  add any_value as psudo column, updatable from ndbapi
parent 52f82d52
Loading
Loading
Loading
Loading
+13 −0
Original line number Diff line number Diff line
@@ -2712,6 +2712,9 @@ int ha_ndbcluster::write_row(byte *record)
    op->setValue(no_fields, part_func_value);
  }

  if (thd->slave_thread)
    op->setAnyValue(thd->server_id);

  m_rows_changed++;

  /*
@@ -2992,6 +2995,10 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
      no_fields++;
    op->setValue(no_fields, part_func_value);
  }

  if (thd->slave_thread)
    op->setAnyValue(thd->server_id);

  // Execute update operation
  if (!cursor && execute_no_commit(this,trans,FALSE) != 0) {
    no_uncommitted_rows_execute_failure();
@@ -3047,6 +3054,9 @@ int ha_ndbcluster::delete_row(const byte *record)

    no_uncommitted_rows_update(-1);

    if (thd->slave_thread)
      ((NdbOperation *)trans->getLastDefinedOperation())->setAnyValue(thd->server_id);

    if (!m_primary_key_update)
      // If deleting from cursor, NoCommit will be handled in next_result
      DBUG_RETURN(0);
@@ -3076,6 +3086,9 @@ int ha_ndbcluster::delete_row(const byte *record)
      if ((error= set_primary_key_from_record(op, record)))
        DBUG_RETURN(error);
    }

    if (thd->slave_thread)
      op->setAnyValue(thd->server_id);
  }

  // Execute delete operation
+10 −5
Original line number Diff line number Diff line
@@ -3189,8 +3189,12 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
  NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData();
  if (share == ndb_apply_status_share)
    return 0;
  TABLE *table= share->table;

  uint originating_server_id= pOp->getAnyValue();
  if (originating_server_id == 0)
    originating_server_id= ::server_id;

  TABLE *table= share->table;
  DBUG_ASSERT(trans.good());
  DBUG_ASSERT(table != 0);

@@ -3235,7 +3239,7 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
        DBUG_ASSERT(ret == 0);
      }
      ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]);
      IF_DBUG(int ret=) trans.write_row(::server_id,
      IF_DBUG(int ret=) trans.write_row(originating_server_id,
                                        injector::transaction::table(table,
                                                                     TRUE),
                                        &b, n_fields, table->record[0]);
@@ -3275,7 +3279,7 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
      }
      ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]);
      DBUG_EXECUTE("info", print_records(table, table->record[n]););
      IF_DBUG(int ret =) trans.delete_row(::server_id,
      IF_DBUG(int ret =) trans.delete_row(originating_server_id,
                                          injector::transaction::table(table,
                                                                       TRUE),
                                          &b, n_fields, table->record[n]);
@@ -3305,7 +3309,8 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
          since table has a primary key, we can do a write
          using only after values
        */
        trans.write_row(::server_id, injector::transaction::table(table, TRUE),
        trans.write_row(originating_server_id,
                        injector::transaction::table(table, TRUE),
                        &b, n_fields, table->record[0]);// after values
      }
      else
@@ -3325,7 +3330,7 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
        }
        ndb_unpack_record(table, share->ndb_value[1], &b, table->record[1]);
        DBUG_EXECUTE("info", print_records(table, table->record[1]););
        IF_DBUG(int ret =) trans.update_row(::server_id,
        IF_DBUG(int ret =) trans.update_row(originating_server_id,
                                            injector::transaction::table(table,
                                                                         TRUE),
                                            &b, n_fields,
+2 −1
Original line number Diff line number Diff line
@@ -45,6 +45,7 @@ public:
  STATIC_CONST( ROWID        = 0xFFF6 );
  STATIC_CONST( ROW_GCI      = 0xFFF5 );
  STATIC_CONST( FRAGMENT_VARSIZED_MEMORY = 0xFFF4 );
  STATIC_CONST( ANY_VALUE    = 0xFFF3 );
  
  // NOTE: in 5.1 ctors and init take size in bytes

+16 −1
Original line number Diff line number Diff line
@@ -53,7 +53,7 @@ class FireTrigOrd {
public:
  STATIC_CONST( SignalLength = 8 );
  STATIC_CONST( SignalWithGCILength = 9 );
  STATIC_CONST( SignalWithHashValueLength = 10 );
  STATIC_CONST( SignalLengthSuma = 11 );

private:
  Uint32 m_connectionPtr;
@@ -66,6 +66,7 @@ private:
  Uint32 fragId;
  Uint32 m_gci;
  Uint32 m_hashValue;
  Uint32 m_any_value;
  // Public methods
public:
  Uint32 getConnectionPtr() const;
@@ -86,6 +87,8 @@ public:
  void setGCI(Uint32);
  Uint32 getHashValue() const;
  void setHashValue(Uint32);
  Uint32 getAnyValue() const;
  void setAnyValue(Uint32);
};

inline
@@ -196,5 +199,17 @@ void FireTrigOrd::setHashValue(Uint32 flag)
  m_hashValue = flag;
}

inline
Uint32 FireTrigOrd::getAnyValue() const
{
  return m_any_value;
}

inline
void FireTrigOrd::setAnyValue(Uint32 any_value)
{
  m_any_value = any_value;
}


#endif
+4 −1
Original line number Diff line number Diff line
@@ -303,7 +303,10 @@ struct SubTableData {
  Uint32 tableId;
  Uint32 requestInfo;
  Uint32 logType;
  union {
    Uint32 changeMask;
    Uint32 anyValue;
  };
  Uint32 totalLen;

  static void setOperation(Uint32& ri, Uint32 val) { 
Loading