Commit 38e395aa authored by unknown's avatar unknown
Browse files

WL#2269 Enable query cache for NDB

- Added a thread that fetches commit_count for open tables. This
will mean that NDB will not have to be contacted for every use of a cached query. 


sql/ha_ndbcluster.cc:
  Added a thread that periodically will fetch commit_count 
  for open tables and store that value in share. 
  The commit count value is then used when query cache 
  asks if a cached query can be used. 
  The thread activation interval is regulated by the 
  config variable ndb_cache_check_time, it's default value is 0
  which means that NDB is contacted every time a cached query is reused.
sql/ha_ndbcluster.h:
  Added commit_count to share
  Added ndb_cache_check_time
sql/mysqld.cc:
  Added config variable ndb_cache_check_time
sql/set_var.cc:
  Added config variable ndb_cache_check_time
parent d6747f96
Loading
Loading
Loading
Loading
+193 −0
Original line number Diff line number Diff line
drop table if exists t1;
set GLOBAL query_cache_type=on;
set GLOBAL query_cache_size=1355776;
set GLOBAL ndb_cache_check_time=5;
reset query cache;
flush status;
CREATE TABLE t1 ( pk int not null primary key,
a int, b int not null, c varchar(20)) ENGINE=ndbcluster;
insert into t1 value (1, 2, 3, 'First row');
select * from t1;
pk	a	b	c
1	2	3	First row
show status like "Qcache_queries_in_cache";
Variable_name	Value
Qcache_queries_in_cache	1
show status like "Qcache_inserts";
Variable_name	Value
Qcache_inserts	1
show status like "Qcache_hits";
Variable_name	Value
Qcache_hits	0
select * from t1;
pk	a	b	c
1	2	3	First row
show status like "Qcache_hits";
Variable_name	Value
Qcache_hits	1
update t1 set a=3 where pk=1;
select * from t1;
pk	a	b	c
1	3	3	First row
show status like "Qcache_inserts";
Variable_name	Value
Qcache_inserts	2
show status like "Qcache_hits";
Variable_name	Value
Qcache_hits	1
insert into t1 value (2, 7, 8, 'Second row');
insert into t1 value (4, 5, 6, 'Fourth row');
select * from t1;
pk	a	b	c
2	7	8	Second row
4	5	6	Fourth row
1	3	3	First row
show status like "Qcache_inserts";
Variable_name	Value
Qcache_inserts	3
show status like "Qcache_hits";
Variable_name	Value
Qcache_hits	1
select * from t1;
pk	a	b	c
2	7	8	Second row
4	5	6	Fourth row
1	3	3	First row
show status like "Qcache_hits";
Variable_name	Value
Qcache_hits	2
select * from t1 where b=3;
pk	a	b	c
1	3	3	First row
show status like "Qcache_queries_in_cache";
Variable_name	Value
Qcache_queries_in_cache	2
show status like "Qcache_hits";
Variable_name	Value
Qcache_hits	2
select * from t1 where b=3;
pk	a	b	c
1	3	3	First row
show status like "Qcache_hits";
Variable_name	Value
Qcache_hits	3
delete from t1 where c='Fourth row';
show status like "Qcache_queries_in_cache";
Variable_name	Value
Qcache_queries_in_cache	0
select * from t1 where b=3;
pk	a	b	c
1	3	3	First row
show status like "Qcache_hits";
Variable_name	Value
Qcache_hits	3
use test;
select * from t1;
pk	a	b	c
2	7	8	Second row
1	3	3	First row
select * from t1 where b=3;
pk	a	b	c
1	3	3	First row
show status like "Qcache_hits";
Variable_name	Value
Qcache_hits	4
update t1 set a=4 where b=3;
use test;
show status like "Qcache_queries_in_cache";
Variable_name	Value
Qcache_queries_in_cache	0
select * from t1;
pk	a	b	c
2	7	8	Second row
1	4	3	First row
select * from t1;
pk	a	b	c
2	7	8	Second row
1	4	3	First row
show status like "Qcache_inserts";
Variable_name	Value
Qcache_inserts	7
show status like "Qcache_hits";
Variable_name	Value
Qcache_hits	5
select * from t1;
pk	a	b	c
2	7	8	Second row
1	4	3	First row
select * from t1;
pk	a	b	c
2	7	8	Second row
1	4	3	First row
show status like "Qcache_queries_in_cache";
Variable_name	Value
Qcache_queries_in_cache	1
show status like "Qcache_inserts";
Variable_name	Value
Qcache_inserts	7
show status like "Qcache_hits";
Variable_name	Value
Qcache_hits	7
begin;
update t1 set a=5 where pk=1;
show status like "Qcache_queries_in_cache";
Variable_name	Value
Qcache_queries_in_cache	0
show status like "Qcache_inserts";
Variable_name	Value
Qcache_inserts	7
show status like "Qcache_hits";
Variable_name	Value
Qcache_hits	7
select * from t1;
pk	a	b	c
2	7	8	Second row
1	4	3	First row
show status like "Qcache_queries_in_cache";
Variable_name	Value
Qcache_queries_in_cache	1
show status like "Qcache_inserts";
Variable_name	Value
Qcache_inserts	8
show status like "Qcache_hits";
Variable_name	Value
Qcache_hits	7
commit;
show status like "Qcache_queries_in_cache";
Variable_name	Value
Qcache_queries_in_cache	1
show status like "Qcache_inserts";
Variable_name	Value
Qcache_inserts	8
show status like "Qcache_hits";
Variable_name	Value
Qcache_hits	7
select * from t1;
pk	a	b	c
2	7	8	Second row
1	5	3	First row
show status like "Qcache_inserts";
Variable_name	Value
Qcache_inserts	9
show status like "Qcache_hits";
Variable_name	Value
Qcache_hits	7
select * from t1;
pk	a	b	c
2	7	8	Second row
1	5	3	First row
show status like "Qcache_queries_in_cache";
Variable_name	Value
Qcache_queries_in_cache	1
show status like "Qcache_inserts";
Variable_name	Value
Qcache_inserts	9
show status like "Qcache_hits";
Variable_name	Value
Qcache_hits	8
drop table t1;
show status like "Qcache_queries_in_cache";
Variable_name	Value
Qcache_queries_in_cache	0
SET GLOBAL query_cache_size=0;
SET GLOBAL ndb_cache_check_time=0;
+74 −0
Original line number Diff line number Diff line
drop table if exists t1, t2;
set GLOBAL query_cache_type=on;
set GLOBAL query_cache_size=1355776;
set GLOBAL ndb_cache_check_time=1;
reset query cache;
flush status;
set GLOBAL query_cache_type=on;
set GLOBAL query_cache_size=1355776;
set GLOBAL ndb_cache_check_time=1;
reset query cache;
flush status;
create table t1 (a int) engine=ndbcluster;
create table t2 (a int) engine=ndbcluster;
insert into t1 value (2);
insert into t2 value (3);
select * from t1;
a
2
select * from t2;
a
3
show status like "Qcache_queries_in_cache";
Variable_name	Value
Qcache_queries_in_cache	2
show status like "Qcache_inserts";
Variable_name	Value
Qcache_inserts	2
show status like "Qcache_hits";
Variable_name	Value
Qcache_hits	0
show status like "Qcache_queries_in_cache";
Variable_name	Value
Qcache_queries_in_cache	0
show status like "Qcache_inserts";
Variable_name	Value
Qcache_inserts	0
show status like "Qcache_hits";
Variable_name	Value
Qcache_hits	0
select * from t1;
a
2
show status like "Qcache_queries_in_cache";
Variable_name	Value
Qcache_queries_in_cache	1
show status like "Qcache_inserts";
Variable_name	Value
Qcache_inserts	1
show status like "Qcache_hits";
Variable_name	Value
Qcache_hits	0
update t1 set a=3 where a=2;
show status like "Qcache_queries_in_cache";
Variable_name	Value
Qcache_queries_in_cache	2
show status like "Qcache_inserts";
Variable_name	Value
Qcache_inserts	2
show status like "Qcache_hits";
Variable_name	Value
Qcache_hits	0
select * from t1;
a
3
show status like "Qcache_queries_in_cache";
Variable_name	Value
Qcache_queries_in_cache	2
show status like "Qcache_inserts";
Variable_name	Value
Qcache_inserts	3
show status like "Qcache_hits";
Variable_name	Value
Qcache_hits	0
drop table t1, t2;
+126 −0
Original line number Diff line number Diff line
-- source include/have_query_cache.inc
-- source include/have_ndb.inc

