Commit 48e56f47 authored by unknown's avatar unknown
Browse files

Restored old shared memory buffer implementation (used by SCI and SHM).

Improved Default SCI config params
Added missing SCI libraries in ndb_mgm and atrt
Added max of 1024 signals per receive on transporter (to improve
real-time bahaviour and to ensure no job buffer explosion, still
some more work left on avoiding job buffer explosion in the general
case)


ndb/src/common/transporter/Packer.cpp:
  Fix for job buffer explosion and real-time behaviour also in
  high load scenarios.
ndb/src/common/transporter/SCI_Transporter.cpp:
  Restored old Shared memory buffer implementation.
  Changed condition slightly on when to send SCI buffer.
ndb/src/common/transporter/SCI_Transporter.hpp:
  Changed back to old shared memory implementation
ndb/src/common/transporter/SHM_Buffer.hpp:
  Changed back to old shared memory implementation
ndb/src/common/transporter/SHM_Transporter.cpp:
  Changed back to old shared memory implementation
ndb/src/common/transporter/SHM_Transporter.hpp:
  Changed back to old shared memory implementation
ndb/src/common/transporter/TransporterRegistry.cpp:
  Changed back to old shared memory implementation
ndb/src/kernel/vm/FastScheduler.hpp:
  Spelling error
ndb/src/mgmclient/Makefile.am:
  Missing SCI library
ndb/src/mgmsrv/ConfigInfo.cpp:
  Changed to more proper config parameters
ndb/test/run-test/Makefile.am:
  Added missing SCI library
parent a562315e
Loading
Loading
Loading
Loading
+14 −7
Original line number Diff line number Diff line
@@ -21,6 +21,7 @@
#include <TransporterCallback.hpp>
#include <RefConvert.hpp>

