Tpetra_Distributor.cpp
00001 // @HEADER
00002 // ***********************************************************************
00003 //
00004 //          Tpetra: Templated Linear Algebra Services Package
00005 //                 Copyright (2008) Sandia Corporation
00006 //
00007 // Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
00008 // the U.S. Government retains certain rights in this software.
00009 //
00010 // Redistribution and use in source and binary forms, with or without
00011 // modification, are permitted provided that the following conditions are
00012 // met:
00013 //
00014 // 1. Redistributions of source code must retain the above copyright
00015 // notice, this list of conditions and the following disclaimer.
00016 //
00017 // 2. Redistributions in binary form must reproduce the above copyright
00018 // notice, this list of conditions and the following disclaimer in the
00019 // documentation and/or other materials provided with the distribution.
00020 //
00021 // 3. Neither the name of the Corporation nor the names of the
00022 // contributors may be used to endorse or promote products derived from
00023 // this software without specific prior written permission.
00024 //
00025 // THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
00026 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00027 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
00028 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
00029 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00030 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00031 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00032 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00033 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00034 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00035 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00036 //
00037 // Questions? Contact Michael A. Heroux (maherou@sandia.gov)
00038 //
00039 // ************************************************************************
00040 // @HEADER
00041 
00042 #include "Tpetra_Distributor.hpp"
00043 #include "Teuchos_StandardParameterEntryValidators.hpp"
00044 #include "Teuchos_VerboseObjectParameterListHelpers.hpp"
00045 
00046 namespace Tpetra {
00047 
00048   Array<std::string>
00049   distributorSendTypes ()
00050   {
00051     Array<std::string> sendTypes;
00052     sendTypes.push_back ("Isend");
00053     sendTypes.push_back ("Rsend");
00054     sendTypes.push_back ("Send");
00055     sendTypes.push_back ("Ssend");
00056     return sendTypes;
00057   }
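  // The strings above correspond one-to-one to the EDistributorSendType
  // enum values registered in getValidParameters() ("Isend" ->
  // DISTRIBUTOR_ISEND, "Rsend" -> DISTRIBUTOR_RSEND, "Send" ->
  // DISTRIBUTOR_SEND, "Ssend" -> DISTRIBUTOR_SSEND) and, when using MPI,
  // to the like-named variants of MPI_Send.  A minimal sketch of picking
  // one of them (the comm object below is assumed to exist already):
  //
  //   Teuchos::RCP<Teuchos::ParameterList> plist = Teuchos::parameterList ();
  //   plist->set ("Send type", std::string ("Isend"));
  //   Tpetra::Distributor distributor (comm, plist);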
00058 
00059   Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> > &comm)
00060     : comm_(comm)
00061     , sendType_ (DISTRIBUTOR_SEND)
00062     , barrierBetween_ (true)
00063     , numExports_(0)
00064     , selfMessage_(false)
00065     , numSends_(0)
00066     , maxSendLength_(0)
00067     , numReceives_(0)
00068     , totalReceiveLength_(0)
00069   {
00070     using Teuchos::getFancyOStream;
00071     using Teuchos::oblackholestream;
00072     using Teuchos::rcp;
00073 
00074     // Always start by making sure the Distributor won't print anything.
00075     this->setVerbLevel (Teuchos::VERB_NONE);
00076     this->setOStream (getFancyOStream (rcp (new oblackholestream)));
00077   }
00078 
00079   Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
00080                             const Teuchos::RCP<Teuchos::ParameterList>& plist)
00081     : comm_(comm)
00082     , sendType_ (DISTRIBUTOR_SEND)
00083     , barrierBetween_ (true)
00084     , numExports_(0)
00085     , selfMessage_(false)
00086     , numSends_(0)
00087     , maxSendLength_(0)
00088     , numReceives_(0)
00089     , totalReceiveLength_(0)
00090   {
00091     TEUCHOS_TEST_FOR_EXCEPTION (plist.is_null(), std::invalid_argument, "The "
00092       "two-argument Distributor constructor requires that the input "
00093       "RCP<ParameterList> be nonnull.  If you don't know what parameters to "
00094       "set, you can either call the one-argument constructor, or supply a "
00095       "nonnull but empty ParameterList.  Both of these options will set default "
00096       "parameters.");
00097     using Teuchos::getFancyOStream;
00098     using Teuchos::oblackholestream;
00099     using Teuchos::rcp;
00100 
00101     // Always start by making sure the Distributor won't print anything.
00102     this->setVerbLevel (Teuchos::VERB_NONE);
00103     this->setOStream (getFancyOStream (rcp (new oblackholestream)));
00104     // Setting parameters may override these, if there is a
00105     // "VerboseObject" sublist.
00106     this->setParameterList (plist);
00107   }
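  // A minimal usage sketch of the two constructors above.  As the
  // exception message says, the two-argument form requires a nonnull
  // list; a nonnull but empty list simply selects default parameters.
  // The comm object below is assumed to have been created elsewhere
  // (for example via Teuchos::DefaultComm).
  //
  //   Teuchos::RCP<const Teuchos::Comm<int> > comm = ...;
  //
  //   // One-argument form: all default parameters.
  //   Tpetra::Distributor distDefault (comm);
  //
  //   // Two-argument form: nonnull (possibly empty) ParameterList.
  //   Teuchos::RCP<Teuchos::ParameterList> plist = Teuchos::parameterList ();
  //   plist->set ("Barrier between receives and sends", true);
  //   Tpetra::Distributor distCustom (comm, plist);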
00108 
00109   Distributor::Distributor (const Distributor & distributor)
00110     : comm_(distributor.comm_)
00111     , sendType_ (distributor.sendType_)
00112     , barrierBetween_ (distributor.barrierBetween_)
00113     , numExports_(distributor.numExports_)
00114     , selfMessage_(distributor.selfMessage_)
00115     , numSends_(distributor.numSends_)
00116     , maxSendLength_(distributor.maxSendLength_)
00117     , numReceives_(distributor.numReceives_)
00118     , totalReceiveLength_(distributor.totalReceiveLength_)
00119     , reverseDistributor_(distributor.reverseDistributor_)
00120   {
00121     using Teuchos::getFancyOStream;
00122     using Teuchos::oblackholestream;
00123     using Teuchos::ParameterList;
00124     using Teuchos::parameterList;
00125     using Teuchos::RCP;
00126     using Teuchos::rcp;
00127 
00128     this->setVerbLevel (Teuchos::VERB_NONE);
00129     this->setOStream (getFancyOStream (rcp (new oblackholestream)));
00130 
00131     // Clone the right-hand side's ParameterList, so that this
00132     // object's list is decoupled from the right-hand side's list.
00133     // We don't need to do validation, since the right-hand side has
00134     // already validated its parameters, so just call setMyParamList().
00135     // Note that this won't work if the right-hand side doesn't have
00136     // a list set yet, so we first check for null.
00137     RCP<const ParameterList> rhsList = distributor.getParameterList ();
00138     if (! rhsList.is_null ()) {
00139       this->setMyParamList (parameterList (* rhsList));
00140     }
00141   }
00142 
00143   Distributor::~Distributor()
00144   {
00145     // We shouldn't have any outstanding communication requests at
00146     // this point.
00147     TEUCHOS_TEST_FOR_EXCEPTION(requests_.size() != 0, std::runtime_error,
00148       "Tpetra::Distributor: Destructor called with " << requests_.size()
00149       << " outstanding posts (unfulfilled communication requests).  There "
00150       "should be none at this point.  Please report this bug to the Tpetra "
00151       "developers.");
00152   }
00153 
00154   void
00155   Distributor::setParameterList (const Teuchos::RCP<Teuchos::ParameterList>& plist)
00156   {
00157     using Teuchos::FancyOStream;
00158     using Teuchos::getIntegralValue;
00159     using Teuchos::includesVerbLevel;
00160     using Teuchos::OSTab;
00161     using Teuchos::ParameterList;
00162     using Teuchos::parameterList;
00163     using Teuchos::RCP;
00164     using std::endl;
00165 
00166     RCP<const ParameterList> validParams = getValidParameters ();
00167     plist->validateParametersAndSetDefaults (*validParams);
00168 
00169     const bool barrierBetween =
00170       plist->get<bool> ("Barrier between receives and sends");
00171     const EDistributorSendType sendType =
00172       getIntegralValue<EDistributorSendType> (*plist, "Send type");
00173 
00174     // We check this property explicitly, since we haven't yet learned
00175     // how to make a validator that can cross-check properties.
00176     // Later, turn this into a validator so that it can be embedded in
00177     // the valid ParameterList and used in Optika.
00178     TEUCHOS_TEST_FOR_EXCEPTION(! barrierBetween && sendType == DISTRIBUTOR_RSEND,
00179       std::invalid_argument, "If you use ready sends, you must include a "
00180       "barrier between receives and sends.  Ready sends require that their "
00181       "corresponding receives have already been posted, and the only way to "
00182       "guarantee that in general is with a barrier.");
00183 
00184     if (plist->isSublist ("VerboseObject")) {
00185       // Read the "VerboseObject" sublist for (optional) verbosity
00186       // settings.  We've set defaults already in Distributor's
00187       // constructor, so we don't need this sublist to exist.
00188       Teuchos::readVerboseObjectSublist (&*plist, this);
00189     }
00190 
00191     // Now that we've validated the input list, save the results.
00192     sendType_ = sendType;
00193     barrierBetween_ = barrierBetween;
00194 
00195 #ifdef HAVE_TEUCHOS_DEBUG
00196     // Prepare for verbose output, if applicable.
00197     Teuchos::EVerbosityLevel verbLevel = this->getVerbLevel ();
00198     RCP<FancyOStream> out = this->getOStream ();
00199     const int myRank = comm_->getRank ();
00200     // We only want one process to print verbose output here.
00201     const bool doPrint = out.get () && (myRank == 0) &&
00202       includesVerbLevel (verbLevel, Teuchos::VERB_EXTREME, true);
00203 
00204     if (doPrint) {
00205       *out << "Distributor::setParameterList" << endl;
00206       OSTab tab = this->getOSTab(); // Add one tab level
00207       *out << "sendType_=" << DistributorSendTypeEnumToString (sendType_)
00208            << ", barrierBetween_=" << barrierBetween_ << endl;
00209     }
00210 #endif // HAVE_TEUCHOS_DEBUG
00211 
00212     // ParameterListAcceptor semantics require pointer identity of the
00213     // sublist passed to setParameterList(), so we save the pointer.
00214     this->setMyParamList (plist);
00215   }
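  // An illustrative sketch of the cross-check enforced above: ready
  // sends without an intervening barrier are rejected, since their
  // matching receives cannot be guaranteed to have been posted.
  //
  //   Teuchos::RCP<Teuchos::ParameterList> plist = Teuchos::parameterList ();
  //   plist->set ("Send type", std::string ("Rsend"));
  //   plist->set ("Barrier between receives and sends", false);
  //   distributor.setParameterList (plist); // throws std::invalid_argument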
00216 
00217   Teuchos::RCP<const Teuchos::ParameterList>
00218   Distributor::getValidParameters () const
00219   {
00220     using Teuchos::Array;
00221     using Teuchos::ParameterList;
00222     using Teuchos::parameterList;
00223     using Teuchos::RCP;
00224     using Teuchos::setStringToIntegralParameter;
00225 
00226     const bool barrierBetween = false;
00227 
00228     Array<std::string> sendTypes = distributorSendTypes ();
00229     const std::string defaultSendType ("Send");
00230     Array<EDistributorSendType> sendTypeEnums;
00231     sendTypeEnums.push_back (DISTRIBUTOR_ISEND);
00232     sendTypeEnums.push_back (DISTRIBUTOR_RSEND);
00233     sendTypeEnums.push_back (DISTRIBUTOR_SEND);
00234     sendTypeEnums.push_back (DISTRIBUTOR_SSEND);
00235 
00236     RCP<ParameterList> plist = parameterList ("Tpetra::Distributor");
00237     plist->set ("Barrier between receives and sends", barrierBetween,
00238                 "Whether to execute a barrier between receives and sends in do"
00239                 "[Reverse]Posts().  Required for correctness when \"Send type\""
00240                 "=\"Rsend\", otherwise correct but not recommended.");
00241     setStringToIntegralParameter<EDistributorSendType> ("Send type",
00242       defaultSendType, "When using MPI, the variant of MPI_Send to use in "
00243       "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
00244 
00245     Teuchos::setupVerboseObjectSublist (&*plist);
00246     return Teuchos::rcp_const_cast<const ParameterList> (plist);
00247   }
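  // For reference, the defaults established above are "Send type" =
  // "Send" and "Barrier between receives and sends" = false.  A sketch
  // of reading one of them back (distributor is a hypothetical object):
  //
  //   Teuchos::RCP<const Teuchos::ParameterList> valid =
  //     distributor.getValidParameters ();
  //   const bool barrierDefault =
  //     valid->get<bool> ("Barrier between receives and sends"); // false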
00248 
00249 
00250   size_t Distributor::getTotalReceiveLength() const
00251   { return totalReceiveLength_; }
00252 
00253   size_t Distributor::getNumReceives() const
00254   { return numReceives_; }
00255 
00256   bool Distributor::hasSelfMessage() const
00257   { return selfMessage_; }
00258 
00259   size_t Distributor::getNumSends() const
00260   { return numSends_; }
00261 
00262   size_t Distributor::getMaxSendLength() const
00263   { return maxSendLength_; }
00264 
00265   Teuchos::ArrayView<const int> Distributor::getImagesFrom() const
00266   { return imagesFrom_; }
00267 
00268   Teuchos::ArrayView<const size_t> Distributor::getLengthsFrom() const
00269   { return lengthsFrom_; }
00270 
00271   Teuchos::ArrayView<const int> Distributor::getImagesTo() const
00272   { return imagesTo_; }
00273 
00274   Teuchos::ArrayView<const size_t> Distributor::getLengthsTo() const
00275   { return lengthsTo_; }
00276 
00277   const Teuchos::RCP<Distributor> &
00278   Distributor::getReverse() const {
00279     if (reverseDistributor_ == Teuchos::null) {
00280       // need to create reverse distributor
00281       createReverseDistributor();
00282     }
00283     return reverseDistributor_;
00284   }
00285 
00286 
00287   void Distributor::createReverseDistributor() const {
00288 
00289     reverseDistributor_ = Teuchos::rcp(new Distributor(comm_));
00290 
00291     // The total length of all the sends of this Distributor.  We
00292     // calculate it because it's the total length of all the receives
00293     // of the reverse Distributor.
00294     size_t totalSendLength = std::accumulate (lengthsTo_.begin(), lengthsTo_.end(), static_cast<size_t> (0));
00295 
00296     // The maximum length of any of the receives of this Distributor.
00297     // We calculate it because it's the maximum length of any of the
00298     // sends of the reverse Distributor.
00299     size_t maxReceiveLength = 0;
00300     const int myImageID = comm_->getRank();
00301     for (size_t i=0; i < numReceives_; ++i) {
00302       if (imagesFrom_[i] != myImageID) {
00303         // Don't count receives for messages sent by myself to myself.
00304         if (lengthsFrom_[i] > maxReceiveLength) {
00305           maxReceiveLength = lengthsFrom_[i];
00306         }
00307       }
00308     }
00309 
00310     // Initialize all of reverseDistributor's data members.  This
00311     // mainly just involves flipping "send" and "receive," or the
00312     // equivalent "to" and "from."
00313     reverseDistributor_->lengthsTo_ = lengthsFrom_;
00314     reverseDistributor_->imagesTo_ = imagesFrom_;
00315     reverseDistributor_->indicesTo_ = indicesFrom_;
00316     reverseDistributor_->startsTo_ = startsFrom_;
00317     reverseDistributor_->lengthsFrom_ = lengthsTo_;
00318     reverseDistributor_->imagesFrom_ = imagesTo_;
00319     reverseDistributor_->indicesFrom_ = indicesTo_;
00320     reverseDistributor_->startsFrom_ = startsTo_;
00321     reverseDistributor_->numSends_ = numReceives_;
00322     reverseDistributor_->numReceives_ = numSends_;
00323     reverseDistributor_->selfMessage_ = selfMessage_;
00324     reverseDistributor_->maxSendLength_ = maxReceiveLength;
00325     reverseDistributor_->totalReceiveLength_ = totalSendLength;
00326     // Note: Technically, this Distributor is its reverse distributor's reverse
00327     //       distributor.  We deliberately do not set that up, since leaving it unset
00328     //       lets us test that creating the reverse distributor is an inverse operation
00329     //       w.r.t. the value semantics of Distributor.  (numExports_ is not copied.)
00330   }
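  // A small worked example of the swap above, for a hypothetical
  // two-process pattern: if the forward Distributor on Proc 0 sends 3
  // packets to Proc 1 (lengthsTo_ = [3], imagesTo_ = [1]) and receives
  // 2 packets from Proc 1 (lengthsFrom_ = [2], imagesFrom_ = [1]), then
  // its reverse Distributor sends 2 packets to Proc 1 and receives 3
  // from it, so reverseDistributor_->totalReceiveLength_ == 3 (the
  // forward total send length) and reverseDistributor_->maxSendLength_
  // == 2 (the forward maximum receive length).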
00331 
00332 
00333   void Distributor::doWaits() {
00334     using Teuchos::FancyOStream;
00335     using Teuchos::includesVerbLevel;
00336     using Teuchos::is_null;
00337     using Teuchos::OSTab;
00338     using Teuchos::RCP;
00339     using Teuchos::waitAll;
00340     using std::endl;
00341 
00342 #ifdef HAVE_TEUCHOS_DEBUG
00343     // Prepare for verbose output, if applicable.
00344     Teuchos::EVerbosityLevel verbLevel = this->getVerbLevel ();
00345     RCP<FancyOStream> out = this->getOStream ();
00346     const bool doPrint = out.get () &&
00347       includesVerbLevel (verbLevel, Teuchos::VERB_EXTREME, true);
00348     const int myRank = comm_->getRank ();
00349 
00350     if (doPrint) {
00351       *out << "Distributor::doWaits (Proc " << myRank << "):" << endl;
00352     }
00353     OSTab tab = this->getOSTab(); // Add one tab level
00354 #endif // HAVE_TEUCHOS_DEBUG
00355 
00356     if (requests_.size() > 0) {
00357       waitAll (*comm_, requests_());
00358 
00359 #ifdef HAVE_TEUCHOS_DEBUG
00360       if (doPrint) {
00361         *out << "Proc " << myRank << ": waitAll completed " << requests_.size()
00362              << " requests" << endl;
00363       }
00364 
00365       // Make sure that waitAll() nulled out all the requests.
00366       using Teuchos::Array;
00367       using Teuchos::CommRequest;
00368       using Teuchos::RCP;
00369       for (Array<RCP<CommRequest> >::const_iterator it = requests_.begin();
00370            it != requests_.end(); ++it)
00371       {
00372         TEUCHOS_TEST_FOR_EXCEPTION( ! is_null (*it), std::runtime_error,
00373           Teuchos::typeName(*this) << "::doWaits(): Communication requests "
00374           "should all be null aftr calling Teuchos::waitAll() on them, but "
00375           "at least one request is not null.");
00376       }
00377 #endif // HAVE_TEUCHOS_DEBUG
00378       // Restore the invariant that requests_.size() is the number of
00379       // outstanding nonblocking communication requests.
00380       requests_.resize (0);
00381     }
00382   }
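  // A minimal sketch of how doWaits() pairs with the nonblocking
  // do[Reverse]Posts() methods declared in Tpetra_Distributor.hpp (the
  // exports / imports buffers and numPackets value are hypothetical):
  //
  //   distributor.doPosts (exports, numPackets, imports); // post sends/receives
  //   // ... overlap local computation with communication here ...
  //   distributor.doWaits (); // blocks; afterwards requests_.size() == 0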
00383 
00384 
00385   void Distributor::doReverseWaits()
00386   {
00387     // call doWaits() on the reverse Distributor, if it exists
00388     if (! reverseDistributor_.is_null()) {
00389       reverseDistributor_->doWaits();
00390     }
00391   }
00392 
00393   std::string Distributor::description() const
00394   {
00395     std::ostringstream oss;
00396     oss << Teuchos::Describable::description();
00397     return oss.str();
00398   }
00399 
00400   void Distributor::describe(Teuchos::FancyOStream &out, const Teuchos::EVerbosityLevel verbLevel) const
00401   {
00402     using std::endl;
00403     using std::setw;
00404     using Teuchos::VERB_DEFAULT;
00405     using Teuchos::VERB_NONE;
00406     using Teuchos::VERB_LOW;
00407     using Teuchos::VERB_MEDIUM;
00408     using Teuchos::VERB_HIGH;
00409     using Teuchos::VERB_EXTREME;
00410     Teuchos::EVerbosityLevel vl = verbLevel;
00411     if (vl == VERB_DEFAULT) vl = VERB_LOW;
00412     const int myImageID = comm_->getRank();
00413     const int numImages = comm_->getSize();
00414     Teuchos::OSTab tab(out);
00415 
00416     if (vl == VERB_NONE) {
00417       return;
00418     } else {
00419       if (myImageID == 0) {
00420         // VERB_LOW and higher prints description() (on Proc 0 only).
00421         out << this->description() << endl;
00422       }
00423       if (vl == VERB_LOW) {
00424         return;
00425       } else {
00426         // vl > VERB_LOW lets each image print its data.  We assume
00427         // that all images can print to the given output stream, and
00428         // execute barriers to make it more likely that the output
00429         // will be in the right order.
00430         for (int imageCtr = 0; imageCtr < numImages; ++imageCtr) {
00431           if (myImageID == imageCtr) {
00432             out << "[Node " << myImageID << " of " << numImages << "]" << endl;
00433             out << " selfMessage: " << hasSelfMessage() << endl;
00434             out << " numSends: " << getNumSends() << endl;
00435             if (vl == VERB_HIGH || vl == VERB_EXTREME) {
00436               out << " imagesTo: " << toString(imagesTo_) << endl;
00437               out << " lengthsTo: " << toString(lengthsTo_) << endl;
00438               out << " maxSendLength: " << getMaxSendLength() << endl;
00439             }
00440             if (vl == VERB_EXTREME) {
00441               out << " startsTo: " << toString(startsTo_) << endl;
00442               out << " indicesTo: " << toString(indicesTo_) << endl;
00443             }
00444             if (vl == VERB_HIGH || vl == VERB_EXTREME) {
00445               out << " numReceives: " << getNumReceives() << endl;
00446               out << " totalReceiveLength: " << getTotalReceiveLength() << endl;
00447               out << " lengthsFrom: " << toString(lengthsFrom_) << endl;
00448               out << " startsFrom: " << toString(startsFrom_) << endl;
00449               out << " imagesFrom: " << toString(imagesFrom_) << endl;
00450             }
00451             // Last output is a flush; it leaves a space and also
00452             // helps synchronize output.
00453             out << std::flush;
00454           } // if it's my image's turn to print
00455           // Execute barriers to give output time to synchronize.
00456           // One barrier generally isn't enough.
00457           comm_->barrier();
00458           comm_->barrier();
00459           comm_->barrier();
00460         } // for each image
00461       }
00462     }
00463   }
00464 
00465   void
00466   Distributor::computeReceives ()
00467   {
00468     using Teuchos::Array;
00469     using Teuchos::CommStatus;
00470     using Teuchos::CommRequest;
00471     using Teuchos::ireceive;
00472     using Teuchos::RCP;
00473     using Teuchos::rcp;
00474     using Teuchos::REDUCE_SUM;
00475     using Teuchos::receive;
00476     using Teuchos::reduceAllAndScatter;
00477     using Teuchos::send;
00478     using Teuchos::waitAll;
00479 
00480     const int myRank = comm_->getRank();
00481     const int numProcs = comm_->getSize();
00482 
00483     // toNodesFromMe[i] == the number of messages sent by this process
00484     // to process i.  The data in numSends_, imagesTo_, lengthsTo_
00485     // concern the contiguous sends.  Therefore, each process will be
00486     // listed in imagesTo_ at most once.
00487     {
00488       Array<size_t> toNodesFromMe (numProcs,0);
00489 #ifdef HAVE_TEUCHOS_DEBUG
00490       bool counting_error = false;
00491 #endif // HAVE_TEUCHOS_DEBUG
00492       for (size_t i = 0; i < (numSends_ + (selfMessage_ ? 1 : 0)); ++i) {
00493 #ifdef HAVE_TEUCHOS_DEBUG
00494         if (toNodesFromMe[imagesTo_[i]] != 0) {
00495           counting_error = true;
00496         }
00497 #endif // HAVE_TEUCHOS_DEBUG
00498         toNodesFromMe[imagesTo_[i]] = 1;
00499       }
00500 #ifdef HAVE_TEUCHOS_DEBUG
00501       SHARED_TEST_FOR_EXCEPTION(counting_error, std::logic_error,
00502         "Tpetra::Distributor::computeReceives: There was an error on at least "
00503         "one process in counting the number of messages send by that process to "
00504         "the other processs.  Please report this bug to the Tpetra developers.",
00505         *comm_);
00506 #endif // HAVE_TEUCHOS_DEBUG
00507 
00508       // Each process will get back only one item (hence, counts =
00509       // ones) from the array of global sums, namely that entry
00510       // corresponding to the process, and detailing how many receives
00511       // it has.  This total includes self sends.
00512       //
00513       // mfh 09 Jan 2012: The reduceAllAndScatter really isn't
00514       // necessary here.  Since counts is just all ones, we could
00515       // replace this with an all-reduce on toNodesFromMe, and let my
00516       // process (with rank myRank) get numReceives_ from
00517       // toNodesFromMe[myRank].  The HPCCG miniapp uses the all-reduce
00518       // method.  It could be possible that reduceAllAndScatter is
00519       // faster, but it also makes the code more complicated, and it
00520       // can't be _asymptotically_ faster (MPI_Allreduce has twice the
00521       // critical path length of MPI_Reduce, so reduceAllAndScatter
00522       // can't be more than twice as fast as the all-reduce, even if
00523       // the scatter is free).
00524       Array<int> counts (numProcs, 1);
00525       reduceAllAndScatter (*comm_, REDUCE_SUM, numProcs, &toNodesFromMe[0],
00526                            &counts[0], &numReceives_);
00527     }
00528 
00529     // Now we know numReceives_, which is this process' number of
00530     // receives.  Allocate the lengthsFrom_ and imagesFrom_ arrays
00531     // with this number of entries.
00532     lengthsFrom_.assign (numReceives_, 0);
00533     imagesFrom_.assign (numReceives_, 0);
00534 
00535     //
00536     // Ask (via nonblocking receive) each process from which we are
00537     // receiving how many packets we should expect from it in the
00538     // communication pattern.
00539     //
00540 
00541     // At this point, numReceives_ includes any self message that
00542     // there may be.  At the end of this routine, we'll subtract off
00543     // the self message (if there is one) from numReceives_.  In this
00544     // routine, we don't need to receive a message from ourselves in
00545     // order to figure out our lengthsFrom_ and source process ID; we
00546     // can just ask ourselves directly.  Thus, the actual number of
00547     // nonblocking receives we post here does not include the self
00548     // message.
00549     const size_t actualNumReceives = numReceives_ - (selfMessage_ ? 1 : 0);
00550 
00551     // Teuchos' wrapper for nonblocking receives requires receive
00552     // buffers that it knows won't go away.  This is why we use RCPs,
00553     // one RCP per nonblocking receive request.  They get allocated in
00554     // the loop below.
00555     Array<RCP<CommRequest> > requests (actualNumReceives);
00556     Array<RCP<size_t> > lengthsFromBuffers (actualNumReceives);
00557     Array<RCP<CommStatus<int> > > statuses (actualNumReceives);
00558 
00559     // Teuchos::Comm treats a negative process ID as MPI_ANY_SOURCE
00560     // (receive data from any process).
00561     const int anySourceProc = -1;
00562 
00563     for (size_t i = 0; i < actualNumReceives; ++i) {
00564       lengthsFromBuffers[i] = rcp (new size_t (0));
00565       // Once the receive completes, we can ask the corresponding
00566       // CommStatus object (output by wait()) for the sending process'
00567       // ID (which we'll assign to imagesFrom_[i] -- don't forget to
00568       // do that!).
00569       requests[i] = ireceive (*comm_, lengthsFromBuffers[i], anySourceProc);
00570     }
00571 
00572     // Tell each process to which we are sending how many packets it
00573     // should expect from us in the communication pattern.
00574     //
00575     // We assume that numSends_ and selfMessage_ have already been
00576     // set.  The value of numSends_ (my process' number of sends) does
00577     // not include any message that it might send to itself.
00578     for (size_t i = 0; i < numSends_ + (selfMessage_ ? 1 : 0); ++i) {
00579       if (imagesTo_[i] != myRank ) {
00580         // Send a message to imagesTo_[i], telling that process that
00581         // this communication pattern will send that process
00582         // lengthsTo_[i] blocks of packets.
00583         send (*comm_, lengthsTo_[i], imagesTo_[i]);
00584       }
00585       else {
00586         // We don't need a send in the self-message case.  If this
00587         // process will send a message to itself in the communication
00588         // pattern, then the last element of lengthsFrom_ and
00589         // imagesFrom_ corresponds to the self-message.  Of course
00590         // this process knows how long the message is, and the process
00591         // ID is its own process ID.
00592         lengthsFrom_[numReceives_-1] = lengthsTo_[i];
00593         imagesFrom_[numReceives_-1] = myRank;
00594       }
00595     }
00596 
00597     //
00598     // Wait on all the receives.  When they arrive, check the status
00599     // output of wait() for the receiving process ID, unpack the
00600     // request buffers into lengthsFrom_, and set imagesFrom_ from the
00601     // status.
00602     //
00603     waitAll (*comm_, requests (), statuses ());
00604     for (size_t i = 0; i < actualNumReceives; ++i) {
00605       lengthsFrom_[i] = *lengthsFromBuffers[i];
00606       imagesFrom_[i] = statuses[i]->getSourceRank ();
00607     }
00608 
00609 #ifdef HAVE_TEUCHOS_DEBUG
00610     comm_->barrier();
00611 #endif // HAVE_TEUCHOS_DEBUG
00612 
00613     // Sort the imagesFrom_ array, and apply the same permutation to
00614     // lengthsFrom_.  This ensures that imagesFrom_[i] and
00615     // lengthsFrom_[i] refer to the same message.
00616     sort2 (imagesFrom_.begin(), imagesFrom_.end(), lengthsFrom_.begin());
00617 
00618     // Compute indicesFrom_
00619     totalReceiveLength_ = std::accumulate (lengthsFrom_.begin(), lengthsFrom_.end(), static_cast<size_t> (0));
00620     indicesFrom_.clear ();
00621     indicesFrom_.reserve (totalReceiveLength_);
00622     for (size_t i = 0; i < totalReceiveLength_; ++i) {
00623       indicesFrom_.push_back(i);
00624     }
00625 
00626     startsFrom_.clear ();
00627     startsFrom_.reserve (numReceives_);
00628     for (size_t i = 0, j = 0; i < numReceives_; ++i) {
00629       startsFrom_.push_back(j);
00630       j += lengthsFrom_[i];
00631     }
00632 
00633     if (selfMessage_) {
00634       --numReceives_;
00635     }
00636 
00637 #ifdef HAVE_TEUCHOS_DEBUG
00638     comm_->barrier();
00639 #endif // HAVE_TEUCHOS_DEBUG
00640   }
00641 
00642   size_t
00643   Distributor::createFromSends (const Teuchos::ArrayView<const int> &exportNodeIDs)
00644   {
00645     using Teuchos::outArg;
00646     numExports_ = exportNodeIDs.size();
00647 
00648     const int myImageID = comm_->getRank();
00649     const int numImages = comm_->getSize();
00650 
00651     // exportNodeIDs tells us the communication pattern for this
00652     // distributor.  It dictates the way that the export data will be
00653     // interpreted in doPosts().  We want to perform at most one
00654     // communication per node; this is for two reasons:
00655     //   * minimize latency/overhead in the comm routines (nice)
00656     //   * match the number of receives and sends between nodes (necessary)
00657     //
00658     // Teuchos::Comm requires that the data for a send is contiguous
00659     // in a send buffer.  Therefore, if the data in the send buffer
00660     // for doPosts() is not contiguous, it will need to be copied into
00661     // a contiguous buffer.  The user has specified this noncontiguous
00662     // pattern and we can't do anything about it.  However, if they do
00663     // not provide an efficient pattern, we will warn them if one of
00664     // the following compile-time options has been set:
00665     //   * HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS
00666     //   * HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS
00667     //
00668     // If the data is contiguous, then we can post the sends in situ
00669     // (i.e., without needing to copy them into a send buffer).
00670     //
00671     // Determine contiguity. There are a number of ways to do this:
00672     // * If the export IDs are sorted, then all exports to a
00673     //   particular node must be contiguous. This is what Epetra does.
00674     // * If the node ID of the current export has already been seen,
00675     //   then the immediately preceding export must have gone to that
00676     //   same node.  This tests contiguity, but not sortedness.
00677     //
00678     // Both of these tests require O(n), where n is the number of
00679     // exports. However, the latter will positively identify a greater
00680     // portion of contiguous patterns. We will use the latter method.
00681     //
00682     // Check to see if the values are grouped by image without gaps.
00683     // If so, indicesTo_ can be left empty (size 0).
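    // A concrete illustration of the test just described, using
    // hypothetical per-process export ID lists:
    //
    //   [0, 0, 1, 1, 1, 3]   contiguous and sorted: no send buffer needed
    //   [2, 2, 0, 0, 1]      contiguous but unsorted: still no send buffer
    //   [0, 1, 0]            process 0 appears in two separate runs, so a
    //                        send buffer and indicesTo_ are required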
00684 
00685     // Set up data structures for quick traversal of arrays.
00686     // This contains the number of sends for each image id.
00687     Teuchos::Array<size_t> starts (numImages + 1, 0);
00688 
00689     // numActive is the number of exports with a nonnegative node ID.
00690     size_t numActive = 0;
00691     char needSendBuff = 0;
00692 
00693     int badID = -1;
00694     for (size_t i = 0; i < numExports_; ++i) {
00695       int exportID = exportNodeIDs[i];
00696       if (exportID >= numImages) {
00697         badID = myImageID;
00698         break;
00699       }
00700       else if (exportID >= 0) {
00701         // increment starts[exportID]
00702         ++starts[exportID];
00703         // If, after incrementing, it is greater than one, check that
00704         // the previous export went to this node.  This is a safe
00705         // comparison, because starts[exportID] > 1 implies that i > 0,
00706         // so exportNodeIDs[i-1] is a valid entry.
00707         // Negative (null) entries break continuity; e.g.,
00708         // [ 0, 0, 0, 1, -99, 1, 2, 2, 2] is not contiguous.
00709         if (needSendBuff==0 && starts[exportID] > 1 && exportID != exportNodeIDs[i-1]) {
00710           needSendBuff = 1;
00711         }
00712         ++numActive;
00713       }
00714     }
00715     {
00716       int gbl_badID;
00717       Teuchos::reduceAll(*comm_,Teuchos::REDUCE_MAX,badID,outArg(gbl_badID));
00718       TEUCHOS_TEST_FOR_EXCEPTION(gbl_badID >= 0, std::runtime_error,
00719           Teuchos::typeName(*this) << "::createFromSends(): bad node id listed on node " << gbl_badID << ".");
00720     }
00721 
00722 #   if defined(HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS) || defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
00723     {
00724       char global_needSendBuff;
00725       Teuchos::reduceAll (*comm_, Teuchos::REDUCE_MAX, needSendBuff, Teuchos::ptr (&global_needSendBuff));
00726       TPETRA_EFFICIENCY_WARNING(global_needSendBuff,std::runtime_error,
00727           "::createFromSends(): Grouping export IDs together leads to improved performance.");
00728     }
00729 #   endif
00730 
00731     // Determine from the caller's data whether or not the current
00732     // process should send (a) message(s) to itself.
00733     if (starts[myImageID] != 0) {
00734       selfMessage_ = true;
00735     }
00736     else {
00737       selfMessage_ = false;
00738     }
00739 
00740 
00741 #ifdef HAVE_TEUCHOS_DEBUG
00742     bool index_neq_numActive = false;
00743     bool send_neq_numSends = false;
00744 #endif
00745     if (!needSendBuff) {
00746       // grouped by image, no send buffer or indicesTo_ needed
00747       numSends_ = 0;
00748       // Count total number of sends, i.e., total number of images to
00749       // which we are sending.  This includes myself, if applicable.
00750       for (int i=0; i < numImages; ++i) {
00751         if (starts[i]) ++numSends_;
00752       }
00753 
00754       // Not only do we not need these, but we must clear them, as
00755       // empty status of indicesTo is a flag used later.
00756       indicesTo_.resize(0);
00757       // Size these to numSends_; note, at the moment, numSends_
00758       // includes self sends.  Set their values to zeros.
00759       imagesTo_.assign(numSends_,0);
00760       startsTo_.assign(numSends_,0);
00761       lengthsTo_.assign(numSends_,0);
00762 
00763       // Set startsTo_ to the offset of each send (i.e., of each image ID),
00764       // and imagesTo_ to the image ID of each send.  In interpreting this
00765       // code, remember that we are assuming contiguity; that is why
00766       // "index" skips through the ranks.
00767       {
00768         size_t index = 0, nodeIndex = 0;
00769         for (size_t i = 0; i < numSends_; ++i) {
00770           while (exportNodeIDs[nodeIndex] < 0) {
00771             ++nodeIndex; // skip all negative node IDs
00772           }
00773           startsTo_[i] = nodeIndex;
00774           int imageID = exportNodeIDs[nodeIndex];
00775           imagesTo_[i] = imageID;
00776           index     += starts[imageID];
00777           nodeIndex += starts[imageID];
00778         }
00779 #ifdef HAVE_TEUCHOS_DEBUG
00780         if (index != numActive) {
00781           index_neq_numActive = true;
00782         }
00783 #endif
00784       }
00785       // sort the startsTo and image IDs together, in ascending order, according
00786       // to image IDs
00787       if (numSends_ > 0) {
00788         sort2(imagesTo_.begin(), imagesTo_.end(), startsTo_.begin());
00789       }
00790       // compute the maximum send length
00791       maxSendLength_ = 0;
00792       for (size_t i = 0; i < numSends_; ++i) {
00793         int imageID = imagesTo_[i];
00794         lengthsTo_[i] = starts[imageID];
00795         if ((imageID != myImageID) && (lengthsTo_[i] > maxSendLength_)) {
00796           maxSendLength_ = lengthsTo_[i];
00797         }
00798       }
00799     }
00800     else {
00801       // not grouped by image, need send buffer and indicesTo_
00802 
00803       // starts[i] is the number of sends to node i
00804       // numActive equals number of sends total, \sum_i starts[i]
00805 
00806       // this loop starts at starts[1], so explicitly check starts[0]
00807       if (starts[0] == 0 ) {
00808         numSends_ = 0;
00809       }
00810       else {
00811         numSends_ = 1;
00812       }
00813       for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
00814                                             im1=starts.begin();
00815            i != starts.end(); ++i)
00816       {
00817         if (*i != 0) ++numSends_;
00818         *i += *im1;
00819         im1 = i;
00820       }
00821       // starts[i] now contains the number of exports to nodes 0 through i
00822 
00823       for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
00824                                                       i=starts.rbegin()+1;
00825            i != starts.rend(); ++i)
00826       {
00827         *ip1 = *i;
00828         ip1 = i;
00829       }
00830       starts[0] = 0;
00831       // starts[i] now contains the number of exports to nodes 0 through
00832       // i-1, i.e., all nodes before node i
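      // A worked example of the two passes above, with numImages = 4 and
      // hypothetical per-image export counts:
      //
      //   initial counts:                        starts = [2, 0, 3, 1, 0]
      //   after the forward accumulation:        starts = [2, 2, 5, 6, 6]
      //   after the backward shift, starts[0]=0: starts = [0, 2, 2, 5, 6]
      //
      // so starts[node] is now the offset into the send buffer at which
      // the data destined for that node begins (an exclusive prefix sum),
      // and numSends_ == 3 (images 0, 2, and 3 each receive something).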
00833 
00834       indicesTo_.resize(numActive);
00835 
00836       for (size_t i = 0; i < numExports_; ++i) {
00837         if (exportNodeIDs[i] >= 0) {
00838           // record the offset to the sendBuffer for this export
00839           indicesTo_[starts[exportNodeIDs[i]]] = i;
00840           // now increment the offset for this node
00841           ++starts[exportNodeIDs[i]];
00842         }
00843       }
00844       // Our send buffer will contain the export data for each of the
00845       // nodes we communicate with, in order by node ID:
00846       //   sendBuffer = {node_0_data, node_1_data, ..., node_np-1_data}.
00847       // indicesTo_ now maps each export to the location in our send
00848       // buffer associated with that export: the data for export i are
00849       // located at sendBuffer[indicesTo_[i]].
00850       //
00851       // starts[i] once again contains the number of exports to
00852       // nodes 0 through i.
00853       for (int node = numImages-1; node != 0; --node) {
00854         starts[node] = starts[node-1];
00855       }
00856       starts.front() = 0;
00857       starts[numImages] = numActive;
00858       //
00859       // starts[node] once again contains the number of exports to
00860       // nodes 0 through node-1
00861       // i.e., the start of my data in the sendBuffer
00862 
00863       // This contains invalid data for the nodes we don't care about; that is okay.
00864       imagesTo_.resize(numSends_);
00865       startsTo_.resize(numSends_);
00866       lengthsTo_.resize(numSends_);
00867 
00868       // for each group of sends/exports, record the destination node,
00869       // the length, and the offset for this send into the
00870       // send buffer (startsTo_)
00871       maxSendLength_ = 0;
00872       size_t snd = 0;
00873       for (int node = 0; node < numImages; ++node ) {
00874         if (starts[node+1] != starts[node]) {
00875           lengthsTo_[snd] = starts[node+1] - starts[node];
00876           startsTo_[snd] = starts[node];
00877           // record max length for all off-node sends
00878           if ((node != myImageID) && (lengthsTo_[snd] > maxSendLength_)) {
00879             maxSendLength_ = lengthsTo_[snd];
00880           }
00881           imagesTo_[snd] = node;
00882           ++snd;
00883         }
00884       }
00885 #ifdef HAVE_TEUCHOS_DEBUG
00886       if (snd != numSends_) {
00887         send_neq_numSends = true;
00888       }
00889 #endif
00890     }
00891 #ifdef HAVE_TEUCHOS_DEBUG
00892     SHARED_TEST_FOR_EXCEPTION(index_neq_numActive, std::logic_error,
00893       "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.", *comm_);
00894     SHARED_TEST_FOR_EXCEPTION(send_neq_numSends, std::logic_error,
00895       "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.", *comm_);
00896 #endif
00897 
00898     if (selfMessage_) --numSends_;
00899 
00900     // Invert the map to determine which messages we will receive, and their lengths.
00901     computeReceives();
00902 
00903     return totalReceiveLength_;
00904   }
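  // A minimal end-to-end sketch of how createFromSends() is typically
  // used.  The export arrays below are hypothetical, and
  // doPostsAndWaits() refers to the templated post-then-wait
  // convenience method declared in Tpetra_Distributor.hpp:
  //
  //   // exportNodeIDs[i] is the process to which export i will be sent
  //   // (negative entries are ignored, as in the code above).
  //   Teuchos::Array<int> exportNodeIDs = ...;
  //   Tpetra::Distributor distributor (comm);
  //   const size_t numImports = distributor.createFromSends (exportNodeIDs ());
  //   Teuchos::Array<double> exports (exportNodeIDs.size ());  // ... fill ...
  //   Teuchos::Array<double> imports (numImports);
  //   distributor.doPostsAndWaits<double> (exports (), 1, imports ());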
00905 
00906 } // namespace Tpetra