--disable_warnings
drop table if exists t1;
--enable_warnings


# Turn on and reset query cache
set GLOBAL query_cache_type=on;
set GLOBAL query_cache_size=1355776;
# Turn on thread that will fetch commit count for open tables
set GLOBAL ndb_cache_check_time=5;
reset query cache;
flush status;

# Wait for thread to wake up and start "working"
sleep 20; 

# Create test table in NDB
CREATE TABLE t1 ( pk int not null primary key,
 a int, b int not null, c varchar(20)) ENGINE=ndbcluster;
insert into t1 value (1, 2, 3, 'First row');

# Perform one query which should be inerted in query cache
select * from t1;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";

# Perform the same query and make sure the query cache is hit
select * from t1;
show status like "Qcache_hits";

# Update the table and make sure the correct data is returned
update t1 set a=3 where pk=1;
select * from t1;
show status like "Qcache_inserts";
show status like "Qcache_hits";

# Insert a new record and make sure the correct data is returned
insert into t1 value (2, 7, 8, 'Second row');
insert into t1 value (4, 5, 6, 'Fourth row');
select * from t1;
show status like "Qcache_inserts";
show status like "Qcache_hits";
select * from t1;
show status like "Qcache_hits";

# Perform a "new" query and make sure the query cache is not hit
select * from t1 where b=3;
show status like "Qcache_queries_in_cache";
show status like "Qcache_hits";

