Tpetra Matrix/Vector Services Version of the Day
Tpetra_Distributor.hpp
00001 // @HEADER
00002 // ***********************************************************************
00003 //
00004 //          Tpetra: Templated Linear Algebra Services Package
00005 //                 Copyright (2008) Sandia Corporation
00006 //
00007 // Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
00008 // the U.S. Government retains certain rights in this software.
00009 //
00010 // Redistribution and use in source and binary forms, with or without
00011 // modification, are permitted provided that the following conditions are
00012 // met:
00013 //
00014 // 1. Redistributions of source code must retain the above copyright
00015 // notice, this list of conditions and the following disclaimer.
00016 //
00017 // 2. Redistributions in binary form must reproduce the above copyright
00018 // notice, this list of conditions and the following disclaimer in the
00019 // documentation and/or other materials provided with the distribution.
00020 //
00021 // 3. Neither the name of the Corporation nor the names of the
00022 // contributors may be used to endorse or promote products derived from
00023 // this software without specific prior written permission.
00024 //
00025 // THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
00026 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00027 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
00028 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
00029 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00030 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00031 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00032 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00033 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00034 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00035 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00036 //
00037 // Questions? Contact Michael A. Heroux (maherou@sandia.gov)
00038 //
00039 // ************************************************************************
00040 // @HEADER
00041 
00042 #ifndef TPETRA_DISTRIBUTOR_HPP
00043 #define TPETRA_DISTRIBUTOR_HPP
00044 
00045 #include "Tpetra_Util.hpp"
00046 #include <Teuchos_as.hpp>
00047 #include <Teuchos_Describable.hpp>
00048 #include <Teuchos_ParameterListAcceptorDefaultBase.hpp>
00049 #include <Teuchos_VerboseObject.hpp>
00050 
00051 namespace Tpetra {
00052 
00053   namespace {
00054     // This is an implementation detail of Distributor.  Please do not
00055     // rely on these values in your code.  We use this to pick the
00056     // type of send operation that Distributor uses.
00057     enum EDistributorSendType {
00058       DISTRIBUTOR_ISEND, // Use MPI_Isend (Teuchos::isend)
00059       DISTRIBUTOR_RSEND, // Use MPI_Rsend (Teuchos::readySend)
00060       DISTRIBUTOR_SEND,  // Use MPI_Send (Teuchos::send)
00061       DISTRIBUTOR_SSEND  // Use MPI_Ssend (Teuchos::ssend)
00062     };
00063 
00064     // Convert an EDistributorSendType enum value to a string.
00065     std::string
00066     DistributorSendTypeEnumToString (EDistributorSendType sendType)
00067     {
00068       if (sendType == DISTRIBUTOR_ISEND) {
00069         return "Isend";
00070       }
00071       else if (sendType == DISTRIBUTOR_RSEND) {
00072         return "Rsend";
00073       }
00074       else if (sendType == DISTRIBUTOR_SEND) {
00075         return "Send";
00076       }
00077       else if (sendType == DISTRIBUTOR_SSEND) {
00078         return "Ssend";
00079       }
00080       else {
00081         TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument, "Invalid "
00082           "EDistributorSendType enum value " << sendType << ".");
00083       }
00084     }
00085 
00086   } // namespace (anonymous)
00087 
00089   Array<std::string> distributorSendTypes ();
00090 
00098   class Distributor :
00099     public Teuchos::Describable,
00100     public Teuchos::ParameterListAcceptorDefaultBase,
00101     public Teuchos::VerboseObject<Distributor> {
00102   public:
00103 
00105 
00106 
00114     explicit Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm);
00115 
00126     explicit Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
00127                           const Teuchos::RCP<Teuchos::ParameterList>& plist);
00128 
00130     Distributor(const Distributor &distributor);
00131 
00133     virtual ~Distributor();
00134 
00136 
00137 
00138 
00143     void setParameterList (const Teuchos::RCP<Teuchos::ParameterList>& plist);
00144 
00163     Teuchos::RCP<const Teuchos::ParameterList> getValidParameters () const;
00164 
00166 
00167 
00168 
00186     size_t createFromSends (const ArrayView<const int>& exportNodeIDs);
00187 
00220     template <class Ordinal>
00221     void createFromRecvs(const ArrayView<const Ordinal> &remoteIDs,
00222                          const ArrayView<const int> &remoteNodeIDs,
00223                                ArrayRCP<Ordinal> &exportIDs,
00224                                ArrayRCP<int> &exportNodeIDs);
00225 
00227 
00229 
00230 
00232     size_t getNumReceives() const;
00233 
00235     size_t getNumSends() const;
00236 
00238 
00239     bool hasSelfMessage() const;
00240 
00242     size_t getMaxSendLength() const;
00243 
00245     size_t getTotalReceiveLength() const;
00246 
00248     ArrayView<const int> getImagesFrom() const;
00249 
00251     ArrayView<const int> getImagesTo() const;
00252 
00254 
00255     ArrayView<const size_t> getLengthsFrom() const;
00256 
00258 
00259     ArrayView<const size_t> getLengthsTo() const;
00260 
00262 
00264 
00265 
00267 
00270     const RCP<Distributor>& getReverse() const;
00271 
00273 
00275 
00276 
00295     template <class Packet>
00296     void
00297     doPostsAndWaits (const ArrayView<const Packet> &exports,
00298                      size_t numPackets,
00299                      const ArrayView<Packet> &imports);
00300 
00319     template <class Packet>
00320     void
00321     doPostsAndWaits (const ArrayView<const Packet> &exports,
00322                      const ArrayView<size_t> &numExportPacketsPerLID,
00323                      const ArrayView<Packet> &imports,
00324                      const ArrayView<size_t> &numImportPacketsPerLID);
00325 
00348     template <class Packet>
00349     void
00350     doPosts (const ArrayRCP<const Packet> &exports,
00351              size_t numPackets,
00352              const ArrayRCP<Packet> &imports);
00353 
00370     template <class Packet>
00371     void
00372     doPosts (const ArrayRCP<const Packet> &exports,
00373              const ArrayView<size_t> &numExportPacketsPerLID,
00374              const ArrayRCP<Packet> &imports,
00375              const ArrayView<size_t> &numImportPacketsPerLID);
00376 
00378     void doWaits ();
00379 
00384     template <class Packet>
00385     void
00386     doReversePostsAndWaits (const ArrayView<const Packet> &exports,
00387                             size_t numPackets,
00388                             const ArrayView<Packet> &imports);
00389 
00394     template <class Packet>
00395     void
00396     doReversePostsAndWaits (const ArrayView<const Packet> &exports,
00397                             const ArrayView<size_t> &numExportPacketsPerLID,
00398                             const ArrayView<Packet> &imports,
00399                             const ArrayView<size_t> &numImportPacketsPerLID);
00400 
00405     template <class Packet>
00406     void
00407     doReversePosts (const ArrayRCP<const Packet> &exports,
00408                     size_t numPackets,
00409                     const ArrayRCP<Packet> &imports);
00410 
00415     template <class Packet>
00416     void
00417     doReversePosts (const ArrayRCP<const Packet> &exports,
00418                     const ArrayView<size_t> &numExportPacketsPerLID,
00419                     const ArrayRCP<Packet> &imports,
00420                     const ArrayView<size_t> &numImportPacketsPerLID);
00421 
00423     void doReverseWaits ();
00424 
00426 
00428 
00429 
00431     std::string description() const;
00432 
00434     void describe (Teuchos::FancyOStream &out, const Teuchos::EVerbosityLevel verbLevel=Teuchos::Describable::verbLevel_default) const;
00435 
00437 
00438   private:
00440     RCP<const Comm<int> > comm_;
00441 
00443 
00444 
00445     EDistributorSendType sendType_;
00447     bool barrierBetween_;
00449 
00458     size_t numExports_;
00459 
00461     bool selfMessage_;
00462 
00471     size_t numSends_;
00472 
00477     Array<int> imagesTo_;
00478 
00487     Array<size_t> startsTo_;
00488 
00494     Array<size_t> lengthsTo_;
00495 
00499     size_t maxSendLength_;
00500 
00516     Array<size_t> indicesTo_;
00517 
00527     size_t numReceives_;
00528 
00535     size_t totalReceiveLength_;
00536 
00537     // imagesFrom_, startsFrom_ and lengthsFrom_ each have size
00538     //   numReceives_ + selfMessage_
00539     Array<size_t> lengthsFrom_;
00540     Array<int> imagesFrom_;
00541     Array<size_t> startsFrom_;
00542 
00547     Array<size_t> indicesFrom_;
00548 
00555     Array<RCP<Teuchos::CommRequest> > requests_;
00556 
00561     mutable RCP<Distributor> reverseDistributor_;
00562 
00573     void computeReceives ();
00574 
00586     template <class Ordinal>
00587     void computeSends (const ArrayView<const Ordinal> &importIDs,
00588                        const ArrayView<const int> &importNodeIDs,
00589                        ArrayRCP<Ordinal> &exportIDs,
00590                        ArrayRCP<int> &exportNodeIDs);
00591 
00593     void createReverseDistributor() const;
00594 
00595   }; // class Distributor
00596 
00597 
00598   template <class Packet>
00599   void Distributor::
00600   doPostsAndWaits (const ArrayView<const Packet>& exports,
00601                    size_t numPackets,
00602                    const ArrayView<Packet>& imports)
00603   {
00604     using Teuchos::as;
00605     using Teuchos::arcp;
00606     using Teuchos::ArrayRCP;
00607 
00608     TEUCHOS_TEST_FOR_EXCEPTION(requests_.size() != 0, std::runtime_error,
00609       Teuchos::typeName(*this) << "::doPostsAndWaits(): There are "
00610       << requests_.size() << " outstanding nonblocking messages pending.  It is "
00611       "incorrect to call doPostsAndWaits with posts outstanding.");
00612 
00613     // doPosts() accepts the exports and imports arrays as ArrayRCPs,
00614     // requiring that the memory location is persisting (as is
00615     // necessary for nonblocking receives).  However, it need only
00616     // persist until doWaits() completes, so it is safe for us to use
00617     // a nonpersisting reference in this case.  The use of a
00618     // nonpersisting reference is purely a performance optimization.
00619 
00620     typedef typename ArrayRCP<const Packet>::size_type size_type;
00621     //const Packet* exportsPtr = exports.getRawPtr();
00622     //ArrayRCP<const Packet> exportsArcp (exportsPtr, as<size_type> (0), exports.size(), false);
00623     ArrayRCP<const Packet> exportsArcp (exports.getRawPtr(), as<size_type> (0),
00624                                         exports.size(), false);
00625 
00626     // For some reason, neither of the options below (that use arcp)
00627     // compile for Packet=std::complex<double> with GCC 4.5.1.  The
00628     // issue only arises with the exports array.  This is why we
00629     // construct a separate nonowning ArrayRCP.
00630 
00631     // doPosts (arcp<const Packet> (exports.getRawPtr(), 0, exports.size(), false),
00632     //              numPackets,
00633     //              arcp<Packet> (imports.getRawPtr(), 0, imports.size(), false));
00634     // doPosts (arcp<const Packet> (exportsPtr, 0, exports.size(), false),
00635     //              numPackets,
00636     //              arcp<Packet> (imports.getRawPtr(), 0, imports.size(), false));
00637     doPosts (exportsArcp,
00638              numPackets,
00639              arcp<Packet> (imports.getRawPtr(), 0, imports.size(), false));
00640     doWaits();
00641   }
00642 
00643   template <class Packet>
00644   void Distributor::
00645   doPostsAndWaits (const ArrayView<const Packet>& exports,
00646                    const ArrayView<size_t> &numExportPacketsPerLID,
00647                    const ArrayView<Packet> &imports,
00648                    const ArrayView<size_t> &numImportPacketsPerLID)
00649   {
00650     using Teuchos::arcp;
00651     using Teuchos::ArrayRCP;
00652     using Teuchos::as;
00653 
00654     TEUCHOS_TEST_FOR_EXCEPTION(requests_.size() != 0, std::runtime_error,
00655       Teuchos::typeName(*this) << "::doPostsAndWaits(): There are "
00656       << requests_.size() << " outstanding nonblocking messages pending.  It is "
00657       "incorrect to call doPostsAndWaits with posts outstanding.");
00658 
00659     // doPosts() accepts the exports and imports arrays as ArrayRCPs,
00660     // requiring that the memory location is persisting (as is
00661     // necessary for nonblocking receives).  However, it need only
00662     // persist until doWaits() completes, so it is safe for us to use
00663     // a nonpersisting reference in this case.
00664 
00665     // mfh 04 Apr 2012: For some reason, calling arcp<const Packet>
00666     // for Packet=std::complex<T> (e.g., T=float) fails to compile
00667     // with some versions of GCC.  The issue only arises with the
00668     // exports array.  This is why we construct a separate nonowning
00669     // ArrayRCP.
00670     typedef typename ArrayRCP<const Packet>::size_type size_type;
00671     ArrayRCP<const Packet> exportsArcp (exports.getRawPtr(), as<size_type> (0),
00672                                         exports.size(), false);
00673     // mfh 04 Apr 2012: This is the offending code.  This statement
00674     // would normally be in place of "exportsArcp" in the
00675     // doPosts() call below.
00676     //arcp<const Packet> (exports.getRawPtr(), 0, exports.size(), false),
00677     doPosts (exportsArcp,
00678              numExportPacketsPerLID,
00679              arcp<Packet> (imports.getRawPtr(), 0, imports.size(), false),
00680              numImportPacketsPerLID);
00681     doWaits();
00682   }
00683 
00684 
00685   template <class Packet>
00686   void Distributor::
00687   doPosts (const ArrayRCP<const Packet>& exports,
00688            size_t numPackets,
00689            const ArrayRCP<Packet>& imports)
00690   {
00691     using Teuchos::as;
00692     using Teuchos::FancyOStream;
00693     using Teuchos::includesVerbLevel;
00694     using Teuchos::ireceive;
00695     using Teuchos::isend;
00696     using Teuchos::OSTab;
00697     using Teuchos::readySend;
00698     using Teuchos::send;
00699     using Teuchos::ssend;
00700     using Teuchos::typeName;
00701     using std::endl;
00702 
00703     // Run-time configurable parameters that come from the input
00704     // ParameterList set by setParameterList().
00705     const EDistributorSendType sendType = sendType_;
00706     const bool doBarrier = barrierBetween_;
00707 
00708 #ifdef HAVE_TEUCHOS_DEBUG
00709     // Prepare for verbose output, if applicable.
00710     Teuchos::EVerbosityLevel verbLevel = this->getVerbLevel ();
00711     RCP<FancyOStream> out = this->getOStream ();
00712     const bool doPrint = out.get () && (comm_->getRank () == 0) &&
00713       includesVerbLevel (verbLevel, Teuchos::VERB_EXTREME, true);
00714 
00715     if (doPrint) {
00716       // Only need one process to print out parameters.
00717       *out << "Distributor::doPosts" << endl;
00718     }
00719     // Add one tab level.  We declare this outside the doPrint scopes
00720     // so that the tab persists until the end of this method.
00721     OSTab tab = this->getOSTab();
00722     if (doPrint) {
00723       *out << "sendType=" << DistributorSendTypeEnumToString (sendType)
00724            << ", barrierBetween=" << doBarrier << endl;
00725     }
00726 #endif // HAVE_TEUCHOS_DEBUG
00727 
00728     TEUCHOS_TEST_FOR_EXCEPTION(sendType == DISTRIBUTOR_RSEND && ! doBarrier,
00729       std::logic_error, "Ready send implementation requires a barrier between "
00730       "posting receives and posting ready sends.  This should have been checked "
00731       "before.  Please report this bug to the Tpetra developers.");
00732 
00733     const int myImageID = comm_->getRank();
00734     size_t selfReceiveOffset = 0;
00735 
00736 #ifdef HAVE_TEUCHOS_DEBUG
00737     // Each message has the same number of packets.
00738     const size_t totalNumImportPackets = totalReceiveLength_ * numPackets;
00739     TEUCHOS_TEST_FOR_EXCEPTION(as<size_t> (imports.size ()) != totalNumImportPackets,
00740       std::runtime_error, typeName (*this) << "::doPosts(): imports must be "
00741       "large enough to store the imported data.  imports.size() = "
00742       << imports.size() << ", but total number of import packets = "
00743       << totalNumImportPackets << ".");
00744 #endif // HAVE_TEUCHOS_DEBUG
00745 
00746     // Distributor uses requests_.size() as the number of outstanding
00747     // nonblocking message requests, so we resize to zero to maintain
00748     // this invariant.
00749     //
00750     // numReceives_ does _not_ include the self message, if there is
00751     // one.  Here, we do actually send a message to ourselves, so we
00752     // include any self message in the "actual" number of receives to
00753     // post.
00754     //
00755     // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
00756     // doesn't (re)allocate its array of requests.  That happens in
00757     // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
00758     // demand), or Resize_().
00759     const size_t actualNumReceives = numReceives_ + (selfMessage_ ? 1 : 0);
00760     requests_.resize (0);
00761 
00762     // Post the nonblocking receives.  It's common MPI wisdom to post
00763     // receives before sends.  In MPI terms, this means favoring
00764     // adding to the "posted queue" (of receive requests) over adding
00765     // to the "unexpected queue" (of arrived messages not yet matched
00766     // with a receive).
00767     {
00768       size_t curBufferOffset = 0;
00769       for (size_t i = 0; i < actualNumReceives; ++i) {
00770         if (imagesFrom_[i] != myImageID) {
00771           // If my process is receiving these packet(s) from another
00772           // process (not a self-receive):
00773           //
00774           // 1. Set up the persisting view (recvBuf) of the imports
00775           //    array, given the offset and size (total number of
00776           //    packets from process imagesFrom_[i]).
00777           // 2. Start the Irecv and save the resulting request.
00778           ArrayRCP<Packet> recvBuf =
00779             imports.persistingView (curBufferOffset, lengthsFrom_[i]*numPackets);
00780           requests_.push_back (ireceive<int, Packet> (*comm_, recvBuf, imagesFrom_[i]));
00781         }
00782         else { // Receiving from myself
00783           selfReceiveOffset = curBufferOffset; // Remember the self-recv offset
00784         }
00785         curBufferOffset += lengthsFrom_[i]*numPackets;
00786       }
00787     }
00788 
00789     if (doBarrier) {
00790       // Each ready-send below requires that its matching receive has
00791       // already been posted, so do a barrier to ensure that all the
00792       // nonblocking receives have posted first.
00793       Teuchos::barrier (*comm_);
00794     }
00795 
00796     // setup scan through imagesTo_ list starting with higher numbered images
00797     // (should help balance message traffic)
00798     size_t numBlocks = numSends_ + selfMessage_;
00799     size_t imageIndex = 0;
00800     while ((imageIndex < numBlocks) && (imagesTo_[imageIndex] < myImageID)) {
00801       ++imageIndex;
00802     }
00803     if (imageIndex == numBlocks) {
00804       imageIndex = 0;
00805     }
00806 
00807     size_t selfNum = 0;
00808     size_t selfIndex = 0;
00809 
00810     if (indicesTo_.empty()) {
00811       // Data are already blocked (laid out) by process, so we don't
00812       // need a separate send buffer (besides the exports array).
00813       for (size_t i = 0; i < numBlocks; ++i) {
00814         size_t p = i + imageIndex;
00815         if (p > (numBlocks - 1)) {
00816           p -= numBlocks;
00817         }
00818 
00819         if (imagesTo_[p] != myImageID) {
00820           ArrayView<const Packet> tmpSend =
00821             exports.view (startsTo_[p]*numPackets, lengthsTo_[p]*numPackets);
00822           if (sendType == DISTRIBUTOR_RSEND) {
00823             readySend<int,Packet> (*comm_, tmpSend, imagesTo_[p]);
00824           }
00825           else if (sendType == DISTRIBUTOR_ISEND) {
00826             ArrayRCP<const Packet> tmpSendBuf =
00827               exports.persistingView (startsTo_[p] * numPackets,
00828                                       lengthsTo_[p] * numPackets);
00829             requests_.push_back (isend<int, Packet> (*comm_, tmpSendBuf, imagesTo_[p]));
00830           }
00831           else if (sendType == DISTRIBUTOR_SSEND) {
00832             ssend<int, Packet> (*comm_, tmpSend.size(),
00833                                 tmpSend.getRawPtr(), imagesTo_[p]);
00834 
00835           } else { // if (sendType == DISTRIBUTOR_SEND)
00836             // We've already validated sendType, so it has to be
00837             // DISTRIBUTOR_SEND.  If it's not, well, this is a
00838             // reasonable fallback.
00839             //
00840             // FIXME (mfh 23 Mar 2012) Implement a three-argument
00841             // version of send() that takes an ArrayView instead of a
00842             // raw array.
00843             send<int, Packet> (*comm_, tmpSend.size(),
00844                                tmpSend.getRawPtr(), imagesTo_[p]);
00845           }
00846         }
00847         else { // "Sending" the message to myself
00848           selfNum = p;
00849         }
00850       }
00851 
00852       if (selfMessage_) {
00853         std::copy (exports.begin()+startsTo_[selfNum]*numPackets,
00854                    exports.begin()+startsTo_[selfNum]*numPackets+lengthsTo_[selfNum]*numPackets,
00855                    imports.begin()+selfReceiveOffset);
00856       }
00857     }
00858     else { // data are not blocked by image, use send buffer
00859       ArrayRCP<Packet> sendArray (maxSendLength_ * numPackets); // send buffer
00860 
00861       for (size_t i = 0; i < numBlocks; ++i) {
00862         size_t p = i + imageIndex;
00863         if (p > (numBlocks - 1)) {
00864           p -= numBlocks;
00865         }
00866 
00867         if (imagesTo_[p] != myImageID) {
00868           typename ArrayView<const Packet>::iterator srcBegin, srcEnd;
00869           size_t sendArrayOffset = 0;
00870           size_t j = startsTo_[p];
00871           for (size_t k = 0; k < lengthsTo_[p]; ++k, ++j) {
00872             srcBegin = exports.begin() + indicesTo_[j]*numPackets;
00873             srcEnd   = srcBegin + numPackets;
00874             std::copy (srcBegin, srcEnd, sendArray.begin()+sendArrayOffset);
00875             sendArrayOffset += numPackets;
00876           }
00877           ArrayView<const Packet> tmpSend = sendArray.view (0, lengthsTo_[p]*numPackets);
00878 
00879           if (sendType == DISTRIBUTOR_RSEND) {
00880             readySend<int,Packet> (*comm_, tmpSend, imagesTo_[p]);
00881           }
00882           else if (sendType == DISTRIBUTOR_ISEND) {
00883             ArrayRCP<const Packet> tmpSendBuf =
00884               sendArray.persistingView (0, lengthsTo_[p] * numPackets);
00885             requests_.push_back (isend<int, Packet> (*comm_, tmpSendBuf, imagesTo_[p]));
00886           }
00887           else if (sendType == DISTRIBUTOR_SSEND) {
00888             ssend<int,Packet> (*comm_, tmpSend.size(),
00889                                tmpSend.getRawPtr(), imagesTo_[p]);
00890           }
00891           else { // if (sendType == DISTRIBUTOR_SEND)
00892             // We've already validated sendType, so it has to be
00893             // DISTRIBUTOR_SEND.  If it's not, well, this is a
00894             // reasonable fallback.
00895             send<int,Packet> (*comm_, tmpSend.size(),
00896                               tmpSend.getRawPtr(), imagesTo_[p]);
00897           }
00898         }
00899         else { // "Sending" the message to myself
00900           selfNum = p;
00901           selfIndex = startsTo_[p];
00902         }
00903       }
00904 
00905       if (selfMessage_) {
00906         for (size_t k = 0; k < lengthsTo_[selfNum]; ++k) {
00907           std::copy (exports.begin()+indicesTo_[selfIndex]*numPackets,
00908                      exports.begin()+indicesTo_[selfIndex]*numPackets + numPackets,
00909                      imports.begin() + selfReceiveOffset);
00910           ++selfIndex;
00911           selfReceiveOffset += numPackets;
00912         }
00913       }
00914     }
00915   }
00916 
00917   template <class Packet>
00918   void Distributor::
00919   doPosts (const ArrayRCP<const Packet>& exports,
00920            const ArrayView<size_t>& numExportPacketsPerLID,
00921            const ArrayRCP<Packet>& imports,
00922            const ArrayView<size_t>& numImportPacketsPerLID)
00923   {
00924     using Teuchos::as;
00925     using Teuchos::ireceive;
00926     using Teuchos::isend;
00927     using Teuchos::readySend;
00928     using Teuchos::send;
00929     using Teuchos::ssend;
00930 #ifdef HAVE_TEUCHOS_DEBUG
00931     using std::endl;
00932 #endif // HAVE_TEUCHOS_DEBUG
00933 
00934     // Run-time configurable parameters that come from the input
00935     // ParameterList set by setParameterList().
00936     const EDistributorSendType sendType = sendType_;
00937     const bool doBarrier = barrierBetween_;
00938 
00939 #ifdef HAVE_TEUCHOS_DEBUG
00940     // Prepare for verbose output, if applicable.
00941     Teuchos::EVerbosityLevel verbLevel = this->getVerbLevel ();
00942     RCP<Teuchos::FancyOStream> out = this->getOStream ();
00943     const bool doPrint = out.get () && (comm_->getRank () == 0) &&
00944       includesVerbLevel (verbLevel, Teuchos::VERB_EXTREME, true);
00945 
00946     if (doPrint) {
00947       // Only need one process to print out parameters.
00948       *out << "Distributor::doPosts" << endl;
00949     }
00950     // Add one tab level.  We declare this outside the doPrint scopes
00951     // so that the tab persists until the end of this method.
00952     Teuchos::OSTab tab = this->getOSTab();
00953     if (doPrint) {
00954       *out << "sendType=" << DistributorSendTypeEnumToString (sendType)
00955            << ", barrierBetween=" << doBarrier << endl;
00956     }
00957 #endif // HAVE_TEUCHOS_DEBUG
00958 
00959     TEUCHOS_TEST_FOR_EXCEPTION(sendType == DISTRIBUTOR_RSEND && ! doBarrier,
00960       std::logic_error, "Ready send implementation requires a barrier between "
00961       "posting receives and posting ready sends.  This should have been checked "
00962       "before.  Please report this bug to the Tpetra developers.");
00963 
00964     const int myImageID = comm_->getRank();
00965     size_t selfReceiveOffset = 0;
00966 
00967 #ifdef HAVE_TEUCHOS_DEBUG
00968     // Different messages may have different numbers of packets.
00969     size_t totalNumImportPackets = 0;
00970     for (int ii = 0; ii < numImportPacketsPerLID.size(); ++ii) {
00971       totalNumImportPackets += numImportPacketsPerLID[ii];
00972     }
00973     TEUCHOS_TEST_FOR_EXCEPTION(as<size_t> (imports.size ()) != totalNumImportPackets,
00974       std::runtime_error, Teuchos::typeName (*this) << "::doPosts(): The "
00975       "imports array argument must be large enough to store the imported "
00976       "data.  imports.size() = " << imports.size() << ", but the total number "
00977       "of packets is " << totalNumImportPackets << ".");
00978 #endif // HAVE_TEUCHOS_DEBUG
00979 
00980     // Distributor uses requests_.size() as the number of outstanding
00981     // nonblocking message requests, so we resize to zero to maintain
00982     // this invariant.
00983     //
00984     // numReceives_ does _not_ include the self message, if there is
00985     // one.  Here, we do actually send a message to ourselves, so we
00986     // include any self message in the "actual" number of receives to
00987     // post.
00988     //
00989     // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
00990     // doesn't (re)allocate its array of requests.  That happens in
00991     // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
00992     // demand), or Resize_().
00993     const size_t actualNumReceives = numReceives_ + (selfMessage_ ? 1 : 0);
00994     requests_.resize (0);
00995 
00996     // Post the nonblocking receives.  It's common MPI wisdom to post
00997     // receives before sends.  In MPI terms, this means favoring
00998     // adding to the "posted queue" (of receive requests) over adding
00999     // to the "unexpected queue" (of arrived messages not yet matched
01000     // with a receive).
01001     {
01002       size_t curBufferOffset = 0;
01003       size_t curLIDoffset = 0;
01004       for (size_t i = 0; i < actualNumReceives; ++i) {
01005         size_t totalPacketsFrom_i = 0;
01006         for (size_t j = 0; j < lengthsFrom_[i]; ++j) {
01007           totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset+j];
01008         }
01009         curLIDoffset += lengthsFrom_[i];
01010         if (imagesFrom_[i] != myImageID && totalPacketsFrom_i) {
01011           // If my process is receiving these packet(s) from another
01012           // process (not a self-receive), and if there is at least
01013           // one packet to receive:
01014           //
01015           // 1. Set up the persisting view (recvBuf) into the imports
01016           //    array, given the offset and size (total number of
01017           //    packets from process imagesFrom_[i]).
01018           // 2. Start the Irecv and save the resulting request.
01019           ArrayRCP<Packet> recvBuf =
01020             imports.persistingView (curBufferOffset, totalPacketsFrom_i);
01021           requests_.push_back (ireceive<int, Packet> (*comm_, recvBuf, imagesFrom_[i]));
01022         }
01023         else { // Receiving these packet(s) from myself
01024           selfReceiveOffset = curBufferOffset; // Remember the offset
01025         }
01026         curBufferOffset += totalPacketsFrom_i;
01027       }
01028     }
01029 
01030     if (doBarrier) {
01031       // Each ready-send below requires that its matching receive has
01032       // already been posted, so do a barrier to ensure that all the
01033       // nonblocking receives have posted first.
01034       Teuchos::barrier (*comm_);
01035     }
01036 
01037     // setup arrays containing starting-offsets into exports for each send,
01038     // and num-packets-to-send for each send.
01039     Array<size_t> sendPacketOffsets(numSends_,0), packetsPerSend(numSends_,0);
01040     size_t maxNumPackets = 0;
01041     size_t curPKToffset = 0;
01042     for (size_t pp=0; pp<numSends_; ++pp) {
01043       sendPacketOffsets[pp] = curPKToffset;
01044       size_t numPackets = 0;
01045       for (size_t j=startsTo_[pp]; j<startsTo_[pp]+lengthsTo_[pp]; ++j) {
01046         numPackets += numExportPacketsPerLID[j];
01047       }
01048       if (numPackets > maxNumPackets) maxNumPackets = numPackets;
01049       packetsPerSend[pp] = numPackets;
01050       curPKToffset += numPackets;
01051     }
01052 
01053     // setup scan through imagesTo_ list starting with higher numbered images
01054     // (should help balance message traffic)
01055     size_t numBlocks = numSends_+ selfMessage_;
01056     size_t imageIndex = 0;
01057     while ((imageIndex < numBlocks) && (imagesTo_[imageIndex] < myImageID)) {
01058       ++imageIndex;
01059     }
01060     if (imageIndex == numBlocks) {
01061       imageIndex = 0;
01062     }
01063 
01064     size_t selfNum = 0;
01065     size_t selfIndex = 0;
01066 
01067     if (indicesTo_.empty()) {
01068       // Data are already blocked (laid out) by process, so we don't
01069       // need a separate send buffer (besides the exports array).
01070       for (size_t i = 0; i < numBlocks; ++i) {
01071         size_t p = i + imageIndex;
01072         if (p > (numBlocks - 1)) {
01073           p -= numBlocks;
01074         }
01075 
01076         if (imagesTo_[p] != myImageID && packetsPerSend[p] > 0) {
01077           ArrayView<const Packet> tmpSend =
01078             exports.view (sendPacketOffsets[p], packetsPerSend[p]);
01079           if (sendType == DISTRIBUTOR_RSEND) {
01080             readySend<int,Packet> (*comm_, tmpSend, imagesTo_[p]);
01081           }
01082           else if (sendType == DISTRIBUTOR_ISEND) {
01083             ArrayRCP<const Packet> tmpSendBuf =
01084               exports.persistingView (sendPacketOffsets[p], packetsPerSend[p]);
01085             requests_.push_back (isend<int, Packet> (*comm_, tmpSendBuf, imagesTo_[p]));
01086           }
01087           else if (sendType == DISTRIBUTOR_SSEND) {
01088             ssend<int, Packet> (*comm_, tmpSend.size(),
01089                                 tmpSend.getRawPtr(), imagesTo_[p]);
01090           }
01091           else { // if (sendType == DISTRIBUTOR_SEND)
01092             // We've already validated sendType, so it has to be
01093             // DISTRIBUTOR_SEND.  If it's not, well, this is a
01094             // reasonable fallback.
01095             send<int, Packet> (*comm_, tmpSend.size(),
01096                                tmpSend.getRawPtr(), imagesTo_[p]);
01097           }
01098         }
01099         else { // "Sending" the message to myself
01100           selfNum = p;
01101         }
01102       }
01103 
01104       if (selfMessage_) {
01105         std::copy (exports.begin()+sendPacketOffsets[selfNum],
01106                    exports.begin()+sendPacketOffsets[selfNum]+packetsPerSend[selfNum],
01107                    imports.begin()+selfReceiveOffset);
01108       }
01109     }
01110     else { // data are not blocked by image, use send buffer
01111       ArrayRCP<Packet> sendArray (maxNumPackets); // send buffer
01112 
01113       Array<size_t> indicesOffsets (numExportPacketsPerLID.size(), 0);
01114       size_t ioffset = 0;
01115       for (int j=0; j<numExportPacketsPerLID.size(); ++j) {
01116         indicesOffsets[j] = ioffset;
01117         ioffset += numExportPacketsPerLID[j];
01118       }
01119 
01120       for (size_t i = 0; i < numBlocks; ++i) {
01121         size_t p = i + imageIndex;
01122         if (p > (numBlocks - 1)) {
01123           p -= numBlocks;
01124         }
01125 
01126         if (imagesTo_[p] != myImageID) {
01127           typename ArrayView<const Packet>::iterator srcBegin, srcEnd;
01128           size_t sendArrayOffset = 0;
01129           size_t j = startsTo_[p];
01130           size_t numPacketsTo_p = 0;
01131           for (size_t k = 0; k < lengthsTo_[p]; ++k, ++j) {
01132             srcBegin = exports.begin() + indicesOffsets[j];
01133             srcEnd   = srcBegin + numExportPacketsPerLID[j];
01134             numPacketsTo_p += numExportPacketsPerLID[j];
01135             std::copy (srcBegin, srcEnd, sendArray.begin()+sendArrayOffset);
01136             sendArrayOffset += numExportPacketsPerLID[j];
01137           }
01138           if (numPacketsTo_p > 0) {
01139             ArrayView<const Packet> tmpSend = sendArray.view (0, numPacketsTo_p);
01140 
01141             if (sendType == DISTRIBUTOR_RSEND) {
01142               readySend<int,Packet> (*comm_,tmpSend,imagesTo_[p]);
01143             }
01144             else if (sendType == DISTRIBUTOR_ISEND) {
01145               ArrayRCP<const Packet> tmpSendBuf =
01146                 sendArray.persistingView (0, numPacketsTo_p);
01147               requests_.push_back (isend<int, Packet> (*comm_, tmpSendBuf,
01148                                                        imagesTo_[p]));
01149             }
01150             else if (sendType == DISTRIBUTOR_SSEND) {
01151               ssend<int,Packet> (*comm_, tmpSend.size(), tmpSend.getRawPtr(), imagesTo_[p]);
01152             }
01153             else { // if (sendType == DISTRIBUTOR_SSEND)
01154               send<int,Packet> (*comm_, tmpSend.size(), tmpSend.getRawPtr(), imagesTo_[p]);
01155             }
01156           }
01157         }
01158         else { // "Sending" the message to myself
01159           selfNum = p;
01160           selfIndex = startsTo_[p];
01161         }
01162       }
01163 
01164       if (selfMessage_) {
01165         for (size_t k = 0; k < lengthsTo_[selfNum]; ++k) {
01166           std::copy (exports.begin()+indicesOffsets[selfIndex],
01167                      exports.begin()+indicesOffsets[selfIndex]+numExportPacketsPerLID[selfIndex],
01168                      imports.begin() + selfReceiveOffset);
01169           selfReceiveOffset += numExportPacketsPerLID[selfIndex];
01170           ++selfIndex;
01171         }
01172       }
01173     }
01174   }
01175 
01176   template <class Packet>
01177   void Distributor::
01178   doReversePostsAndWaits (const ArrayView<const Packet>& exports,
01179                           size_t numPackets,
01180                           const ArrayView<Packet>& imports)
01181   {
01182     using Teuchos::as;
01183 
01184     // doReversePosts() takes exports and imports as ArrayRCPs,
01185     // requiring that the memory locations are persisting.  However,
01186     // they need only persist within the scope of that routine, so it
01187     // is safe for us to use nonpersisting references in this case.
01188 
01189     // mfh 04 Apr 2012: For some reason, calling arcp<const Packet>
01190     // for Packet=std::complex<T> (e.g., T=float) fails to compile
01191     // with some versions of GCC.  The issue only arises with the
01192     // exports array.  This is why we construct a separate nonowning
01193     // ArrayRCP.
01194     typedef typename ArrayRCP<const Packet>::size_type size_type;
01195     ArrayRCP<const Packet> exportsArcp (exports.getRawPtr(), as<size_type> (0),
01196                                         exports.size(), false);
01197     // mfh 04 Apr 2012: This is the offending code.  This statement
01198     // would normally be in place of "exportsArcp" in the
01199     // doReversePosts() call below.
01200     //arcp<const Packet> (exports.getRawPtr(), 0, exports.size(), false)
01201     doReversePosts (exportsArcp,
01202                     numPackets,
01203                     arcp<Packet> (imports.getRawPtr (), 0, imports.size (), false));
01204     doReverseWaits ();
01205   }
01206 
01207   template <class Packet>
01208   void Distributor::
01209   doReversePostsAndWaits (const ArrayView<const Packet>& exports,
01210                           const ArrayView<size_t> &numExportPacketsPerLID,
01211                           const ArrayView<Packet> &imports,
01212                           const ArrayView<size_t> &numImportPacketsPerLID)
01213   {
01214     using Teuchos::as;
01215     using Teuchos::arcp;
01216     using Teuchos::ArrayRCP;
01217 
01218     TEUCHOS_TEST_FOR_EXCEPTION(requests_.size() != 0, std::runtime_error,
01219       Teuchos::typeName(*this) << "::doReversePostsAndWaits(): There are "
01220       << requests_.size() << " outstanding nonblocking messages pending.  It is "
01221       "incorrect to call doReversePostsAndWaits with posts outstanding.");
01222 
01223     // doReversePosts() accepts the exports and imports arrays as
01224     // ArrayRCPs, requiring that the memory location is persisting (as
01225     // is necessary for nonblocking receives).  However, it need only
01226     // persist until doReverseWaits() completes, so it is safe for us
01227     // to use a nonpersisting reference in this case.  The use of a
01228     // nonpersisting reference is purely a performance optimization.
01229 
01230     // mfh 02 Apr 2012: For some reason, calling arcp<const Packet>
01231     // for Packet=std::complex<double> fails to compile with some
01232     // versions of GCC.  The issue only arises with the exports array.
01233     // This is why we construct a separate nonowning ArrayRCP.
01234     typedef typename ArrayRCP<const Packet>::size_type size_type;
01235     ArrayRCP<const Packet> exportsArcp (exports.getRawPtr (), as<size_type> (0),
01236                                         exports.size (), false);
01237     doReversePosts (exportsArcp,
01238                     numExportPacketsPerLID,
01239                     arcp<Packet> (imports.getRawPtr (), 0, imports.size (), false),
01240                     numImportPacketsPerLID);
01241     doReverseWaits ();
01242   }
01243 
01244   template <class Packet>
01245   void Distributor::
01246   doReversePosts (const ArrayRCP<const Packet>& exports,
01247                   size_t numPackets,
01248                   const ArrayRCP<Packet>& imports)
01249   {
01250     // FIXME (mfh 29 Mar 2012) WHY?
01251     TEUCHOS_TEST_FOR_EXCEPTION(! indicesTo_.empty (), std::runtime_error,
01252       Teuchos::typeName (*this) << "::doReversePosts(): Can only do reverse "
01253       "communication when original data are blocked by process.");
01254     if (reverseDistributor_.is_null ()) {
01255       createReverseDistributor ();
01256     }
01257     reverseDistributor_->doPosts (exports, numPackets, imports);
01258   }
01259 
01260   template <class Packet>
01261   void Distributor::
01262   doReversePosts (const ArrayRCP<const Packet>& exports,
01263                   const ArrayView<size_t>& numExportPacketsPerLID,
01264                   const ArrayRCP<Packet>& imports,
01265                   const ArrayView<size_t>& numImportPacketsPerLID)
01266   {
01267     // FIXME (mfh 29 Mar 2012) WHY?
01268     TEUCHOS_TEST_FOR_EXCEPTION(! indicesTo_.empty (), std::runtime_error,
01269       Teuchos::typeName (*this) << "::doReversePosts(): Can only do reverse "
01270       "communication when original data are blocked by process.");
01271     if (reverseDistributor_.is_null ()) {
01272       createReverseDistributor ();
01273     }
01274     reverseDistributor_->doPosts (exports, numExportPacketsPerLID,
01275                                   imports, numImportPacketsPerLID);
01276   }
01277 
01278   template <class OrdinalType>
01279   void Distributor::
01280   computeSends (const ArrayView<const OrdinalType> & importIDs,
01281                 const ArrayView<const int> & importNodeIDs,
01282                 ArrayRCP<OrdinalType> & exportIDs,
01283                 ArrayRCP<int> & exportNodeIDs)
01284   {
01285     // NOTE (mfh 19 Apr 2012): There was a note on this code saying:
01286     // "assumes that size_t >= Ordinal".  The code certainly does
01287     // assume that sizeof(size_t) >= sizeof(OrdinalType) as well as
01288     // sizeof(size_t) >= sizeof(int).  This is because it casts the
01289     // OrdinalType elements of importIDs (along with their
01290     // corresponding process IDs, as int) to size_t, and does a
01291     // doPostsAndWaits<size_t>() to send the packed data.
01292     using Teuchos::as;
01293 
01294     const int myRank = comm_->getRank();
01295     const size_t numImports = importNodeIDs.size();
01296     TEUCHOS_TEST_FOR_EXCEPTION(as<size_t> (importIDs.size ()) < numImports,
01297       std::invalid_argument, "Tpetra::Distributor::computeSends: importNodeIDs."
01298       "size() = " << importNodeIDs.size () << " != importIDs.size() = "
01299       << importIDs.size () << ".");
01300 
01301     Array<size_t> importObjs (2*numImports);
01302     // Pack pairs (importIDs[i], my process ID) to send into importObjs.
01303     for (size_t i = 0; i < numImports; ++i ) {
01304       importObjs[2*i]   = as<size_t> (importIDs[i]);
01305       importObjs[2*i+1] = as<size_t> (myRank);
01306     }
01307     //
01308     // Use a temporary Distributor to send the (importIDs[i], myRank)
01309     // pairs to importNodeIDs[i].
01310     //
01311     size_t numExports;
01312     Distributor tempPlan (comm_);
01313     numExports = tempPlan.createFromSends (importNodeIDs);
01314     if (numExports > 0) {
01315       exportIDs = arcp<OrdinalType> (numExports);
01316       exportNodeIDs = arcp<int> (numExports);
01317     }
01318     Array<size_t> exportObjs (tempPlan.getTotalReceiveLength () * 2);
01319     tempPlan.doPostsAndWaits<size_t> (importObjs (), 2, exportObjs ());
01320 
01321     // Unpack received (GID, PID) pairs into exportIDs resp. exportNodeIDs.
01322     for (size_t i = 0; i < numExports; ++i) {
01323       exportIDs[i]     = as<OrdinalType>(exportObjs[2*i]);
01324       exportNodeIDs[i] = exportObjs[2*i+1];
01325     }
01326   }
01327 
01328   template <class OrdinalType>
01329   void Distributor::
01330   createFromRecvs (const ArrayView<const OrdinalType> &remoteIDs,
01331                    const ArrayView<const int> &remoteImageIDs,
01332                    ArrayRCP<OrdinalType> &exportGIDs,
01333                    ArrayRCP<int> &exportNodeIDs)
01334   {
01335     using Teuchos::outArg;
01336     using Teuchos::reduceAll;
01337 
01338     const int myRank = comm_->getRank();
01339     const int errProc =
01340       (remoteIDs.size () != remoteImageIDs.size ()) ? myRank : -1;
01341     int maxErrProc = -1;
01342     reduceAll (*comm_, Teuchos::REDUCE_MAX, errProc, outArg (maxErrProc));
01343     TEUCHOS_TEST_FOR_EXCEPTION(maxErrProc != -1, std::runtime_error,
01344       Teuchos::typeName (*this) << "::createFromRecvs(): lists of remote IDs "
01345       "and remote process IDs must have the same size on all participating "
01346       "processes.  Maximum process ID with error: " << maxErrProc << ".");
01347 
01348     computeSends (remoteIDs, remoteImageIDs, exportGIDs, exportNodeIDs);
01349     (void) createFromSends (exportNodeIDs ());
01350   }
01351 
01352 } // namespace Tpetra
01353 
01354 #endif // TPETRA_DISTRIBUTOR_HPP
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines