Commit 551df1b3 authored by unknown's avatar unknown
Browse files

Merge mskold@bk-internal.mysql.com:/home/bk/mysql-5.0-ndb

into mysql.com:/usr/local/home/marty/MySQL/test/mysql-5.0-ndb

parents 0463c736 6fc95e50
Loading
Loading
Loading
Loading
+18 −0
Original line number Diff line number Diff line
@@ -1011,6 +1011,24 @@ operator<<(NdbOut& out, const NdbDictionary::Column& col)
    out << "Type" << (Uint32)col.getType();
    break;
  }
  // show unusual (non-MySQL) array size
  if (col.getLength() != 1) {
    switch (col.getType()) {
    case NdbDictionary::Column::Char:
    case NdbDictionary::Column::Varchar:
    case NdbDictionary::Column::Binary:
    case NdbDictionary::Column::Varbinary:
    case NdbDictionary::Column::Blob:
    case NdbDictionary::Column::Text:
    case NdbDictionary::Column::Bit:
    case NdbDictionary::Column::Longvarchar:
    case NdbDictionary::Column::Longvarbinary:
      break;
    default:
      out << " [" << col.getLength() << "]";
      break;
    }
  }
  if (col.getPrimaryKey())
    out << " PRIMARY KEY";
  else if (! col.getNullable())
