Commit fa7b260a authored by unknown's avatar unknown
Browse files

ndb Updated example


ndb/examples/ndbapi_scan_example/ndbapi_scan.cpp:
  Updated example
ndb/include/ndbapi/NdbScanFilter.hpp:
  Fix default value
parent 52722f1b
Loading
Loading
Loading
Loading
+181 −207
Original line number Diff line number Diff line
@@ -30,15 +30,19 @@
 *       getDictionary()
 *       startTransaction()
 *       closeTransaction()
 *       sendPreparedTransactions()
 *       pollNdb()
 *
 *  NdbConnection
 *       getNdbOperation()
 *       executeAsynchPrepare()
 *       getNdbError()
 *       executeScan()
 *       nextScanResult()
 *  NdbTransaction
 *       getNdbScanOperation()
 *       execute()
 *
 *  NdbResultSet
 *
 *  NdbScanOperation
 *       getValue() 
 *       readTuples()
 *       nextResult()
 *       deleteCurrentTuple()
 *       updateCurrentTuple()
 *
 *  NdbDictionary::Dictionary
 *       getTable()
@@ -60,33 +64,15 @@
 *       insertTuple()
 *       equal()
 *       setValue()
 *       openScanRead()
 *       openScanExclusive()
 *
 *  NdbRecAttr
 *       aRef()
 *       u_32_value()
 *
 *  NdbResultSet
 *       nextResult()
 *       deleteTuple()
 *       updateTuple()
 *
 *  NdbScanOperation
 *       getValue() 
 *       readTuplesExclusive()
 *
 *  NdbScanFilter
 *       begin()
 *	 eq()
 *	 end()
 *
 *       
 */


#include <ndb_global.h>

#include <NdbApi.hpp>
#include <NdbScanFilter.hpp>
// Used for cout
@@ -114,30 +100,12 @@ milliSleep(int milliseconds){
              << error.code << ", msg: " << error.message << "." << std::endl; \
    exit(-1); }

/*
 *   callback : This is called when the transaction is polled
 *              
 *   (This function must have three arguments: 
 *   - The result of the transaction, 
 *   - The NdbConnection object, and 
 *   - A pointer to an arbitrary object.)
 */
static void
callback(int result, NdbConnection* myTrans, void* aObject)
struct Car 
{
  if (result == -1) {
    std::cout << "In callback: " << std::endl;     
    /**
     * Put error checking code here (see ndb_async_example)
     */
    APIERROR(myTrans->getNdbError());
  } else {
    /**
     * Ok!
     */
    return;
  }
}
  unsigned int reg_no;
  char brand[20];
  char color[20];
};

