fei_CommUtils.cpp

00001 
00002 #include <fei_CommUtils.hpp>
00003 #include <fei_TemplateUtils.hpp>
00004 
00005 #undef fei_file
00006 #define fei_file "fei_CommUtils.cpp"
00007 
00008 #include <fei_ErrMacros.hpp>
00009 
00010 namespace fei {
00011 
00012 //------------------------------------------------------------------------
00013 int localProc(MPI_Comm comm)
00014 {
00015 #ifdef FEI_SER
00016   return 0;
00017 #else
00018   int local_proc = 0;
00019   MPI_Comm_rank(comm, &local_proc);
00020   return local_proc;
00021 #endif
00022 }
00023 
00024 //------------------------------------------------------------------------
00025 int numProcs(MPI_Comm comm)
00026 {
00027 #ifdef FEI_SER
00028   return 1;
00029 #else
00030   int num_procs = 1;
00031   MPI_Comm_size(comm, &num_procs);
00032   return num_procs;
00033 #endif
00034 }
00035 
00036 //------------------------------------------------------------------------
00037 void Barrier(MPI_Comm comm)
00038 {
00039 #ifndef FEI_SER
00040   MPI_Barrier(comm);
00041 #endif
00042 }
00043 
00044 //------------------------------------------------------------------------
00045 int mirrorProcs(MPI_Comm comm, std::vector<int>& toProcs, std::vector<int>& fromProcs)
00046 {
00047   fromProcs.resize(0);
00048 #ifdef FEI_SER
00049   fromProcs.push_back(0);
00050   return(0);
00051 #else
00052   int num_procs = fei::numProcs(comm);
00053   std::vector<int> tmpIntData(num_procs*3, 0);
00054 
00055   int* buf = &tmpIntData[0];
00056   int* recvbuf = buf+num_procs;
00057 
00058   for(unsigned i=0; i<toProcs.size(); ++i) {
00059     buf[toProcs[i]] = 1;
00060   }
00061 
00062   for(int ii=2*num_procs; ii<3*num_procs; ++ii) {
00063     buf[ii] = 1;
00064   }
00065 
00066   CHK_MPI( MPI_Reduce_scatter(buf, &(buf[num_procs]), &(buf[2*num_procs]),
00067                               MPI_INT, MPI_SUM, comm) );
00068 
00069   int numRecvProcs = buf[num_procs];
00070 
00071   int tag = 11116;
00072   std::vector<MPI_Request> mpiReqs(numRecvProcs);
00073 
00074   int offset = 0;
00075   for(int ii=0; ii<numRecvProcs; ++ii) {
00076     CHK_MPI( MPI_Irecv(&(recvbuf[ii]), 1, MPI_INT, MPI_ANY_SOURCE, tag,
00077                        comm, &(mpiReqs[offset++])) );
00078   }
00079 
00080   for(unsigned i=0; i<toProcs.size(); ++i) {
00081     CHK_MPI( MPI_Send(&(toProcs[i]), 1, MPI_INT, toProcs[i], tag, comm) );
00082   }
00083 
00084   MPI_Status status;
00085   for(int ii=0; ii<numRecvProcs; ++ii) {
00086     int index;
00087     MPI_Waitany(numRecvProcs, &mpiReqs[0], &index, &status);
00088     fromProcs.push_back(status.MPI_SOURCE);
00089   }
00090 
00091   std::sort(fromProcs.begin(), fromProcs.end());
00092 
00093   return(0);
00094 #endif
00095 }
00096 
00097 //------------------------------------------------------------------------
//------------------------------------------------------------------------
/** Given a comm-pattern (inPattern) describing rows this processor sends,
    construct the mirror pattern (outPattern) describing the rows this
    processor receives. Collective operation. Allocates a new comm_map
    into outPattern (caller owns it).

    NOTE(review): in the serial (FEI_SER) case, and when numProcs < 2,
    outPattern is left unmodified (never allocated) -- presumably callers
    must not dereference it in those cases; confirm against call sites.
*/
int mirrorCommPattern(MPI_Comm comm, comm_map* inPattern, comm_map*& outPattern)
{
#ifdef FEI_SER
  (void)inPattern;
  (void)outPattern;
#else
  int localP = localProc(comm);
  int numP  = numProcs(comm);

  if (numP < 2) return(0);

  std::vector<int> buf(numP*2, 0);

  //The keys of inPattern's map are the processors we send rows to.
  int numInProcs = inPattern->getMap().size();
  std::vector<int> inProcs(numInProcs);
  fei::copyKeysToVector(inPattern->getMap(), inProcs);

  std::vector<int> outProcs;

  //Discover which procs will be sending to us (the mirror of inProcs).
  int err = mirrorProcs(comm, inProcs, outProcs);
  if (err != 0) ERReturn(-1);

  std::vector<int> recvbuf(outProcs.size(), 0);

  outPattern = new comm_map(0,1);

  MPI_Datatype mpi_ttype = fei::mpiTraits<int>::mpi_type();

  //now recv a length (the contents of buf[i]) from each "out-proc", which
  //will be the length of the equation data that will also be recvd from that
  //proc.
  std::vector<MPI_Request> mpiReqs(outProcs.size());
  std::vector<MPI_Status> mpiStss(outProcs.size());
  MPI_Request* requests = &mpiReqs[0];
  MPI_Status* statuses = &mpiStss[0];

  int firsttag = 11117;
  int offset = 0;
  int* outProcsPtr = &outProcs[0];
  for(unsigned i=0; i<outProcs.size(); ++i) {
    if (MPI_Irecv(&(recvbuf[i]), 1, MPI_INT, outProcsPtr[i], firsttag,
                  comm, &requests[offset++]) != MPI_SUCCESS) ERReturn(-1);
  }

  comm_map::map_type& in_row_map = inPattern->getMap();
  comm_map::map_type::iterator
    in_iter = in_row_map.begin(),
    in_end  = in_row_map.end();
 
  //Send each destination proc the length of the row we will later send it.
  //inProcs[ii] lines up with the ii-th map entry because copyKeysToVector
  //preserves the map's iteration order.
  int* inProcsPtr = &inProcs[0];
  for(int ii=0; in_iter!= in_end; ++in_iter, ++ii) {
    comm_map::row_type* in_row = in_iter->second;
    buf[ii] = in_row->size();
    if (MPI_Send(&(buf[ii]), 1, MPI_INT, inProcsPtr[ii], firsttag,
                 comm) != MPI_SUCCESS) ERReturn(-1);
  }

  int numOutProcs = outProcs.size();

  MPI_Waitall(numOutProcs, requests, statuses);
  std::vector<int> lengths(numOutProcs);
  int totalRecvLen = 0;
  offset = 0;
  //NOTE(review): zero-length entries are compacted out of 'lengths' here,
  //yet the loops below index lengths[ii] in lock-step with outProcs[ii];
  //if any recvbuf entry were 0 the two arrays would be misaligned.
  //Presumably every out-proc sends a non-empty row -- confirm that
  //invariant holds for all callers.
  for(int ii=0; ii<numOutProcs; ++ii) {
    if (recvbuf[ii] > 0) {
      lengths[offset++] = recvbuf[ii];
      totalRecvLen += recvbuf[ii];
    }
  }

  //now we need to create the space into which we'll receive the
  //lists that other procs send to us.
  std::vector<int> recvData(totalRecvLen, 999999);

  //Post receives for the actual row contents, packed back-to-back into
  //recvData at per-proc offsets.
  int tag2 = 11118;
  offset = 0;
  for(int ii=0; ii<numOutProcs; ++ii) {
    CHK_MPI(MPI_Irecv(&(recvData[offset]), lengths[ii], mpi_ttype,
                      outProcs[ii], tag2, comm, &requests[ii]) );
    offset += lengths[ii];
  }

  std::vector<int> sendList;

  in_iter = in_row_map.begin();

  //Send the actual row contents to each destination proc, skipping a
  //self-entry if present.
  for(int ii=0; in_iter != in_end; ++in_iter,++ii) {
    if (inProcs[ii] == localP) {
      continue;
    }
    sendList.resize(in_iter->second->size());
    fei::copySetToArray(*(in_iter->second), sendList.size(), &sendList[0]);

    CHK_MPI(MPI_Send(&sendList[0], sendList.size(), mpi_ttype,
                     inProcs[ii], tag2, comm) );
  }

  //our final communication operation is to catch the Irecvs we started above.
  for(int ii=0; ii<numOutProcs; ++ii) {
    MPI_Wait(&requests[ii], &statuses[ii]);
  }

  //now we've completed all the communication, so we're ready to put the data
  //we received into the outPattern object.
  offset = 0;
  for(int ii=0; ii<numOutProcs; ii++) {
    outPattern->addIndices(outProcs[ii], lengths[ii],
                           &(recvData[offset]));
    offset += lengths[ii];
  }

#endif
  return(0);
}
00212 
00213 
00214 //------------------------------------------------------------------------
//------------------------------------------------------------------------
/** Exchange one int with each listed processor: sendData[i] is sent to
    sendProcs[i], and recvData is resized to recvProcs.size() and filled
    with one int received from each proc in recvProcs. Collective among
    the procs involved in the exchange.

    @return 0 on success (or if both proc lists are empty);
            -1 if sendProcs and sendData differ in length.
*/
int exchangeIntData(MPI_Comm comm,
                    std::vector<int>& sendProcs,
                    std::vector<int>& sendData,
                    std::vector<int>& recvProcs,
                    std::vector<int>& recvData)
{
  if (sendProcs.size() == 0 && recvProcs.size() == 0) return(0);
  if (sendProcs.size() != sendData.size()) return(-1);
#ifndef FEI_SER
  recvData.resize(recvProcs.size());
  std::vector<MPI_Request> mpiReqs;
  mpiReqs.resize(recvProcs.size());

  int tag = 11114;
  MPI_Datatype mpi_dtype = MPI_INT;

  //launch Irecv's for recvData:

  //NOTE(review): a self-entry in recvProcs is skipped, leaving that
  //recvData slot unwritten by any receive; the matching self-entry in
  //sendProcs is likewise skipped below, so self-sends are never copied
  //into recvData. Confirm callers do not rely on self-exchange.
  int localProc = fei::localProc(comm);
  int numRecvProcs = recvProcs.size();
  int req_offset = 0;
  for(unsigned i=0; i<recvProcs.size(); ++i) {
    if (recvProcs[i] == localProc) {--numRecvProcs; continue; }

    CHK_MPI( MPI_Irecv(&(recvData[i]), 1, mpi_dtype, recvProcs[i], tag,
                       comm, &mpiReqs[req_offset++]) );
  }

  //send the sendData:

  for(unsigned i=0; i<sendProcs.size(); ++i) {
    if (sendProcs[i] == localProc) continue;

    CHK_MPI( MPI_Send(&(sendData[i]), 1, mpi_dtype,
                      sendProcs[i], tag, comm) );
  }

  //complete the Irecv's:

  for(int ii=0; ii<numRecvProcs; ++ii) {
    int index;
    MPI_Status status;
    CHK_MPI( MPI_Waitany(numRecvProcs, &mpiReqs[0], &index, &status) );
  }

#endif
  return(0);
}
00263 
00264 //------------------------------------------------------------------------
00265 int Allreduce(MPI_Comm comm, bool localBool, bool& globalBool)
00266 {
00267 #ifndef FEI_SER
00268   int localInt = localBool ? 1 : 0;
00269   int globalInt = 0;
00270 
00271   CHK_MPI( MPI_Allreduce(&localInt, &globalInt, 1, MPI_INT, MPI_MAX, comm) );
00272 
00273   globalBool = globalInt==1 ? true : false;
00274 #else
00275   globalBool = localBool;
00276 #endif
00277 
00278   return(0);
00279 }
00280 
00281 }//namespace fei
00282 

Generated on Wed May 12 21:30:40 2010 for FEI by  doxygen 1.4.7