+12 −8
Original line number Diff line number Diff line
@@ -1026,9 +1026,12 @@ NdbOperation::branch_col(Uint32 type,
    abort();
  }

  Uint32 sizeInBytes = col->m_attrSize * col->m_arraySize;
  if (val == NULL)
    len = 0;
  else {
    if (! col->getCharType()) {
      // prevent assert in NdbSqlUtil on length error
      Uint32 sizeInBytes = col->m_attrSize * col->m_arraySize;
      if (len != 0 && len != sizeInBytes)
      {
        setErrorCodeAbort(4209);
@@ -1036,6 +1039,7 @@ NdbOperation::branch_col(Uint32 type,
      }
      len = sizeInBytes;
    }
  }

  if (insertATTRINFO(Interpreter::BranchCol(c, 0, 0, false)) == -1)
    return -1;
+3 −2
Original line number Diff line number Diff line
@@ -139,8 +139,9 @@ NdbRecAttr::receive_data(const Uint32 * data, Uint32 sz){

static void
ndbrecattr_print_string(NdbOut& out, const char *type,
			const char *ref, unsigned sz)
			const char *aref, unsigned sz)
{
  const unsigned char* ref = (const unsigned char*)aref;
  int i, len, printable= 1;
  // trailing zeroes are not printed
  for (i=sz-1; i >= 0; i--)
@@ -166,7 +167,7 @@ ndbrecattr_print_string(NdbOut& out, const char *type,
    for (i= len+1; ref[i] != 0; i++)
    out.print("%u]",len-i);
    assert((int)sz > i);
    ndbrecattr_print_string(out,type,ref+i,sz-i);
    ndbrecattr_print_string(out,type,aref+i,sz-i);
  }
}

+169 −7
Original line number Diff line number Diff line
@@ -612,7 +612,10 @@ struct Col {
  bool m_pk;
  Type m_type;
  unsigned m_length;
  unsigned m_bytelength;
  unsigned m_bytelength;        // multiplied by char width
  unsigned m_attrsize;          // base type size
  unsigned m_headsize;          // length bytes
  unsigned m_bytesize;          // full value size
  bool m_nullable;
  const Chs* m_chs;
  Col(const class Tab& tab, unsigned num, const char* name, bool pk, Type type, unsigned length, bool nullable, const Chs* chs);
@@ -629,12 +632,26 @@ Col::Col(const class Tab& tab, unsigned num, const char* name, bool pk, Type typ
  m_type(type),
  m_length(length),
  m_bytelength(length * (chs == 0 ? 1 : chs->m_cs->mbmaxlen)),
  m_attrsize(
      type == Unsigned ? sizeof(Uint32) :
      type == Char ? sizeof(char) :
      type == Varchar ? sizeof(char) :
      type == Longvarchar ? sizeof(char) : ~0),
  m_headsize(
      type == Unsigned ? 0 :
      type == Char ? 0 :
      type == Varchar ? 1 :
      type == Longvarchar ? 2 : ~0),
  m_bytesize(m_headsize + m_attrsize * m_bytelength),
  m_nullable(nullable),
  m_chs(chs)
{
  // fix long varchar
  if (type == Varchar && m_bytelength > 255)
  if (type == Varchar && m_bytelength > 255) {
    m_type = Longvarchar;
    m_headsize += 1;
    m_bytesize += 1;
  }
}

Col::~Col()
@@ -1119,13 +1136,15 @@ struct Con {
  NdbIndexOperation* m_indexop;
  NdbScanOperation* m_scanop;
  NdbIndexScanOperation* m_indexscanop;
  NdbScanFilter* m_scanfilter;
  enum ScanMode { ScanNo = 0, Committed, Latest, Exclusive };
  ScanMode m_scanmode;
  enum ErrType { ErrNone = 0, ErrDeadlock, ErrOther };
  ErrType m_errtype;
  Con() :
    m_ndb(0), m_dic(0), m_tx(0), m_op(0), m_indexop(0),
    m_scanop(0), m_indexscanop(0), m_scanmode(ScanNo), m_errtype(ErrNone) {}
    m_scanop(0), m_indexscanop(0), m_scanfilter(0),
    m_scanmode(ScanNo), m_errtype(ErrNone) {}
  ~Con() {
    if (m_tx != 0)
      closeTransaction();
@@ -1140,10 +1159,14 @@ struct Con {
  int getNdbScanOperation(const Tab& tab);
  int getNdbIndexScanOperation1(const ITab& itab, const Tab& tab);
  int getNdbIndexScanOperation(const ITab& itab, const Tab& tab);
  int getNdbScanFilter();
  int equal(int num, const char* addr);
  int getValue(int num, NdbRecAttr*& rec);
  int setValue(int num, const char* addr);
  int setBound(int num, int type, const void* value);
  int beginFilter(int group);
  int endFilter();
  int setFilter(int num, int cond, const void* value, unsigned len);
  int execute(ExecType t);
  int execute(ExecType t, bool& deadlock);
  int readTuples(Par par);
@@ -1253,6 +1276,15 @@ Con::getNdbIndexScanOperation(const ITab& itab, const Tab& tab)
  return 0;
}

int
Con::getNdbScanFilter()
{
  assert(m_tx != 0 && m_scanop != 0);
  delete m_scanfilter;
  m_scanfilter = new NdbScanFilter(m_scanop);
  return 0;
}

int
Con::equal(int num, const char* addr)
{
@@ -1280,11 +1312,35 @@ Con::setValue(int num, const char* addr)
int
Con::setBound(int num, int type, const void* value)
{
  assert(m_tx != 0 && m_op != 0);
  assert(m_tx != 0 && m_indexscanop != 0);
  CHKCON(m_indexscanop->setBound(num, type, value) == 0, *this);
  return 0;
}

int
Con::beginFilter(int group)
{
  assert(m_tx != 0 && m_scanfilter != 0);
  CHKCON(m_scanfilter->begin((NdbScanFilter::Group)group) == 0, *this);
  return 0;
}

int
Con::endFilter()
{
  assert(m_tx != 0 && m_scanfilter != 0);
  CHKCON(m_scanfilter->end() == 0, *this);
  return 0;
}

int
Con::setFilter(int num, int cond, const void* value, unsigned len)
{
  assert(m_tx != 0 && m_scanfilter != 0);
  CHKCON(m_scanfilter->cmp((NdbScanFilter::BinaryCondition)cond, num, value, len) == 0, *this);
  return 0;
}

int
Con::execute(ExecType t)
{
@@ -1502,7 +1558,7 @@ createtable(Par par)
    const Col& col = *tab.m_col[k];
    NdbDictionary::Column c(col.m_name);
    c.setType((NdbDictionary::Column::Type)col.m_type);
    c.setLength(col.m_bytelength); // NDB API uses length in bytes
    c.setLength(col.m_bytelength); // for char NDB API uses length in bytes
    c.setPrimaryKey(col.m_pk);
    c.setNullable(col.m_nullable);
    if (col.m_chs != 0)
@@ -2836,6 +2892,7 @@ struct BVal : public Val {
  int m_type;
  BVal(const ICol& icol);
  int setbnd(Par par) const;
  int setflt(Par par) const;
};

BVal::BVal(const ICol& icol) :
@@ -2855,6 +2912,27 @@ BVal::setbnd(Par par) const
  return 0;
}

int
BVal::setflt(Par par) const
{
  static unsigned index_bound_to_filter_bound[5] = {
    NdbScanFilter::COND_GE,
    NdbScanFilter::COND_GT,
    NdbScanFilter::COND_LE,
    NdbScanFilter::COND_LT,
    NdbScanFilter::COND_EQ
  };
  Con& con = par.con();
  assert(g_compare_null || ! m_null);
  const char* addr = ! m_null ? (const char*)dataaddr() : 0;
  const ICol& icol = m_icol;
  const Col& col = icol.m_col;
  unsigned length = col.m_bytesize;
  unsigned cond = index_bound_to_filter_bound[m_type];
  CHK(con.setFilter(col.m_num, cond, addr, length) == 0);
  return 0;
}

static NdbOut&
operator<<(NdbOut& out, const BVal& bval)
{
@@ -2882,6 +2960,7 @@ struct BSet {
  void calc(Par par);
  void calcpk(Par par, unsigned i);
  int setbnd(Par par) const;
  int setflt(Par par) const;
  void filter(Par par, const Set& set, Set& set2) const;
};

@@ -3005,6 +3084,33 @@ BSet::setbnd(Par par) const
  return 0;
}

int
BSet::setflt(Par par) const
{
  Con& con = par.con();
  CHK(con.getNdbScanFilter() == 0);
  CHK(con.beginFilter(NdbScanFilter::AND) == 0);
  if (m_bvals != 0) {
    unsigned p1 = urandom(m_bvals);
    unsigned p2 = 10009;        // prime
    const unsigned extras = 5;
    // random order
    for (unsigned j = 0; j < m_bvals + extras; j++) {
      unsigned k = p1 + p2 * j;
      const BVal& bval = *m_bval[k % m_bvals];
      CHK(bval.setflt(par) == 0);
    }
    // duplicate
    if (urandom(5) == 0) {
      unsigned k = urandom(m_bvals);
      const BVal& bval = *m_bval[k];
      CHK(bval.setflt(par) == 0);
    }
  }
  CHK(con.endFilter() == 0);
  return 0;
}

void
BSet::filter(Par par, const Set& set, Set& set2) const
{
@@ -3593,6 +3699,62 @@ scanreadindexfast(Par par, const ITab& itab, const BSet& bset, unsigned countche
  return 0;
}

static int
scanreadfilter(Par par, const ITab& itab, BSet& bset, bool calc)
{
  Con& con = par.con();
  const Tab& tab = par.tab();
  const Set& set = par.set();
  Set set1(tab, set.m_rows);
  if (calc) {
    while (true) {
      bset.calc(par);
      bset.filter(par, set, set1);
      unsigned n = set1.count();
      // prefer proper subset
      if (0 < n && n < set.m_rows)
        break;
      if (urandom(3) == 0)
        break;
      set1.reset();
    }
  } else {
    bset.filter(par, set, set1);
  }
  LL3("scanfilter " << itab.m_name << " " << bset << " lockmode=" << par.m_lockmode << " expect=" << set1.count() << " verify=" << par.m_verify);
  Set set2(tab, set.m_rows);
  CHK(con.startTransaction() == 0);
  CHK(con.getNdbScanOperation(tab) == 0);
  CHK(con.readTuples(par) == 0);
  CHK(bset.setflt(par) == 0);
  set2.getval(par);
  CHK(con.executeScan() == 0);
  unsigned n = 0;
  bool deadlock = false;
  while (1) {
    int ret;
    deadlock = par.m_deadlock;
    CHK((ret = con.nextScanResult(true, deadlock)) == 0 || ret == 1);
    if (ret == 1)
      break;
    if (deadlock) {
      LL1("scanfilter: stop on deadlock");
      break;
    }
    unsigned i = (unsigned)-1;
    CHK(set2.getkey(par, &i) == 0);
    CHK(set2.putval(i, par.m_dups, n) == 0);
    LL4("key " << i << " row " << n << ": " << *set2.m_row[i]);
    n++;
  }
  con.closeTransaction();
  if (par.m_verify) {
    CHK(set1.verify(par, set2) == 0);
  }
  LL3("scanfilter " << itab.m_name << " done rows=" << n);
  return 0;
}

static int
scanreadindex(Par par, const ITab& itab)
{
@@ -3600,6 +3762,7 @@ scanreadindex(Par par, const ITab& itab)
  for (unsigned i = 0; i < par.m_subsubloop; i++) {
    if (itab.m_type == ITab::OrderedIndex) {
      BSet bset(tab, itab, par.m_rows);
      CHK(scanreadfilter(par, itab, bset, true) == 0);
      CHK(scanreadindex(par, itab, bset, true) == 0);
    }
  }
@@ -3626,7 +3789,6 @@ scanreadindex(Par par)
static int
scanreadall(Par par)
{
  if (par.m_no < 11)
  CHK(scanreadtable(par) == 0);
  CHK(scanreadindex(par) == 0);
  return 0;