/**
 * Function to create table
@@ -162,6 +130,8 @@ int create_table(Ndb * myNdb)
    }
  } 
  
  Car car;

  myTable.setName("GARAGE");
  
  myColumn.setName("REG_NO");
@@ -173,7 +143,7 @@ int create_table(Ndb * myNdb)

  myColumn.setName("BRAND");
  myColumn.setType(NdbDictionary::Column::Char);
  myColumn.setLength(20);
  myColumn.setLength(sizeof(car.brand));
  myColumn.setPrimaryKey(false);
  myColumn.setNullable(false);
  myTable.addColumn(myColumn);
@@ -181,7 +151,7 @@ int create_table(Ndb * myNdb)

  myColumn.setName("COLOR");
  myColumn.setType(NdbDictionary::Column::Char);
  myColumn.setLength(20);
  myColumn.setLength(sizeof(car.color));
  myColumn.setPrimaryKey(false);
  myColumn.setNullable(false);
  myTable.addColumn(myColumn);
@@ -196,91 +166,64 @@ int create_table(Ndb * myNdb)

int populate(Ndb * myNdb)
{
  NdbConnection*  myNdbConnection[15];   // For transactions
  NdbOperation*   myNdbOperation;       // For operations
  /******************************************************
   * Insert (we do 15 insert transactions in parallel) *
   ******************************************************/
  int i;
  Car cars[15];

  /**
   * Five blue mercedes
   */
  for (int i = 0; i < 5; i++) 
  for (i = 0; i < 5; i++)
  {
    myNdbConnection[i] = myNdb->startTransaction();
    if (myNdbConnection[i] == NULL) 
      APIERROR(myNdb->getNdbError());
    myNdbOperation = myNdbConnection[i]->getNdbOperation("GARAGE");
    // Error check. If error, then maybe table GARAGE is not in database
    if (myNdbOperation == NULL) 
      APIERROR(myNdbConnection[i]->getNdbError());
    myNdbOperation->insertTuple();
    myNdbOperation->equal("REG_NO", i);
    myNdbOperation->setValue("BRAND", "Mercedes");
    myNdbOperation->setValue("COLOR", "Blue");
    // Prepare transaction (the transaction is NOT yet sent to NDB)
    myNdbConnection[i]->executeAsynchPrepare(Commit, &callback, NULL);
    cars[i].reg_no = i;
    sprintf(cars[i].brand, "Mercedes");
    sprintf(cars[i].color, "Blue");
  }


  /**
   * Five black bmw
   */
  for (int i = 5; i < 10; i++) 
  for (i = 5; i < 10; i++)
  {
    myNdbConnection[i] = myNdb->startTransaction();
    if (myNdbConnection[i] == NULL)
      APIERROR(myNdb->getNdbError());
    myNdbOperation = myNdbConnection[i]->getNdbOperation("GARAGE");
    // Error check. If error, then maybe table MYTABLENAME is not in database
    if (myNdbOperation == NULL) 
      APIERROR(myNdbConnection[i]->getNdbError());
    myNdbOperation->insertTuple();
    myNdbOperation->equal("REG_NO", i);
    myNdbOperation->setValue("BRAND", "BMW");
    myNdbOperation->setValue("COLOR", "Black");
    // Prepare transaction (the transaction is NOT yet sent to NDB)
    myNdbConnection[i]->executeAsynchPrepare(Commit, &callback, NULL);
    cars[i].reg_no = i;
    sprintf(cars[i].brand, "BMW");
    sprintf(cars[i].color, "Black");
  }

  /**
   * Five pink toyotas
   */
  for (int i = 10; i < 15; i++) {
    myNdbConnection[i] = myNdb->startTransaction();
    if (myNdbConnection[i] == NULL) APIERROR(myNdb->getNdbError());
    myNdbOperation = myNdbConnection[i]->getNdbOperation("GARAGE");
  for (i = 10; i < 15; i++)
  {
    cars[i].reg_no = i;
    sprintf(cars[i].brand, "Toyota");
    sprintf(cars[i].color, "Pink");
  }
  
  NdbTransaction* myTrans = myNdb->startTransaction();
  if (myTrans == NULL)
    APIERROR(myNdb->getNdbError());

  for (i = 0; i < 15; i++) 
  {
    NdbOperation* myNdbOperation = myTrans->getNdbOperation("GARAGE");
    // Error check. If error, then maybe table MYTABLENAME is not in database
    if (myNdbOperation == NULL) APIERROR(myNdbConnection[i]->getNdbError());
    if (myNdbOperation == NULL) 
      APIERROR(myTrans->getNdbError());
    myNdbOperation->insertTuple();
    myNdbOperation->equal("REG_NO", i);
    myNdbOperation->setValue("BRAND", "Toyota");
    myNdbOperation->setValue("COLOR", "Pink");
    // Prepare transaction (the transaction is NOT yet sent to NDB)
    myNdbConnection[i]->executeAsynchPrepare(Commit, &callback, NULL);
  }

  // Send all transactions to NDB 
  myNdb->sendPreparedTransactions(0);
  // Poll all transactions
  myNdb->pollNdb(3000, 0);

  //  it is also possible to use sendPollNdb instead of
  //  myNdb->sendPreparedTransactions(0); and myNdb->pollNdb(3000, 15);  above.
  //  myNdb->sendPollNdb(3000,0);
  //  Note! Neither sendPollNdb or pollNdb returs until all 15 callbacks have 
  //  executed.

  //  Close all transactions. It is also possible to close transactions
  //  in the callback.
  for (int i = 0; i < 15; i++) 
    myNdb->closeTransaction(myNdbConnection[i]);
  return 1;
    myNdbOperation->equal("REG_NO", cars[i].reg_no);
    myNdbOperation->setValue("BRAND", cars[i].brand);
    myNdbOperation->setValue("COLOR", cars[i].color);
  }

  int check = myTrans->execute(Commit);

  myTrans->close();

  return check != -1;
}

int scan_delete(Ndb* myNdb, 
		int parallelism,
		int column,
		int column_len,
		const char * color)
  
