Loading ndb/src/kernel/blocks/backup/restore/Restore.cpp +116 −133 Original line number Diff line number Diff line Loading @@ -122,7 +122,8 @@ RestoreMetaData::readMetaTableList() { Uint32 sectionInfo[2]; if (fread(§ionInfo, sizeof(sectionInfo), 1, m_file) != 1){ if (buffer_read(§ionInfo, sizeof(sectionInfo), 1) != 1){ err << "readMetaTableList read header error" << endl; return 0; } sectionInfo[0] = ntohl(sectionInfo[0]); Loading @@ -130,11 +131,9 @@ RestoreMetaData::readMetaTableList() { const Uint32 tabCount = sectionInfo[1] - 2; const Uint32 len = 4 * tabCount; if(createBuffer(len) == 0) abort(); if (fread(m_buffer, 1, len, m_file) != len){ void *tmp; if (buffer_get_ptr(&tmp, 4, tabCount) != tabCount){ err << "readMetaTableList read tabCount error" << endl; return 0; } Loading @@ -147,7 +146,7 @@ RestoreMetaData::readMetaTableDesc() { Uint32 sectionInfo[2]; // Read section header if (fread(§ionInfo, sizeof(sectionInfo), 1, m_file) != 1){ if (buffer_read(§ionInfo, sizeof(sectionInfo), 1) != 1){ err << "readMetaTableDesc read header error" << endl; return false; } // if Loading @@ -156,20 +155,15 @@ RestoreMetaData::readMetaTableDesc() { assert(sectionInfo[0] == BackupFormat::TABLE_DESCRIPTION); // Allocate temporary storage for dictTabInfo buffer const Uint32 len = (sectionInfo[1] - 2); if (createBuffer(4 * (len+1)) == NULL) { err << "readMetaTableDesc allocation error" << endl; return false; } // if // Read dictTabInfo buffer if (fread(m_buffer, 4, len, m_file) != len){ const Uint32 len = (sectionInfo[1] - 2); void *ptr; if (buffer_get_ptr(&ptr, 4, len) != len){ err << "readMetaTableDesc read error" << endl; return false; } // if return parseTableDescriptor(m_buffer, len); return parseTableDescriptor((Uint32*)ptr, len); } bool Loading @@ -177,11 +171,10 @@ RestoreMetaData::readGCPEntry() { Uint32 data[4]; BackupFormat::CtlFile::GCPEntry * dst = (BackupFormat::CtlFile::GCPEntry *)&data[0]; if(fread(dst, 4, 4, m_file) != 4){ if(buffer_read(dst, 4, 4) != 4){ err << "readGCPEntry read error" << endl; return false; } Loading Loading @@ -212,6 +205,12 @@ TableS::TableS(NdbTableImpl* tableImpl) createAttr(tableImpl->getColumn(i)); } TableS::~TableS() { for (int i = 0; i < allAttributesDesc.size(); i++) delete allAttributesDesc[i]; } // Parse dictTabInfo buffer and pushback to to vector storage bool RestoreMetaData::parseTableDescriptor(const Uint32 * data, Uint32 len) Loading Loading @@ -247,31 +246,18 @@ RestoreMetaData::parseTableDescriptor(const Uint32 * data, Uint32 len) // Constructor RestoreDataIterator::RestoreDataIterator(const RestoreMetaData & md, void (* _free_data_callback)()) : m_metaData(md), free_data_callback(_free_data_callback) : BackupFile(_free_data_callback), m_metaData(md) { debug << "RestoreDataIterator constructor" << endl; setDataFile(md, 0); m_buffer_sz = 64*1024; m_buffer = malloc(m_buffer_sz); m_buffer_ptr = m_buffer; m_buffer_data_left = 0; } RestoreDataIterator::~RestoreDataIterator() { if (m_buffer) free(m_buffer); } TupleS & TupleS::operator=(const TupleS& tuple) { prepareRecord(*tuple.m_currentTable); if (allAttrData) { allAttrData= new AttributeData[getNoOfAttributes()]; if (allAttrData) memcpy(allAttrData, tuple.allAttrData, getNoOfAttributes()*sizeof(AttributeData)); } return *this; }; Loading @@ -296,12 +282,16 @@ AttributeData * TupleS::getData(int i) const{ bool TupleS::prepareRecord(const TableS & tab){ if (allAttrData) { if (getNoOfAttributes() == tab.getNoOfAttributes()) { m_currentTable = &tab; return true; } delete [] allAttrData; m_currentTable= 0; } allAttrData = new AttributeData[tab.getNoOfAttributes()]; if (allAttrData == 0) return false; Loading @@ -310,47 +300,12 @@ TupleS::prepareRecord(const TableS & tab){ return true; } Uint32 RestoreDataIterator::get_buffer_ptr(void **p_buf_ptr, Uint32 size, Uint32 nmemb, FILE *stream) { Uint32 sz = size*nmemb; if (sz > m_buffer_data_left) { if (free_data_callback) (*free_data_callback)(); memcpy(m_buffer, m_buffer_ptr, m_buffer_data_left); size_t r = fread(((char *)m_buffer) + m_buffer_data_left, 1, m_buffer_sz - m_buffer_data_left, stream); m_buffer_data_left += r; m_buffer_ptr = m_buffer; if (sz > m_buffer_data_left) sz = size * (m_buffer_data_left / size); } *p_buf_ptr = m_buffer_ptr; m_buffer_ptr = ((char*)m_buffer_ptr)+sz; m_buffer_data_left -= sz; return sz/size; } Uint32 RestoreDataIterator::fread_buffer(void *ptr, Uint32 size, Uint32 nmemb, FILE *stream) { void *buf_ptr; Uint32 r = get_buffer_ptr(&buf_ptr, size, nmemb, stream); memcpy(ptr, buf_ptr, r*size); return r; } const TupleS * RestoreDataIterator::getNextTuple(int & res) { Uint32 dataLength = 0; // Read record length if (fread_buffer(&dataLength, sizeof(dataLength), 1, m_file) != 1){ if (buffer_read(&dataLength, sizeof(dataLength), 1) != 1){ err << "getNextTuple:Error reading length of data part" << endl; res = -1; return NULL; Loading @@ -370,7 +325,7 @@ RestoreDataIterator::getNextTuple(int & res) // Read tuple data void *_buf_ptr; if (get_buffer_ptr(&_buf_ptr, 1, dataLenBytes, m_file) != dataLenBytes) { if (buffer_get_ptr(&_buf_ptr, 1, dataLenBytes) != dataLenBytes) { err << "getNextTuple:Read error: " << endl; res = -1; return NULL; Loading Loading @@ -468,12 +423,17 @@ RestoreDataIterator::getNextTuple(int & res) return &m_tuple; } // RestoreDataIterator::getNextTuple BackupFile::BackupFile(){ BackupFile::BackupFile(void (* _free_data_callback)()) : free_data_callback(_free_data_callback) { m_file = 0; m_path[0] = 0; m_fileName[0] = 0; m_buffer = 0; m_bufferSize = 0; m_buffer_sz = 64*1024; m_buffer = malloc(m_buffer_sz); m_buffer_ptr = m_buffer; m_buffer_data_left = 0; } BackupFile::~BackupFile(){ Loading @@ -494,15 +454,54 @@ BackupFile::openFile(){ return m_file != 0; } Uint32 * BackupFile::createBuffer(Uint32 bytes){ if(bytes > m_bufferSize){ if(m_buffer != 0) free(m_buffer); m_bufferSize = m_bufferSize + 2 * bytes; m_buffer = (Uint32*)malloc(m_bufferSize); Uint32 BackupFile::buffer_get_ptr_ahead(void **p_buf_ptr, Uint32 size, Uint32 nmemb) { Uint32 sz = size*nmemb; if (sz > m_buffer_data_left) { if (free_data_callback) (*free_data_callback)(); memcpy(m_buffer, m_buffer_ptr, m_buffer_data_left); size_t r = fread(((char *)m_buffer) + m_buffer_data_left, 1, m_buffer_sz - m_buffer_data_left, m_file); m_buffer_data_left += r; m_buffer_ptr = m_buffer; if (sz > m_buffer_data_left) sz = size * (m_buffer_data_left / size); } *p_buf_ptr = m_buffer_ptr; return sz/size; } Uint32 BackupFile::buffer_get_ptr(void **p_buf_ptr, Uint32 size, Uint32 nmemb) { Uint32 r = buffer_get_ptr_ahead(p_buf_ptr, size, nmemb); m_buffer_ptr = ((char*)m_buffer_ptr)+(r*size); m_buffer_data_left -= (r*size); return r; } Uint32 BackupFile::buffer_read_ahead(void *ptr, Uint32 size, Uint32 nmemb) { void *buf_ptr; Uint32 r = buffer_get_ptr_ahead(&buf_ptr, size, nmemb); memcpy(ptr, buf_ptr, r*size); return r; } return m_buffer; Uint32 BackupFile::buffer_read(void *ptr, Uint32 size, Uint32 nmemb) { void *buf_ptr; Uint32 r = buffer_get_ptr(&buf_ptr, size, nmemb); memcpy(ptr, buf_ptr, r*size); return r; } void Loading Loading @@ -563,7 +562,7 @@ BackupFile::readHeader(){ return false; } if(fread(&m_fileHeader, sizeof(m_fileHeader), 1, m_file) != 1){ if(buffer_read(&m_fileHeader, sizeof(m_fileHeader), 1) != 1){ err << "readDataFileHeader: Error reading header" << endl; return false; } Loading Loading @@ -611,14 +610,13 @@ BackupFile::validateFooter(){ return true; } bool RestoreDataIterator::readFragmentHeader(int & ret) bool RestoreDataIterator::readFragmentHeader(int & ret) { BackupFormat::DataFile::FragmentHeader Header; debug << "RestoreDataIterator::getNextFragment" << endl; if (fread_buffer(&Header, sizeof(Header), 1, m_file) != 1){ if (buffer_read(&Header, sizeof(Header), 1) != 1){ ret = 0; return false; } // if Loading Loading @@ -663,7 +661,7 @@ bool RestoreDataIterator::validateFragmentFooter() { BackupFormat::DataFile::FragmentFooter footer; if (fread_buffer(&footer, sizeof(footer), 1, m_file) != 1){ if (buffer_read(&footer, sizeof(footer), 1) != 1){ err << "getFragmentFooter:Error reading fragment footer" << endl; return false; } Loading Loading @@ -776,36 +774,23 @@ RestoreLogIterator::getNextLogEntry(int & res) { Uint32 len= ~0; const Uint32 stopGCP = m_metaData.getStopGCP(); do { if(createBuffer(4) == 0) { if (buffer_read_ahead(&len, sizeof(Uint32), 1) != 1){ res= -1; return NULL; return 0; } len= ntohl(len); if (fread(m_buffer, sizeof(Uint32), 1, m_file) != 1){ res = -1; return NULL; Uint32 data_len = sizeof(Uint32) + len*4; if (buffer_get_ptr((void **)(&logE), 1, data_len) != data_len) { res= -2; return 0; } m_buffer[0] = ntohl(m_buffer[0]); len = m_buffer[0]; if(len == 0){ res= 0; return 0; } if(createBuffer(4 * (len + 1)) == 0){ res = -1; return NULL; } if (fread(&m_buffer[1], 4, len, m_file) != len) { res = -1; return NULL; } logE = (LogE *)&m_buffer[0]; logE->TableId= ntohl(logE->TableId); logE->TriggerEvent= ntohl(logE->TriggerEvent); Loading @@ -818,9 +803,6 @@ RestoreLogIterator::getNextLogEntry(int & res) { } } while(gcp > stopGCP + 1); for(int i=0; i<m_logEntry.m_values.size();i++) delete m_logEntry.m_values[i]; m_logEntry.m_values.clear(); m_logEntry.m_table = m_metaData.getTable(logE->TableId); switch(logE->TriggerEvent){ case TriggerEvent::TE_INSERT: Loading @@ -838,31 +820,32 @@ RestoreLogIterator::getNextLogEntry(int & res) { } const TableS * tab = m_logEntry.m_table; m_logEntry.clear(); AttributeHeader * ah = (AttributeHeader *)&logE->Data[0]; AttributeHeader *end = (AttributeHeader *)&logE->Data[len - 2]; AttributeS * attr; while(ah < end){ attr = new AttributeS; attr= m_logEntry.add_attr(); if(attr == NULL) { ndbout_c("Restore: Failed to allocate memory"); res = -1; return NULL; return 0; } attr->Desc = (* tab)[ah->getAttributeId()]; assert(attr->Desc != 0); const Uint32 sz = ah->getDataSize(); if(sz == 0){ attr->Data->null = true; attr->Data->void_value = NULL; attr->Data.null = true; attr->Data.void_value = NULL; } else { attr->Data->null = false; attr->Data->void_value = ah->getDataPtr(); attr->Data.null = false; attr->Data.void_value = ah->getDataPtr(); } Twiddle(attr->Desc, attr->Data); m_logEntry.m_values.push_back(attr); Twiddle(attr->Desc, &(attr->Data)); ah = ah->getNext(); } Loading @@ -874,7 +857,7 @@ RestoreLogIterator::getNextLogEntry(int & res) { NdbOut & operator<<(NdbOut& ndbout, const AttributeS& attr){ const AttributeData & data = *(attr.Data); const AttributeData & data = attr.Data; const AttributeDesc & desc = *(attr.Desc); if (data.null) Loading @@ -899,7 +882,7 @@ operator<<(NdbOut& ndbout, const TupleS& tuple) { AttributeData * attr_data = tuple.getData(i); const AttributeDesc * attr_desc = tuple.getDesc(i); const AttributeS attr = {attr_desc, attr_data}; const AttributeS attr = {attr_desc, *attr_data}; debug << i << " " << attr_desc->m_column->getName(); ndbout << attr; Loading Loading @@ -928,12 +911,12 @@ operator<<(NdbOut& ndbout, const LogEntry& logE) ndbout << "Unknown log entry type (not insert, delete or update)" ; } for (int i = 0; i < logE.m_values.size();i++) for (int i = 0; i < logE.size();i++) { const AttributeS * attr = logE.m_values[i]; const AttributeS * attr = logE[i]; ndbout << attr->Desc->m_column->getName() << "="; ndbout << (* attr); if (i < (logE.m_values.size() - 1)) if (i < (logE.size() - 1)) ndbout << ", "; } return ndbout; Loading ndb/src/kernel/blocks/backup/restore/Restore.hpp +50 −27 Original line number Diff line number Diff line Loading @@ -85,7 +85,7 @@ public: struct AttributeS { const AttributeDesc * Desc; AttributeData * Data; AttributeData Data; }; class TupleS { Loading @@ -97,7 +97,10 @@ private: bool prepareRecord(const TableS &); public: TupleS() {}; TupleS() { m_currentTable= 0; allAttrData= 0; }; ~TupleS() { if (allAttrData) Loading Loading @@ -129,17 +132,13 @@ class TableS { Uint32 m_nullBitmaskSize; int pos; char create_string[2048]; /* char mysqlTableName[1024]; char mysqlDatabaseName[1024]; */ void createAttr(NdbDictionary::Column *column); public: class NdbDictionary::Table* m_dictTable; TableS (class NdbTableImpl* dictTable); ~TableS(); Uint32 getTableId() const { return m_dictTable->getTableId(); Loading Loading @@ -192,18 +191,26 @@ protected: BackupFormat::FileHeader m_expectedFileHeader; Uint32 m_nodeId; Uint32 * m_buffer; Uint32 m_bufferSize; Uint32 * createBuffer(Uint32 bytes); void * m_buffer; void * m_buffer_ptr; Uint32 m_buffer_sz; Uint32 m_buffer_data_left; void (* free_data_callback)(); bool openFile(); void setCtlFile(Uint32 nodeId, Uint32 backupId, const char * path); void setDataFile(const BackupFile & bf, Uint32 no); void setLogFile(const BackupFile & bf, Uint32 no); Uint32 buffer_get_ptr(void **p_buf_ptr, Uint32 size, Uint32 nmemb); Uint32 buffer_read(void *ptr, Uint32 size, Uint32 nmemb); Uint32 buffer_get_ptr_ahead(void **p_buf_ptr, Uint32 size, Uint32 nmemb); Uint32 buffer_read_ahead(void *ptr, Uint32 size, Uint32 nmemb); void setName(const char * path, const char * name); BackupFile(); BackupFile(void (* free_data_callback)() = 0); ~BackupFile(); public: bool readHeader(); Loading Loading @@ -231,14 +238,11 @@ class RestoreMetaData : public BackupFile { bool parseTableDescriptor(const Uint32 * data, Uint32 len); public: RestoreMetaData(const char * path, Uint32 nodeId, Uint32 bNo); ~RestoreMetaData(); virtual ~RestoreMetaData(); int loadContent(); Uint32 getNoOfTables() const { return allTables.size();} const TableS * operator[](int i) const { return allTables[i];} Loading @@ -254,24 +258,16 @@ class RestoreDataIterator : public BackupFile { const TableS* m_currentTable; TupleS m_tuple; void * m_buffer; void * m_buffer_ptr; Uint32 m_buffer_sz; Uint32 m_buffer_data_left; void (* free_data_callback)(); public: // Constructor RestoreDataIterator(const RestoreMetaData &, void (* free_data_callback)()); ~RestoreDataIterator(); ~RestoreDataIterator() {}; // Read data file fragment header bool readFragmentHeader(int & res); bool validateFragmentFooter(); Uint32 get_buffer_ptr(void **p_buf_ptr, Uint32 size, Uint32 nmemb, FILE *stream); Uint32 fread_buffer(void *ptr, Uint32 size, Uint32 nmemb, FILE *stream); const TupleS *getNextTuple(int & res); }; Loading @@ -285,8 +281,34 @@ public: EntryType m_type; const TableS * m_table; myVector<AttributeS*> m_values; myVector<AttributeS*> m_values_e; AttributeS *add_attr() { AttributeS * attr; if (m_values_e.size() > 0) { attr = m_values_e[m_values_e.size()-1]; m_values_e.pop_back(); } else { attr = new AttributeS; } m_values.push_back(attr); return attr; } void clear() { for(int i= 0; i < m_values.size(); i++) m_values_e.push_back(m_values[i]); m_values.clear(); } ~LogEntry() { for(int i= 0; i< m_values.size(); i++) delete m_values[i]; for(int i= 0; i< m_values_e.size(); i++) delete m_values_e[i]; } int size() const { return m_values.size(); } const AttributeS * operator[](int i) const { return m_values[i];} }; class RestoreLogIterator : public BackupFile { Loading @@ -297,6 +319,7 @@ private: LogEntry m_logEntry; public: RestoreLogIterator(const RestoreMetaData &); virtual ~RestoreLogIterator() {}; const LogEntry * getNextLogEntry(int & res); }; Loading ndb/src/kernel/blocks/backup/restore/consumer.hpp +1 −7 Original line number Diff line number Diff line Loading @@ -21,20 +21,14 @@ class BackupConsumer { public: virtual ~BackupConsumer() { } virtual bool init() { return true;} virtual bool table(const TableS &){return true;} #ifdef USE_MYSQL virtual bool table(const TableS &, MYSQL* mysqlp) {return true;}; #endif virtual void tuple(const TupleS &){} virtual void tuple_free(){} virtual void endOfTuples(){} virtual void logEntry(const LogEntry &){} virtual void endOfLogEntrys(){} protected: #ifdef USE_MYSQL int create_table_string(const TableS & table, char * ,char *); #endif }; #endif ndb/src/kernel/blocks/backup/restore/consumer_printer.cpp +1 −42 Original line number Diff line number Diff line Loading @@ -27,47 +27,6 @@ BackupPrinter::table(const TableS & tab) return true; } #ifdef USE_MYSQL bool BackupPrinter::table(const TableS & tab, MYSQL * mysql) { if (m_print || m_print_meta) { char tmpTabName[MAX_TAB_NAME_SIZE*2]; sprintf(tmpTabName, "%s", tab.getTableName()); char * database = strtok(tmpTabName, "/"); char * schema = strtok( NULL , "/"); char * tableName = strtok( NULL , "/"); /** * this means that the user did not specify schema * and it is a v2x backup */ if(database == NULL) return false; if(schema == NULL) return false; if(tableName==NULL) tableName = schema; char stmtCreateDB[255]; sprintf(stmtCreateDB,"CREATE DATABASE %s", database); ndbout_c("%s", stmtCreateDB); char buf [2048]; create_table_string(tab, tableName, buf); ndbout_c("%s", buf); ndbout_c("Successfully printed table: %s", tab.m_dictTable->getName()); } return true; } #endif void BackupPrinter::tuple(const TupleS & tup) { Loading ndb/src/kernel/blocks/backup/restore/consumer_restore.cpp +35 −27 Original line number Diff line number Diff line Loading @@ -105,9 +105,8 @@ BackupRestore::~BackupRestore() bool BackupRestore::table(const TableS & table){ if (!m_restore_meta) { return true; } NdbDictionary::Dictionary* dict = m_ndb->getDictionary(); if (dict->createTable(*table.m_dictTable) == -1) { Loading Loading @@ -320,14 +319,15 @@ BackupRestore::tuple_free() if (!m_restore) return; if (m_transactions > 0) { // Send all transactions to NDB if (m_transactions > 0) m_ndb->sendPreparedTransactions(0); // Poll all transactions while (m_transactions > 0) m_ndb->pollNdb(3000, m_transactions); } } void BackupRestore::endOfTuples() Loading Loading @@ -375,12 +375,12 @@ BackupRestore::logEntry(const LogEntry & tup) exit(-1); } for (int i = 0; i < tup.m_values.size(); i++) for (int i = 0; i < tup.size(); i++) { const AttributeS * attr = tup.m_values[i]; const AttributeS * attr = tup[i]; int size = attr->Desc->size; int arraySize = attr->Desc->arraySize; const char * dataPtr = attr->Data->string_value; const char * dataPtr = attr->Data.string_value; const Uint32 length = (size / 8) * arraySize; if (attr->Desc->m_column->getPrimaryKey()) Loading @@ -389,19 +389,27 @@ BackupRestore::logEntry(const LogEntry & tup) op->setValue(attr->Desc->attrId, dataPtr, length); } #if 1 trans->execute(Commit); #else const int ret = trans->execute(Commit); if (ret != 0) { // Both insert update and delete can fail during log running // and it's ok if (ret != 0) // TODO: check that the error is either tuple exists or tuple does not exist? switch(tup.m_type) { case LogEntry::LE_INSERT: break; case LogEntry::LE_UPDATE: break; case LogEntry::LE_DELETE: break; } if (false) { err << "execute failed: " << trans->getNdbError() << endl; exit(-1); } #endif } m_ndb->closeTransaction(trans); m_logCount++; Loading @@ -410,12 +418,12 @@ BackupRestore::logEntry(const LogEntry & tup) void BackupRestore::endOfLogEntrys() { if (m_restore) { if (!m_restore) return; info << "Restored " << m_dataCount << " tuples and " << m_logCount << " log entries" << endl; } } /* * callback : This is called when the transaction is polled Loading Loading @@ -471,7 +479,7 @@ BackupRestore::tuple(const TupleS & tup) const AttributeS * attr = tup[i]; int size = attr->Desc->size; int arraySize = attr->Desc->arraySize; const char * dataPtr = attr->Data->string_value; const char * dataPtr = attr->Data.string_value; const Uint32 length = (size * arraySize) / 8; if (attr->Desc->m_column->getPrimaryKey()) Loading @@ -483,11 +491,11 @@ BackupRestore::tuple(const TupleS & tup) const AttributeS * attr = tup[i]; int size = attr->Desc->size; int arraySize = attr->Desc->arraySize; const char * dataPtr = attr->Data->string_value; const char * dataPtr = attr->Data.string_value; const Uint32 length = (size * arraySize) / 8; if (!attr->Desc->m_column->getPrimaryKey()) if (attr->Data->null) if (attr->Data.null) op->setValue(i, NULL, 0); else op->setValue(i, dataPtr, length); Loading Loading
ndb/src/kernel/blocks/backup/restore/Restore.cpp +116 −133 Original line number Diff line number Diff line Loading @@ -122,7 +122,8 @@ RestoreMetaData::readMetaTableList() { Uint32 sectionInfo[2]; if (fread(§ionInfo, sizeof(sectionInfo), 1, m_file) != 1){ if (buffer_read(§ionInfo, sizeof(sectionInfo), 1) != 1){ err << "readMetaTableList read header error" << endl; return 0; } sectionInfo[0] = ntohl(sectionInfo[0]); Loading @@ -130,11 +131,9 @@ RestoreMetaData::readMetaTableList() { const Uint32 tabCount = sectionInfo[1] - 2; const Uint32 len = 4 * tabCount; if(createBuffer(len) == 0) abort(); if (fread(m_buffer, 1, len, m_file) != len){ void *tmp; if (buffer_get_ptr(&tmp, 4, tabCount) != tabCount){ err << "readMetaTableList read tabCount error" << endl; return 0; } Loading @@ -147,7 +146,7 @@ RestoreMetaData::readMetaTableDesc() { Uint32 sectionInfo[2]; // Read section header if (fread(§ionInfo, sizeof(sectionInfo), 1, m_file) != 1){ if (buffer_read(§ionInfo, sizeof(sectionInfo), 1) != 1){ err << "readMetaTableDesc read header error" << endl; return false; } // if Loading @@ -156,20 +155,15 @@ RestoreMetaData::readMetaTableDesc() { assert(sectionInfo[0] == BackupFormat::TABLE_DESCRIPTION); // Allocate temporary storage for dictTabInfo buffer const Uint32 len = (sectionInfo[1] - 2); if (createBuffer(4 * (len+1)) == NULL) { err << "readMetaTableDesc allocation error" << endl; return false; } // if // Read dictTabInfo buffer if (fread(m_buffer, 4, len, m_file) != len){ const Uint32 len = (sectionInfo[1] - 2); void *ptr; if (buffer_get_ptr(&ptr, 4, len) != len){ err << "readMetaTableDesc read error" << endl; return false; } // if return parseTableDescriptor(m_buffer, len); return parseTableDescriptor((Uint32*)ptr, len); } bool Loading @@ -177,11 +171,10 @@ RestoreMetaData::readGCPEntry() { Uint32 data[4]; BackupFormat::CtlFile::GCPEntry * dst = (BackupFormat::CtlFile::GCPEntry *)&data[0]; if(fread(dst, 4, 4, m_file) != 4){ if(buffer_read(dst, 4, 4) != 4){ err << "readGCPEntry read error" << endl; return false; } Loading Loading @@ -212,6 +205,12 @@ TableS::TableS(NdbTableImpl* tableImpl) createAttr(tableImpl->getColumn(i)); } TableS::~TableS() { for (int i = 0; i < allAttributesDesc.size(); i++) delete allAttributesDesc[i]; } // Parse dictTabInfo buffer and pushback to to vector storage bool RestoreMetaData::parseTableDescriptor(const Uint32 * data, Uint32 len) Loading Loading @@ -247,31 +246,18 @@ RestoreMetaData::parseTableDescriptor(const Uint32 * data, Uint32 len) // Constructor RestoreDataIterator::RestoreDataIterator(const RestoreMetaData & md, void (* _free_data_callback)()) : m_metaData(md), free_data_callback(_free_data_callback) : BackupFile(_free_data_callback), m_metaData(md) { debug << "RestoreDataIterator constructor" << endl; setDataFile(md, 0); m_buffer_sz = 64*1024; m_buffer = malloc(m_buffer_sz); m_buffer_ptr = m_buffer; m_buffer_data_left = 0; } RestoreDataIterator::~RestoreDataIterator() { if (m_buffer) free(m_buffer); } TupleS & TupleS::operator=(const TupleS& tuple) { prepareRecord(*tuple.m_currentTable); if (allAttrData) { allAttrData= new AttributeData[getNoOfAttributes()]; if (allAttrData) memcpy(allAttrData, tuple.allAttrData, getNoOfAttributes()*sizeof(AttributeData)); } return *this; }; Loading @@ -296,12 +282,16 @@ AttributeData * TupleS::getData(int i) const{ bool TupleS::prepareRecord(const TableS & tab){ if (allAttrData) { if (getNoOfAttributes() == tab.getNoOfAttributes()) { m_currentTable = &tab; return true; } delete [] allAttrData; m_currentTable= 0; } allAttrData = new AttributeData[tab.getNoOfAttributes()]; if (allAttrData == 0) return false; Loading @@ -310,47 +300,12 @@ TupleS::prepareRecord(const TableS & tab){ return true; } Uint32 RestoreDataIterator::get_buffer_ptr(void **p_buf_ptr, Uint32 size, Uint32 nmemb, FILE *stream) { Uint32 sz = size*nmemb; if (sz > m_buffer_data_left) { if (free_data_callback) (*free_data_callback)(); memcpy(m_buffer, m_buffer_ptr, m_buffer_data_left); size_t r = fread(((char *)m_buffer) + m_buffer_data_left, 1, m_buffer_sz - m_buffer_data_left, stream); m_buffer_data_left += r; m_buffer_ptr = m_buffer; if (sz > m_buffer_data_left) sz = size * (m_buffer_data_left / size); } *p_buf_ptr = m_buffer_ptr; m_buffer_ptr = ((char*)m_buffer_ptr)+sz; m_buffer_data_left -= sz; return sz/size; } Uint32 RestoreDataIterator::fread_buffer(void *ptr, Uint32 size, Uint32 nmemb, FILE *stream) { void *buf_ptr; Uint32 r = get_buffer_ptr(&buf_ptr, size, nmemb, stream); memcpy(ptr, buf_ptr, r*size); return r; } const TupleS * RestoreDataIterator::getNextTuple(int & res) { Uint32 dataLength = 0; // Read record length if (fread_buffer(&dataLength, sizeof(dataLength), 1, m_file) != 1){ if (buffer_read(&dataLength, sizeof(dataLength), 1) != 1){ err << "getNextTuple:Error reading length of data part" << endl; res = -1; return NULL; Loading @@ -370,7 +325,7 @@ RestoreDataIterator::getNextTuple(int & res) // Read tuple data void *_buf_ptr; if (get_buffer_ptr(&_buf_ptr, 1, dataLenBytes, m_file) != dataLenBytes) { if (buffer_get_ptr(&_buf_ptr, 1, dataLenBytes) != dataLenBytes) { err << "getNextTuple:Read error: " << endl; res = -1; return NULL; Loading Loading @@ -468,12 +423,17 @@ RestoreDataIterator::getNextTuple(int & res) return &m_tuple; } // RestoreDataIterator::getNextTuple BackupFile::BackupFile(){ BackupFile::BackupFile(void (* _free_data_callback)()) : free_data_callback(_free_data_callback) { m_file = 0; m_path[0] = 0; m_fileName[0] = 0; m_buffer = 0; m_bufferSize = 0; m_buffer_sz = 64*1024; m_buffer = malloc(m_buffer_sz); m_buffer_ptr = m_buffer; m_buffer_data_left = 0; } BackupFile::~BackupFile(){ Loading @@ -494,15 +454,54 @@ BackupFile::openFile(){ return m_file != 0; } Uint32 * BackupFile::createBuffer(Uint32 bytes){ if(bytes > m_bufferSize){ if(m_buffer != 0) free(m_buffer); m_bufferSize = m_bufferSize + 2 * bytes; m_buffer = (Uint32*)malloc(m_bufferSize); Uint32 BackupFile::buffer_get_ptr_ahead(void **p_buf_ptr, Uint32 size, Uint32 nmemb) { Uint32 sz = size*nmemb; if (sz > m_buffer_data_left) { if (free_data_callback) (*free_data_callback)(); memcpy(m_buffer, m_buffer_ptr, m_buffer_data_left); size_t r = fread(((char *)m_buffer) + m_buffer_data_left, 1, m_buffer_sz - m_buffer_data_left, m_file); m_buffer_data_left += r; m_buffer_ptr = m_buffer; if (sz > m_buffer_data_left) sz = size * (m_buffer_data_left / size); } *p_buf_ptr = m_buffer_ptr; return sz/size; } Uint32 BackupFile::buffer_get_ptr(void **p_buf_ptr, Uint32 size, Uint32 nmemb) { Uint32 r = buffer_get_ptr_ahead(p_buf_ptr, size, nmemb); m_buffer_ptr = ((char*)m_buffer_ptr)+(r*size); m_buffer_data_left -= (r*size); return r; } Uint32 BackupFile::buffer_read_ahead(void *ptr, Uint32 size, Uint32 nmemb) { void *buf_ptr; Uint32 r = buffer_get_ptr_ahead(&buf_ptr, size, nmemb); memcpy(ptr, buf_ptr, r*size); return r; } return m_buffer; Uint32 BackupFile::buffer_read(void *ptr, Uint32 size, Uint32 nmemb) { void *buf_ptr; Uint32 r = buffer_get_ptr(&buf_ptr, size, nmemb); memcpy(ptr, buf_ptr, r*size); return r; } void Loading Loading @@ -563,7 +562,7 @@ BackupFile::readHeader(){ return false; } if(fread(&m_fileHeader, sizeof(m_fileHeader), 1, m_file) != 1){ if(buffer_read(&m_fileHeader, sizeof(m_fileHeader), 1) != 1){ err << "readDataFileHeader: Error reading header" << endl; return false; } Loading Loading @@ -611,14 +610,13 @@ BackupFile::validateFooter(){ return true; } bool RestoreDataIterator::readFragmentHeader(int & ret) bool RestoreDataIterator::readFragmentHeader(int & ret) { BackupFormat::DataFile::FragmentHeader Header; debug << "RestoreDataIterator::getNextFragment" << endl; if (fread_buffer(&Header, sizeof(Header), 1, m_file) != 1){ if (buffer_read(&Header, sizeof(Header), 1) != 1){ ret = 0; return false; } // if Loading Loading @@ -663,7 +661,7 @@ bool RestoreDataIterator::validateFragmentFooter() { BackupFormat::DataFile::FragmentFooter footer; if (fread_buffer(&footer, sizeof(footer), 1, m_file) != 1){ if (buffer_read(&footer, sizeof(footer), 1) != 1){ err << "getFragmentFooter:Error reading fragment footer" << endl; return false; } Loading Loading @@ -776,36 +774,23 @@ RestoreLogIterator::getNextLogEntry(int & res) { Uint32 len= ~0; const Uint32 stopGCP = m_metaData.getStopGCP(); do { if(createBuffer(4) == 0) { if (buffer_read_ahead(&len, sizeof(Uint32), 1) != 1){ res= -1; return NULL; return 0; } len= ntohl(len); if (fread(m_buffer, sizeof(Uint32), 1, m_file) != 1){ res = -1; return NULL; Uint32 data_len = sizeof(Uint32) + len*4; if (buffer_get_ptr((void **)(&logE), 1, data_len) != data_len) { res= -2; return 0; } m_buffer[0] = ntohl(m_buffer[0]); len = m_buffer[0]; if(len == 0){ res= 0; return 0; } if(createBuffer(4 * (len + 1)) == 0){ res = -1; return NULL; } if (fread(&m_buffer[1], 4, len, m_file) != len) { res = -1; return NULL; } logE = (LogE *)&m_buffer[0]; logE->TableId= ntohl(logE->TableId); logE->TriggerEvent= ntohl(logE->TriggerEvent); Loading @@ -818,9 +803,6 @@ RestoreLogIterator::getNextLogEntry(int & res) { } } while(gcp > stopGCP + 1); for(int i=0; i<m_logEntry.m_values.size();i++) delete m_logEntry.m_values[i]; m_logEntry.m_values.clear(); m_logEntry.m_table = m_metaData.getTable(logE->TableId); switch(logE->TriggerEvent){ case TriggerEvent::TE_INSERT: Loading @@ -838,31 +820,32 @@ RestoreLogIterator::getNextLogEntry(int & res) { } const TableS * tab = m_logEntry.m_table; m_logEntry.clear(); AttributeHeader * ah = (AttributeHeader *)&logE->Data[0]; AttributeHeader *end = (AttributeHeader *)&logE->Data[len - 2]; AttributeS * attr; while(ah < end){ attr = new AttributeS; attr= m_logEntry.add_attr(); if(attr == NULL) { ndbout_c("Restore: Failed to allocate memory"); res = -1; return NULL; return 0; } attr->Desc = (* tab)[ah->getAttributeId()]; assert(attr->Desc != 0); const Uint32 sz = ah->getDataSize(); if(sz == 0){ attr->Data->null = true; attr->Data->void_value = NULL; attr->Data.null = true; attr->Data.void_value = NULL; } else { attr->Data->null = false; attr->Data->void_value = ah->getDataPtr(); attr->Data.null = false; attr->Data.void_value = ah->getDataPtr(); } Twiddle(attr->Desc, attr->Data); m_logEntry.m_values.push_back(attr); Twiddle(attr->Desc, &(attr->Data)); ah = ah->getNext(); } Loading @@ -874,7 +857,7 @@ RestoreLogIterator::getNextLogEntry(int & res) { NdbOut & operator<<(NdbOut& ndbout, const AttributeS& attr){ const AttributeData & data = *(attr.Data); const AttributeData & data = attr.Data; const AttributeDesc & desc = *(attr.Desc); if (data.null) Loading @@ -899,7 +882,7 @@ operator<<(NdbOut& ndbout, const TupleS& tuple) { AttributeData * attr_data = tuple.getData(i); const AttributeDesc * attr_desc = tuple.getDesc(i); const AttributeS attr = {attr_desc, attr_data}; const AttributeS attr = {attr_desc, *attr_data}; debug << i << " " << attr_desc->m_column->getName(); ndbout << attr; Loading Loading @@ -928,12 +911,12 @@ operator<<(NdbOut& ndbout, const LogEntry& logE) ndbout << "Unknown log entry type (not insert, delete or update)" ; } for (int i = 0; i < logE.m_values.size();i++) for (int i = 0; i < logE.size();i++) { const AttributeS * attr = logE.m_values[i]; const AttributeS * attr = logE[i]; ndbout << attr->Desc->m_column->getName() << "="; ndbout << (* attr); if (i < (logE.m_values.size() - 1)) if (i < (logE.size() - 1)) ndbout << ", "; } return ndbout; Loading
ndb/src/kernel/blocks/backup/restore/Restore.hpp +50 −27 Original line number Diff line number Diff line Loading @@ -85,7 +85,7 @@ public: struct AttributeS { const AttributeDesc * Desc; AttributeData * Data; AttributeData Data; }; class TupleS { Loading @@ -97,7 +97,10 @@ private: bool prepareRecord(const TableS &); public: TupleS() {}; TupleS() { m_currentTable= 0; allAttrData= 0; }; ~TupleS() { if (allAttrData) Loading Loading @@ -129,17 +132,13 @@ class TableS { Uint32 m_nullBitmaskSize; int pos; char create_string[2048]; /* char mysqlTableName[1024]; char mysqlDatabaseName[1024]; */ void createAttr(NdbDictionary::Column *column); public: class NdbDictionary::Table* m_dictTable; TableS (class NdbTableImpl* dictTable); ~TableS(); Uint32 getTableId() const { return m_dictTable->getTableId(); Loading Loading @@ -192,18 +191,26 @@ protected: BackupFormat::FileHeader m_expectedFileHeader; Uint32 m_nodeId; Uint32 * m_buffer; Uint32 m_bufferSize; Uint32 * createBuffer(Uint32 bytes); void * m_buffer; void * m_buffer_ptr; Uint32 m_buffer_sz; Uint32 m_buffer_data_left; void (* free_data_callback)(); bool openFile(); void setCtlFile(Uint32 nodeId, Uint32 backupId, const char * path); void setDataFile(const BackupFile & bf, Uint32 no); void setLogFile(const BackupFile & bf, Uint32 no); Uint32 buffer_get_ptr(void **p_buf_ptr, Uint32 size, Uint32 nmemb); Uint32 buffer_read(void *ptr, Uint32 size, Uint32 nmemb); Uint32 buffer_get_ptr_ahead(void **p_buf_ptr, Uint32 size, Uint32 nmemb); Uint32 buffer_read_ahead(void *ptr, Uint32 size, Uint32 nmemb); void setName(const char * path, const char * name); BackupFile(); BackupFile(void (* free_data_callback)() = 0); ~BackupFile(); public: bool readHeader(); Loading Loading @@ -231,14 +238,11 @@ class RestoreMetaData : public BackupFile { bool parseTableDescriptor(const Uint32 * data, Uint32 len); public: RestoreMetaData(const char * path, Uint32 nodeId, Uint32 bNo); ~RestoreMetaData(); virtual ~RestoreMetaData(); int loadContent(); Uint32 getNoOfTables() const { return allTables.size();} const TableS * operator[](int i) const { return allTables[i];} Loading @@ -254,24 +258,16 @@ class RestoreDataIterator : public BackupFile { const TableS* m_currentTable; TupleS m_tuple; void * m_buffer; void * m_buffer_ptr; Uint32 m_buffer_sz; Uint32 m_buffer_data_left; void (* free_data_callback)(); public: // Constructor RestoreDataIterator(const RestoreMetaData &, void (* free_data_callback)()); ~RestoreDataIterator(); ~RestoreDataIterator() {}; // Read data file fragment header bool readFragmentHeader(int & res); bool validateFragmentFooter(); Uint32 get_buffer_ptr(void **p_buf_ptr, Uint32 size, Uint32 nmemb, FILE *stream); Uint32 fread_buffer(void *ptr, Uint32 size, Uint32 nmemb, FILE *stream); const TupleS *getNextTuple(int & res); }; Loading @@ -285,8 +281,34 @@ public: EntryType m_type; const TableS * m_table; myVector<AttributeS*> m_values; myVector<AttributeS*> m_values_e; AttributeS *add_attr() { AttributeS * attr; if (m_values_e.size() > 0) { attr = m_values_e[m_values_e.size()-1]; m_values_e.pop_back(); } else { attr = new AttributeS; } m_values.push_back(attr); return attr; } void clear() { for(int i= 0; i < m_values.size(); i++) m_values_e.push_back(m_values[i]); m_values.clear(); } ~LogEntry() { for(int i= 0; i< m_values.size(); i++) delete m_values[i]; for(int i= 0; i< m_values_e.size(); i++) delete m_values_e[i]; } int size() const { return m_values.size(); } const AttributeS * operator[](int i) const { return m_values[i];} }; class RestoreLogIterator : public BackupFile { Loading @@ -297,6 +319,7 @@ private: LogEntry m_logEntry; public: RestoreLogIterator(const RestoreMetaData &); virtual ~RestoreLogIterator() {}; const LogEntry * getNextLogEntry(int & res); }; Loading
ndb/src/kernel/blocks/backup/restore/consumer.hpp +1 −7 Original line number Diff line number Diff line Loading @@ -21,20 +21,14 @@ class BackupConsumer { public: virtual ~BackupConsumer() { } virtual bool init() { return true;} virtual bool table(const TableS &){return true;} #ifdef USE_MYSQL virtual bool table(const TableS &, MYSQL* mysqlp) {return true;}; #endif virtual void tuple(const TupleS &){} virtual void tuple_free(){} virtual void endOfTuples(){} virtual void logEntry(const LogEntry &){} virtual void endOfLogEntrys(){} protected: #ifdef USE_MYSQL int create_table_string(const TableS & table, char * ,char *); #endif }; #endif
ndb/src/kernel/blocks/backup/restore/consumer_printer.cpp +1 −42 Original line number Diff line number Diff line Loading @@ -27,47 +27,6 @@ BackupPrinter::table(const TableS & tab) return true; } #ifdef USE_MYSQL bool BackupPrinter::table(const TableS & tab, MYSQL * mysql) { if (m_print || m_print_meta) { char tmpTabName[MAX_TAB_NAME_SIZE*2]; sprintf(tmpTabName, "%s", tab.getTableName()); char * database = strtok(tmpTabName, "/"); char * schema = strtok( NULL , "/"); char * tableName = strtok( NULL , "/"); /** * this means that the user did not specify schema * and it is a v2x backup */ if(database == NULL) return false; if(schema == NULL) return false; if(tableName==NULL) tableName = schema; char stmtCreateDB[255]; sprintf(stmtCreateDB,"CREATE DATABASE %s", database); ndbout_c("%s", stmtCreateDB); char buf [2048]; create_table_string(tab, tableName, buf); ndbout_c("%s", buf); ndbout_c("Successfully printed table: %s", tab.m_dictTable->getName()); } return true; } #endif void BackupPrinter::tuple(const TupleS & tup) { Loading
ndb/src/kernel/blocks/backup/restore/consumer_restore.cpp +35 −27 Original line number Diff line number Diff line Loading @@ -105,9 +105,8 @@ BackupRestore::~BackupRestore() bool BackupRestore::table(const TableS & table){ if (!m_restore_meta) { return true; } NdbDictionary::Dictionary* dict = m_ndb->getDictionary(); if (dict->createTable(*table.m_dictTable) == -1) { Loading Loading @@ -320,14 +319,15 @@ BackupRestore::tuple_free() if (!m_restore) return; if (m_transactions > 0) { // Send all transactions to NDB if (m_transactions > 0) m_ndb->sendPreparedTransactions(0); // Poll all transactions while (m_transactions > 0) m_ndb->pollNdb(3000, m_transactions); } } void BackupRestore::endOfTuples() Loading Loading @@ -375,12 +375,12 @@ BackupRestore::logEntry(const LogEntry & tup) exit(-1); } for (int i = 0; i < tup.m_values.size(); i++) for (int i = 0; i < tup.size(); i++) { const AttributeS * attr = tup.m_values[i]; const AttributeS * attr = tup[i]; int size = attr->Desc->size; int arraySize = attr->Desc->arraySize; const char * dataPtr = attr->Data->string_value; const char * dataPtr = attr->Data.string_value; const Uint32 length = (size / 8) * arraySize; if (attr->Desc->m_column->getPrimaryKey()) Loading @@ -389,19 +389,27 @@ BackupRestore::logEntry(const LogEntry & tup) op->setValue(attr->Desc->attrId, dataPtr, length); } #if 1 trans->execute(Commit); #else const int ret = trans->execute(Commit); if (ret != 0) { // Both insert update and delete can fail during log running // and it's ok if (ret != 0) // TODO: check that the error is either tuple exists or tuple does not exist? switch(tup.m_type) { case LogEntry::LE_INSERT: break; case LogEntry::LE_UPDATE: break; case LogEntry::LE_DELETE: break; } if (false) { err << "execute failed: " << trans->getNdbError() << endl; exit(-1); } #endif } m_ndb->closeTransaction(trans); m_logCount++; Loading @@ -410,12 +418,12 @@ BackupRestore::logEntry(const LogEntry & tup) void BackupRestore::endOfLogEntrys() { if (m_restore) { if (!m_restore) return; info << "Restored " << m_dataCount << " tuples and " << m_logCount << " log entries" << endl; } } /* * callback : This is called when the transaction is polled Loading Loading @@ -471,7 +479,7 @@ BackupRestore::tuple(const TupleS & tup) const AttributeS * attr = tup[i]; int size = attr->Desc->size; int arraySize = attr->Desc->arraySize; const char * dataPtr = attr->Data->string_value; const char * dataPtr = attr->Data.string_value; const Uint32 length = (size * arraySize) / 8; if (attr->Desc->m_column->getPrimaryKey()) Loading @@ -483,11 +491,11 @@ BackupRestore::tuple(const TupleS & tup) const AttributeS * attr = tup[i]; int size = attr->Desc->size; int arraySize = attr->Desc->arraySize; const char * dataPtr = attr->Data->string_value; const char * dataPtr = attr->Data.string_value; const Uint32 length = (size * arraySize) / 8; if (!attr->Desc->m_column->getPrimaryKey()) if (attr->Data->null) if (attr->Data.null) op->setValue(i, NULL, 0); else op->setValue(i, dataPtr, length); Loading