ParallelReduce.cpp
/*------------------------------------------------------------------------*/
/*                 Copyright 2010 Sandia Corporation.                     */
/*  Under terms of Contract DE-AC04-94AL85000, there is a non-exclusive   */
/*  license for use of this work by or on behalf of the U.S. Government.  */
/*  Export of this program may require a license from the                 */
/*  United States Government.                                             */
/*------------------------------------------------------------------------*/

#include <stk_util/parallel/ParallelReduce.hpp>

#include <stdlib.h>
#include <stdexcept>
#include <sstream>
#include <vector>

namespace stk {

#if defined( STK_HAS_MPI )

void all_write_string( ParallelMachine arg_comm ,
                       std::ostream &  arg_root_os ,
                       const std::string & arg_msg )
{
  const int i_zero = 0 ;
  const int p_root = 0 ;
  const unsigned p_size = parallel_machine_size( arg_comm );
  const unsigned p_rank = parallel_machine_rank( arg_comm );

  int result ;

  // Gather the send counts on the root processor

  int send_count = arg_msg.size();

  std::vector<int> recv_count( p_size , i_zero );

  int * const recv_count_ptr = & recv_count[0] ;

  result = MPI_Gather( & send_count , 1 , MPI_INT ,
                       recv_count_ptr , 1 , MPI_INT ,
                       p_root , arg_comm );

  if ( MPI_SUCCESS != result ) {
    std::ostringstream msg ;
    msg << "stk::all_write FAILED: MPI_Gather = " << result ;
    throw std::runtime_error( msg.str() );
  }

  // Receive counts are only non-zero on the root processor:

  std::vector<int> recv_displ( p_size + 1 , i_zero );

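  // Exclusive prefix sum: recv_displ[i] is the byte offset of rank i's
  // message within the gathered buffer.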
  for ( unsigned i = 0 ; i < p_size ; ++i ) {
    recv_displ[i+1] = recv_displ[i] + recv_count[i] ;
  }

  const unsigned recv_size = (unsigned) recv_displ[ p_size ] ;

  std::vector<char> buffer( recv_size );

  {
    const char * const send_ptr = arg_msg.c_str();
    char * const recv_ptr = recv_size ? & buffer[0] : (char *) NULL ;
    int * const recv_displ_ptr = & recv_displ[0] ;

    result = MPI_Gatherv( (void*) send_ptr, send_count, MPI_CHAR ,
                          recv_ptr, recv_count_ptr, recv_displ_ptr, MPI_CHAR,
                          p_root, arg_comm );
  }

  if ( MPI_SUCCESS != result ) {
    std::ostringstream msg ;
    msg << "stk::all_write FAILED: MPI_Gatherv = " << result ;
    throw std::runtime_error( msg.str() );
  }

  if ( p_root == (int) p_rank ) {
    for ( unsigned i = 0 ; i < p_size ; ++i ) {
      if ( recv_count[i] ) {
        char * const ptr = & buffer[ recv_displ[i] ];
        arg_root_os.write( ptr , recv_count[i] );
        arg_root_os << std::endl ;
      }
    }
    arg_root_os.flush();
  }
}
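
/*
  Usage sketch (hypothetical caller, not part of this file): every rank
  contributes a message, and only the root rank's stream prints them,
  one line per rank, in rank order.  'comm' stands for the caller's
  ParallelMachine communicator.

    std::ostringstream local ;
    local << "P" << stk::parallel_machine_rank( comm ) << ": mesh loaded" ;
    stk::all_write_string( comm , std::cout , local.str() );
*/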

//----------------------------------------------------------------------
//----------------------------------------------------------------------

void all_reduce( ParallelMachine  arg_comm ,
                 ParallelReduceOp arg_op ,
                 void           * arg_in ,
                 void           * arg_out ,
                 unsigned         arg_len )
{
  MPI_Op mpi_op = MPI_OP_NULL ;

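  // Wrap the user's reduction function in an MPI op.  The commute flag of 0
  // tells MPI not to assume commutativity, so operands are combined in
  // ascending rank order.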
  MPI_Op_create( arg_op , 0 , & mpi_op );

  // The SUN MPI implementation was buggy when combining MPI_Allreduce
  // with a user-defined operator, so use reduce/broadcast instead.
/*
  const int result =
    MPI_Allreduce(arg_in,arg_out,arg_len,MPI_BYTE,mpi_op,arg_comm);
*/

  const int result_reduce =
    MPI_Reduce(arg_in,arg_out,arg_len,MPI_BYTE,mpi_op,0,arg_comm);

  const int result_bcast =
    MPI_Bcast(arg_out,arg_len,MPI_BYTE,0,arg_comm);

  MPI_Op_free( & mpi_op );

  if ( MPI_SUCCESS != result_reduce || MPI_SUCCESS != result_bcast ) {
    std::ostringstream msg ;
    msg << "stk::all_reduce FAILED: MPI_Reduce = " << result_reduce
        << " MPI_Bcast = " << result_bcast ;
    throw std::runtime_error( msg.str() );
  }
}
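
/*
  Usage sketch (hypothetical, not part of this file).  The MPI_Op_create
  call above implies that ParallelReduceOp matches the MPI_User_function
  signature; since the data travels as MPI_BYTE, the operator receives a
  length in bytes and must recover the element count itself:

    static void min_double_op( void * inv , void * outv ,
                               int * len , MPI_Datatype * )
    {
      const double * const in  = reinterpret_cast<const double *>( inv );
      double       * const out = reinterpret_cast<double *>( outv );
      const int n = *len / (int) sizeof(double) ; // *len counts bytes here
      for ( int i = 0 ; i < n ; ++i ) { if ( in[i] < out[i] ) { out[i] = in[i] ; } }
    }

    double local[2] = { 1.5 , 2.5 } , global[2] ;
    stk::all_reduce( comm , min_double_op , local , global , sizeof(local) );
*/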

//----------------------------------------------------------------------
//----------------------------------------------------------------------

void all_reduce_sum( ParallelMachine comm ,
                     const double * local , double * global , unsigned count )
{
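  // MPI predates const-correct send buffers (const was added to the MPI
  // API in MPI-3), hence the const_cast in these wrappers.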
  double * tmp = const_cast<double*>( local );
  MPI_Allreduce( tmp , global , count , MPI_DOUBLE , MPI_SUM , comm );
}

void all_reduce_sum( ParallelMachine comm ,
                     const float * local , float * global , unsigned count )
{
  float * tmp = const_cast<float*>( local );
  MPI_Allreduce( tmp , global , count , MPI_FLOAT , MPI_SUM , comm );
}

void all_reduce_sum( ParallelMachine comm ,
                     const int * local , int * global , unsigned count )
{
  int * tmp = const_cast<int*>( local );
  MPI_Allreduce( tmp , global , count , MPI_INT , MPI_SUM , comm );
}

void all_reduce_sum( ParallelMachine comm ,
                     const size_t * local , size_t * global , unsigned count )
{
  size_t * tmp = const_cast<size_t*>( local );

  if ( sizeof(size_t) == sizeof(unsigned) ) {
    MPI_Allreduce( tmp , global , count , MPI_UNSIGNED , MPI_SUM , comm );
  }
  else if ( sizeof(size_t) == sizeof(unsigned long) ) {
    MPI_Allreduce( tmp , global , count , MPI_UNSIGNED_LONG , MPI_SUM , comm );
  }
  else {
    unsigned long * const in  = new unsigned long[ count ];
    unsigned long * const out = new unsigned long[ count ];

    for ( unsigned i = 0 ; i < count ; ++i ) { in[i] = local[i] ; }
    MPI_Allreduce( in , out , count , MPI_UNSIGNED_LONG , MPI_SUM , comm );
    for ( unsigned i = 0 ; i < count ; ++i ) { global[i] = out[i] ; }

    delete[] in ;
    delete[] out ;
  }
}
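
/*
  Usage sketch (hypothetical): sum a per-rank count into a global total
  that every rank receives.  'my_local_entity_count' is a stand-in for
  whatever the caller wants to total.

    size_t local_count = my_local_entity_count ;
    size_t global_count = 0 ;
    stk::all_reduce_sum( comm , & local_count , & global_count , 1 );
*/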

void all_reduce_bor( ParallelMachine comm ,
                     const unsigned * local ,
                     unsigned * global , unsigned count )
{
  unsigned * tmp = const_cast<unsigned*>( local );
  MPI_Allreduce( tmp , global , count , MPI_UNSIGNED , MPI_BOR , comm );
}
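
/*
  Usage sketch (hypothetical): OR together per-rank flag words so every
  rank sees the union, e.g. to learn whether any rank hit an error.

    const unsigned local_flags = local_error_detected ? 1u : 0u ;
    unsigned global_flags = 0 ;
    stk::all_reduce_bor( comm , & local_flags , & global_flags , 1 );
    const bool somebody_failed = ( global_flags & 1u ) != 0 ;
*/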

//----------------------------------------------------------------------
//----------------------------------------------------------------------

#else

void all_write_string( ParallelMachine ,
                       std::ostream &  arg_root_os ,
                       const std::string & arg_msg )
{
  arg_root_os << arg_msg ;
}

void all_reduce_sum( ParallelMachine ,
                     const double * local , double * global , unsigned count )
{
  for ( unsigned i = 0 ; i < count ; ++i ) { global[i] = local[i] ; }
}

void all_reduce_sum( ParallelMachine ,
                     const float * local , float * global , unsigned count )
{
  for ( unsigned i = 0 ; i < count ; ++i ) { global[i] = local[i] ; }
}

void all_reduce_sum( ParallelMachine ,
                     const int * local , int * global , unsigned count )
{
  for ( unsigned i = 0 ; i < count ; ++i ) { global[i] = local[i] ; }
}

void all_reduce_sum( ParallelMachine ,
                     const size_t * local , size_t * global , unsigned count )
{
  for ( unsigned i = 0 ; i < count ; ++i ) { global[i] = local[i] ; }
}

void all_reduce_bor( ParallelMachine ,
                     const unsigned * local ,
                     unsigned * global , unsigned count )
{
  for ( unsigned i = 0 ; i < count ; ++i ) { global[i] = local[i] ; }
}

//----------------------------------------------------------------------

void all_reduce( ParallelMachine ,
                 ParallelReduceOp ,
                 void   * arg_in ,
                 void   * arg_out ,
                 unsigned arg_len )
{
  unsigned char * i = reinterpret_cast<unsigned char *>( arg_in );
  unsigned char * o = reinterpret_cast<unsigned char *>( arg_out );
  for ( unsigned char * const e = i + arg_len ; e != i ; ++i , ++o ) {
    *o = *i ;
  }
}

#endif

}