{
@@ -342,8 +285,8 @@ int scan_delete(Ndb* myNdb,
    /**
     * Define a result set for the scan.
     */ 
    NdbResultSet * rs = myScanOp->readTuplesExclusive(parallelism);
    if( rs == 0 ) {
    if(myScanOp->readTuples(NdbOperation::LM_Exclusive) != 0)
    {
      std::cout << myTrans->getNdbError().message << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
@@ -354,7 +297,7 @@ int scan_delete(Ndb* myNdb,
     */ 
    NdbScanFilter filter(myScanOp) ;   
    if(filter.begin(NdbScanFilter::AND) < 0  || 
       filter.eq(column, color, column_len, false) <0||
       filter.cmp(NdbScanFilter::COND_EQ, column, color) < 0 ||
       filter.end() < 0)
    {
      std::cout <<  myTrans->getNdbError().message << std::endl;
@@ -384,9 +327,11 @@ int scan_delete(Ndb* myNdb,
    * start of loop: nextResult(true) means that "parallelism" number of
    * rows are fetched from NDB and cached in NDBAPI
    */    
    while((check = rs->nextResult(true)) == 0){
      do {
	if (rs->deleteTuple() != 0){
    while((check = myScanOp->nextResult(true)) == 0){
      do 
      {
	if (myScanOp->deleteCurrentTuple() != 0)
	{
	  std::cout << myTrans->getNdbError().message << std::endl;
	  myNdb->closeTransaction(myTrans);
	  return -1;
@@ -398,21 +343,32 @@ int scan_delete(Ndb* myNdb,
	 * cached in the NDBAPI are modified before
	 * fetching more rows from NDB.
	 */    
      } while((check = rs->nextResult(false)) == 0);
      } while((check = myScanOp->nextResult(false)) == 0);
      
      /**
       * Commit when all cached tuple have been marked for deletion
       */    
      if(check != -1){
      if(check != -1)
      {
	check = myTrans->execute(Commit);   
	myTrans->releaseCompletedOperations();
      }

      if(check == -1)
      {
	/**
	 * Create a new transaction, while keeping scan open
	 */
	check = myTrans->restart();
      }

      /**
       * Check for errors
       */
      err = myTrans->getNdbError();    
      if(check == -1){
	if(err.status == NdbError::TemporaryError){
      if(check == -1)
      {
	if(err.status == NdbError::TemporaryError)
	{
	  std::cout << myTrans->getNdbError().message << std::endl;
	  myNdb->closeTransaction(myTrans);
	  milliSleep(50);
@@ -426,10 +382,10 @@ int scan_delete(Ndb* myNdb,
    std::cout << myTrans->getNdbError().message << std::endl;
    myNdb->closeTransaction(myTrans);
    return 0;

    
  }
  if(myTrans!=0) {
  
  if(myTrans!=0) 
  {
    std::cout << myTrans->getNdbError().message << std::endl;
    myNdb->closeTransaction(myTrans);
  }
@@ -438,10 +394,7 @@ int scan_delete(Ndb* myNdb,


int scan_update(Ndb* myNdb, 
		int parallelism,
		int column_len,
		int update_column,
		const char * column_name,
		const char * before_color,
		const char * after_color)
		
@@ -505,8 +458,8 @@ int scan_update(Ndb* myNdb,
    /**
     * Define a result set for the scan.
     */ 
    NdbResultSet * rs = myScanOp->readTuplesExclusive(parallelism);
    if( rs == 0 ) {
    if( myScanOp->readTuplesExclusive(NdbOperation::LM_Exclusive) ) 
    {
      std::cout << myTrans->getNdbError().message << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
@@ -517,7 +470,7 @@ int scan_update(Ndb* myNdb,
     */ 
    NdbScanFilter filter(myScanOp) ;   
    if(filter.begin(NdbScanFilter::AND) < 0  || 
       filter.eq(update_column, before_color, column_len, false) <0||
       filter.cmp(NdbScanFilter::COND_EQ, update_column, before_color) <0||
       filter.end() <0)
    {
      std::cout <<  myTrans->getNdbError().message << std::endl;
@@ -528,7 +481,8 @@ int scan_update(Ndb* myNdb,
    /**
     * Start scan    (NoCommit since we are only reading at this stage);
     */     
    if(myTrans->execute(NoCommit) != 0){      
    if(myTrans->execute(NoCommit) != 0)
    {      
      err = myTrans->getNdbError();    
      if(err.status == NdbError::TemporaryError){
	std::cout << myTrans->getNdbError().message << std::endl;
@@ -541,26 +495,24 @@ int scan_update(Ndb* myNdb,
      return -1;
    }

   /**
    * Define an update operation
    */    
    NdbOperation * myUpdateOp;
    /**
     * start of loop: nextResult(true) means that "parallelism" number of
     * rows are fetched from NDB and cached in NDBAPI
     */    
    while((check = rs->nextResult(true)) == 0){
    while((check = myScanOp->nextResult(true)) == 0){
      do {
	/**
	 * Get update operation
	 */    
	myUpdateOp = rs->updateTuple();
	if (myUpdateOp == 0){
	NdbOperation * myUpdateOp = myScanOp->updateCurrentTuple();
	if (myUpdateOp == 0)
	{
	  std::cout << myTrans->getNdbError().message << std::endl;
	  myNdb->closeTransaction(myTrans);
	  return -1;
	}
	updatedRows++;

	/**
	 * do the update
	 */    
@@ -570,20 +522,22 @@ int scan_update(Ndb* myNdb,
	 * cached in the NDBAPI are modified before
	 * fetching more rows from NDB.
	 */    
      } while((check = rs->nextResult(false)) == 0);
      } while((check = myScanOp->nextResult(false)) == 0);
      
      /**
       * Commit when all cached tuple have been updated
       * NoCommit when all cached tuple have been updated
       */    
      if(check != -1){
	check = myTrans->execute(Commit);   
	myTrans->releaseCompletedOperations();
      if(check != -1)
      {
	check = myTrans->execute(NoCommit);   
      }

      /**
       * Check for errors
       */
      err = myTrans->getNdbError();    
      if(check == -1){
      if(check == -1)
      {
	if(err.status == NdbError::TemporaryError){
	  std::cout << myTrans->getNdbError().message << std::endl;
	  myNdb->closeTransaction(myTrans);
@@ -595,13 +549,28 @@ int scan_update(Ndb* myNdb,
       * End of loop 
       */
    }

    /**
     * Commit all prepared operations
     */
    if(myTrans->execute(Commit) == -1)
    {
      if(err.status == NdbError::TemporaryError){
	std::cout << myTrans->getNdbError().message << std::endl;
	myNdb->closeTransaction(myTrans);
	milliSleep(50);
	continue;
      }	
    }

    std::cout << myTrans->getNdbError().message << std::endl;
    myNdb->closeTransaction(myTrans);
    return 0;    
  }


  }
  if(myTrans!=0) {
  if(myTrans!=0) 
  {
    std::cout << myTrans->getNdbError().message << std::endl;
    myNdb->closeTransaction(myTrans);
  }
@@ -610,9 +579,7 @@ int scan_update(Ndb* myNdb,



int scan_print(Ndb * myNdb, int parallelism, 
	       int column_len_brand, 
	       int column_len_color) 
int scan_print(Ndb * myNdb)
{
// Scan all records exclusive and update
  // them one by one
@@ -674,10 +641,10 @@ int scan_print(Ndb * myNdb, int parallelism,
    }

    /**
     * Define a result set for the scan.
     * Read without locks, without being placed in lock queue
     */
    NdbResultSet * rs = myScanOp->readTuplesExclusive(parallelism);
    if( rs == 0 ) {
    if( myScanOp->readTuples(NdbOperation::LM_CommittedRead) == -1)
    {
      std::cout << myTrans->getNdbError().message << std::endl;
      myNdb->closeTransaction(myTrans);
      return -1;
@@ -719,7 +686,7 @@ int scan_print(Ndb * myNdb, int parallelism,
     * start of loop: nextResult(true) means that "parallelism" number of
     * rows are fetched from NDB and cached in NDBAPI
     */    
    while((check = rs->nextResult(true)) == 0){
    while((check = myScanOp->nextResult(true)) == 0){
      do {
	
	fetchedRows++;
@@ -727,28 +694,23 @@ int scan_print(Ndb * myNdb, int parallelism,
	 * print  REG_NO unsigned int
	 */
	std::cout << myRecAttr[0]->u_32_value() << "\t";
	char * buf_brand = new char[column_len_brand+1];
	char * buf_color = new char[column_len_color+1];

	/**
	 * print  BRAND character string
	 */
	memcpy(buf_brand, myRecAttr[1]->aRef(), column_len_brand);
	buf_brand[column_len_brand] = 0;
	std::cout << buf_brand << "\t";
	delete [] buf_brand;
	std::cout << myRecAttr[1]->aRef() << "\t";

	/**
	 * print  COLOR character string
	 */
	memcpy(buf_color, myRecAttr[2]->aRef(), column_len_color);
	buf_brand[column_len_color] = 0;
	std::cout << buf_color << std::endl;
	delete [] buf_color;	
	std::cout << myRecAttr[2]->aRef() << std::endl;

	/**
	 * nextResult(false) means that the records 
	 * cached in the NDBAPI are modified before
	 * fetching more rows from NDB.
	 */    
      } while((check = rs->nextResult(false)) == 0);
      } while((check = myScanOp->nextResult(false)) == 0);

    }    
    myNdb->closeTransaction(myTrans);
@@ -762,55 +724,67 @@ int scan_print(Ndb * myNdb, int parallelism,
int main()
{
  ndb_init();
  Ndb* myNdb = new Ndb( "TEST_DB" );  // Object representing the database

  Ndb_cluster_connection cluster_connection;

  if (cluster_connection.connect(12, 5, 1))
  {
    std::cout << "Unable to connect to cluster within 30 secs." << std::endl;
    exit(-1);
  }

  if (cluster_connection.wait_until_ready(30,30))
  {
    std::cout << "Cluster was not ready within 30 secs." << std::endl;
    exit(-1);
  }
  
  Ndb myNdb(&cluster_connection,"TEST_DB" );  
  
  /*******************************************
   * Initialize NDB and wait until its ready *
   *******************************************/
  if (myNdb->init(1024) == -1) {          // Set max 1024  parallel transactions
    APIERROR(myNdb->getNdbError());
  if (myNdb.init(1024) == -1) {          // Set max 1024  parallel transactions
    APIERROR(myNdb.getNdbError());
    exit(-1);
  }

  if (myNdb->waitUntilReady(30) != 0) {
    std::cout << "NDB was not ready within 30 secs." << std::endl;
    exit(-1);
  }
  create_table(myNdb);
  create_table(&myNdb);
  
  NdbDictionary::Dictionary* myDict = myNdb->getDictionary();
  NdbDictionary::Dictionary* myDict = myNdb.getDictionary();
  int column_color = myDict->getTable("GARAGE")->getColumn("COLOR")->getColumnNo();
  int column_len_color = 
    myDict->getTable("GARAGE")->getColumn("COLOR")->getLength();
  int column_len_brand = 
    myDict->getTable("GARAGE")->getColumn("BRAND")->getLength();
  int parallelism = 16;
  

  if(populate(myNdb) > 0)
  if(populate(&myNdb) > 0)
    std::cout << "populate: Success!" << std::endl;
  
  if(scan_print(myNdb, parallelism, column_len_brand, column_len_color) > 0)
  if(scan_print(&myNdb) > 0)
    std::cout << "scan_print: Success!" << std::endl  << std::endl;
  
  std::cout << "Going to delete all pink cars!" << std::endl;
  if(scan_delete(myNdb, parallelism, column_color,
		 column_len_color, "Pink") > 0)
  
  {
    /**
     * Note! color needs to be of exact the same size as column defined
     */
    char color[20] = "Pink";
    if(scan_delete(&myNdb, column_color, color) > 0)
      std::cout << "scan_delete: Success!" << std::endl  << std::endl;
  }

  if(scan_print(myNdb, parallelism, column_len_brand, column_len_color) > 0)
  if(scan_print(&myNdb) > 0)
    std::cout << "scan_print: Success!" << std::endl  << std::endl;
  
  std::cout << "Going to update all blue cars to black cars!" << std::endl;
  if(scan_update(myNdb, parallelism, column_len_color, column_color, 
		 "COLOR", "Blue", "Black") > 0) 
  {
    /**
     * Note! color1 & 2 need to be of exact the same size as column defined
     */
    char color1[20] = "Blue";
    char color2[20] = "Black";
    std::cout << "Going to update all " << color1 
	      << " cars to " << color2 << " cars!" << std::endl;
    if(scan_update(&myNdb, column_color, color1, color2) > 0) 
      std::cout << "scan_update: Success!" << std::endl  << std::endl;
  }
  if(scan_print(myNdb, parallelism, column_len_brand, column_len_color) > 0)
  if(scan_print(&myNdb) > 0)
    std::cout << "scan_print: Success!" << std::endl  << std::endl;

  delete myNdb; 
}
+1 −1
Original line number Diff line number Diff line
@@ -88,7 +88,7 @@ public:
  /**
   * Compare column <b>ColId</b> with <b>val</b>
   */
  int cmp(BinaryCondition cond, int ColId, const void *val, Uint32 len); 
  int cmp(BinaryCondition cond, int ColId, const void *val, Uint32 len = 0); 

  /** 
   * @name Integer Comparators