Epetra_MpiDistributor.cpp

//@HEADER
// ************************************************************************
//
//               Epetra: Linear Algebra Services Package
//                 Copyright (2001) Sandia Corporation
//
// Under terms of Contract DE-AC04-94AL85000, there is a non-exclusive
// license for use of this work by or on behalf of the U.S. Government.
//
// This library is free software; you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation; either version 2.1 of the
// License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
// USA
// Questions? Contact Michael A. Heroux (maherou@sandia.gov)
//
// ************************************************************************
//@HEADER

#include "Epetra_MpiDistributor.h"
#include "Epetra_MpiComm.h"


//==============================================================================
// Epetra_MpiDistributor constructor
Epetra_MpiDistributor::Epetra_MpiDistributor(const Epetra_MpiComm & Comm):
  Epetra_Object("Epetra::MpiDistributor"),
  lengths_to_(0),
  procs_to_(0),
  indices_to_(0),
  size_indices_to_(0),
  lengths_from_(0),
  procs_from_(0),
  indices_from_(0),
  size_indices_from_(0),
  resized_(false),
  sizes_(0),
  sizes_to_(0),
  starts_to_(0),
  starts_to_ptr_(0),
  indices_to_ptr_(0),
  sizes_from_(0),
  starts_from_(0),
  starts_from_ptr_(0),
  indices_from_ptr_(0),
  nrecvs_(0),
  nsends_(0),
  nexports_(0),
  self_msg_(0),
  max_send_length_(0),
  total_recv_length_(0),
  tag_(Comm.GetMpiTag()),
  epComm_(&Comm),
  comm_(Comm.GetMpiComm()),
  request_(0),
  status_(0),
  no_delete_(false),
  send_array_(0),
  send_array_size_(0),
  comm_plan_reverse_(0)
{
}

//==============================================================================
Epetra_MpiDistributor::Epetra_MpiDistributor(const Epetra_MpiDistributor & Distributor):
  Epetra_Object("Epetra::MpiDistributor"),
  lengths_to_(0),
  procs_to_(0),
  indices_to_(0),
  size_indices_to_(Distributor.size_indices_to_),
  lengths_from_(0),
  procs_from_(0),
  indices_from_(0),
  size_indices_from_(Distributor.size_indices_from_),
  resized_(false),
  sizes_(0),
  sizes_to_(0),
  starts_to_(0),
  starts_to_ptr_(0),
  indices_to_ptr_(0),
  sizes_from_(0),
  starts_from_(0),
  starts_from_ptr_(0),
  indices_from_ptr_(0),
  nrecvs_(Distributor.nrecvs_),
  nsends_(Distributor.nsends_),
  nexports_(Distributor.nexports_),
  self_msg_(Distributor.self_msg_),
  max_send_length_(Distributor.max_send_length_),
  total_recv_length_(Distributor.total_recv_length_),
  tag_(Distributor.tag_),
  epComm_(Distributor.epComm_),
  comm_(Distributor.comm_),
  request_(0),
  status_(0),
  no_delete_(Distributor.no_delete_),
  send_array_(0),
  send_array_size_(0),
  comm_plan_reverse_(0)
{
  int i;
  if (nsends_>0) {
    lengths_to_ = new int[nsends_];
    procs_to_ = new int[nsends_];
    for (i=0; i<nsends_; i++) {
      lengths_to_[i] = Distributor.lengths_to_[i];
      procs_to_[i] = Distributor.procs_to_[i];
    }
  }
  if (size_indices_to_>0) {
    indices_to_ = new int[size_indices_to_];
    for (i=0; i<size_indices_to_; i++) {
      indices_to_[i] = Distributor.indices_to_[i];
    }
  }

  if (nrecvs_>0) {
    lengths_from_ = new int[nrecvs_];
    procs_from_ = new int[nrecvs_];
    request_ = new MPI_Request[ nrecvs_ ];
    status_ = new MPI_Status[ nrecvs_ ];
    for (i=0; i<nrecvs_; i++) {
      lengths_from_[i] = Distributor.lengths_from_[i];
      procs_from_[i] = Distributor.procs_from_[i];
    }
  }
  if (size_indices_from_>0) {
    indices_from_ = new int[size_indices_from_];
    for (i=0; i<size_indices_from_; i++) {
      indices_from_[i] = Distributor.indices_from_[i];
    }
  }
}

//==============================================================================
// Epetra_MpiDistributor destructor
Epetra_MpiDistributor::~Epetra_MpiDistributor()
{
  if( !no_delete_ )
  {
    if( lengths_to_ != 0 ) delete [] lengths_to_;
    if( procs_to_ != 0 ) delete [] procs_to_;
    if( indices_to_ != 0 ) delete [] indices_to_;
    if( lengths_from_ != 0 ) delete [] lengths_from_;
    if( procs_from_ != 0 ) delete [] procs_from_;
    if( indices_from_ != 0 ) delete [] indices_from_;
    if( starts_to_ != 0 ) delete [] starts_to_;
    if( starts_from_ != 0 ) delete [] starts_from_;
  }

  if( sizes_ != 0 ) delete [] sizes_;
  if( sizes_to_ != 0 ) delete [] sizes_to_;
  if( sizes_from_ != 0 ) delete [] sizes_from_;
  if( starts_to_ptr_ != 0 ) delete [] starts_to_ptr_;
  if( starts_from_ptr_ != 0 ) delete [] starts_from_ptr_;
  if( indices_to_ptr_ != 0 ) delete [] indices_to_ptr_;
  if( indices_from_ptr_ != 0 ) delete [] indices_from_ptr_;

  if( request_ != 0 ) delete [] request_;
  if( status_ != 0 ) delete [] status_;

  if( send_array_size_ != 0 ) { delete [] send_array_; send_array_size_ = 0; }

  if( comm_plan_reverse_ != 0 ) delete comm_plan_reverse_;
}


//==============================================================================
//---------------------------------------------------------------------------
//CreateFromSends Method
// - create communication plan given a known list of procs to send to
//---------------------------------------------------------------------------
int Epetra_MpiDistributor::CreateFromSends( const int & NumExportIDs,
                                            const int * ExportPIDs,
                                            bool Deterministic,
                                            int & NumRemoteIDs )
{
  (void)Deterministic;
  nexports_ = NumExportIDs;

  int i;

  int my_proc;
  MPI_Comm_rank( comm_, &my_proc );

  int nprocs;
  MPI_Comm_size( comm_, &nprocs );

  // Check to see if items are grouped by processor w/o gaps
  // If so, indices_to -> 0

  // Setup data structures for quick traversal of arrays
  int * starts = new int[ nprocs + 1 ];
  for( i = 0; i < nprocs; i++ )
    starts[i] = 0;

  int nactive = 0;
  bool no_send_buff = true;
  int numDeadIndices = 0; // In some cases the GIDs will not be owned by any processors and the PID will be -1

  for( i = 0; i < NumExportIDs; i++ )
  {
    if( no_send_buff && i && (ExportPIDs[i] < ExportPIDs[i-1]) )
      no_send_buff = false;
    if( ExportPIDs[i] >= 0 )
    {
      ++starts[ ExportPIDs[i] ];
      ++nactive;
    }
    else numDeadIndices++; // Increase the number of dead indices.  Used below to leave these out of the analysis
  }

  self_msg_ = ( starts[my_proc] != 0 ) ? 1 : 0;

  nsends_ = 0;

  if( no_send_buff ) //grouped by processor, no send buffer or indices_to_ needed
  {
    for( i = 0; i < nprocs; ++i )
      if( starts[i] ) ++nsends_;

    if( nsends_ )
    {
      procs_to_ = new int[nsends_];
      starts_to_ = new int[nsends_];
      lengths_to_ = new int[nsends_];
    }

    int index = numDeadIndices;  // Leave off the dead indices (PID = -1)
    int proc;
    for( i = 0; i < nsends_; ++i )
    {
      starts_to_[i] = index;
      proc = ExportPIDs[index];
      procs_to_[i] = proc;
      index += starts[proc];
    }

    if( nsends_ )
      Sort_ints_( procs_to_, starts_to_, nsends_ );

    max_send_length_ = 0;

    for( i = 0; i < nsends_; ++i )
    {
      proc = procs_to_[i];
      lengths_to_[i] = starts[proc];
      if( (proc != my_proc) && (lengths_to_[i] > max_send_length_) )
        max_send_length_ = lengths_to_[i];
    }
  }
  else //not grouped by processor, need send buffer and indices_to_
  {
    if( starts[0] != 0 ) nsends_ = 1;

    for( i = 1; i < nprocs; i++ )
    {
      if( starts[i] != 0 ) ++nsends_;
      starts[i] += starts[i-1];
    }

    for( i = nprocs-1; i != 0; i-- )
      starts[i] = starts[i-1];

    starts[0] = 0;

    if (nactive>0) {
      indices_to_ = new int[ nactive ];
      size_indices_to_ = nactive;
    }

    for( i = 0; i < NumExportIDs; i++ )
      if( ExportPIDs[i] >= 0 )
      {
        indices_to_[ starts[ ExportPIDs[i] ] ] = i;
        ++starts[ ExportPIDs[i] ];
      }

    //Reconstruct starts array to index into indices_to_.

    for( i = nprocs-1; i != 0; i-- )
      starts[i] = starts[i-1];
    starts[0] = 0;
    starts[nprocs] = nactive;

    if (nsends_>0) {
      lengths_to_ = new int[ nsends_ ];
      procs_to_ = new int[ nsends_ ];
      starts_to_ = new int[ nsends_ ];
    }

    int j = 0;
    max_send_length_ = 0;

    for( i = 0; i < nprocs; i++ )
      if( starts[i+1] != starts[i] )
      {
        lengths_to_[j] = starts[i+1] - starts[i];
        starts_to_[j] = starts[i];
        if( ( i != my_proc ) && ( lengths_to_[j] > max_send_length_ ) )
          max_send_length_ = lengths_to_[j];
        procs_to_[j] = i;
        j++;
      }
  }

  delete [] starts;

  nsends_ -= self_msg_;

  //Invert map to see what msgs are received and what length
  EPETRA_CHK_ERR( ComputeRecvs_( my_proc, nprocs ) );

  if (nrecvs_>0) {
    request_ = new MPI_Request[ nrecvs_ ];
    status_ = new MPI_Status[ nrecvs_ ];
  }

  NumRemoteIDs = total_recv_length_;

  return 0;
}
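
// Usage sketch (illustrative comment only, not part of the library source):
// how a caller might build a plan from a list of destination processors and
// then move fixed-size objects with Do().  The names below (myExportPIDs,
// myValues, importValues) are hypothetical, and at least three MPI ranks are
// assumed so that ranks 1 and 2 exist.
//
//   Epetra_MpiComm comm( MPI_COMM_WORLD );
//   Epetra_MpiDistributor plan( comm );
//
//   int numExports = 3;
//   int myExportPIDs[3] = { 1, 1, 2 };   // destination rank of each item,
//                                        // already grouped by processor
//   int numImports = 0;
//   plan.CreateFromSends( numExports, myExportPIDs, true, numImports );
//
//   double myValues[3] = { 10.0, 20.0, 30.0 };  // one double per export ID
//   int lenImport = 0;
//   char * importValues = 0;                    // (re)allocated by DoPosts
//   plan.Do( reinterpret_cast<char *>(myValues), (int) sizeof(double),
//            lenImport, importValues );
//   // importValues now holds numImports doubles, grouped by sending processor.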

//==============================================================================
//---------------------------------------------------------------------------
//CreateFromRecvs Method
// - create communication plan given a known list of procs to recv from
//---------------------------------------------------------------------------
int Epetra_MpiDistributor::CreateFromRecvs( const int & NumRemoteIDs,
                                            const int * RemoteGIDs,
                                            const int * RemotePIDs,
                                            bool Deterministic,
                                            int & NumExportIDs,
                                            int *& ExportGIDs,
                                            int *& ExportPIDs )
{
  int my_proc;
  MPI_Comm_rank( comm_, &my_proc );

  int nprocs;
  MPI_Comm_size( comm_, &nprocs );

  EPETRA_CHK_ERR( ComputeSends_( NumRemoteIDs, RemoteGIDs, RemotePIDs, NumExportIDs,
                                 ExportGIDs, ExportPIDs, my_proc) );

  int testNumRemoteIDs;
  EPETRA_CHK_ERR( CreateFromSends( NumExportIDs, ExportPIDs,
                                   Deterministic, testNumRemoteIDs ) );

  return(0);
}
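
// Conversely, when a processor only knows which remote entries it needs and
// which ranks own them, CreateFromRecvs() builds the same plan from the
// receive side.  Hedged sketch (hypothetical names; plan as in the sketch
// above, at least two ranks assumed):
//
//   int numRemote = 2;
//   int remoteGIDs[2] = { 5, 9 };   // global IDs this rank needs
//   int remotePIDs[2] = { 1, 1 };   // owning rank of each of those GIDs
//   int numExports = 0;
//   int * exportGIDs = 0;           // allocated inside ComputeSends_
//   int * exportPIDs = 0;           // allocated inside ComputeSends_
//   plan.CreateFromRecvs( numRemote, remoteGIDs, remotePIDs, true,
//                         numExports, exportGIDs, exportPIDs );
//   // exportGIDs[i] must be shipped to exportPIDs[i] by later Do() calls;
//   // the caller owns the two returned arrays.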

//==============================================================================
//---------------------------------------------------------------------------
//ComputeRecvs Method
//---------------------------------------------------------------------------
int Epetra_MpiDistributor::ComputeRecvs_( int my_proc,
                                          int nprocs )
{
  int * msg_count = new int[ nprocs ];
  int * counts = new int[ nprocs ];

  int i;
  MPI_Status status;

  for( i = 0; i < nprocs; i++ )
  {
    msg_count[i] = 0;
    counts[i] = 1;
  }

  for( i = 0; i < nsends_+self_msg_; i++ )
    msg_count[ procs_to_[i] ] = 1;

#if defined(REDUCE_SCATTER_BUG)
// the bug is found in mpich on linux platforms
  MPI_Reduce(msg_count, counts, nprocs, MPI_INT, MPI_SUM, 0, comm_);
  MPI_Scatter(counts, 1, MPI_INT, &nrecvs_, 1, MPI_INT, 0, comm_);
#else
  MPI_Reduce_scatter( msg_count, &nrecvs_, counts, MPI_INT, MPI_SUM, comm_ );
#endif

  delete [] msg_count;
  delete [] counts;

  if (nrecvs_>0) {
    lengths_from_ = new int[nrecvs_];
    procs_from_ = new int[nrecvs_];
    for(i=0; i<nrecvs_; ++i) {
      lengths_from_[i] = 0;
      procs_from_[i] = 0;
    }
  }
  for( i = 0; i < (nsends_+self_msg_); i++ )
    if( procs_to_[i] != my_proc ) {
      MPI_Send( &(lengths_to_[i]), 1, MPI_INT, procs_to_[i], tag_, comm_ );
    }
    else
    {
      //set self_msg_ to end block of recv arrays
      lengths_from_[nrecvs_-1] = lengths_to_[i];
      procs_from_[nrecvs_-1] = my_proc;
    }

  for( i = 0; i < (nrecvs_-self_msg_); i++ )
  {
    MPI_Recv( &(lengths_from_[i]), 1, MPI_INT, MPI_ANY_SOURCE, tag_, comm_, &status );
    procs_from_[i] = status.MPI_SOURCE;
  }

  MPI_Barrier( comm_ );

  Sort_ints_( procs_from_, lengths_from_, nrecvs_ );

  // Compute indices_from_
  // Seems to break some rvs communication
/* Not necessary since rvs communication is always blocked
  size_indices_from_ = 0;
  if( nrecvs_ > 0 )
  {
    for( i = 0; i < nrecvs_; i++ )  size_indices_from_ += lengths_from_[i];
    indices_from_ = new int[ size_indices_from_ ];

    for (i=0; i<size_indices_from_; i++) indices_from_[i] = i;
  }
*/

  if (nrecvs_>0) starts_from_ = new int[nrecvs_];
  int j = 0;
  for( i=0; i<nrecvs_; ++i )
  {
    starts_from_[i] = j;
    j += lengths_from_[i];
  }

  total_recv_length_ = 0;
  for( i = 0; i < nrecvs_; i++ )
    total_recv_length_ += lengths_from_[i];

  nrecvs_ -= self_msg_;

  MPI_Barrier( comm_ );

  return 0;
}

//==============================================================================
//---------------------------------------------------------------------------
//ComputeSends Method
//---------------------------------------------------------------------------
int Epetra_MpiDistributor::ComputeSends_( int num_imports,
                                          const int *& import_ids,
                                          const int *& import_procs,
                                          int & num_exports,
                                          int *& export_ids,
                                          int *& export_procs,
                                          int my_proc ) {

  Epetra_MpiDistributor tmp_plan(*epComm_);
  int i;

  int * proc_list = 0;
  int * import_objs = 0;
  char * c_export_objs = 0;

  if( num_imports > 0 )
  {
    proc_list = new int[ num_imports ];
    import_objs = new int[ 2 * num_imports ];

    for( i = 0; i < num_imports; i++ )
    {
      proc_list[i] = import_procs[i];

      import_objs[2*i] = import_ids[i];
      import_objs[2*i+1] = my_proc;
    }
  }

  EPETRA_CHK_ERR( tmp_plan.CreateFromSends( num_imports, proc_list,
                                            true, num_exports ) );
  if( num_exports > 0 )
  {
    //export_objs = new int[ 2 * num_exports ];
    export_ids = new int[ num_exports ];
    export_procs = new int[ num_exports ];
  }
  else
  {
    export_ids = 0;
    export_procs = 0;
  }

  int len_c_export_objs = 0;
  EPETRA_CHK_ERR( tmp_plan.Do(reinterpret_cast<char *> (import_objs),
                              2 * (int)sizeof( int ),
                              len_c_export_objs,
                              c_export_objs) );
  int * export_objs = reinterpret_cast<int *>(c_export_objs);

  for( i = 0; i < num_exports; i++ ) {
    export_ids[i] = export_objs[2*i];
    export_procs[i] = export_objs[2*i+1];
  }

  if( proc_list != 0 ) delete [] proc_list;
  if( import_objs != 0 ) delete [] import_objs;
  if( len_c_export_objs != 0 ) delete [] c_export_objs;

  return(0);

}

//==============================================================================
// Do method
int Epetra_MpiDistributor::Do( char * export_objs,
                               int obj_size,
                               int & len_import_objs,
                               char *& import_objs )
{
  EPETRA_CHK_ERR( DoPosts(export_objs, obj_size, len_import_objs, import_objs) );
  EPETRA_CHK_ERR( DoWaits() );
  return(0);
}

//==============================================================================
// DoReverse method
int Epetra_MpiDistributor::DoReverse( char * export_objs,
                                      int obj_size,
                                      int & len_import_objs,
                                      char *& import_objs )
{
  EPETRA_CHK_ERR( DoReversePosts(export_objs, obj_size,
                                 len_import_objs, import_objs) );
  EPETRA_CHK_ERR( DoReverseWaits() );
  return(0);
}
//==============================================================================
//---------------------------------------------------------------------------
//Do_Posts Method
//---------------------------------------------------------------------------
int Epetra_MpiDistributor::DoPosts( char * export_objs,
                                    int obj_size,
                                    int & len_import_objs,
                                    char *& import_objs )
{
  int i, j, k;

  int my_proc = 0;
  int self_recv_address = 0;

  MPI_Comm_rank( comm_, &my_proc );

  if( len_import_objs < (total_recv_length_*obj_size) )
  {
    if( import_objs!=0 ) {delete [] import_objs; import_objs = 0;}
    len_import_objs = total_recv_length_*obj_size;
    if (len_import_objs>0) import_objs = new char[len_import_objs];
    for( i=0; i<len_import_objs; ++i ) import_objs[i]=0;
  }

  k = 0;

  j = 0;
  for( i = 0; i < (nrecvs_+self_msg_); i++ )
  {
    if( procs_from_[i] != my_proc )
    {
      MPI_Irecv( &(import_objs[j]),
                 lengths_from_[i] * obj_size,
                 MPI_CHAR, procs_from_[i],
                 tag_, comm_,
                 &(request_[k]) );
      k++;
    }
    else
      self_recv_address = j;

    j += lengths_from_[i] * obj_size;
  }

  MPI_Barrier( comm_ );

  //setup scan through procs_to list starting w/ higher numbered procs
  //Should help balance msg traffic
  int nblocks = nsends_ + self_msg_;
  int proc_index = 0;
  while( proc_index < nblocks && procs_to_[proc_index] < my_proc )
    ++proc_index;
  if( proc_index == nblocks ) proc_index = 0;

  int self_num, self_index;
  int p;

  if( !indices_to_ ) //data already blocked by processor
  {
    for( i = 0; i < nblocks; ++i )
    {
      p = i + proc_index;
      if( p > (nblocks-1) ) p -= nblocks;

      if( procs_to_[p] != my_proc )
        MPI_Rsend( &export_objs[starts_to_[p]*obj_size],
                   lengths_to_[p]*obj_size,
                   MPI_CHAR,
                   procs_to_[p],
                   tag_,
                   comm_ );
      else
        self_num = p;
    }

    if( self_msg_ )
      memcpy( &import_objs[self_recv_address],
              &export_objs[starts_to_[self_num]*obj_size],
              lengths_to_[self_num]*obj_size );
  }
  else //data not blocked by proc, use send buffer
  {
    if( send_array_size_ < (max_send_length_*obj_size) )
    {
      if( send_array_!=0 ) {delete [] send_array_; send_array_ = 0;}
      send_array_size_ = max_send_length_*obj_size;
      if (send_array_size_>0) send_array_ = new char[send_array_size_];
    }

    j = 0;
    for( i = 0; i < nblocks; i++ )
    {
      p = i + proc_index;
      if( p > (nblocks-1) ) p -= nblocks;
      if( procs_to_[p] != my_proc )
      {
        int offset = 0;
        j = starts_to_[p];
        for( k = 0; k < lengths_to_[p]; k++ )
        {
          memcpy( &(send_array_[offset]),
                  &(export_objs[indices_to_[j]*obj_size]),
                  obj_size );
          ++j;
          offset += obj_size;
        }
        MPI_Rsend( send_array_,
                   lengths_to_[p] * obj_size,
                   MPI_CHAR,
                   procs_to_[p],
                   tag_, comm_ );
      }
      else
      {
        self_num = p;
        self_index = starts_to_[p];
      }
    }

    if( self_msg_ )
      for( k = 0; k < lengths_to_[self_num]; k++ )
      {
        memcpy( &(import_objs[self_recv_address]),
                &(export_objs[indices_to_[self_index]*obj_size]),
                obj_size );
        self_index++;
        self_recv_address += obj_size;
      }
  }

  return(0);
}
//==============================================================================
//---------------------------------------------------------------------------
//Do_Waits Method
//---------------------------------------------------------------------------
int Epetra_MpiDistributor::DoWaits()
{
  if( nrecvs_ > 0 ) MPI_Waitall( nrecvs_, request_, status_ );

  return(0);
}

//==============================================================================
//---------------------------------------------------------------------------
//DoReverse_Posts Method
//---------------------------------------------------------------------------
int Epetra_MpiDistributor::DoReversePosts( char * export_objs,
                                           int obj_size,
                                           int & len_import_objs,
                                           char *& import_objs )
{
  assert(indices_to_==0); //Can only do reverse comm when original data
                          // is blocked by processor

  int i;
  int my_proc = 0;

  MPI_Comm_rank( comm_, &my_proc );

  if( comm_plan_reverse_ == 0 )
  {
    int total_send_length = 0;
    for( i = 0; i < nsends_+self_msg_; i++ )
      total_send_length += lengths_to_[i];

    int max_recv_length = 0;
    for( i = 0; i < nrecvs_; i++ )
      if( procs_from_[i] != my_proc )
        if( lengths_from_[i] > max_recv_length )
          max_recv_length = lengths_from_[i];

    comm_plan_reverse_ = new Epetra_MpiDistributor(*epComm_);

    comm_plan_reverse_->lengths_to_ = lengths_from_;
    comm_plan_reverse_->procs_to_ = procs_from_;
    comm_plan_reverse_->indices_to_ = indices_from_;
    comm_plan_reverse_->starts_to_ = starts_from_;

    comm_plan_reverse_->lengths_from_ = lengths_to_;
    comm_plan_reverse_->procs_from_ = procs_to_;
    comm_plan_reverse_->indices_from_ = indices_to_;
    comm_plan_reverse_->starts_from_ = starts_to_;

    comm_plan_reverse_->nsends_ = nrecvs_;
    comm_plan_reverse_->nrecvs_ = nsends_;
    comm_plan_reverse_->self_msg_ = self_msg_;

    comm_plan_reverse_->max_send_length_ = max_recv_length;
    comm_plan_reverse_->total_recv_length_ = total_send_length;

    comm_plan_reverse_->request_ = new MPI_Request[ comm_plan_reverse_->nrecvs_ ];
    comm_plan_reverse_->status_= new MPI_Status[ comm_plan_reverse_->nrecvs_ ];

    comm_plan_reverse_->no_delete_ = true;
  }

  int comm_flag = comm_plan_reverse_->DoPosts(export_objs, obj_size, len_import_objs, import_objs);

  return(comm_flag);
}

//==============================================================================
//---------------------------------------------------------------------------
//DoReverse_Waits Method
//---------------------------------------------------------------------------
int Epetra_MpiDistributor::DoReverseWaits()
{
  if( comm_plan_reverse_ == 0 ) return (-1);

  int comm_flag = comm_plan_reverse_->DoWaits();

  return(comm_flag);
}
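
// A reverse operation reuses the forward plan with the send and receive roles
// swapped, so data gathered with Do() can be pushed back to its origin with
// DoReverse().  Rough sketch (hypothetical buffers; only valid when the
// forward plan keeps data blocked by processor, i.e. indices_to_ == 0):
//
//   int lenRecv = 0;  char * recvBuf = 0;
//   plan.Do( reinterpret_cast<char *>(sendBuf), (int) sizeof(double),
//            lenRecv, recvBuf );                     // forward: send -> recv
//
//   int lenBack = 0;  char * backBuf = 0;
//   plan.DoReverse( recvBuf, (int) sizeof(double),
//                   lenBack, backBuf );              // reverse: recv -> send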

//==============================================================================
//---------------------------------------------------------------------------
//Resize Method                      (Heaphy)
//---------------------------------------------------------------------------
int Epetra_MpiDistributor::Resize_( int * sizes )
{
  int  i, j, k;         // loop counters
  int sum;

  //if (sizes == 0) return 0;

  int my_proc;
  MPI_Comm_rank (comm_, &my_proc);
  int nprocs;
  MPI_Comm_size( comm_, &nprocs );

  if( resized_ )
  {
    //test and see if we are already setup for these sizes
    bool match = true;
    for( i = 0; i < nexports_; ++i )
      match = match && (sizes_[i]==sizes[i]);
    int matched = match?1:0;
    int match_count = 0;
    MPI_Allreduce( &matched, &match_count, 1, MPI_INT, MPI_SUM, comm_ );
    if( match_count == nprocs )
      return 0;
    else //reset existing sizing arrays
      max_send_length_ = 0;
  }

  if( !sizes_ && nexports_ ) sizes_ = new int[nexports_];
  for (i = 0; i < nexports_; i++)
    sizes_[i] = sizes[i];

  if( !sizes_to_ && (nsends_+self_msg_) ) sizes_to_ = new int[nsends_+self_msg_];
  for (i = 0; i < (nsends_+self_msg_); ++i)
    sizes_to_[i] = 0;

  if( !starts_to_ptr_ && (nsends_+self_msg_) ) starts_to_ptr_ = new int[nsends_+self_msg_];

  if( !indices_to_ ) //blocked sends
  {

    int * index = 0;
    int * sort_val = 0;
    if (nsends_+self_msg_>0) {
      index = new int[nsends_+self_msg_];
      sort_val = new int[nsends_+self_msg_];
    }
    for( i = 0; i < (nsends_+self_msg_); ++i )
    {
      j = starts_to_[i];
      for( k = 0; k < lengths_to_[i]; ++k )
        sizes_to_[i] += sizes[j++];
      if( (sizes_to_[i] > max_send_length_) && (procs_to_[i] != my_proc) )
        max_send_length_ = sizes_to_[i];
    }

    for( i = 0; i < (nsends_+self_msg_); ++i )
    {
      sort_val[i] = starts_to_[i];
      index[i] = i;
    }

    if( nsends_+self_msg_ )
      Sort_ints_( sort_val, index, (nsends_+self_msg_) );

    sum = 0;
    for( i = 0; i < (nsends_+self_msg_); ++i )
    {
      starts_to_ptr_[ index[i] ] = sum;
      sum += sizes_to_[ index[i] ];
    }

    if (index!=0) {delete [] index; index = 0;}
    if (sort_val!=0) {delete [] sort_val; sort_val = 0;}
  }
  else //Sends not blocked, so have to do more work
  {
    if( !indices_to_ptr_ && nexports_ ) indices_to_ptr_ = new int[nexports_];
    int * offset = 0;
    if( nexports_ ) offset = new int[nexports_];

    //Compute address for every item in send array
    sum = 0;
    for( i = 0; i < nexports_; ++i )
    {
      offset[i] = sum;
      sum += sizes_[i];
    }

    sum = 0;
    max_send_length_ = 0;
    for( i = 0; i < (nsends_+self_msg_); ++i )
    {
      starts_to_ptr_[i] = sum;
      for( j = starts_to_[i]; j < (starts_to_[i]+lengths_to_[i]); ++j )
      {
        indices_to_ptr_[j] = offset[ indices_to_[j] ];
        sizes_to_[i] += sizes_[ indices_to_[j] ];
      }
      if( sizes_to_[i] > max_send_length_ && procs_to_[i] != my_proc )
        max_send_length_ = sizes_to_[i];
      sum += sizes_to_[i];
    }

    if (offset!=0) {delete [] offset; offset = 0;}
  }

  //  Exchange sizes routine inserted here:
  int self_index_to = -1;
  for (i = 0; i < (nsends_+self_msg_); i++)
  {
    if(procs_to_[i] != my_proc)
      MPI_Send ((void *) &(sizes_to_[i]), 1, MPI_INT, procs_to_[i], tag_, comm_);
    else
      self_index_to = i;
  }

  total_recv_length_ = 0;
  MPI_Status status;
  if( !sizes_from_ && (nrecvs_+self_msg_) ) sizes_from_ = new int [nrecvs_+self_msg_];
  for (i = 0; i < (nrecvs_+self_msg_); ++i)
  {
    sizes_from_[i] = 0;
    if (procs_from_[i] != my_proc)
      MPI_Recv((void *) &(sizes_from_[i]), 1, MPI_INT, procs_from_[i], tag_, comm_, &status);
    else
      sizes_from_[i] = sizes_to_[self_index_to];
    total_recv_length_ += sizes_from_[i];
  }
  // end of exchanges sizes insert

  sum = 0;
  if( !starts_from_ptr_ ) starts_from_ptr_  = new int[nrecvs_+self_msg_];
  for (i = 0; i < (nrecvs_+self_msg_); ++i)
  {
    starts_from_ptr_[i] = sum;
    sum += sizes_from_[i];
  }

  resized_ = true;

  return 0;
}

//==============================================================================
// GSComm_Comm Do method
int Epetra_MpiDistributor::Do( char * export_objs,
                               int obj_size,
                               int *& sizes,
                               int & len_import_objs,
                               char *& import_objs )
{
  EPETRA_CHK_ERR( DoPosts(export_objs, obj_size, sizes,
                          len_import_objs, import_objs) );
  EPETRA_CHK_ERR( DoWaits() );

  return(0);
}
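
// In the variable-block-size variant each export item i occupies sizes[i]
// blocks of obj_size bytes; Resize_() is invoked from DoPosts() to rebuild
// the byte offsets.  Hedged sketch (hypothetical names; sizes must have one
// entry per ID previously passed to CreateFromSends):
//
//   int * sizes = new int[numExports];
//   for( int i = 0; i < numExports; ++i )
//     sizes[i] = itemLength[i];        // e.g. doubles carried by item i
//
//   int lenImport = 0;
//   char * importValues = 0;
//   plan.Do( reinterpret_cast<char *>(packedValues), (int) sizeof(double),
//            sizes, lenImport, importValues );
//   delete [] sizes;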

//==============================================================================
// GSComm_Comm DoReverse method
int Epetra_MpiDistributor::DoReverse( char * export_objs,
                                      int obj_size,
                                      int *& sizes,
                                      int & len_import_objs,
                                      char *& import_objs )
{
  EPETRA_CHK_ERR( DoReversePosts(export_objs, obj_size, sizes,
                                 len_import_objs, import_objs) );
  EPETRA_CHK_ERR( DoReverseWaits() );

  return(0);
}
//==============================================================================
//---------------------------------------------------------------------------
//Do_Posts Method (Variable Block Size)
//---------------------------------------------------------------------------
int Epetra_MpiDistributor::DoPosts( char * export_objs,
                                    int obj_size,
                                    int *& sizes,
                                    int & len_import_objs,
                                    char *& import_objs )
{
  int ierr = Resize_(sizes);
  if (ierr != 0) {
    return(ierr);
  }

  MPI_Barrier( comm_ );

  int i, j, k;

  int my_proc = 0;
  int self_recv_address = 0;

  MPI_Comm_rank( comm_, &my_proc );

  if( len_import_objs < (total_recv_length_*obj_size) )
  {
    if( import_objs!=0 ) {delete [] import_objs; import_objs = 0;}
    len_import_objs = total_recv_length_*obj_size;
    if (len_import_objs>0) import_objs = new char[len_import_objs];
  }

  k = 0;

  for( i = 0; i < (nrecvs_+self_msg_); ++i )
  {
    if( procs_from_[i] != my_proc )
    {
      MPI_Irecv( &(import_objs[starts_from_ptr_[i] * obj_size]),
                 sizes_from_[i] * obj_size,
                 MPI_CHAR, procs_from_[i], tag_, comm_, &(request_[k]) );
      k++;
    }
    else
      self_recv_address = starts_from_ptr_[i] * obj_size;
  }

  MPI_Barrier( comm_ );

  //setup scan through procs_to list starting w/ higher numbered procs
  //Should help balance msg traffic
  int nblocks = nsends_ + self_msg_;
  int proc_index = 0;
  while( proc_index < nblocks && procs_to_[proc_index] < my_proc )
    ++proc_index;
  if( proc_index == nblocks ) proc_index = 0;

  int self_num;
  int p;

  if( !indices_to_ ) //data already blocked by processor
  {
    for( i = 0; i < nblocks; ++i )
    {
      p = i + proc_index;
      if( p > (nblocks-1) ) p -= nblocks;

      if( procs_to_[p] != my_proc )
        MPI_Rsend( &export_objs[starts_to_ptr_[p]*obj_size],
                   sizes_to_[p]*obj_size,
                   MPI_CHAR,
                   procs_to_[p],
                   tag_,
                   comm_ );
      else
        self_num = p;
    }

    if( self_msg_ )
      memcpy( &import_objs[self_recv_address],
              &export_objs[starts_to_ptr_[self_num]*obj_size],
              sizes_to_[self_num]*obj_size );
  }
  else //data not blocked by proc, need to copy to buffer
  {
    if( send_array_size_ && send_array_size_ < (max_send_length_*obj_size) )
    {
      if (send_array_!=0) {delete [] send_array_; send_array_ = 0;}
      send_array_ = 0;
      send_array_size_ = 0;
    }
    if( !send_array_size_ )
    {
      send_array_size_ = max_send_length_*obj_size;
      if (send_array_size_>0) send_array_ = new char[send_array_size_];
    }

    for( i=0; i<nblocks; ++i )
    {
      p = i + proc_index;
      if( p > (nblocks-1) ) p -= nblocks;

      if( procs_to_[p] != my_proc )
      {
        int offset = 0;
        j = starts_to_[p];
        for( k=0; k<lengths_to_[p]; ++k )
        {
          memcpy( &send_array_[offset],
                  &export_objs[indices_to_ptr_[j]*obj_size],
                  sizes_[indices_to_[j]]*obj_size );
          offset += sizes_[indices_to_[j]]*obj_size;
          ++j;
        }
        MPI_Rsend( send_array_, sizes_to_[p]*obj_size,
                   MPI_CHAR, procs_to_[p], tag_, comm_ );
      }
      else
        self_num = p;
    }

    if( self_msg_ )
    {
      int jj;
      j = starts_to_[self_num];
      for( k=0; k<lengths_to_[self_num]; ++k )
      {
        jj = indices_to_ptr_[j];
        memcpy( &import_objs[self_recv_address],
                &export_objs[jj*obj_size],
                sizes_[indices_to_[j]]*obj_size );
        self_recv_address += (obj_size*sizes_[indices_to_[j]]);
        ++j;
      }
    }
  }

  return(0);
}

//==============================================================================
//---------------------------------------------------------------------------
//DoReverse_Posts Method
//---------------------------------------------------------------------------
int Epetra_MpiDistributor::DoReversePosts( char * export_objs,
                                           int obj_size,
                                           int *& sizes,
                                           int & len_import_objs,
                                           char *& import_objs )
{
  assert(indices_to_==0); //Can only do reverse comm when original data
                          // is blocked by processor

  int i;
  int my_proc = 0;

  MPI_Comm_rank( comm_, &my_proc );

  if( comm_plan_reverse_ == 0 )
  {
    int total_send_length = 0;
    for( i = 0; i < nsends_+self_msg_; i++ )
      total_send_length += lengths_to_[i];

    int max_recv_length = 0;
    for( i = 0; i < nrecvs_; i++ )
      if( procs_from_[i] != my_proc )
        if( lengths_from_[i] > max_recv_length )
          max_recv_length = lengths_from_[i];

    comm_plan_reverse_ = new Epetra_MpiDistributor(*epComm_);

    comm_plan_reverse_->lengths_to_ = lengths_from_;
    comm_plan_reverse_->procs_to_ = procs_from_;
    comm_plan_reverse_->indices_to_ = indices_from_;
    comm_plan_reverse_->starts_to_ = starts_from_;

    comm_plan_reverse_->lengths_from_ = lengths_to_;
    comm_plan_reverse_->procs_from_ = procs_to_;
    comm_plan_reverse_->indices_from_ = indices_to_;
    comm_plan_reverse_->starts_from_ = starts_to_;

    comm_plan_reverse_->nsends_ = nrecvs_;
    comm_plan_reverse_->nrecvs_ = nsends_;
    comm_plan_reverse_->self_msg_ = self_msg_;

    comm_plan_reverse_->max_send_length_ = max_recv_length;
    comm_plan_reverse_->total_recv_length_ = total_send_length;

    comm_plan_reverse_->request_ = new MPI_Request[ comm_plan_reverse_->nrecvs_ ];
    comm_plan_reverse_->status_= new MPI_Status[ comm_plan_reverse_->nrecvs_ ];

    comm_plan_reverse_->no_delete_ = true;
  }

  int comm_flag = comm_plan_reverse_->DoPosts(export_objs, obj_size, sizes, len_import_objs, import_objs);

  return(comm_flag);
}

//==============================================================================
void Epetra_MpiDistributor::Print( ostream & os) const
{
  int i, j;
  os << "nsends: " << nsends_ << endl;
  os << "procs_to: ";
  for( i = 0; i < nsends_; i++ )
    os << " " << procs_to_[i];
  os << endl;
  os << "lengths_to: ";
  for( i = 0; i < nsends_; i++ )
    os << " " << lengths_to_[i];
  os << endl;
  os << "indices_to: ";
  int k = 0;
  if( indices_to_ )
  {
    for( i = 0; i < nsends_; i++ )
    {
      for( j = 0; j < lengths_to_[i]; j++ )
        os << " " << indices_to_[j+k];
      k += lengths_to_[i];
    }
  }
  os << endl;
  os << "nrecvs: " << nrecvs_ << endl;
  os << "procs_from: ";
  for( i = 0; i < nrecvs_; i++ )
    os << " " << procs_from_[i];
  os << endl;
  os << "lengths_from: ";
  for( i = 0; i < nrecvs_; i++ )
    os << " " << lengths_from_[i];
  os << endl;
/*
  os << "indices_from: ";
  k = 0;
  for( i = 0; i < nrecvs_; i++ )
  {
    for( j = 0; j < lengths_from_[i]; j++ )
      os << " " << indices_from_[j+k];
    k += lengths_from_[i];
  }
*/
  os << "self_msg: " << self_msg_ << endl;
  os << "max_send_length: " << max_send_length_ << endl;
  os << "total_recv_length: " << total_recv_length_ << endl;
  os << endl;

  return;
}

//---------------------------------------------------------------------------
int Epetra_MpiDistributor::Sort_ints_(
 int *vals_sort,     //  values to be sorted
 int *vals_other,    // other array to be reordered with sort
 int  nvals)         // length of these two arrays
{
// This routine is primarily used to sort messages to improve communication flow.
// It also ensures that the ordering produced by the invert_map routines is
// deterministic, which should make bugs more reproducible.  This is
// accomplished by sorting the message lists by processor ID.
// This is a distribution count sort algorithm (see Knuth).
// This version assumes non-negative integers.

    if (nvals <= 1) return 0;

    int i;                        // loop counter

    // find largest int, n, to size sorting array, then allocate and clear it
    int n = 0;
    for (i = 0; i < nvals; i++)
       if (n < vals_sort[i]) n = vals_sort[i];
    int *pos = new int [n+2];
    for (i = 0; i < n+2; i++) pos[i] = 0;

    // copy input arrays into temporary copies to allow sorting original arrays
    int *copy_sort  = new int [nvals];
    int *copy_other = new int [nvals];
    for (i = 0; i < nvals; i++)
    {
      copy_sort[i]  = vals_sort[i];
      copy_other[i] = vals_other[i];
    }

    // count the occurrences of integers ("distribution count")
    int *p = pos+1;
    for (i = 0; i < nvals; i++) p[copy_sort[i]]++;

    // create the partial sum of distribution counts
    for (i = 1; i < n; i++) p[i] += p[i-1];

    // the shifted partial sum is the index at which to store the data in sorted order
    p = pos;
    for (i = 0; i < nvals; i++)
    {
      vals_sort  [p[copy_sort [i]]]   = copy_sort[i];
      vals_other [p[copy_sort [i]]++] = copy_other[i];
    }

    delete [] copy_sort;
    delete [] copy_other;
    delete [] pos;

    return 0;
}
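
// Worked example of the distribution count sort above (illustrative values):
// for vals_sort = {3, 1, 3, 0} and vals_other = {7, 8, 9, 6}, the counts per
// key are {0:1, 1:1, 3:2} and the shifted partial sums give starting slots
// {0:0, 1:1, 2:2, 3:2}; the arrays are therefore reordered in place to
// vals_sort = {0, 1, 3, 3} and vals_other = {6, 8, 7, 9}, keeping the two
// arrays paired and preserving the input order of equal keys.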

//-------------------------------------------------------------------------
Epetra_MpiDistributor& Epetra_MpiDistributor::operator=(const Epetra_MpiDistributor& src)
{
  (void)src;
  //not currently supported
  bool throw_error = true;
  if (throw_error) {
    throw ReportError("Epetra_MpiDistributor::operator= not supported.",-1);
  }
  return( *this );
}
