Commit b582ea2b authored by unknown's avatar unknown
Browse files

Re-applying the work initially done by Brian, and since worked upon by me...

Re-applying the work initially done by Brian, and since worked upon by me previously in several separate patches to the 5.1 parent but never pushed.
WL#2952 - add simple single-table only transactions to federated.


sql/ha_federated.cc:
  added handlerton functions for commit and rollback, added handler methods for same.
sql/ha_federated.h:
  added member variable for transaction data (linked list of federated handlers used in transaction) and member functions for support commit and rollback.
mysql-test/r/federated_transactions.result:
  New BitKeeper file ``mysql-test/r/federated_transactions.result''
mysql-test/t/federated_transactions.test:
  New BitKeeper file ``mysql-test/t/federated_transactions.test''
parent c4239fee
Loading
Loading
Loading
Loading
+49 −0
Original line number Diff line number Diff line
stop slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
start slave;
stop slave;
DROP DATABASE IF EXISTS federated;
CREATE DATABASE federated;
DROP DATABASE IF EXISTS federated;
CREATE DATABASE federated;
DROP TABLE IF EXISTS federated.t1;
Warnings:
Note	1051	Unknown table 't1'
CREATE TABLE federated.t1 (
`id` int(20) NOT NULL,
`name` varchar(32) NOT NULL default ''
    )
DEFAULT CHARSET=latin1 ENGINE=BerkeleyDB;
DROP TABLE IF EXISTS federated.t1;
Warnings:
Note	1051	Unknown table 't1'
CREATE TABLE federated.t1 (
`id` int(20) NOT NULL,
`name` varchar(32) NOT NULL default ''
    )
ENGINE="FEDERATED" DEFAULT CHARSET=latin1
CONNECTION='mysql://root@127.0.0.1:SLAVE_PORT/federated/t1';
set autocommit=0;
INSERT INTO federated.t1 (id, name) VALUES (1, 'foo');
INSERT INTO federated.t1 (id, name) VALUES (2, 'fee');
COMMIT;
INSERT INTO federated.t1 (id, name) VALUES (3, 'fie');
INSERT INTO federated.t1 (id, name) VALUES (4, 'fum');
ROLLBACK;
set autocommit=1;
INSERT INTO federated.t1 (id, name) VALUES (5, 'foe');
INSERT INTO federated.t1 (id, name) VALUES (6, 'fig');
SELECT * FROM federated.t1;
id	name
1	foo
2	fee
5	foe
6	fig
DELETE FROM federated.t1;
DROP TABLE IF EXISTS federated.t1;
DROP DATABASE IF EXISTS federated;
DROP TABLE IF EXISTS federated.t1;
DROP DATABASE IF EXISTS federated;
+37 −0
Original line number Diff line number Diff line
source include/federated.inc;

connection slave;
DROP TABLE IF EXISTS federated.t1;
#SHOW ENGINES;
CREATE TABLE federated.t1 (
    `id` int(20) NOT NULL,
    `name` varchar(32) NOT NULL default ''
    )
  DEFAULT CHARSET=latin1 ENGINE=BerkeleyDB;

connection master;
DROP TABLE IF EXISTS federated.t1;
# # correct connection, same named tables
--replace_result $SLAVE_MYPORT SLAVE_PORT
eval CREATE TABLE federated.t1 (
    `id` int(20) NOT NULL,
    `name` varchar(32) NOT NULL default ''
    )
  ENGINE="FEDERATED" DEFAULT CHARSET=latin1
  CONNECTION='mysql://root@127.0.0.1:$SLAVE_MYPORT/federated/t1';

set autocommit=0;
INSERT INTO federated.t1 (id, name) VALUES (1, 'foo');
INSERT INTO federated.t1 (id, name) VALUES (2, 'fee');
COMMIT;
INSERT INTO federated.t1 (id, name) VALUES (3, 'fie');
INSERT INTO federated.t1 (id, name) VALUES (4, 'fum');
ROLLBACK;
set autocommit=1;
INSERT INTO federated.t1 (id, name) VALUES (5, 'foe');
INSERT INTO federated.t1 (id, name) VALUES (6, 'fig');