# Same query again...
select * from t1 where b=3;
show status like "Qcache_hits";

# Delete from the table
delete from t1 where c='Fourth row';
show status like "Qcache_queries_in_cache";
select * from t1 where b=3;
show status like "Qcache_hits";

# Start another connection and check that the query cache is hit
connect (con1,localhost,root,,);
connection con1;
use test;
select * from t1;
select * from t1 where b=3;
show status like "Qcache_hits";

# Update the table and switch to other connection 
update t1 set a=4 where b=3;
connect (con2,localhost,root,,);
connection con2;
use test;
show status like "Qcache_queries_in_cache";
select * from t1;
select * from t1;
show status like "Qcache_inserts";
show status like "Qcache_hits";
connection con1;
select * from t1;
select * from t1;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";

# Use transactions and make sure the query cache is not updated until
# transaction is commited
begin;
update t1 set a=5 where pk=1;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
connection con2;
select * from t1;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
connection con1;
commit;
# Sleep to let the query cache thread update commit count
sleep 10;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
connection con2;
select * from t1;
show status like "Qcache_inserts";
show status like "Qcache_hits";
connection con1;
select * from t1;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";

drop table t1;

show status like "Qcache_queries_in_cache";

SET GLOBAL query_cache_size=0;
SET GLOBAL ndb_cache_check_time=0;

+71 −0
Original line number Diff line number Diff line
-- source include/have_query_cache.inc
-- source include/have_ndb.inc
-- source include/have_multi_ndb.inc

--disable_warnings
drop table if exists t1, t2;
--enable_warnings


