Loading ndb/include/ndbapi/NdbEventOperation.hpp +39 −34 Original line number Diff line number Diff line Loading @@ -24,73 +24,78 @@ class NdbEventOperationImpl; * @class NdbEventOperation * @brief Class of operations for getting change events from database. * * An NdbEventOperation object is instantiated by * Ndb::createEventOperation * * Prior to that an event must have been created in the Database through * NdbDictionary::createEvent * * The instance is removed by Ndb::dropEventOperation * Brief description on how to work with events: * * - An event i created in the Database through * NdbDictionary::Dictionary::createEvent() (note that this can be done * by any application or thread and not necessarily by the "listener") * - To listen to events, an NdbEventOperation object is instantiated by * Ndb::createEventOperation() * - execute() starts the event flow. Use Ndb::pollEvents() to wait * for an event to occur. Use next() to iterate * through the events that have occured. * - The instance is removed by Ndb::dropEventOperation() * * For more info see: * @ref ndbapi_event.cpp * * Known limitations: * * Maximum number of active NdbEventOperations are now set at compile time. * - Maximum number of active NdbEventOperations are now set at compile time. * Today 100. This will become a configuration parameter later. * * Maximum number of NdbEventOperations tied to same event are maximum 16 * - Maximum number of NdbEventOperations tied to same event are maximum 16 * per process. * * Known issues: * * When several NdbEventOperation's are tied to the same event in the same * - When several NdbEventOperation's are tied to the same event in the same * process they will share the circular buffer. The BufferLength will then * be the same for all and decided by the first NdbEventOperation * instantiation. Just make sure to instantiate the "largest" one first. * * Today all events INSERT/DELETE/UPDATE and all changed attributes are * - Today all events INSERT/DELETE/UPDATE and all changed attributes are * sent to the API, even if only specific attributes have been specified. * These are however hidden from the user and only relevant data is shown * after next(). * However false exits from Ndb::pollEvents() may occur and thus * - "False" exits from Ndb::pollEvents() may occur and thus * the subsequent next() will return zero, * since there was no available data. Just do Ndb::pollEvents() again. * * Event code does not check table schema version. Make sure to drop events * - Event code does not check table schema version. Make sure to drop events * after table is dropped. Will be fixed in later * versions. * * If a node failure has occured not all events will be recieved * - If a node failure has occured not all events will be recieved * anymore. Drop NdbEventOperation and Create again after nodes are up * again. Will be fixed in later versions. * * Test status: * Tests have been run on 1-node and 2-node systems * * Known bugs: * * None, except if we can call some of the "issues" above bugs * - Tests have been run on 1-node and 2-node systems * * Useful API programs: * * ndb_select_all -d sys 'NDB$EVENTS_0' * Will show contents in the system table containing created events. * - ndb_select_all -d sys 'NDB$EVENTS_0' * shows contents in the system table containing created events. * * @note this is an inteface to viewing events that is subject to change */ class NdbEventOperation { public: /** * State of the NdbEventOperation object */ enum State { EO_CREATED, ///< Created but execute() not called EO_EXECUTING, ///< execute() called EO_ERROR ///< An error has occurred. Object unusable. }; /** * Retrieve current state of the NdbEventOperation object */ enum State {CREATED,EXECUTING,ERROR}; State getState(); /** * Activates the NdbEventOperation to start receiving events. The * changed attribute values may be retrieved after next() has returned * a value greater than zero. The getValue() methods below must be called * a value greater than zero. The getValue() methods must be called * prior to execute(). * * @return 0 if successful otherwise -1. Loading @@ -112,21 +117,21 @@ public: * aligned appropriately. The buffer is used directly * (avoiding a copy penalty) only if it is aligned on a * 4-byte boundary and the attribute size in bytes * (i.e. NdbRecAttr::attrSize times NdbRecAttr::arraySize is * (i.e. NdbRecAttr::attrSize() times NdbRecAttr::arraySize() is * a multiple of 4). * * @note There are two versions, NdbOperation::getValue and * NdbOperation::getPreValue for retrieving the current and * @note There are two versions, getValue() and * getPreValue() for retrieving the current and * previous value repectively. * * @note This method does not fetch the attribute value from * the database! The NdbRecAttr object returned by this method * is <em>not</em> readable/printable before the * NdbEventConnection::execute has been made and * NdbEventConnection::next has returned a value greater than * execute() has been made and * next() has returned a value greater than * zero. If a specific attribute has not changed the corresponding * NdbRecAttr will be in state UNDEFINED. This is checked by * NdbRecAttr::isNull which then returns -1. * NdbRecAttr::isNull() which then returns -1. * * @param anAttrName Attribute name * @param aValue If this is non-NULL, then the attribute value Loading @@ -143,11 +148,11 @@ public: /** * Retrieves event resultset if available, inserted into the NdbRecAttrs * specified in getValue() and getPreValue(). To avoid polling for * a resultset, one can use Ndb::pollEvents * a resultset, one can use Ndb::pollEvents() * which will wait on a mutex until an event occurs or the specified * timeout occurs. * * @return >=0 if successful otherwise -1. Return value inicates number * @return >=0 if successful otherwise -1. Return value indicates number * of available events. By sending pOverRun one may query for buffer * overflow and *pOverRun will indicate the number of events that have * overwritten. Loading ndb/src/ndbapi/Ndb.cpp +8 −1 Original line number Diff line number Diff line Loading @@ -1177,7 +1177,14 @@ NdbEventOperation* Ndb::createEventOperation(const char* eventName, tOp = new NdbEventOperation(this, eventName, bufferLength); if (tOp->getState() != NdbEventOperation::CREATED) { if (tOp == 0) { theError.code= 4000; return NULL; } if (tOp->getState() != NdbEventOperation::EO_CREATED) { theError.code= tOp->getNdbError().code; delete tOp; tOp = NULL; } Loading ndb/src/ndbapi/NdbEventOperationImpl.cpp +16 −18 Original line number Diff line number Diff line Loading @@ -55,9 +55,8 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N, const char* eventName, const int bufferLength) : NdbEventOperation(*this), m_ndb(theNdb), m_state(ERROR), m_bufferL(bufferLength) m_state(EO_ERROR), m_bufferL(bufferLength) { m_eventId = 0; theFirstRecAttrs[0] = NULL; theCurrentRecAttrs[0] = NULL; Loading @@ -71,16 +70,15 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N, // we should lookup id in Dictionary, TODO // also make sure we only have one listener on each event if (!m_ndb) { ndbout_c("m_ndb=NULL"); return; } if (!m_ndb) abort(); NdbDictionary::Dictionary *myDict = m_ndb->getDictionary(); if (!myDict) { ndbout_c("getDictionary=NULL"); return; } if (!myDict) { m_error.code= m_ndb->getNdbError().code; return; } const NdbDictionary::Event *myEvnt = myDict->getEvent(eventName); if (!myEvnt) { ndbout_c("getEvent()=NULL"); return; } if (!myEvnt) { m_error.code= myDict->getNdbError().code; return; } m_eventImpl = &myEvnt->m_impl; if (!m_eventImpl) { ndbout_c("m_impl=NULL"); return; } m_bufferHandle = m_ndb->getGlobalEventBufferHandle(); if (m_bufferHandle->m_bufferL > 0) Loading @@ -88,7 +86,7 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N, else m_bufferHandle->m_bufferL = m_bufferL; m_state = CREATED; m_state = EO_CREATED; } NdbEventOperationImpl::~NdbEventOperationImpl() Loading @@ -106,7 +104,7 @@ NdbEventOperationImpl::~NdbEventOperationImpl() p = p_next; } } if (m_state == NdbEventOperation::EXECUTING) { if (m_state == EO_EXECUTING) { stop(); // m_bufferHandle->dropSubscribeEvent(m_bufferId); ; // We should send stop signal here Loading @@ -122,7 +120,7 @@ NdbEventOperationImpl::getState() NdbRecAttr* NdbEventOperationImpl::getValue(const char *colName, char *aValue, int n) { if (m_state != NdbEventOperation::CREATED) { if (m_state != EO_CREATED) { ndbout_c("NdbEventOperationImpl::getValue may only be called between instantiation and execute()"); return NULL; } Loading Loading @@ -211,8 +209,8 @@ NdbEventOperationImpl::execute() { NdbDictionary::Dictionary *myDict = m_ndb->getDictionary(); if (!myDict) { ndbout_c("NdbEventOperation::execute(): getDictionary=NULL"); return 0; m_error.code= m_ndb->getNdbError().code; return -1; } if (theFirstRecAttrs[0] == NULL) { // defaults to get all Loading Loading @@ -245,14 +243,14 @@ NdbEventOperationImpl::execute() if (r) { //Error m_bufferHandle->unprepareAddSubscribeEvent(m_bufferId); m_state = NdbEventOperation::ERROR; m_state = EO_ERROR; } else { m_bufferHandle->addSubscribeEvent(m_bufferId, this); m_state = NdbEventOperation::EXECUTING; m_state = EO_EXECUTING; } } else { //Error m_state = NdbEventOperation::ERROR; m_state = EO_ERROR; } return r; } Loading @@ -261,14 +259,14 @@ int NdbEventOperationImpl::stop() { DBUG_ENTER("NdbEventOperationImpl::stop"); if (m_state != NdbEventOperation::EXECUTING) if (m_state != EO_EXECUTING) DBUG_RETURN(-1); // ndbout_c("NdbEventOperation::stopping()"); NdbDictionary::Dictionary *myDict = m_ndb->getDictionary(); if (!myDict) { ndbout_c("NdbEventOperation::stop(): getDictionary=NULL"); m_error.code= m_ndb->getNdbError().code; DBUG_RETURN(-1); } Loading Loading @@ -299,13 +297,13 @@ NdbEventOperationImpl::stop() //Error m_bufferHandle->unprepareDropSubscribeEvent(m_bufferId); m_error.code= myDictImpl.m_error.code; m_state = NdbEventOperation::ERROR; m_state = EO_ERROR; } else { #ifdef EVENT_DEBUG ndbout_c("NdbEventOperation::dropping()"); #endif m_bufferHandle->dropSubscribeEvent(m_bufferId); m_state = NdbEventOperation::CREATED; m_state = EO_CREATED; } DBUG_RETURN(r); Loading Loading
ndb/include/ndbapi/NdbEventOperation.hpp +39 −34 Original line number Diff line number Diff line Loading @@ -24,73 +24,78 @@ class NdbEventOperationImpl; * @class NdbEventOperation * @brief Class of operations for getting change events from database. * * An NdbEventOperation object is instantiated by * Ndb::createEventOperation * * Prior to that an event must have been created in the Database through * NdbDictionary::createEvent * * The instance is removed by Ndb::dropEventOperation * Brief description on how to work with events: * * - An event i created in the Database through * NdbDictionary::Dictionary::createEvent() (note that this can be done * by any application or thread and not necessarily by the "listener") * - To listen to events, an NdbEventOperation object is instantiated by * Ndb::createEventOperation() * - execute() starts the event flow. Use Ndb::pollEvents() to wait * for an event to occur. Use next() to iterate * through the events that have occured. * - The instance is removed by Ndb::dropEventOperation() * * For more info see: * @ref ndbapi_event.cpp * * Known limitations: * * Maximum number of active NdbEventOperations are now set at compile time. * - Maximum number of active NdbEventOperations are now set at compile time. * Today 100. This will become a configuration parameter later. * * Maximum number of NdbEventOperations tied to same event are maximum 16 * - Maximum number of NdbEventOperations tied to same event are maximum 16 * per process. * * Known issues: * * When several NdbEventOperation's are tied to the same event in the same * - When several NdbEventOperation's are tied to the same event in the same * process they will share the circular buffer. The BufferLength will then * be the same for all and decided by the first NdbEventOperation * instantiation. Just make sure to instantiate the "largest" one first. * * Today all events INSERT/DELETE/UPDATE and all changed attributes are * - Today all events INSERT/DELETE/UPDATE and all changed attributes are * sent to the API, even if only specific attributes have been specified. * These are however hidden from the user and only relevant data is shown * after next(). * However false exits from Ndb::pollEvents() may occur and thus * - "False" exits from Ndb::pollEvents() may occur and thus * the subsequent next() will return zero, * since there was no available data. Just do Ndb::pollEvents() again. * * Event code does not check table schema version. Make sure to drop events * - Event code does not check table schema version. Make sure to drop events * after table is dropped. Will be fixed in later * versions. * * If a node failure has occured not all events will be recieved * - If a node failure has occured not all events will be recieved * anymore. Drop NdbEventOperation and Create again after nodes are up * again. Will be fixed in later versions. * * Test status: * Tests have been run on 1-node and 2-node systems * * Known bugs: * * None, except if we can call some of the "issues" above bugs * - Tests have been run on 1-node and 2-node systems * * Useful API programs: * * ndb_select_all -d sys 'NDB$EVENTS_0' * Will show contents in the system table containing created events. * - ndb_select_all -d sys 'NDB$EVENTS_0' * shows contents in the system table containing created events. * * @note this is an inteface to viewing events that is subject to change */ class NdbEventOperation { public: /** * State of the NdbEventOperation object */ enum State { EO_CREATED, ///< Created but execute() not called EO_EXECUTING, ///< execute() called EO_ERROR ///< An error has occurred. Object unusable. }; /** * Retrieve current state of the NdbEventOperation object */ enum State {CREATED,EXECUTING,ERROR}; State getState(); /** * Activates the NdbEventOperation to start receiving events. The * changed attribute values may be retrieved after next() has returned * a value greater than zero. The getValue() methods below must be called * a value greater than zero. The getValue() methods must be called * prior to execute(). * * @return 0 if successful otherwise -1. Loading @@ -112,21 +117,21 @@ public: * aligned appropriately. The buffer is used directly * (avoiding a copy penalty) only if it is aligned on a * 4-byte boundary and the attribute size in bytes * (i.e. NdbRecAttr::attrSize times NdbRecAttr::arraySize is * (i.e. NdbRecAttr::attrSize() times NdbRecAttr::arraySize() is * a multiple of 4). * * @note There are two versions, NdbOperation::getValue and * NdbOperation::getPreValue for retrieving the current and * @note There are two versions, getValue() and * getPreValue() for retrieving the current and * previous value repectively. * * @note This method does not fetch the attribute value from * the database! The NdbRecAttr object returned by this method * is <em>not</em> readable/printable before the * NdbEventConnection::execute has been made and * NdbEventConnection::next has returned a value greater than * execute() has been made and * next() has returned a value greater than * zero. If a specific attribute has not changed the corresponding * NdbRecAttr will be in state UNDEFINED. This is checked by * NdbRecAttr::isNull which then returns -1. * NdbRecAttr::isNull() which then returns -1. * * @param anAttrName Attribute name * @param aValue If this is non-NULL, then the attribute value Loading @@ -143,11 +148,11 @@ public: /** * Retrieves event resultset if available, inserted into the NdbRecAttrs * specified in getValue() and getPreValue(). To avoid polling for * a resultset, one can use Ndb::pollEvents * a resultset, one can use Ndb::pollEvents() * which will wait on a mutex until an event occurs or the specified * timeout occurs. * * @return >=0 if successful otherwise -1. Return value inicates number * @return >=0 if successful otherwise -1. Return value indicates number * of available events. By sending pOverRun one may query for buffer * overflow and *pOverRun will indicate the number of events that have * overwritten. Loading
ndb/src/ndbapi/Ndb.cpp +8 −1 Original line number Diff line number Diff line Loading @@ -1177,7 +1177,14 @@ NdbEventOperation* Ndb::createEventOperation(const char* eventName, tOp = new NdbEventOperation(this, eventName, bufferLength); if (tOp->getState() != NdbEventOperation::CREATED) { if (tOp == 0) { theError.code= 4000; return NULL; } if (tOp->getState() != NdbEventOperation::EO_CREATED) { theError.code= tOp->getNdbError().code; delete tOp; tOp = NULL; } Loading
ndb/src/ndbapi/NdbEventOperationImpl.cpp +16 −18 Original line number Diff line number Diff line Loading @@ -55,9 +55,8 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N, const char* eventName, const int bufferLength) : NdbEventOperation(*this), m_ndb(theNdb), m_state(ERROR), m_bufferL(bufferLength) m_state(EO_ERROR), m_bufferL(bufferLength) { m_eventId = 0; theFirstRecAttrs[0] = NULL; theCurrentRecAttrs[0] = NULL; Loading @@ -71,16 +70,15 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N, // we should lookup id in Dictionary, TODO // also make sure we only have one listener on each event if (!m_ndb) { ndbout_c("m_ndb=NULL"); return; } if (!m_ndb) abort(); NdbDictionary::Dictionary *myDict = m_ndb->getDictionary(); if (!myDict) { ndbout_c("getDictionary=NULL"); return; } if (!myDict) { m_error.code= m_ndb->getNdbError().code; return; } const NdbDictionary::Event *myEvnt = myDict->getEvent(eventName); if (!myEvnt) { ndbout_c("getEvent()=NULL"); return; } if (!myEvnt) { m_error.code= myDict->getNdbError().code; return; } m_eventImpl = &myEvnt->m_impl; if (!m_eventImpl) { ndbout_c("m_impl=NULL"); return; } m_bufferHandle = m_ndb->getGlobalEventBufferHandle(); if (m_bufferHandle->m_bufferL > 0) Loading @@ -88,7 +86,7 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N, else m_bufferHandle->m_bufferL = m_bufferL; m_state = CREATED; m_state = EO_CREATED; } NdbEventOperationImpl::~NdbEventOperationImpl() Loading @@ -106,7 +104,7 @@ NdbEventOperationImpl::~NdbEventOperationImpl() p = p_next; } } if (m_state == NdbEventOperation::EXECUTING) { if (m_state == EO_EXECUTING) { stop(); // m_bufferHandle->dropSubscribeEvent(m_bufferId); ; // We should send stop signal here Loading @@ -122,7 +120,7 @@ NdbEventOperationImpl::getState() NdbRecAttr* NdbEventOperationImpl::getValue(const char *colName, char *aValue, int n) { if (m_state != NdbEventOperation::CREATED) { if (m_state != EO_CREATED) { ndbout_c("NdbEventOperationImpl::getValue may only be called between instantiation and execute()"); return NULL; } Loading Loading @@ -211,8 +209,8 @@ NdbEventOperationImpl::execute() { NdbDictionary::Dictionary *myDict = m_ndb->getDictionary(); if (!myDict) { ndbout_c("NdbEventOperation::execute(): getDictionary=NULL"); return 0; m_error.code= m_ndb->getNdbError().code; return -1; } if (theFirstRecAttrs[0] == NULL) { // defaults to get all Loading Loading @@ -245,14 +243,14 @@ NdbEventOperationImpl::execute() if (r) { //Error m_bufferHandle->unprepareAddSubscribeEvent(m_bufferId); m_state = NdbEventOperation::ERROR; m_state = EO_ERROR; } else { m_bufferHandle->addSubscribeEvent(m_bufferId, this); m_state = NdbEventOperation::EXECUTING; m_state = EO_EXECUTING; } } else { //Error m_state = NdbEventOperation::ERROR; m_state = EO_ERROR; } return r; } Loading @@ -261,14 +259,14 @@ int NdbEventOperationImpl::stop() { DBUG_ENTER("NdbEventOperationImpl::stop"); if (m_state != NdbEventOperation::EXECUTING) if (m_state != EO_EXECUTING) DBUG_RETURN(-1); // ndbout_c("NdbEventOperation::stopping()"); NdbDictionary::Dictionary *myDict = m_ndb->getDictionary(); if (!myDict) { ndbout_c("NdbEventOperation::stop(): getDictionary=NULL"); m_error.code= m_ndb->getNdbError().code; DBUG_RETURN(-1); } Loading Loading @@ -299,13 +297,13 @@ NdbEventOperationImpl::stop() //Error m_bufferHandle->unprepareDropSubscribeEvent(m_bufferId); m_error.code= myDictImpl.m_error.code; m_state = NdbEventOperation::ERROR; m_state = EO_ERROR; } else { #ifdef EVENT_DEBUG ndbout_c("NdbEventOperation::dropping()"); #endif m_bufferHandle->dropSubscribeEvent(m_bufferId); m_state = NdbEventOperation::CREATED; m_state = EO_CREATED; } DBUG_RETURN(r); Loading