SELECT * FROM federated.t1;
DELETE FROM federated.t1;

source include/federated_cleanup.inc;
+159 −8
Original line number Diff line number Diff line
@@ -363,9 +363,9 @@ static int federated_init= FALSE; // Variable for checking the
                                                // init state of hash

/* Static declaration for handerton */

static handler *federated_create_handler(TABLE *table);

static int federated_commit(THD *thd, bool all);
static int federated_rollback(THD *thd, bool all);

/* Federated storage engine handlerton */

@@ -381,8 +381,8 @@ handlerton federated_hton= {
  NULL,    /* savepoint */
  NULL,    /* rollback to savepoint */
  NULL,    /* release savepoint */
  NULL,    /* commit */
  NULL,    /* rollback */
  federated_commit,    /* commit */
  federated_rollback,    /* rollback */
  NULL,    /* prepare */
  NULL,    /* recover */
  NULL,    /* commit_by_xid */
@@ -647,8 +647,8 @@ static int parse_url(FEDERATED_SHARE *share, TABLE *table,

  share->port= 0;
  share->socket= 0;
  DBUG_PRINT("info", ("Length %d \n", table->s->connect_string.length));
  DBUG_PRINT("info", ("String %.*s \n", table->s->connect_string.length, 
  DBUG_PRINT("info", ("Length: %d", table->s->connect_string.length));
  DBUG_PRINT("info", ("String: '%.*s'", table->s->connect_string.length, 
                      table->s->connect_string.str));
  share->scheme= my_strdup_with_length((const byte*)table->s->
                                       connect_string.str, 
@@ -740,7 +740,7 @@ static int parse_url(FEDERATED_SHARE *share, TABLE *table,

  DBUG_PRINT("info",
             ("scheme %s username %s password %s \
              hostname %s port %d database %s tablename %s\n",
              hostname %s port %d database %s tablename %s",
              share->scheme, share->username, share->password,
              share->hostname, share->port, share->database,
              share->table_name));
@@ -760,7 +760,9 @@ ha_federated::ha_federated(TABLE *table_arg)
  :handler(&federated_hton, table_arg),
  mysql(0), stored_result(0), scan_flag(0),
  ref_length(sizeof(MYSQL_ROW_OFFSET)), current_position(0)
{}
{
  trx_next= 0;
}


/*
@@ -1488,6 +1490,7 @@ int ha_federated::open(const char *name, int mode, uint test_if_locked)
    with transactions
  */
  mysql->reconnect= 1;

  DBUG_RETURN(0);
}

@@ -2633,3 +2636,151 @@ bool ha_federated::get_error_message(int error, String* buf)
  DBUG_RETURN(FALSE);
}

int ha_federated::external_lock(THD *thd, int lock_type)
{
  int error= 0;
  ha_federated *trx= (ha_federated *)thd->ha_data[federated_hton.slot];
  DBUG_ENTER("ha_federated::external_lock");

  if (lock_type != F_UNLCK)
  {
    DBUG_PRINT("info",("federated not lock F_UNLCK"));
    if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) 
    {
      DBUG_PRINT("info",("federated autocommit"));
      /* 
        This means we are doing an autocommit
      */
      error= connection_autocommit(TRUE);
      if (error)
      {
        DBUG_PRINT("info", ("error setting autocommit TRUE: %d", error));
        DBUG_RETURN(error);
      }
      trans_register_ha(thd, FALSE, &federated_hton);
    }
    else 
    { 
      DBUG_PRINT("info",("not autocommit"));
      if (!trx)
      {
        /* 
          This is where a transaction gets its start
        */
        error= connection_autocommit(FALSE);
        if (error)
        { 
          DBUG_PRINT("info", ("error setting autocommit FALSE: %d", error));
          DBUG_RETURN(error);
        }
        thd->ha_data[federated_hton.slot]= this;
        trans_register_ha(thd, TRUE, &federated_hton);
        /*
          Send a lock table to the remote end.
          We do not support this at the moment
        */
        if (thd->options & (OPTION_TABLE_LOCK))
        {
          DBUG_PRINT("info", ("We do not support lock table yet"));
        }
      }
      else
      {
        ha_federated *ptr;
        for (ptr= trx; ptr; ptr= ptr->trx_next)
          if (ptr == this)
            break;
          else if (!ptr->trx_next)
            ptr->trx_next= this;
      }
    }
  }
  DBUG_RETURN(0);
}


static int federated_commit(THD *thd, bool all)
{
  int return_val= 0;
  ha_federated *trx= (ha_federated *)thd->ha_data[federated_hton.slot];
  DBUG_ENTER("federated_commit");

  if (all)
  {
    int error= 0;
    ha_federated *ptr, *old= NULL;
    for (ptr= trx; ptr; old= ptr, ptr= ptr->trx_next)
    {
      if (old)
        old->trx_next= NULL;
      error= ptr->connection_commit();
      if (error && !return_val);
        return_val= error;
    }
    thd->ha_data[federated_hton.slot]= NULL;
  }

  DBUG_PRINT("info", ("error val: %d", return_val));
  DBUG_RETURN(return_val);
}


static int federated_rollback(THD *thd, bool all)
{
  int return_val= 0;
  ha_federated *trx= (ha_federated *)thd->ha_data[federated_hton.slot];
  DBUG_ENTER("federated_rollback");

  if (all)
  {
    int error= 0;
    ha_federated *ptr, *old= NULL;
    for (ptr= trx; ptr; old= ptr, ptr= ptr->trx_next)
    {
      if (old)
        old->trx_next= NULL;
      error= ptr->connection_rollback();
      if (error && !return_val)
        return_val= error;
    }
    thd->ha_data[federated_hton.slot]= NULL;
  }

  DBUG_PRINT("info", ("error val: %d", return_val));
  DBUG_RETURN(return_val);
}

int ha_federated::connection_commit()
{
  DBUG_ENTER("ha_federated::connection_commit");
  DBUG_RETURN(execute_simple_query("COMMIT", 6));
}


int ha_federated::connection_rollback()
{
  DBUG_ENTER("ha_federated::connection_rollback");
  DBUG_RETURN(execute_simple_query("ROLLBACK", 8));
}


int ha_federated::connection_autocommit(bool state)
{
  const char *text;
  DBUG_ENTER("ha_federated::connection_autocommit");
  text= (state == true) ? "SET AUTOCOMMIT=1" : "SET AUTOCOMMIT=0";
  DBUG_RETURN(execute_simple_query(text, 16));
}     


int ha_federated::execute_simple_query(const char *query, int len)
{
  DBUG_ENTER("ha_federated::execute_simple_query");

  if (mysql_real_query(mysql, query, len))
  {
    DBUG_RETURN(stash_remote_error());
  }
  DBUG_RETURN(0);
}
+12 −3
Original line number Diff line number Diff line
@@ -174,11 +174,13 @@ class ha_federated: public handler

public:
  ha_federated(TABLE *table_arg);
  ~ha_federated()
  {
  }
  ~ha_federated() {}
  /* The name that will be used for display purposes */
  const char *table_type() const { return "FEDERATED"; }
  /*
    Next pointer used in transaction
  */
  ha_federated *trx_next;
  /*
    The name of the index type that will be used for display
    don't implement this method unless you really have indexes
@@ -298,7 +300,14 @@ class ha_federated: public handler
  THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to,
                             enum thr_lock_type lock_type);     //required
  virtual bool get_error_message(int error, String *buf);
  int external_lock(THD *thd, int lock_type);
  int connection_commit();
  int connection_rollback();
  bool has_transactions()  { return 1; }
  int connection_autocommit(bool state);
  int execute_simple_query(const char *query, int len);
};

bool federated_db_init(void);
int federated_db_end(ha_panic_function type);