# Turn on and reset query cache on server1
connection server1;
set GLOBAL query_cache_type=on;
set GLOBAL query_cache_size=1355776;
set GLOBAL ndb_cache_check_time=1;
reset query cache;
flush status;

# Turn on and reset query cache on server2
connection server2;
set GLOBAL query_cache_type=on;
set GLOBAL query_cache_size=1355776;
set GLOBAL ndb_cache_check_time=1;
reset query cache;
flush status;

# Sleep so that the query cache check thread has time to start
sleep 15;


# Create test tables in NDB and load them into cache
# on server1
connection server1;
create table t1 (a int) engine=ndbcluster;
create table t2 (a int) engine=ndbcluster;
insert into t1 value (2);
insert into t2 value (3);
select * from t1;
select * from t2;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";


# Connect server2, load table in to cache, then update the table
connection server2;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
select * from t1;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
update t1 set a=3 where a=2;

# Sleep so that the query cache check thread has time to run
sleep 5;

# Connect to server1 and check that cache is invalidated 
# and correct data is returned
connection server1;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
select * from t1;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";

drop table t1, t2;

+233 −73
Original line number Diff line number Diff line
@@ -86,6 +86,12 @@ static int unpackfrm(const void **data, uint *len,
static int ndb_get_table_statistics(Ndb*, const char *, 
				    Uint64* rows, Uint64* commits);

// Util thread variables
static pthread_t ndb_util_thread;
pthread_mutex_t LOCK_ndb_util_thread;
pthread_cond_t COND_ndb_util_thread;
extern "C" pthread_handler_decl(ndb_util_thread_func, arg);
ulong ndb_cache_check_time;

/*
  Dummy buffer to read zero pack_length fields
@@ -3865,6 +3871,7 @@ ha_ndbcluster::~ha_ndbcluster()
}



/*
  Open a table for further use
  - fetch metadata for this table from NDB
@@ -3963,16 +3970,14 @@ void ha_ndbcluster::release_thd_ndb(Thd_ndb* thd_ndb)

Ndb* check_ndb_in_thd(THD* thd)
{
  DBUG_ENTER("check_ndb_in_thd");
  Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;  
  
  if (!thd_ndb)
  {
    if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
      DBUG_RETURN(NULL);
      return NULL;
    thd->transaction.thd_ndb= thd_ndb;
  }
  DBUG_RETURN(thd_ndb->ndb);
  return thd_ndb->ndb;
}


@@ -4310,13 +4315,21 @@ bool ndbcluster_init()
  (void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
                   (hash_get_key) ndbcluster_get_key,0,0);
  pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
  pthread_mutex_init(&LOCK_ndb_util_thread,MY_MUTEX_INIT_FAST);
  pthread_cond_init(&COND_ndb_util_thread,NULL);

  ndbcluster_inited= 1;
#ifdef USE_DISCOVER_ON_STARTUP
  if (ndb_discover_tables() != 0)

  // Create utility thread
  pthread_t tmp;
  if (pthread_create(&tmp,&connection_attrib,ndb_util_thread_func,0))
  {
    DBUG_PRINT("error", ("Could not create ndb utility thread"));
    goto ndbcluster_init_error;
#endif
  }
  
  ndbcluster_inited= 1;
  DBUG_RETURN(FALSE);

 ndbcluster_init_error:
  ndbcluster_end();
  DBUG_RETURN(TRUE);
@@ -4326,12 +4339,19 @@ bool ndbcluster_init()
/*
  End use of the NDB Cluster table handler
  - free all global variables allocated by 
    ndcluster_init()
    ndbcluster_init()
*/

bool ndbcluster_end()
{
  DBUG_ENTER("ndbcluster_end");

  // Kill ndb utility thread
  (void) pthread_mutex_lock(&LOCK_ndb_util_thread);  
  DBUG_PRINT("exit",("killing ndb util thread: %lx",ndb_util_thread));
  (void) pthread_cond_signal(&COND_ndb_util_thread);
  (void) pthread_mutex_unlock(&LOCK_ndb_util_thread);

  if(g_ndb)
    delete g_ndb;
  g_ndb= NULL;
@@ -4342,6 +4362,8 @@ bool ndbcluster_end()
    DBUG_RETURN(0);
  hash_free(&ndbcluster_open_tables);
  pthread_mutex_destroy(&ndbcluster_mutex);
  pthread_mutex_destroy(&LOCK_ndb_util_thread);
  pthread_cond_destroy(&COND_ndb_util_thread);
  ndbcluster_inited= 0;
  DBUG_RETURN(0);
}
@@ -4534,12 +4556,53 @@ const char* ha_ndbcluster::index_type(uint key_number)
    return "HASH";
  }
}

