Sierra Toolkit Version of the Day
DistributedIndex.cpp
00001 /*------------------------------------------------------------------------*/
00002 /*                 Copyright 2010 Sandia Corporation.                     */
00003 /*  Under terms of Contract DE-AC04-94AL85000, there is a non-exclusive   */
00004 /*  license for use of this work by or on behalf of the U.S. Government.  */
00005 /*  Export of this program may require a license from the                 */
00006 /*  United States Government.                                             */
00007 /*------------------------------------------------------------------------*/
00008 
00009 #include <stdexcept>
00010 #include <sstream>
00011 #include <algorithm>
00012 #include <limits>
00013 #include <stdint.h>
00014 
00015 #include <stk_util/parallel/ParallelComm.hpp>
00016 #include <stk_util/parallel/DistributedIndex.hpp>
00017 
00018 #include <stk_util/util/RadixSort.hpp>
00019 
00020 namespace stk {
00021 namespace parallel {
00022 
00023 //----------------------------------------------------------------------
00024 
00025 namespace {
00026 
00027 struct KeyProcLess {
00028 
00029   bool operator()( const DistributedIndex::KeyProc & lhs ,
00030                    const DistributedIndex::KeyType & rhs ) const
00031   { return lhs.first < rhs ; }
00032 
00033 };
00034 
00035 void sort_unique( std::vector<DistributedIndex::KeyProc> & key_usage )
00036 {
00037   std::vector<DistributedIndex::KeyProc>::iterator
00038     i = key_usage.begin() ,
00039     j = key_usage.end() ;
00040 
00041   std::sort( i , j );
00042 
00043   i = std::unique( i , j );
00044 
00045   key_usage.erase( i , j );
00046 }
00047 
00048 void sort_unique( std::vector<DistributedIndex::KeyType> & keys )
00049 {
00050   stk::util::radix_sort_unsigned((keys.empty() ? NULL : &keys[0]), keys.size());
00051 
00052   std::vector<DistributedIndex::KeyType>::iterator
00053     i = keys.begin() ,
00054     j = keys.end() ;
00055 
00056   i = std::unique( i , j );
00057   keys.erase( i , j );
00058 }
00059 
00060 // reserve vector size (current size + recv_buffer remaining)
00061 template < class T >
00062 inline void reserve_for_recv_buffer( const CommAll& all, const DistributedIndex::ProcType& comm_size, std::vector<T>& v)
00063 {
00064   unsigned num_remote = 0;
00065   for (DistributedIndex::ProcType p = 0 ; p < comm_size ; ++p ) {
00066     CommBuffer & buf = all.recv_buffer( p );
00067     num_remote += buf.remaining() / sizeof(T);
00068   }
00069   v.reserve(v.size() + num_remote);
00070 }
00071 
00072 // unpack buffer into vector
00073 template < class T >
00074 inline void unpack_recv_buffer( const CommAll& all, const DistributedIndex::ProcType& comm_size, std::vector<T>& v)
00075 {
00076   reserve_for_recv_buffer(all, comm_size, v);
00077   for (DistributedIndex::ProcType p = 0 ; p < comm_size ; ++p ) {
00078     CommBuffer & buf = all.recv_buffer( p );
00079     while ( buf.remaining() ) {
00080       T kp;
00081       buf.unpack( kp );
00082       v.push_back( kp );
00083     }
00084   }
00085 }
00086 
00087 // unpack buffer into vector, where pair.second is the processor
00088 template < class T >
00089 inline void unpack_with_proc_recv_buffer( const CommAll& all, const DistributedIndex::ProcType& comm_size, std::vector<std::pair<T,DistributedIndex::ProcType> >& v)
00090 {
00091   reserve_for_recv_buffer(all, comm_size, v);
00092   for ( DistributedIndex::ProcType p = 0 ; p < comm_size ; ++p ) {
00093     CommBuffer & buf = all.recv_buffer( p );
00094     std::pair<T,DistributedIndex::ProcType> kp;
00095     kp.second = p;
00096     while ( buf.remaining() ) {
00097       buf.unpack( kp.first );
00098       v.push_back( kp );
00099     }
00100   }
00101 }
00102 
00103 } // namespace <unnamed>
00104 
00105 //----------------------------------------------------------------------
00106 
00107 enum { DISTRIBUTED_INDEX_CHUNK_BITS = 12 }; 
00108 
00109 enum { DISTRIBUTED_INDEX_CHUNK_SIZE =
00110        size_t(1) << DISTRIBUTED_INDEX_CHUNK_BITS };
00111 
00112 DistributedIndex::ProcType
00113 DistributedIndex::to_which_proc( const DistributedIndex::KeyType & key ) const
00114 {
00115   for ( size_t i = 0 ; i < m_span_count ; ++i ) {
00116     if ( m_key_span[i].first <= key && key <= m_key_span[i].second ) {
00117        const KeyType offset = key - m_key_span[i].first ;
00118        return ( offset >> DISTRIBUTED_INDEX_CHUNK_BITS ) % m_comm_size ;
00119     }
00120   }
00121   return m_comm_size ;
00122 }
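// Example of the chunked round-robin ownership map implemented above
// (illustrative numbers, not taken from this file): with 4 processes and
// DISTRIBUTED_INDEX_CHUNK_SIZE = 4096, keys [first, first+4095] of a span
// are owned by process 0, [first+4096, first+8191] by process 1, and so on,
// wrapping back to process 0 after process 3.  A return value equal to
// m_comm_size signals a key that lies outside every span.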
00123 
00124 //----------------------------------------------------------------------
00125 
00126 DistributedIndex::~DistributedIndex() {}
00127 
00128 DistributedIndex::DistributedIndex (
00129   ParallelMachine comm ,
00130   const std::vector<KeySpan> & partition_bounds )
00131   : m_comm( comm ),
00132     m_comm_rank( parallel_machine_rank( comm ) ),
00133     m_comm_size( parallel_machine_size( comm ) ),
00134     m_span_count(0),
00135     m_key_span(),
00136     m_key_usage()
00137 {
00138   unsigned info[2] ;
00139   info[0] = partition_bounds.size();
00140   info[1] = 0 ;
00141 
00142   // Check each span for validity
00143 
00144   for ( std::vector<KeySpan>::const_iterator
00145         i = partition_bounds.begin() ; i != partition_bounds.end() ; ++i ) {
00146     if ( i->second < i->first ||
00147          ( i != partition_bounds.begin() && i->first <= (i-1)->second ) ) {
00148       info[1] = 1 ;
00149     }
00150   }
00151 
00152 #if defined( STK_HAS_MPI )
00153   if (m_comm_size > 1) {
00154     MPI_Bcast( info , 2 , MPI_UNSIGNED , 0 , comm );
00155   }
00156 
00157   if ( 0 < info[0] ) {
00158     m_key_span.resize( info[0] );
00159     if ( 0 == parallel_machine_rank( comm ) ) {
00160       m_key_span = partition_bounds ;
00161     }
00162     if (m_comm_size > 1) {
00163       MPI_Bcast( (m_key_span.empty() ? NULL : & m_key_span[0]), info[0] * sizeof(KeySpan), MPI_BYTE, 0, comm );
00164     }
00165   }
00166 #else
00167   m_key_span = partition_bounds ;
00168 #endif
00169 
00170   if ( info[1] ) {
00171     std::ostringstream msg ;
00172     msg << "stk::parallel::DistributedIndex ctor( comm , " ;
00173 
00174     for ( std::vector<KeySpan>::const_iterator
00175           i = partition_bounds.begin() ; i != partition_bounds.end() ; ++i ) {
00176       msg << " ( min = " << i->first << " , max = " << i->second << " )" ;
00177     }
00178     msg << " ) contains invalid span of keys" ;
00179     throw std::runtime_error( msg.str() );
00180   }
00181 
00182   m_span_count = info[0] ;
00183 
00184   if ( 0 == m_span_count ) {
00185     m_key_span.push_back(
00186       KeySpan( std::numeric_limits<KeyType>::min(),
00187                std::numeric_limits<KeyType>::max() ) );
00188     m_span_count = 1 ;
00189   }
00190 }
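// Construction sketch (illustrative only: the span bounds are made-up
// values, 'index' is a hypothetical variable, and an MPI build is assumed
// so that ParallelMachine is an MPI communicator):
//
//   std::vector<DistributedIndex::KeySpan> spans ;
//   spans.push_back( DistributedIndex::KeySpan( 1 , 1000000 ) );
//   DistributedIndex index( MPI_COMM_WORLD , spans );
//
// In the MPI build the spans are broadcast from rank 0, so every process
// must call the constructor but only rank 0's partition_bounds are used.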
00191 
00192 //----------------------------------------------------------------------
00193 //----------------------------------------------------------------------
00194 
00195 namespace {
00196 
00197 bool is_sorted_and_unique( const std::vector<DistributedIndex::KeyProc> & key_usage )
00198 {
00199   std::vector<DistributedIndex::KeyProc>::const_iterator itr = key_usage.begin();
00200   std::vector<DistributedIndex::KeyProc>::const_iterator end = key_usage.end();
00201   for ( ; itr != end; ++itr ) {
00202     if ( itr + 1 != end && *itr >= *(itr + 1) ) {
00203       return false;
00204     }
00205   }
00206   return true;
00207 }
00208 
00209 void query_pack_to_usage(
00210   const std::vector<DistributedIndex::KeyProc> & key_usage ,
00211   const std::vector<DistributedIndex::KeyType> & request ,
00212   CommAll & all )
00213 {
00214   std::vector<DistributedIndex::KeyProc>::const_iterator i = key_usage.begin();
00215   std::vector<DistributedIndex::KeyType>::const_iterator k = request.begin();
00216 
00217   for ( ; k != request.end() && i != key_usage.end() ; ++k ) {
00218 
00219     for ( ; i != key_usage.end() && i->first < *k ; ++i );
00220 
00221     std::vector<DistributedIndex::KeyProc>::const_iterator j = i ;
00222     for ( ; j != key_usage.end() && j->first == *k ; ++j );
00223 
00224     for ( std::vector<DistributedIndex::KeyProc>::const_iterator
00225           jsend = i ; jsend != j ; ++jsend ) {
00226 
00227       for ( std::vector<DistributedIndex::KeyProc>::const_iterator
00228             jinfo = i ; jinfo != j ; ++jinfo ) {
00229 
00230         all.send_buffer( jsend->second )
00231            .pack<DistributedIndex::KeyProc>( *jinfo );
00232       }
00233     }
00234   }
00235 }
00236 
00237 void query_pack( const std::vector<DistributedIndex::KeyProc> & key_usage ,
00238                  const std::vector<DistributedIndex::KeyProc> & request ,
00239                  CommAll & all )
00240 {
00241   std::vector<DistributedIndex::KeyProc>::const_iterator i = key_usage.begin();
00242 
00243   for ( std::vector<DistributedIndex::KeyProc>::const_iterator
00244         k =  request.begin() ;
00245         k != request.end() &&
00246         i != key_usage.end() ; ++k ) {
00247 
00248     for ( ; i != key_usage.end() && i->first < k->first ; ++i );
00249 
00250     for ( std::vector<DistributedIndex::KeyProc>::const_iterator j = i ;
00251           j != key_usage.end() && j->first == k->first ; ++j ) {
00252       all.send_buffer( k->second ).pack<DistributedIndex::KeyProc>( *j );
00253     }
00254   }
00255 }
00256 
00257 } // namespace <unnamed>
00258 
00259 void DistributedIndex::query(
00260   const std::vector<DistributedIndex::KeyProc> & request ,
00261         std::vector<DistributedIndex::KeyProc> & sharing_of_keys ) const
00262 {
00263   sharing_of_keys.clear();
00264 
00265   CommAll all( m_comm );
00266 
00267   query_pack( m_key_usage , request , all ); // Sizing
00268 
00269   all.allocate_buffers( m_comm_size / 4 , false );
00270 
00271   query_pack( m_key_usage , request , all ); // Packing
00272 
00273   all.communicate();
00274 
00275   unpack_recv_buffer(all, m_comm_size, sharing_of_keys);
00276 
00277   std::sort( sharing_of_keys.begin() , sharing_of_keys.end() );
00278 }
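// The two query_pack() calls above follow the size-then-pack protocol used
// with CommAll throughout this file (a sketch of the pattern, not a
// separate API):
//
//   CommAll all( comm );
//   pack_everything( all );                         // sizing pass
//   all.allocate_buffers( comm_size / 4 , false );  // allocate buffers
//   pack_everything( all );                         // packing pass
//   all.communicate();                              // exchange buffers
//   // ... unpack all.recv_buffer(p) for each process p ...
//
// where pack_everything() stands for whatever packing routine the caller
// uses (here, query_pack()).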
00279 
00280 void DistributedIndex::query(
00281   std::vector<DistributedIndex::KeyProc> & sharing_of_local_keys ) const
00282 {
00283   query( m_key_usage , sharing_of_local_keys );
00284 }
00285 
00286 void DistributedIndex::query(
00287   const std::vector<DistributedIndex::KeyType> & keys ,
00288         std::vector<DistributedIndex::KeyProc> & sharing_keys ) const
00289 {
00290   std::vector<KeyProc> request ;
00291 
00292   {
00293     bool bad_key = false ;
00294     CommAll all( m_comm );
00295 
00296     for ( std::vector<KeyType>::const_iterator
00297           k = keys.begin() ; k != keys.end() ; ++k ) {
00298       const ProcType p = to_which_proc( *k );
00299 
00300       if ( p < m_comm_size ) {
00301         all.send_buffer( p ).pack<KeyType>( *k );
00302       }
00303       else {
00304         bad_key = true ;
00305       }
00306     }
00307 
00308     // Error condition becomes global:
00309 
00310     bad_key = all.allocate_buffers( m_comm_size / 4 , false , bad_key );
00311 
00312     if ( bad_key ) {
00313       throw std::runtime_error("stk::parallel::DistributedIndex::query given a key which is out of range");
00314     }
00315 
00316     for ( std::vector<KeyType>::const_iterator
00317           k = keys.begin() ; k != keys.end() ; ++k ) {
00318       all.send_buffer( to_which_proc( *k ) ).pack<KeyType>( *k );
00319     }
00320 
00321     all.communicate();
00322 
00323     unpack_with_proc_recv_buffer(all, m_comm_size, request);
00324   }
00325 
00326   sort_unique( request );
00327 
00328   query( request , sharing_keys );
00329 }
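// Usage sketch (illustrative; 'index' and the key value are hypothetical):
//
//   std::vector<DistributedIndex::KeyType> keys ;
//   keys.push_back( 42 );
//   std::vector<DistributedIndex::KeyProc> sharing ;
//   index.query( keys , sharing );
//
// On return, 'sharing' holds a (key, process) pair for every process that
// has registered one of the requested keys with the index.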
00330 
00331 void DistributedIndex::query_to_usage(
00332   const std::vector<DistributedIndex::KeyType> & keys ,
00333         std::vector<DistributedIndex::KeyProc> & sharing_keys ) const
00334 {
00335   std::vector<KeyType> request ;
00336 
00337   {
00338     bool bad_key = false ;
00339     CommAll all( m_comm );
00340 
00341     for ( std::vector<KeyType>::const_iterator
00342           k = keys.begin() ; k != keys.end() ; ++k ) {
00343       const ProcType p = to_which_proc( *k );
00344 
00345       if ( p < m_comm_size ) {
00346         all.send_buffer( p ).pack<KeyType>( *k );
00347       }
00348       else {
00349         bad_key = true ;
00350       }
00351     }
00352 
00353     // Error condition becomes global:
00354 
00355     bad_key = all.allocate_buffers( m_comm_size / 4 , false , bad_key );
00356 
00357     if ( bad_key ) {
00358       throw std::runtime_error("stk::parallel::DistributedIndex::query given a key which is out of range");
00359     }
00360 
00361     for ( std::vector<KeyType>::const_iterator
00362           k = keys.begin() ; k != keys.end() ; ++k ) {
00363       all.send_buffer( to_which_proc( *k ) ).pack<KeyType>( *k );
00364     }
00365 
00366     all.communicate();
00367 
00368     unpack_recv_buffer(all, m_comm_size, request);
00369   }
00370 
00371   sort_unique( request );
00372 
00373   {
00374     CommAll all( m_comm );
00375 
00376     query_pack_to_usage( m_key_usage , request , all ); // Sizing
00377 
00378     all.allocate_buffers( m_comm_size / 4 , false );
00379 
00380     query_pack_to_usage( m_key_usage , request , all ); // Packing
00381 
00382     all.communicate();
00383 
00384     unpack_recv_buffer(all, m_comm_size, sharing_keys);
00385 
00386     std::sort( sharing_keys.begin() , sharing_keys.end() );
00387   }
00388 }
00389 
00390 //----------------------------------------------------------------------
00391 //----------------------------------------------------------------------
00392 
00393 namespace {
00394 
00395 struct RemoveKeyProc {
00396 
00397   bool operator()( const DistributedIndex::KeyProc & kp ) const
00398   { return kp.second < 0 ; }
00399 
00400   static void mark( std::vector<DistributedIndex::KeyProc> & key_usage ,
00401                     const DistributedIndex::KeyProc & kp )
00402   {
00403     std::vector<DistributedIndex::KeyProc>::iterator
00404       i = std::lower_bound( key_usage.begin(),
00405                             key_usage.end(), kp.first, KeyProcLess() );
00406 
00407     // Iterate over the span of KeyProcs with matching key until an exact match
00408     // is found. We have to do it this way because marking a KeyProc unsorts it
00409     // in the key_usage vector, so we cannot look up KeyProcs directly once marking
00410     // has begun.
00411     while ( i != key_usage.end() && kp.first == i->first && kp.second != i->second) { ++i ; }
00412 
00413     if ( i != key_usage.end() && kp == *i ) {
00414       i->second = -1 ;
00415     }
00416   }
00417 
00418   static void clean( std::vector<DistributedIndex::KeyProc> & key_usage )
00419   {
00420     std::vector<DistributedIndex::KeyProc>::iterator end =
00421       std::remove_if( key_usage.begin() , key_usage.end() , RemoveKeyProc() );
00422     key_usage.erase( end , key_usage.end() );
00423   }
00424 };
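// mark() locates the exact (key, process) pair and flags it with a
// negative process rank; clean() then removes all flagged entries in one
// pass using the erase-remove idiom.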
00425 
00426 } // namespace <unnamed>
00427 
00428 void DistributedIndex::update_keys(
00429   const std::vector<DistributedIndex::KeyType> & add_new_keys ,
00430   const std::vector<DistributedIndex::KeyType> & remove_existing_keys )
00431 {
00432   std::vector<unsigned long> count_remove( m_comm_size , (unsigned long)0 );
00433   std::vector<unsigned long> count_add(    m_comm_size , (unsigned long)0 );
00434 
00435   size_t local_bad_input = 0 ;
00436 
00437   // Iterate over keys being removed and keep a count of keys being removed
00438   // from other processes
00439   for ( std::vector<KeyType>::const_iterator
00440         i = remove_existing_keys.begin();
00441         i != remove_existing_keys.end(); ++i ) {
00442     const ProcType p = to_which_proc( *i );
00443     if ( m_comm_size <= p ) {
00444       // Key is not within any of the spans:
00445       ++local_bad_input ;
00446     }
00447     else if ( p != m_comm_rank ) {
00448       ++( count_remove[ p ] );
00449     }
00450   }
00451 
00452   // Iterate over keys being added and keep a count of keys being added
00453   // to other processes
00454   for ( std::vector<KeyType>::const_iterator
00455         i = add_new_keys.begin();
00456         i != add_new_keys.end(); ++i ) {
00457     const ProcType p = to_which_proc( *i );
00458     if ( p == m_comm_size ) {
00459       // Key is not within any of the spans:
00460       ++local_bad_input ;
00461     }
00462     else if ( p != m_comm_rank ) {
00463       ++( count_add[ p ] );
00464     }
00465   }
00466 
00467   CommAll all( m_comm );
00468 
00469   // Sizing and add_new_keys bounds checking:
00470 
00471   // For each process, we are going to send the number of removed keys,
00472   // the removed keys, and the added keys. It will be assumed that any keys
00473   // beyond the number of removed keys will be added keys.
00474   for ( int p = 0 ; p < m_comm_size ; ++p ) {
00475     if ( count_remove[p] || count_add[p] ) {
00476       CommBuffer & buf = all.send_buffer( p );
00477       buf.skip<unsigned long>( 1 );
00478       buf.skip<KeyType>( count_remove[p] );
00479       buf.skip<KeyType>( count_add[p] );
00480     }
00481   }
00482 
00483   // Allocate buffers and perform a global OR of error_flag
00484   const bool symmetry_flag = false ;
00485   const bool error_flag = 0 < local_bad_input ;
00486 
00487   bool global_bad_input =
00488     all.allocate_buffers( m_comm_size / 4, symmetry_flag , error_flag );
00489 
00490   if ( global_bad_input ) {
00491     std::ostringstream msg ;
00492 
00493     if ( 0 < local_bad_input ) {
00494       msg << "stk::parallel::DistributedIndex::update_keys ERROR Given "
00495           << local_bad_input << " of " << add_new_keys.size()
00496           << " add_new_keys outside of any span" ;
00497     }
00498 
00499     throw std::runtime_error( msg.str() );
00500   }
00501 
00502   // Packing:
00503 
00504   // Pack the remove counts for each process
00505   for ( int p = 0 ; p < m_comm_size ; ++p ) {
00506     if ( count_remove[p] || count_add[p] ) {
00507       all.send_buffer( p ).pack<unsigned long>( count_remove[p] );
00508     }
00509   }
00510 
00511   // Pack the removed keys for each process
00512   for ( std::vector<KeyType>::const_iterator
00513         i = remove_existing_keys.begin();
00514         i != remove_existing_keys.end(); ++i ) {
00515     const ProcType p = to_which_proc( *i );
00516     if ( p != m_comm_rank ) {
00517       all.send_buffer( p ).pack<KeyType>( *i );
00518     }
00519   }
00520 
00521   // Pack the added keys for each process
00522   for ( std::vector<KeyType>::const_iterator
00523         i = add_new_keys.begin();
00524         i != add_new_keys.end(); ++i ) {
00525     const ProcType p = to_which_proc( *i );
00526     if ( p != m_comm_rank ) {
00527       all.send_buffer( p ).pack<KeyType>( *i );
00528     }
00529   }
00530 
00531   // Communicate keys
00532   all.communicate();
00533 
00534   //------------------------------
00535   // Remove for local keys
00536 
00537   for ( std::vector<KeyType>::const_iterator
00538         i = remove_existing_keys.begin();
00539         i != remove_existing_keys.end(); ++i ) {
00540     const ProcType p = to_which_proc( *i );
00541     if ( p == m_comm_rank ) {
00542       RemoveKeyProc::mark( m_key_usage , KeyProc( *i , p ) );
00543     }
00544   }
00545 
00546   // Unpack the remove key and find it.
00547   // Set the process to a negative value for subsequent removal.
00548 
00549   for ( int p = 0 ; p < m_comm_size ; ++p ) {
00550     CommBuffer & buf = all.recv_buffer( p );
00551     if ( buf.remaining() ) {
00552       unsigned long remove_count = 0 ;
00553 
00554       KeyProc kp ;
00555 
00556       kp.second = p ;
00557 
00558       buf.unpack<unsigned long>( remove_count );
00559 
00560       for ( ; 0 < remove_count ; --remove_count ) {
00561         buf.unpack<KeyType>( kp.first );
00562 
00563         RemoveKeyProc::mark( m_key_usage , kp );
00564       }
00565     }
00566   }
00567 
00568   RemoveKeyProc::clean( m_key_usage );
00569 
00570   //------------------------------
00571   // Append for local keys
00572 
00573   // Add new_keys going to this proc to local_key_usage
00574   std::vector<KeyProc> local_key_usage ;
00575   local_key_usage.reserve(add_new_keys.size());
00576   for ( std::vector<KeyType>::const_iterator
00577         i = add_new_keys.begin();
00578         i != add_new_keys.end(); ++i ) {
00579 
00580     const ProcType p = to_which_proc( *i );
00581     if ( p == m_comm_rank ) {
00582       local_key_usage.push_back( KeyProc( *i , p ) );
00583     }
00584   }
00585 
00586   // Merge local_key_usage and m_key_usage into temp_key
00587   std::vector<KeyProc> temp_key ;
00588   temp_key.reserve(local_key_usage.size() + m_key_usage.size());
00589   std::sort( local_key_usage.begin(), local_key_usage.end() );
00590   std::merge( m_key_usage.begin(),
00591               m_key_usage.end(),
00592               local_key_usage.begin(),
00593               local_key_usage.end(),
00594               std::back_inserter(temp_key) );
00595 
00596   // Unpack and append for remote keys:
00597   std::vector<KeyProc> remote_key_usage ;
00598 
00599   unpack_with_proc_recv_buffer(all, m_comm_size, remote_key_usage);
00600 
00601   std::sort( remote_key_usage.begin(), remote_key_usage.end() );
00602 
00603   m_key_usage.clear();
00604   m_key_usage.reserve(temp_key.size() + remote_key_usage.size());
00605 
00606   // Merge temp_key and remote_key_usage into m_key_usage, so...
00607   //   m_key_usage = local_key_usage + remote_key_usage + m_key_usage(orig)
00608   std::merge( temp_key.begin(),
00609               temp_key.end(),
00610               remote_key_usage.begin(),
00611               remote_key_usage.end(),
00612               std::back_inserter(m_key_usage) );
00613 
00614   // Unique m_key_usage
00615   m_key_usage.erase(std::unique( m_key_usage.begin(),
00616                                  m_key_usage.end()),
00617                     m_key_usage.end() );
00618 
00619   // Check invariant that m_key_usage is sorted
00620   if (!is_sorted_and_unique(m_key_usage)) {
00621     throw std::runtime_error( "Sorted&unique invariant violated!" );
00622   }
00623 }
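// Usage sketch (illustrative; 'index' and the key value are hypothetical):
//
//   std::vector<DistributedIndex::KeyType> add , rem ;
//   add.push_back( 42 );          // this process now uses key 42
//   index.update_keys( add , rem );
//
// Each key in 'add_new_keys' is recorded as a (key, using-process) pair on
// the process that owns the key's chunk, and each key in
// 'remove_existing_keys' has this process's pair erased there.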
00624 
00625 //----------------------------------------------------------------------
00626 //----------------------------------------------------------------------
00627 
00628 void DistributedIndex::generate_new_global_key_upper_bound(
00629   const std::vector<size_t>  & requests ,
00630         std::vector<DistributedIndex::KeyType> & global_key_upper_bound ) const
00631 {
00632   bool bad_request = m_span_count != requests.size();
00633 
00634   std::ostringstream error_msg ;
00635 
00636   error_msg
00637   << "stk::parallel::DistributedIndex::generate_new_global_key_upper_bound( " ;
00638 
00639   std::vector<unsigned long>
00640     local_counts( m_span_count + 1 , (unsigned long) 0 ),
00641     global_counts( m_span_count + 1 , (unsigned long) 0 );
00642 
00643   // Count unique keys in each span and add requested keys for
00644   // final total count of keys needed.
00645 
00646   // Append the error check to this communication to avoid
00647   // an extra reduction operation.
00648   local_counts[ m_span_count ] = m_span_count != requests.size();
00649 
00650   if ( m_span_count == requests.size() ) {
00651 
00652     for ( size_t i = 0 ; i < m_span_count ; ++i ) {
00653       local_counts[i] = requests[i] ;
00654     }
00655 
00656     std::vector<KeyProc>::const_iterator j = m_key_usage.begin();
00657 
00658     for ( size_t i = 0 ; i < m_span_count && j != m_key_usage.end() ; ++i ) {
00659       const KeyType key_span_last = m_key_span[i].second ;
00660       size_t count = 0 ;
00661       while ( j != m_key_usage.end() && j->first <= key_span_last ) {
00662         const KeyType key = j->first ;
00663         while ( j != m_key_usage.end() && key == j->first ) { ++j ; }
00664         ++count ;
00665       }
00666       local_counts[i] += count ;
00667     }
00668   }
00669 
00670 #if defined( STK_HAS_MPI )
00671   if (m_comm_size > 1) {
00672     MPI_Allreduce( (local_counts.empty() ? NULL : & local_counts[0]) , (global_counts.empty() ? NULL : & global_counts[0]) ,
00673                    m_span_count + 1 , MPI_UNSIGNED_LONG ,
00674                    MPI_SUM , m_comm );
00675   }
00676   else {
00677     global_counts = local_counts ;
00678   }
00679 #else
00680   global_counts = local_counts ;
00681 #endif
00682 
00683   bad_request = global_counts[m_span_count] != 0 ;
00684 
00685   if ( bad_request ) {
00686     if ( m_span_count != requests.size() ) {
00687       error_msg << " requests.size() = " << requests.size()
00688                 << " != " << m_span_count << " )" ;
00689     }
00690   }
00691 
00692   if ( ! bad_request ) {
00693     for ( unsigned i = 0 ; i < m_span_count ; ++i ) {
00694       const size_t span_available =
00695         ( 1 + m_key_span[i].second - m_key_span[i].first );
00696 
00697       const size_t span_requested = global_counts[i];
00698 
00699       if ( span_available < span_requested ) {
00700         bad_request = true ;
00701         error_msg << " global_sum( (existing+request)[" << i << "] ) = "
00702                   << span_requested
00703                   << " > global_sum( span_available ) = "
00704                   << span_available ;
00705       }
00706     }
00707   }
00708 
00709   if ( bad_request ) {
00710     throw std::runtime_error( error_msg.str() );
00711   }
00712 
00713   // Determine the maximum generated key
00714 
00715   global_key_upper_bound.resize( m_span_count );
00716 
00717   for ( size_t i = 0 ; i < m_span_count ; ++i ) {
00718      global_key_upper_bound[i] = m_key_span[i].first + global_counts[i] - 1 ;
00719   }
00720 }
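// Worked example (illustrative numbers): suppose span i is [1, 1000000],
// 100 distinct keys of that span are already in use somewhere, and the
// processes together request 50 new keys.  Then global_counts[i] = 150 and
//
//   global_key_upper_bound[i] = 1 + 150 - 1 = 150 .
//
// At most 100 of the keys 1..150 can already be in use, so at least 50
// unused keys exist at or below the bound; it is an upper bound on the
// largest key that must be generated, not the exact maximum.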
00721 
00722 //--------------------------------------------------------------------
00723 //--------------------------------------------------------------------
00724 
00725 void DistributedIndex::generate_new_keys_local_planning(
00726   const std::vector<DistributedIndex::KeyType>  & key_global_upper_bound ,
00727   const std::vector<size_t>   & requests_local ,
00728         std::vector<long>     & new_request ,
00729         std::vector<KeyType>  & requested_keys ,
00730         std::vector<KeyType>  & contrib_keys ) const
00731 {
00732   new_request.assign( m_span_count , long(0) );
00733 
00734   contrib_keys.clear();
00735 
00736   std::vector<KeyProc>::const_iterator j = m_key_usage.begin();
00737 
00738   for ( size_t i = 0 ; i < m_span_count ; ++i ) {
00739     // The maximum generated key from any process will
00740     // not exceed this value.
00741     const KeyType key_upper_bound = key_global_upper_bound[i] ;
00742 
00743     const size_t init_size = contrib_keys.size();
00744 
00745     const size_t chunk_inc = m_comm_size * DISTRIBUTED_INDEX_CHUNK_SIZE ;
00746 
00747     const size_t chunk_rsize = m_comm_rank * DISTRIBUTED_INDEX_CHUNK_SIZE ;
00748 
00749     for ( KeyType key_begin = m_key_span[i].first +
00750                   chunk_rsize ;
00751           key_begin <= key_upper_bound ; key_begin += chunk_inc ) {
00752 
00753       // What is the first key of the chunk
00754       KeyType key_iter = key_begin ;
00755 
00756       // What is the last key belonging to this process' chunk
00757       const KeyType key_last =
00758         std::min( key_begin + DISTRIBUTED_INDEX_CHUNK_SIZE - 1 , key_upper_bound );
00759 
00760       // Jump into the sorted used key vector to
00761       // the key which may be contributed
00762 
00763       j = std::lower_bound( j, m_key_usage.end(), key_iter, KeyProcLess() );
00764       // now know:  j == m_key_usage.end() OR
00765       //            key_iter <= j->first
00766 
00767       for ( ; key_iter <= key_last ; ++key_iter ) {
00768         if ( j == m_key_usage.end() || key_iter < j->first ) {
00769           // The current attempt 'key_iter' is not used, contribute it.
00770           contrib_keys.push_back( key_iter );
00771         }
00772         else { // j != m_key_usage.end() && key_iter == j->first
00773           // The current attempt 'key_iter' is already used,
00774           // increment the used-iterator to its next key value.
00775           while ( j != m_key_usage.end() && key_iter == j->first ) {
00776             ++j ;
00777           }
00778         }
00779       }
00780     }
00781 
00782     // Determine which local keys will be contributed,
00783     // keeping what this process could use from the contribution.
00784     // This can reduce the subsequent communication load when
00785     // donating keys to another process.
00786 
00787     const size_t this_contrib = contrib_keys.size() - init_size ;
00788 
00789     // How many keys will this process keep:
00790     const size_t keep = std::min( requests_local[i] , this_contrib );
00791 
00792     // Take the kept keys from the contributed key vector.
00793     requested_keys.insert( requested_keys.end() ,
00794                            contrib_keys.end() - keep ,
00795                            contrib_keys.end() );
00796 
00797     contrib_keys.erase( contrib_keys.end() - keep ,
00798                         contrib_keys.end() );
00799 
00800     // New request is positive for needed keys or negative for donated keys
00801     new_request[i] = requests_local[i] - this_contrib ;
00802   }
00803 }
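// Example of the bookkeeping above (illustrative numbers): a process that
// finds 10 unused keys in its chunks of span i but requests only 3 keeps
// 3 of them (appended to requested_keys), leaves 7 in contrib_keys for
// donation, and records new_request[i] = 3 - 10 = -7.  A process that
// finds 1 unused key but requests 4 keeps the 1 and records
// new_request[i] = 4 - 1 = +3, i.e. it still needs 3 keys from donors.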
00804 
00805 //----------------------------------------------------------------------
00806 
00807 void DistributedIndex::generate_new_keys_global_planning(
00808   const std::vector<long>    & new_request ,
00809         std::vector<long>    & my_donations ) const
00810 {
00811   my_donations.assign( m_comm_size * m_span_count , long(0) );
00812 
00813   // Gather the global request plan for receiving and donating keys
00814   // Positive values for receiving, negative values for donating.
00815 
00816   std::vector<long> new_request_global( m_comm_size * m_span_count );
00817 
00818 #if defined( STK_HAS_MPI )
00819 
00820   if (m_comm_size > 1) { // Gather requests into per-process spans
00821 
00822     // There is a possible bug in MPI_Allgather with the Intel 12 compilers; use MPI_Gather instead.
00823 #if defined(__INTEL_COMPILER) && (__INTEL_COMPILER >= 1200)
00824       {
00825         // MPI doesn't do 'const' in its interface, but the send buffer is const
00826         void * send_buf = const_cast<void*>( (void *)( (new_request.empty() ? NULL : & new_request[0]) ));
00827         void * recv_buf = (new_request_global.empty() ? NULL : & new_request_global[0]) ;
00828         for (int root = 0; root < m_comm_size; ++root)
00829           {
00830             MPI_Gather( send_buf , m_span_count , MPI_LONG ,
00831                         recv_buf , m_span_count , MPI_LONG , root, m_comm );
00832           }
00833       }
00834 #else
00835       {
00836         // MPI doesn't do 'const' in its interface, but the send buffer is const
00837         void * send_buf = const_cast<void*>( (void *)( (new_request.empty() ? NULL : & new_request[0]) ));
00838         void * recv_buf = (new_request_global.empty() ? NULL : & new_request_global[0]) ;
00839         MPI_Allgather( send_buf , m_span_count , MPI_LONG ,
00840                        recv_buf , m_span_count , MPI_LONG , m_comm );
00841       }
00842 #endif
00843 
00844   }
00845   else {
00846     new_request_global = new_request ;
00847   }
00848 #else
00849   new_request_global = new_request ;
00850 #endif
00851 
00852   // Now have the global receive & donate plan.
00853   //--------------------------------------------------------------------
00854   // Generate my donate plan from the global receive & donate plan.
00855 
00856   for ( unsigned i = 0 ; i < m_span_count ; ++i ) {
00857 
00858     if ( new_request[i] < 0 ) { // This process is donating on this span
00859       long my_total_donate = - new_request[i] ;
00860 
00861       long previous_donate = 0 ;
00862 
00863       // Count what previous processes have donated:
00864       for ( int p = 0 ; p < m_comm_rank ; ++p ) {
00865         const long new_request_p = new_request_global[ p * m_span_count + i ] ;
00866         if ( new_request_p < 0 ) {
00867           previous_donate -= new_request_p ;
00868         }
00869       }
00870 
00871       // What the donation count will be with my donation:
00872       long end_donate = previous_donate + my_total_donate ;
00873 
00874       long previous_receive = 0 ;
00875 
00876       // Determine my donation to other processes (one to many).
00877 
00878       for ( int p = 0 ; p < m_comm_size && 0 < my_total_donate ; ++p ) {
00879 
00880         const long new_request_p = new_request_global[ p * m_span_count + i ];
00881 
00882         if ( 0 < new_request_p ) { // Process 'p' receives keys
00883 
00884           // Accumulation of requests:
00885 
00886           previous_receive += new_request_p ;
00887 
00888           if ( previous_donate < previous_receive ) {
00889             // I am donating to process 'p'
00890             const long n = std::min( previous_receive , end_donate )
00891                            - previous_donate ;
00892 
00893             my_donations[ p * m_span_count + i ] = n ;
00894             previous_donate += n ;
00895             my_total_donate -= n ;
00896           }
00897         }
00898       }
00899     }
00900   }
00901 }
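// Worked example (illustrative numbers) for a single span on three
// processes with new_request values +5, -8, +3:
//   process 1 is the only donor, so previous_donate = 0 and end_donate = 8;
//   scanning receivers in rank order, process 0 needs 5 keys, giving
//   my_donations[0] = min(5,8) - 0 = 5, and process 2 needs 3 more
//   (previous_receive = 8), giving my_donations[2] = min(8,8) - 5 = 3.
// Every process computes only its own donations (my_donations) from the
// same gathered new_request values.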
00902 
00903 //--------------------------------------------------------------------
00904 
00905 void DistributedIndex::generate_new_keys(
00906   const std::vector<size_t>                 & requests ,
00907         std::vector< std::vector<KeyType> > & requested_keys )
00908 {
00909   //--------------------------------------------------------------------
00910   // Develop the plan:
00911 
00912   std::vector<KeyType> global_key_upper_bound ;
00913   std::vector<long>    new_request ;
00914   std::vector<long>    my_donations ;
00915   std::vector<KeyType> contrib_keys ;
00916   std::vector<KeyType> new_keys ;
00917 
00918   // Verify input and generate global sum of
00919   // current key usage and requested new keys.
00920   // Throw a parallel consistent exception if the input is bad.
00921 
00922   generate_new_global_key_upper_bound( requests , global_key_upper_bound );
00923 
00924   // No exception thrown means all inputs are good and parallel consistent
00925 
00926   // Determine which local keys will be contributed,
00927   // keeping what this process could use from the contribution.
00928   // This can reduce the subsequent communication load when
00929   // donating keys to another process.
00930 
00931   generate_new_keys_local_planning( global_key_upper_bound ,
00932                                     requests ,
00933                                     new_request ,
00934                                     new_keys ,
00935                                     contrib_keys );
00936 
00937   // Determine where this process will be donating 'contrib_keys'
00938   generate_new_keys_global_planning( new_request, my_donations );
00939 
00940   // Because an upper bound was used instead of an exact maximum,
00941   // contrib_keys is likely to contain more keys than are needed.
00942   // Remove the unneeded keys.
00943 
00944   // Backwards to erase from the end
00945   for ( size_t i = m_span_count ; 0 < i ; ) {
00946     --i ;
00947     size_t count = 0 ;
00948     for ( int p = 0 ; p < m_comm_size ; ++p ) {
00949       count += my_donations[ p * m_span_count + i ];
00950     }
00951     std::vector<KeyType>::iterator j_beg = contrib_keys.begin();
00952     std::vector<KeyType>::iterator j_end = contrib_keys.end();
00953     j_beg = std::lower_bound( j_beg , j_end , m_key_span[i].first );
00954     j_end = std::upper_bound( j_beg , j_end , m_key_span[i].second );
00955     const size_t n = std::distance( j_beg , j_end );
00956     if ( count < n ) {
00957       contrib_keys.erase( j_beg + count , j_end );
00958     }
00959   }
00960 
00961   // Plan is done, communicate the new keys.
00962   //--------------------------------------------------------------------
00963   // Put the keys this process is keeping into the index.
00964   m_key_usage.reserve(m_key_usage.size() + new_keys.size());
00965   for ( std::vector<KeyType>::iterator i = new_keys.begin();
00966         i != new_keys.end() ; ++i ) {
00967     m_key_usage.push_back( KeyProc( *i , m_comm_rank ) );
00968   }
00969 
00970   //--------------------------------------------------------------------
00971 
00972   CommAll all( m_comm );
00973 
00974   // Sizing
00975 
00976   for ( size_t i = 0 ; i < m_span_count ; ++i ) {
00977     for ( int p = 0 ; p < m_comm_size ; ++p ) {
00978       const size_t n_to_p = my_donations[ p * m_span_count + i ];
00979       if ( 0 < n_to_p ) {
00980         all.send_buffer(p).skip<KeyType>( n_to_p );
00981       }
00982     }
00983   }
00984 
00985   all.allocate_buffers( m_comm_size / 4 , false );
00986 
00987   // Packing
00988 
00989   {
00990     size_t n = 0 ;
00991     for ( size_t i = 0 ; i < m_span_count ; ++i ) {
00992       for ( int p = 0 ; p < m_comm_size ; ++p ) {
00993         const size_t n_to_p = my_donations[ p * m_span_count + i ];
00994         if ( 0 < n_to_p ) {
00995           all.send_buffer(p).pack<KeyType>( & contrib_keys[n] , n_to_p );
00996           for ( size_t k = 0 ; k < n_to_p ; ++k , ++n ) {
00997             m_key_usage.push_back( KeyProc( contrib_keys[n] , p ) );
00998           }
00999         }
01000       }
01001     }
01002   }
01003 
01004   std::sort( m_key_usage.begin() , m_key_usage.end() );
01005 
01006   all.communicate();
01007 
01008   // Unpacking
01009   unpack_recv_buffer( all, m_comm_size, new_keys);
01010 
01011   stk::util::radix_sort_unsigned((new_keys.empty() ? NULL : &new_keys[0]), new_keys.size());
01012 
01013   requested_keys.resize( m_span_count );
01014 
01015   {
01016     std::vector<KeyType>::iterator i_beg = new_keys.begin();
01017     for ( size_t i = 0 ; i < m_span_count ; ++i ) {
01018       std::vector<KeyType>::iterator i_end = i_beg + requests[i] ;
01019       requested_keys[i].assign( i_beg , i_end );
01020       i_beg = i_end ;
01021     }
01022   }
01023 }
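// Usage sketch (illustrative; 'index' is hypothetical and the single
// default span created when no partition bounds are given is assumed):
//
//   std::vector<size_t> requests( 1 , 10 );   // 10 new keys in span 0
//   std::vector< std::vector<DistributedIndex::KeyType> > new_keys ;
//   index.generate_new_keys( requests , new_keys );
//
// On return, new_keys[0] holds 10 previously unused keys, registered in
// the index as used by this process.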
01024 
01025 //----------------------------------------------------------------------
01026 
01027 } // namespace parallel
01028 } // namespace stk
01029 
01030 