Loading storage/ndb/ndbapi-examples/mgmapi_logevent/mgmapi_logevent.cpp 0 → 100644 +140 −0 Original line number Diff line number Diff line /* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include <mysql.h> #include <ndbapi/NdbApi.hpp> #include <mgmapi.h> #include <stdio.h> /* * export LD_LIBRARY_PATH=../../../libmysql_r/.libs:../../../ndb/src/.libs */ #define MGMERROR(h) \ { \ fprintf(stderr, "code: %d msg: %s\n", \ ndb_mgm_get_latest_error(h), \ ndb_mgm_get_latest_error_msg(h)); \ exit(-1); \ } #define LOGEVENTERROR(h) \ { \ fprintf(stderr, "code: %d msg: %s\n", \ ndb_logevent_get_latest_error(h), \ ndb_logevent_get_latest_error_msg(h)); \ exit(-1); \ } int main() { NdbMgmHandle h; NdbLogEventHandle le; int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 15, NDB_MGM_EVENT_CATEGORY_CONNECTION, 15, NDB_MGM_EVENT_CATEGORY_NODE_RESTART, 15, NDB_MGM_EVENT_CATEGORY_STARTUP, 15, NDB_MGM_EVENT_CATEGORY_ERROR, 0 }; struct ndb_logevent event; ndb_init(); h= ndb_mgm_create_handle(); if ( h == 0) { printf("Unable to create handle\n"); exit(-1); } if (ndb_mgm_connect(h,0,0,0)) MGMERROR(h); le= ndb_mgm_create_logevent_handle(h, filter); if ( le == 0 ) MGMERROR(h); while (1) { int timeout= 5000; int r= ndb_logevent_get_next(le,&event,timeout); if (r == 0) printf("No event within %d milliseconds\n", timeout); else if (r < 0) LOGEVENTERROR(le) else { switch (event.type) { case NDB_LE_BackupStarted: printf("Node %d: BackupStarted\n", event.source_nodeid); printf(" Starting node ID: %d\n", event.BackupStarted.starting_node); printf(" Backup ID: %d\n", event.BackupStarted.backup_id); break; case NDB_LE_BackupCompleted: printf("Node %d: BackupCompleted\n", event.source_nodeid); printf(" Backup ID: %d\n", event.BackupStarted.backup_id); break; case NDB_LE_BackupAborted: printf("Node %d: BackupAborted\n", event.source_nodeid); break; case NDB_LE_BackupFailedToStart: printf("Node %d: BackupFailedToStart\n", event.source_nodeid); break; case NDB_LE_NodeFailCompleted: printf("Node %d: NodeFailCompleted\n", event.source_nodeid); break; case NDB_LE_ArbitResult: printf("Node %d: ArbitResult\n", event.source_nodeid); printf(" code %d, arbit_node %d\n", event.ArbitResult.code & 0xffff, event.ArbitResult.arbit_node); break; case NDB_LE_DeadDueToHeartbeat: printf("Node %d: DeadDueToHeartbeat\n", event.source_nodeid); printf(" node %d\n", event.DeadDueToHeartbeat.node); break; case NDB_LE_Connected: printf("Node %d: Connected\n", event.source_nodeid); printf(" node %d\n", event.Connected.node); break; case NDB_LE_Disconnected: printf("Node %d: Disconnected\n", event.source_nodeid); printf(" node %d\n", event.Disconnected.node); break; case NDB_LE_NDBStartCompleted: printf("Node %d: StartCompleted\n", event.source_nodeid); printf(" version %d.%d.%d\n", event.NDBStartCompleted.version >> 16 & 0xff, event.NDBStartCompleted.version >> 8 & 0xff, event.NDBStartCompleted.version >> 0 & 0xff); break; case NDB_LE_ArbitState: printf("Node %d: ArbitState\n", event.source_nodeid); printf(" code %d, arbit_node %d\n", event.ArbitState.code & 0xffff, event.ArbitResult.arbit_node); break; default: break; } } } ndb_mgm_destroy_logevent_handle(&le); ndb_mgm_destroy_handle(&h); ndb_end(0); return 0; } storage/ndb/ndbapi-examples/ndbapi_async/ndbapi_async.cpp 0 → 100644 +476 −0 Original line number Diff line number Diff line /* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ /** * ndbapi_async.cpp: * Illustrates how to use callbacks and error handling using the asynchronous * part of the NDBAPI. * * Classes and methods in NDBAPI used in this example: * * Ndb_cluster_connection * connect() * wait_until_ready() * * Ndb * init() * startTransaction() * closeTransaction() * sendPollNdb() * getNdbError() * * NdbConnection * getNdbOperation() * executeAsynchPrepare() * getNdbError() * * NdbOperation * insertTuple() * equal() * setValue() * */ #include <mysql.h> #include <mysqld_error.h> #include <NdbApi.hpp> #include <iostream> // Used for cout /** * Helper sleep function */ static void milliSleep(int milliseconds){ struct timeval sleeptime; sleeptime.tv_sec = milliseconds / 1000; sleeptime.tv_usec = (milliseconds - (sleeptime.tv_sec * 1000)) * 1000000; select(0, 0, 0, 0, &sleeptime); } /** * error printout macro */ #define PRINT_ERROR(code,msg) \ std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \ << ", code: " << code \ << ", msg: " << msg << "." << std::endl #define MYSQLERROR(mysql) { \ PRINT_ERROR(mysql_errno(&mysql),mysql_error(&mysql)); \ exit(-1); } #define APIERROR(error) { \ PRINT_ERROR(error.code,error.message); \ exit(-1); } #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL /** * callback struct. * transaction : index of the transaction in transaction[] array below * data : the data that the transaction was modifying. * retries : counter for how many times the trans. has been retried */ typedef struct { Ndb * ndb; int transaction; int data; int retries; } async_callback_t; /** * Structure used in "free list" to a NdbTransaction */ typedef struct { NdbTransaction* conn; int used; } transaction_t; /** * Free list holding transactions */ transaction_t transaction[1024]; //1024 - max number of outstanding //transaction in one Ndb object #endif /** * prototypes */ /** * Prepare and send transaction */ int populate(Ndb * myNdb, int data, async_callback_t * cbData); /** * Error handler. */ bool asynchErrorHandler(NdbTransaction * trans, Ndb* ndb); /** * Exit function */ void asynchExitHandler(Ndb * m_ndb) ; /** * Helper function used in callback(...) */ void closeTransaction(Ndb * ndb , async_callback_t * cb); /** * Function to create table */ int create_table(Ndb * myNdb); /** * stat. variables */ int tempErrors = 0; int permErrors = 0; void closeTransaction(Ndb * ndb , async_callback_t * cb) { ndb->closeTransaction(transaction[cb->transaction].conn); transaction[cb->transaction].conn = 0; transaction[cb->transaction].used = 0; cb->retries++; } /** * Callback executed when transaction has return from NDB */ static void callback(int result, NdbTransaction* trans, void* aObject) { async_callback_t * cbData = (async_callback_t *)aObject; if (result<0) { /** * Error: Temporary or permanent? */ if (asynchErrorHandler(trans, (Ndb*)cbData->ndb)) { closeTransaction((Ndb*)cbData->ndb, cbData); while(populate((Ndb*)cbData->ndb, cbData->data, cbData) < 0) milliSleep(10); } else { std::cout << "Restore: Failed to restore data " << "due to a unrecoverable error. Exiting..." << std::endl; delete cbData; asynchExitHandler((Ndb*)cbData->ndb); } } else { /** * OK! close transaction */ closeTransaction((Ndb*)cbData->ndb, cbData); delete cbData; } } /** * Create table "GARAGE" */ int create_table(MYSQL &mysql) { while (mysql_query(&mysql, "CREATE TABLE" " GARAGE" " (REG_NO INT UNSIGNED NOT NULL," " BRAND CHAR(20) NOT NULL," " COLOR CHAR(20) NOT NULL," " PRIMARY KEY USING HASH (REG_NO))" " ENGINE=NDB")) { if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR) MYSQLERROR(mysql); std::cout << "MySQL Cluster already has example table: GARAGE. " << "Dropping it..." << std::endl; /************** * Drop table * **************/ if (mysql_query(&mysql, "DROP TABLE GARAGE")) MYSQLERROR(mysql); } return 1; } void asynchExitHandler(Ndb * m_ndb) { if (m_ndb != NULL) delete m_ndb; exit(-1); } /* returns true if is recoverable (temporary), * false if it is an error that is permanent. */ bool asynchErrorHandler(NdbTransaction * trans, Ndb* ndb) { NdbError error = trans->getNdbError(); switch(error.status) { case NdbError::Success: return false; break; case NdbError::TemporaryError: /** * The error code indicates a temporary error. * The application should typically retry. * (Includes classifications: NdbError::InsufficientSpace, * NdbError::TemporaryResourceError, NdbError::NodeRecoveryError, * NdbError::OverloadError, NdbError::NodeShutdown * and NdbError::TimeoutExpired.) * * We should sleep for a while and retry, except for insufficient space */ if(error.classification == NdbError::InsufficientSpace) return false; milliSleep(10); tempErrors++; return true; break; case NdbError::UnknownResult: std::cout << error.message << std::endl; return false; break; default: case NdbError::PermanentError: switch (error.code) { case 499: case 250: milliSleep(10); return true; // SCAN errors that can be retried. Requires restart of scan. default: break; } //ERROR std::cout << error.message << std::endl; return false; break; } return false; } static int nPreparedTransactions = 0; static int MAX_RETRIES = 10; static int parallelism = 100; /************************************************************************ * populate() * 1. Prepare 'parallelism' number of insert transactions. * 2. Send transactions to NDB and wait for callbacks to execute */ int populate(Ndb * myNdb, int data, async_callback_t * cbData) { NdbOperation* myNdbOperation; // For operations const NdbDictionary::Dictionary* myDict= myNdb->getDictionary(); const NdbDictionary::Table *myTable= myDict->getTable("GARAGE"); if (myTable == NULL) APIERROR(myDict->getNdbError()); async_callback_t * cb; int retries = 0; int current = 0; for(int i=0; i<1024; i++) { if(transaction[i].used == 0) { current = i; if (cbData == 0) { /** * We already have a callback * This is an absolutely new transaction */ cb = new async_callback_t; cb->retries = 0; } else { /** * We already have a callback */ cb =cbData; retries = cbData->retries; } /** * Set data used by the callback */ cb->ndb = myNdb; //handle to Ndb object so that we can close transaction // in the callback (alt. make myNdb global). cb->data = data; //this is the data we want to insert cb->transaction = current; //This is the number (id) of this transaction transaction[current].used = 1 ; //Mark the transaction as used break; } } if(!current) return -1; while(retries < MAX_RETRIES) { transaction[current].conn = myNdb->startTransaction(); if (transaction[current].conn == NULL) { if (asynchErrorHandler(transaction[current].conn, myNdb)) { /** * no transaction to close since conn == null */ milliSleep(10); retries++; continue; } asynchExitHandler(myNdb); } myNdbOperation = transaction[current].conn->getNdbOperation(myTable); if (myNdbOperation == NULL) { if (asynchErrorHandler(transaction[current].conn, myNdb)) { myNdb->closeTransaction(transaction[current].conn); transaction[current].conn = 0; milliSleep(10); retries++; continue; } asynchExitHandler(myNdb); } // if if(myNdbOperation->insertTuple() < 0 || myNdbOperation->equal("REG_NO", data) < 0 || myNdbOperation->setValue("BRAND", "Mercedes") <0 || myNdbOperation->setValue("COLOR", "Blue") < 0) { if (asynchErrorHandler(transaction[current].conn, myNdb)) { myNdb->closeTransaction(transaction[current].conn); transaction[current].conn = 0; retries++; milliSleep(10); continue; } asynchExitHandler(myNdb); } /*Prepare transaction (the transaction is NOT yet sent to NDB)*/ transaction[current].conn->executeAsynchPrepare(NdbTransaction::Commit, &callback, cb); /** * When we have prepared parallelism number of transactions -> * send the transaction to ndb. * Next time we will deal with the transactions are in the * callback. There we will see which ones that were successful * and which ones to retry. */ if (nPreparedTransactions == parallelism-1) { // send-poll all transactions // close transaction is done in callback myNdb->sendPollNdb(3000, parallelism ); nPreparedTransactions=0; } else nPreparedTransactions++; return 1; } std::cout << "Unable to recover from errors. Exiting..." << std::endl; asynchExitHandler(myNdb); return -1; } int main() { ndb_init(); MYSQL mysql; /************************************************************** * Connect to mysql server and create table * **************************************************************/ { if ( !mysql_init(&mysql) ) { std::cout << "mysql_init failed\n"; exit(-1); } if ( !mysql_real_connect(&mysql, "localhost", "root", "", "", 3306, "/tmp/mysql.sock", 0) ) MYSQLERROR(mysql); mysql_query(&mysql, "CREATE DATABASE TEST_DB"); if (mysql_query(&mysql, "USE TEST_DB") != 0) MYSQLERROR(mysql); create_table(mysql); } /************************************************************** * Connect to ndb cluster * **************************************************************/ Ndb_cluster_connection cluster_connection; if (cluster_connection.connect(4, 5, 1)) { std::cout << "Unable to connect to cluster within 30 secs." << std::endl; exit(-1); } // Optionally connect and wait for the storage nodes (ndbd's) if (cluster_connection.wait_until_ready(30,0) < 0) { std::cout << "Cluster was not ready within 30 secs.\n"; exit(-1); } Ndb* myNdb = new Ndb( &cluster_connection, "TEST_DB" ); // Object representing the database if (myNdb->init(1024) == -1) { // Set max 1024 parallel transactions APIERROR(myNdb->getNdbError()); } /** * Initialise transaction array */ for(int i = 0 ; i < 1024 ; i++) { transaction[i].used = 0; transaction[i].conn = 0; } int i=0; /** * Do 20000 insert transactions. */ while(i < 20000) { while(populate(myNdb,i,0)<0) // <0, no space on free list. Sleep and try again. milliSleep(10); i++; } std::cout << "Number of temporary errors: " << tempErrors << std::endl; delete myNdb; } storage/ndb/ndbapi-examples/ndbapi_async/readme.txt 0 → 100644 +3 −0 Original line number Diff line number Diff line 1. Set NDB_OS in Makefile 2. Add path to libNDB_API.so in LD_LIBRARY_PATH 3. Set NDB_CONNECTSTRING storage/ndb/ndbapi-examples/ndbapi_async1/ndbapi_async1.cpp 0 → 100644 +144 −0 Original line number Diff line number Diff line /* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ // // ndbapi_async1.cpp: Using asynchronous transactions in NDB API // // Execute ndbapi_example1 to create the table "MYTABLENAME" // before executing this program. // // Correct output from this program is: // // Successful insert. // Successful insert. #include <NdbApi.hpp> // Used for cout #include <iostream> #define APIERROR(error) \ { std::cout << "Error in " << __FILE__ << ", line:" << __LINE__ << ", code:" \ << error.code << ", msg: " << error.message << "." << std::endl; \ exit(-1); } static void callback(int result, NdbTransaction* NdbObject, void* aObject); int main() { ndb_init(); Ndb_cluster_connection *cluster_connection= new Ndb_cluster_connection(); // Object representing the cluster if (cluster_connection->wait_until_ready(30,30)) { std::cout << "Cluster was not ready within 30 secs." << std::endl; exit(-1); } int r= cluster_connection->connect(5 /* retries */, 3 /* delay between retries */, 1 /* verbose */); if (r > 0) { std::cout << "Cluster connect failed, possibly resolved with more retries.\n"; exit(-1); } else if (r < 0) { std::cout << "Cluster connect failed.\n"; exit(-1); } if (cluster_connection->wait_until_ready(30,30)) { std::cout << "Cluster was not ready within 30 secs." << std::endl; exit(-1); } Ndb* myNdb = new Ndb( cluster_connection, "TEST_DB_2" ); // Object representing the database NdbTransaction* myNdbTransaction[2]; // For transactions NdbOperation* myNdbOperation; // For operations if (myNdb->init(2) == -1) { // Want two parallel insert transactions APIERROR(myNdb->getNdbError()); exit(-1); } /****************************************************** * Insert (we do two insert transactions in parallel) * ******************************************************/ const NdbDictionary::Dictionary* myDict= myNdb->getDictionary(); const NdbDictionary::Table *myTable= myDict->getTable("MYTABLENAME"); if (myTable == NULL) APIERROR(myDict->getNdbError()); for (int i = 0; i < 2; i++) { myNdbTransaction[i] = myNdb->startTransaction(); if (myNdbTransaction[i] == NULL) APIERROR(myNdb->getNdbError()); myNdbOperation = myNdbTransaction[i]->getNdbOperation(myTable); if (myNdbOperation == NULL) APIERROR(myNdbTransaction[i]->getNdbError()); myNdbOperation->insertTuple(); myNdbOperation->equal("ATTR1", 20 + i); myNdbOperation->setValue("ATTR2", 20 + i); // Prepare transaction (the transaction is NOT yet sent to NDB) myNdbTransaction[i]->executeAsynchPrepare(NdbTransaction::Commit, &callback, NULL); } // Send all transactions to NDB myNdb->sendPreparedTransactions(0); // Poll all transactions myNdb->pollNdb(3000, 2); // Close all transactions for (int i = 0; i < 2; i++) myNdb->closeTransaction(myNdbTransaction[i]); delete myNdb; delete cluster_connection; ndb_end(0); return 0; } /* * callback : This is called when the transaction is polled * * (This function must have three arguments: * - The result of the transaction, * - The NdbTransaction object, and * - A pointer to an arbitrary object.) */ static void callback(int result, NdbTransaction* myTrans, void* aObject) { if (result == -1) { std::cout << "Poll error: " << std::endl; APIERROR(myTrans->getNdbError()); } else { std::cout << "Successful insert." << std::endl; } } storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp 0 → 100644 +264 −0 File added.Preview size limit exceeded, changes collapsed. Show changes Loading
storage/ndb/ndbapi-examples/mgmapi_logevent/mgmapi_logevent.cpp 0 → 100644 +140 −0 Original line number Diff line number Diff line /* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include <mysql.h> #include <ndbapi/NdbApi.hpp> #include <mgmapi.h> #include <stdio.h> /* * export LD_LIBRARY_PATH=../../../libmysql_r/.libs:../../../ndb/src/.libs */ #define MGMERROR(h) \ { \ fprintf(stderr, "code: %d msg: %s\n", \ ndb_mgm_get_latest_error(h), \ ndb_mgm_get_latest_error_msg(h)); \ exit(-1); \ } #define LOGEVENTERROR(h) \ { \ fprintf(stderr, "code: %d msg: %s\n", \ ndb_logevent_get_latest_error(h), \ ndb_logevent_get_latest_error_msg(h)); \ exit(-1); \ } int main() { NdbMgmHandle h; NdbLogEventHandle le; int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 15, NDB_MGM_EVENT_CATEGORY_CONNECTION, 15, NDB_MGM_EVENT_CATEGORY_NODE_RESTART, 15, NDB_MGM_EVENT_CATEGORY_STARTUP, 15, NDB_MGM_EVENT_CATEGORY_ERROR, 0 }; struct ndb_logevent event; ndb_init(); h= ndb_mgm_create_handle(); if ( h == 0) { printf("Unable to create handle\n"); exit(-1); } if (ndb_mgm_connect(h,0,0,0)) MGMERROR(h); le= ndb_mgm_create_logevent_handle(h, filter); if ( le == 0 ) MGMERROR(h); while (1) { int timeout= 5000; int r= ndb_logevent_get_next(le,&event,timeout); if (r == 0) printf("No event within %d milliseconds\n", timeout); else if (r < 0) LOGEVENTERROR(le) else { switch (event.type) { case NDB_LE_BackupStarted: printf("Node %d: BackupStarted\n", event.source_nodeid); printf(" Starting node ID: %d\n", event.BackupStarted.starting_node); printf(" Backup ID: %d\n", event.BackupStarted.backup_id); break; case NDB_LE_BackupCompleted: printf("Node %d: BackupCompleted\n", event.source_nodeid); printf(" Backup ID: %d\n", event.BackupStarted.backup_id); break; case NDB_LE_BackupAborted: printf("Node %d: BackupAborted\n", event.source_nodeid); break; case NDB_LE_BackupFailedToStart: printf("Node %d: BackupFailedToStart\n", event.source_nodeid); break; case NDB_LE_NodeFailCompleted: printf("Node %d: NodeFailCompleted\n", event.source_nodeid); break; case NDB_LE_ArbitResult: printf("Node %d: ArbitResult\n", event.source_nodeid); printf(" code %d, arbit_node %d\n", event.ArbitResult.code & 0xffff, event.ArbitResult.arbit_node); break; case NDB_LE_DeadDueToHeartbeat: printf("Node %d: DeadDueToHeartbeat\n", event.source_nodeid); printf(" node %d\n", event.DeadDueToHeartbeat.node); break; case NDB_LE_Connected: printf("Node %d: Connected\n", event.source_nodeid); printf(" node %d\n", event.Connected.node); break; case NDB_LE_Disconnected: printf("Node %d: Disconnected\n", event.source_nodeid); printf(" node %d\n", event.Disconnected.node); break; case NDB_LE_NDBStartCompleted: printf("Node %d: StartCompleted\n", event.source_nodeid); printf(" version %d.%d.%d\n", event.NDBStartCompleted.version >> 16 & 0xff, event.NDBStartCompleted.version >> 8 & 0xff, event.NDBStartCompleted.version >> 0 & 0xff); break; case NDB_LE_ArbitState: printf("Node %d: ArbitState\n", event.source_nodeid); printf(" code %d, arbit_node %d\n", event.ArbitState.code & 0xffff, event.ArbitResult.arbit_node); break; default: break; } } } ndb_mgm_destroy_logevent_handle(&le); ndb_mgm_destroy_handle(&h); ndb_end(0); return 0; }
storage/ndb/ndbapi-examples/ndbapi_async/ndbapi_async.cpp 0 → 100644 +476 −0 Original line number Diff line number Diff line /* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ /** * ndbapi_async.cpp: * Illustrates how to use callbacks and error handling using the asynchronous * part of the NDBAPI. * * Classes and methods in NDBAPI used in this example: * * Ndb_cluster_connection * connect() * wait_until_ready() * * Ndb * init() * startTransaction() * closeTransaction() * sendPollNdb() * getNdbError() * * NdbConnection * getNdbOperation() * executeAsynchPrepare() * getNdbError() * * NdbOperation * insertTuple() * equal() * setValue() * */ #include <mysql.h> #include <mysqld_error.h> #include <NdbApi.hpp> #include <iostream> // Used for cout /** * Helper sleep function */ static void milliSleep(int milliseconds){ struct timeval sleeptime; sleeptime.tv_sec = milliseconds / 1000; sleeptime.tv_usec = (milliseconds - (sleeptime.tv_sec * 1000)) * 1000000; select(0, 0, 0, 0, &sleeptime); } /** * error printout macro */ #define PRINT_ERROR(code,msg) \ std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \ << ", code: " << code \ << ", msg: " << msg << "." << std::endl #define MYSQLERROR(mysql) { \ PRINT_ERROR(mysql_errno(&mysql),mysql_error(&mysql)); \ exit(-1); } #define APIERROR(error) { \ PRINT_ERROR(error.code,error.message); \ exit(-1); } #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL /** * callback struct. * transaction : index of the transaction in transaction[] array below * data : the data that the transaction was modifying. * retries : counter for how many times the trans. has been retried */ typedef struct { Ndb * ndb; int transaction; int data; int retries; } async_callback_t; /** * Structure used in "free list" to a NdbTransaction */ typedef struct { NdbTransaction* conn; int used; } transaction_t; /** * Free list holding transactions */ transaction_t transaction[1024]; //1024 - max number of outstanding //transaction in one Ndb object #endif /** * prototypes */ /** * Prepare and send transaction */ int populate(Ndb * myNdb, int data, async_callback_t * cbData); /** * Error handler. */ bool asynchErrorHandler(NdbTransaction * trans, Ndb* ndb); /** * Exit function */ void asynchExitHandler(Ndb * m_ndb) ; /** * Helper function used in callback(...) */ void closeTransaction(Ndb * ndb , async_callback_t * cb); /** * Function to create table */ int create_table(Ndb * myNdb); /** * stat. variables */ int tempErrors = 0; int permErrors = 0; void closeTransaction(Ndb * ndb , async_callback_t * cb) { ndb->closeTransaction(transaction[cb->transaction].conn); transaction[cb->transaction].conn = 0; transaction[cb->transaction].used = 0; cb->retries++; } /** * Callback executed when transaction has return from NDB */ static void callback(int result, NdbTransaction* trans, void* aObject) { async_callback_t * cbData = (async_callback_t *)aObject; if (result<0) { /** * Error: Temporary or permanent? */ if (asynchErrorHandler(trans, (Ndb*)cbData->ndb)) { closeTransaction((Ndb*)cbData->ndb, cbData); while(populate((Ndb*)cbData->ndb, cbData->data, cbData) < 0) milliSleep(10); } else { std::cout << "Restore: Failed to restore data " << "due to a unrecoverable error. Exiting..." << std::endl; delete cbData; asynchExitHandler((Ndb*)cbData->ndb); } } else { /** * OK! close transaction */ closeTransaction((Ndb*)cbData->ndb, cbData); delete cbData; } } /** * Create table "GARAGE" */ int create_table(MYSQL &mysql) { while (mysql_query(&mysql, "CREATE TABLE" " GARAGE" " (REG_NO INT UNSIGNED NOT NULL," " BRAND CHAR(20) NOT NULL," " COLOR CHAR(20) NOT NULL," " PRIMARY KEY USING HASH (REG_NO))" " ENGINE=NDB")) { if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR) MYSQLERROR(mysql); std::cout << "MySQL Cluster already has example table: GARAGE. " << "Dropping it..." << std::endl; /************** * Drop table * **************/ if (mysql_query(&mysql, "DROP TABLE GARAGE")) MYSQLERROR(mysql); } return 1; } void asynchExitHandler(Ndb * m_ndb) { if (m_ndb != NULL) delete m_ndb; exit(-1); } /* returns true if is recoverable (temporary), * false if it is an error that is permanent. */ bool asynchErrorHandler(NdbTransaction * trans, Ndb* ndb) { NdbError error = trans->getNdbError(); switch(error.status) { case NdbError::Success: return false; break; case NdbError::TemporaryError: /** * The error code indicates a temporary error. * The application should typically retry. * (Includes classifications: NdbError::InsufficientSpace, * NdbError::TemporaryResourceError, NdbError::NodeRecoveryError, * NdbError::OverloadError, NdbError::NodeShutdown * and NdbError::TimeoutExpired.) * * We should sleep for a while and retry, except for insufficient space */ if(error.classification == NdbError::InsufficientSpace) return false; milliSleep(10); tempErrors++; return true; break; case NdbError::UnknownResult: std::cout << error.message << std::endl; return false; break; default: case NdbError::PermanentError: switch (error.code) { case 499: case 250: milliSleep(10); return true; // SCAN errors that can be retried. Requires restart of scan. default: break; } //ERROR std::cout << error.message << std::endl; return false; break; } return false; } static int nPreparedTransactions = 0; static int MAX_RETRIES = 10; static int parallelism = 100; /************************************************************************ * populate() * 1. Prepare 'parallelism' number of insert transactions. * 2. Send transactions to NDB and wait for callbacks to execute */ int populate(Ndb * myNdb, int data, async_callback_t * cbData) { NdbOperation* myNdbOperation; // For operations const NdbDictionary::Dictionary* myDict= myNdb->getDictionary(); const NdbDictionary::Table *myTable= myDict->getTable("GARAGE"); if (myTable == NULL) APIERROR(myDict->getNdbError()); async_callback_t * cb; int retries = 0; int current = 0; for(int i=0; i<1024; i++) { if(transaction[i].used == 0) { current = i; if (cbData == 0) { /** * We already have a callback * This is an absolutely new transaction */ cb = new async_callback_t; cb->retries = 0; } else { /** * We already have a callback */ cb =cbData; retries = cbData->retries; } /** * Set data used by the callback */ cb->ndb = myNdb; //handle to Ndb object so that we can close transaction // in the callback (alt. make myNdb global). cb->data = data; //this is the data we want to insert cb->transaction = current; //This is the number (id) of this transaction transaction[current].used = 1 ; //Mark the transaction as used break; } } if(!current) return -1; while(retries < MAX_RETRIES) { transaction[current].conn = myNdb->startTransaction(); if (transaction[current].conn == NULL) { if (asynchErrorHandler(transaction[current].conn, myNdb)) { /** * no transaction to close since conn == null */ milliSleep(10); retries++; continue; } asynchExitHandler(myNdb); } myNdbOperation = transaction[current].conn->getNdbOperation(myTable); if (myNdbOperation == NULL) { if (asynchErrorHandler(transaction[current].conn, myNdb)) { myNdb->closeTransaction(transaction[current].conn); transaction[current].conn = 0; milliSleep(10); retries++; continue; } asynchExitHandler(myNdb); } // if if(myNdbOperation->insertTuple() < 0 || myNdbOperation->equal("REG_NO", data) < 0 || myNdbOperation->setValue("BRAND", "Mercedes") <0 || myNdbOperation->setValue("COLOR", "Blue") < 0) { if (asynchErrorHandler(transaction[current].conn, myNdb)) { myNdb->closeTransaction(transaction[current].conn); transaction[current].conn = 0; retries++; milliSleep(10); continue; } asynchExitHandler(myNdb); } /*Prepare transaction (the transaction is NOT yet sent to NDB)*/ transaction[current].conn->executeAsynchPrepare(NdbTransaction::Commit, &callback, cb); /** * When we have prepared parallelism number of transactions -> * send the transaction to ndb. * Next time we will deal with the transactions are in the * callback. There we will see which ones that were successful * and which ones to retry. */ if (nPreparedTransactions == parallelism-1) { // send-poll all transactions // close transaction is done in callback myNdb->sendPollNdb(3000, parallelism ); nPreparedTransactions=0; } else nPreparedTransactions++; return 1; } std::cout << "Unable to recover from errors. Exiting..." << std::endl; asynchExitHandler(myNdb); return -1; } int main() { ndb_init(); MYSQL mysql; /************************************************************** * Connect to mysql server and create table * **************************************************************/ { if ( !mysql_init(&mysql) ) { std::cout << "mysql_init failed\n"; exit(-1); } if ( !mysql_real_connect(&mysql, "localhost", "root", "", "", 3306, "/tmp/mysql.sock", 0) ) MYSQLERROR(mysql); mysql_query(&mysql, "CREATE DATABASE TEST_DB"); if (mysql_query(&mysql, "USE TEST_DB") != 0) MYSQLERROR(mysql); create_table(mysql); } /************************************************************** * Connect to ndb cluster * **************************************************************/ Ndb_cluster_connection cluster_connection; if (cluster_connection.connect(4, 5, 1)) { std::cout << "Unable to connect to cluster within 30 secs." << std::endl; exit(-1); } // Optionally connect and wait for the storage nodes (ndbd's) if (cluster_connection.wait_until_ready(30,0) < 0) { std::cout << "Cluster was not ready within 30 secs.\n"; exit(-1); } Ndb* myNdb = new Ndb( &cluster_connection, "TEST_DB" ); // Object representing the database if (myNdb->init(1024) == -1) { // Set max 1024 parallel transactions APIERROR(myNdb->getNdbError()); } /** * Initialise transaction array */ for(int i = 0 ; i < 1024 ; i++) { transaction[i].used = 0; transaction[i].conn = 0; } int i=0; /** * Do 20000 insert transactions. */ while(i < 20000) { while(populate(myNdb,i,0)<0) // <0, no space on free list. Sleep and try again. milliSleep(10); i++; } std::cout << "Number of temporary errors: " << tempErrors << std::endl; delete myNdb; }
storage/ndb/ndbapi-examples/ndbapi_async/readme.txt 0 → 100644 +3 −0 Original line number Diff line number Diff line 1. Set NDB_OS in Makefile 2. Add path to libNDB_API.so in LD_LIBRARY_PATH 3. Set NDB_CONNECTSTRING
storage/ndb/ndbapi-examples/ndbapi_async1/ndbapi_async1.cpp 0 → 100644 +144 −0 Original line number Diff line number Diff line /* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ // // ndbapi_async1.cpp: Using asynchronous transactions in NDB API // // Execute ndbapi_example1 to create the table "MYTABLENAME" // before executing this program. // // Correct output from this program is: // // Successful insert. // Successful insert. #include <NdbApi.hpp> // Used for cout #include <iostream> #define APIERROR(error) \ { std::cout << "Error in " << __FILE__ << ", line:" << __LINE__ << ", code:" \ << error.code << ", msg: " << error.message << "." << std::endl; \ exit(-1); } static void callback(int result, NdbTransaction* NdbObject, void* aObject); int main() { ndb_init(); Ndb_cluster_connection *cluster_connection= new Ndb_cluster_connection(); // Object representing the cluster if (cluster_connection->wait_until_ready(30,30)) { std::cout << "Cluster was not ready within 30 secs." << std::endl; exit(-1); } int r= cluster_connection->connect(5 /* retries */, 3 /* delay between retries */, 1 /* verbose */); if (r > 0) { std::cout << "Cluster connect failed, possibly resolved with more retries.\n"; exit(-1); } else if (r < 0) { std::cout << "Cluster connect failed.\n"; exit(-1); } if (cluster_connection->wait_until_ready(30,30)) { std::cout << "Cluster was not ready within 30 secs." << std::endl; exit(-1); } Ndb* myNdb = new Ndb( cluster_connection, "TEST_DB_2" ); // Object representing the database NdbTransaction* myNdbTransaction[2]; // For transactions NdbOperation* myNdbOperation; // For operations if (myNdb->init(2) == -1) { // Want two parallel insert transactions APIERROR(myNdb->getNdbError()); exit(-1); } /****************************************************** * Insert (we do two insert transactions in parallel) * ******************************************************/ const NdbDictionary::Dictionary* myDict= myNdb->getDictionary(); const NdbDictionary::Table *myTable= myDict->getTable("MYTABLENAME"); if (myTable == NULL) APIERROR(myDict->getNdbError()); for (int i = 0; i < 2; i++) { myNdbTransaction[i] = myNdb->startTransaction(); if (myNdbTransaction[i] == NULL) APIERROR(myNdb->getNdbError()); myNdbOperation = myNdbTransaction[i]->getNdbOperation(myTable); if (myNdbOperation == NULL) APIERROR(myNdbTransaction[i]->getNdbError()); myNdbOperation->insertTuple(); myNdbOperation->equal("ATTR1", 20 + i); myNdbOperation->setValue("ATTR2", 20 + i); // Prepare transaction (the transaction is NOT yet sent to NDB) myNdbTransaction[i]->executeAsynchPrepare(NdbTransaction::Commit, &callback, NULL); } // Send all transactions to NDB myNdb->sendPreparedTransactions(0); // Poll all transactions myNdb->pollNdb(3000, 2); // Close all transactions for (int i = 0; i < 2; i++) myNdb->closeTransaction(myNdbTransaction[i]); delete myNdb; delete cluster_connection; ndb_end(0); return 0; } /* * callback : This is called when the transaction is polled * * (This function must have three arguments: * - The result of the transaction, * - The NdbTransaction object, and * - A pointer to an arbitrary object.) */ static void callback(int result, NdbTransaction* myTrans, void* aObject) { if (result == -1) { std::cout << "Poll error: " << std::endl; APIERROR(myTrans->getNdbError()); } else { std::cout << "Successful insert." << std::endl; } }
storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp 0 → 100644 +264 −0 File added.Preview size limit exceeded, changes collapsed. Show changes