#define MAX_RECEIVED_SIGNALS 1024
Uint32
TransporterRegistry::unpack(Uint32 * readPtr,
			    Uint32 sizeOfData,
@@ -30,12 +31,15 @@ TransporterRegistry::unpack(Uint32 * readPtr,
  LinearSectionPtr ptr[3];
  
  Uint32 usedData   = 0;
  Uint32 loop_count = 0; 
 
  if(state == NoHalt || state == HaltOutput){
    while(sizeOfData >= 4 + sizeof(Protocol6)){
    while ((sizeOfData >= 4 + sizeof(Protocol6)) &&
           (loop_count < MAX_RECEIVED_SIGNALS)) {
      Uint32 word1 = readPtr[0];
      Uint32 word2 = readPtr[1];
      Uint32 word3 = readPtr[2];
      loop_count++;
      
#if 0
      if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
@@ -112,10 +116,12 @@ TransporterRegistry::unpack(Uint32 * readPtr,
  } else {
    /** state = HaltIO || state == HaltInput */

    while(sizeOfData >= 4 + sizeof(Protocol6)){
    while ((sizeOfData >= 4 + sizeof(Protocol6)) &&
           (loop_count < MAX_RECEIVED_SIGNALS)) {
      Uint32 word1 = readPtr[0];
      Uint32 word2 = readPtr[1];
      Uint32 word3 = readPtr[2];
      loop_count++;
      
#if 0
      if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
@@ -208,12 +214,13 @@ TransporterRegistry::unpack(Uint32 * readPtr,
			    IOState state) {
  static SignalHeader signalHeader;
  static LinearSectionPtr ptr[3];
  Uint32 loop_count = 0;
  if(state == NoHalt || state == HaltOutput){
    while(readPtr < eodPtr){
    while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) {
      Uint32 word1 = readPtr[0];
      Uint32 word2 = readPtr[1];
      Uint32 word3 = readPtr[2];
      
      loop_count++; 
#if 0
      if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
	//Do funky stuff
@@ -280,11 +287,11 @@ TransporterRegistry::unpack(Uint32 * readPtr,
  } else {
    /** state = HaltIO || state == HaltInput */

    while(readPtr < eodPtr){
    while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) {
      Uint32 word1 = readPtr[0];
      Uint32 word2 = readPtr[1];
      Uint32 word3 = readPtr[2];
      
      loop_count++; 
#if 0
      if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
	//Do funky stuff
+2 −11
Original line number Diff line number Diff line
@@ -530,7 +530,6 @@ void SCI_Transporter::setupLocalSegment()
   Uint32 * localReadIndex =  
     (Uint32*)m_SourceSegm[m_ActiveAdapterId].mappedMemory;  
   Uint32 * localWriteIndex =  (Uint32*)(localReadIndex+ 1); 
   Uint32 * localEndWriteIndex = (Uint32*)(localReadIndex + 2); 
   m_localStatusFlag = (Uint32*)(localReadIndex + 3); 
 
   char * localStartOfBuf = (char*)  
@@ -538,7 +537,6 @@ void SCI_Transporter::setupLocalSegment()
 
   * localReadIndex = 0; 
   * localWriteIndex = 0; 
   * localEndWriteIndex = 0;

   const Uint32 slack = MAX_MESSAGE_SIZE;

@@ -546,7 +544,6 @@ void SCI_Transporter::setupLocalSegment()
			   sizeOfBuffer, 
			   slack,
			   localReadIndex, 
			   localEndWriteIndex, 
			   localWriteIndex);
    
   reader->clear(); 
@@ -570,7 +567,6 @@ void SCI_Transporter::setupRemoteSegment()
    
   Uint32 * remoteReadIndex = (Uint32*)segPtr;  
   Uint32 * remoteWriteIndex = (Uint32*)(segPtr + 1); 
   Uint32 * remoteEndWriteIndex = (Uint32*) (segPtr + 2); 
   m_remoteStatusFlag = (Uint32*)(segPtr + 3);
    
   char * remoteStartOfBuf = ( char*)((char*)segPtr+(sharedSize)); 
@@ -579,7 +575,6 @@ void SCI_Transporter::setupRemoteSegment()
			   sizeOfBuffer, 
			   slack,
			   remoteReadIndex, 
			   remoteEndWriteIndex, 
			   remoteWriteIndex);
   
   writer->clear(); 
@@ -598,7 +593,6 @@ void SCI_Transporter::setupRemoteSegment()
    
     Uint32 * remoteReadIndex2 = (Uint32*)segPtr;  
     Uint32 * remoteWriteIndex2 = (Uint32*) (segPtr + 1); 
     Uint32 * remoteEndWriteIndex2 = (Uint32*) (segPtr + 2); 
     m_remoteStatusFlag2 = (Uint32*)(segPtr + 3);
    
     char * remoteStartOfBuf2 = ( char*)((char *)segPtr+sharedSize); 
@@ -613,12 +607,10 @@ void SCI_Transporter::setupRemoteSegment()
                              sizeOfBuffer, 
                              slack,
                              remoteReadIndex2, 
                              remoteEndWriteIndex2, 
                              remoteWriteIndex2);

     * remoteReadIndex = 0; 
     * remoteWriteIndex = 0; 
     * remoteEndWriteIndex = 0; 
     writer2->clear(); 
     m_TargetSegm[1].writer=writer2; 
     if(createSequence(m_StandbyAdapterId)!=SCI_ERR_OK) { 
@@ -918,14 +910,13 @@ SCI_Transporter::getWritePtr(Uint32 lenBytes, Uint32 prio)
  Uint32 send_buf_size = m_sendBuffer.m_sendBufferSize;
  Uint32 curr_data_size = m_sendBuffer.m_dataSize << 2;
  Uint32 new_curr_data_size = curr_data_size + lenBytes;
  if ((new_curr_data_size >= send_buf_size) ||
  if ((curr_data_size >= send_buf_size) ||
      (curr_data_size >= sci_buffer_remaining)) {
    /**
     * The new message will not fit in the send buffer. We need to
     * send the send buffer before filling it up with the new
     * signal data. If current data size will spill over buffer edge
     * we will also send to avoid writing larger than possible in
     * buffer.
     * we will also send to ensure correct operation.
     */  
    if (!doSend()) { 
      /**
+4 −5
Original line number Diff line number Diff line
@@ -297,13 +297,12 @@ private:
   */ 
  bool sendIsPossible(struct timeval * timeout); 
   

  void getReceivePtr(Uint32 ** ptr, Uint32 &size){
    size = reader->getReadPtr(* ptr);
  void getReceivePtr(Uint32 ** ptr, Uint32 ** eod){
    reader->getReadPtr(* ptr, * eod);
  }

  void updateReceivePtr(Uint32 size){
    reader->updateReadPtr(size);
  void updateReceivePtr(Uint32 *ptr){
    reader->updateReadPtr(ptr);
  }
 
  /** 
+8 −18
Original line number Diff line number Diff line
@@ -42,13 +42,11 @@ public:
	     Uint32 _sizeOfBuffer,
	     Uint32 _slack,
	     Uint32 * _readIndex,
             Uint32 * _endWriteIndex,
	     Uint32 * _writeIndex) :
    m_startOfBuffer(_startOfBuffer),
    m_totalBufferSize(_sizeOfBuffer),
    m_bufferSize(_sizeOfBuffer - _slack),
    m_sharedReadIndex(_readIndex),
    m_sharedEndWriteIndex(_endWriteIndex),
    m_sharedWriteIndex(_writeIndex)
  {
  }
@@ -68,12 +66,12 @@ public:
   *  returns ptr - where to start reading
   *           sz - how much can I read
   */
  inline Uint32 getReadPtr(Uint32 * & ptr);
  inline void getReadPtr(Uint32 * & ptr, Uint32 * & eod);

  /**
   * Update read ptr
   */
  inline void updateReadPtr(Uint32 size);
  inline void updateReadPtr(Uint32 *ptr);
  
private:
  char * const m_startOfBuffer;
@@ -82,7 +80,6 @@ private:
  Uint32 m_readIndex;

  Uint32 * m_sharedReadIndex;
  Uint32 * m_sharedEndWriteIndex;
  Uint32 * m_sharedWriteIndex;
};

@@ -100,22 +97,19 @@ SHM_Reader::empty() const{
 *           sz - how much can I read
 */
inline 
Uint32
SHM_Reader::getReadPtr(Uint32 * & ptr)
void
SHM_Reader::getReadPtr(Uint32 * & ptr, Uint32 * & eod)
{
  Uint32 *eod;  
  Uint32 tReadIndex  = m_readIndex;
  Uint32 tWriteIndex = * m_sharedWriteIndex;
  Uint32 tEndWriteIndex = * m_sharedEndWriteIndex;
  
  ptr = (Uint32*)&m_startOfBuffer[tReadIndex];
  
  if(tReadIndex <= tWriteIndex){
    eod = (Uint32*)&m_startOfBuffer[tWriteIndex];
  } else {
    eod = (Uint32*)&m_startOfBuffer[tEndWriteIndex];
    eod = (Uint32*)&m_startOfBuffer[m_bufferSize];
  }
  return (Uint32)((char*)eod - (char*)ptr); 
}

/**
@@ -123,10 +117,10 @@ SHM_Reader::getReadPtr(Uint32 * & ptr)
 */
inline
void 
SHM_Reader::updateReadPtr(Uint32 size)
SHM_Reader::updateReadPtr(Uint32 *ptr)
{
  Uint32 tReadIndex = m_readIndex;
  tReadIndex += size;
  Uint32 tReadIndex = ((char*)ptr) - m_startOfBuffer;

  assert(tReadIndex < m_totalBufferSize);

  if(tReadIndex >= m_bufferSize){
@@ -145,13 +139,11 @@ public:
	     Uint32 _sizeOfBuffer,
	     Uint32 _slack,
	     Uint32 * _readIndex,
	     Uint32 * _endWriteIndex,
	     Uint32 * _writeIndex) :
    m_startOfBuffer(_startOfBuffer),
    m_totalBufferSize(_sizeOfBuffer),
    m_bufferSize(_sizeOfBuffer - _slack),
    m_sharedReadIndex(_readIndex),
    m_sharedEndWriteIndex(_endWriteIndex),
    m_sharedWriteIndex(_writeIndex)
  {
  }
@@ -176,7 +168,6 @@ private:
  Uint32 m_writeIndex;
  
  Uint32 * m_sharedReadIndex;
  Uint32 * m_sharedEndWriteIndex;
  Uint32 * m_sharedWriteIndex;
};

@@ -215,7 +206,6 @@ SHM_Writer::updateWritePtr(Uint32 sz){
  assert(tWriteIndex < m_totalBufferSize);

  if(tWriteIndex >= m_bufferSize){
    * m_sharedEndWriteIndex = tWriteIndex;
    tWriteIndex = 0;
  }

+0 −9
Original line number Diff line number Diff line
@@ -82,14 +82,12 @@ SHM_Transporter::setupBuffers(){

  Uint32 * sharedReadIndex1 = base1;
  Uint32 * sharedWriteIndex1 = base1 + 1;
  Uint32 * sharedEndWriteIndex1 = base1 + 2;
  serverStatusFlag = base1 + 4;
  char * startOfBuf1 = shmBuf+sharedSize;

  Uint32 * base2 = (Uint32*)(shmBuf + sizeOfBuffer + sharedSize);
  Uint32 * sharedReadIndex2 = base2;
  Uint32 * sharedWriteIndex2 = base2 + 1;
  Uint32 * sharedEndWriteIndex2 = base2 + 2;
  clientStatusFlag = base2 + 4;
  char * startOfBuf2 = ((char *)base2)+sharedSize;
  
@@ -99,23 +97,19 @@ SHM_Transporter::setupBuffers(){
			    sizeOfBuffer,
			    slack,
			    sharedReadIndex1,
			    sharedEndWriteIndex1,
			    sharedWriteIndex1);

    writer = new SHM_Writer(startOfBuf2, 
			    sizeOfBuffer,
			    slack,
			    sharedReadIndex2,
			    sharedEndWriteIndex2,
			    sharedWriteIndex2);

    * sharedReadIndex1 = 0;
    * sharedWriteIndex1 = 0;
    * sharedEndWriteIndex1 = 0;

    * sharedReadIndex2 = 0;
    * sharedWriteIndex2 = 0;
    * sharedEndWriteIndex2 = 0;
    
    reader->clear();
    writer->clear();
@@ -148,19 +142,16 @@ SHM_Transporter::setupBuffers(){
			    sizeOfBuffer,
			    slack,
			    sharedReadIndex2,
			    sharedEndWriteIndex2,
			    sharedWriteIndex2);
    
    writer = new SHM_Writer(startOfBuf1, 
			    sizeOfBuffer,
			    slack,
			    sharedReadIndex1,
			    sharedEndWriteIndex1,
			    sharedWriteIndex1);
    
    * sharedReadIndex2 = 0;
    * sharedWriteIndex1 = 0;
    * sharedEndWriteIndex1 = 0;
    
    reader->clear();
    writer->clear();
Loading