Epetra_MpiDistributor.cpp
00001 //@HEADER
00002 // ************************************************************************
00003 //
00004 //               Epetra: Linear Algebra Services Package
00005 //                 Copyright 2011 Sandia Corporation
00006 //
00007 // Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
00008 // the U.S. Government retains certain rights in this software.
00009 //
00010 // Redistribution and use in source and binary forms, with or without
00011 // modification, are permitted provided that the following conditions are
00012 // met:
00013 //
00014 // 1. Redistributions of source code must retain the above copyright
00015 // notice, this list of conditions and the following disclaimer.
00016 //
00017 // 2. Redistributions in binary form must reproduce the above copyright
00018 // notice, this list of conditions and the following disclaimer in the
00019 // documentation and/or other materials provided with the distribution.
00020 //
00021 // 3. Neither the name of the Corporation nor the names of the
00022 // contributors may be used to endorse or promote products derived from
00023 // this software without specific prior written permission.
00024 //
00025 // THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
00026 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00027 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
00028 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
00029 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00030 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00031 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00032 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00033 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00034 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00035 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00036 //
00037 // Questions? Contact Michael A. Heroux (maherou@sandia.gov)
00038 //
00039 // ************************************************************************
00040 //@HEADER
00041 
00042 #include "Epetra_MpiDistributor.h"
00043 #include "Epetra_MpiComm.h"
00044 
00045 #include <stdexcept>
00046 #include <vector>
00047 
00048 //==============================================================================
00049 Epetra_MpiDistributor::Epetra_MpiDistributor(const Epetra_MpiComm & Comm):
00050   Epetra_Object("Epetra::MpiDistributor"),
00051   lengths_to_(0),
00052   procs_to_(0),
00053   indices_to_(0),
00054   size_indices_to_(0),
00055   lengths_from_(0),
00056   procs_from_(0),
00057   indices_from_(0),
00058   size_indices_from_(0),
00059   resized_(false),
00060   sizes_(0),
00061   sizes_to_(0),
00062   starts_to_(0),
00063   starts_to_ptr_(0),
00064   indices_to_ptr_(0),
00065   sizes_from_(0),
00066   starts_from_(0),
00067   starts_from_ptr_(0),
00068   indices_from_ptr_(0),
00069   nrecvs_(0),
00070   nsends_(0),
00071   nexports_(0),
00072   self_msg_(0),
00073   max_send_length_(0),
00074   total_recv_length_(0),
00075   tag_(Comm.GetMpiTag()),
00076   epComm_(&Comm),
00077   comm_(Comm.GetMpiComm()),
00078   request_(0),
00079   status_(0),
00080   no_delete_(false),
00081   send_array_(0),
00082   send_array_size_(0),
00083   comm_plan_reverse_(0)
00084 {
00085 }
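// Overview of the plan's data members, informally: the "*_to_" arrays
// (procs_to_, lengths_to_, starts_to_, and optionally indices_to_) describe
// the send side, one entry per destination process, while the "*_from_"
// arrays describe the receive side.  self_msg_ is 1 when the calling process
// also sends to itself, and nsends_ / nrecvs_ count only off-process
// messages.  The sizes_* and *_ptr_ arrays are filled in by Resize_() and are
// used only for variable-sized objects.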
00086 
00087 //==============================================================================
00088 Epetra_MpiDistributor::Epetra_MpiDistributor(const Epetra_MpiDistributor & Distributor):
00089   Epetra_Object("Epetra::MpiDistributor"),
00090   lengths_to_(0),
00091   procs_to_(0),
00092   indices_to_(0),
00093   size_indices_to_(Distributor.size_indices_to_),
00094   lengths_from_(0),
00095   procs_from_(0),
00096   indices_from_(0),
00097   size_indices_from_(Distributor.size_indices_from_),
00098   resized_(false),
00099   sizes_(0),
00100   sizes_to_(0),
00101   starts_to_(0),
00102   starts_to_ptr_(0),
00103   indices_to_ptr_(0),
00104   sizes_from_(0),
00105   starts_from_(0),
00106   starts_from_ptr_(0),
00107   indices_from_ptr_(0),
00108   nrecvs_(Distributor.nrecvs_),
00109   nsends_(Distributor.nsends_),
00110   nexports_(Distributor.nexports_),
00111   self_msg_(Distributor.self_msg_),
00112   max_send_length_(Distributor.max_send_length_),
00113   total_recv_length_(Distributor.total_recv_length_),
00114   tag_(Distributor.tag_),
00115   epComm_(Distributor.epComm_),
00116   comm_(Distributor.comm_),
00117   request_(0),
00118   status_(0),
00119   no_delete_(Distributor.no_delete_),
00120   send_array_(0),
00121   send_array_size_(0),
00122   comm_plan_reverse_(0)
00123 {
00124   //CMS
00125   int i;
00126   if (nsends_>0) {
00127     lengths_to_ = new int[nsends_];
00128     procs_to_   = new int[nsends_];
00129     starts_to_  = new int[nsends_];
00130     for (i=0; i<nsends_; i++) {
00131       lengths_to_[i] = Distributor.lengths_to_[i];
00132       procs_to_[i]   = Distributor.procs_to_[i];
00133       starts_to_[i]  = Distributor.starts_to_[i];
00134     }
00135   }
00136   if (size_indices_to_>0) {
00137     indices_to_ = new int[size_indices_to_];
00138     for (i=0; i<size_indices_to_; i++) {
00139       indices_to_[i] = Distributor.indices_to_[i];
00140     }
00141     indices_to_ptr_ = new int[nexports_];
00142     for (i=0; i<nexports_; i++) {
00143       indices_to_ptr_[i] = Distributor.indices_to_ptr_[i];
00144     }
00145   }
00146 
00147   if( nsends_+self_msg_ > 0 )  {
00148     if(Distributor.sizes_to_)      sizes_to_      = new int[nsends_+self_msg_];
00149     if(Distributor.starts_to_ptr_) starts_to_ptr_ = new int[nsends_+self_msg_];
00150     for(i=0;i<nsends_+self_msg_; i++)  {
00151       if(Distributor.sizes_to_)       sizes_to_[i]      = Distributor.sizes_to_[i];
00152       if(Distributor.starts_to_ptr_) starts_to_ptr_[i] = Distributor.starts_to_ptr_[i];
00153     }
00154   }
00155 
00156   if (nrecvs_>0) {
00157     lengths_from_ = new int[nrecvs_];
00158     procs_from_   = new int[nrecvs_];
00159     starts_from_  = new int[nrecvs_];
00160     request_ = new MPI_Request[ nrecvs_ ];
00161     status_ = new MPI_Status[ nrecvs_ ];
00162     for (i=0; i<nrecvs_; i++) {
00163       lengths_from_[i]  = Distributor.lengths_from_[i];
00164       procs_from_[i]    = Distributor.procs_from_[i];
00165       starts_from_[i]   = Distributor.starts_from_[i];
00166     }
00167   }
00168 
00169   if (size_indices_from_>0) {
00170     indices_from_ = new int[size_indices_from_];
00171     for (i=0; i<size_indices_from_; i++) {
00172       indices_from_[i] = Distributor.indices_from_[i];
00173     }
00174   }
00175 
00176   if( nrecvs_+self_msg_ > 0 ) {
00177     if(Distributor.sizes_from_)      sizes_from_      = new int [nrecvs_+self_msg_];
00178     if(Distributor.starts_from_ptr_) starts_from_ptr_ = new int [nrecvs_+self_msg_];
00179     for(i=0;i<nrecvs_+self_msg_; i++)  {
00180       if(Distributor.sizes_from_)      sizes_from_[i]      = Distributor.sizes_from_[i];
00181       if(Distributor.starts_from_ptr_) starts_from_ptr_[i] = Distributor.starts_from_ptr_[i];
00182     }
00183   }
00184 
00185   // Note: indices_from_ptr_ is always length zero...
00186 
00187 }
00188 
00189 //==============================================================================
00190 Epetra_MpiDistributor::~Epetra_MpiDistributor()
00191 {
00192   if( !no_delete_ )
00193   {
00194     if( lengths_to_ != 0 ) delete [] lengths_to_;
00195     if( procs_to_ != 0 ) delete [] procs_to_;
00196     if( indices_to_ != 0 ) delete [] indices_to_;
00197     if( lengths_from_ != 0 ) delete [] lengths_from_;
00198     if( procs_from_ != 0 ) delete [] procs_from_;
00199     if( indices_from_ != 0 ) delete [] indices_from_;
00200     if( starts_to_ != 0 ) delete [] starts_to_;
00201     if( starts_from_ != 0 ) delete [] starts_from_;
00202   }
00203 
00204   if( sizes_ != 0 ) delete [] sizes_;
00205   if( sizes_to_ != 0 ) delete [] sizes_to_;
00206   if( sizes_from_ != 0 ) delete [] sizes_from_;
00207   if( starts_to_ptr_ != 0 ) delete [] starts_to_ptr_;
00208   if( starts_from_ptr_ != 0 ) delete [] starts_from_ptr_;
00209   if( indices_to_ptr_ != 0 ) delete [] indices_to_ptr_;
00210   if( indices_from_ptr_ != 0 ) delete [] indices_from_ptr_;
00211 
00212   if( request_ != 0 ) delete [] request_;
00213   if( status_ != 0 ) delete [] status_;
00214 
00215   if( send_array_size_ != 0 ) { delete [] send_array_; send_array_size_ = 0; }
00216 
00217   if( comm_plan_reverse_ != 0 ) delete comm_plan_reverse_;
00218 }
00219 
00220 
00221 //==============================================================================
00222 int Epetra_MpiDistributor::CreateFromSends( const int & NumExportIDs,
00223                                             const int * ExportPIDs,
00224                                             bool Deterministic,
00225                                             int & NumRemoteIDs )
00226 {
00227   (void)Deterministic; // Prevent compiler warnings for unused argument.
00228   int my_proc;
00229   MPI_Comm_rank( comm_, &my_proc );
00230 
00231   int nprocs;
00232   MPI_Comm_size( comm_, &nprocs );
00233 
00234   // Do the forward map component
00235   CreateSendStructures_(my_proc,nprocs,NumExportIDs,ExportPIDs);
00236 
00237   //Invert map to see what msgs are received and what length
00238   EPETRA_CHK_ERR( ComputeRecvs_( my_proc, nprocs ) );
00239 
00240   if (nrecvs_>0) {
00241     if( !request_ ) {
00242       request_ = new MPI_Request[ nrecvs_ ];
00243       status_ = new MPI_Status[ nrecvs_ ];
00244     }
00245   }
00246 
00247   NumRemoteIDs = total_recv_length_;
00248 
00249   return 0;
00250 }
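// Informal usage sketch for a forward plan built with CreateFromSends().
// The names numExports, exportPIDs, and myDoubles below are hypothetical
// caller-owned data; most applications reach this class indirectly through
// Epetra_Import / Epetra_Export rather than calling it directly.
//
//   Epetra_MpiComm comm( MPI_COMM_WORLD );
//   Epetra_MpiDistributor plan( comm );
//
//   int numRemote = 0;
//   plan.CreateFromSends( numExports, exportPIDs,
//                         true,        // Deterministic (currently unused)
//                         numRemote ); // out: how many objects we will receive
//
//   int lenImports = 0;
//   char * imports = 0;   // Do() (re)allocates this buffer if it is too small
//   plan.Do( reinterpret_cast<char *>( myDoubles ),
//            (int) sizeof(double), lenImports, imports );
//   // ... the numRemote received doubles are now packed in 'imports' ...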
00251 
00252 //==============================================================================
00253 int Epetra_MpiDistributor::CreateFromRecvs( const int & NumRemoteIDs,
00254                                             const int * RemoteGIDs,
00255                                             const int * RemotePIDs,
00256                                             bool Deterministic,
00257                                             int & NumExportIDs,
00258                                             int *& ExportGIDs,
00259                                             int *& ExportPIDs )
00260 {
00261   int my_proc;
00262   MPI_Comm_rank( comm_, &my_proc );
00263 
00264   int nprocs;
00265   MPI_Comm_size( comm_, &nprocs );
00266 
00267   EPETRA_CHK_ERR( ComputeSends_( NumRemoteIDs, RemoteGIDs, RemotePIDs, NumExportIDs,
00268          ExportGIDs, ExportPIDs, my_proc) );
00269 
00270   int testNumRemoteIDs;
00271   EPETRA_CHK_ERR( CreateFromSends( NumExportIDs, ExportPIDs,
00272            Deterministic, testNumRemoteIDs ) );
00273 
00274   return(0);
00275 }
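// In the CreateFromRecvs() variants above and below, the caller knows what it
// wants to receive (RemoteGIDs/RemotePIDs) but not what it must send.
// ComputeSends_() ships each requested (GID, requesting-rank) pair to the rank
// that owns it, which yields ExportGIDs/ExportPIDs on the owning side; that
// export list is then fed to CreateFromSends() to build the plan.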
00276 
00277 //==============================================================================
00278 //---------------------------------------------------------------------------
00279 //CreateFromRecvs Method
00280 // - create communication plan given a known list of procs to recv from
00281 //---------------------------------------------------------------------------
00282 #ifndef EPETRA_NO_64BIT_GLOBAL_INDICES
00283 int Epetra_MpiDistributor::CreateFromRecvs( const int & NumRemoteIDs,
00284                                             const long long * RemoteGIDs,
00285                                             const int * RemotePIDs,
00286                                             bool Deterministic,
00287                                             int & NumExportIDs,
00288                                             long long *& ExportGIDs,
00289                                             int *& ExportPIDs )
00290 {
00291   int my_proc;
00292   MPI_Comm_rank( comm_, &my_proc );
00293 
00294   int nprocs;
00295   MPI_Comm_size( comm_, &nprocs );
00296 
00297   EPETRA_CHK_ERR( ComputeSends_( NumRemoteIDs, RemoteGIDs, RemotePIDs, NumExportIDs,
00298          ExportGIDs, ExportPIDs, my_proc) );
00299 
00300   int testNumRemoteIDs;
00301   EPETRA_CHK_ERR( CreateFromSends( NumExportIDs, ExportPIDs,
00302            Deterministic, testNumRemoteIDs ) );
00303 
00304   return(0);
00305 }
00306 #endif
00307 
00308 
00309 
00310 //==============================================================================
00311 //---------------------------------------------------------------------------
00312 //CreateFromSendsAndRecvs Method
00313 //---------------------------------------------------------------------------
00314 int Epetra_MpiDistributor::CreateFromSendsAndRecvs( const int & NumExportIDs,
00315                 const int * ExportPIDs,
00316                 const int & NumRemoteIDs,
00317                 const int * RemoteGIDs,
00318                 const int * RemotePIDs,
00319                 bool Deterministic)
00320 {
00321   (void)RemoteGIDs;
00322   (void)Deterministic; // Prevent compiler warnings for unused argument.
00323   nexports_ = NumExportIDs;
00324 
00325   int my_proc;
00326   MPI_Comm_rank( comm_, &my_proc );
00327   int nprocs;
00328   MPI_Comm_size( comm_, &nprocs );
00329 
00330   // Do the forward map component
00331   CreateSendStructures_(my_proc,nprocs,NumExportIDs,ExportPIDs);
00332 
00333   // Do the reverse map component
00334   CreateRecvStructures_(NumRemoteIDs,RemotePIDs);
00335 
00336   return 0;
00337 }
00338 #ifndef EPETRA_NO_64BIT_GLOBAL_INDICES
00339 int  Epetra_MpiDistributor::CreateFromSendsAndRecvs( const int & NumExportIDs,
00340                  const int * ExportPIDs,
00341                  const int & NumRemoteIDs,
00342                  const long long * RemoteGIDs,
00343                  const int * RemotePIDs,
00344                  bool Deterministic)
00345 {
00346   (void)RemoteGIDs;
00347   (void)Deterministic; // Prevent compiler warnings for unused argument.
00348   nexports_ = NumExportIDs;
00349 
00350   int my_proc;
00351   MPI_Comm_rank( comm_, &my_proc );
00352   int nprocs;
00353   MPI_Comm_size( comm_, &nprocs );
00354 
00355   // Do the forward map component
00356   CreateSendStructures_(my_proc,nprocs,NumExportIDs,ExportPIDs);
00357 
00358   // Do the reverse map component
00359   CreateRecvStructures_(NumRemoteIDs,RemotePIDs);
00360 
00361   return 0;
00362 
00363 
00364 }
00365 #endif
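// Unlike CreateFromSends()/CreateFromRecvs(), the two CreateFromSendsAndRecvs()
// variants above are handed both sides of the communication up front, so no
// global handshake (ComputeRecvs_/ComputeSends_) is needed.  RemotePIDs must
// already be sorted by rank; otherwise CreateRecvStructures_() throws.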
00366 
00367 
00368 
00369 //==============================================================================
00370 int Epetra_MpiDistributor::CreateSendStructures_(int my_proc,
00371              int nprocs,
00372              const int & NumExportIDs,
00373              const int * ExportPIDs)
00374 {
00375   nexports_ = NumExportIDs;
00376 
00377   int i;
00378 
00379   // Check to see if items are grouped by processor w/o gaps
00380   // If so, indices_to_ is not needed (it stays null)
00381 
00382   // Setup data structures for quick traversal of arrays
00383   int * starts = new int[ nprocs + 1 ];
00384   for( i = 0; i < nprocs; i++ )
00385     starts[i] = 0;
00386 
00387   int nactive = 0;
00388   bool no_send_buff = true;
00389   int numDeadIndices = 0; // In some cases a GID will not be owned by any processor and its PID will be -1
00390 
00391   for( i = 0; i < NumExportIDs; i++ )
00392   {
00393     if( no_send_buff && i && (ExportPIDs[i] < ExportPIDs[i-1]) )
00394       no_send_buff = false;
00395     if( ExportPIDs[i] >= 0 )
00396     {
00397       ++starts[ ExportPIDs[i] ];
00398       ++nactive;
00399     }
00400     else numDeadIndices++; // Increase the number of dead indices.  Used below to leave these out of the analysis
00401   }
00402 
00403   self_msg_ = ( starts[my_proc] != 0 ) ? 1 : 0;
00404 
00405   nsends_ = 0;
00406 
00407   if( no_send_buff ) //grouped by processor, no send buffer or indices_to_ needed
00408   {
00409     for( i = 0; i < nprocs; ++i )
00410       if( starts[i] ) ++nsends_;
00411 
00412     if( nsends_ )
00413     {
00414       procs_to_ = new int[nsends_];
00415       starts_to_ = new int[nsends_];
00416       lengths_to_ = new int[nsends_];
00417     }
00418 
00419     int index = numDeadIndices;  // Leave off the dead indices (PID = -1)
00420     int proc;
00421     for( i = 0; i < nsends_; ++i )
00422     {
00423       starts_to_[i] = index;
00424       proc = ExportPIDs[index];
00425       procs_to_[i] = proc;
00426       index += starts[proc];
00427     }
00428 
00429     if( nsends_ )
00430       Sort_ints_( procs_to_, starts_to_, nsends_ );
00431 
00432     max_send_length_ = 0;
00433 
00434     for( i = 0; i < nsends_; ++i )
00435     {
00436       proc = procs_to_[i];
00437       lengths_to_[i] = starts[proc];
00438       if( (proc != my_proc) && (lengths_to_[i] > max_send_length_) )
00439         max_send_length_ = lengths_to_[i];
00440     }
00441   }
00442   else //not grouped by processor, need send buffer and indices_to_
00443   {
00444     if( starts[0] != 0 ) nsends_ = 1;
00445 
00446     for( i = 1; i < nprocs; i++ )
00447     {
00448       if( starts[i] != 0 ) ++nsends_;
00449       starts[i] += starts[i-1];
00450     }
00451 
00452     for( i = nprocs-1; i != 0; i-- )
00453       starts[i] = starts[i-1];
00454 
00455     starts[0] = 0;
00456 
00457     if (nactive>0) {
00458       indices_to_ = new int[ nactive ];
00459       size_indices_to_ = nactive;
00460     }
00461 
00462     for( i = 0; i < NumExportIDs; i++ )
00463     if( ExportPIDs[i] >= 0 )
00464     {
00465       indices_to_[ starts[ ExportPIDs[i] ] ] = i;
00466       ++starts[ ExportPIDs[i] ];
00467     }
00468 
00469     //Reconstruct the starts array so it indexes into indices_to_.
00470 
00471     for( i = nprocs-1; i != 0; i-- )
00472       starts[i] = starts[i-1];
00473     starts[0] = 0;
00474     starts[nprocs] = nactive;
00475 
00476     if (nsends_>0) {
00477       lengths_to_ = new int[ nsends_ ];
00478       procs_to_ = new int[ nsends_ ];
00479       starts_to_ = new int[ nsends_ ];
00480     }
00481 
00482     int j = 0;
00483     max_send_length_ = 0;
00484 
00485     for( i = 0; i < nprocs; i++ )
00486       if( starts[i+1] != starts[i] )
00487       {
00488         lengths_to_[j] = starts[i+1] - starts[i];
00489         starts_to_[j] = starts[i];
00490         if( ( i != my_proc ) && ( lengths_to_[j] > max_send_length_ ) )
00491           max_send_length_ = lengths_to_[j];
00492         procs_to_[j] = i;
00493         j++;
00494       }
00495   }
00496 
00497   delete [] starts;
00498 
00499   nsends_ -= self_msg_;
00500 
00501   return 0;
00502 }
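// CreateSendStructures_() distinguishes two layouts of ExportPIDs.  If the
// entries are already in nondecreasing processor order (negative "dead" PIDs
// excepted), each destination's objects form one contiguous block and
// indices_to_ stays null, so sends can go straight out of the caller's buffer.
// Otherwise indices_to_ records, per destination, which export positions must
// be gathered into send_array_.  Illustrative grouped example (hypothetical
// data):
//
//   ExportPIDs = { 1, 1, 3 }   (my_proc == 0)
//     => nsends_ = 2, procs_to_ = {1,3}, starts_to_ = {0,2},
//        lengths_to_ = {2,1}, indices_to_ == NULL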
00503 
00504 //==============================================================================
00505 int Epetra_MpiDistributor::CreateRecvStructures_(const int & NumRemoteIDs,
00506              const int * RemotePIDs)
00507 {
00508   int i, j;
00509 
00510   // Since the RemotePIDs should be sorted, counting the total number of recvs should be easy...
00511   // use nsends as an initial guess for space.
00512   std::vector<int> recv_list;
00513   recv_list.reserve(nsends_);
00514 
00515   int last_pid=-2;
00516   for(i=0; i<NumRemoteIDs; i++) {
00517     if(RemotePIDs[i]>last_pid) {
00518       recv_list.push_back(RemotePIDs[i]);
00519       last_pid = RemotePIDs[i];
00520     }
00521     else if (RemotePIDs[i]<last_pid)
00522       throw std::runtime_error("Epetra_MpiDistributor::CreateRecvStructures_ expected RemotePIDs to be in sorted order");
00523   }
00524   nrecvs_=recv_list.size();
00525 
00526   if (nrecvs_>0) {
00527     starts_from_  = new int[nrecvs_];
00528     procs_from_   = new int[nrecvs_];
00529     lengths_from_ = new int[nrecvs_];
00530     request_      = new MPI_Request[ nrecvs_ ];
00531     status_       = new MPI_Status[ nrecvs_ ];
00532   }
00533 
00534   for(i=0,j=0; i<nrecvs_; ++i) {
00535     int jlast=j;
00536     procs_from_[i]  = recv_list[i];
00537     starts_from_[i] = j;
00538     for( ; j<NumRemoteIDs && RemotePIDs[jlast]==RemotePIDs[j]; j++ ) {}
00539     lengths_from_[i]=j-jlast;
00540   }
00541   total_recv_length_=NumRemoteIDs;
00542 
00543   nrecvs_ -= self_msg_;
00544 
00545   return 0;
00546 }
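// CreateRecvStructures_() run-length encodes the (sorted) RemotePIDs list.
// Illustrative example (hypothetical data):
//
//   RemotePIDs = { 0, 0, 3, 3, 3, 7 }
//     => procs_from_   = { 0, 3, 7 }
//        starts_from_  = { 0, 2, 5 }
//        lengths_from_ = { 2, 3, 1 }
//        total_recv_length_ = 6
//
// (nrecvs_ is then decremented by self_msg_ so that, as elsewhere in this
// file, it counts only off-process receives.)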
00547 
00548 
00549 //==============================================================================
00550 //---------------------------------------------------------------------------
00551 //ComputeRecvs Method
00552 //---------------------------------------------------------------------------
00553 int Epetra_MpiDistributor::ComputeRecvs_( int my_proc,
00554               int nprocs )
00555 {
00556   int * msg_count = new int[ nprocs ];
00557   int * counts = new int[ nprocs ];
00558 
00559   int i;
00560   MPI_Status status;
00561 
00562   for( i = 0; i < nprocs; i++ )
00563   {
00564     msg_count[i] = 0;
00565     counts[i] = 1;
00566   }
00567 
00568   for( i = 0; i < nsends_+self_msg_; i++ )
00569     msg_count[ procs_to_[i] ] = 1;
00570 
00571 #if defined(REDUCE_SCATTER_BUG)
00572 // the bug is found in mpich on linux platforms
00573   MPI_Reduce(msg_count, counts, nprocs, MPI_INT, MPI_SUM, 0, comm_);
00574   MPI_Scatter(counts, 1, MPI_INT, &nrecvs_, 1, MPI_INT, 0, comm_);
00575 #else
00576   MPI_Reduce_scatter( msg_count, &nrecvs_, counts, MPI_INT, MPI_SUM, comm_ );
00577 #endif
00578 
00579   delete [] msg_count;
00580   delete [] counts;
00581 
00582   if (nrecvs_>0) {
00583     lengths_from_ = new int[nrecvs_];
00584     procs_from_ = new int[nrecvs_];
00585     for(i=0; i<nrecvs_; ++i) {
00586       lengths_from_[i] = 0;
00587       procs_from_[i] = 0;
00588     }
00589   }
00590 
00591 #ifndef NEW_COMM_PATTERN
00592   for( i = 0; i < (nsends_+self_msg_); i++ )
00593     if( procs_to_[i] != my_proc ) {
00594       MPI_Send( &(lengths_to_[i]), 1, MPI_INT, procs_to_[i], tag_, comm_ );
00595     }
00596     else
00597     {
00598       //set self_msg_ to end block of recv arrays
00599       lengths_from_[nrecvs_-1] = lengths_to_[i];
00600       procs_from_[nrecvs_-1] = my_proc;
00601     }
00602 
00603   for( i = 0; i < (nrecvs_-self_msg_); i++ )
00604   {
00605     MPI_Recv( &(lengths_from_[i]), 1, MPI_INT, MPI_ANY_SOURCE, tag_, comm_, &status );
00606     procs_from_[i] = status.MPI_SOURCE;
00607   }
00608 
00609   MPI_Barrier( comm_ );
00610 #else
00611   if (nrecvs_>0) {
00612     if( !request_ ) {
00613       request_ = new MPI_Request[nrecvs_-self_msg_];
00614       status_ = new MPI_Status[nrecvs_-self_msg_];
00615     }
00616   }
00617 
00618   for( i = 0; i < (nrecvs_-self_msg_); i++ )
00619     MPI_Irecv( &(lengths_from_[i]), 1, MPI_INT, MPI_ANY_SOURCE, tag_, comm_, &(request_[i]) );
00620 
00621   MPI_Barrier( comm_ );
00622 
00623   for( i = 0; i < (nsends_+self_msg_); i++ )
00624     if( procs_to_[i] != my_proc ) {
00625       MPI_Rsend( &(lengths_to_[i]), 1, MPI_INT, procs_to_[i], tag_, comm_ );
00626     }
00627     else
00628     {
00629       //set self_msg_ to end block of recv arrays
00630       lengths_from_[nrecvs_-1] = lengths_to_[i];
00631       procs_from_[nrecvs_-1] = my_proc;
00632     }
00633 
00634   if( (nrecvs_-self_msg_) > 0 ) MPI_Waitall( (nrecvs_-self_msg_), request_, status_ );
00635 
00636   for( i = 0; i < (nrecvs_-self_msg_); i++ )
00637     procs_from_[i] = status_[i].MPI_SOURCE;
00638 #endif
00639 
00640   Sort_ints_( procs_from_, lengths_from_, nrecvs_ );
00641 
00642   // Compute indices_from_
00643   // Seems to break some reverse communication
00644 /* Not necessary since reverse communication is always blocked
00645   size_indices_from_ = 0;
00646   if( nrecvs_ > 0 )
00647   {
00648     for( i = 0; i < nrecvs_; i++ )  size_indices_from_ += lengths_from_[i];
00649     indices_from_ = new int[ size_indices_from_ ];
00650 
00651     for (i=0; i<size_indices_from_; i++) indices_from_[i] = i;
00652   }
00653 */
00654 
00655   if (nrecvs_>0) starts_from_ = new int[nrecvs_];
00656   int j = 0;
00657   for( i=0; i<nrecvs_; ++i )
00658   {
00659     starts_from_[i] = j;
00660     j += lengths_from_[i];
00661   }
00662 
00663   total_recv_length_ = 0;
00664   for( i = 0; i < nrecvs_; i++ )
00665     total_recv_length_ += lengths_from_[i];
00666 
00667   nrecvs_ -= self_msg_;
00668 
00669   MPI_Barrier( comm_ );
00670 
00671   return 0;
00672 }
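// How ComputeRecvs_() works, in brief: each rank sets msg_count[p] = 1 for
// every destination p it sends to (self included), and the MPI_Reduce_scatter
// (one slot per rank) hands every rank the column sum, i.e. the number of
// messages it will receive.  The message lengths are then exchanged point to
// point (senders transmit their lengths_to_ entries, receivers pick them up
// with MPI_ANY_SOURCE and record status.MPI_SOURCE as procs_from_), and the
// receive lists are sorted by rank so the resulting plan is deterministic.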
00673 
00674 //==============================================================================
00675 //---------------------------------------------------------------------------
00676 //ComputeSends Method
00677 //---------------------------------------------------------------------------
00678 template<typename id_type>
00679 int Epetra_MpiDistributor::ComputeSends_( int num_imports,
00680         const id_type *& import_ids,
00681         const int *& import_procs,
00682         int & num_exports,
00683         id_type *& export_ids,
00684         int *& export_procs,
00685         int my_proc ) {
00686 
00687   Epetra_MpiDistributor tmp_plan(*epComm_);
00688   int i;
00689 
00690   int * proc_list = 0;
00691   int * import_objs = 0;
00692   char * c_export_objs = 0;
00693   const int pack_size = (1 + sizeof(id_type)/sizeof(int));
00694 
00695   if( num_imports > 0 )
00696   {
00697     proc_list = new int[ num_imports ];
00698     import_objs = new int[ num_imports * pack_size];
00699 
00700     for( i = 0; i < num_imports; i++ )
00701     {
00702       proc_list[i] = import_procs[i];
00703 
00704       *(id_type*)(import_objs + pack_size*i) = import_ids[i];
00705       *(import_objs + pack_size*i + (pack_size-1)) = my_proc;
00706     }
00707   }
00708 
00709   EPETRA_CHK_ERR(tmp_plan.CreateFromSends( num_imports, proc_list,
00710              true, num_exports) );
00711   if( num_exports > 0 )
00712   {
00713     //export_objs = new int[ 2 * num_exports ];
00714     export_ids = new id_type[ num_exports ];
00715     export_procs = new int[ num_exports ];
00716   }
00717   else
00718   {
00719     export_ids = 0;
00720     export_procs = 0;
00721   }
00722 
00723   int len_c_export_objs = 0;
00724   EPETRA_CHK_ERR( tmp_plan.Do(reinterpret_cast<char *> (import_objs),
00725             pack_size * (int)sizeof( int ),
00726             len_c_export_objs,
00727             c_export_objs) );
00728   int * export_objs = reinterpret_cast<int *>(c_export_objs);
00729 
00730   for( i = 0; i < num_exports; i++ ) {
00731     export_ids[i] = *(id_type*)(export_objs + pack_size*i);
00732     export_procs[i] = *(export_objs + pack_size*i + (pack_size-1));
00733   }
00734 
00735   if( proc_list != 0 ) delete [] proc_list;
00736   if( import_objs != 0 ) delete [] import_objs;
00737   if( len_c_export_objs != 0 ) delete [] c_export_objs;
00738 
00739   return(0);
00740 
00741 }
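// Packing scheme used by ComputeSends_() above: every requested ID is packed
// into pack_size ints (sizeof(id_type)/sizeof(int) ints holding the GID,
// followed by one int holding the requesting rank), and the packed records are
// pushed through a temporary plan built from the owner ranks.  After the
// exchange each owner unpacks (GID, requesting rank) pairs into
// export_ids / export_procs, i.e. exactly the list it must later send.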
00742 
00743 //==============================================================================
00744 int Epetra_MpiDistributor::Do( char * export_objs,
00745                                int obj_size,
00746                                int & len_import_objs,
00747                                char *& import_objs )
00748 {
00749   EPETRA_CHK_ERR( DoPosts(export_objs, obj_size, len_import_objs, import_objs) );
00750   EPETRA_CHK_ERR( DoWaits() );
00751   return(0);
00752 }
00753 
00754 //==============================================================================
00755 int Epetra_MpiDistributor::DoReverse( char * export_objs,
00756                                       int obj_size,
00757                                       int & len_import_objs,
00758                                       char *& import_objs )
00759 {
00760   EPETRA_CHK_ERR( DoReversePosts(export_objs, obj_size,
00761          len_import_objs, import_objs) );
00762   EPETRA_CHK_ERR( DoReverseWaits() );
00763   return(0);
00764 }
00765 
00766 //==============================================================================
00767 int Epetra_MpiDistributor::DoPosts( char * export_objs,
00768                                     int obj_size,
00769                                     int & len_import_objs,
00770                                     char *& import_objs )
00771 {
00772   int i, j, k;
00773 
00774   int my_proc = 0;
00775   int self_recv_address = 0;
00776 
00777   MPI_Comm_rank( comm_, &my_proc );
00778 
00779   if( len_import_objs < (total_recv_length_*obj_size) )
00780   {
00781     if( import_objs!=0 ) {delete [] import_objs; import_objs = 0;}
00782     len_import_objs = total_recv_length_*obj_size;
00783     if (len_import_objs>0) import_objs = new char[len_import_objs];
00784     for( i=0; i<len_import_objs; ++i ) import_objs[i]=0;
00785   }
00786 
00787   k = 0;
00788 
00789   j = 0;
00790   for( i = 0; i < (nrecvs_+self_msg_); i++ )
00791   {
00792     if( procs_from_[i] != my_proc )
00793     {
00794       MPI_Irecv( &(import_objs[j]),
00795                  lengths_from_[i] * obj_size,
00796                  MPI_CHAR, procs_from_[i],
00797                  tag_, comm_,
00798                  &(request_[k]) );
00799       k++;
00800     }
00801     else
00802       self_recv_address = j;
00803 
00804     j += lengths_from_[i] * obj_size;
00805   }
00806 
00807 #ifndef EPETRA_NO_READY_SEND_IN_DO_POSTS
00808   // NOTE (mfh 19 Mar 2012):
00809   //
00810   // The ready-sends below require that each ready-send's matching
00811   // receive (see above) has already been posted.  We ensure this with
00812   // a barrier.  (Otherwise, some process that doesn't need to post
00813   // receives might post its ready-send before the receiving process
00814   // gets to post its receive.)  If you want to remove the barrier,
00815   // you'll have to replace the ready-sends below with standard sends
00816   // or Isends.
00817   MPI_Barrier( comm_ );
00818 #endif // EPETRA_NO_READY_SEND_IN_DO_POSTS
00819 
00820   //setup scan through procs_to list starting w/ higher numbered procs
00821   //Should help balance msg traffic
00822   int nblocks = nsends_ + self_msg_;
00823   int proc_index = 0;
00824   while( proc_index < nblocks && procs_to_[proc_index] < my_proc )
00825     ++proc_index;
00826   if( proc_index == nblocks ) proc_index = 0;
00827 
00828   int self_num = 0, self_index = 0;
00829   int p;
00830 
00831   if( !indices_to_ ) //data already blocked by processor
00832   {
00833     for( i = 0; i < nblocks; ++i )
00834     {
00835       p = i + proc_index;
00836       if( p > (nblocks-1) ) p -= nblocks;
00837 
00838       if( procs_to_[p] != my_proc ) {
00839 
00840 #ifndef EPETRA_NO_READY_SEND_IN_DO_POSTS
00841         MPI_Rsend( &export_objs[starts_to_[p]*obj_size],
00842                    lengths_to_[p]*obj_size,
00843                    MPI_CHAR,
00844                    procs_to_[p],
00845                    tag_,
00846                    comm_ );
00847 #else
00848         MPI_Send( &export_objs[starts_to_[p]*obj_size],
00849                    lengths_to_[p]*obj_size,
00850                    MPI_CHAR,
00851                    procs_to_[p],
00852                    tag_,
00853                    comm_ );
00854 #endif // EPETRA_NO_READY_SEND_IN_DO_POSTS
00855       }
00856       else {
00857        self_num = p;
00858       }
00859     }
00860 
00861     if( self_msg_ )
00862       memcpy( &import_objs[self_recv_address],
00863               &export_objs[starts_to_[self_num]*obj_size],
00864               lengths_to_[self_num]*obj_size );
00865   }
00866   else //data not blocked by proc, use send buffer
00867   {
00868     if( send_array_size_ < (max_send_length_*obj_size) )
00869     {
00870       if( send_array_!=0 ) {delete [] send_array_; send_array_ = 0;}
00871       send_array_size_ = max_send_length_*obj_size;
00872       if (send_array_size_>0) send_array_ = new char[send_array_size_];
00873     }
00874 
00875     j = 0;
00876     for( i = 0; i < nblocks; i++ )
00877     {
00878       p = i + proc_index;
00879       if( p > (nblocks-1) ) p -= nblocks;
00880       if( procs_to_[p] != my_proc )
00881       {
00882         int offset = 0;
00883         j = starts_to_[p];
00884         for( k = 0; k < lengths_to_[p]; k++ )
00885         {
00886           memcpy( &(send_array_[offset]),
00887                   &(export_objs[indices_to_[j]*obj_size]),
00888                   obj_size );
00889           ++j;
00890           offset += obj_size;
00891         }
00892 #ifndef EPETRA_NO_READY_SEND_IN_DO_POSTS
00893         MPI_Rsend( send_array_,
00894                    lengths_to_[p] * obj_size,
00895                    MPI_CHAR,
00896                    procs_to_[p],
00897                    tag_, comm_ );
00898 #else
00899         MPI_Send( send_array_,
00900                   lengths_to_[p] * obj_size,
00901                   MPI_CHAR,
00902                   procs_to_[p],
00903                   tag_, comm_ );
00904 #endif // EPETRA_NO_READY_SEND_IN_DO_POSTS
00905       }
00906       else
00907       {
00908         self_num = p;
00909         self_index = starts_to_[p];
00910       }
00911     }
00912 
00913     if( self_msg_ )
00914       for( k = 0; k < lengths_to_[self_num]; k++ )
00915       {
00916         memcpy( &(import_objs[self_recv_address]),
00917                 &(export_objs[indices_to_[self_index]*obj_size]),
00918                 obj_size );
00919         self_index++;
00920         self_recv_address += obj_size;
00921       }
00922   }
00923   return(0);
00924 }
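// Communication pattern of DoPosts() above, informally: (1) post an MPI_Irecv
// for every off-process source; (2) barrier, so the ready-mode MPI_Rsend calls
// are guaranteed to find their matching receives already posted (plain
// MPI_Send is used instead when EPETRA_NO_READY_SEND_IN_DO_POSTS is defined);
// (3) loop over destinations starting with the first rank >= my_proc to spread
// the traffic, sending either directly out of export_objs (blocked case) or
// via send_array_ (indices_to_ case); (4) copy any self message with memcpy.
// DoWaits() then completes the posted receives.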
00925 
00926 //==============================================================================
00927 int Epetra_MpiDistributor::DoWaits()
00928 {
00929   if( nrecvs_ > 0 ) MPI_Waitall( nrecvs_, request_, status_ );
00930 
00931   return(0);
00932 }
00933 
00934 //==============================================================================
00935 int Epetra_MpiDistributor::DoReversePosts( char * export_objs,
00936                                            int obj_size,
00937                                            int & len_import_objs,
00938                                            char *& import_objs )
00939 {
00940   assert(indices_to_==0); //Can only do reverse comm when original data
00941                           // is blocked by processor
00942 
00943   // If we don't have a reverse distributor, make one.
00944   if( comm_plan_reverse_ == 0 )
00945     CreateReverseDistributor();
00946 
00947   int comm_flag = comm_plan_reverse_->DoPosts(export_objs, obj_size, len_import_objs, import_objs);
00948 
00949   return(comm_flag);
00950 }
00951 
00952 //==============================================================================
00953 int Epetra_MpiDistributor::DoReverseWaits()
00954 {
00955   if( comm_plan_reverse_ == 0 ) return (-1);
00956 
00957   int comm_flag = comm_plan_reverse_->DoWaits();
00958 
00959   return(comm_flag);
00960 }
00961 
00962 //==============================================================================
00963 //---------------------------------------------------------------------------
00964 //Resize Method                      (Heaphy)
00965 //---------------------------------------------------------------------------
00966 int Epetra_MpiDistributor::Resize_( int * sizes )
00967 {
00968   int  i, j, k;         // loop counters
00969   int sum;
00970 
00971   //if (sizes == 0) return 0;
00972 
00973   int my_proc;
00974   MPI_Comm_rank (comm_, &my_proc);
00975   int nprocs;
00976   MPI_Comm_size( comm_, &nprocs );
00977 
00978   if( resized_ )
00979   {
00980     //test and see if we are already setup for these sizes
00981     bool match = true;
00982     for( i = 0; i < nexports_; ++i )
00983       match = match && (sizes_[i]==sizes[i]);
00984     int matched = match?1:0;
00985     int match_count = 0;
00986     MPI_Allreduce( &matched, &match_count, 1, MPI_INT, MPI_SUM, comm_ );
00987     if( match_count == nprocs )
00988       return 0;
00989     else //reset existing sizing arrays
00990       max_send_length_ = 0;
00991   }
00992 
00993   if( !sizes_ && nexports_ ) sizes_ = new int[nexports_];
00994   for (i = 0; i < nexports_; i++)
00995     sizes_[i] = sizes[i];
00996 
00997   if( !sizes_to_ && (nsends_+self_msg_) ) sizes_to_ = new int[nsends_+self_msg_];
00998   for (i = 0; i < (nsends_+self_msg_); ++i)
00999     sizes_to_[i] = 0;
01000 
01001   if( !starts_to_ptr_ && (nsends_+self_msg_) ) starts_to_ptr_ = new int[nsends_+self_msg_];
01002 
01003   if( !indices_to_ ) //blocked sends
01004   {
01005 
01006     int * index = 0;
01007     int * sort_val = 0;
01008     if (nsends_+self_msg_>0) {
01009       index = new int[nsends_+self_msg_];
01010       sort_val = new int[nsends_+self_msg_];
01011     }
01012     for( i = 0; i < (nsends_+self_msg_); ++i )
01013     {
01014       j = starts_to_[i];
01015       for( k = 0; k < lengths_to_[i]; ++k )
01016         sizes_to_[i] += sizes[j++];
01017       if( (sizes_to_[i] > max_send_length_) && (procs_to_[i] != my_proc) )
01018         max_send_length_ = sizes_to_[i];
01019     }
01020 
01021     for( i = 0; i < (nsends_+self_msg_); ++i )
01022     {
01023       sort_val[i] = starts_to_[i];
01024       index[i] = i;
01025     }
01026 
01027     if( nsends_+self_msg_ )
01028       Sort_ints_( sort_val, index, (nsends_+self_msg_) );
01029 
01030     sum = 0;
01031     for( i = 0; i < (nsends_+self_msg_); ++i )
01032     {
01033       starts_to_ptr_[ index[i] ] = sum;
01034       sum += sizes_to_[ index[i] ];
01035     }
01036 
01037     if (index!=0) {delete [] index; index = 0;}
01038     if (sort_val!=0) {delete [] sort_val; sort_val = 0;}
01039   }
01040   else //Sends not blocked, so have to do more work
01041   {
01042     if( !indices_to_ptr_ && nexports_ ) indices_to_ptr_ = new int[nexports_];
01043     int * offset = 0;
01044     if( nexports_ ) offset = new int[nexports_];
01045 
01046     //Compute address for every item in send array
01047     sum = 0;
01048     for( i = 0; i < nexports_; ++i )
01049     {
01050       offset[i] = sum;
01051       sum += sizes_[i];
01052     }
01053 
01054     sum = 0;
01055     max_send_length_ = 0;
01056     for( i = 0; i < (nsends_+self_msg_); ++i )
01057     {
01058       starts_to_ptr_[i] = sum;
01059       for( j = starts_to_[i]; j < (starts_to_[i]+lengths_to_[i]); ++j )
01060       {
01061         indices_to_ptr_[j] = offset[ indices_to_[j] ];
01062         sizes_to_[i] += sizes_[ indices_to_[j] ];
01063       }
01064       if( sizes_to_[i] > max_send_length_ && procs_to_[i] != my_proc )
01065         max_send_length_ = sizes_to_[i];
01066       sum += sizes_to_[i];
01067     }
01068 
01069     if (offset!=0) {delete [] offset; offset = 0;}
01070   }
01071 
01072   //  Exchange sizes routine inserted here:
01073   int self_index_to = -1;
01074   total_recv_length_ = 0;
01075   if( !sizes_from_ && (nrecvs_+self_msg_) ) sizes_from_ = new int [nrecvs_+self_msg_];
01076 
01077 #ifndef EPETRA_NEW_COMM_PATTERN
01078   for (i = 0; i < (nsends_+self_msg_); i++)
01079   {
01080     if(procs_to_[i] != my_proc)
01081       MPI_Send ((void *) &(sizes_to_[i]), 1, MPI_INT, procs_to_[i], tag_, comm_);
01082     else
01083       self_index_to = i;
01084   }
01085 
01086   MPI_Status status;
01087   for (i = 0; i < (nrecvs_+self_msg_); ++i)
01088   {
01089     sizes_from_[i] = 0;
01090     if (procs_from_[i] != my_proc)
01091       MPI_Recv((void *) &(sizes_from_[i]), 1, MPI_INT, procs_from_[i], tag_, comm_, &status);
01092     else
01093       sizes_from_[i] = sizes_to_[self_index_to];
01094     total_recv_length_ += sizes_from_[i];
01095   }
01096 #else
01097   if (nrecvs_>0 && !request_) {
01098     request_ = new MPI_Request[ nrecvs_-self_msg_ ];
01099     status_ = new MPI_Status[ nrecvs_-self_msg_ ];
01100   }
01101 
01102   for (i = 0; i < (nsends_+self_msg_); i++)
01103   {
01104     if(procs_to_[i] == my_proc)
01105       self_index_to = i;
01106   }
01107 
01108   for (i = 0; i < (nrecvs_+self_msg_); ++i)
01109   {
01110     sizes_from_[i] = 0;
01111     if (procs_from_[i] != my_proc)
01112       MPI_Irecv((void *) &(sizes_from_[i]), 1, MPI_INT, procs_from_[i], tag_, comm_, &(request_[i]));
01113     else
01114     {
01115       sizes_from_[i] = sizes_to_[self_index_to];
01116       total_recv_length_ += sizes_from_[i];
01117     }
01118   }
01119 
01120   MPI_Barrier( comm_ );
01121 
01122   for (i = 0; i < (nsends_+self_msg_); i++)
01123   {
01124     if(procs_to_[i] != my_proc)
01125       MPI_Rsend ((void *) &(sizes_to_[i]), 1, MPI_INT, procs_to_[i], tag_, comm_);
01126   }
01127 
01128   if( nrecvs_ > 0 ) MPI_Waitall( nrecvs_, request_, status_ );
01129 
01130   for (i = 0; i < (nrecvs_+self_msg_); ++i)
01131   {
01132     if (procs_from_[i] != my_proc)
01133       total_recv_length_ += sizes_from_[i];
01134   }
01135 #endif
01136   // end of the exchange-sizes section
01137 
01138   sum = 0;
01139   if( !starts_from_ptr_ ) starts_from_ptr_  = new int[nrecvs_+self_msg_];
01140   for (i = 0; i < (nrecvs_+self_msg_); ++i)
01141   {
01142      starts_from_ptr_[i] = sum;
01143      sum += sizes_from_[i];
01144   }
01145 
01146   resized_ = true;
01147 
01148   return 0;
01149 }
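// Resize_() adapts an existing plan to variable-sized objects: sizes[i] is the
// number of obj_size units occupied by export item i.  It accumulates per-
// destination totals in sizes_to_, block offsets in starts_to_ptr_ (and, when
// indices_to_ is used, per-item offsets in indices_to_ptr_), exchanges the
// totals so that sizes_from_ / starts_from_ptr_ / total_recv_length_ describe
// the receive side, and caches the result via resized_.  Illustrative blocked
// example (hypothetical data): starts_to_ = {0,2}, lengths_to_ = {2,1} and
// sizes = {4,1,3} give sizes_to_ = {5,3} and starts_to_ptr_ = {0,5}.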
01150 
01151 //==============================================================================
01152 int Epetra_MpiDistributor::Do( char * export_objs,
01153                                int obj_size,
01154                                int *& sizes,
01155                                int & len_import_objs,
01156                                char *& import_objs )
01157 {
01158   EPETRA_CHK_ERR( DoPosts(export_objs, obj_size, sizes,
01159         len_import_objs, import_objs) );
01160   EPETRA_CHK_ERR( DoWaits() );
01161 
01162   return(0);
01163 }
01164 
01165 //==============================================================================
01166 int Epetra_MpiDistributor::DoReverse( char * export_objs,
01167                                       int obj_size,
01168                                       int *& sizes,
01169                                       int & len_import_objs,
01170                                       char *& import_objs )
01171 {
01172   EPETRA_CHK_ERR( DoReversePosts(export_objs, obj_size, sizes,
01173          len_import_objs, import_objs) );
01174   EPETRA_CHK_ERR( DoReverseWaits() );
01175 
01176   return(0);
01177 }
01178 
01179 //==============================================================================
01180 int Epetra_MpiDistributor::DoPosts( char * export_objs,
01181                                     int obj_size,
01182                                     int *& sizes,
01183                                     int & len_import_objs,
01184                                     char *& import_objs )
01185 {
01186   int ierr = Resize_(sizes);
01187   if (ierr != 0) {
01188     return(ierr);
01189   }
01190 
01191   MPI_Barrier( comm_ );
01192 
01193   int i, j, k;
01194 
01195   int my_proc = 0;
01196   int self_recv_address = 0;
01197 
01198   MPI_Comm_rank( comm_, &my_proc );
01199 
01200   if( len_import_objs < (total_recv_length_*obj_size) )
01201   {
01202     if( import_objs!=0 ) {delete [] import_objs; import_objs = 0;}
01203     len_import_objs = total_recv_length_*obj_size;
01204     if (len_import_objs>0) import_objs = new char[len_import_objs];
01205   }
01206 
01207   k = 0;
01208 
01209   for( i = 0; i < (nrecvs_+self_msg_); ++i )
01210   {
01211     if( procs_from_[i] != my_proc )
01212     {
01213       MPI_Irecv( &(import_objs[starts_from_ptr_[i] * obj_size]),
01214                  sizes_from_[i] * obj_size,
01215                  MPI_CHAR, procs_from_[i], tag_, comm_, &(request_[k]) );
01216       k++;
01217     }
01218     else
01219       self_recv_address = starts_from_ptr_[i] * obj_size;
01220   }
01221 
01222   MPI_Barrier( comm_ );
01223 
01224   //setup scan through procs_to list starting w/ higher numbered procs
01225   //Should help balance msg traffic
01226   int nblocks = nsends_ + self_msg_;
01227   int proc_index = 0;
01228   while( proc_index < nblocks && procs_to_[proc_index] < my_proc )
01229     ++proc_index;
01230   if( proc_index == nblocks ) proc_index = 0;
01231 
01232   int self_num = 0;
01233   int p;
01234 
01235   if( !indices_to_ ) //data already blocked by processor
01236   {
01237     for( i = 0; i < nblocks; ++i )
01238     {
01239       p = i + proc_index;
01240       if( p > (nblocks-1) ) p -= nblocks;
01241 
01242       if( procs_to_[p] != my_proc )
01243         MPI_Rsend( &export_objs[starts_to_ptr_[p]*obj_size],
01244                    sizes_to_[p]*obj_size,
01245                    MPI_CHAR,
01246                    procs_to_[p],
01247                    tag_,
01248                    comm_ );
01249       else
01250         self_num = p;
01251     }
01252 
01253     if( self_msg_ )
01254       memcpy( &import_objs[self_recv_address],
01255               &export_objs[starts_to_ptr_[self_num]*obj_size],
01256               sizes_to_[self_num]*obj_size );
01257   }
01258   else //data not blocked by proc, need to copy to buffer
01259   {
01260     if( send_array_size_ && send_array_size_ < (max_send_length_*obj_size) )
01261     {
01262       if (send_array_!=0) {delete [] send_array_; send_array_ = 0;}
01263       send_array_ = 0;
01264       send_array_size_ = 0;
01265     }
01266     if( !send_array_size_ )
01267     {
01268       send_array_size_ = max_send_length_*obj_size;
01269       if (send_array_size_>0) send_array_ = new char[send_array_size_];
01270     }
01271 
01272     for( i=0; i<nblocks; ++i )
01273     {
01274       p = i + proc_index;
01275       if( p > (nblocks-1) ) p -= nblocks;
01276 
01277       if( procs_to_[p] != my_proc )
01278       {
01279         int offset = 0;
01280         j = starts_to_[p];
01281         for( k=0; k<lengths_to_[p]; ++k )
01282         {
01283           memcpy( &send_array_[offset],
01284                   &export_objs[indices_to_ptr_[j]*obj_size],
01285                   sizes_[indices_to_[j]]*obj_size );
01286           offset += sizes_[indices_to_[j]]*obj_size;
01287           ++j;
01288         }
01289         MPI_Rsend( send_array_, sizes_to_[p]*obj_size,
01290                    MPI_CHAR, procs_to_[p], tag_, comm_ );
01291       }
01292       else
01293         self_num = p;
01294     }
01295 
01296     if( self_msg_ )
01297     {
01298       int jj;
01299       j = starts_to_[self_num];
01300       for( k=0; k<lengths_to_[self_num]; ++k )
01301       {
01302         jj = indices_to_ptr_[j];
01303         memcpy( &import_objs[self_recv_address],
01304                 &export_objs[jj*obj_size],
01305                 sizes_[indices_to_[j]]*obj_size );   // copy this item's full size in bytes
01306         self_recv_address += (obj_size*sizes_[indices_to_[j]]);
01307         ++j;                                         // advance to the next export item
01308       }
01309     }
01310   }
01311 
01312   return(0);
01313 }
01314 
01315 //==============================================================================
01316 int Epetra_MpiDistributor::DoReversePosts( char * export_objs,
01317                                            int obj_size,
01318                                            int *& sizes,
01319                                            int & len_import_objs,
01320                                            char *& import_objs )
01321 {
01322   assert(indices_to_==0); //Can only do reverse comm when original data
01323                           // is blocked by processor
01324 
01325   // If we don't have a reverse distributor, make one.
01326   if( comm_plan_reverse_ == 0 )
01327     CreateReverseDistributor();
01328 
01329   int comm_flag = comm_plan_reverse_->DoPosts(export_objs, obj_size, sizes, len_import_objs, import_objs);
01330 
01331   return(comm_flag);
01332 }
01333 
01334 //==============================================================================
01335 void Epetra_MpiDistributor::Print(std::ostream & os) const
01336 {
01337   using std::endl;
01338 
01339   int myRank = 0, numProcs = 1;
01340   MPI_Comm_rank (comm_, &myRank);
01341   MPI_Comm_size (comm_, &numProcs);
01342 
01343   if (myRank == 0) {
01344     os << "Epetra_MpiDistributor (implements Epetra_Distributor)" << std::endl;
01345   }
01346   // Let each MPI process print its data.  We assume that all
01347   // processes can print to the given output stream, and execute
01348   // barriers to make it more likely that the output will be in the
01349   // right order.
01350   for (int p = 0; p < numProcs; ++p) {
01351     if (myRank == p) {
01352       os << "[Node " << p << " of " << numProcs << "]" << std::endl;
01353       os << " selfMessage: " << self_msg_ << std::endl;
01354       os << " numSends: " << nsends_ << std::endl;
01355 
01356       os << " imagesTo: [";
01357       for (int i = 0; i < nsends_; ++i) {
01358   os << procs_to_[i];
01359   if (i < nsends_ - 1) {
01360     os << " ";
01361   }
01362       }
01363       os << "]" << std::endl;
01364 
01365       os << " lengthsTo: [";
01366       for (int i = 0; i < nsends_; ++i) {
01367   os << lengths_to_[i];
01368   if (i < nsends_ - 1) {
01369     os << " ";
01370   }
01371       }
01372       os << "]" << std::endl;
01373 
01374       os << " maxSendLength: " << max_send_length_ << std::endl;
01375 
01376       os << " startsTo: ";
01377       if (starts_to_ == NULL) {
01378   os << "(NULL)" << std::endl;
01379       } else {
01380   os << "[";
01381   for (int i = 0; i < nsends_; ++i) {
01382     os << starts_to_[i];
01383     if (i < nsends_ - 1) {
01384       os << " ";
01385     }
01386   }
01387   os << "]" << std::endl;
01388       }
01389 
01390       os << " indicesTo: ";
01391       if (indices_to_ == NULL) {
01392   os << "(NULL)" << std::endl;
01393       } else {
01394   os << "[";
01395   int k = 0;
01396   for (int i = 0; i < nsends_; ++i) {
01397     for (int j = 0; j < lengths_to_[i]; ++j) {
01398       os << " " << indices_to_[j+k];
01399     }
01400     k += lengths_to_[i];
01401   }
01402   os << "]" << std::endl;
01403       }
01404 
01405       os << " numReceives: " << nrecvs_ << std::endl;
01406       os << " totalReceiveLength: " << total_recv_length_ << std::endl;
01407 
01408       os << " lengthsFrom: [";
01409       for (int i = 0; i < nrecvs_; ++i) {
01410   os << lengths_from_[i];
01411   if (i < nrecvs_ - 1) {
01412     os << " ";
01413   }
01414       }
01415       os << "]" << std::endl;
01416 
01417       os << " startsFrom: [";
01418       for (int i = 0; i < nrecvs_; ++i) {
01419   os << starts_from_[i];
01420   if (i < nrecvs_ - 1) {
01421     os << " ";
01422   }
01423       }
01424       os << "]" << std::endl;
01425 
01426       os << " imagesFrom: [";
01427       for (int i = 0; i < nrecvs_; ++i) {
01428   os << procs_from_[i];
01429   if (i < nrecvs_ - 1) {
01430     os << " ";
01431   }
01432       }
01433       os << "]" << std::endl;
01434 
01435       // mfh 16 Dec 2011: I found this commented out here; not sure if
01436       // we want to print this, so I'm leaving it commented out.
01437       /*
01438   os << "indices_from: ";
01439   k = 0;
01440   for( i = 0; i < nrecvs_; i++ )
01441   {
01442   for( j = 0; j < lengths_from_[i]; j++ )
01443   os << " " << indices_from_[j+k];
01444   k += lengths_from_[i];
01445   }
01446       */
01447 
01448       // Last output is a flush; it leaves a space and also
01449       // helps synchronize output.
01450       os << std::flush;
01451     } // if it's my process' turn to print
01452 
01453     // Execute barriers to give output time to synchronize.
01454     // One barrier generally isn't enough.
01455     MPI_Barrier (comm_);
01456     MPI_Barrier (comm_);
01457     MPI_Barrier (comm_);
01458   }
01459 }
01460 
01461 //---------------------------------------------------------------------------
01462 int Epetra_MpiDistributor::Sort_ints_(
01463  int *vals_sort,     //  values to be sorted
01464  int *vals_other,    // other array to be reordered with sort
01465  int  nvals)         // length of these two arrays
01466 {
01467 // This routine is primarily used to sort messages to improve communication flow.
01468 // It also ensures that the ordering produced by the invert_map routines is
01469 // deterministic, which should make bugs more reproducible.  This is
01470 // accomplished by sorting the message lists by processor ID.
01471 // The algorithm is a distribution count sort (see Knuth), and
01472 // this version assumes non-negative integers.
01473 
01474     if (nvals <= 1) return 0;
01475 
01476     int i;                        // loop counter
01477 
01478     // find largest int, n, to size sorting array, then allocate and clear it
01479     int n = 0;
01480     for (i = 0; i < nvals; i++)
01481        if (n < vals_sort[i]) n = vals_sort[i];
01482     int *pos = new int [n+2];
01483     for (i = 0; i < n+2; i++) pos[i] = 0;
01484 
01485     // copy input arrays into temporary copies to allow sorting original arrays
01486     int *copy_sort  = new int [nvals];
01487     int *copy_other = new int [nvals];
01488     for (i = 0; i < nvals; i++)
01489     {
01490       copy_sort[i]  = vals_sort[i];
01491       copy_other[i] = vals_other[i];
01492     }
01493 
01494     // count the occurrences of the integers ("distribution count")
01495     int *p = pos+1;
01496     for (i = 0; i < nvals; i++) p[copy_sort[i]]++;
01497 
01498     // create the partial sum of distribution counts
01499     for (i = 1; i < n; i++) p[i] += p[i-1];
01500 
01501     // the shifted partial sum gives the index at which to store each item in sorted order
01502     p = pos;
01503     for (i = 0; i < nvals; i++)
01504     {
01505       vals_sort  [p[copy_sort [i]]]   = copy_sort[i];
01506       vals_other [p[copy_sort [i]]++] = copy_other[i];
01507     }
01508 
01509     delete [] copy_sort;
01510     delete [] copy_other;
01511     delete [] pos;
01512 
01513     return 0;
01514 }
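// Worked example for Sort_ints_() (hypothetical data):
//
//   vals_sort  = { 3, 1, 3, 0 }      vals_other = { 10, 11, 12, 13 }
//
// counting gives pos+1 = { 1, 1, 0, 2 } (occurrences of the values 0,1,2,3);
// the partial sums shift down to pos = { 0, 1, 2, 2 }, the starting slot of
// each value, and the scatter pass produces
//
//   vals_sort  = { 0, 1, 3, 3 }      vals_other = { 13, 11, 10, 12 },
//
// i.e. a stable sort of vals_other by the processor IDs in vals_sort.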
01515 
01516 //-------------------------------------------------------------------------
01517 Epetra_MpiDistributor& Epetra_MpiDistributor::operator=(const Epetra_MpiDistributor& src)
01518 {
01519   (void)src;
01520   //not currently supported
01521   bool throw_error = true;
01522   if (throw_error) {
01523     throw ReportError("Epetra_MpiDistributor::operator= not supported.",-1);
01524   }
01525   return( *this );
01526 }
01527 
01528 
01529 //-------------------------------------------------------------------------
01530 void Epetra_MpiDistributor::CreateReverseDistributor() {
01531   int i;
01532   int my_proc = 0;
01533 
01534   MPI_Comm_rank( comm_, &my_proc );
01535 
01536   if( comm_plan_reverse_ == 0 ) {
01537     int total_send_length = 0;
01538     for( i = 0; i < nsends_+self_msg_; i++ )
01539       total_send_length += lengths_to_[i];
01540 
01541     int max_recv_length = 0;
01542     for( i = 0; i < nrecvs_; i++ )
01543       if( procs_from_[i] != my_proc )
01544         if( lengths_from_[i] > max_recv_length )
01545           max_recv_length = lengths_from_[i];
01546 
01547     comm_plan_reverse_ = new Epetra_MpiDistributor(*epComm_);
01548 
01549     comm_plan_reverse_->lengths_to_ = lengths_from_;
01550     comm_plan_reverse_->procs_to_ = procs_from_;
01551     comm_plan_reverse_->indices_to_ = indices_from_;
01552     comm_plan_reverse_->starts_to_ = starts_from_;
01553 
01554     comm_plan_reverse_->lengths_from_ = lengths_to_;
01555     comm_plan_reverse_->procs_from_ = procs_to_;
01556     comm_plan_reverse_->indices_from_ = indices_to_;
01557     comm_plan_reverse_->starts_from_ = starts_to_;
01558 
01559     comm_plan_reverse_->nsends_ = nrecvs_;
01560     comm_plan_reverse_->nrecvs_ = nsends_;
01561     comm_plan_reverse_->self_msg_ = self_msg_;
01562 
01563     comm_plan_reverse_->max_send_length_ = max_recv_length;
01564     comm_plan_reverse_->total_recv_length_ = total_send_length;
01565 
01566     comm_plan_reverse_->request_ = new MPI_Request[ comm_plan_reverse_->nrecvs_ ];
01567     comm_plan_reverse_->status_= new MPI_Status[ comm_plan_reverse_->nrecvs_ ];
01568 
01569     comm_plan_reverse_->no_delete_ = true;
01570   }
01571 
01572 }
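// The reverse plan built above does not copy anything: it aliases the forward
// plan's arrays with the to/from roles swapped (lengths_to_ <-> lengths_from_,
// and so on) and swaps nsends_/nrecvs_.  Setting no_delete_ = true keeps the
// reverse plan's destructor from freeing these shared arrays, which still
// belong to the forward plan.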
01573 
01574 //-------------------------------------------------------------------------
01575 Epetra_Distributor * Epetra_MpiDistributor::ReverseClone() {
01576   if(comm_plan_reverse_==0)
01577     CreateReverseDistributor();
01578 
01579   return(dynamic_cast<Epetra_Distributor *>(new Epetra_MpiDistributor(*comm_plan_reverse_)));
01580 }