Tpetra_Distributor.cpp
// @HEADER
// ***********************************************************************
//
//          Tpetra: Templated Linear Algebra Services Package
//                 Copyright (2008) Sandia Corporation
//
// Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
// the U.S. Government retains certain rights in this software.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// 1. Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the Corporation nor the names of the
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// Questions? Contact Michael A. Heroux (maherou@sandia.gov)
//
// ************************************************************************
// @HEADER

#include "Tpetra_Distributor.hpp"
#include "Teuchos_StandardParameterEntryValidators.hpp"
#include "Teuchos_VerboseObjectParameterListHelpers.hpp"


namespace Tpetra {
  namespace Details {
    std::string
    DistributorSendTypeEnumToString (EDistributorSendType sendType)
    {
      if (sendType == DISTRIBUTOR_ISEND) {
        return "Isend";
      }
      else if (sendType == DISTRIBUTOR_RSEND) {
        return "Rsend";
      }
      else if (sendType == DISTRIBUTOR_SEND) {
        return "Send";
      }
      else if (sendType == DISTRIBUTOR_SSEND) {
        return "Ssend";
      }
      else {
        TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument, "Invalid "
          "EDistributorSendType enum value " << sendType << ".");
      }
    }

    std::string
    DistributorHowInitializedEnumToString (EDistributorHowInitialized how)
    {
      switch (how) {
      case Details::DISTRIBUTOR_NOT_INITIALIZED:
        return "Not initialized yet";
      case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
        return "By createFromSends";
      case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
        return "By createFromRecvs";
      case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
        return "By createReverseDistributor";
      case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
        return "By copy constructor";
      default:
        return "INVALID";
      }
    }
  } // namespace Details

  Array<std::string>
  distributorSendTypes ()
  {
    Array<std::string> sendTypes;
    sendTypes.push_back ("Isend");
    sendTypes.push_back ("Rsend");
    sendTypes.push_back ("Send");
    sendTypes.push_back ("Ssend");
    return sendTypes;
  }
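
  // A minimal usage sketch (a comment only, not part of this file's
  // code): how a caller might select one of the send types above via
  // a ParameterList when constructing a Distributor.  The names
  // "comm" and "plist" are hypothetical.
  //
  //   Teuchos::RCP<Teuchos::ParameterList> plist =
  //     Teuchos::parameterList ("Tpetra::Distributor");
  //   plist->set ("Send type", std::string ("Isend"));
  //   Tpetra::Distributor distributor (comm, plist);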
  // We set default values of Distributor's Boolean parameters here,
  // in this one place.  That way, if we want to change the default
  // value of a parameter, we don't have to search the whole file to
  // ensure a consistent setting.
  namespace {
    // Default value of the "Debug" parameter.
    const bool tpetraDistributorDebugDefault = false;
    // Default value of the "Barrier between receives and sends" parameter.
    const bool barrierBetween_default = false;
    // Default value of the "Use distinct tags" parameter.
    const bool useDistinctTags_default = true;
    // Default value of the "Enable MPI CUDA RDMA support" parameter.
#ifdef TPETRA_ENABLE_MPI_CUDA_RDMA
    const bool enable_cuda_rdma_default = true;
#else
    const bool enable_cuda_rdma_default = false;
#endif
  } // namespace (anonymous)

  int Distributor::getTag (const int pathTag) const {
    return useDistinctTags_ ? pathTag : comm_->getTag ();
  }


#ifdef TPETRA_DISTRIBUTOR_TIMERS
  void Distributor::makeTimers () {
    const std::string name_doPosts3 = "Tpetra::Distributor: doPosts(3)";
    const std::string name_doPosts4 = "Tpetra::Distributor: doPosts(4)";
    const std::string name_doWaits = "Tpetra::Distributor: doWaits";
    const std::string name_doPosts3_recvs = "Tpetra::Distributor: doPosts(3): recvs";
    const std::string name_doPosts4_recvs = "Tpetra::Distributor: doPosts(4): recvs";
    const std::string name_doPosts3_barrier = "Tpetra::Distributor: doPosts(3): barrier";
    const std::string name_doPosts4_barrier = "Tpetra::Distributor: doPosts(4): barrier";
    const std::string name_doPosts3_sends = "Tpetra::Distributor: doPosts(3): sends";
    const std::string name_doPosts4_sends = "Tpetra::Distributor: doPosts(4): sends";

    timer_doPosts3_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3);
    timer_doPosts4_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4);
    timer_doWaits_ = Teuchos::TimeMonitor::getNewTimer (name_doWaits);
    timer_doPosts3_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_recvs);
    timer_doPosts4_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_recvs);
    timer_doPosts3_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_barrier);
    timer_doPosts4_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_barrier);
    timer_doPosts3_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_sends);
    timer_doPosts4_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_sends);
  }
#endif // TPETRA_DISTRIBUTOR_TIMERS

  void
  Distributor::init (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
                     const Teuchos::RCP<Teuchos::ParameterList>& plist)
  {
    this->setVerbLevel (debug_ ? Teuchos::VERB_EXTREME : Teuchos::VERB_NONE);
    this->setOStream (out_);
    if (! plist.is_null ()) {
      // The input parameters may override the above verbosity level
      // setting, if there is a "VerboseObject" sublist.
      this->setParameterList (plist);
    }

#ifdef TPETRA_DISTRIBUTOR_TIMERS
    makeTimers ();
#endif // TPETRA_DISTRIBUTOR_TIMERS

    if (debug_) {
      Teuchos::OSTab tab (out_);
      std::ostringstream os;
      os << comm_->getRank ()
         << ": Distributor ctor done" << std::endl;
      *out_ << os.str ();
    }
  }

  Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm)
    : comm_ (comm)
    , out_  (Teuchos::getFancyOStream (Teuchos::rcpFromRef (std::cerr)))
    , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
    , sendType_ (Details::DISTRIBUTOR_SEND)
    , barrierBetween_ (barrierBetween_default)
    , debug_ (tpetraDistributorDebugDefault)
    , enable_cuda_rdma_ (enable_cuda_rdma_default)
    , numExports_ (0)
    , selfMessage_ (false)
    , numSends_ (0)
    , maxSendLength_ (0)
    , numReceives_ (0)
    , totalReceiveLength_ (0)
    , lastRoundBytesSend_ (0)
    , lastRoundBytesRecv_ (0)
    , useDistinctTags_ (useDistinctTags_default)
  {
    init (comm, Teuchos::null);
  }

  Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
                            const Teuchos::RCP<Teuchos::FancyOStream>& out)
    : comm_ (comm)
    , out_ (out.is_null () ? Teuchos::getFancyOStream (Teuchos::rcpFromRef (std::cerr)) : out)
    , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
    , sendType_ (Details::DISTRIBUTOR_SEND)
    , barrierBetween_ (barrierBetween_default)
    , debug_ (tpetraDistributorDebugDefault)
    , enable_cuda_rdma_ (enable_cuda_rdma_default)
    , numExports_ (0)
    , selfMessage_ (false)
    , numSends_ (0)
    , maxSendLength_ (0)
    , numReceives_ (0)
    , totalReceiveLength_ (0)
    , lastRoundBytesSend_ (0)
    , lastRoundBytesRecv_ (0)
    , useDistinctTags_ (useDistinctTags_default)
  {
    init (comm, Teuchos::null);
  }

  Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
                            const Teuchos::RCP<Teuchos::ParameterList>& plist)
    : comm_ (comm)
    , out_ (Teuchos::getFancyOStream (Teuchos::rcpFromRef (std::cerr)))
    , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
    , sendType_ (Details::DISTRIBUTOR_SEND)
    , barrierBetween_ (barrierBetween_default)
    , debug_ (tpetraDistributorDebugDefault)
    , enable_cuda_rdma_ (enable_cuda_rdma_default)
    , numExports_ (0)
    , selfMessage_ (false)
    , numSends_ (0)
    , maxSendLength_ (0)
    , numReceives_ (0)
    , totalReceiveLength_ (0)
    , lastRoundBytesSend_ (0)
    , lastRoundBytesRecv_ (0)
    , useDistinctTags_ (useDistinctTags_default)
  {
    init (comm, plist);
  }

  Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
                            const Teuchos::RCP<Teuchos::FancyOStream>& out,
                            const Teuchos::RCP<Teuchos::ParameterList>& plist)
    : comm_ (comm)
    , out_ (out.is_null () ? Teuchos::getFancyOStream (Teuchos::rcpFromRef (std::cerr)) : out)
    , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
    , sendType_ (Details::DISTRIBUTOR_SEND)
    , barrierBetween_ (barrierBetween_default)
    , debug_ (tpetraDistributorDebugDefault)
    , enable_cuda_rdma_ (enable_cuda_rdma_default)
    , numExports_ (0)
    , selfMessage_ (false)
    , numSends_ (0)
    , maxSendLength_ (0)
    , numReceives_ (0)
    , totalReceiveLength_ (0)
    , lastRoundBytesSend_ (0)
    , lastRoundBytesRecv_ (0)
    , useDistinctTags_ (useDistinctTags_default)
  {
    init (comm, plist);
  }

  Distributor::Distributor (const Distributor & distributor)
    : comm_ (distributor.comm_)
    , out_ (distributor.out_)
    , howInitialized_ (Details::DISTRIBUTOR_INITIALIZED_BY_COPY)
    , sendType_ (distributor.sendType_)
    , barrierBetween_ (distributor.barrierBetween_)
    , debug_ (distributor.debug_)
    , enable_cuda_rdma_ (distributor.enable_cuda_rdma_)
    , numExports_ (distributor.numExports_)
    , selfMessage_ (distributor.selfMessage_)
    , numSends_ (distributor.numSends_)
    , imagesTo_ (distributor.imagesTo_)
    , startsTo_ (distributor.startsTo_)
    , lengthsTo_ (distributor.lengthsTo_)
    , maxSendLength_ (distributor.maxSendLength_)
    , indicesTo_ (distributor.indicesTo_)
    , numReceives_ (distributor.numReceives_)
    , totalReceiveLength_ (distributor.totalReceiveLength_)
    , lengthsFrom_ (distributor.lengthsFrom_)
    , imagesFrom_ (distributor.imagesFrom_)
    , startsFrom_ (distributor.startsFrom_)
    , indicesFrom_ (distributor.indicesFrom_)
    , reverseDistributor_ (distributor.reverseDistributor_)
    , lastRoundBytesSend_ (distributor.lastRoundBytesSend_)
    , lastRoundBytesRecv_ (distributor.lastRoundBytesRecv_)
    , useDistinctTags_ (distributor.useDistinctTags_)
  {
    using Teuchos::ParameterList;
    using Teuchos::parameterList;
    using Teuchos::RCP;
    using Teuchos::rcp;

    this->setVerbLevel (distributor.getVerbLevel ());
    this->setOStream (out_);
    // The input parameters may override the above verbosity level
    // setting, if there is a "VerboseObject" sublist.
    //
    // Clone the right-hand side's ParameterList, so that this
    // object's list is decoupled from the right-hand side's list.  We
    // don't need to do validation, since the right-hand side has
    // already validated its parameters, so just call setMyParamList().
    // Note that this won't work if the right-hand side doesn't have a
    // list set yet, so we first check for null.
    RCP<const ParameterList> rhsList = distributor.getParameterList ();
    if (! rhsList.is_null ()) {
      this->setMyParamList (parameterList (* rhsList));
    }

#ifdef TPETRA_DISTRIBUTOR_TIMERS
    makeTimers ();
#endif // TPETRA_DISTRIBUTOR_TIMERS

    if (debug_) {
      Teuchos::OSTab tab (out_);
      std::ostringstream os;
      os << comm_->getRank ()
         << ": Distributor copy ctor done" << std::endl;
      *out_ << os.str ();
    }
  }

  void Distributor::swap (Distributor& rhs) {
    using Teuchos::ParameterList;
    using Teuchos::parameterList;
    using Teuchos::RCP;

    std::swap (comm_, rhs.comm_);
    std::swap (out_, rhs.out_);
    std::swap (howInitialized_, rhs.howInitialized_);
    std::swap (sendType_, rhs.sendType_);
    std::swap (barrierBetween_, rhs.barrierBetween_);
    std::swap (debug_, rhs.debug_);
    std::swap (enable_cuda_rdma_, rhs.enable_cuda_rdma_);
    std::swap (numExports_, rhs.numExports_);
    std::swap (selfMessage_, rhs.selfMessage_);
    std::swap (numSends_, rhs.numSends_);
    std::swap (imagesTo_, rhs.imagesTo_);
    std::swap (startsTo_, rhs.startsTo_);
    std::swap (lengthsTo_, rhs.lengthsTo_);
    std::swap (maxSendLength_, rhs.maxSendLength_);
    std::swap (indicesTo_, rhs.indicesTo_);
    std::swap (numReceives_, rhs.numReceives_);
    std::swap (totalReceiveLength_, rhs.totalReceiveLength_);
    std::swap (lengthsFrom_, rhs.lengthsFrom_);
    std::swap (imagesFrom_, rhs.imagesFrom_);
    std::swap (startsFrom_, rhs.startsFrom_);
    std::swap (indicesFrom_, rhs.indicesFrom_);
    std::swap (reverseDistributor_, rhs.reverseDistributor_);
    std::swap (lastRoundBytesSend_, rhs.lastRoundBytesSend_);
    std::swap (lastRoundBytesRecv_, rhs.lastRoundBytesRecv_);
    std::swap (useDistinctTags_, rhs.useDistinctTags_);

    // Swap verbosity levels.
    const Teuchos::EVerbosityLevel lhsVerb = this->getVerbLevel ();
    const Teuchos::EVerbosityLevel rhsVerb = rhs.getVerbLevel ();
    this->setVerbLevel (rhsVerb);
    rhs.setVerbLevel (lhsVerb);

    // Swap output streams.  We've swapped out_ above, but we have to
    // tell the parent class VerboseObject about the swap.
    this->setOStream (out_);
    rhs.setOStream (rhs.out_);

    // Swap parameter lists.  If they are the same object, make a deep
    // copy first, so that modifying one won't modify the other one.
    RCP<ParameterList> lhsList = this->getNonconstParameterList ();
    RCP<ParameterList> rhsList = rhs.getNonconstParameterList ();
    if (lhsList.getRawPtr () == rhsList.getRawPtr () && ! rhsList.is_null ()) {
      rhsList = parameterList (*rhsList);
    }
    if (! rhsList.is_null ()) {
      this->setMyParamList (rhsList);
    }
    if (! lhsList.is_null ()) {
      rhs.setMyParamList (lhsList);
    }

    // We don't need to swap timers, because all instances of
    // Distributor use the same timers.
  }

  Distributor::~Distributor()
  {
    // We shouldn't have any outstanding communication requests at
    // this point.
    TEUCHOS_TEST_FOR_EXCEPTION(requests_.size() != 0, std::runtime_error,
      "Tpetra::Distributor: Destructor called with " << requests_.size()
      << " outstanding posts (unfulfilled communication requests).  There "
      "should be none at this point.  Please report this bug to the Tpetra "
      "developers.");
  }

  void
  Distributor::setParameterList (const Teuchos::RCP<Teuchos::ParameterList>& plist)
  {
    using Teuchos::FancyOStream;
    using Teuchos::getIntegralValue;
    using Teuchos::includesVerbLevel;
    using Teuchos::OSTab;
    using Teuchos::ParameterList;
    using Teuchos::parameterList;
    using Teuchos::RCP;
    using std::endl;

    RCP<const ParameterList> validParams = getValidParameters ();
    plist->validateParametersAndSetDefaults (*validParams);

    const bool barrierBetween =
      plist->get<bool> ("Barrier between receives and sends");
    const Details::EDistributorSendType sendType =
      getIntegralValue<Details::EDistributorSendType> (*plist, "Send type");
    const bool useDistinctTags = plist->get<bool> ("Use distinct tags");
    const bool debug = plist->get<bool> ("Debug");
    const bool enable_cuda_rdma = plist->get<bool> ("Enable MPI CUDA RDMA support");

    // We check this property explicitly, since we haven't yet learned
    // how to make a validator that can cross-check properties.
    // Later, turn this into a validator so that it can be embedded in
    // the valid ParameterList and used in Optika.
    TEUCHOS_TEST_FOR_EXCEPTION(
      ! barrierBetween && sendType == Details::DISTRIBUTOR_RSEND,
      std::invalid_argument, "Tpetra::Distributor::setParameterList: " << endl
      << "You specified \"Send type\"=\"Rsend\", but turned off the barrier "
      "between receives and sends." << endl << "This is invalid; you must "
      "include the barrier if you use ready sends." << endl << "Ready sends "
      "require that their corresponding receives have already been posted, "
      "and the only way to guarantee that in general is with a barrier.");

    if (plist->isSublist ("VerboseObject")) {
      // Read the "VerboseObject" sublist for (optional) verbosity
      // settings.  We've set defaults already in Distributor's
      // constructor, so we don't need this sublist to exist.
      Teuchos::readVerboseObjectSublist (&*plist, this);
    }

    // Now that we've validated the input list, save the results.
    sendType_ = sendType;
    barrierBetween_ = barrierBetween;
    useDistinctTags_ = useDistinctTags;
    debug_ = debug;
    enable_cuda_rdma_ = enable_cuda_rdma;

    // ParameterListAcceptor semantics require pointer identity of the
    // sublist passed to setParameterList(), so we save the pointer.
    this->setMyParamList (plist);
  }

  Teuchos::RCP<const Teuchos::ParameterList>
  Distributor::getValidParameters () const
  {
    using Teuchos::Array;
    using Teuchos::ParameterList;
    using Teuchos::parameterList;
    using Teuchos::RCP;
    using Teuchos::setStringToIntegralParameter;

    const bool barrierBetween = barrierBetween_default;
    const bool useDistinctTags = useDistinctTags_default;
    const bool debug = tpetraDistributorDebugDefault;
    const bool enable_cuda_rdma = enable_cuda_rdma_default;

    Array<std::string> sendTypes = distributorSendTypes ();
    const std::string defaultSendType ("Send");
    Array<Details::EDistributorSendType> sendTypeEnums;
    sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
    sendTypeEnums.push_back (Details::DISTRIBUTOR_RSEND);
    sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
    sendTypeEnums.push_back (Details::DISTRIBUTOR_SSEND);

    RCP<ParameterList> plist = parameterList ("Tpetra::Distributor");
    plist->set ("Barrier between receives and sends", barrierBetween,
                "Whether to execute a barrier between receives and sends in do"
                "[Reverse]Posts().  Required for correctness when \"Send type\""
                "=\"Rsend\", otherwise correct but not recommended.");
    setStringToIntegralParameter<Details::EDistributorSendType> ("Send type",
      defaultSendType, "When using MPI, the variant of send to use in "
      "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
    plist->set ("Use distinct tags", useDistinctTags, "Whether to use distinct "
                "MPI message tags for different code paths.");
    plist->set ("Debug", debug, "Whether to print copious debugging output on "
                "all processes.");
    plist->set ("Enable MPI CUDA RDMA support", enable_cuda_rdma,
                "Whether to enable RDMA support for MPI communication between "
                "CUDA GPUs.  Only enable this if you know for sure your MPI "
                "library supports it.");

    Teuchos::setupVerboseObjectSublist (&*plist);
    return Teuchos::rcp_const_cast<const ParameterList> (plist);
  }
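
  // A short usage sketch (comment only; "distributor" is a
  // hypothetical, already-constructed object): callers can inspect
  // the accepted parameters and their default values like this.
  //
  //   Teuchos::RCP<const Teuchos::ParameterList> validParams =
  //     distributor.getValidParameters ();
  //   validParams->print (std::cout);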


  size_t Distributor::getTotalReceiveLength() const
  { return totalReceiveLength_; }

  size_t Distributor::getNumReceives() const
  { return numReceives_; }

  bool Distributor::hasSelfMessage() const
  { return selfMessage_; }

  size_t Distributor::getNumSends() const
  { return numSends_; }

  size_t Distributor::getMaxSendLength() const
  { return maxSendLength_; }

  Teuchos::ArrayView<const int> Distributor::getImagesFrom() const
  { return imagesFrom_; }

  Teuchos::ArrayView<const size_t> Distributor::getLengthsFrom() const
  { return lengthsFrom_; }

  Teuchos::ArrayView<const int> Distributor::getImagesTo() const
  { return imagesTo_; }

  Teuchos::ArrayView<const size_t> Distributor::getLengthsTo() const
  { return lengthsTo_; }

  Teuchos::RCP<Distributor>
  Distributor::getReverse() const {
    if (reverseDistributor_.is_null ()) {
      createReverseDistributor ();
    }
    return reverseDistributor_;
  }
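
  // Usage sketch (comment only; "distributor" is hypothetical): the
  // reverse Distributor executes the same communication pattern
  // backwards, swapping the roles of senders and receivers.
  //
  //   Teuchos::RCP<Distributor> reverse = distributor.getReverse ();
  //   // reverse->getNumSends () == distributor.getNumReceives (), and
  //   // reverse->getNumReceives () == distributor.getNumSends ().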


  void
  Distributor::createReverseDistributor() const
  {
    reverseDistributor_ = Teuchos::rcp (new Distributor (comm_));

    // The total length of all the sends of this Distributor.  We
    // calculate it because it's the total length of all the receives
    // of the reverse Distributor.  (Use a size_t initial value, so
    // that std::accumulate computes the sum in size_t, not int.)
    const size_t totalSendLength =
      std::accumulate (lengthsTo_.begin(), lengthsTo_.end(),
                       static_cast<size_t> (0));

    // The maximum length of any of the receives of this Distributor.
    // We calculate it because it's the maximum length of any of the
    // sends of the reverse Distributor.
    size_t maxReceiveLength = 0;
    const int myImageID = comm_->getRank();
    for (size_t i=0; i < numReceives_; ++i) {
      if (imagesFrom_[i] != myImageID) {
        // Don't count receives for messages sent by myself to myself.
        if (lengthsFrom_[i] > maxReceiveLength) {
          maxReceiveLength = lengthsFrom_[i];
        }
      }
    }

    // Initialize all of reverseDistributor's data members.  This
    // mainly just involves flipping "send" and "receive," or the
    // equivalent "to" and "from."
    reverseDistributor_->lengthsTo_ = lengthsFrom_;
    reverseDistributor_->imagesTo_ = imagesFrom_;
    reverseDistributor_->indicesTo_ = indicesFrom_;
    reverseDistributor_->startsTo_ = startsFrom_;
    reverseDistributor_->lengthsFrom_ = lengthsTo_;
    reverseDistributor_->imagesFrom_ = imagesTo_;
    reverseDistributor_->indicesFrom_ = indicesTo_;
    reverseDistributor_->startsFrom_ = startsTo_;
    reverseDistributor_->numSends_ = numReceives_;
    reverseDistributor_->numReceives_ = numSends_;
    reverseDistributor_->selfMessage_ = selfMessage_;
    reverseDistributor_->maxSendLength_ = maxReceiveLength;
    reverseDistributor_->totalReceiveLength_ = totalSendLength;
    reverseDistributor_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;

    // Note: technically, this Distributor is its reverse
    //       Distributor's reverse Distributor, but we do not set that
    //       up here.  Leaving it unset gives us an opportunity to
    //       test that createReverseDistributor() is an inverse
    //       operation with respect to the value semantics of
    //       Distributors.
    // Note: numExports_ was not copied.
  }


  void Distributor::doWaits() {
    using Teuchos::Array;
    using Teuchos::CommRequest;
    using Teuchos::FancyOStream;
    using Teuchos::includesVerbLevel;
    using Teuchos::is_null;
    using Teuchos::OSTab;
    using Teuchos::RCP;
    using Teuchos::waitAll;
    using std::endl;

    Teuchos::OSTab tab (out_);

#ifdef TPETRA_DISTRIBUTOR_TIMERS
    Teuchos::TimeMonitor timeMon (*timer_doWaits_);
#endif // TPETRA_DISTRIBUTOR_TIMERS

    const int myRank = comm_->getRank ();

    if (debug_) {
      std::ostringstream os;
      os << myRank << ": doWaits: # reqs = "
         << requests_.size () << endl;
      *out_ << os.str ();
    }

    if (requests_.size() > 0) {
      waitAll (*comm_, requests_());

#ifdef HAVE_TEUCHOS_DEBUG
      // Make sure that waitAll() nulled out all the requests.
      for (Array<RCP<CommRequest<int> > >::const_iterator it = requests_.begin();
           it != requests_.end(); ++it)
      {
        TEUCHOS_TEST_FOR_EXCEPTION( ! is_null (*it), std::runtime_error,
          Teuchos::typeName(*this) << "::doWaits(): Communication requests "
          "should all be null after calling Teuchos::waitAll() on them, but "
          "at least one request is not null.");
      }
#endif // HAVE_TEUCHOS_DEBUG
      // Restore the invariant that requests_.size() is the number of
      // outstanding nonblocking communication requests.
      requests_.resize (0);
    }

#ifdef HAVE_TEUCHOS_DEBUG
    {
      const int localSizeNonzero = (requests_.size () != 0) ? 1 : 0;
      int globalSizeNonzero = 0;
      Teuchos::reduceAll<int, int> (*comm_, Teuchos::REDUCE_MAX,
                                    localSizeNonzero,
                                    Teuchos::outArg (globalSizeNonzero));
      TEUCHOS_TEST_FOR_EXCEPTION(
        globalSizeNonzero != 0, std::runtime_error,
        "Tpetra::Distributor::doWaits: After waitAll, at least one process has "
        "a nonzero number of outstanding posts.  There should be none at this "
        "point.  Please report this bug to the Tpetra developers.");
    }
#endif // HAVE_TEUCHOS_DEBUG

    if (debug_) {
      std::ostringstream os;
      os << myRank << ": doWaits done" << endl;
      *out_ << os.str ();
    }
  }

  void Distributor::doReverseWaits() {
    // call doWaits() on the reverse Distributor, if it exists
    if (! reverseDistributor_.is_null()) {
      reverseDistributor_->doWaits();
    }
  }

  std::string Distributor::description () const {
    std::ostringstream out;

    out << "\"Tpetra::Distributor\": {";
    const std::string label = this->getObjectLabel ();
    if (label != "") {
      out << "Label: " << label << ", ";
    }
    out << "How initialized: "
        << Details::DistributorHowInitializedEnumToString (howInitialized_)
        << ", Parameters: {"
        << "Send type: "
        << DistributorSendTypeEnumToString (sendType_)
        << ", Barrier between receives and sends: "
        << (barrierBetween_ ? "true" : "false")
        << ", Use distinct tags: "
        << (useDistinctTags_ ? "true" : "false")
        << ", Debug: " << (debug_ ? "true" : "false")
        << ", Enable MPI CUDA RDMA support: "
        << (enable_cuda_rdma_ ? "true" : "false")
        << "}}";
    return out.str ();
  }

  void
  Distributor::describe (Teuchos::FancyOStream &out,
                         const Teuchos::EVerbosityLevel verbLevel) const
  {
    using std::endl;
    using std::setw;
    using Teuchos::VERB_DEFAULT;
    using Teuchos::VERB_NONE;
    using Teuchos::VERB_LOW;
    using Teuchos::VERB_MEDIUM;
    using Teuchos::VERB_HIGH;
    using Teuchos::VERB_EXTREME;
    Teuchos::EVerbosityLevel vl = verbLevel;
    if (vl == VERB_DEFAULT) vl = VERB_LOW;
    const int myImageID = comm_->getRank();
    const int numImages = comm_->getSize();
    Teuchos::OSTab tab (out);

    if (vl == VERB_NONE) {
      return;
    } else {
      if (myImageID == 0) {
        // VERB_LOW and higher prints description() (on Proc 0 only).
        // We quote the class name because it contains colons:
        // quoting makes the output valid YAML.
        out << "\"Tpetra::Distributor\":" << endl;
        Teuchos::OSTab tab2 (out);
        const std::string label = this->getObjectLabel ();
        if (label != "") {
          out << "Label: " << label << endl;
        }
        out << "How initialized: "
            << Details::DistributorHowInitializedEnumToString (howInitialized_)
            << endl << "Parameters: " << endl;
        {
          Teuchos::OSTab tab3 (out);
          out << "\"Send type\": "
              << DistributorSendTypeEnumToString (sendType_) << endl
              << "\"Barrier between receives and sends\": "
              << (barrierBetween_ ? "true" : "false") << endl;
          out << "\"Use distinct tags\": "
              << (useDistinctTags_ ? "true" : "false") << endl;
          out << "\"Debug\": " << (debug_ ? "true" : "false") << endl;
          out << "\"Enable MPI CUDA RDMA support\": " <<
            (enable_cuda_rdma_ ? "true" : "false") << endl;
        }
      }
      if (vl == VERB_LOW) {
        return;
      } else {
        Teuchos::OSTab tab2 (out);
        // vl > VERB_LOW lets each image print its data.  We assume
        // that all images can print to the given output stream, and
        // execute barriers to make it more likely that the output
        // will be in the right order.
        for (int imageCtr = 0; imageCtr < numImages; ++imageCtr) {
          if (myImageID == imageCtr) {
            if (myImageID == 0) {
              out << "Number of processes: " << numImages << endl;
            }
            out << "Process: " << myImageID << endl;
            Teuchos::OSTab tab3 (out);
            out << "selfMessage: " << hasSelfMessage () << endl;
            out << "numSends: " << getNumSends () << endl;
            if (vl == VERB_HIGH || vl == VERB_EXTREME) {
              out << "imagesTo: " << toString (imagesTo_) << endl;
              out << "lengthsTo: " << toString (lengthsTo_) << endl;
              out << "maxSendLength: " << getMaxSendLength () << endl;
            }
            if (vl == VERB_EXTREME) {
              out << "startsTo: " << toString (startsTo_) << endl;
              out << "indicesTo: " << toString (indicesTo_) << endl;
            }
            if (vl == VERB_HIGH || vl == VERB_EXTREME) {
              out << "numReceives: " << getNumReceives () << endl;
              out << "totalReceiveLength: " << getTotalReceiveLength () << endl;
              out << "lengthsFrom: " << toString (lengthsFrom_) << endl;
              out << "startsFrom: " << toString (startsFrom_) << endl;
              out << "imagesFrom: " << toString (imagesFrom_) << endl;
            }
            // Last output is a flush; it leaves a space and also
            // helps synchronize output.
            out << std::flush;
          } // if it's my image's turn to print
          // Execute barriers to give output time to synchronize.
          // One barrier generally isn't enough.
          comm_->barrier();
          comm_->barrier();
          comm_->barrier();
        } // for each image
      }
    }
  }

  void
  Distributor::computeReceives ()
  {
    using Teuchos::Array;
    using Teuchos::as;
    using Teuchos::CommStatus;
    using Teuchos::CommRequest;
    using Teuchos::ireceive;
    using Teuchos::RCP;
    using Teuchos::rcp;
    using Teuchos::REDUCE_SUM;
    using Teuchos::receive;
    using Teuchos::reduceAllAndScatter;
    using Teuchos::send;
    using Teuchos::waitAll;
    using std::endl;

    Teuchos::OSTab tab (out_);
    const int myRank = comm_->getRank();
    const int numProcs = comm_->getSize();

    // MPI tag for nonblocking receives and blocking sends in this method.
    const int pathTag = 2;
    const int tag = this->getTag (pathTag);

    if (debug_) {
      std::ostringstream os;
      os << myRank << ": computeReceives: "
        "{selfMessage_: " << (selfMessage_ ? "true" : "false")
         << ", tag: " << tag << "}" << endl;
      *out_ << os.str ();
    }

    // toNodesFromMe[i] == the number of messages sent by this process
    // to process i.  The data in numSends_, imagesTo_, and lengthsTo_
    // concern the contiguous sends.  Therefore, each process will be
    // listed in imagesTo_ at most once, and so toNodesFromMe[i] will
    // either be 0 or 1.
    {
      Array<int> toNodesFromMe (numProcs, 0);
#ifdef HAVE_TEUCHOS_DEBUG
      bool counting_error = false;
#endif // HAVE_TEUCHOS_DEBUG
      for (size_t i = 0; i < (numSends_ + (selfMessage_ ? 1 : 0)); ++i) {
#ifdef HAVE_TEUCHOS_DEBUG
        if (toNodesFromMe[imagesTo_[i]] != 0) {
          counting_error = true;
        }
#endif // HAVE_TEUCHOS_DEBUG
        toNodesFromMe[imagesTo_[i]] = 1;
      }
#ifdef HAVE_TEUCHOS_DEBUG
      SHARED_TEST_FOR_EXCEPTION(counting_error, std::logic_error,
        "Tpetra::Distributor::computeReceives: There was an error on at least "
        "one process in counting the number of messages sent by that process "
        "to the other processes.  Please report this bug to the Tpetra "
        "developers.", *comm_);
#endif // HAVE_TEUCHOS_DEBUG

      if (debug_) {
        std::ostringstream os;
        os << myRank << ": computeReceives: "
          "Calling reduceAllAndScatter" << endl;
        *out_ << os.str ();
      }

      // Compute the number of receives that this process needs to
      // post.  The number of receives includes any self sends (i.e.,
      // messages sent by this process to itself).
      //
      // (We will use numReceives_ below to post exactly that
      // number of receives, with MPI_ANY_SOURCE as the sending rank.
      // This will tell us from which processes this process expects
      // to receive, and how many packets of data we expect to receive
      // from each process.)
      //
      // toNodesFromMe[i] is the number of messages sent by this
      // process to process i.  Compute the sum (elementwise) of all
      // the toNodesFromMe arrays on all processes in the
      // communicator.  If the array x is that sum, then if this
      // process has rank j, x[j] is the number of messages sent
      // to process j, that is, the number of receives on process j
      // (including any messages sent by process j to itself).
      //
      // Yes, this requires storing and operating on an array of
      // length P, where P is the number of processes in the
      // communicator.  Epetra does this too.  Avoiding this O(P)
      // memory bottleneck would require some research.
      //
      // In the (wrapped) MPI_Reduce_scatter call below, since the
      // counts array contains only ones, there is only one output on
      // each process, namely numReceives_ (which is x[j], in the
      // above notation).
      //
      // mfh 09 Jan 2012: The reduceAllAndScatter really isn't
      // necessary here.  Since counts is just all ones, we could
      // replace this with an all-reduce on toNodesFromMe, and let my
      // process (with rank myRank) get numReceives_ from
      // toNodesFromMe[myRank].  The HPCCG miniapp uses the all-reduce
      // method.  It could be possible that reduceAllAndScatter is
      // faster, but it also makes the code more complicated, and it
      // can't be _asymptotically_ faster (MPI_Allreduce has twice the
      // critical path length of MPI_Reduce, so reduceAllAndScatter
      // can't be more than twice as fast as the all-reduce, even if
      // the scatter is free).
      //
      // mfh 12 Apr 2013: See discussion in createFromSends() about
      // how we could use this communication to propagate an error
      // flag for "free" in a release build.
      Array<int> counts (numProcs, 1);
      int numReceivesAsInt = 0; // output
      reduceAllAndScatter<int, int> (*comm_, REDUCE_SUM, numProcs,
                                     toNodesFromMe.getRawPtr (),
                                     counts.getRawPtr (),
                                     &numReceivesAsInt);
      numReceives_ = Teuchos::as<size_t> (numReceivesAsInt);
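
      // For reference, the all-reduce alternative described in the
      // comment above would look roughly like this (a sketch only,
      // not the code path actually taken; "sums" is a hypothetical
      // name):
      //
      //   Array<int> sums (numProcs, 0);
      //   Teuchos::reduceAll<int, int> (*comm_, REDUCE_SUM, numProcs,
      //                                 toNodesFromMe.getRawPtr (),
      //                                 sums.getRawPtr ());
      //   numReceives_ = Teuchos::as<size_t> (sums[myRank]);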
    }

    // Now we know numReceives_, which is this process' number of
    // receives.  Allocate the lengthsFrom_ and imagesFrom_ arrays
    // with this number of entries.
    lengthsFrom_.assign (numReceives_, 0);
    imagesFrom_.assign (numReceives_, 0);

    //
    // Ask (via nonblocking receive) each process from which we are
    // receiving how many packets we should expect from it in the
    // communication pattern.
    //

    // At this point, numReceives_ includes any self message that
    // there may be.  At the end of this routine, we'll subtract off
    // the self message (if there is one) from numReceives_.  In this
    // routine, we don't need to receive a message from ourselves in
    // order to figure out our lengthsFrom_ and source process ID; we
    // can just ask ourselves directly.  Thus, the actual number of
    // nonblocking receives we post here does not include the self
    // message.
    const size_t actualNumReceives = numReceives_ - (selfMessage_ ? 1 : 0);

    // Teuchos' wrapper for nonblocking receives requires receive
    // buffers that it knows won't go away.  This is why we use RCPs,
    // one RCP per nonblocking receive request.  They get allocated in
    // the loop below.
    Array<RCP<CommRequest<int> > > requests (actualNumReceives);
    Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
    Array<RCP<CommStatus<int> > > statuses (actualNumReceives);

    // Teuchos::Comm treats a negative process ID as MPI_ANY_SOURCE
    // (receive data from any process).
#ifdef HAVE_MPI
    const int anySourceProc = MPI_ANY_SOURCE;
#else
    const int anySourceProc = -1;
#endif

    if (debug_) {
      std::ostringstream os;
      os << myRank << ": computeReceives: Posting "
         << actualNumReceives << " irecvs" << endl;
      *out_ << os.str ();
    }

    // Post the (nonblocking) receives.
    for (size_t i = 0; i < actualNumReceives; ++i) {
      // Once the receive completes, we can ask the corresponding
      // CommStatus object (output by wait()) for the sending process'
      // ID (which we'll assign to imagesFrom_[i] -- don't forget to
      // do that!).
      lengthsFromBuffers[i].resize (1);
      lengthsFromBuffers[i][0] = as<size_t> (0);
      requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc, tag, *comm_);
      if (debug_) {
        std::ostringstream os;
        os << myRank << ": computeReceives: "
          "Posted any-proc irecv w/ specified tag " << tag << endl;
        *out_ << os.str ();
      }
    }

    if (debug_) {
      std::ostringstream os;
      os << myRank << ": computeReceives: "
        "posting " << numSends_ << " sends" << endl;
      *out_ << os.str ();
    }
    // Post the sends: Tell each process to which we are sending how
    // many packets it should expect from us in the communication
    // pattern.  We could use nonblocking sends here, as long as we do
    // a waitAll() on all the sends and receives at once.
    //
    // We assume that numSends_ and selfMessage_ have already been
    // set.  The value of numSends_ (my process' number of sends) does
    // not include any message that it might send to itself.
    for (size_t i = 0; i < numSends_ + (selfMessage_ ? 1 : 0); ++i) {
      if (imagesTo_[i] != myRank) {
        // Send a message to imagesTo_[i], telling that process that
        // this communication pattern will send that process
        // lengthsTo_[i] blocks of packets.
        const size_t* const lengthsTo_i = &lengthsTo_[i];
        send<int, size_t> (lengthsTo_i, 1, as<int> (imagesTo_[i]), tag, *comm_);
        if (debug_) {
          std::ostringstream os;
          os << myRank << ": computeReceives: "
            "Posted send to Proc " << imagesTo_[i] << " w/ specified tag "
             << tag << endl;
          *out_ << os.str ();
        }
      }
      else {
        // We don't need a send in the self-message case.  If this
        // process will send a message to itself in the communication
        // pattern, then the last element of lengthsFrom_ and
        // imagesFrom_ corresponds to the self-message.  Of course
        // this process knows how long the message is, and the process
        // ID is its own process ID.
        lengthsFrom_[numReceives_-1] = lengthsTo_[i];
        imagesFrom_[numReceives_-1] = myRank;
      }
    }

    if (debug_) {
      std::ostringstream os;
      os << myRank << ": computeReceives: waitAll on "
         << requests.size () << " requests" << endl;
      *out_ << os.str ();
    }
    //
    // Wait on all the receives.  When they complete, check the status
    // output of wait() for the sending process ID, unpack the
    // request buffers into lengthsFrom_, and set imagesFrom_ from the
    // status.
    //
    waitAll (*comm_, requests (), statuses ());
    for (size_t i = 0; i < actualNumReceives; ++i) {
      lengthsFrom_[i] = *lengthsFromBuffers[i];
      imagesFrom_[i] = statuses[i]->getSourceRank ();
    }

    // Sort the imagesFrom_ array, and apply the same permutation to
    // lengthsFrom_.  This ensures that imagesFrom_[i] and
    // lengthsFrom_[i] refer to the same message.
    sort2 (imagesFrom_.begin(), imagesFrom_.end(), lengthsFrom_.begin());
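    // For example (values illustrative): if imagesFrom_ = {5, 2, 7}
    // and lengthsFrom_ = {10, 20, 30} before the call, then afterwards
    // imagesFrom_ = {2, 5, 7} and lengthsFrom_ = {20, 10, 30}; the
    // second array follows the permutation that sorts the first.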

    // Compute indicesFrom_.  (Use a size_t initial value, so the sum
    // is computed in size_t rather than int.)
    totalReceiveLength_ =
      std::accumulate (lengthsFrom_.begin(), lengthsFrom_.end(),
                       static_cast<size_t> (0));
    indicesFrom_.clear ();
    indicesFrom_.reserve (totalReceiveLength_);
    for (size_t i = 0; i < totalReceiveLength_; ++i) {
      indicesFrom_.push_back(i);
    }

    startsFrom_.clear ();
    startsFrom_.reserve (numReceives_);
    for (size_t i = 0, j = 0; i < numReceives_; ++i) {
      startsFrom_.push_back(j);
      j += lengthsFrom_[i];
    }

    if (selfMessage_) {
      --numReceives_;
    }

    if (debug_) {
      std::ostringstream os;
      os << myRank << ": computeReceives: done" << endl;
      *out_ << os.str ();
    }
  }

  size_t
  Distributor::createFromSends (const Teuchos::ArrayView<const int> &exportNodeIDs)
  {
    using Teuchos::outArg;
    using Teuchos::REDUCE_MAX;
    using Teuchos::reduceAll;
    using std::endl;

    Teuchos::OSTab tab (out_);

    numExports_ = exportNodeIDs.size();

    const int myImageID = comm_->getRank();
    const int numImages = comm_->getSize();
    if (debug_) {
      std::ostringstream os;
      os << myImageID << ": createFromSends" << endl;
      *out_ << os.str ();
    }

    // exportNodeIDs tells us the communication pattern for this
    // distributor.  It dictates the way that the export data will be
    // interpreted in doPosts().  We want to perform at most one
    // send per process in doPosts; this is for two reasons:
    //   * minimize latency / overhead in the comm routines (nice)
    //   * match the number of receives and sends between processes
    //     (necessary)
    //
    // Teuchos::Comm requires that the data for a send are contiguous
    // in a send buffer.  Therefore, if the data in the send buffer
    // for doPosts() are not contiguous, they will need to be copied
    // into a contiguous buffer.  The user has specified this
    // noncontiguous pattern and we can't do anything about it.
    // However, if they do not provide an efficient pattern, we will
    // warn them if one of the following compile-time options has been
    // set:
    //   * HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS
    //   * HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS
    //
    // If the data are contiguous, then we can post the sends in situ
    // (i.e., without needing to copy them into a send buffer).
    //
    // Determine contiguity. There are a number of ways to do this:
    // * If the export IDs are sorted, then all exports to a
    //   particular node must be contiguous. This is what Epetra does.
    // * If the export ID of the current export already has been
    //   listed, then the previous listing should correspond to the
    //   same export.  This tests contiguity, but not sortedness.
    //
    // Both of these tests require O(n), where n is the number of
    // exports. However, the latter will positively identify a greater
    // portion of contiguous patterns.  We use the latter method.
    //
    // Check to see if values are grouped by images without gaps
    // If so, indices_to -> 0.

    // Set up data structures for quick traversal of arrays.
    // This contains the number of sends for each process ID.
    //
    // FIXME (mfh 20 Mar 2014) This is one of a few places in Tpetra
    // that create an array of length the number of processes in the
    // communicator (plus one).  Given how this code uses this array,
    // it should be straightforward to replace it with a hash table or
    // some other more space-efficient data structure.  In practice,
    // most of the entries of starts should be zero for a sufficiently
    // large process count, unless the communication pattern is dense.
    // Note that it's important to be able to iterate through keys (i
    // for which starts[i] is nonzero) in increasing order.
    Teuchos::Array<size_t> starts (numImages + 1, 0);

    // numActive is the number of sends that are not Null
    size_t numActive = 0;
    int needSendBuff = 0; // Boolean

#ifdef HAVE_TPETRA_DEBUG
    int badID = -1; // only used in a debug build
#endif // HAVE_TPETRA_DEBUG
    for (size_t i = 0; i < numExports_; ++i) {
      const int exportID = exportNodeIDs[i];
      if (exportID >= numImages) {
#ifdef HAVE_TPETRA_DEBUG
        badID = myImageID;
#endif // HAVE_TPETRA_DEBUG
        break;
      }
      else if (exportID >= 0) {
        // exportID is a valid process ID.  Increment the number of
        // messages this process will send to that process.
        ++starts[exportID];

        // If we're sending more than one message to process exportID,
        // then it is possible that the data are not contiguous.
        // Check by seeing if the previous process ID in the list
        // (exportNodeIDs[i-1]) is the same.  It's safe to use i-1,
        // because if starts[exportID] > 1, then i must be > 1 (since
        // the starts array was filled with zeros initially).

        // null entries break continuity.
        // e.g.,  [ 0, 0, 0, 1, -99, 1, 2, 2, 2] is not contiguous
        if (needSendBuff==0 && starts[exportID] > 1 && exportID != exportNodeIDs[i-1]) {
          needSendBuff = 1;
        }
        ++numActive;
      }
    }
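
    // Concretely (an illustrative example): exportNodeIDs =
    // [0, 0, 1, 1, 2] is grouped by process, so needSendBuff stays 0,
    // while [0, 1, 0] revisits process 0 after an intervening 1, so
    // the check above sets needSendBuff = 1.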

#ifdef HAVE_TPETRA_DEBUG
    // Test whether any process in the communicator got an invalid
    // process ID.  If badID != -1 on this process, then it equals
    // this process' rank.  The max of all badID over all processes is
    // the max rank which has an invalid process ID.
    {
      int gbl_badID;
      reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
      TEUCHOS_TEST_FOR_EXCEPTION(gbl_badID >= 0, std::runtime_error,
        Teuchos::typeName(*this) << "::createFromSends(): Process " << gbl_badID
        << ", perhaps among other processes, got a bad send process ID.");
    }
#else
    // FIXME (mfh 12 Apr 2013) Rather than simply ignoring this
    // information, we should think about how to pass it along so that
    // all the processes find out about it.  In a release build with
    // efficiency warnings turned off, the next communication happens
    // in computeReceives(), in the reduceAllAndScatter
    // (MPI_Reduce_scatter).  We could figure out how to encode the
    // error flag in that operation, for example by replacing it with
    // a reduceAll (MPI_Allreduce) as described there, and adding an
    // extra element to the array that encodes the error condition
    // (zero on all processes if no error, else 1 on any process with
    // the error, so that the sum will produce a nonzero value if any
    // process had an error).  I'll defer this change for now and
    // recommend instead that people with troubles try a debug build.
#endif // HAVE_TPETRA_DEBUG

#if defined(HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS) || defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
    {
      int global_needSendBuff;
      reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
                            outArg (global_needSendBuff));
      TPETRA_EFFICIENCY_WARNING(
        global_needSendBuff != 0, std::runtime_error,
        "::createFromSends: Grouping export IDs together by process rank often "
        "improves performance.");
    }
#endif

    // Determine from the caller's data whether or not the current
    // process should send (a) message(s) to itself.
    if (starts[myImageID] != 0) {
      selfMessage_ = true;
    }
    else {
      selfMessage_ = false;
    }

#ifdef HAVE_TEUCHOS_DEBUG
    bool index_neq_numActive = false;
    bool send_neq_numSends = false;
#endif
    if (! needSendBuff) {
      // grouped by image, no send buffer or indicesTo_ needed
      numSends_ = 0;
      // Count total number of sends, i.e., total number of images to
      // which we are sending.  This includes myself, if applicable.
      for (int i = 0; i < numImages; ++i) {
        if (starts[i]) {
          ++numSends_;
        }
      }

      // Not only do we not need these, but we must clear them, as
      // empty status of indicesTo is a flag used later.
      indicesTo_.resize(0);
      // Size these to numSends_; note, at the moment, numSends_
      // includes self sends.  Set their values to zeros.
      imagesTo_.assign(numSends_,0);
      startsTo_.assign(numSends_,0);
      lengthsTo_.assign(numSends_,0);

      // set startsTo to the offset for each send (i.e., each image ID)
      // set imagesTo to the image ID for each send
      // in interpreting this code, remember that we are assuming contiguity
      // that is why index skips through the ranks
      {
        size_t index = 0, nodeIndex = 0;
        for (size_t i = 0; i < numSends_; ++i) {
          while (exportNodeIDs[nodeIndex] < 0) {
            ++nodeIndex; // skip all negative node IDs
          }
          startsTo_[i] = nodeIndex;
          int imageID = exportNodeIDs[nodeIndex];
          imagesTo_[i] = imageID;
          index     += starts[imageID];
          nodeIndex += starts[imageID];
        }
#ifdef HAVE_TEUCHOS_DEBUG
        if (index != numActive) {
          index_neq_numActive = true;
        }
#endif
      }
      // sort the startsTo and image IDs together, in ascending order, according
      // to image IDs
      if (numSends_ > 0) {
        sort2(imagesTo_.begin(), imagesTo_.end(), startsTo_.begin());
      }
      // compute the maximum send length
      maxSendLength_ = 0;
      for (size_t i = 0; i < numSends_; ++i) {
        int imageID = imagesTo_[i];
        lengthsTo_[i] = starts[imageID];
        if ((imageID != myImageID) && (lengthsTo_[i] > maxSendLength_)) {
          maxSendLength_ = lengthsTo_[i];
        }
      }
    }
    else {
      // not grouped by image, need send buffer and indicesTo_

      // starts[i] is the number of sends to node i
      // numActive equals number of sends total, \sum_i starts[i]

      // this loop starts at starts[1], so explicitly check starts[0]
      if (starts[0] == 0 ) {
        numSends_ = 0;
      }
      else {
        numSends_ = 1;
      }
      for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
                                            im1=starts.begin();
           i != starts.end(); ++i)
      {
        if (*i != 0) ++numSends_;
        *i += *im1;
        im1 = i;
      }
      // starts[i] now contains the number of exports to nodes 0 through i

      for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
                                                      i=starts.rbegin()+1;
           i != starts.rend(); ++i)
      {
        *ip1 = *i;
        ip1 = i;
      }
      starts[0] = 0;
      // starts[i] now contains the number of exports to nodes 0 through
      // i-1, i.e., all nodes before node i
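
      // Worked example (illustrative): with numImages = 3 and initial
      // counts starts = [2, 0, 3, 0], the forward pass above produces
      // the inclusive sums [2, 2, 5, 5], and the backward shift plus
      // starts[0] = 0 yields the exclusive offsets [0, 2, 2, 5]:
      // starts[i] is where process i's exports begin in the send
      // buffer.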

      indicesTo_.resize(numActive);

      for (size_t i = 0; i < numExports_; ++i) {
        if (exportNodeIDs[i] >= 0) {
          // record the offset to the sendBuffer for this export
          indicesTo_[starts[exportNodeIDs[i]]] = i;
          // now increment the offset for this node
          ++starts[exportNodeIDs[i]];
        }
      }
      // our send buffer will contain the export data for each of the nodes
      // we communicate with, in order by node id
      // sendBuffer = {node_0_data, node_1_data, ..., node_np-1_data}
      // indicesTo now maps each export to the location in our send buffer
      // associated with the export
      // data for export i located at sendBuffer[indicesTo[i]]
      //
      // starts[i] once again contains the number of exports to
      // nodes 0 through i
      for (int node = numImages-1; node != 0; --node) {
        starts[node] = starts[node-1];
      }
      starts.front() = 0;
      starts[numImages] = numActive;
      //
      // starts[node] once again contains the number of exports to
      // nodes 0 through node-1
      // i.e., the start of my data in the sendBuffer

      // this contains invalid data at nodes we don't care about, that is okay
      imagesTo_.resize(numSends_);
      startsTo_.resize(numSends_);
      lengthsTo_.resize(numSends_);

      // for each group of sends/exports, record the destination node,
      // the length, and the offset for this send into the
      // send buffer (startsTo_)
      maxSendLength_ = 0;
      size_t snd = 0;
      for (int node = 0; node < numImages; ++node ) {
        if (starts[node+1] != starts[node]) {
          lengthsTo_[snd] = starts[node+1] - starts[node];
          startsTo_[snd] = starts[node];
          // record max length for all off-node sends
          if ((node != myImageID) && (lengthsTo_[snd] > maxSendLength_)) {
            maxSendLength_ = lengthsTo_[snd];
          }
          imagesTo_[snd] = node;
          ++snd;
        }
      }
#ifdef HAVE_TEUCHOS_DEBUG
      if (snd != numSends_) {
        send_neq_numSends = true;
      }
#endif
    }
#ifdef HAVE_TEUCHOS_DEBUG
    SHARED_TEST_FOR_EXCEPTION(index_neq_numActive, std::logic_error,
      "Tpetra::Distributor::createFromSends: logic error.  Please notify the "
      "Tpetra team.", *comm_);
    SHARED_TEST_FOR_EXCEPTION(send_neq_numSends, std::logic_error,
      "Tpetra::Distributor::createFromSends: logic error.  Please notify the "
      "Tpetra team.", *comm_);
#endif

    if (selfMessage_) --numSends_;

    // Invert the map to determine which messages are received, and
    // their lengths.
    computeReceives();

    if (debug_) {
      std::ostringstream os;
      os << myImageID << ": createFromSends: done" << endl;
      *out_ << os.str ();
    }

    // createFromRecvs() calls createFromSends(), but will set
    // howInitialized_ again after calling createFromSends().
    howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;

    return totalReceiveLength_;
  }

} // namespace Tpetra