Tpetra_Distributor.cpp
00001 // @HEADER
00002 // ***********************************************************************
00003 //
00004 //          Tpetra: Templated Linear Algebra Services Package
00005 //                 Copyright (2008) Sandia Corporation
00006 //
00007 // Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
00008 // the U.S. Government retains certain rights in this software.
00009 //
00010 // Redistribution and use in source and binary forms, with or without
00011 // modification, are permitted provided that the following conditions are
00012 // met:
00013 //
00014 // 1. Redistributions of source code must retain the above copyright
00015 // notice, this list of conditions and the following disclaimer.
00016 //
00017 // 2. Redistributions in binary form must reproduce the above copyright
00018 // notice, this list of conditions and the following disclaimer in the
00019 // documentation and/or other materials provided with the distribution.
00020 //
00021 // 3. Neither the name of the Corporation nor the names of the
00022 // contributors may be used to endorse or promote products derived from
00023 // this software without specific prior written permission.
00024 //
00025 // THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
00026 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00027 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
00028 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
00029 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00030 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00031 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00032 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00033 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00034 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00035 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00036 //
00037 // Questions? Contact Michael A. Heroux (maherou@sandia.gov)
00038 //
00039 // ************************************************************************
00040 // @HEADER
00041 
00042 #include "Tpetra_Distributor.hpp"
00043 #include "Teuchos_StandardParameterEntryValidators.hpp"
00044 #include "Teuchos_VerboseObjectParameterListHelpers.hpp"
00045 
00046 
00047 namespace Tpetra {
00048   namespace Details {
00049     std::string
00050     DistributorSendTypeEnumToString (EDistributorSendType sendType)
00051     {
00052       if (sendType == DISTRIBUTOR_ISEND) {
00053         return "Isend";
00054       }
00055       else if (sendType == DISTRIBUTOR_RSEND) {
00056         return "Rsend";
00057       }
00058       else if (sendType == DISTRIBUTOR_SEND) {
00059         return "Send";
00060       }
00061       else if (sendType == DISTRIBUTOR_SSEND) {
00062         return "Ssend";
00063       }
00064       else {
00065         TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument, "Invalid "
00066           "EDistributorSendType enum value " << sendType << ".");
00067       }
00068     }
00069 
00070     std::string
00071     DistributorHowInitializedEnumToString (EDistributorHowInitialized how)
00072     {
00073       switch (how) {
00074       case Details::DISTRIBUTOR_NOT_INITIALIZED:
00075         return "Not initialized yet";
00076       case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
00077         return "By createFromSends";
00078       case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
00079         return "By createFromRecvs";
00080       case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
00081         return "By createReverseDistributor";
00082       case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
00083         return "By copy constructor";
00084       default:
00085         return "INVALID";
00086       }
00087     }
00088   } // namespace Details
00089 
00090   Array<std::string>
00091   distributorSendTypes ()
00092   {
00093     Array<std::string> sendTypes;
00094     sendTypes.push_back ("Isend");
00095     sendTypes.push_back ("Rsend");
00096     sendTypes.push_back ("Send");
00097     sendTypes.push_back ("Ssend");
00098     return sendTypes;
00099   }
00100 
00101   // We set default values of Distributor's Boolean parameters here,
00102   // in this one place.  That way, if we want to change the default
00103   // value of a parameter, we don't have to search the whole file to
00104   // ensure a consistent setting.
00105   namespace {
00106     // Default value of the "Debug" parameter.
00107     const bool tpetraDistributorDebugDefault = false;
00108     // Default value of the "Barrier between receives and sends" parameter.
00109     const bool barrierBetween_default = false;
00110     // Default value of the "Use distinct tags" parameter.
00111     const bool useDistinctTags_default = true;
00112     // Default value of the "Enable MPI CUDA RDMA support" parameter.
00113 #ifdef TPETRA_ENABLE_MPI_CUDA_RDMA
00114     const bool enable_cuda_rdma_default = true;
00115 #else
00116     const bool enable_cuda_rdma_default = false;
00117 #endif
00118   } // namespace (anonymous)
00119 
00120   int Distributor::getTag (const int pathTag) const {
00121     return useDistinctTags_ ? pathTag : comm_->getTag ();
00122   }
00123 
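  // Illustrative note (not an original comment in this file): getTag()
  // controls which MPI tag a communication path uses.  For example,
  // computeReceives() below calls
  //
  //   const int tag = this->getTag (2); // its pathTag is 2
  //
  // With the default "Use distinct tags" = true, every message in that
  // path carries tag 2; with it false, the communicator's current tag
  // (comm_->getTag()) is used instead.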
00124 
00125 #ifdef TPETRA_DISTRIBUTOR_TIMERS
00126   void Distributor::makeTimers () {
00127     const std::string name_doPosts3 = "Tpetra::Distributor: doPosts(3)";
00128     const std::string name_doPosts4 = "Tpetra::Distributor: doPosts(4)";
00129     const std::string name_doWaits = "Tpetra::Distributor: doWaits";
00130     const std::string name_doPosts3_recvs = "Tpetra::Distributor: doPosts(3): recvs";
00131     const std::string name_doPosts4_recvs = "Tpetra::Distributor: doPosts(4): recvs";
00132     const std::string name_doPosts3_barrier = "Tpetra::Distributor: doPosts(3): barrier";
00133     const std::string name_doPosts4_barrier = "Tpetra::Distributor: doPosts(4): barrier";
00134     const std::string name_doPosts3_sends = "Tpetra::Distributor: doPosts(3): sends";
00135     const std::string name_doPosts4_sends = "Tpetra::Distributor: doPosts(4): sends";
00136 
00137     timer_doPosts3_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3);
00138     timer_doPosts4_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4);
00139     timer_doWaits_ = Teuchos::TimeMonitor::getNewTimer (name_doWaits);
00140     timer_doPosts3_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_recvs);
00141     timer_doPosts4_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_recvs);
00142     timer_doPosts3_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_barrier);
00143     timer_doPosts4_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_barrier);
00144     timer_doPosts3_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_sends);
00145     timer_doPosts4_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_sends);
00146   }
00147 #endif // TPETRA_DISTRIBUTOR_TIMERS
00148 
00149   void
00150   Distributor::init (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
00151                      const Teuchos::RCP<Teuchos::ParameterList>& plist)
00152   {
00153     this->setVerbLevel (debug_ ? Teuchos::VERB_EXTREME : Teuchos::VERB_NONE);
00154     this->setOStream (out_);
00155     if (! plist.is_null ()) {
00156       // The input parameters may override the above verbosity level
00157       // setting, if there is a "VerboseObject" sublist.
00158       this->setParameterList (plist);
00159     }
00160 
00161 #ifdef TPETRA_DISTRIBUTOR_TIMERS
00162     makeTimers ();
00163 #endif // TPETRA_DISTRIBUTOR_TIMERS
00164 
00165     if (debug_) {
00166       Teuchos::OSTab tab (out_);
00167       std::ostringstream os;
00168       os << comm_->getRank ()
00169          << ": Distributor ctor done" << std::endl;
00170       *out_ << os.str ();
00171     }
00172   }
00173 
00174   Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm)
00175     : comm_ (comm)
00176     , out_  (Teuchos::getFancyOStream (Teuchos::rcpFromRef (std::cerr)))
00177     , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
00178     , sendType_ (Details::DISTRIBUTOR_SEND)
00179     , barrierBetween_ (barrierBetween_default)
00180     , debug_ (tpetraDistributorDebugDefault)
00181     , enable_cuda_rdma_ (enable_cuda_rdma_default)
00182     , numExports_ (0)
00183     , selfMessage_ (false)
00184     , numSends_ (0)
00185     , maxSendLength_ (0)
00186     , numReceives_ (0)
00187     , totalReceiveLength_ (0)
00188     , useDistinctTags_ (useDistinctTags_default)
00189   {
00190     init (comm, Teuchos::null);
00191   }
00192 
00193   Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
00194                             const Teuchos::RCP<Teuchos::FancyOStream>& out)
00195     : comm_ (comm)
00196     , out_ (out.is_null () ? Teuchos::getFancyOStream (Teuchos::rcpFromRef (std::cerr)) : out)
00197     , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
00198     , sendType_ (Details::DISTRIBUTOR_SEND)
00199     , barrierBetween_ (barrierBetween_default)
00200     , debug_ (tpetraDistributorDebugDefault)
00201     , enable_cuda_rdma_ (enable_cuda_rdma_default)
00202     , numExports_ (0)
00203     , selfMessage_ (false)
00204     , numSends_ (0)
00205     , maxSendLength_ (0)
00206     , numReceives_ (0)
00207     , totalReceiveLength_ (0)
00208     , useDistinctTags_ (useDistinctTags_default)
00209   {
00210     init (comm, Teuchos::null);
00211   }
00212 
00213   Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
00214                             const Teuchos::RCP<Teuchos::ParameterList>& plist)
00215     : comm_ (comm)
00216     , out_ (Teuchos::getFancyOStream (Teuchos::rcpFromRef (std::cerr)))
00217     , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
00218     , sendType_ (Details::DISTRIBUTOR_SEND)
00219     , barrierBetween_ (barrierBetween_default)
00220     , debug_ (tpetraDistributorDebugDefault)
00221     , enable_cuda_rdma_ (enable_cuda_rdma_default)
00222     , numExports_ (0)
00223     , selfMessage_ (false)
00224     , numSends_ (0)
00225     , maxSendLength_ (0)
00226     , numReceives_ (0)
00227     , totalReceiveLength_ (0)
00228     , useDistinctTags_ (useDistinctTags_default)
00229   {
00230     init (comm, plist);
00231   }
00232 
00233   Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
00234                             const Teuchos::RCP<Teuchos::FancyOStream>& out,
00235                             const Teuchos::RCP<Teuchos::ParameterList>& plist)
00236     : comm_ (comm)
00237     , out_ (out.is_null () ? Teuchos::getFancyOStream (Teuchos::rcpFromRef (std::cerr)) : out)
00238     , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
00239     , sendType_ (Details::DISTRIBUTOR_SEND)
00240     , barrierBetween_ (barrierBetween_default)
00241     , debug_ (tpetraDistributorDebugDefault)
00242     , enable_cuda_rdma_ (enable_cuda_rdma_default)
00243     , numExports_ (0)
00244     , selfMessage_ (false)
00245     , numSends_ (0)
00246     , maxSendLength_ (0)
00247     , numReceives_ (0)
00248     , totalReceiveLength_ (0)
00249     , useDistinctTags_ (useDistinctTags_default)
00250   {
00251     init (comm, plist);
00252   }
00253 
00254   Distributor::Distributor (const Distributor & distributor)
00255     : comm_ (distributor.comm_)
00256     , out_ (distributor.out_)
00257     , howInitialized_ (Details::DISTRIBUTOR_INITIALIZED_BY_COPY)
00258     , sendType_ (distributor.sendType_)
00259     , barrierBetween_ (distributor.barrierBetween_)
00260     , debug_ (distributor.debug_)
00261     , enable_cuda_rdma_ (distributor.enable_cuda_rdma_)
00262     , numExports_ (distributor.numExports_)
00263     , selfMessage_ (distributor.selfMessage_)
00264     , numSends_ (distributor.numSends_)
00265     , imagesTo_ (distributor.imagesTo_)
00266     , startsTo_ (distributor.startsTo_)
00267     , lengthsTo_ (distributor.lengthsTo_)
00268     , maxSendLength_ (distributor.maxSendLength_)
00269     , indicesTo_ (distributor.indicesTo_)
00270     , numReceives_ (distributor.numReceives_)
00271     , totalReceiveLength_ (distributor.totalReceiveLength_)
00272     , lengthsFrom_ (distributor.lengthsFrom_)
00273     , imagesFrom_ (distributor.imagesFrom_)
00274     , startsFrom_ (distributor.startsFrom_)
00275     , indicesFrom_ (distributor.indicesFrom_)
00276     , reverseDistributor_ (distributor.reverseDistributor_)
00277     , useDistinctTags_ (distributor.useDistinctTags_)
00278   {
00279     using Teuchos::ParameterList;
00280     using Teuchos::parameterList;
00281     using Teuchos::RCP;
00282     using Teuchos::rcp;
00283 
00284     this->setVerbLevel (distributor.getVerbLevel ());
00285     this->setOStream (out_);
00286     // The input parameters may override the above verbosity level
00287     // setting, if there is a "VerboseObject" sublist.
00288     //
00289     // Clone the right-hand side's ParameterList, so that this
00290     // object's list is decoupled from the right-hand side's list.  We
00291     // don't need to do validation, since the right-hand side has
00292     // already validated its parameters, so we just call
00293     // setMyParamList().  Note that this won't work if the right-hand
00294     // side doesn't have a list set yet, so we first check for null.
00295     RCP<const ParameterList> rhsList = distributor.getParameterList ();
00296     if (! rhsList.is_null ()) {
00297       this->setMyParamList (parameterList (* rhsList));
00298     }
00299 
00300 #ifdef TPETRA_DISTRIBUTOR_TIMERS
00301     makeTimers ();
00302 #endif // TPETRA_DISTRIBUTOR_TIMERS
00303 
00304     if (debug_) {
00305       Teuchos::OSTab tab (out_);
00306       std::ostringstream os;
00307       os << comm_->getRank ()
00308          << ": Distributor copy ctor done" << std::endl;
00309       *out_ << os.str ();
00310     }
00311   }
00312 
00313   void Distributor::swap (Distributor& rhs) {
00314     using Teuchos::ParameterList;
00315     using Teuchos::parameterList;
00316     using Teuchos::RCP;
00317 
00318     std::swap (comm_, rhs.comm_);
00319     std::swap (out_, rhs.out_);
00320     std::swap (howInitialized_, rhs.howInitialized_);
00321     std::swap (sendType_, rhs.sendType_);
00322     std::swap (barrierBetween_, rhs.barrierBetween_);
00323     std::swap (debug_, rhs.debug_);
00324     std::swap (enable_cuda_rdma_, rhs.enable_cuda_rdma_);
00325     std::swap (numExports_, rhs.numExports_);
00326     std::swap (selfMessage_, rhs.selfMessage_);
00327     std::swap (numSends_, rhs.numSends_);
00328     std::swap (imagesTo_, rhs.imagesTo_);
00329     std::swap (startsTo_, rhs.startsTo_);
00330     std::swap (lengthsTo_, rhs.lengthsTo_);
00331     std::swap (maxSendLength_, rhs.maxSendLength_);
00332     std::swap (indicesTo_, rhs.indicesTo_);
00333     std::swap (numReceives_, rhs.numReceives_);
00334     std::swap (totalReceiveLength_, rhs.totalReceiveLength_);
00335     std::swap (lengthsFrom_, rhs.lengthsFrom_);
00336     std::swap (imagesFrom_, rhs.imagesFrom_);
00337     std::swap (startsFrom_, rhs.startsFrom_);
00338     std::swap (indicesFrom_, rhs.indicesFrom_);
00339     std::swap (reverseDistributor_, rhs.reverseDistributor_);
00340     std::swap (useDistinctTags_, rhs.useDistinctTags_);
00341 
00342     // Swap verbosity levels.
00343     const Teuchos::EVerbosityLevel lhsVerb = this->getVerbLevel ();
00344     const Teuchos::EVerbosityLevel rhsVerb = rhs.getVerbLevel ();
00345     this->setVerbLevel (rhsVerb);
00346     rhs.setVerbLevel (lhsVerb);
00347 
00348     // Swap output streams.  We've swapped out_ above, but we have to
00349     // tell the parent class VerboseObject about the swap.
00350     this->setOStream (out_);
00351     rhs.setOStream (rhs.out_);
00352 
00353     // Swap parameter lists.  If they are the same object, make a deep
00354     // copy first, so that modifying one won't modify the other one.
00355     RCP<ParameterList> lhsList = this->getNonconstParameterList ();
00356     RCP<ParameterList> rhsList = rhs.getNonconstParameterList ();
00357     if (lhsList.getRawPtr () == rhsList.getRawPtr () && ! rhsList.is_null ()) {
00358       rhsList = parameterList (*rhsList);
00359     }
00360     if (! rhsList.is_null ()) {
00361       this->setMyParamList (rhsList);
00362     }
00363     if (! lhsList.is_null ()) {
00364       rhs.setMyParamList (lhsList);
00365     }
00366 
00367     // We don't need to swap timers, because all instances of
00368     // Distributor use the same timers.
00369   }
00370 
00371   Distributor::~Distributor()
00372   {
00373     // We shouldn't have any outstanding communication requests at
00374     // this point.
00375     TEUCHOS_TEST_FOR_EXCEPTION(requests_.size() != 0, std::runtime_error,
00376       "Tpetra::Distributor: Destructor called with " << requests_.size()
00377       << " outstanding posts (unfulfilled communication requests).  There "
00378       "should be none at this point.  Please report this bug to the Tpetra "
00379       "developers.");
00380   }
00381 
00382   void
00383   Distributor::setParameterList (const Teuchos::RCP<Teuchos::ParameterList>& plist)
00384   {
00385     using Teuchos::FancyOStream;
00386     using Teuchos::getIntegralValue;
00387     using Teuchos::includesVerbLevel;
00388     using Teuchos::OSTab;
00389     using Teuchos::ParameterList;
00390     using Teuchos::parameterList;
00391     using Teuchos::RCP;
00392     using std::endl;
00393 
00394     RCP<const ParameterList> validParams = getValidParameters ();
00395     plist->validateParametersAndSetDefaults (*validParams);
00396 
00397     const bool barrierBetween =
00398       plist->get<bool> ("Barrier between receives and sends");
00399     const Details::EDistributorSendType sendType =
00400       getIntegralValue<Details::EDistributorSendType> (*plist, "Send type");
00401     const bool useDistinctTags = plist->get<bool> ("Use distinct tags");
00402     const bool debug = plist->get<bool> ("Debug");
00403     const bool enable_cuda_rdma = plist->get<bool> ("Enable MPI CUDA RDMA support");
00404 
00405     // We check this property explicitly, since we haven't yet learned
00406     // how to make a validator that can cross-check properties.
00407     // Later, turn this into a validator so that it can be embedded in
00408     // the valid ParameterList and used in Optika.
00409     TEUCHOS_TEST_FOR_EXCEPTION(
00410       ! barrierBetween && sendType == Details::DISTRIBUTOR_RSEND,
00411       std::invalid_argument, "Tpetra::Distributor::setParameterList: " << endl
00412       << "You specified \"Send type\"=\"Rsend\", but turned off the barrier "
00413       "between receives and sends." << endl << "This is invalid; you must "
00414       "include the barrier if you use ready sends." << endl << "Ready sends "
00415       "require that their corresponding receives have already been posted, "
00416       "and the only way to guarantee that in general is with a barrier.");
00417 
00418     if (plist->isSublist ("VerboseObject")) {
00419       // Read the "VerboseObject" sublist for (optional) verbosity
00420       // settings.  We've set defaults already in Distributor's
00421       // constructor, so we don't need this sublist to exist.
00422       Teuchos::readVerboseObjectSublist (&*plist, this);
00423     }
00424 
00425     // Now that we've validated the input list, save the results.
00426     sendType_ = sendType;
00427     barrierBetween_ = barrierBetween;
00428     useDistinctTags_ = useDistinctTags;
00429     debug_ = debug;
00430     enable_cuda_rdma_ = enable_cuda_rdma;
00431 
00432     // ParameterListAcceptor semantics require pointer identity of the
00433     // sublist passed to setParameterList(), so we save the pointer.
00434     this->setMyParamList (plist);
00435   }
00436 
00437   Teuchos::RCP<const Teuchos::ParameterList>
00438   Distributor::getValidParameters () const
00439   {
00440     using Teuchos::Array;
00441     using Teuchos::ParameterList;
00442     using Teuchos::parameterList;
00443     using Teuchos::RCP;
00444     using Teuchos::setStringToIntegralParameter;
00445 
00446     const bool barrierBetween = barrierBetween_default;
00447     const bool useDistinctTags = useDistinctTags_default;
00448     const bool debug = tpetraDistributorDebugDefault;
00449     const bool enable_cuda_rdma = enable_cuda_rdma_default;
00450 
00451     Array<std::string> sendTypes = distributorSendTypes ();
00452     const std::string defaultSendType ("Send");
00453     Array<Details::EDistributorSendType> sendTypeEnums;
00454     sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
00455     sendTypeEnums.push_back (Details::DISTRIBUTOR_RSEND);
00456     sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
00457     sendTypeEnums.push_back (Details::DISTRIBUTOR_SSEND);
00458 
00459     RCP<ParameterList> plist = parameterList ("Tpetra::Distributor");
00460     plist->set ("Barrier between receives and sends", barrierBetween,
00461                 "Whether to execute a barrier between receives and sends in do"
00462                 "[Reverse]Posts().  Required for correctness when \"Send type\""
00463                 "=\"Rsend\", otherwise correct but not recommended.");
00464     setStringToIntegralParameter<Details::EDistributorSendType> ("Send type",
00465       defaultSendType, "When using MPI, the variant of send to use in "
00466       "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
00467     plist->set ("Use distinct tags", useDistinctTags, "Whether to use distinct "
00468                 "MPI message tags for different code paths.");
00469     plist->set ("Debug", debug, "Whether to print copious debugging output on "
00470                 "all processes.");
00471     plist->set ("Enable MPI CUDA RDMA support", enable_cuda_rdma,
00472                 "Whether to enable RDMA support for MPI communication between "
00473                 "CUDA GPUs.  Only enable this if you know for sure your MPI "
00474                 "library supports it.");
00475 
00476     Teuchos::setupVerboseObjectSublist (&*plist);
00477     return Teuchos::rcp_const_cast<const ParameterList> (plist);
00478   }
00479 
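  // The following free function is an illustrative sketch, not part of
  // the original file.  It shows how a caller might use the parameters
  // documented in getValidParameters() above to configure a
  // Distributor; the function name exampleCreateIsendDistributor is
  // hypothetical.
  namespace { // anonymous
    Teuchos::RCP<Distributor>
    exampleCreateIsendDistributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm)
    {
      using Teuchos::ParameterList;
      using Teuchos::parameterList;
      using Teuchos::RCP;
      using Teuchos::rcp;

      // Ask for nonblocking sends; leave every other parameter at its
      // default.  setParameterList() validates the list and fills in
      // the defaults via validateParametersAndSetDefaults().
      RCP<ParameterList> plist = parameterList ("Tpetra::Distributor");
      plist->set ("Send type", std::string ("Isend"));

      // The constructor that takes a ParameterList passes it to
      // setParameterList() (see init() above).
      return rcp (new Distributor (comm, plist));
    }
  } // namespace (anonymous)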
00480 
00481   size_t Distributor::getTotalReceiveLength() const
00482   { return totalReceiveLength_; }
00483 
00484   size_t Distributor::getNumReceives() const
00485   { return numReceives_; }
00486 
00487   bool Distributor::hasSelfMessage() const
00488   { return selfMessage_; }
00489 
00490   size_t Distributor::getNumSends() const
00491   { return numSends_; }
00492 
00493   size_t Distributor::getMaxSendLength() const
00494   { return maxSendLength_; }
00495 
00496   Teuchos::ArrayView<const int> Distributor::getImagesFrom() const
00497   { return imagesFrom_; }
00498 
00499   Teuchos::ArrayView<const size_t> Distributor::getLengthsFrom() const
00500   { return lengthsFrom_; }
00501 
00502   Teuchos::ArrayView<const int> Distributor::getImagesTo() const
00503   { return imagesTo_; }
00504 
00505   Teuchos::ArrayView<const size_t> Distributor::getLengthsTo() const
00506   { return lengthsTo_; }
00507 
00508   Teuchos::RCP<Distributor>
00509   Distributor::getReverse() const {
00510     if (reverseDistributor_.is_null ()) {
00511       createReverseDistributor ();
00512     }
00513     return reverseDistributor_;
00514   }
00515 
00516 
00517   void
00518   Distributor::createReverseDistributor() const
00519   {
00520     reverseDistributor_ = Teuchos::rcp (new Distributor (comm_));
00521 
00522     // The total length of all the sends of this Distributor.  We
00523     // calculate it because it's the total length of all the receives
00524     // of the reverse Distributor.
00525     size_t totalSendLength =
00526       std::accumulate (lengthsTo_.begin(), lengthsTo_.end(), 0);
00527 
00528     // The maximum length of any of the receives of this Distributor.
00529     // We calculate it because it's the maximum length of any of the
00530     // sends of the reverse Distributor.
00531     size_t maxReceiveLength = 0;
00532     const int myImageID = comm_->getRank();
00533     for (size_t i=0; i < numReceives_; ++i) {
00534       if (imagesFrom_[i] != myImageID) {
00535         // Don't count receives for messages sent by myself to myself.
00536         if (lengthsFrom_[i] > maxReceiveLength) {
00537           maxReceiveLength = lengthsFrom_[i];
00538         }
00539       }
00540     }
00541 
00542     // Initialize all of reverseDistributor's data members.  This
00543     // mainly just involves flipping "send" and "receive," or the
00544     // equivalent "to" and "from."
00545     reverseDistributor_->lengthsTo_ = lengthsFrom_;
00546     reverseDistributor_->imagesTo_ = imagesFrom_;
00547     reverseDistributor_->indicesTo_ = indicesFrom_;
00548     reverseDistributor_->startsTo_ = startsFrom_;
00549     reverseDistributor_->lengthsFrom_ = lengthsTo_;
00550     reverseDistributor_->imagesFrom_ = imagesTo_;
00551     reverseDistributor_->indicesFrom_ = indicesTo_;
00552     reverseDistributor_->startsFrom_ = startsTo_;
00553     reverseDistributor_->numSends_ = numReceives_;
00554     reverseDistributor_->numReceives_ = numSends_;
00555     reverseDistributor_->selfMessage_ = selfMessage_;
00556     reverseDistributor_->maxSendLength_ = maxReceiveLength;
00557     reverseDistributor_->totalReceiveLength_ = totalSendLength;
00558     reverseDistributor_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
00559 
00560     // Note: Technically, this Distributor is its reverse Distributor's reverse
00561     //       Distributor.  We do not set that up here; leaving it unset lets us
00562     //       test that createReverseDistributor() is an inverse operation w.r.t.
00563     //       the value semantics of Distributors.  Note: numExports_ was not copied.
00564   }
00565 
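  // Illustrative note (not an original comment in this file): given a
  // Distributor dist, the plan returned by dist.getReverse() satisfies,
  // per the assignments in createReverseDistributor() above,
  //
  //   reverse->getNumSends ()    == dist.getNumReceives ()
  //   reverse->getNumReceives () == dist.getNumSends ()
  //   reverse->getLengthsTo ()   == dist.getLengthsFrom ()
  //
  // so data can flow back along the same communication pattern (e.g.,
  // in doReversePosts()) without building a second plan from scratch.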
00566 
00567   void Distributor::doWaits() {
00568     using Teuchos::Array;
00569     using Teuchos::CommRequest;
00570     using Teuchos::FancyOStream;
00571     using Teuchos::includesVerbLevel;
00572     using Teuchos::is_null;
00573     using Teuchos::OSTab;
00574     using Teuchos::RCP;
00575     using Teuchos::waitAll;
00576     using std::endl;
00577 
00578     Teuchos::OSTab tab (out_);
00579 
00580 #ifdef TPETRA_DISTRIBUTOR_TIMERS
00581     Teuchos::TimeMonitor timeMon (*timer_doWaits_);
00582 #endif // TPETRA_DISTRIBUTOR_TIMERS
00583 
00584     const int myRank = comm_->getRank ();
00585 
00586     if (debug_) {
00587       std::ostringstream os;
00588       os << myRank << ": doWaits: # reqs = "
00589          << requests_.size () << endl;
00590       *out_ << os.str ();
00591     }
00592 
00593     if (requests_.size() > 0) {
00594       waitAll (*comm_, requests_());
00595 
00596 #ifdef HAVE_TEUCHOS_DEBUG
00597       // Make sure that waitAll() nulled out all the requests.
00598       for (Array<RCP<CommRequest<int> > >::const_iterator it = requests_.begin();
00599            it != requests_.end(); ++it)
00600       {
00601         TEUCHOS_TEST_FOR_EXCEPTION( ! is_null (*it), std::runtime_error,
00602           Teuchos::typeName(*this) << "::doWaits(): Communication requests "
00603           "should all be null aftr calling Teuchos::waitAll() on them, but "
00604           "at least one request is not null.");
00605       }
00606 #endif // HAVE_TEUCHOS_DEBUG
00607       // Restore the invariant that requests_.size() is the number of
00608       // outstanding nonblocking communication requests.
00609       requests_.resize (0);
00610     }
00611 
00612 #ifdef HAVE_TEUCHOS_DEBUG
00613     {
00614       const int localSizeNonzero = (requests_.size () != 0) ? 1 : 0;
00615       int globalSizeNonzero = 0;
00616       Teuchos::reduceAll<int, int> (*comm_, Teuchos::REDUCE_MAX,
00617                                     localSizeNonzero,
00618                                     Teuchos::outArg (globalSizeNonzero));
00619       TEUCHOS_TEST_FOR_EXCEPTION(
00620         globalSizeNonzero != 0, std::runtime_error,
00621         "Tpetra::Distributor::doWaits: After waitAll, at least one process has "
00622         "a nonzero number of outstanding posts.  There should be none at this "
00623         "point.  Please report this bug to the Tpetra developers.");
00624     }
00625 #endif // HAVE_TEUCHOS_DEBUG
00626 
00627     if (debug_) {
00628       std::ostringstream os;
00629       os << myRank << ": doWaits done" << endl;
00630       *out_ << os.str ();
00631     }
00632   }
00633 
00634   void Distributor::doReverseWaits() {
00635     // call doWaits() on the reverse Distributor, if it exists
00636     if (! reverseDistributor_.is_null()) {
00637       reverseDistributor_->doWaits();
00638     }
00639   }
00640 
00641   std::string Distributor::description () const {
00642     std::ostringstream out;
00643 
00644     out << "\"Tpetra::Distributor\": {";
00645     const std::string label = this->getObjectLabel ();
00646     if (label != "") {
00647       out << "Label: " << label << ", ";
00648     }
00649     out << "How initialized: "
00650         << Details::DistributorHowInitializedEnumToString (howInitialized_)
00651         << ", Parameters: {"
00652         << "Send type: "
00653         << DistributorSendTypeEnumToString (sendType_)
00654         << ", Barrier between receives and sends: "
00655         << (barrierBetween_ ? "true" : "false")
00656         << ", Use distinct tags: "
00657         << (useDistinctTags_ ? "true" : "false")
00658         << ", Debug: " << (debug_ ? "true" : "false")
00659         << ", Enable MPI CUDA RDMA support: "
00660         << (enable_cuda_rdma_ ? "true" : "false")
00661         << "}}";
00662     return out.str ();
00663   }
00664 
00665   void
00666   Distributor::describe (Teuchos::FancyOStream &out,
00667                          const Teuchos::EVerbosityLevel verbLevel) const
00668   {
00669     using std::endl;
00670     using std::setw;
00671     using Teuchos::VERB_DEFAULT;
00672     using Teuchos::VERB_NONE;
00673     using Teuchos::VERB_LOW;
00674     using Teuchos::VERB_MEDIUM;
00675     using Teuchos::VERB_HIGH;
00676     using Teuchos::VERB_EXTREME;
00677     Teuchos::EVerbosityLevel vl = verbLevel;
00678     if (vl == VERB_DEFAULT) vl = VERB_LOW;
00679     const int myImageID = comm_->getRank();
00680     const int numImages = comm_->getSize();
00681     Teuchos::OSTab tab (out);
00682 
00683     if (vl == VERB_NONE) {
00684       return;
00685     } else {
00686       if (myImageID == 0) {
00687         // VERB_LOW and higher prints description() (on Proc 0 only).
00688         // We quote the class name because it contains colons:
00689         // quoting makes the output valid YAML.
00690         out << "\"Tpetra::Distributor\":" << endl;
00691         Teuchos::OSTab tab2 (out);
00692         const std::string label = this->getObjectLabel ();
00693         if (label != "") {
00694           out << "Label: " << label << endl;
00695         }
00696         out << "How initialized: "
00697             << Details::DistributorHowInitializedEnumToString (howInitialized_)
00698             << endl << "Parameters: " << endl;
00699         {
00700           Teuchos::OSTab tab3 (out);
00701           out << "\"Send type\": "
00702               << DistributorSendTypeEnumToString (sendType_) << endl
00703               << "\"Barrier between receives and sends\": "
00704               << (barrierBetween_ ? "true" : "false") << endl;
00705           out << "\"Use distinct tags\": "
00706               << (useDistinctTags_ ? "true" : "false") << endl;
00707           out << "\"Debug\": " << (debug_ ? "true" : "false") << endl;
00708           out << "\"Enable MPI CUDA RDMA support\": " <<
00709             (enable_cuda_rdma_ ? "true" : "false") << endl;
00710         }
00711       }
00712       if (vl == VERB_LOW) {
00713         return;
00714       } else {
00715         Teuchos::OSTab tab2 (out);
00716         // vl > VERB_LOW lets each image print its data.  We assume
00717         // that all images can print to the given output stream, and
00718         // execute barriers to make it more likely that the output
00719         // will be in the right order.
00720         for (int imageCtr = 0; imageCtr < numImages; ++imageCtr) {
00721           if (myImageID == imageCtr) {
00722             if (myImageID == 0) {
00723               out << "Number of processes: " << numImages << endl;
00724             }
00725             out << "Process: " << myImageID << endl;
00726             Teuchos::OSTab tab3 (out);
00727             out << "selfMessage: " << hasSelfMessage () << endl;
00728             out << "numSends: " << getNumSends () << endl;
00729             if (vl == VERB_HIGH || vl == VERB_EXTREME) {
00730               out << "imagesTo: " << toString (imagesTo_) << endl;
00731               out << "lengthsTo: " << toString (lengthsTo_) << endl;
00732               out << "maxSendLength: " << getMaxSendLength () << endl;
00733             }
00734             if (vl == VERB_EXTREME) {
00735               out << "startsTo: " << toString (startsTo_) << endl;
00736               out << "indicesTo: " << toString (indicesTo_) << endl;
00737             }
00738             if (vl == VERB_HIGH || vl == VERB_EXTREME) {
00739               out << "numReceives: " << getNumReceives () << endl;
00740               out << "totalReceiveLength: " << getTotalReceiveLength () << endl;
00741               out << "lengthsFrom: " << toString (lengthsFrom_) << endl;
00742               out << "startsFrom: " << toString (startsFrom_) << endl;
00743               out << "imagesFrom: " << toString (imagesFrom_) << endl;
00744             }
00745             // Last output is a flush; it leaves a space and also
00746             // helps synchronize output.
00747             out << std::flush;
00748           } // if it's my image's turn to print
00749           // Execute barriers to give output time to synchronize.
00750           // One barrier generally isn't enough.
00751           comm_->barrier();
00752           comm_->barrier();
00753           comm_->barrier();
00754         } // for each image
00755       }
00756     }
00757   }
00758 
00759   void
00760   Distributor::computeReceives ()
00761   {
00762     using Teuchos::Array;
00763     using Teuchos::as;
00764     using Teuchos::CommStatus;
00765     using Teuchos::CommRequest;
00766     using Teuchos::ireceive;
00767     using Teuchos::RCP;
00768     using Teuchos::rcp;
00769     using Teuchos::REDUCE_SUM;
00770     using Teuchos::receive;
00771     using Teuchos::reduceAllAndScatter;
00772     using Teuchos::send;
00773     using Teuchos::waitAll;
00774     using std::endl;
00775 
00776     Teuchos::OSTab tab (out_);
00777     const int myRank = comm_->getRank();
00778     const int numProcs = comm_->getSize();
00779 
00780     // MPI tag for nonblocking receives and blocking sends in this method.
00781     const int pathTag = 2;
00782     const int tag = this->getTag (pathTag);
00783 
00784     if (debug_) {
00785       std::ostringstream os;
00786       os << myRank << ": computeReceives: "
00787         "{selfMessage_: " << (selfMessage_ ? "true" : "false")
00788          << ", tag: " << tag << "}" << endl;
00789       *out_ << os.str ();
00790     }
00791 
00792     // toNodesFromMe[i] == the number of messages sent by this process
00793     // to process i.  The data in numSends_, imagesTo_, and lengthsTo_
00794     // concern the contiguous sends.  Therefore, each process will be
00795     // listed in imagesTo_ at most once, and so toNodesFromMe[i] will
00796     // either be 0 or 1.
00797     {
00798       Array<int> toNodesFromMe (numProcs, 0);
00799 #ifdef HAVE_TEUCHOS_DEBUG
00800       bool counting_error = false;
00801 #endif // HAVE_TEUCHOS_DEBUG
00802       for (size_t i = 0; i < (numSends_ + (selfMessage_ ? 1 : 0)); ++i) {
00803 #ifdef HAVE_TEUCHOS_DEBUG
00804         if (toNodesFromMe[imagesTo_[i]] != 0) {
00805           counting_error = true;
00806         }
00807 #endif // HAVE_TEUCHOS_DEBUG
00808         toNodesFromMe[imagesTo_[i]] = 1;
00809       }
00810 #ifdef HAVE_TEUCHOS_DEBUG
00811       SHARED_TEST_FOR_EXCEPTION(counting_error, std::logic_error,
00812         "Tpetra::Distributor::computeReceives: There was an error on at least "
00813         "one process in counting the number of messages send by that process to "
00814         "the other processs.  Please report this bug to the Tpetra developers.",
00815         *comm_);
00816 #endif // HAVE_TEUCHOS_DEBUG
00817 
00818       if (debug_) {
00819         std::ostringstream os;
00820         os << myRank << ": computeReceives: "
00821           "Calling reduceAllAndScatter" << endl;
00822         *out_ << os.str ();
00823       }
00824 
00825       // Compute the number of receives that this process needs to
00826       // post.  The number of receives includes any self sends (i.e.,
00827       // messages sent by this process to itself).
00828       //
00829       // (We will use numReceives_ below to post exactly that
00830       // number of receives, with MPI_ANY_SOURCE as the sending rank.
00831       // This will tell us from which processes this process expects
00832       // to receive, and how many packets of data we expect to receive
00833       // from each process.)
00834       //
00835       // toNodesFromMe[i] is the number of messages sent by this
00836       // process to process i.  Compute the sum (elementwise) of all
00837       // the toNodesFromMe arrays on all processes in the
00838       // communicator.  If the array x is that sum, then if this
00839       // process has rank j, x[j] is the number of messages sent
00840       // to process j, that is, the number of receives on process j
00841       // (including any messages sent by process j to itself).
00842       //
00843       // Yes, this requires storing and operating on an array of
00844       // length P, where P is the number of processes in the
00845       // communicator.  Epetra does this too.  Avoiding this O(P)
00846       // memory bottleneck would require some research.
00847       //
00848       // In the (wrapped) MPI_Reduce_scatter call below, since the
00849       // counts array contains only ones, there is only one output on
00850       // each process, namely numReceives_ (which is x[j], in the
00851       // above notation).
00852       //
00853       // mfh 09 Jan 2012: The reduceAllAndScatter really isn't
00854       // necessary here.  Since counts is just all ones, we could
00855       // replace this with an all-reduce on toNodesFromMe, and let my
00856       // process (with rank myRank) get numReceives_ from
00857       // toNodesFromMe[myRank].  The HPCCG miniapp uses the all-reduce
00858       // method.  It could be possible that reduceAllAndScatter is
00859       // faster, but it also makes the code more complicated, and it
00860       // can't be _asymptotically_ faster (MPI_Allreduce has twice the
00861       // critical path length of MPI_Reduce, so reduceAllAndScatter
00862       // can't be more than twice as fast as the all-reduce, even if
00863       // the scatter is free).
00864       //
00865       // mfh 12 Apr 2013: See discussion in createFromSends() about
00866       // how we could use this communication to propagate an error
00867       // flag for "free" in a release build.
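      //
      // Illustrative example (not an original comment in this file):
      // with 3 processes, if
      //
      //   toNodesFromMe on Proc 0 = [0, 1, 1]
      //   toNodesFromMe on Proc 1 = [1, 0, 1]
      //   toNodesFromMe on Proc 2 = [0, 1, 0]
      //
      // then the elementwise sum is [1, 2, 2], and the scatter (with
      // all counts equal to one) gives numReceives_ = 1 on Proc 0,
      // 2 on Proc 1, and 2 on Proc 2.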
00868       Array<int> counts (numProcs, 1);
00869       int numReceivesAsInt = 0; // output
00870       reduceAllAndScatter<int, int> (*comm_, REDUCE_SUM, numProcs,
00871                                      toNodesFromMe.getRawPtr (),
00872                                      counts.getRawPtr (),
00873                                      &numReceivesAsInt);
00874       numReceives_ = Teuchos::as<size_t> (numReceivesAsInt);
00875     }
00876 
00877     // Now we know numReceives_, which is this process' number of
00878     // receives.  Allocate the lengthsFrom_ and imagesFrom_ arrays
00879     // with this number of entries.
00880     lengthsFrom_.assign (numReceives_, 0);
00881     imagesFrom_.assign (numReceives_, 0);
00882 
00883     //
00884     // Ask (via nonblocking receive) each process from which we are
00885     // receiving how many packets we should expect from it in the
00886     // communication pattern.
00887     //
00888 
00889     // At this point, numReceives_ includes any self message that
00890     // there may be.  At the end of this routine, we'll subtract off
00891     // the self message (if there is one) from numReceives_.  In this
00892     // routine, we don't need to receive a message from ourselves in
00893     // order to figure out our lengthsFrom_ and source process ID; we
00894     // can just ask ourselves directly.  Thus, the actual number of
00895     // nonblocking receives we post here does not include the self
00896     // message.
00897     const size_t actualNumReceives = numReceives_ - (selfMessage_ ? 1 : 0);
00898 
00899     // Teuchos' wrapper for nonblocking receives requires receive
00900     // buffers that it knows won't go away.  This is why we use RCPs,
00901     // one RCP per nonblocking receive request.  They get allocated in
00902     // the loop below.
00903     Array<RCP<CommRequest<int> > > requests (actualNumReceives);
00904     Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
00905     Array<RCP<CommStatus<int> > > statuses (actualNumReceives);
00906 
00907     // Teuchos::Comm treats a negative process ID as MPI_ANY_SOURCE
00908     // (receive data from any process).
00909 #ifdef HAVE_MPI
00910     const int anySourceProc = MPI_ANY_SOURCE;
00911 #else
00912     const int anySourceProc = -1;
00913 #endif
00914 
00915     if (debug_) {
00916       std::ostringstream os;
00917       os << myRank << ": computeReceives: Posting "
00918          << actualNumReceives << " irecvs" << endl;
00919       *out_ << os.str ();
00920     }
00921 
00922     // Post the (nonblocking) receives.
00923     for (size_t i = 0; i < actualNumReceives; ++i) {
00924       // Once the receive completes, we can ask the corresponding
00925       // CommStatus object (output by wait()) for the sending process'
00926       // ID (which we'll assign to imagesFrom_[i] -- don't forget to
00927       // do that!).
00928       lengthsFromBuffers[i].resize (1);
00929       lengthsFromBuffers[i][0] = as<size_t> (0);
00930       requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc, tag, *comm_);
00931       if (debug_) {
00932         std::ostringstream os;
00933         os << myRank << ": computeReceives: "
00934           "Posted any-proc irecv w/ specified tag " << tag << endl;
00935         *out_ << os.str ();
00936       }
00937     }
00938 
00939     if (debug_) {
00940       std::ostringstream os;
00941       os << myRank << ": computeReceives: "
00942         "posting " << numSends_ << " sends" << endl;
00943       *out_ << os.str ();
00944     }
00945     // Post the sends: Tell each process to which we are sending how
00946     // many packets it should expect from us in the communication
00947     // pattern.  We could use nonblocking sends here, as long as we do
00948     // a waitAll() on all the sends and receives at once.
00949     //
00950     // We assume that numSends_ and selfMessage_ have already been
00951     // set.  The value of numSends_ (my process' number of sends) does
00952     // not include any message that it might send to itself.
00953     for (size_t i = 0; i < numSends_ + (selfMessage_ ? 1 : 0); ++i) {
00954       if (imagesTo_[i] != myRank) {
00955         // Send a message to imagesTo_[i], telling that process that
00956         // this communication pattern will send that process
00957         // lengthsTo_[i] blocks of packets.
00958         const size_t* const lengthsTo_i = &lengthsTo_[i];
00959         send<int, size_t> (lengthsTo_i, 1, as<int> (imagesTo_[i]), tag, *comm_);
00960         if (debug_) {
00961           std::ostringstream os;
00962           os << myRank << ": computeReceives: "
00963             "Posted send to Proc " << imagesTo_[i] << " w/ specified tag "
00964              << tag << endl;
00965           *out_ << os.str ();
00966         }
00967       }
00968       else {
00969         // We don't need a send in the self-message case.  If this
00970         // process will send a message to itself in the communication
00971         // pattern, then the last element of lengthsFrom_ and
00972         // imagesFrom_ corresponds to the self-message.  Of course
00973         // this process knows how long the message is, and the process
00974         // ID is its own process ID.
00975         lengthsFrom_[numReceives_-1] = lengthsTo_[i];
00976         imagesFrom_[numReceives_-1] = myRank;
00977       }
00978     }
00979 
00980     if (debug_) {
00981       std::ostringstream os;
00982       os << myRank << ": computeReceives: waitAll on "
00983          << requests.size () << " requests" << endl;
00984       *out_ << os.str ();
00985     }
00986     //
00987     // Wait on all the receives.  When they arrive, check the status
00988     // output of wait() for the sending process' ID, unpack the
00989     // request buffers into lengthsFrom_, and set imagesFrom_ from the
00990     // status.
00991     //
00992     waitAll (*comm_, requests (), statuses ());
00993     for (size_t i = 0; i < actualNumReceives; ++i) {
00994       lengthsFrom_[i] = *lengthsFromBuffers[i];
00995       imagesFrom_[i] = statuses[i]->getSourceRank ();
00996     }
00997 
00998     // Sort the imagesFrom_ array, and apply the same permutation to
00999     // lengthsFrom_.  This ensures that imagesFrom_[i] and
01000     // lengthsFrom_[i] refer to the same message.
01001     sort2 (imagesFrom_.begin(), imagesFrom_.end(), lengthsFrom_.begin());
01002 
01003     // Compute indicesFrom_
01004     totalReceiveLength_ = std::accumulate (lengthsFrom_.begin(), lengthsFrom_.end(), 0);
01005     indicesFrom_.clear ();
01006     indicesFrom_.reserve (totalReceiveLength_);
01007     for (size_t i = 0; i < totalReceiveLength_; ++i) {
01008       indicesFrom_.push_back(i);
01009     }
01010 
01011     startsFrom_.clear ();
01012     startsFrom_.reserve (numReceives_);
01013     for (size_t i = 0, j = 0; i < numReceives_; ++i) {
01014       startsFrom_.push_back(j);
01015       j += lengthsFrom_[i];
01016     }
01017 
01018     if (selfMessage_) {
01019       --numReceives_;
01020     }
01021 
01022     if (debug_) {
01023       std::ostringstream os;
01024       os << myRank << ": computeReceives: done" << endl;
01025       *out_ << os.str ();
01026     }
01027   }
01028 
01029   size_t
01030   Distributor::createFromSends (const Teuchos::ArrayView<const int> &exportNodeIDs)
01031   {
01032     using Teuchos::outArg;
01033     using Teuchos::REDUCE_MAX;
01034     using Teuchos::reduceAll;
01035     using std::endl;
01036 
01037     Teuchos::OSTab tab (out_);
01038 
01039     numExports_ = exportNodeIDs.size();
01040 
01041     const int myImageID = comm_->getRank();
01042     const int numImages = comm_->getSize();
01043     if (debug_) {
01044       std::ostringstream os;
01045       os << myImageID << ": createFromSends" << endl;
01046       *out_ << os.str ();
01047     }
01048 
01049     // exportNodeIDs tells us the communication pattern for this
01050     // distributor.  It dictates the way that the export data will be
01051     // interpreted in doPosts().  We want to perform at most one
01052     // send per process in doPosts; this is for two reasons:
01053     //   * minimize latency / overhead in the comm routines (nice)
01054     //   * match the number of receives and sends between processes
01055     //     (necessary)
01056     //
01057     // Teuchos::Comm requires that the data for a send are contiguous
01058     // in a send buffer.  Therefore, if the data in the send buffer
01059     // for doPosts() are not contiguous, they will need to be copied
01060     // into a contiguous buffer.  The user has specified this
01061     // noncontiguous pattern and we can't do anything about it.
01062     // However, if they do not provide an efficient pattern, we will
01063     // warn them if one of the following compile-time options has been
01064     // set:
01065     //   * HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS
01066     //   * HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS
01067     //
01068     // If the data are contiguous, then we can post the sends in situ
01069     // (i.e., without needing to copy them into a send buffer).
01070     //
01071     // Determine contiguity. There are a number of ways to do this:
01072     // * If the export IDs are sorted, then all exports to a
01073     //   particular node must be contiguous. This is what Epetra does.
01074     // * If the export ID of the current export already has been
01075     //   listed, then the previous listing should correspond to the
01076     //   same export.  This tests contiguity, but not sortedness.
01077     //
01078     // Both of these tests require O(n), where n is the number of
01079     // exports. However, the latter will positively identify a greater
01080     // portion of contiguous patterns.  We use the latter method.
01081     //
01082     // Check to see if values are grouped by images without gaps
01083     // If so, indices_to -> 0.
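    //
    // Illustrative example (not an original comment in this file):
    //
    //   exportNodeIDs = [2, 2, 0, 0, 0, 3]  is grouped by process (even
    //                                       though it is not sorted), so
    //                                       the exports can be sent in
    //                                       place and indicesTo_ stays
    //                                       empty;
    //   exportNodeIDs = [2, 0, 2, 0, 0, 3]  interleaves ranks 0 and 2,
    //                                       so a send buffer and
    //                                       indicesTo_ are required.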
01084 
01085     // Set up data structures for quick traversal of arrays.
01086     // This contains the number of sends for each process ID.
01087     //
01088     // FIXME (mfh 20 Mar 2014) This is one of a few places in Tpetra
01089     // that create an array of length the number of processes in the
01090     // communicator (plus one).  Given how this code uses this array,
01091     // it should be straightforward to replace it with a hash table or
01092     // some other more space-efficient data structure.  In practice,
01093     // most of the entries of starts should be zero for a sufficiently
01094     // large process count, unless the communication pattern is dense.
01095     // Note that it's important to be able to iterate through keys (i
01096     // for which starts[i] is nonzero) in increasing order.
01097     Teuchos::Array<size_t> starts (numImages + 1, 0);
01098 
01099     // numActive is the number of sends that are not Null
01100     size_t numActive = 0;
01101     int needSendBuff = 0; // Boolean
01102 
01103 #ifdef HAVE_TPETRA_DEBUG
01104     int badID = -1; // only used in a debug build
01105 #endif // HAVE_TPETRA_DEBUG
01106     for (size_t i = 0; i < numExports_; ++i) {
01107       const int exportID = exportNodeIDs[i];
01108       if (exportID >= numImages) {
01109 #ifdef HAVE_TPETRA_DEBUG
01110         badID = myImageID;
01111 #endif // HAVE_TPETRA_DEBUG
01112         break;
01113       }
01114       else if (exportID >= 0) {
01115         // exportID is a valid process ID.  Increment the number of
01116         // messages this process will send to that process.
01117         ++starts[exportID];
01118 
01119         // If we're sending more than one message to process exportID,
01120         // then it is possible that the data are not contiguous.
01121         // Check by seeing if the previous process ID in the list
01122         // (exportNodeIDs[i-1]) is the same.  It's safe to use i-1,
01123         // because if starts[exportID] > 1, then i must be > 1 (since
01124         // the starts array was filled with zeros initially).
01125 
01126         // null entries break continuity.
01127         // e.g.,  [ 0, 0, 0, 1, -99, 1, 2, 2, 2] is not contiguous
01128         if (needSendBuff==0 && starts[exportID] > 1 && exportID != exportNodeIDs[i-1]) {
01129           needSendBuff = 1;
01130         }
01131         ++numActive;
01132       }
01133     }
01134 
01135 #ifdef HAVE_TPETRA_DEBUG
01136     // Test whether any process in the communicator got an invalid
01137     // process ID.  If badID != -1 on this process, then it equals
01138     // this process' rank.  The max of all badID over all processes is
01139     // the max rank which has an invalid process ID.
01140     {
01141       int gbl_badID;
01142       reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
01143       TEUCHOS_TEST_FOR_EXCEPTION(gbl_badID >= 0, std::runtime_error,
01144         Teuchos::typeName(*this) << "::createFromSends(): Process " << gbl_badID
01145         << ", perhaps among other processes, got a bad send process ID.");
01146     }
01147 #else
01148     // FIXME (mfh 12 Apr 2013) Rather than simply ignoring this
01149     // information, we should think about how to pass it along so that
01150     // all the processes find out about it.  In a release build with
01151     // efficiency warnings turned off, the next communication happens
01152     // in computeReceives(), in the reduceAllAndScatter
01153     // (MPI_Reduce_scatter).  We could figure out how to encode the
01154     // error flag in that operation, for example by replacing it with
01155     // a reduceAll (MPI_Allreduce) as described there, and adding an
01156     // extra element to the array that encodes the error condition
01157     // (zero on all processes if no error, else 1 on any process with
01158     // the error, so that the sum will produce a nonzero value if any
01159     // process had an error).  I'll defer this change for now and
01160     // recommend instead that people with troubles try a debug build.
01161 #endif // HAVE_TPETRA_DEBUG
01162 
01163 #if defined(HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS) || defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
01164     {
01165       int global_needSendBuff;
01166       reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
01167                             outArg (global_needSendBuff));
01168       TPETRA_EFFICIENCY_WARNING(
01169         global_needSendBuff != 0, std::runtime_error,
01170         "::createFromSends: Grouping export IDs together by process rank often "
01171         "improves performance.");
01172     }
01173 #endif
01174 
01175     // Determine from the caller's data whether or not the current
01176     // process should send (a) message(s) to itself.
01177     if (starts[myImageID] != 0) {
01178       selfMessage_ = true;
01179     }
01180     else {
01181       selfMessage_ = false;
01182     }
01183 
01184 #ifdef HAVE_TEUCHOS_DEBUG
01185     bool index_neq_numActive = false;
01186     bool send_neq_numSends = false;
01187 #endif
01188     if (! needSendBuff) {
01189       // grouped by image, no send buffer or indicesTo_ needed
01190       numSends_ = 0;
01191       // Count total number of sends, i.e., total number of images to
01192       // which we are sending.  This includes myself, if applicable.
01193       for (int i = 0; i < numImages; ++i) {
01194         if (starts[i]) {
01195           ++numSends_;
01196         }
01197       }
01198 
01199       // Not only do we not need these, but we must clear them, as
01200       // empty status of indicesTo is a flag used later.
01201       indicesTo_.resize(0);
01202       // Size these to numSends_; note, at the moment, numSends_
01203       // includes self sends.  Set their values to zeros.
01204       imagesTo_.assign(numSends_,0);
01205       startsTo_.assign(numSends_,0);
01206       lengthsTo_.assign(numSends_,0);
01207 
01208       // Set startsTo to the offset for each send (i.e., each image ID).
01209       // Set imagesTo to the image ID for each send.
01210       // In interpreting this code, remember that we are assuming
01211       // contiguity; that is why index skips through the ranks.
01212       {
01213         size_t index = 0, nodeIndex = 0;
01214         for (size_t i = 0; i < numSends_; ++i) {
01215           while (exportNodeIDs[nodeIndex] < 0) {
01216             ++nodeIndex; // skip all negative node IDs
01217           }
01218           startsTo_[i] = nodeIndex;
01219           int imageID = exportNodeIDs[nodeIndex];
01220           imagesTo_[i] = imageID;
01221           index     += starts[imageID];
01222           nodeIndex += starts[imageID];
01223         }
01224 #ifdef HAVE_TEUCHOS_DEBUG
01225         if (index != numActive) {
01226           index_neq_numActive = true;
01227         }
01228 #endif
01229       }
01230       // sort the startsTo and image IDs together, in ascending order, according
01231       // to image IDs
01232       if (numSends_ > 0) {
01233         sort2(imagesTo_.begin(), imagesTo_.end(), startsTo_.begin());
01234       }
01235       // compute the maximum send length
01236       maxSendLength_ = 0;
01237       for (size_t i = 0; i < numSends_; ++i) {
01238         int imageID = imagesTo_[i];
01239         lengthsTo_[i] = starts[imageID];
01240         if ((imageID != myImageID) && (lengthsTo_[i] > maxSendLength_)) {
01241           maxSendLength_ = lengthsTo_[i];
01242         }
01243       }
01244     }
01245     else {
01246       // not grouped by image, need send buffer and indicesTo_
01247 
01248       // starts[i] is the number of sends to node i
01249       // numActive equals number of sends total, \sum_i starts[i]
01250 
01251       // this loop starts at starts[1], so explicitly check starts[0]
01252       if (starts[0] == 0 ) {
01253         numSends_ = 0;
01254       }
01255       else {
01256         numSends_ = 1;
01257       }
01258       for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
01259                                             im1=starts.begin();
01260            i != starts.end(); ++i)
01261       {
01262         if (*i != 0) ++numSends_;
01263         *i += *im1;
01264         im1 = i;
01265       }
01266       // starts[i] now contains the number of exports to nodes 0 through i
01267 
01268       for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
01269                                                       i=starts.rbegin()+1;
01270            i != starts.rend(); ++i)
01271       {
01272         *ip1 = *i;
01273         ip1 = i;
01274       }
01275       starts[0] = 0;
01276       // starts[i] now contains the number of exports to nodes 0 through
01277       // i-1, i.e., all nodes before node i
01278 
01279       indicesTo_.resize(numActive);
01280 
01281       for (size_t i = 0; i < numExports_; ++i) {
01282         if (exportNodeIDs[i] >= 0) {
01283           // record the offset to the sendBuffer for this export
01284           indicesTo_[starts[exportNodeIDs[i]]] = i;
01285           // now increment the offset for this node
01286           ++starts[exportNodeIDs[i]];
01287         }
01288       }
01289       // our send buffer will contain the export data for each of the nodes
01290       // we communicate with, in order by node id
01291       // sendBuffer = {node_0_data, node_1_data, ..., node_np-1_data}
01292       // indicesTo now maps each export to the location in our send buffer
01293       // associated with the export
01294       // data for export i located at sendBuffer[indicesTo[i]]
01295       //
01296       // starts[i] once again contains the number of exports to
01297       // nodes 0 through i
01298       for (int node = numImages-1; node != 0; --node) {
01299         starts[node] = starts[node-1];
01300       }
01301       starts.front() = 0;
01302       starts[numImages] = numActive;
01303       //
01304       // starts[node] once again contains the number of exports to
01305       // nodes 0 through node-1
01306       // i.e., the start of my data in the sendBuffer
01307 
01308       // this contains invalid data at nodes we don't care about, that is okay
01309       imagesTo_.resize(numSends_);
01310       startsTo_.resize(numSends_);
01311       lengthsTo_.resize(numSends_);
01312 
01313       // for each group of sends/exports, record the destination node,
01314       // the length, and the offset for this send into the
01315       // send buffer (startsTo_)
01316       maxSendLength_ = 0;
01317       size_t snd = 0;
01318       for (int node = 0; node < numImages; ++node ) {
01319         if (starts[node+1] != starts[node]) {
01320           lengthsTo_[snd] = starts[node+1] - starts[node];
01321           startsTo_[snd] = starts[node];
01322           // record max length for all off-node sends
01323           if ((node != myImageID) && (lengthsTo_[snd] > maxSendLength_)) {
01324             maxSendLength_ = lengthsTo_[snd];
01325           }
01326           imagesTo_[snd] = node;
01327           ++snd;
01328         }
01329       }
01330 #ifdef HAVE_TEUCHOS_DEBUG
01331       if (snd != numSends_) {
01332         send_neq_numSends = true;
01333       }
01334 #endif
01335     }
01336 #ifdef HAVE_TEUCHOS_DEBUG
01337     SHARED_TEST_FOR_EXCEPTION(index_neq_numActive, std::logic_error,
01338       "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.", *comm_);
01339     SHARED_TEST_FOR_EXCEPTION(send_neq_numSends, std::logic_error,
01340       "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.", *comm_);
01341 #endif
01342 
01343     if (selfMessage_) --numSends_;
01344 
01345     // Invert map to see what msgs are received and what length
01346     computeReceives();
01347 
01348     if (debug_) {
01349       std::ostringstream os;
01350       os << myImageID << ": createFromSends: done" << endl;
01351       *out_ << os.str ();
01352     }
01353 
01354     // createFromRecvs() calls createFromSends(), but will set
01355     // howInitialized_ again after calling createFromSends().
01356     howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;
01357 
01358     return totalReceiveLength_;
01359   }
01360 
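  // The following free function is an illustrative sketch, not part of
  // the original file.  It shows the calling convention of
  // createFromSends(): each entry of the export-rank list names the
  // process that should receive the corresponding entry of the (not
  // shown) export buffer.  The name exampleCreateFromSends is
  // hypothetical.
  namespace { // anonymous
    size_t
    exampleCreateFromSends (const Teuchos::RCP<const Teuchos::Comm<int> >& comm)
    {
      // Send two packets to Process 0 and one packet to my right
      // neighbor.  Keeping entries grouped by rank lets the Distributor
      // post the sends in place, without an internal send buffer.
      const int nextRank = (comm->getRank () + 1) % comm->getSize ();
      Teuchos::Array<int> exportRanks;
      exportRanks.push_back (0);
      exportRanks.push_back (0);
      exportRanks.push_back (nextRank);

      // createFromSends() is collective; it returns the total number of
      // packets this process will receive under the resulting plan.
      Distributor dist (comm);
      return dist.createFromSends (exportRanks ());
    }
  } // namespace (anonymous)
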
01361 } // namespace Tpetra