uint8 ha_ndbcluster::table_cache_type()
{
  DBUG_ENTER("ha_ndbcluster::table_cache_type=HA_CACHE_TBL_ASKTRANSACT");
  DBUG_RETURN(HA_CACHE_TBL_ASKTRANSACT);
}


uint ndb_get_commitcount(THD* thd, char* dbname, char* tabname, 
			 Uint64* commit_count)
{
  DBUG_ENTER("ndb_get_commitcount");
 
  if (ndb_cache_check_time > 0)
  {
    // Use cached commit_count from share
    char name[FN_REFLEN];
    NDB_SHARE* share;
    (void)strxnmov(name, FN_REFLEN, 
		   "./",dbname,"/",tabname,NullS);
    DBUG_PRINT("info", ("name: %s", name));
    pthread_mutex_lock(&ndbcluster_mutex);
    if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables,
				   (byte*) name,
				   strlen(name))))
    {
      pthread_mutex_unlock(&ndbcluster_mutex);
      DBUG_RETURN(1);
    }
    *commit_count= share->commit_count;    
    DBUG_PRINT("info", ("commit_count: %d", *commit_count));
    pthread_mutex_unlock(&ndbcluster_mutex);
    DBUG_RETURN(0);
  }
  
  // Get commit_count from NDB
  Ndb *ndb;
  if (!(ndb= check_ndb_in_thd(thd)))
    DBUG_RETURN(1);
  ndb->setDatabaseName(dbname);
  
  if (ndb_get_table_statistics(ndb, tabname, 0, commit_count))
    DBUG_RETURN(1);
  DBUG_RETURN(0);
}


static
my_bool
ndbcluster_cache_retrieval_allowed(
@@ -4561,51 +4624,33 @@ ndbcluster_cache_retrieval_allowed(
				all cached queries with this table*/
{
  DBUG_ENTER("ndbcluster_cache_retrieval_allowed");
  char tabname[128];

  Uint64 commit_count;
  bool is_autocommit= !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN));
  char* dbname= full_name;
  my_bool is_autocommit;
  {
    int dbname_len= strlen(full_name);
    int tabname_len= full_name_len-dbname_len-1;
    memcpy(tabname, full_name+dbname_len+1, tabname_len);
    tabname[tabname_len]= '\0';
  }
  if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
    is_autocommit = FALSE;
  else
    is_autocommit = TRUE;
  char* tabname= dbname+strlen(dbname)+1;

  DBUG_PRINT("enter",("dbname=%s, tabname=%s, autocommit=%d",
		      dbname, tabname, is_autocommit));

  if (!is_autocommit)
  {
    DBUG_PRINT("info",("OPTION_NOT_AUTOCOMMIT=%d OPTION_BEGIN=%d",
		       thd->options & OPTION_NOT_AUTOCOMMIT,
		       thd->options & OPTION_BEGIN));
    // ToDo enable cache inside a transaction
    // no need to invalidate though so leave *engine_data
    DBUG_RETURN(FALSE);
  }
  {
    Ndb *ndb;
    Uint64 commit_count;
    if (!(ndb= check_ndb_in_thd(thd)))
    {
      *engine_data= *engine_data+1; // invalidate
    DBUG_RETURN(FALSE);
    }
    ndb->setDatabaseName(dbname);
    if (ndb_get_table_statistics(ndb, tabname, 0, &commit_count))

  if (ndb_get_commitcount(thd, dbname, tabname, &commit_count))
  {
    *engine_data= *engine_data+1; // invalidate
    DBUG_RETURN(FALSE);
  }
  DBUG_PRINT("info", ("*engine_data=%llu, commit_count=%llu", 
		      *engine_data, commit_count));
  if (*engine_data != commit_count)
  {
    *engine_data= commit_count; // invalidate
    DBUG_PRINT("exit",("Do not use cache, commit_count has changed"));
    DBUG_RETURN(FALSE);
  }
  }
  DBUG_PRINT("exit",("*engine_data=%d ok, use cache",*engine_data));

  DBUG_PRINT("exit",("OK to use cache, *engine_data=%llu",*engine_data));
  DBUG_RETURN(TRUE);
}

