Sierra Toolkit Version of the Day
DistributedIndex.cpp
00001 /*------------------------------------------------------------------------*/
00002 /*                 Copyright 2010 Sandia Corporation.                     */
00003 /*  Under terms of Contract DE-AC04-94AL85000, there is a non-exclusive   */
00004 /*  license for use of this work by or on behalf of the U.S. Government.  */
00005 /*  Export of this program may require a license from the                 */
00006 /*  United States Government.                                             */
00007 /*------------------------------------------------------------------------*/
00008 
00009 #include <stdexcept>
00010 #include <sstream>
00011 #include <algorithm>
00012 #include <limits>
00013 #include <stdint.h>
00014 
00015 #include <stk_util/environment/ReportHandler.hpp>
00016 
00017 #include <stk_util/parallel/ParallelComm.hpp>
00018 #include <stk_util/parallel/DistributedIndex.hpp>
00019 
00020 #include <stk_util/util/RadixSort.hpp>
00021 
00022 namespace stk {
00023 namespace parallel {
00024 
00025 //----------------------------------------------------------------------
00026 
00027 namespace {
00028 
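// Comparator for std::lower_bound over a sorted vector of KeyProc pairs:
// only the key (pair.first) is compared against a raw KeyType probe.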
00029 struct KeyProcLess {
00030 
00031   bool operator()( const DistributedIndex::KeyProc & lhs ,
00032                    const DistributedIndex::KeyType & rhs ) const
00033   { return lhs.first < rhs ; }
00034 
00035 };
00036 
00037 void sort_unique( std::vector<DistributedIndex::KeyProc> & key_usage )
00038 {
00039   std::vector<DistributedIndex::KeyProc>::iterator
00040     i = key_usage.begin() ,
00041     j = key_usage.end() ;
00042 
00043   std::sort( i , j );
00044 
00045   i = std::unique( i , j );
00046 
00047   key_usage.erase( i , j );
00048 }
00049 
00050 void sort_unique( std::vector<DistributedIndex::KeyType> & keys )
00051 {
00052   stk::util::radix_sort_unsigned((keys.empty() ? NULL : &keys[0]), keys.size());
00053 
00054   std::vector<DistributedIndex::KeyType>::iterator
00055     i = keys.begin() ,
00056     j = keys.end() ;
00057 
00058   i = std::unique( i , j );
00059   keys.erase( i , j );
00060 }
00061 
00062 // reserve vector capacity (current size + recv_buffer remaining)
00063 template < class T >
00064 inline void reserve_for_recv_buffer( const CommAll& all, const DistributedIndex::ProcType& comm_size, std::vector<T>& v)
00065 {
00066   unsigned num_remote = 0;
00067   for (DistributedIndex::ProcType p = 0 ; p < comm_size ; ++p ) {
00068     CommBuffer & buf = all.recv_buffer( p );
00069     num_remote += buf.remaining() / sizeof(T);
00070   }
00071   v.reserve(v.size() + num_remote);
00072 }
00073 
00074 // unpack buffer into vector
00075 template < class T >
00076 inline void unpack_recv_buffer( const CommAll& all, const DistributedIndex::ProcType& comm_size, std::vector<T>& v)
00077 {
00078   reserve_for_recv_buffer(all, comm_size, v);
00079   for (DistributedIndex::ProcType p = 0 ; p < comm_size ; ++p ) {
00080     CommBuffer & buf = all.recv_buffer( p );
00081     while ( buf.remaining() ) {
00082       T kp;
00083       buf.unpack( kp );
00084       v.push_back( kp );
00085     }
00086   }
00087 }
00088 
00089 // unpack buffer into vector, where pair.second is the processor
00090 template < class T >
00091 inline void unpack_with_proc_recv_buffer( const CommAll& all, const DistributedIndex::ProcType& comm_size, std::vector<std::pair<T,DistributedIndex::ProcType> >& v)
00092 {
00093   reserve_for_recv_buffer(all, comm_size, v);
00094   for ( DistributedIndex::ProcType p = 0 ; p < comm_size ; ++p ) {
00095     CommBuffer & buf = all.recv_buffer( p );
00096     std::pair<T,DistributedIndex::ProcType> kp;
00097     kp.second = p;
00098     while ( buf.remaining() ) {
00099       buf.unpack( kp.first );
00100       v.push_back( kp );
00101     }
00102   }
00103 }
00104 
00105 } // namespace <unnamed>
00106 
00107 //----------------------------------------------------------------------
00108 
00109 enum { DISTRIBUTED_INDEX_CHUNK_BITS = 12 }; 
00110 
00111 enum { DISTRIBUTED_INDEX_CHUNK_SIZE =
00112        size_t(1) << DISTRIBUTED_INDEX_CHUNK_BITS };
00113 
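// Keys are assigned to processors in chunks of DISTRIBUTED_INDEX_CHUNK_SIZE
// (2^12 = 4096) consecutive values, round-robin by chunk.  For illustration,
// with 4 processors a span's offsets 0..4095 map to rank 0, 4096..8191 to
// rank 1, 8192..12287 to rank 2, 12288..16383 to rank 3, and 16384..20479
// wrap back to rank 0.  A return value of m_comm_size signals a key that
// lies outside every span.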
00114 DistributedIndex::ProcType
00115 DistributedIndex::to_which_proc( const DistributedIndex::KeyType & key ) const
00116 {
00117   for ( size_t i = 0 ; i < m_span_count ; ++i ) {
00118     if ( m_key_span[i].first <= key && key <= m_key_span[i].second ) {
00119        const KeyType offset = key - m_key_span[i].first ;
00120        return ( offset >> DISTRIBUTED_INDEX_CHUNK_BITS ) % m_comm_size ;
00121     }
00122   }
00123   return m_comm_size ;
00124 }
00125 
00126 //----------------------------------------------------------------------
00127 
00128 DistributedIndex::~DistributedIndex() {}
00129 
00130 DistributedIndex::DistributedIndex (
00131   ParallelMachine comm ,
00132   const std::vector<KeySpan> & partition_bounds )
00133   : m_comm( comm ),
00134     m_comm_rank( parallel_machine_rank( comm ) ),
00135     m_comm_size( parallel_machine_size( comm ) ),
00136     m_span_count(0),
00137     m_key_span(),
00138     m_key_usage()
00139 {
00140   unsigned info[2] ;
00141   info[0] = partition_bounds.size();
00142   info[1] = 0 ;
00143 
00144   // Check each span for validity
00145 
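  // A span is invalid if its max precedes its min, or if it does not begin
  // strictly after the previous span ends (spans must be disjoint and
  // ascending).  With more than one process only rank 0's verdict is used,
  // since info[] is broadcast from rank 0 below.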
00146   for ( std::vector<KeySpan>::const_iterator
00147         i = partition_bounds.begin() ; i != partition_bounds.end() ; ++i ) {
00148     if ( i->second < i->first ||
00149          ( i != partition_bounds.begin() && i->first <= (i-1)->second ) ) {
00150       info[1] = 1 ;
00151     }
00152   }
00153 
00154 #if defined( STK_HAS_MPI )
00155   if (m_comm_size > 1) {
00156     MPI_Bcast( info , 2 , MPI_UNSIGNED , 0 , comm );
00157   }
00158 
00159   if ( 0 < info[0] ) {
00160     m_key_span.resize( info[0] );
00161     if ( 0 == parallel_machine_rank( comm ) ) {
00162       m_key_span = partition_bounds ;
00163     }
00164     if (m_comm_size > 1) {
00165       MPI_Bcast( (m_key_span.empty() ? NULL : & m_key_span[0]), info[0] * sizeof(KeySpan), MPI_BYTE, 0, comm );
00166     }
00167   }
00168 #else
00169   m_key_span = partition_bounds ;
00170 #endif
00171 
00172   if ( info[1] ) {
00173     std::ostringstream msg ;
00174     msg << "stk::parallel::DistributedIndex ctor( comm , " ;
00175 
00176     for ( std::vector<KeySpan>::const_iterator
00177           i = partition_bounds.begin() ; i != partition_bounds.end() ; ++i ) {
00178       msg << " ( min = " << i->first << " , max = " << i->second << " )" ;
00179     }
00180     msg << " ) contains invalid span of keys" ;
00181     throw std::runtime_error( msg.str() );
00182   }
00183 
00184   m_span_count = info[0] ;
00185 
00186   if ( 0 == m_span_count ) {
00187     m_key_span.push_back(
00188       KeySpan( std::numeric_limits<KeyType>::min(),
00189                std::numeric_limits<KeyType>::max() ) );
00190     m_span_count = 1 ;
00191   }
00192 }
00193 
00194 //----------------------------------------------------------------------
00195 //----------------------------------------------------------------------
00196 
00197 namespace {
00198 
00199 bool is_sorted_and_unique( const std::vector<DistributedIndex::KeyProc> & key_usage )
00200 {
00201   std::vector<DistributedIndex::KeyProc>::const_iterator itr = key_usage.begin();
00202   std::vector<DistributedIndex::KeyProc>::const_iterator end = key_usage.end();
00203   for ( ; itr != end; ++itr ) {
00204     if ( itr + 1 != end && *itr >= *(itr + 1) ) {
00205       return false;
00206     }
00207   }
00208   return true;
00209 }
00210 
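// For each requested key, send the complete list of (key, processor) usage
// entries for that key to every processor recorded as using it, so each
// sharer learns about all of the other sharers.  Both vectors are assumed
// to be sorted by key.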
00211 void query_pack_to_usage(
00212   const std::vector<DistributedIndex::KeyProc> & key_usage ,
00213   const std::vector<DistributedIndex::KeyType> & request ,
00214   CommAll & all )
00215 {
00216   std::vector<DistributedIndex::KeyProc>::const_iterator i = key_usage.begin();
00217   std::vector<DistributedIndex::KeyType>::const_iterator k = request.begin();
00218 
00219   for ( ; k != request.end() && i != key_usage.end() ; ++k ) {
00220 
00221     for ( ; i != key_usage.end() && i->first < *k ; ++i );
00222 
00223     std::vector<DistributedIndex::KeyProc>::const_iterator j = i ;
00224     for ( ; j != key_usage.end() && j->first == *k ; ++j );
00225 
00226     for ( std::vector<DistributedIndex::KeyProc>::const_iterator
00227           jsend = i ; jsend != j ; ++jsend ) {
00228 
00229       for ( std::vector<DistributedIndex::KeyProc>::const_iterator
00230             jinfo = i ; jinfo != j ; ++jinfo ) {
00231 
00232         all.send_buffer( jsend->second )
00233            .pack<DistributedIndex::KeyProc>( *jinfo );
00234       }
00235     }
00236   }
00237 }
00238 
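// For each (key, processor) request, send every usage entry for that key to
// the requesting processor.  'key_usage' and 'request' are walked together
// in a single merge-like pass, so both are assumed to be sorted by key.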
00239 void query_pack( const std::vector<DistributedIndex::KeyProc> & key_usage ,
00240                  const std::vector<DistributedIndex::KeyProc> & request ,
00241                  CommAll & all )
00242 {
00243   std::vector<DistributedIndex::KeyProc>::const_iterator i = key_usage.begin();
00244 
00245   for ( std::vector<DistributedIndex::KeyProc>::const_iterator
00246         k =  request.begin() ;
00247         k != request.end() &&
00248         i != key_usage.end() ; ++k ) {
00249 
00250     for ( ; i != key_usage.end() && i->first < k->first ; ++i );
00251 
00252     for ( std::vector<DistributedIndex::KeyProc>::const_iterator j = i ;
00253           j != key_usage.end() && j->first == k->first ; ++j ) {
00254       all.send_buffer( k->second ).pack<DistributedIndex::KeyProc>( *j );
00255     }
00256   }
00257 }
00258 
00259 } // namespace <unnamed>
00260 
00261 void DistributedIndex::query(
00262   const std::vector<DistributedIndex::KeyProc> & request ,
00263         std::vector<DistributedIndex::KeyProc> & sharing_of_keys ) const
00264 {
00265   sharing_of_keys.clear();
00266 
00267   CommAll all( m_comm );
00268 
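  // CommAll is used in the usual two-pass pattern: the first query_pack call
  // only tallies per-destination message sizes, allocate_buffers() then sizes
  // the send and receive buffers, and the second, identical call packs the
  // actual data before communicate().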
00269   query_pack( m_key_usage , request , all ); // Sizing
00270 
00271   all.allocate_buffers( m_comm_size / 4 , false );
00272 
00273   query_pack( m_key_usage , request , all ); // Packing
00274 
00275   all.communicate();
00276 
00277   unpack_recv_buffer(all, m_comm_size, sharing_of_keys);
00278 
00279   std::sort( sharing_of_keys.begin() , sharing_of_keys.end() );
00280 }
00281 
00282 void DistributedIndex::query(
00283   std::vector<DistributedIndex::KeyProc> & sharing_of_local_keys ) const
00284 {
00285   query( m_key_usage , sharing_of_local_keys );
00286 }
00287 
00288 void DistributedIndex::query(
00289   const std::vector<DistributedIndex::KeyType> & keys ,
00290         std::vector<DistributedIndex::KeyProc> & sharing_keys ) const
00291 {
00292   std::vector<KeyProc> request ;
00293 
00294   {
00295     bool bad_key = false ;
00296     CommAll all( m_comm );
00297 
00298     for ( std::vector<KeyType>::const_iterator
00299           k = keys.begin() ; k != keys.end() ; ++k ) {
00300       const ProcType p = to_which_proc( *k );
00301 
00302       if ( p < m_comm_size ) {
00303         all.send_buffer( p ).pack<KeyType>( *k );
00304       }
00305       else {
00306         bad_key = true ;
00307       }
00308     }
00309 
00310     // Error condition becomes global:
00311 
00312     bad_key = all.allocate_buffers( m_comm_size / 4 , false , bad_key );
00313 
00314     if ( bad_key ) {
00315       throw std::runtime_error("stk::parallel::DistributedIndex::query given a key which is out of range");
00316     }
00317 
00318     for ( std::vector<KeyType>::const_iterator
00319           k = keys.begin() ; k != keys.end() ; ++k ) {
00320       all.send_buffer( to_which_proc( *k ) ).pack<KeyType>( *k );
00321     }
00322 
00323     all.communicate();
00324 
00325     unpack_with_proc_recv_buffer(all, m_comm_size, request);
00326   }
00327 
00328   sort_unique( request );
00329 
00330   query( request , sharing_keys );
00331 }
00332 
00333 void DistributedIndex::query_to_usage(
00334   const std::vector<DistributedIndex::KeyType> & keys ,
00335         std::vector<DistributedIndex::KeyProc> & sharing_keys ) const
00336 {
00337   std::vector<KeyType> request ;
00338 
00339   {
00340     bool bad_key = false ;
00341     CommAll all( m_comm );
00342 
00343     for ( std::vector<KeyType>::const_iterator
00344           k = keys.begin() ; k != keys.end() ; ++k ) {
00345       const ProcType p = to_which_proc( *k );
00346 
00347       if ( p < m_comm_size ) {
00348         all.send_buffer( p ).pack<KeyType>( *k );
00349       }
00350       else {
00351         bad_key = true ;
00352       }
00353     }
00354 
00355     // Error condition becomes global:
00356 
00357     bad_key = all.allocate_buffers( m_comm_size / 4 , false , bad_key );
00358 
00359     if ( bad_key ) {
00360       throw std::runtime_error("stk::parallel::DistributedIndex::query given a key which is out of range");
00361     }
00362 
00363     for ( std::vector<KeyType>::const_iterator
00364           k = keys.begin() ; k != keys.end() ; ++k ) {
00365       all.send_buffer( to_which_proc( *k ) ).pack<KeyType>( *k );
00366     }
00367 
00368     all.communicate();
00369 
00370     unpack_recv_buffer(all, m_comm_size, request);
00371   }
00372 
00373   sort_unique( request );
00374 
00375   {
00376     CommAll all( m_comm );
00377 
00378     query_pack_to_usage( m_key_usage , request , all ); // Sizing
00379 
00380     all.allocate_buffers( m_comm_size / 4 , false );
00381 
00382     query_pack_to_usage( m_key_usage , request , all ); // Packing
00383 
00384     all.communicate();
00385 
00386     unpack_recv_buffer(all, m_comm_size, sharing_keys);
00387 
00388     std::sort( sharing_keys.begin() , sharing_keys.end() );
00389   }
00390 }
00391 
00392 //----------------------------------------------------------------------
00393 //----------------------------------------------------------------------
00394 
00395 namespace {
00396 
00397 struct RemoveKeyProc {
00398 
00399   bool operator()( const DistributedIndex::KeyProc & kp ) const
00400   { return kp.second < 0 ; }
00401 
00402   static void mark( std::vector<DistributedIndex::KeyProc> & key_usage ,
00403                     const DistributedIndex::KeyProc & kp )
00404   {
00405     std::vector<DistributedIndex::KeyProc>::iterator
00406       i = std::lower_bound( key_usage.begin(),
00407                             key_usage.end(), kp.first, KeyProcLess() );
00408 
00409     // Iterate over the span of KeyProcs with matching key until an exact match
00410     // is found. We have to do it this way because marking a KeyProc unsorts it
00411     // in the key_usage vector, so we cannot look up KeyProcs directly once marking
00412     // has begun.
00413     while ( i != key_usage.end() && kp.first == i->first && kp.second != i->second) { ++i ; }
00414 
00415     if ( i != key_usage.end() && kp == *i ) {
00416       i->second = -1 ;
00417     }
00418   }
00419 
00420   static void clean( std::vector<DistributedIndex::KeyProc> & key_usage )
00421   {
00422     std::vector<DistributedIndex::KeyProc>::iterator end =
00423       std::remove_if( key_usage.begin() , key_usage.end() , RemoveKeyProc() );
00424     key_usage.erase( end , key_usage.end() );
00425   }
00426 };
00427 
00428 } // namespace <unnamed>
00429 
00430 void DistributedIndex::update_keys(
00431   const std::vector<DistributedIndex::KeyType> & add_new_keys ,
00432   const std::vector<DistributedIndex::KeyType> & remove_existing_keys )
00433 {
00434   std::vector<unsigned long> count_remove( m_comm_size , (unsigned long)0 );
00435   std::vector<unsigned long> count_add(    m_comm_size , (unsigned long)0 );
00436 
00437   size_t local_bad_input = 0 ;
00438 
00439   // Iterate over keys being removed and keep a count of keys being removed
00440   // from other processes
00441   for ( std::vector<KeyType>::const_iterator
00442         i = remove_existing_keys.begin();
00443         i != remove_existing_keys.end(); ++i ) {
00444     const ProcType p = to_which_proc( *i );
00445     if ( m_comm_size <= p ) {
00446       // Key is not within one of the spans:
00447       ++local_bad_input ;
00448     }
00449     else if ( p != m_comm_rank ) {
00450       ++( count_remove[ p ] );
00451     }
00452   }
00453 
00454   // Iterate over keys being added and keep a count of keys being added
00455   // to other processes
00456   for ( std::vector<KeyType>::const_iterator
00457         i = add_new_keys.begin();
00458         i != add_new_keys.end(); ++i ) {
00459     const ProcType p = to_which_proc( *i );
00460     if ( p == m_comm_size ) {
00461       // Key is not within one of the spans:
00462       ++local_bad_input ;
00463     }
00464     else if ( p != m_comm_rank ) {
00465       ++( count_add[ p ] );
00466     }
00467   }
00468 
00469   CommAll all( m_comm );
00470 
00471   // Sizing and add_new_keys bounds checking:
00472 
00473   // For each process, we are going to send the number of removed keys,
00474   // the removed keys, and the added keys. It will be assumed that any keys
00475   // beyond the number of removed keys will be added keys.
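  // Per-destination message layout (illustration):
  //   [ unsigned long remove_count | remove_count removed KeyTypes | added KeyTypes ... ]
  // The receiver reads remove_count first and treats every key that follows
  // the removals as an addition.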
00476   for ( int p = 0 ; p < m_comm_size ; ++p ) {
00477     if ( count_remove[p] || count_add[p] ) {
00478       CommBuffer & buf = all.send_buffer( p );
00479       buf.skip<unsigned long>( 1 );
00480       buf.skip<KeyType>( count_remove[p] );
00481       buf.skip<KeyType>( count_add[p] );
00482     }
00483   }
00484 
00485   // Allocate buffers and perform a global OR of error_flag
00486   const bool symmetry_flag = false ;
00487   const bool error_flag = 0 < local_bad_input ;
00488 
00489   bool global_bad_input =
00490     all.allocate_buffers( m_comm_size / 4, symmetry_flag , error_flag );
00491 
00492   if ( global_bad_input ) {
00493     std::ostringstream msg ;
00494 
00495     if ( 0 < local_bad_input ) {
00496       msg << "stk::parallel::DistributedIndex::update_keys ERROR Given "
00497           << local_bad_input << " of " << add_new_keys.size()
00498           << " add_new_keys outside of any span" ;
00499     }
00500 
00501     throw std::runtime_error( msg.str() );
00502   }
00503 
00504   // Packing:
00505 
00506   // Pack the remove counts for each process
00507   for ( int p = 0 ; p < m_comm_size ; ++p ) {
00508     if ( count_remove[p] || count_add[p] ) {
00509       all.send_buffer( p ).pack<unsigned long>( count_remove[p] );
00510     }
00511   }
00512 
00513   // Pack the removed keys for each process
00514   for ( std::vector<KeyType>::const_iterator
00515         i = remove_existing_keys.begin();
00516         i != remove_existing_keys.end(); ++i ) {
00517     const ProcType p = to_which_proc( *i );
00518     if ( p != m_comm_rank ) {
00519       all.send_buffer( p ).pack<KeyType>( *i );
00520     }
00521   }
00522 
00523   // Pack the added keys for each process
00524   for ( std::vector<KeyType>::const_iterator
00525         i = add_new_keys.begin();
00526         i != add_new_keys.end(); ++i ) {
00527     const ProcType p = to_which_proc( *i );
00528     if ( p != m_comm_rank ) {
00529       all.send_buffer( p ).pack<KeyType>( *i );
00530     }
00531   }
00532 
00533   // Communicate keys
00534   all.communicate();
00535 
00536   //------------------------------
00537   // Remove for local keys
00538 
00539   for ( std::vector<KeyType>::const_iterator
00540         i = remove_existing_keys.begin();
00541         i != remove_existing_keys.end(); ++i ) {
00542     const ProcType p = to_which_proc( *i );
00543     if ( p == m_comm_rank ) {
00544       RemoveKeyProc::mark( m_key_usage , KeyProc( *i , p ) );
00545     }
00546   }
00547 
00548   // Unpack each removed key and locate it in m_key_usage.
00549   // Mark its processor as negative for subsequent removal.
00550 
00551   for ( int p = 0 ; p < m_comm_size ; ++p ) {
00552     CommBuffer & buf = all.recv_buffer( p );
00553     if ( buf.remaining() ) {
00554       unsigned long remove_count = 0 ;
00555 
00556       KeyProc kp ;
00557 
00558       kp.second = p ;
00559 
00560       buf.unpack<unsigned long>( remove_count );
00561 
00562       for ( ; 0 < remove_count ; --remove_count ) {
00563         buf.unpack<KeyType>( kp.first );
00564 
00565         RemoveKeyProc::mark( m_key_usage , kp );
00566       }
00567     }
00568   }
00569 
00570   RemoveKeyProc::clean( m_key_usage );
00571 
00572   //------------------------------
00573   // Append for local keys
00574 
00575   // Add new_keys going to this proc to local_key_usage
00576   std::vector<KeyProc> local_key_usage ;
00577   local_key_usage.reserve(add_new_keys.size());
00578   for ( std::vector<KeyType>::const_iterator
00579         i = add_new_keys.begin();
00580         i != add_new_keys.end(); ++i ) {
00581 
00582     const ProcType p = to_which_proc( *i );
00583     if ( p == m_comm_rank ) {
00584       local_key_usage.push_back( KeyProc( *i , p ) );
00585     }
00586   }
00587 
00588   // Merge local_key_usage and m_key_usage into temp_key
00589   std::vector<KeyProc> temp_key ;
00590   temp_key.reserve(local_key_usage.size() + m_key_usage.size());
00591   std::sort( local_key_usage.begin(), local_key_usage.end() );
00592   std::merge( m_key_usage.begin(),
00593               m_key_usage.end(),
00594               local_key_usage.begin(),
00595               local_key_usage.end(),
00596               std::back_inserter(temp_key) );
00597 
00598   // Unpack and append for remote keys:
00599   std::vector<KeyProc> remote_key_usage ;
00600 
00601   unpack_with_proc_recv_buffer(all, m_comm_size, remote_key_usage);
00602 
00603   std::sort( remote_key_usage.begin(), remote_key_usage.end() );
00604 
00605   m_key_usage.clear();
00606   m_key_usage.reserve(temp_key.size() + remote_key_usage.size());
00607 
00608   // Merge temp_key and remote_key_usage into m_key_usage, so...
00609   //   m_key_usage = local_key_usage + remote_key_usage + m_key_usage(orig)
00610   std::merge( temp_key.begin(),
00611               temp_key.end(),
00612               remote_key_usage.begin(),
00613               remote_key_usage.end(),
00614               std::back_inserter(m_key_usage) );
00615 
00616   // Unique m_key_usage
00617   m_key_usage.erase(std::unique( m_key_usage.begin(),
00618                                  m_key_usage.end()),
00619                     m_key_usage.end() );
00620 
00621   // Check invariant that m_key_usage is sorted
00622   ThrowAssertMsg( is_sorted_and_unique(m_key_usage), "Sorted&unique invariant violated!" );
00623 }
00624 
00625 //----------------------------------------------------------------------
00626 //----------------------------------------------------------------------
00627 
00628 void DistributedIndex::generate_new_global_key_upper_bound(
00629   const std::vector<size_t>  & requests ,
00630         std::vector<DistributedIndex::KeyType> & global_key_upper_bound ) const
00631 {
00632   bool bad_request = m_span_count != requests.size();
00633 
00634   std::ostringstream error_msg ;
00635 
00636   error_msg
00637   << "stk::parallel::DistributedIndex::generate_new_global_key_upper_bound( " ;
00638 
00639   std::vector<unsigned long>
00640     local_counts( m_span_count + 1 , (unsigned long) 0 ),
00641     global_counts( m_span_count + 1 , (unsigned long) 0 );
00642 
00643   // Count unique keys in each span and add requested keys for
00644   // final total count of keys needed.
00645 
00646   // Append the error check to this communication to avoid
00647   // an extra reduction operation.
00648   local_counts[ m_span_count ] = m_span_count != requests.size();
00649 
00650   if ( m_span_count == requests.size() ) {
00651 
00652     for ( size_t i = 0 ; i < m_span_count ; ++i ) {
00653       local_counts[i] = requests[i] ;
00654     }
00655 
00656     std::vector<KeyProc>::const_iterator j = m_key_usage.begin();
00657 
00658     for ( size_t i = 0 ; i < m_span_count && j != m_key_usage.end() ; ++i ) {
00659       const KeyType key_span_last = m_key_span[i].second ;
00660       size_t count = 0 ;
00661       while ( j != m_key_usage.end() && j->first <= key_span_last ) {
00662         const KeyType key = j->first ;
00663         while ( j != m_key_usage.end() && key == j->first ) { ++j ; }
00664         ++count ;
00665       }
00666       local_counts[i] += count ;
00667     }
00668   }
00669 
00670 #if defined( STK_HAS_MPI )
00671   if (m_comm_size > 1) {
00672     MPI_Allreduce( (local_counts.empty() ? NULL : & local_counts[0]) , (global_counts.empty() ? NULL : & global_counts[0]) ,
00673                    m_span_count + 1 , MPI_UNSIGNED_LONG ,
00674                    MPI_SUM , m_comm );
00675   }
00676   else {
00677     global_counts = local_counts ;
00678   }
00679 #else
00680   global_counts = local_counts ;
00681 #endif
00682 
00683   bad_request = global_counts[m_span_count] != 0 ;
00684 
00685   if ( bad_request ) {
00686     if ( m_span_count != requests.size() ) {
00687       error_msg << " requests.size() = " << requests.size()
00688                 << " != " << m_span_count << " )" ;
00689     }
00690   }
00691 
00692   if ( ! bad_request ) {
00693     for ( unsigned i = 0 ; i < m_span_count ; ++i ) {
00694       const size_t span_available =
00695         ( 1 + m_key_span[i].second - m_key_span[i].first );
00696 
00697       const size_t span_requested = global_counts[i];
00698 
00699       if ( span_available < span_requested ) {
00700         bad_request = true ;
00701         error_msg << " global_sum( (existing+request)[" << i << "] ) = "
00702                   << span_requested
00703                   << " > global_sum( span_available ) = "
00704                   << span_available ;
00705       }
00706     }
00707   }
00708 
00709   if ( bad_request ) {
00710     throw std::runtime_error( error_msg.str() );
00711   }
00712 
00713   // Determine the maximum generated key
00714 
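  // The upper bound for span i is its first key plus the global count of
  // (existing unique + requested) keys, minus one.  This is only an upper
  // bound, not an exact maximum: existing keys need not be packed densely at
  // the start of the span, so generate_new_keys() later trims any surplus.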
00715   global_key_upper_bound.resize( m_span_count );
00716 
00717   for ( size_t i = 0 ; i < m_span_count ; ++i ) {
00718      global_key_upper_bound[i] = m_key_span[i].first + global_counts[i] - 1 ;
00719   }
00720 }
00721 
00722 //--------------------------------------------------------------------
00723 //--------------------------------------------------------------------
00724 
00725 void DistributedIndex::generate_new_keys_local_planning(
00726   const std::vector<DistributedIndex::KeyType>  & key_global_upper_bound ,
00727   const std::vector<size_t>   & requests_local ,
00728         std::vector<long>     & new_request ,
00729         std::vector<KeyType>  & requested_keys ,
00730         std::vector<KeyType>  & contrib_keys ) const
00731 {
00732   new_request.assign( m_span_count , long(0) );
00733 
00734   contrib_keys.clear();
00735 
00736   std::vector<KeyProc>::const_iterator j = m_key_usage.begin();
00737 
00738   for ( size_t i = 0 ; i < m_span_count ; ++i ) {
00739     // The maximum generated key from any process will
00740     // not exceed this value.
00741     const KeyType key_upper_bound = key_global_upper_bound[i] ;
00742 
00743     const size_t init_size = contrib_keys.size();
00744 
00745     const size_t chunk_inc = m_comm_size * DISTRIBUTED_INDEX_CHUNK_SIZE ;
00746 
00747     const size_t chunk_rsize = m_comm_rank * DISTRIBUTED_INDEX_CHUNK_SIZE ;
00748 
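    // Scan only the chunks owned by this process: start at the
    // m_comm_rank-th chunk of the span and step by m_comm_size chunks.
    // For illustration, with 4 processes rank 1 visits chunks 1, 5, 9, ...
    // of each span, up to the global key upper bound.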
00749     for ( KeyType key_begin = m_key_span[i].first +
00750                   chunk_rsize ;
00751           key_begin <= key_upper_bound ; key_begin += chunk_inc ) {
00752 
00753       // What is the first key of the chunk
00754       KeyType key_iter = key_begin ;
00755 
00756       // What is the last key belonging to this process' chunk
00757       const KeyType key_last =
00758         std::min( key_begin + DISTRIBUTED_INDEX_CHUNK_SIZE - 1 , key_upper_bound );
00759 
00760       // Jump ahead in the sorted used-key vector to the first
00761       // key that could be contributed.
00762 
00763       j = std::lower_bound( j, m_key_usage.end(), key_iter, KeyProcLess() );
00764       // now know:  j == m_key_usage.end() OR
00765       //            key_iter <= j->first
00766 
00767       for ( ; key_iter <= key_last ; ++key_iter ) {
00768         if ( j == m_key_usage.end() || key_iter < j->first ) {
00769           // The current attempt 'key_iter' is not used, contribute it.
00770           contrib_keys.push_back( key_iter );
00771         }
00772         else { // j != m_key_usage.end() && key_iter == j->first
00773           // The current attempt 'key_iter' is already used,
00774           // increment the used-iterator to its next key value.
00775           while ( j != m_key_usage.end() && key_iter == j->first ) {
00776             ++j ;
00777           }
00778         }
00779       }
00780     }
00781 
00782     // Determine which local keys will be contributed,
00783     // keeping what this process could use from the contribution.
00784     // This can reduce the subsequent communication load when
00785     // donating keys to another process.
00786 
00787     const size_t this_contrib = contrib_keys.size() - init_size ;
00788 
00789     // How many keys will this process keep:
00790     const size_t keep = std::min( requests_local[i] , this_contrib );
00791 
00792     // Take the kept keys from the contributed key vector.
00793     requested_keys.insert( requested_keys.end() ,
00794                            contrib_keys.end() - keep ,
00795                            contrib_keys.end() );
00796 
00797     contrib_keys.erase( contrib_keys.end() - keep ,
00798                         contrib_keys.end() );
00799 
00800     // New request is positive for needed keys or negative for donated keys
00801     new_request[i] = requests_local[i] - this_contrib ;
00802   }
00803 }
00804 
00805 //----------------------------------------------------------------------
00806 
00807 void DistributedIndex::generate_new_keys_global_planning(
00808   const std::vector<long>    & new_request ,
00809         std::vector<long>    & my_donations ) const
00810 {
00811   my_donations.assign( m_comm_size * m_span_count , long(0) );
00812 
00813   // Gather the global request plan for receiving and donating keys
00814   // Positive values for receiving, negative values for donating.
00815 
00816   std::vector<long> new_request_global( m_comm_size * m_span_count );
00817 
00818 #if defined( STK_HAS_MPI )
00819 
00820   if (m_comm_size > 1) { // Gather requests into per-process spans
00821     // MPI doesn't do 'const' in its interface, but the send buffer is const
00822     void * send_buf = const_cast<void*>( (void *)( (new_request.empty() ? NULL : & new_request[0]) ));
00823     void * recv_buf = (new_request_global.empty() ? NULL : & new_request_global[0]) ;
00824     MPI_Allgather( send_buf , m_span_count , MPI_LONG ,
00825                    recv_buf , m_span_count , MPI_LONG , m_comm );
00826   }
00827   else {
00828     new_request_global = new_request ;
00829   }
00830 #else
00831   new_request_global = new_request ;
00832 #endif
00833 
00834   // Now have the global receive & donate plan.
00835   //--------------------------------------------------------------------
00836   // Generate my donate plan from the global receive & donate plan.
00837 
00838   for ( unsigned i = 0 ; i < m_span_count ; ++i ) {
00839 
00840     if ( new_request[i] < 0 ) { // This process is donating on this span
00841       long my_total_donate = - new_request[i] ;
00842 
00843       long previous_donate = 0 ;
00844 
00845       // Count what previous processes have donated:
00846       for ( int p = 0 ; p < m_comm_rank ; ++p ) {
00847         const long new_request_p = new_request_global[ p * m_span_count + i ] ;
00848         if ( new_request_p < 0 ) {
00849           previous_donate -= new_request_p ;
00850         }
00851       }
00852 
00853       // What the donation count will be with my donation:
00854       long end_donate = previous_donate + my_total_donate ;
00855 
00856       long previous_receive = 0 ;
00857 
00858       // Determine my donation to other processes (one to many).
00859 
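      // Donations and receipts are matched by accumulating both in rank
      // order along a number line: my donation interval is
      // [previous_donate, end_donate), and process 'p' receives the keys in
      // [previous_receive - new_request_p, previous_receive).  My donation
      // to 'p' is the size of the overlap of those two intervals.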
00860       for ( int p = 0 ; p < m_comm_size && 0 < my_total_donate ; ++p ) {
00861 
00862         const long new_request_p = new_request_global[ p * m_span_count + i ];
00863 
00864         if ( 0 < new_request_p ) { // Process 'p' receives keys
00865 
00866           // Accumulation of requests:
00867 
00868           previous_receive += new_request_p ;
00869 
00870           if ( previous_donate < previous_receive ) {
00871             // I am donating to process 'p'
00872             const long n = std::min( previous_receive , end_donate )
00873                            - previous_donate ;
00874 
00875             my_donations[ p * m_span_count + i ] = n ;
00876             previous_donate += n ;
00877             my_total_donate -= n ;
00878           }
00879         }
00880       }
00881     }
00882   }
00883 }
00884 
00885 //--------------------------------------------------------------------
00886 
00887 void DistributedIndex::generate_new_keys(
00888   const std::vector<size_t>                 & requests ,
00889         std::vector< std::vector<KeyType> > & requested_keys )
00890 {
00891   //--------------------------------------------------------------------
00892   // Develop the plan:
00893 
00894   std::vector<KeyType> global_key_upper_bound ;
00895   std::vector<long>    new_request ;
00896   std::vector<long>    my_donations ;
00897   std::vector<KeyType> contrib_keys ;
00898   std::vector<KeyType> new_keys ;
00899 
00900   // Verify input and generate global sum of
00901   // current key usage and requested new keys.
00902   // Throw a parallel consistent exception if the input is bad.
00903 
00904   generate_new_global_key_upper_bound( requests , global_key_upper_bound );
00905 
00906   // No exception thrown means all inputs are good and parallel consistent
00907 
00908   // Determine which local keys will be contributed,
00909   // keeping what this process could use from the contribution.
00910   // This can reduce the subsequent communication load when
00911   // donating keys to another process.
00912 
00913   generate_new_keys_local_planning( global_key_upper_bound ,
00914                                     requests ,
00915                                     new_request ,
00916                                     new_keys ,
00917                                     contrib_keys );
00918 
00919   // Determine where this process will be donating 'contrib_keys'
00920   generate_new_keys_global_planning( new_request, my_donations );
00921 
00922   // Because an upper bound is used rather than an exact maximum,
00923   // contrib_keys is likely to contain more keys than are needed.
00924   // Remove the unneeded keys.
00925 
00926   // Backwards to erase from the end
00927   for ( size_t i = m_span_count ; 0 < i ; ) {
00928     --i ;
00929     size_t count = 0 ;
00930     for ( int p = 0 ; p < m_comm_size ; ++p ) {
00931       count += my_donations[ p * m_span_count + i ];
00932     }
00933     std::vector<KeyType>::iterator j_beg = contrib_keys.begin();
00934     std::vector<KeyType>::iterator j_end = contrib_keys.end();
00935     j_beg = std::lower_bound( j_beg , j_end , m_key_span[i].first );
00936     j_end = std::upper_bound( j_beg , j_end , m_key_span[i].second );
00937     const size_t n = std::distance( j_beg , j_end );
00938     if ( count < n ) {
00939       contrib_keys.erase( j_beg + count , j_end );
00940     }
00941   }
00942 
00943   // Plan is done, communicate the new keys.
00944   //--------------------------------------------------------------------
00945   // Put the keys this process is keeping into the index.
00946   m_key_usage.reserve(m_key_usage.size() + new_keys.size());
00947   for ( std::vector<KeyType>::iterator i = new_keys.begin();
00948         i != new_keys.end() ; ++i ) {
00949     m_key_usage.push_back( KeyProc( *i , m_comm_rank ) );
00950   }
00951 
00952   //--------------------------------------------------------------------
00953 
00954   CommAll all( m_comm );
00955 
00956   // Sizing
00957 
00958   for ( size_t i = 0 ; i < m_span_count ; ++i ) {
00959     for ( int p = 0 ; p < m_comm_size ; ++p ) {
00960       const size_t n_to_p = my_donations[ p * m_span_count + i ];
00961       if ( 0 < n_to_p ) {
00962         all.send_buffer(p).skip<KeyType>( n_to_p );
00963       }
00964     }
00965   }
00966 
00967   all.allocate_buffers( m_comm_size / 4 , false );
00968 
00969   // Packing
00970 
00971   {
00972     size_t n = 0 ;
00973     for ( size_t i = 0 ; i < m_span_count ; ++i ) {
00974       for ( int p = 0 ; p < m_comm_size ; ++p ) {
00975         const size_t n_to_p = my_donations[ p * m_span_count + i ];
00976         if ( 0 < n_to_p ) {
00977           all.send_buffer(p).pack<KeyType>( & contrib_keys[n] , n_to_p );
00978           for ( size_t k = 0 ; k < n_to_p ; ++k , ++n ) {
00979             m_key_usage.push_back( KeyProc( contrib_keys[n] , p ) );
00980           }
00981         }
00982       }
00983     }
00984   }
00985 
00986   std::sort( m_key_usage.begin() , m_key_usage.end() );
00987 
00988   all.communicate();
00989 
00990   // Unpacking
00991   unpack_recv_buffer( all, m_comm_size, new_keys);
00992 
00993   stk::util::radix_sort_unsigned((new_keys.empty() ? NULL : &new_keys[0]), new_keys.size());
00994 
00995   requested_keys.resize( m_span_count );
00996 
00997   {
00998     std::vector<KeyType>::iterator i_beg = new_keys.begin();
00999     for ( size_t i = 0 ; i < m_span_count ; ++i ) {
01000       std::vector<KeyType>::iterator i_end = i_beg + requests[i] ;
01001       requested_keys[i].assign( i_beg , i_end );
01002       i_beg = i_end ;
01003     }
01004   }
01005 }
01006 
01007 //----------------------------------------------------------------------
01008 
01009 } // namespace parallel
01010 } // namespace stk
01011 
01012 