ParallelComm.cpp

/*------------------------------------------------------------------------*/
/*                 Copyright 2010 Sandia Corporation.                     */
/*  Under terms of Contract DE-AC04-94AL85000, there is a non-exclusive   */
/*  license for use of this work by or on behalf of the U.S. Government.  */
/*  Export of this program may require a license from the                 */
/*  United States Government.                                             */
/*------------------------------------------------------------------------*/

#include <stdlib.h>
#include <stdexcept>
#include <sstream>
#include <vector>

#include <stk_util/parallel/ParallelComm.hpp>
#include <stk_util/parallel/ParallelReduce.hpp>

namespace stk {

//-----------------------------------------------------------------------

#if defined( STK_HAS_MPI )

enum { STK_MPI_TAG_SIZING = 0 , STK_MPI_TAG_DATA = 1 };

// Communicate in sparse or dense mode, as directed during allocation

namespace {

bool all_to_all_dense( ParallelMachine p_comm ,
                       const CommBuffer * const send ,
                       const CommBuffer * const recv ,
                       std::ostream & msg )
{
  typedef unsigned char * ucharp ;

  static const char method[] = "stk::CommAll::communicate" ;

  int result ;

  {
    const unsigned p_size = parallel_machine_size( p_comm );

    std::vector<int> tmp( p_size * 4 );

    int * const send_counts = & tmp[0] ;
    int * const send_displs = send_counts + p_size ;
    int * const recv_counts = send_displs + p_size ;
    int * const recv_displs = recv_counts + p_size ;

    unsigned char * const ps = static_cast<ucharp>(send[0].buffer());
    unsigned char * const pr = static_cast<ucharp>(recv[0].buffer());

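    // All of the per-processor buffers come from one contiguous allocation
    // (see CommBuffer::allocate below), so pointer differences from the
    // first buffer give exactly the byte displacements MPI_Alltoallv expects.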
    for ( unsigned i = 0 ; i < p_size ; ++i ) {
      const CommBuffer & send_buf = send[i] ;
      const CommBuffer & recv_buf = recv[i] ;

      send_counts[i] = send_buf.capacity();
      recv_counts[i] = recv_buf.capacity();

      send_displs[i] = static_cast<ucharp>(send_buf.buffer()) - ps ;
      recv_displs[i] = static_cast<ucharp>(recv_buf.buffer()) - pr ;
    }

    result = MPI_Alltoallv( ps , send_counts , send_displs , MPI_BYTE ,
                            pr , recv_counts , recv_displs , MPI_BYTE ,
                            p_comm );

    if ( MPI_SUCCESS != result ) {
      msg << method << " GLOBAL ERROR: " << result << " == MPI_Alltoallv" ;
    }
  }

  return MPI_SUCCESS == result ;
}

bool all_to_all_sparse( ParallelMachine p_comm ,
                        const CommBuffer * const send ,
                        const CommBuffer * const recv ,
                        std::ostream & msg )
{
  static const char method[] = "stk::CommAll::communicate" ;
  static const int mpi_tag = STK_MPI_TAG_DATA ;

  int result = MPI_SUCCESS ;

  {
    const unsigned p_size = parallel_machine_size( p_comm );
    const unsigned p_rank = parallel_machine_rank( p_comm );

    //------------------------------
    // Receive count

    unsigned num_recv = 0 ;

    for ( unsigned i = 0 ; i < p_size ; ++i ) {
      if ( recv[i].capacity() ) { ++num_recv ; }
    }

    //------------------------------
    // Post receives for specific processors with specific sizes

    MPI_Request request_null = MPI_REQUEST_NULL ;
    std::vector<MPI_Request> request( num_recv , request_null );
    std::vector<MPI_Status>  status(  num_recv );

    unsigned count = 0 ;

    for ( unsigned i = 0 ; result == MPI_SUCCESS && i < p_size ; ++i ) {
      const unsigned recv_size = recv[i].capacity();
      void * const   recv_buf  = recv[i].buffer();
      if ( recv_size ) {
        result = MPI_Irecv( recv_buf , recv_size , MPI_BYTE ,
                            i , mpi_tag , p_comm , & request[count] );
        ++count ;
      }
    }

    if ( MPI_SUCCESS != result ) {
      msg << method << " LOCAL[" << p_rank << "] ERROR: "
          << result << " == MPI_Irecv , " ;
    }

    //------------------------------
    // Sync to allow ready sends and for a potential error

    int local_error = MPI_SUCCESS == result ? 0 : 1 ;
    int global_error = 0 ;

    result = MPI_Allreduce( & local_error , & global_error ,
                            1 , MPI_INT , MPI_SUM , p_comm );

    if ( MPI_SUCCESS != result ) {
      msg << method << " GLOBAL ERROR: " << result << " == MPI_Allreduce" ;
    }
    else if ( global_error ) {
      result = MPI_ERR_UNKNOWN ;
    }
    else {
      // Everything is local from here on out, no more syncs

      //------------------------------
      // Ready-send the buffers, rotate the send processor
      // in a simple attempt to smooth out the communication traffic.
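      // (For example, with p_size = 4 processor 1 sends to 1, 2, 3, 0.)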

      for ( unsigned i = 0 ; MPI_SUCCESS == result && i < p_size ; ++i ) {
        const int dst = ( i + p_rank ) % p_size ;
        const unsigned send_size = send[dst].capacity();
        void * const   send_buf  = send[dst].buffer();
        if ( send_size ) {
          result = MPI_Rsend( send_buf , send_size , MPI_BYTE ,
                              dst , mpi_tag , p_comm );
        }
      }

      if ( MPI_SUCCESS != result ) {
        msg << method << " LOCAL ERROR: " << result << " == MPI_Rsend , " ;
      }
      else {
        MPI_Request * const p_request = & request[0] ;
        MPI_Status  * const p_status  = & status[0] ;

        result = MPI_Waitall( num_recv , p_request , p_status );
      }

      if ( MPI_SUCCESS != result ) {
        msg << method << " LOCAL[" << p_rank << "] ERROR: "
            << result << " == MPI_Waitall , " ;
      }
      else {

        for ( unsigned i = 0 ; i < num_recv ; ++i ) {
          MPI_Status * const recv_status = & status[i] ;
          const int recv_proc = recv_status->MPI_SOURCE ;
          const int recv_tag  = recv_status->MPI_TAG ;
          const int recv_plan = recv[recv_proc].capacity();
          int recv_count = 0 ;

          MPI_Get_count( recv_status , MPI_BYTE , & recv_count );

          if ( recv_tag != mpi_tag || recv_count != recv_plan ) {
            msg << method << " LOCAL[" << p_rank << "] ERROR: Recv["
                << recv_proc << "] Size( "
                << recv_count << " != " << recv_plan << " ) , " ;
            result = MPI_ERR_UNKNOWN ;
          }
        }
      }
    }
  }

  return MPI_SUCCESS == result ;
}

}

#else

// Not parallel

namespace {

bool all_to_all_dense( ParallelMachine ,
                       const CommBuffer * const send ,
                       const CommBuffer * const recv ,
                       std::ostream & )
{ return send == recv ; }

bool all_to_all_sparse( ParallelMachine ,
                        const CommBuffer * const send ,
                        const CommBuffer * const recv ,
                        std::ostream & )
{ return send == recv ; }

}

#endif

//----------------------------------------------------------------------

namespace {

inline
size_t align_quad( size_t n )
{
  enum { Size = 4 * sizeof(int) };
  return n + CommBufferAlign<Size>::align(n);
}
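
// For example, assuming a typical 4-byte int (an assumption, not asserted
// here), align_quad pads sizes up to 16-byte boundaries:
// align_quad(1) == 16, align_quad(16) == 16, align_quad(17) == 32.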

}

//----------------------------------------------------------------------

void CommBuffer::pack_overflow() const
{
  std::ostringstream os ;
  os << "stk::CommBuffer::pack<T>(...){ overflow by " ;
  os << remaining() ;
  os << " bytes. }" ;
  throw std::overflow_error( os.str() );
}

void CommBuffer::unpack_overflow() const
{
  std::ostringstream os ;
  os << "stk::CommBuffer::unpack<T>(...){ overflow by " ;
  os << remaining();
  os << " bytes. }" ;
  throw std::overflow_error( os.str() );
}

void CommAll::rank_error( const char * method , unsigned p ) const
{
  std::ostringstream os ;
  os << "stk::CommAll::" << method
     << "(" << p << ") ERROR: Not in [0:" << m_size << ")" ;
  throw std::range_error( os.str() );
}

//----------------------------------------------------------------------

CommBuffer::CommBuffer()
  : m_beg(NULL), m_ptr(NULL), m_end(NULL)
{ }

CommBuffer::~CommBuffer()
{ }

void CommBuffer::deallocate( const unsigned number , CommBuffer * buffers )
{
  if ( NULL != buffers ) {
    for ( unsigned i = 0 ; i < number ; ++i ) {
      ( buffers + i )->~CommBuffer();
    }
    free( buffers );
  }
}

CommBuffer * CommBuffer::allocate(
  const unsigned number , const unsigned * const size )
{
  const size_t n_base = align_quad( number * sizeof(CommBuffer) );
  size_t n_size = n_base ;

  if ( NULL != size ) {
    for ( unsigned i = 0 ; i < number ; ++i ) {
      n_size += align_quad( size[i] );
    }
  }

  // Allocate space for buffers

  void * const p_malloc = malloc( n_size );

  CommBuffer * const b_base =
    p_malloc != NULL ? reinterpret_cast<CommBuffer*>(p_malloc)
                     : reinterpret_cast<CommBuffer*>( NULL );

  if ( p_malloc != NULL ) {

    for ( unsigned i = 0 ; i < number ; ++i ) {
      new( b_base + i ) CommBuffer();
    }

    if ( NULL != size ) {

      ucharp ptr = reinterpret_cast<ucharp>( p_malloc );

      ptr += n_base ;

      for ( unsigned i = 0 ; i < number ; ++i ) {
        CommBuffer & b = b_base[i] ;
        b.m_beg = ptr ;
        b.m_ptr = ptr ;
        b.m_end = ptr + size[i] ;
        ptr += align_quad( size[i] );
      }
    }
  }

  return b_base ;
}
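
// The single malloc block returned by allocate( number , size ) is laid out
// as the buffer headers followed by the quad-aligned payloads:
//
//   [ CommBuffer[0..number-1] | pad ][ payload 0 | pad ] ... [ payload number-1 | pad ]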

//----------------------------------------------------------------------
//----------------------------------------------------------------------

CommAll::~CommAll()
{
  try {
    CommBuffer::deallocate( m_size , m_send );
    if ( 1 < m_size ) { CommBuffer::deallocate( m_size , m_recv ); }
  } catch(...){}
  m_comm = parallel_machine_null();
  m_size = 0 ;
  m_rank = 0 ;
  m_send = NULL ;
  m_recv = NULL ;
}

CommAll::CommAll()
  : m_comm( parallel_machine_null() ),
    m_size( 0 ), m_rank( 0 ),
    m_bound( 0 ),
    m_max( 0 ),
    m_send(NULL),
    m_recv(NULL)
{}

CommAll::CommAll( ParallelMachine comm )
  : m_comm( comm ),
    m_size( parallel_machine_size( comm ) ),
    m_rank( parallel_machine_rank( comm ) ),
    m_bound( 0 ),
    m_max( 0 ),
    m_send(NULL),
    m_recv(NULL)
{
  m_send = CommBuffer::allocate( m_size , NULL );

  if ( NULL == m_send ) {
    std::string msg("stk::CommAll::CommAll FAILED malloc");
    throw std::runtime_error(msg);
  }
}

bool CommAll::allocate_buffers( const unsigned num_msg_bounds ,
                                const bool symmetric ,
                                const bool local_flag )
{
  const unsigned zero = 0 ;
  std::vector<unsigned> tmp( m_size , zero );

  for ( unsigned i = 0 ; i < m_size ; ++i ) {
    tmp[i] = m_send[i].size();
  }

  const unsigned * const send_size = & tmp[0] ;
  const unsigned * const recv_size = symmetric ? & tmp[0] : NULL ;

  return allocate_buffers( m_comm, num_msg_bounds,
                           send_size, recv_size, local_flag );
}
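
// A minimal usage sketch of the two-pass pack protocol (a sketch only: the
// pack/unpack and buffer-access members are declared in ParallelComm.hpp,
// and dst, src, data, n, and bound are hypothetical):
//
//   CommAll all( comm );
//   all.send_buffer( dst ).pack<double>( data , n );   // pass 1: sizing
//   all.allocate_buffers( bound , false );             // exchange sizes
//   all.send_buffer( dst ).pack<double>( data , n );   // pass 2: fill
//   all.communicate();                                 // exchange data
//   all.recv_buffer( src ).unpack<double>( data , n );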

//----------------------------------------------------------------------

void CommAll::reset_buffers()
{
  if ( m_send ) {
    CommBuffer * m = m_send ;
    CommBuffer * const me = m + m_size ;
    for ( ; m != me ; ++m ) { m->reset(); }
  }
  if ( m_recv && 1 < m_size ) {
    CommBuffer * m = m_recv ;
    CommBuffer * const me = m + m_size ;
    for ( ; m != me ; ++m ) { m->reset(); }
  }
}

//----------------------------------------------------------------------

void CommAll::swap_send_recv()
{
  if ( m_recv == NULL ) {
    // ERROR
    std::string
      msg("stk::CommAll::swap_send_recv(){ NULL recv buffers }" );
    throw std::logic_error( msg );
  }

  CommBuffer * tmp_msg = m_send ;
  m_send = m_recv ;
  m_recv = tmp_msg ;
}

//----------------------------------------------------------------------

bool CommAll::allocate_buffers( ParallelMachine comm ,
                                const unsigned num_msg_bounds ,
                                const unsigned * const send_size ,
                                const unsigned * const recv_size ,
                                const bool local_flag )
{
  static const char method[] = "stk::CommAll::allocate_buffers" ;
  const unsigned uzero = 0 ;

  // When m_size == 1 the recv buffers alias the send buffers (set below),
  // so guard the second deallocation against a double free.
  CommBuffer::deallocate( m_size , m_send );
  if ( 1 < m_size ) { CommBuffer::deallocate( m_size , m_recv ); }

  m_comm = comm ;
  m_size = parallel_machine_size( comm );
  m_rank = parallel_machine_rank( comm );
  m_bound = num_msg_bounds ;

  std::ostringstream msg ;

  //--------------------------------
  // Buffer allocation

  {
    const bool send_none = NULL == send_size ;

    std::vector<unsigned> tmp_send ;

    if ( send_none ) { tmp_send.resize( m_size , uzero ); }

    const unsigned * const send = send_none ? & tmp_send[0] : send_size ;

    m_send = CommBuffer::allocate( m_size , send );

    if ( 1 < m_size ) {

      std::vector<unsigned> tmp_recv ;

      const bool recv_tbd = NULL == recv_size ;

      if ( recv_tbd ) { // Had better be globally consistent.

        tmp_recv.resize( m_size , uzero );

        unsigned * const r = & tmp_recv[0] ;

        comm_sizes( m_comm , m_bound , m_max , send , r );
      }

      const unsigned * const recv = recv_tbd ? & tmp_recv[0] : recv_size ;

      m_recv = CommBuffer::allocate( m_size , recv );
    }
    else {
      m_recv = m_send ;
    }
  }

  bool error_alloc = m_send == NULL || m_recv == NULL ;

  //--------------------------------
  // Propagation of the error flag and the input flag, plus a quick, cheap,
  // approximate verification that the send and receive messages are
  // consistent: do the message counts and total sizes match up?
  // Message counts and sizes are summed over groups of processors;
  // sends contribute positively and receives negatively,
  // so every group total should finish at zero.

  enum { NPSum  = 7 };
  enum { Length = 2 + 2 * NPSum };

  int local_result[ Length ];
  int global_result[ Length ];

  Copy<Length>( local_result , 0 );

  local_result[ Length - 2 ] = error_alloc ;
  local_result[ Length - 1 ] = local_flag ;

  if ( ! error_alloc ) {

    const unsigned r = 2 * ( m_rank % NPSum );

    for ( unsigned i = 0 ; i < m_size ; ++i ) {
      const unsigned n_send = m_send[i].capacity();
      const unsigned n_recv = m_recv[i].capacity();

      const unsigned s = 2 * ( i % NPSum );

      local_result[s]   += n_send ? 1 : 0 ;
      local_result[s+1] += n_send ;

      local_result[r]   -= n_recv ? 1 : 0 ;
      local_result[r+1] -= n_recv ;
    }
  }

  all_reduce_sum( m_comm , local_result , global_result , Length );

  bool global_flag ;

  error_alloc   = global_result[ Length - 2 ] ;
  global_flag   = global_result[ Length - 1 ] ;

  bool ok = true ;

  for ( unsigned i = 0 ; ok && i < 2 * NPSum ; ++i ) {
    ok = 0 == global_result[i] ;
  }

  if ( error_alloc || ! ok ) {
    msg << method << " ERROR:" ;
    if ( error_alloc   ) { msg << " Failed memory allocation ," ; }
    if ( ! ok          ) { msg << " Parallel inconsistent send/receive ," ; }
    throw std::runtime_error( msg.str() );
  }

  return global_flag ;
}

//----------------------------------------------------------------------

void CommAll::communicate()
{
  static const char method[] = "stk::CommAll::communicate" ;

  std::ostringstream msg ;

  // Verify the send buffers have been filled, reset the buffer pointers

  for ( unsigned i = 0 ; i < m_size ; ++i ) {

    if ( m_send[i].remaining() ) {
      msg << method << " LOCAL[" << m_rank << "] ERROR: Send[" << i
          << "] Buffer not filled." ;
      throw std::underflow_error( msg.str() );
    }
/*
    m_send[i].reset();
*/
    m_recv[i].reset();
  }

  if ( 1 < m_size ) {
    bool ok ;

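    // m_max is the maximum per-processor message count reported by
    // comm_sizes during buffer allocation; when it exceeds the caller's
    // bound the traffic is treated as dense and a single all-to-all is
    // used instead of point-to-point messages.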
    if ( m_bound < m_max ) {
      ok = all_to_all_dense( m_comm , m_send , m_recv , msg );
    }
    else {
      ok = all_to_all_sparse( m_comm , m_send , m_recv , msg );
    }

    if ( ! ok ) { throw std::runtime_error( msg.str() ); }
  }
}

//----------------------------------------------------------------------
//----------------------------------------------------------------------

CommBroadcast::CommBroadcast( ParallelMachine comm , unsigned root_rank )
  : m_comm( comm ),
    m_size( parallel_machine_size( comm ) ),
    m_rank( parallel_machine_rank( comm ) ),
    m_root_rank( root_rank ),
    m_buffer()
{}

bool CommBroadcast::allocate_buffer( const bool local_flag )
{
  static const char method[] = "stk::CommBroadcast::allocate_buffer" ;

  unsigned root_rank_min = m_root_rank ;
  unsigned root_rank_max = m_root_rank ;
  unsigned root_send_size = m_root_rank == m_rank ? m_buffer.size() : 0 ;
  unsigned flag = local_flag ;

  all_reduce( m_comm , ReduceMin<1>( & root_rank_min ) &
                       ReduceMax<1>( & root_rank_max ) &
                       ReduceMax<1>( & root_send_size ) &
                       ReduceBitOr<1>( & flag ) );

  if ( root_rank_min != root_rank_max ) {
    std::string msg ;
    msg.append( method );
    msg.append( " FAILED: inconsistent root processor" );
    throw std::runtime_error( msg );
  }

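  // After the reduction root_send_size holds the root's buffer size on
  // every processor, so each processor allocates a matching buffer for
  // the broadcast.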
  m_buffer.m_beg = static_cast<CommBuffer::ucharp>( malloc( root_send_size ) );
  m_buffer.m_ptr = m_buffer.m_beg ;
  m_buffer.m_end = m_buffer.m_beg + root_send_size ;

  return flag ;
}

CommBroadcast::~CommBroadcast()
{
  try {
    if ( m_buffer.m_beg ) { free( static_cast<void*>( m_buffer.m_beg ) ); }
  } catch(...) {}
  m_buffer.m_beg = NULL ;
  m_buffer.m_ptr = NULL ;
  m_buffer.m_end = NULL ;
}

CommBuffer & CommBroadcast::recv_buffer()
{
  return m_buffer ;
}

CommBuffer & CommBroadcast::send_buffer()
{
  static const char method[] = "stk::CommBroadcast::send_buffer" ;

  if ( m_root_rank != m_rank ) {
    std::string msg ;
    msg.append( method );
    msg.append( " FAILED: is not root processor" );
    throw std::runtime_error( msg );
  }

  return m_buffer ;
}

void CommBroadcast::communicate()
{
#if defined( STK_HAS_MPI )
  {
    const int count = m_buffer.capacity();
    void * const buf = m_buffer.buffer();

    const int result = MPI_Bcast( buf , count , MPI_BYTE , m_root_rank , m_comm );

    if ( MPI_SUCCESS != result ) {
      std::ostringstream msg ;
      msg << "stk::CommBroadcast::communicate ERROR : "
          << result << " == MPI_Bcast" ;
      throw std::runtime_error( msg.str() );
    }
  }
#endif

  m_buffer.reset();
}

//----------------------------------------------------------------------
//----------------------------------------------------------------------

CommGather::~CommGather()
{
  try {
    free( static_cast<void*>( m_send.m_beg ) );

    if ( NULL != m_recv_count ) { free( static_cast<void*>( m_recv_count ) ); }

    if ( NULL != m_recv ) { CommBuffer::deallocate( m_size , m_recv ); }
  } catch(...){}
}

void CommGather::reset()
{
  m_send.reset();

  if ( NULL != m_recv ) {
    for ( unsigned i = 0 ; i < m_size ; ++i ) { m_recv[i].reset(); }
  }
}

CommBuffer & CommGather::recv_buffer( unsigned p )
{
  static CommBuffer empty ;

  return m_size <= p ? empty : (
         m_size <= 1 ? m_send : m_recv[p] );
}

//----------------------------------------------------------------------

CommGather::CommGather( ParallelMachine comm ,
                        unsigned root_rank , unsigned send_size )
  : m_comm( comm ),
    m_size( parallel_machine_size( comm ) ),
    m_rank( parallel_machine_rank( comm ) ),
    m_root_rank( root_rank ),
    m_send(),
    m_recv(NULL),
    m_recv_count(NULL),
    m_recv_displ(NULL)
{
  m_send.m_beg = static_cast<CommBuffer::ucharp>( malloc( send_size ) );
  m_send.m_ptr = m_send.m_beg ;
  m_send.m_end = m_send.m_beg + send_size ;

#if defined( STK_HAS_MPI )

  if ( 1 < m_size ) {

    const bool is_root = m_rank == m_root_rank ;

    if ( is_root ) {
      m_recv_count = static_cast<int*>( malloc(2*m_size*sizeof(int)) );
      m_recv_displ = m_recv_count + m_size ;
    }

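    // Gather every processor's send size to the root; the receive
    // arguments are significant only on the root, so the NULL
    // m_recv_count on other processors is fine.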
    MPI_Gather( & send_size   , 1 , MPI_INT ,
                m_recv_count , 1 , MPI_INT ,
                m_root_rank , m_comm );

    if ( is_root ) {
      m_recv = CommBuffer::allocate( m_size ,
                 reinterpret_cast<unsigned*>( m_recv_count ) );

      for ( unsigned i = 0 ; i < m_size ; ++i ) {
        m_recv_displ[i] = m_recv[i].m_beg - m_recv[0].m_beg ;
      }
    }
  }

#endif

}

void CommGather::communicate()
{
#if defined( STK_HAS_MPI )

  if ( 1 < m_size ) {

    const int send_count = m_send.capacity();

    void * const send_buf = m_send.buffer();
    void * const recv_buf = m_rank == m_root_rank ? m_recv->buffer() : NULL ;

    MPI_Gatherv( send_buf , send_count , MPI_BYTE ,
                 recv_buf , m_recv_count , m_recv_displ , MPI_BYTE ,
                 m_root_rank , m_comm );
  }

#endif

  reset();
}

//----------------------------------------------------------------------
//----------------------------------------------------------------------

#if defined( STK_HAS_MPI )

bool comm_dense_sizes( ParallelMachine comm ,
                       const unsigned * const send_size ,
                             unsigned * const recv_size ,
                       bool local_flag )
{
  static const char method[] = "stk::comm_dense_sizes" ;

  const unsigned zero = 0 ;
  const unsigned p_size = parallel_machine_size( comm );

  std::vector<unsigned> send_buf( p_size * 2 , zero );
  std::vector<unsigned> recv_buf( p_size * 2 , zero );

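  // Interleave sizes and flags: word 2*i is the size this processor
  // sends to processor i, word 2*i+1 carries this processor's local_flag.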
  for ( unsigned i = 0 ; i < p_size ; ++i ) {
    const unsigned i2 = i * 2 ;
    send_buf[i2]   = send_size[i] ;
    send_buf[i2+1] = local_flag ;
  }

  {
    unsigned * const ps = & send_buf[0] ;
    unsigned * const pr = & recv_buf[0] ;
    const int result =
       MPI_Alltoall( ps , 2 , MPI_UNSIGNED , pr , 2 , MPI_UNSIGNED , comm );

    if ( MPI_SUCCESS != result ) {
      std::string msg ;
      msg.append( method );
      msg.append( " FAILED: MPI_SUCCESS != MPI_Alltoall" );
      throw std::runtime_error( msg );
    }
  }

  bool global_flag = false ;

  for ( unsigned i = 0 ; i < p_size ; ++i ) {
    const unsigned i2 = i * 2 ;
    recv_size[i] = recv_buf[i2] ;
    if ( recv_buf[i2+1] ) { global_flag = true ; }
  }

  return global_flag ;
}

//----------------------------------------------------------------------

namespace {

extern "C" {

void sum_np_max_2_op(
  void * inv , void * outv , int * len , ParallelDatatype * )
{
  const int np = *len - 2 ;
  unsigned * ind  = (unsigned *) inv ;
  unsigned * outd = (unsigned *) outv ;

  // Sum all but the last two entries;
  // the last two take the maximum.

  for ( int i = 0 ; i < np ; ++i ) {
    *outd += *ind ;
    ++outd ;
    ++ind ;
  }
  if ( outd[0] < ind[0] ) { outd[0] = ind[0] ; }
  if ( outd[1] < ind[1] ) { outd[1] = ind[1] ; }
}

}

}

bool comm_sizes( ParallelMachine comm ,
                 const unsigned   num_msg_bound ,
                       unsigned & num_msg_maximum ,
                 const unsigned * const send_size ,
                       unsigned * const recv_size ,
                 bool local_flag )
{
  static const char method[] = "stk::comm_sizes" ;
  const unsigned uzero = 0 ;

  static MPI_Op mpi_op = MPI_OP_NULL ;

  if ( mpi_op == MPI_OP_NULL ) {
    // Is fully commutative
    MPI_Op_create( sum_np_max_2_op , 1 , & mpi_op );
  }

  const unsigned p_size = parallel_machine_size( comm );
  const unsigned p_rank = parallel_machine_rank( comm );

  int result ;

  std::ostringstream msg ;

  num_msg_maximum = 0 ;

  unsigned num_recv = 0 ;
  unsigned max_msg  = 0 ;
  bool     global_flag = false ;

  {
    std::vector<unsigned> send_buf( p_size + 2 , uzero );
    std::vector<unsigned> recv_buf( p_size + 2 , uzero );

    unsigned * const p_send = & send_buf[0] ;
    unsigned * const p_recv = & recv_buf[0] ;

    for ( unsigned i = 0 ; i < p_size ; ++i ) {
      recv_size[i] = 0 ; // Zero output
      if ( send_size[i] ) {
        send_buf[i] = 1 ;
        ++max_msg ;
      }
    }
    send_buf[p_size]   = max_msg ;
    send_buf[p_size+1] = local_flag ;

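    // After the reduction recv_buf[i] is the number of messages destined
    // for processor i, recv_buf[p_size] is the maximum per-processor send
    // count, and recv_buf[p_size+1] is the maximum (i.e. logical OR) of
    // the local flags.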
    result = MPI_Allreduce( p_send , p_recv , p_size + 2 , MPI_UNSIGNED ,
                            mpi_op , comm );

    if ( result != MPI_SUCCESS ) {
      // PARALLEL ERROR
      msg << method << " ERROR: " << result << " == MPI_Allreduce" ;
      throw std::runtime_error( msg.str() );
    }

    num_recv    = recv_buf[ p_rank ] ;
    max_msg     = recv_buf[ p_size ] ;
    global_flag = recv_buf[ p_size + 1 ] ;

    // max_msg is now the maximum send count.
    // Loop over the receive counts to determine
    // whether any receive count is larger.

    for ( unsigned i = 0 ; i < p_size ; ++i ) {
      if ( max_msg < recv_buf[i] ) { max_msg = recv_buf[i] ; }
    }
  }

  num_msg_maximum = max_msg ;

  if ( num_msg_bound < max_msg ) {
    // Dense, pay for an all-to-all

    result =
       MPI_Alltoall( (void*) send_size , 1 , MPI_UNSIGNED ,
                     recv_size , 1 , MPI_UNSIGNED , comm );

    if ( MPI_SUCCESS != result ) {
      // LOCAL ERROR ?
      msg << method << " ERROR: " << result << " == MPI_Alltoall" ;
      throw std::runtime_error( msg.str() );
    }
  }
  else if ( max_msg ) {
    // Sparse, just do point-to-point

    const int mpi_tag = STK_MPI_TAG_SIZING ;

    MPI_Request request_null = MPI_REQUEST_NULL ;
    std::vector<MPI_Request> request( num_recv , request_null );
    std::vector<MPI_Status>  status(  num_recv );
    std::vector<unsigned>    buf( num_recv );

    // Post receives for point-to-point message sizes

    for ( unsigned i = 0 ; i < num_recv ; ++i ) {
      unsigned    * const p_buf     = & buf[i] ;
      MPI_Request * const p_request = & request[i] ;
      result = MPI_Irecv( p_buf , 1 , MPI_UNSIGNED ,
                          MPI_ANY_SOURCE , mpi_tag , comm , p_request );
      if ( MPI_SUCCESS != result ) {
        // LOCAL ERROR
        msg << method << " ERROR: " << result << " == MPI_Irecv" ;
        throw std::runtime_error( msg.str() );
      }
    }

    // Send the point-to-point message sizes,
    // rotate the sends in an attempt to balance the message traffic.

    for ( unsigned i = 0 ; i < p_size ; ++i ) {
      int      dst = ( i + p_rank ) % p_size ;
      unsigned value = send_size[dst] ;
      if ( value ) {
        result = MPI_Send( & value , 1 , MPI_UNSIGNED , dst , mpi_tag , comm );
        if ( MPI_SUCCESS != result ) {
          // LOCAL ERROR
          msg << method << " ERROR: " << result << " == MPI_Send" ;
          throw std::runtime_error( msg.str() );
        }
      }
    }

    // Wait for all receives

    {
      MPI_Request * const p_request = & request[0] ;
      MPI_Status  * const p_status  = & status[0] ;
      result = MPI_Waitall( num_recv , p_request , p_status );
    }
    if ( MPI_SUCCESS != result ) {
      // LOCAL ERROR ?
      msg << method << " ERROR: " << result << " == MPI_Waitall" ;
      throw std::runtime_error( msg.str() );
    }

    // Set the receive message sizes

    for ( unsigned i = 0 ; i < num_recv ; ++i ) {
      MPI_Status * const recv_status = & status[i] ;
      const int recv_proc = recv_status->MPI_SOURCE ;
      const int recv_tag  = recv_status->MPI_TAG ;
      int recv_count  = 0 ;

      MPI_Get_count( recv_status , MPI_UNSIGNED , & recv_count );

      if ( recv_tag != mpi_tag || recv_count != 1 ) {
        msg << method << " ERROR: Received buffer mismatch " ;
        msg << "P" << p_rank << " <- P" << recv_proc ;
        msg << "  " << 1 << " != " << recv_count ;
        throw std::runtime_error( msg.str() );
      }

      const unsigned r_size = buf[i] ;
      recv_size[ recv_proc ] = r_size ;
    }
  }

  return global_flag ;
}

//----------------------------------------------------------------------
//----------------------------------------------------------------------

#else

bool comm_sizes( ParallelMachine ,
                 const unsigned ,
                       unsigned & num_msg_maximum ,
                 const unsigned * const send_size ,
                       unsigned * const recv_size ,
                 bool local_flag )
{
  num_msg_maximum = send_size[0] ? 1 : 0 ;

  recv_size[0] = send_size[0] ;

  return local_flag ;
}

bool comm_dense_sizes( ParallelMachine ,
                       const unsigned * const send_size ,
                             unsigned * const recv_size ,
                       bool local_flag )
{
  recv_size[0] = send_size[0] ;

  return local_flag ;
}

//----------------------------------------------------------------------

#endif

}
