Zoltan 2 Version 0.5
Zoltan2_IdentifierTraits.hpp
Go to the documentation of this file.
00001 // @HEADER
00002 //
00003 // ***********************************************************************
00004 //
00005 //   Zoltan2: A package of combinatorial algorithms for scientific computing
00006 //                  Copyright 2012 Sandia Corporation
00007 //
00008 // Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
00009 // the U.S. Government retains certain rights in this software.
00010 //
00011 // Redistribution and use in source and binary forms, with or without
00012 // modification, are permitted provided that the following conditions are
00013 // met:
00014 //
00015 // 1. Redistributions of source code must retain the above copyright
00016 // notice, this list of conditions and the following disclaimer.
00017 //
00018 // 2. Redistributions in binary form must reproduce the above copyright
00019 // notice, this list of conditions and the following disclaimer in the
00020 // documentation and/or other materials provided with the distribution.
00021 //
00022 // 3. Neither the name of the Corporation nor the names of the
00023 // contributors may be used to endorse or promote products derived from
00024 // this software without specific prior written permission.
00025 //
00026 // THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
00027 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00028 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
00029 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
00030 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00031 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00032 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00033 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00034 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00035 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00036 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00037 //
00038 // Questions? Contact Karen Devine      (kddevin@sandia.gov)
00039 //                    Erik Boman        (egboman@sandia.gov)
00040 //                    Siva Rajamanickam (srajama@sandia.gov)
00041 //
00042 // ***********************************************************************
00043 //
00044 // @HEADER
00045 // @HEADER
00046 // ***********************************************************************
00047 //            copyright
00048 //
00049 // ***********************************************************************
00050 // @HEADER
00051 
00056 #ifndef _ZOLTAN2_IDENTIFIERTRAITS
00057 #define _ZOLTAN2_IDENTIFIERTRAITS
00058 
00059 #include <Zoltan2_Standards.hpp>
00060 #include <Zoltan2_AlltoAll.hpp>
00061 
00062 #include <Teuchos_SerializationTraits.hpp>
00063 #include <Teuchos_HashUtils.hpp>
00064 #include <Teuchos_ReductionOp.hpp>
00065 
00066 #include <utility>
00067 #include <iostream>
00068 #include <sstream>
00069 #include <string>
00070 #include <limits>
00071 #include <cstdlib>
00072 
00073 using Teuchos::SerializationTraits;
00074 
00075 namespace Zoltan2
00076 {
/*! \brief Find the smallest and largest entries of a local array.
 *
 *  \param val  pointer to the first of \c n values (not read when \c n is 0)
 *  \param n    the number of values in \c val
 *  \return a pair with the minimum in \c first and the maximum in \c second.
 *     When \c n is zero the pair is
 *     (std::numeric_limits<T>::max(), std::numeric_limits<T>::min()),
 *     an inverted range.
 */
template <typename T>
  std::pair<T, T> z2LocalMinMax(const T *val, size_t n)
{
  // Begin with an inverted range.  The limits are qualified with std::
  // so this header does not depend on a using-declaration made elsewhere.
  T largest = std::numeric_limits<T>::min();
  T smallest = std::numeric_limits<T>::max();

  for (size_t i=0; i < n; i++){
    if (val[i] < smallest) smallest = val[i];
    if (val[i] > largest) largest = val[i];
  }
  return std::pair<T,T>(smallest, largest);
}
00091 
/*! \brief Fold a byte sequence into a non-negative int hash code.
 *
 *  The input bytes are added, one at a time, into successive bytes of an
 *  int accumulator, returning to the accumulator's first byte after every
 *  sizeof(int) input bytes.  Each per-byte sum wraps within its byte.
 *
 *  \param a    the bytes to hash
 *  \param len  the number of bytes in \c a
 *  \return the accumulated value if it is non-negative, otherwise its
 *     negation.  The one value that cannot be negated,
 *     std::numeric_limits<int>::min(), maps to
 *     std::numeric_limits<int>::max().
 *
 *  Declared inline because it is defined in a header; otherwise every
 *  translation unit including this file would emit its own definition.
 */
inline int getHashCode(const unsigned char *a, size_t len)
{
  int total=0;
  unsigned char *to = reinterpret_cast<unsigned char *>(&total);
  size_t slot=0;
  for (size_t i=0; i < len; i++){
    to[slot] = static_cast<unsigned char>(to[slot] + a[i]);
    if (++slot == sizeof(int))
      slot = 0;
  }

  // Negating the most negative int overflows, so handle it separately.
  if (total == std::numeric_limits<int>::min())
    total = std::numeric_limits<int>::max();
  else if (total < 0)
    total = -total;

  return total;
}
00110 
00116 template <typename T> class Zoltan2_MinMaxOperation :
00117   public Teuchos::ValueTypeReductionOp<int, char>
00118 {
00119 public:
00120   void reduce( 
00121     const int count, const char inBuffer[], char inoutBuffer[]) const
00122   {
00123     const T *in = reinterpret_cast<const T*>(inBuffer);
00124     T *inout = reinterpret_cast<T*>(inoutBuffer);
00125 
00126     if (in[0] < inout[0]) inout[0] = in[0];
00127     if (in[1] > inout[1]) inout[1] = in[1];
00128   }
00129 };
00130 
00133 template <typename T>
00134   void z2GlobalMinMax(const Comm<int> &comm, bool noLocalValues,
00135     T localMin, T localMax, T &globalMin, T &globalMax)
00136 {
00137   if (noLocalValues){
00138     localMin = numeric_limits<T>::max();
00139     localMax = numeric_limits<T>::min();
00140   }
00141 
00142   if (comm.getSize() == 1){
00143     globalMin = localMin;
00144     globalMax = localMax;
00145     return;
00146   }
00147 
00148   T localValues[2] = {localMin, localMax};
00149   T globalValues[2];
00150 
00151   char *lv = reinterpret_cast<char *>(&localValues[0]);
00152   char *gv = reinterpret_cast<char *>(&globalValues[0]);
00153 
00154   Zoltan2_MinMaxOperation<T> reductionOp;
00155   Teuchos::reduceAll<int, char>(comm, reductionOp, 2*sizeof(T), lv, gv);
00156 
00157   globalMin = globalValues[0];
00158   globalMax = globalValues[1];
00159 }
00160 
/*! \brief Determine whether a list of values is increasing and consecutive.
 *
 *  \param val  pointer to the first of \c n values
 *  \param n    the number of values in \c val
 *  \return true if \c n is zero, or if each value is exactly one more than
 *     the value before it; false otherwise.
 *
 *  The test never computes a value outside the range of T, so lists
 *  containing extreme values, and lists longer than T can count, are
 *  handled without overflow or truncation.
 */
template <typename T>
  bool z2AreConsecutive(const T *val, size_t n)
{
  if (n == 0) return true;

  // Cheap rejection: an increasing run cannot end below where it starts.
  if (val[n-1] < val[0])
    return false;

  T expected = val[0];
  for (size_t i=1; i < n; i++){
    // There is no successor to the largest T, so the run must end here.
    if (expected == std::numeric_limits<T>::max())
      return false;
    ++expected;
    if (val[i] != expected)
      return false;
  }
  return true;
}
00179 
/*! \brief Convert an ordinal value to its text representation.
 *
 *  \param ordinal  the value to convert
 *  \return the text produced by streaming \c ordinal to a std::ostringstream
 */
template <typename T>
  std::string stringifyOrdinal(T ordinal)
{
  std::ostringstream oss;
  oss << ordinal;
  return oss.str();
}

/*! \brief Overloads for the character types.
 *
 *  Streaming a character type writes the character itself rather than its
 *  numeric value.  These overloads widen to int first so that a char-typed
 *  ordinal is written as a number, like every other ordinal type.
 */
inline std::string stringifyOrdinal(char ordinal)
{
  return stringifyOrdinal<int>(ordinal);
}

inline std::string stringifyOrdinal(signed char ordinal)
{
  return stringifyOrdinal<int>(ordinal);
}

inline std::string stringifyOrdinal(unsigned char ordinal)
{
  return stringifyOrdinal<int>(ordinal);
}
00189 
/*! \brief Helper used by traits members that have no valid implementation.
 *
 *  Both members call T::UsingInvalidGlobalIdentifierDataType(), so they
 *  compile only for a type T that declares such a static member function.
 */
template<typename T>
struct UndefIdTraits
{
  /*! \brief Return the result of T::UsingInvalidGlobalIdentifierDataType().
   */
  static inline T invalid()
  {
    return T::UsingInvalidGlobalIdentifierDataType();
  }

  /*! \brief Call T::UsingInvalidGlobalIdentifierDataType() and discard
   *    the result.
   */
  static inline void noop()
  {
    T::UsingInvalidGlobalIdentifierDataType();
  }
};
00202 
00203 
00255 template<typename T>
00256 struct IdentifierTraits {
00257 
00263   static int hashCode(const T id) {
00264    return UndefIdTraits<int>::invalid();
00265   }
00266 
00271   static inline bool hasUniqueKey(){
00272    return UndefIdTraits<bool>::invalid();}
00273 
00281   static inline double key(const T c){return static_cast<double>(c); }
00282 
00287   static inline std::string name() {
00288     return UndefIdTraits<std::string>::invalid();
00289   }
00290     
00296   static std::string stringify(T val) {
00297     return UndefIdTraits<std::string>::invalid();
00298   }
00299 
00306   static inline bool isGlobalOrdinal() {
00307     return UndefIdTraits<bool>::invalid();
00308   }
00309 
00319   static inline T difference(const T a, const T b) {
00320     return UndefIdTraits<bool>::invalid();
00321   }
00322 
00328   static inline bool is_valid_id_type() { return false; }
00329 
00339   static void minMax(const T *values, size_t numValues, T &min, T&max) {
00340     UndefIdTraits<T>::noop();
00341   }
00342 
00356   static void globalMinMax(const Comm<int> &comm, bool flag,
00357       T localMin, T localMax, T &globalMin, T &globalMax){
00358     UndefIdTraits<T>::noop();
00359   }
00360 
00371   static bool areConsecutive(const T *val, size_t n){
00372     return UndefIdTraits<bool>::invalid();
00373   }
00374 
00375 };
00376 
00377 #ifndef DOXYGEN_SHOULD_SKIP_THIS
00378 
00379 template<>
00380 struct IdentifierTraits<char> {
00381   typedef char T;
00382   static inline int hashCode(const T c) {return static_cast<int>(c);}
00383   static inline bool hasUniqueKey() { return true;}
00384   static inline double key(const T c){return static_cast<double>(c); }
00385   static inline std::string name()     {return("char");}
00386   static std::string stringify(T val) {return stringifyOrdinal(val);}
00387   static inline bool isGlobalOrdinal() {return true; }
00388 
00389   static inline T difference(const T a, const T b) { return (b-a); }
00390   static inline bool is_valid_id_type() {return true; }
00391   static void minMax(const T *values, size_t n, T &min, T &max) {
00392     std::pair<T, T> ext = z2LocalMinMax(values, n);
00393     min = ext.first;
00394     max = ext.second;
00395   }
00396   static void globalMinMax(const Comm<int> &comm, bool flag,
00397       T localMin, T localMax, T &globalMin, T &globalMax){
00398     z2GlobalMinMax(comm, flag, localMin, localMax, globalMin, globalMax);}
00399   static bool areConsecutive(const T *val, size_t n){ 
00400    return z2AreConsecutive(val, n); }
00401 };
00402 
00403 template<>
00404 struct IdentifierTraits<unsigned char> {
00405   typedef unsigned char T;
00406   static inline int hashCode(const T c) {return static_cast<int>(c);}
00407   static inline bool hasUniqueKey() { return true;}
00408   static inline double key(const T c){return static_cast<double>(c); }
00409   static inline std::string name()     {return("unsigned char");}
00410   static std::string stringify(T val) {return stringifyOrdinal(val);}
00411   static inline bool isGlobalOrdinal() {return true; }
00412 
00413   static inline T difference(const T a, const T b) { return (b-a); }
00414   static inline bool is_valid_id_type() {return true; }
00415   static void minMax(const T *values, size_t n, T &min, T &max) {
00416     std::pair<T, T> ext = z2LocalMinMax(values, n);
00417     min = ext.first;
00418     max = ext.second;
00419   }
00420   static void globalMinMax(const Comm<int> &comm, bool flag,
00421       T localMin, T localMax, T &globalMin, T &globalMax){
00422     z2GlobalMinMax(comm, flag, localMin, localMax, globalMin, globalMax);}
00423   static bool areConsecutive(const T *val, size_t n){ 
00424    return z2AreConsecutive(val, n); }
00425 };
00426 
00427 template<>
00428 struct IdentifierTraits<short> {
00429   typedef short T;
00430   static inline int hashCode(const T  a) {return static_cast<int>(a);}
00431   static inline bool hasUniqueKey() { return true;}
00432   static inline double key(const T a){return static_cast<double>(a); }
00433   static inline std::string name()   {return("short");}
00434   static std::string stringify(T val) {return stringifyOrdinal(val);}
00435   static inline bool isGlobalOrdinal() {return true; }
00436 
00437   static inline T difference(const T a, const T b) { return (b-a); }
00438   static inline bool is_valid_id_type() {return true; }
00439   static void minMax(const T *values, size_t n, T &min, T &max) {
00440     std::pair<T, T> ext = z2LocalMinMax(values, n);
00441     min = ext.first;
00442     max = ext.second;
00443   }
00444   static void globalMinMax(const Comm<int> &comm, bool flag,
00445       T localMin, T localMax, T &globalMin, T &globalMax){
00446     z2GlobalMinMax(comm, flag, localMin, localMax, globalMin, globalMax);}
00447   static bool areConsecutive(const T *val, size_t n){ 
00448   return z2AreConsecutive(val, n); }
00449 };
00450 
00451 template<>
00452 struct IdentifierTraits<unsigned short> {
00453   typedef unsigned short T;
00454   static inline int hashCode(const T  a) {return static_cast<int>(a);}
00455   static inline bool hasUniqueKey() { return true;}
00456   static inline double key(const T a){return static_cast<double>(a); }
00457   static inline std::string name()   {return("unsigned short");}
00458   static std::string stringify(T val) {return stringifyOrdinal(val);}
00459   static inline bool isGlobalOrdinal() {return true; }
00460 
00461   static inline T difference(const T a, const T b) { return (b-a); }
00462   static inline bool is_valid_id_type() {return true; }
00463   static void minMax(const T *values, size_t n, T &min, T &max) {
00464     std::pair<T, T> ext = z2LocalMinMax(values, n);
00465     min = ext.first;
00466     max = ext.second;
00467   }
00468   static void globalMinMax(const Comm<int> &comm, bool flag,
00469       T localMin, T localMax, T &globalMin, T &globalMax){
00470     z2GlobalMinMax(comm, flag, localMin, localMax, globalMin, globalMax);}
00471   static bool areConsecutive(const T *val, size_t n){ 
00472   return z2AreConsecutive(val, n); }
00473 };
00474 
00475 template<>
00476 struct IdentifierTraits<int> {
00477   typedef int T;
00478   static inline int hashCode(const T  a) {return static_cast<int>(a);}
00479   static inline bool hasUniqueKey() { return true;}
00480   static inline double key(const T a){return static_cast<double>(a); }
00481   static inline std::string name()   {return("int");}
00482   static std::string stringify(T val) {return stringifyOrdinal(val);}
00483   static inline bool isGlobalOrdinal() {return true; }
00484 
00485   static inline T difference(const T a, const T b) { return (b-a); }
00486   static inline bool is_valid_id_type() {return true; }
00487   static void minMax(const T *values, size_t n, T &min, T &max) {
00488     std::pair<T, T> ext = z2LocalMinMax(values, n);
00489     min = ext.first;
00490     max = ext.second;
00491   }
00492   static void globalMinMax(const Comm<int> &comm, bool flag,
00493       T localMin, T localMax, T &globalMin, T &globalMax){
00494     z2GlobalMinMax(comm, flag, localMin, localMax, globalMin, globalMax);}
00495   static bool areConsecutive(const T *val, size_t n){ 
00496   return z2AreConsecutive(val, n); }
00497 };
00498 
00499 template<>
00500 struct IdentifierTraits<unsigned int> {
00501   typedef unsigned int T;
00502   static inline int hashCode(const T  a) {return static_cast<int>(a);}
00503   static inline bool hasUniqueKey() { return true;}
00504   static inline double key(const T a){return static_cast<double>(a); }
00505   static inline std::string name()   {return("unsigned int");}
00506   static std::string stringify(T val) {return stringifyOrdinal(val);}
00507   static inline bool isGlobalOrdinal() {return true; }
00508 
00509   static inline T difference(const T a, const T b) { return (b-a); }
00510   static inline bool is_valid_id_type() {return true; }
00511   static void minMax(const T *values, size_t n, T &min, T &max) {
00512     std::pair<T, T> ext = z2LocalMinMax(values, n);
00513     min = ext.first;
00514     max = ext.second;
00515   }
00516   static void globalMinMax(const Comm<int> &comm, bool flag,
00517       T localMin, T localMax, T &globalMin, T &globalMax){
00518     z2GlobalMinMax(comm, flag, localMin, localMax, globalMin, globalMax);}
00519   static bool areConsecutive(const T *val, size_t n){ 
00520   return z2AreConsecutive(val, n); }
00521 };
00522 
00523 
00524 template<>
00525 struct IdentifierTraits<long> {
00526   typedef long T;
00527   static inline int hashCode(const T a) {
00528     return getHashCode(
00529       reinterpret_cast<const unsigned char *>(&a), sizeof(T));}
00530   static inline bool hasUniqueKey() { return true;}
00531   static inline double key(const T a){return static_cast<double>(a); }
00532   static inline std::string name()    {return("long");}
00533   static std::string stringify(T val) {return stringifyOrdinal(val);}
00534   static inline bool isGlobalOrdinal() {return true; }
00535   static inline T difference(const T a, const T b) { return (b-a); }
00536   static inline bool is_valid_id_type() {return true; }
00537   static void minMax(const T *values, size_t n, T &min, T &max) {
00538     std::pair<T, T> ext = z2LocalMinMax(values, n);
00539     min = ext.first;
00540     max = ext.second;
00541   }
00542   static void globalMinMax(const Comm<int> &comm, bool flag,
00543       T localMin, T localMax, T &globalMin, T &globalMax){
00544     z2GlobalMinMax(comm, flag, localMin, localMax, globalMin, globalMax);}
00545   static bool areConsecutive(const T *val, size_t n){
00546     return z2AreConsecutive(val, n); }
00547 };
00548 
00549 template<>
00550 struct IdentifierTraits<unsigned long> {
00551   typedef unsigned long T;
00552   static inline int hashCode(const T a) { 
00553     return getHashCode(
00554       reinterpret_cast<const unsigned char *>(&a), sizeof(T));}
00555   static inline bool hasUniqueKey() { return true;}
00556   static inline double key(const T a){return static_cast<double>(a);}
00557   static inline std::string name()   {return("unsigned long");}
00558   static std::string stringify(T val) {return stringifyOrdinal(val);}
00559   static inline bool isGlobalOrdinal() {return true; }
00560   static inline T difference(const T a, const T b) { return (b-a); }
00561   static inline bool is_valid_id_type() {return true; }
00562   static void minMax(const T *values, size_t n, T &min, T &max) {
00563     std::pair<T, T> ext = z2LocalMinMax(values, n);
00564     min = ext.first;
00565     max = ext.second;
00566   }
00567   static void globalMinMax(const Comm<int> &comm, bool flag,
00568       T localMin, T localMax, T &globalMin, T &globalMax){
00569     z2GlobalMinMax(comm, flag, localMin, localMax, globalMin, globalMax);}
00570   static bool areConsecutive(const T *val, size_t n){
00571     return z2AreConsecutive(val, n); }
00572 };
00573 
00574 #ifdef HAVE_ZOLTAN2_LONG_LONG
00575 
00576 template<>
00577 struct IdentifierTraits<long long> {
00578   typedef long long T;
00579   static inline int hashCode(const T a) {
00580     return getHashCode(
00581       reinterpret_cast<const unsigned char *>(&a), sizeof(T));}
00582   static inline bool hasUniqueKey() { return true;}
00583   static inline double key(const T a){return static_cast<double>(a); }
00584   static inline std::string name()    {return("long long");}
00585   static std::string stringify(T val) {return stringifyOrdinal(val);}
00586   static inline bool isGlobalOrdinal() {return true; }
00587   static inline T difference(const T a, const T b) { return (b-a); }
00588   static inline bool is_valid_id_type() {return true; }
00589   static void minMax(const T *values, size_t n, T &min, T &max) {
00590     std::pair<T, T> ext = z2LocalMinMax(values, n);
00591     min = ext.first;
00592     max = ext.second;
00593   }
00594   static void globalMinMax(const Comm<int> &comm, bool flag,
00595       T localMin, T localMax, T &globalMin, T &globalMax){
00596     z2GlobalMinMax(comm, flag, localMin, localMax, globalMin, globalMax);}
00597   static bool areConsecutive(const T *val, size_t n){ 
00598     return z2AreConsecutive(val, n); }
00599 };
00600 
00601 template<>
00602 struct IdentifierTraits<unsigned long long> {
00603   typedef unsigned long long T;
00604   static inline int hashCode(const T a) {
00605     return getHashCode(
00606       reinterpret_cast<const unsigned char *>(&a), sizeof(T));}
00607   static inline bool hasUniqueKey() { return true;}
00608   static inline double key(const T a){return static_cast<double>(a);}
00609   static inline std::string name()   {return("unsigned long long");}
00610   static std::string stringify(T val) {return stringifyOrdinal(val);}
00611   static inline bool isGlobalOrdinal() {return true; }
00612   static inline T difference(const T a, const T b) { return (b-a); }
00613   static inline bool is_valid_id_type() {return true; }
00614   static void minMax(const T *values, size_t n, T &min, T &max) {
00615     std::pair<T, T> ext = z2LocalMinMax(values, n);
00616     min = ext.first;
00617     max = ext.second;
00618   }
00619   static void globalMinMax(const Comm<int> &comm, bool flag,
00620       T localMin, T localMax, T &globalMin, T &globalMax){
00621     z2GlobalMinMax(comm, flag, localMin, localMax, globalMin, globalMax);}
00622   static bool areConsecutive(const T *val, size_t n){ 
00623     return z2AreConsecutive(val, n); }
00624 };
00625 
00626 #endif
00627 
00628 template<>
00629 struct IdentifierTraits<string> {
00630   typedef string T;
00631   static inline int hashCode(const T a) {
00632     return getHashCode(
00633       reinterpret_cast<const unsigned char *>(a.c_str()), a.size());}
00634   static inline bool hasUniqueKey() { return false;}
00635   static inline double key(const T a){ throw std::logic_error("invalid call");}
00636   static inline std::string name()   {return("string");}
00637   static std::string stringify(T val) {return val;}
00638   static inline bool isGlobalOrdinal() {return false; }
00639   static inline T difference(const T a, const T b) { 
00640     throw std::logic_error("invalid call");}
00641   static inline bool is_valid_id_type() {return true; }
00642   static void minMax(const T *values, size_t n, T &min, T &max) {
00643     throw std::logic_error("invalid call");}
00644   static void globalMinMax(const Comm<int> &comm, bool flag,
00645       T localMin, T localMax, T &globalMin, T &globalMax){
00646     throw std::logic_error("invalid call");}
00647   static bool areConsecutive(const T *val, size_t n){ 
00648     throw std::logic_error("invalid call");}
00649 };
00650 
00651 template<typename T1, typename T2>
00652 struct IdentifierTraits<std::pair<T1, T2> > {
00653   typedef std::pair<T1, T2> pair_t;
00654   typedef typename std::pair<pair_t, pair_t> pairPair_t;
00655 
00656   static inline int hashCode(const pair_t p)  {
00657     return IdentifierTraits<T1>::hashCode(p.first) +
00658       IdentifierTraits<T2>::hashCode(p.second);
00659   }
00660 
00661   static inline bool hasUniqueKey() { 
00662     if ((sizeof(T1)*2 <= sizeof(double))&&(sizeof(T2)*2 <= sizeof(double)))
00663       return true;
00664     else
00665       return false;
00666   }
00667 
00668   static inline double key(const pair_t p)  {
00669     size_t nbytes = sizeof(double) / 2;
00670     if ((sizeof(T1) > nbytes)||(sizeof(T2) > nbytes))
00671       throw std::logic_error("invalid call");
00672     else{
00673       double keyVal=0;
00674       char *cx = reinterpret_cast<char *>(&keyVal);
00675       char *cy = cx + nbytes;
00676       T1 *xpos = reinterpret_cast<T1 *>(cx + nbytes-sizeof(T1));
00677       T2 *ypos = reinterpret_cast<T2 *>(cy + nbytes-sizeof(T2));
00678       *xpos = p.first;
00679       *ypos = p.second;
00680       return keyVal;
00681     }
00682   }
00683 
00684   static inline std::string name() {
00685      std::string s("std::pair<");
00686      s += IdentifierTraits<T1>::name();
00687      s += std::string(", ");
00688      s += IdentifierTraits<T2>::name();
00689      s += std::string(">");
00690      return s;
00691   }
00692 
00693   static std::string stringify(pair_t val) {
00694     std::ostringstream oss;
00695     oss << "pair<" << IdentifierTraits<T1>::name();
00696     oss << "," << IdentifierTraits<T2>::name();
00697     oss << ">(" << val.first << "," << val.second << ")";
00698     return oss.str();
00699   }
00700 
00701   static inline bool isGlobalOrdinal() {return false; }
00702 
00703   static inline pair_t difference( const pair_t a, const pair_t b) {
00704       throw std::logic_error("invalid call");
00705       return pair_t();}
00706 
00707   static inline bool is_valid_id_type() {
00708     return (sizeof(T1)+sizeof(T2) <= sizeof(double)); }
00709 
00710   static void minMax(const pair_t *values, size_t n, pair_t &min, pair_t &max) {
00711       throw std::logic_error("invalid call");}
00712 
00713   static void globalMinMax(const Comm<int> &comm, bool flag,
00714       pair_t localMin, pair_t localMax, 
00715       pair_t &globalMin, pair_t &globalMax){
00716       throw std::logic_error("invalid call");}
00717 
00718   static bool areConsecutive(const pair_t *val, size_t n){ 
00719       throw std::logic_error("invalid call");
00720       return false; }
00721 };
00722 
00724 //  A helper function.  If T is an ordinal, are the values
00725 //    globally consecutive?  If so return true, otherwise
00726 //    return false.
00727 //
00728 //  On return, globalLen is set to the sum of the local lengths.
00729 //
00730 //  If T is an ordinal, but the list is not globally consecutive, 
00731 //    on return dist[0] is set to the global minimum of
00732 //    the values and dist[1] to the global maximum.
00733 //    
00734 //  If T is an ordinal and the list is globally consecutive,
00735 //    on return dist[p] is set to val[0] on process p.  dist[nprocs]
00736 //    is set to one past the global maximum value.
00738 
00739 template <typename T>
00740   bool globallyConsecutiveOrdinals(
00741     const Comm<int> &comm, const Environment &env, 
00742     const T* val, size_t len,
00743     ArrayRCP<T> &dist, size_t &globalLen)
00744 {
00745   // Non-ordinals can not be consecutive.
00746 
00747   if (!IdentifierTraits<T>::isGlobalOrdinal())
00748     return false;
00749 
00750   // return values
00751 
00752   T *distBuf= NULL;
00753   T *minMaxBuf= NULL;
00754   globalLen = 0;
00755 
00756   // Locally consecutive?
00757 
00758   int nprocs = comm.getSize();
00759   bool locallyConsecutive = IdentifierTraits<T>::areConsecutive(val, len);
00760   bool globallyConsecutive = false;
00761 
00762   if (nprocs == 1){
00763     if (locallyConsecutive){
00764       distBuf = new T [2];
00765       if (len > 0){
00766         distBuf[0] = val[0];
00767         distBuf[1] = val[len-1] + 1;
00768       }
00769       else{
00770         distBuf[0] = 0;
00771         distBuf[1] = 0;
00772       }
00773     }
00774     else{
00775       std::pair<T, T> extrema = z2LocalMinMax(val, len);
00776       minMaxBuf = new T [2];
00777       minMaxBuf[0] = extrema.first;
00778       minMaxBuf[1] = extrema.second;
00779     }
00780 
00781     globalLen = len;
00782     globallyConsecutive = locallyConsecutive;
00783   }
00784   else{   // nprocs > 1
00785     try{
00786       reduceAll<int, size_t>(comm, Teuchos::REDUCE_SUM, 1, &len, &globalLen);
00787     }
00788     Z2_THROW_OUTSIDE_ERROR(env);
00789   
00790     T v0, v1, gMin, gMax;
00791   
00792     if (len > 0){
00793       v0 = val[0];
00794       v1 = val[len-1];
00795     }
00796     else {
00797       v0 = numeric_limits<T>::max();
00798       v1 = numeric_limits<T>::min();
00799     }
00800   
00801     try{
00802       IdentifierTraits<T>::globalMinMax(comm, len==0, v0, v1, gMin, gMax);
00803     }
00804     Z2_FORWARD_EXCEPTIONS; 
00805   
00806     int lFlag = (locallyConsecutive ? 1 : 0);
00807     int gFlag = 0;
00808   
00809     try{
00810       reduceAll<int, int>(comm, Teuchos::REDUCE_MIN, 1, &lFlag, &gFlag);
00811     }
00812     Z2_THROW_OUTSIDE_ERROR(env);
00813   
00814     if (gFlag == 1){  // all processes have consecutive values
00815   
00816       size_t g0 = static_cast<size_t>(gMin);
00817       size_t g1 = static_cast<size_t>(gMax);
00818     
00819       if (g1 - g0 + 1 == globalLen){
00820         size_t sentinel = g1 + 1;   // invalid id
00821         size_t sendVal = sentinel;
00822         if (len > 0)
00823           sendVal = static_cast<size_t>(v0);
00824     
00825         Array<size_t> sendBuf(nprocs, sendVal);
00826         ArrayRCP<size_t> recvBuf;
00827     
00828         try{
00829           AlltoAll<size_t>(comm, env, sendBuf, 1, recvBuf);
00830         }
00831         Z2_FORWARD_EXCEPTIONS;
00832     
00833         int numNoIds = 0;  // number of procs with no ids
00834         for (int i=0; i < nprocs; i++)
00835           if (recvBuf[i] == sentinel)
00836             numNoIds++;
00837     
00838         globallyConsecutive = true;
00839     
00840         if (numNoIds == 0){
00841           for (int i=1; globallyConsecutive && i < nprocs; i++)
00842             if (recvBuf[i] < recvBuf[i-1])
00843               globallyConsecutive = false;
00844     
00845           if (globallyConsecutive){
00846             distBuf = new T [nprocs+1];
00847             for (int i=0; i < nprocs; i++)
00848               distBuf[i] = static_cast<T>(recvBuf[i]);
00849             distBuf[nprocs] = static_cast<T>(sentinel);
00850           }
00851         }
00852         else{
00853           Array<int> index;
00854           for (int i=0; i < nprocs; i++)
00855             if (recvBuf[i] != sentinel)
00856               index.push_back(i);
00857     
00858           for (int i=1; i < index.size(); i++){
00859             if (recvBuf[index[i]] < recvBuf[index[i-1]])
00860               globallyConsecutive = false;
00861           }
00862     
00863           if (globallyConsecutive){
00864             distBuf = new T [nprocs+1];
00865             for (int i=0; i < nprocs+1; i++)
00866               distBuf[i] = static_cast<T>(sentinel);
00867     
00868             for (int i=0; i < index.size(); i++)
00869               distBuf[index[i]] = static_cast<T>(recvBuf[index[i]]);
00870     
00871             T useValue = static_cast<T>(sentinel);
00872             for (int i = nprocs-1; i >= 0; i--){
00873               if (distBuf[i] == static_cast<T>(sentinel))
00874                 distBuf[i] = useValue;
00875               else
00876                 useValue = distBuf[i];
00877             }
00878           }
00879         }
00880       }
00881     }  // all processes have locally consecutive values
00882 
00883     if (!globallyConsecutive){
00884       minMaxBuf = new T [2];
00885       minMaxBuf[0] = gMin;
00886       minMaxBuf[1] = gMax;
00887     }
00888   }    // nprocs > 1
00889 
00890   if (minMaxBuf)
00891     dist = arcp(minMaxBuf, 0, 2, true);
00892   else if (distBuf)
00893     dist = arcp(distBuf, 0, nprocs+1, true);
00894   else
00895      throw std::logic_error("no buffer");  // should never get here
00896 
00897   return globallyConsecutive;
00898 }
00899 
00900 #endif  // DOXYGEN_SHOULD_SKIP_THIS
00901 
} // namespace Zoltan2
00903 
00904 #endif // _ZOLTAN2_IDENTIFIERTRAITS