@@ -4630,35 +4675,24 @@ ha_ndbcluster::cached_table_registration(
				   invalidate all cached queries with this table*/
{
  DBUG_ENTER("ha_ndbcluster::cached_table_registration");
  my_bool is_autocommit;
  if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
    is_autocommit = FALSE;
  else
    is_autocommit = TRUE;

  bool is_autocommit= !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN));
  DBUG_PRINT("enter",("dbname=%s, tabname=%s, is_autocommit=%d",
		      m_dbname,m_tabname,is_autocommit));
  if (!is_autocommit)
  {
    DBUG_PRINT("info",("OPTION_NOT_AUTOCOMMIT=%d OPTION_BEGIN=%d",
		       thd->options & OPTION_NOT_AUTOCOMMIT,
		       thd->options & OPTION_BEGIN));
    // ToDo enable cache inside a transaction
    // no need to invalidate though so leave *engine_data
    DBUG_RETURN(FALSE);
  }
  {

  
  Uint64 commit_count;
    Ndb *ndb= get_ndb();
    ndb->setDatabaseName(m_dbname);
    if (ndb_get_table_statistics(ndb, m_tabname, 0, &commit_count))
  if (ndb_get_commitcount(thd, m_dbname, m_tabname, &commit_count))
  {
    *engine_data= 0;
    DBUG_PRINT("error", ("Could not get commitcount"))
    DBUG_RETURN(FALSE);
  }
  *engine_data= commit_count;
  }
  *engine_callback= ndbcluster_cache_retrieval_allowed;
  DBUG_PRINT("exit",("*engine_data=%d", *engine_data));
  DBUG_PRINT("exit",("*engine_data=%llu", *engine_data));
  DBUG_RETURN(TRUE);
}

@@ -4700,8 +4734,14 @@ static NDB_SHARE* get_share(const char *table_name)
      }
      thr_lock_init(&share->lock);
      pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
      share->commit_count= 0;
    }
  }
  DBUG_PRINT("share", 
	     ("table_name: %s, length: %d, use_count: %d, commit_count: %d", 
	      share->table_name, share->table_name_length, share->use_count, 
	      share->commit_count));

  share->use_count++;
  pthread_mutex_unlock(&ndbcluster_mutex);
  return share;
@@ -4871,7 +4911,7 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
      *row_count= sum_rows;
    if(commit_count)
      *commit_count= sum_commits;
    DBUG_PRINT("exit", ("records: %u commits: %u", sum_rows, sum_commits));
    DBUG_PRINT("exit", ("records: %llu commits: %llu", sum_rows, sum_commits));
    DBUG_RETURN(0);
  } while(0);

@@ -4906,4 +4946,124 @@ int ha_ndbcluster::write_ndb_file()
  DBUG_RETURN(error);
}


// Utility thread main loop
extern "C" pthread_handler_decl(ndb_util_thread_func,arg __attribute__((unused)))
{
  THD *thd; // needs to be first for thread_stack
  int error = 0;
  struct timespec abstime;

  my_thread_init();
  DBUG_ENTER("ndb_util_thread");
  DBUG_PRINT("enter", ("ndb_cache_check_time: %d", ndb_cache_check_time));

  thd= new THD; // note that contructor of THD uses DBUG_ !
  THD_CHECK_SENTRY(thd);

  pthread_detach_this_thread();
  ndb_util_thread = pthread_self();

  thd->thread_stack = (char*)&thd; // remember where our stack is
  if (thd->store_globals())
  {
    thd->cleanup();
    delete thd;
    DBUG_RETURN(NULL);
  }

  List<NDB_SHARE> util_open_tables;
  set_timespec(abstime, ndb_cache_check_time);
  for (;;)
  {

    pthread_mutex_lock(&LOCK_ndb_util_thread);
    error= pthread_cond_timedwait(&COND_ndb_util_thread, 
			   &LOCK_ndb_util_thread, 
			   &abstime);
    pthread_mutex_unlock(&LOCK_ndb_util_thread);

    DBUG_PRINT("ndb_util_thread", ("Started, ndb_cache_check_time: %d", 
				   ndb_cache_check_time));
    
    if (abort_loop)
      break; // Shutting down server
    
    if (ndb_cache_check_time == 0)
    {
      set_timespec(abstime, 10);
      continue;
    }

    // Set new time to wake up 
    set_timespec(abstime, ndb_cache_check_time);

    // Lock mutex and fill list with pointers to all open tables
    NDB_SHARE *share;
    pthread_mutex_lock(&ndbcluster_mutex);
    for (uint i= 0; i < ndbcluster_open_tables.records; i++)
    {
      share= (NDB_SHARE *)hash_element(&ndbcluster_open_tables, i);
      share->use_count++; // Make sure the table can't be closed
      
      DBUG_PRINT("ndb_util_thread", 
		 ("Found open table[%d]: %s, use_count: %d", 
		  i, share->table_name, share->use_count));
      
      // Store pointer to table
      util_open_tables.push_back(share);
    }
    pthread_mutex_unlock(&ndbcluster_mutex);
    
    
    // Iterate through the  open files list 
    List_iterator_fast<NDB_SHARE> it(util_open_tables);
    while (share=it++)
    {  
      // Split tab- and dbname
      char buf[FN_REFLEN];
      char *tabname, *db;
      uint length= dirname_length(share->table_name);
      tabname= share->table_name+length;
      memcpy(buf, share->table_name, length-1);
      buf[length-1]= 0;
      db= buf+dirname_length(buf);
      DBUG_PRINT("ndb_util_thread", 
		 ("Fetching commit count for: %s, db: %s, tab: %s", 
		  share->table_name, db, tabname));
      
      // Contact NDB to get commit count for table
      g_ndb->setDatabaseName(db);
      Uint64 rows, commit_count;
      if(ndb_get_table_statistics(g_ndb, tabname, 
				  &rows, &commit_count) == 0){
	DBUG_PRINT("ndb_util_thread", 
		   ("Table: %s, rows: %llu, commit_count: %llu", 
		    share->table_name, rows, commit_count));
	share->commit_count= commit_count;
      }
      else
      {
	DBUG_PRINT("ndb_util_thread", 
		   ("Error: Could not get commit count for table %s",
		    share->table_name));
	share->commit_count++; // Invalidate
      }
      // Decrease the use count and possibly free share
      free_share(share);
    }
    
    // Clear the list of open tables
    util_open_tables.empty();  
    
  }
  
  thd->cleanup();
  delete thd;
  DBUG_PRINT("exit", ("ndb_util_thread"));
  my_thread_end();
  DBUG_RETURN(NULL);
}


#endif /* HAVE_NDBCLUSTER_DB */
Loading