DistributedIndex.cpp

/*------------------------------------------------------------------------*/
/*                 Copyright 2010 Sandia Corporation.                     */
/*  Under terms of Contract DE-AC04-94AL85000, there is a non-exclusive   */
/*  license for use of this work by or on behalf of the U.S. Government.  */
/*  Export of this program may require a license from the                 */
/*  United States Government.                                             */
/*------------------------------------------------------------------------*/


#include <stdexcept>
#include <sstream>
#include <algorithm>
#include <limits>
#include <stdint.h>
#include <stk_util/parallel/ParallelComm.hpp>
#include <stk_util/parallel/DistributedIndex.hpp>

namespace stk {
namespace parallel {

//----------------------------------------------------------------------

namespace {

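// Order (key,processor) pairs by key first and processor second.
// The (KeyProc,KeyType) overload compares an entry against a bare key
// so std::lower_bound can search a sorted vector by key value alone.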
struct KeyProcLess {

  bool operator()( const DistributedIndex::KeyProc & lhs ,
                   const DistributedIndex::KeyProc & rhs ) const
  {
    return lhs.first != rhs.first ? lhs.first  < rhs.first
                                  : lhs.second < rhs.second ;
  }

  bool operator()( const DistributedIndex::KeyProc & lhs ,
                   const DistributedIndex::KeyType & rhs ) const
  { return lhs.first < rhs ; }

};

void sort_unique( std::vector<DistributedIndex::KeyProc> & key_usage )
{
  std::sort( key_usage.begin() , key_usage.end() , KeyProcLess() );

  key_usage.erase( std::unique( key_usage.begin() , key_usage.end() ),
                   key_usage.end() );
}

}

//----------------------------------------------------------------------

enum { DISTRIBUTED_INDEX_CHUNK_BITS = 12 };

enum { DISTRIBUTED_INDEX_CHUNK_SIZE =
         size_t(1) << DISTRIBUTED_INDEX_CHUNK_BITS };


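// Keys map to processors in round-robin chunks of
// DISTRIBUTED_INDEX_CHUNK_SIZE = 2^12 = 4096 consecutive keys.
// For example, with four processors: keys 0..4095 -> proc 0,
// 4096..8191 -> proc 1, ... , 12288..16383 -> proc 3, and
// 16384..20479 wrap around to proc 0 again.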
inline
DistributedIndex::ProcType
DistributedIndex::to_which_proc( const DistributedIndex::KeyType & key ) const
{ return ( key >> DISTRIBUTED_INDEX_CHUNK_BITS ) % m_comm_size ; }

//----------------------------------------------------------------------

DistributedIndex::~DistributedIndex() {}

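// Construct the index over the given spans of keys.  Only the root
// process' partition_bounds is authoritative: the spans are validated
// there and broadcast so that every process holds an identical copy.
// An empty input produces a single span covering the full KeyType range.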
DistributedIndex::DistributedIndex (
  ParallelMachine comm ,
  const std::vector<KeySpan> & partition_bounds )
  : m_comm( comm ),
    m_comm_rank( parallel_machine_rank( comm ) ),
    m_comm_size( parallel_machine_size( comm ) ),
    m_span_count(0),
    m_key_span(),
    m_chunk_first(),
    m_key_usage()
{
  unsigned info[2] ;
  info[0] = partition_bounds.size();
  info[1] = 0 ;

  // Check each span for validity

  for ( std::vector<KeySpan>::const_iterator
        i = partition_bounds.begin() ; i != partition_bounds.end() ; ++i ) {
    if ( i->second < i->first ||
         ( i != partition_bounds.begin() && i->first <= (i-1)->second ) ) {
      info[1] = 1 ;
    }
  }

#if defined( STK_HAS_MPI )
  MPI_Bcast( info , 2 , MPI_UNSIGNED , 0 , comm );

  if ( 0 < info[0] ) {
    m_key_span.resize( info[0] );
    if ( 0 == parallel_machine_rank( comm ) ) {
      m_key_span = partition_bounds ;
    }
    MPI_Bcast( & m_key_span[0], info[0] * sizeof(KeySpan), MPI_BYTE, 0, comm );
  }
#else
  m_key_span = partition_bounds ;
#endif

  if ( info[1] ) {
    std::ostringstream msg ;
    msg << "stk::parallel::DistributedIndex ctor( comm , " ;

    for ( std::vector<KeySpan>::const_iterator
          i = partition_bounds.begin() ; i != partition_bounds.end() ; ++i ) {
      msg << " ( min = " << i->first << " , max = " << i->second << " )" ;
    }
    msg << " ) contains invalid span of keys" ;
    throw std::runtime_error( msg.str() );
  }

  m_span_count = info[0] ;

  if ( 0 == m_span_count ) {
    m_key_span.push_back(
      KeySpan( std::numeric_limits<KeyType>::min(),
               std::numeric_limits<KeyType>::max() ) );
    m_span_count = 1 ;
  }

  m_chunk_first.resize( m_span_count );

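  // For each span, find the index of the first chunk whose keys
  // map to this process under the round-robin chunk distribution.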
  for ( size_t i = 0 ; i < m_span_count ; ++i ) {
    const KeyType key_span_first = m_key_span[i].first ;
    size_t chunk_iter = 0 ;

    while ( m_comm_rank !=
            to_which_proc( key_span_first +
                           chunk_iter * DISTRIBUTED_INDEX_CHUNK_SIZE ) ) {
      ++chunk_iter ;
    }
    m_chunk_first[ i ] = chunk_iter ;
  }
}

//----------------------------------------------------------------------
//----------------------------------------------------------------------

namespace {

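// Pack each usage entry whose key matches a requested key into the send
// buffer of the process that requested it.  Both 'key_usage' and
// 'request' must be sorted by key; the vectors are walked in a single
// merge-style pass.  Called twice per query: once to size the CommAll
// buffers and once to fill them.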
void query_pack( const std::vector<DistributedIndex::KeyProc> & key_usage ,
                 const std::vector<DistributedIndex::KeyProc> & request ,
                 CommAll & all )
{
  std::vector<DistributedIndex::KeyProc>::const_iterator i = key_usage.begin();

  for ( std::vector<DistributedIndex::KeyProc>::const_iterator
        k =  request.begin() ;
        k != request.end() &&
        i != key_usage.end() ; ++k ) {

    while ( i != key_usage.end() && i->first < k->first ) { ++i ; }

    for ( std::vector<DistributedIndex::KeyProc>::const_iterator j = i ;
          j != key_usage.end() && j->first == k->first ; ++j ) {
      all.send_buffer( k->second ).pack<DistributedIndex::KeyProc>( *j );
    }
  }
}

}

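// Tell each requesting process which processes share its requested keys.
// Follows the standard CommAll protocol: a sizing pass, buffer
// allocation, an identical packing pass, then communication.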
void DistributedIndex::query(
  const std::vector<DistributedIndex::KeyProc> & request ,
        std::vector<DistributedIndex::KeyProc> & sharing_of_keys ) const
{
  sharing_of_keys.clear();

  CommAll all( m_comm );

  query_pack( m_key_usage , request , all ); // Sizing

  all.allocate_buffers( m_comm_size / 4 , false );

  query_pack( m_key_usage , request , all ); // Packing

  all.communicate();

  for ( ProcType p = 0 ; p < m_comm_size ; ++p ) {
    CommBuffer & buf = all.recv_buffer( p );
    while ( buf.remaining() ) {
      KeyProc kp ;
      buf.unpack( kp );
      sharing_of_keys.push_back( kp );
    }
  }

  std::sort( sharing_of_keys.begin() , sharing_of_keys.end() );
}


void DistributedIndex::query(
  std::vector<DistributedIndex::KeyProc> & sharing_of_local_keys ) const
{
  query( m_key_usage , sharing_of_local_keys );
}

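// Route each key to the process that owns its chunk, since only the
// owner knows the key's full sharing list, then run the pairwise
// query above on the collected requests.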
void DistributedIndex::query(
  const std::vector<DistributedIndex::KeyType> & keys ,
        std::vector<DistributedIndex::KeyProc> & sharing_keys ) const
{
  std::vector<KeyProc> request ;

  {
    CommAll all( m_comm );

    // Sizing pass:
    for ( std::vector<KeyType>::const_iterator
          k = keys.begin() ; k != keys.end() ; ++k ) {
      all.send_buffer( to_which_proc( *k ) ).pack<KeyType>( *k );
    }

    all.allocate_buffers( m_comm_size / 4 , false );

    // Packing pass:
    for ( std::vector<KeyType>::const_iterator
          k = keys.begin() ; k != keys.end() ; ++k ) {
      all.send_buffer( to_which_proc( *k ) ).pack<KeyType>( *k );
    }

    all.communicate();

    for ( ProcType p = 0 ; p < m_comm_size ; ++p ) {
      CommBuffer & buf = all.recv_buffer( p );
      KeyProc kp ;
      kp.second = p ;
      while ( buf.remaining() ) {
        buf.unpack<KeyType>( kp.first );
        request.push_back( kp );
      }
    }
  }

  sort_unique( request );

  query( request , sharing_keys );
}

//----------------------------------------------------------------------
//----------------------------------------------------------------------

namespace {

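// Two-phase removal of (key,processor) entries: mark() flags a matching
// entry by setting its processor to -1, then clean() erases all flagged
// entries in one pass via std::remove_if.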
struct RemoveKeyProc {

  bool operator()( const DistributedIndex::KeyProc & kp ) const
    { return kp.second < 0 ; }

  static void mark( std::vector<DistributedIndex::KeyProc> & key_usage ,
                    const DistributedIndex::KeyProc & kp )
  {
    std::vector<DistributedIndex::KeyProc>::iterator
      i = std::lower_bound( key_usage.begin(),
                            key_usage.end(), kp.first , KeyProcLess() );
    while ( i != key_usage.end() && kp != *i ) { ++i ; }
    if ( i != key_usage.end() && kp == *i ) {
      i->second = -1 ;
    }
  }

  static void clean( std::vector<DistributedIndex::KeyProc> & key_usage )
  {
    std::vector<DistributedIndex::KeyProc>::iterator end =
      std::remove_if( key_usage.begin() , key_usage.end() , RemoveKeyProc() );
    key_usage.erase( end , key_usage.end() );
  }
};

}

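// Update the index by removals then additions.  Keys belonging to
// another process' chunk are sent to that owner; per destination the
// message is a removal count, the removed keys, then the added keys,
// so the receiver knows where removals end and additions begin.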
void DistributedIndex::update_keys(
  const std::vector<DistributedIndex::KeyType> & add_new_keys ,
  const std::vector<DistributedIndex::KeyType> & remove_existing_keys )
{
  std::vector<unsigned long> count_remove( m_comm_size , (unsigned long)0 );
  std::vector<unsigned long> count_add(    m_comm_size , (unsigned long)0 );

  for ( std::vector<KeyType>::const_iterator
        i = remove_existing_keys.begin();
        i != remove_existing_keys.end(); ++i ) {
    const ProcType p = to_which_proc( *i );
    if ( p != m_comm_rank ) {
      ++count_remove[ p ];
    }
  }

  size_t local_bad_input = 0 ;

  for ( std::vector<KeyType>::const_iterator
        i = add_new_keys.begin();
        i != add_new_keys.end(); ++i ) {

    // Input key must be within one of the spans:

    std::vector<KeySpan>::const_iterator j  = m_key_span.begin();
    std::vector<KeySpan>::const_iterator je = m_key_span.end();
    for ( ; j != je ; ++j ) {
      if ( j->first <= *i && *i <= j->second ) { break ; }
    }
    if ( j == je ) { ++local_bad_input ; }

    // Count keys destined for other processes:

    const ProcType p = to_which_proc( *i );
    if ( p != m_comm_rank ) {
      ++count_add[ p ];
    }
  }

  if ( 0 < local_bad_input ) {
    // This process knows it will throw, so don't bother communicating
    // the add and remove requests.  Zero the counts rather than
    // clearing the vectors so the sizing loop below stays in bounds.
    count_remove.assign( m_comm_size , (unsigned long) 0 );
    count_add.assign(    m_comm_size , (unsigned long) 0 );
  }

  CommAll all( m_comm );

  // Sizing and add_new_keys bounds checking:

  for ( int p = 0 ; p < m_comm_size ; ++p ) {
    if ( count_remove[p] || count_add[p] ) {
      CommBuffer & buf = all.send_buffer( p );
      buf.skip<unsigned long>( 1 );
      buf.skip<KeyType>( count_remove[p] );
      buf.skip<KeyType>( count_add[p] );
    }
  }

  // Allocate buffers and perform a global OR of the error flag.
  const bool symmetry_flag = false ;
  const bool error_flag = 0 < local_bad_input ;

  bool global_bad_input =
    all.allocate_buffers( m_comm_size / 4, symmetry_flag , error_flag );

  if ( global_bad_input ) {
    std::ostringstream msg ;

    if ( 0 < local_bad_input ) {
      msg << "stk::parallel::DistributedIndex::update_keys ERROR Given "
          << local_bad_input << " of " << add_new_keys.size()
          << " add_new_keys outside of any span" ;
    }
    else {
      msg << "stk::parallel::DistributedIndex::update_keys ERROR "
             "add_new_keys outside of any span on another process" ;
    }

    throw std::runtime_error( msg.str() );
  }

  // Packing:

  for ( int p = 0 ; p < m_comm_size ; ++p ) {
    if ( count_remove[p] || count_add[p] ) {
      all.send_buffer( p ).pack<unsigned long>( count_remove[p] );
    }
  }

  for ( std::vector<KeyType>::const_iterator
        i = remove_existing_keys.begin();
        i != remove_existing_keys.end(); ++i ) {
    const ProcType p = to_which_proc( *i );
    if ( p != m_comm_rank ) {
      all.send_buffer( p ).pack<KeyType>( *i );
    }
  }

  for ( std::vector<KeyType>::const_iterator
        i = add_new_keys.begin();
        i != add_new_keys.end(); ++i ) {
    const ProcType p = to_which_proc( *i );
    if ( p != m_comm_rank ) {
      all.send_buffer( p ).pack<KeyType>( *i );
    }
  }

  all.communicate();

  //------------------------------
  // Remove for local keys

  for ( std::vector<KeyType>::const_iterator
        i = remove_existing_keys.begin();
        i != remove_existing_keys.end(); ++i ) {
    const ProcType p = to_which_proc( *i );
    if ( p == m_comm_rank ) {
      KeyProc kp( *i , p );
      RemoveKeyProc::mark( m_key_usage , kp );
    }
  }

  // Unpack the removed keys and find them;
  // set the process to a negative value for subsequent removal.

  for ( int p = 0 ; p < m_comm_size ; ++p ) {
    CommBuffer & buf = all.recv_buffer( p );
    if ( buf.remaining() ) {
      unsigned long remove_count = 0 ;

      KeyProc kp ;

      kp.second = p ;

      buf.unpack<unsigned long>( remove_count );

      for ( ; 0 < remove_count ; --remove_count ) {
        buf.unpack<KeyType>( kp.first );

        RemoveKeyProc::mark( m_key_usage , kp );
      }
    }
  }

  RemoveKeyProc::clean( m_key_usage );

  //------------------------------
  // Append for local keys

  for ( std::vector<KeyType>::const_iterator
        i = add_new_keys.begin();
        i != add_new_keys.end(); ++i ) {

    const ProcType p = to_which_proc( *i );
    if ( p == m_comm_rank ) {
      KeyProc kp( *i , p );
      m_key_usage.push_back( kp );
    }
  }

  // Unpack and append for remote keys:
  for ( int p = 0 ; p < m_comm_size ; ++p ) {
    CommBuffer & buf = all.recv_buffer( p );

    KeyProc kp ;

    kp.second = p ;

    while ( buf.remaining() ) {
      buf.unpack<KeyType>( kp.first );
      m_key_usage.push_back( kp );
    }
  }

  sort_unique( m_key_usage );
  //------------------------------
}

//----------------------------------------------------------------------
//----------------------------------------------------------------------
// For efficient communication, three reductions are merged into one:
// 1) current global counts of keys used          [ m_span_count ]
// 2) current global counts of new keys requested [ m_span_count ]
// 3) an input error flag.

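// The merged reduction buffer is laid out as
//   local_counts[ 0            .. m_span_count-1   ] : keys in use, per span
//   local_counts[ m_span_count .. 2*m_span_count-1 ] : keys requested, per span
//   local_counts[ 2*m_span_count                   ] : input error flag
// so one MPI_Allreduce(MPI_SUM) produces all three global results.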
void DistributedIndex::generate_new_keys_global_counts(
  const std::vector<size_t>  & requests ,
        std::vector<size_t>  & requests_global_sum ,
        std::vector<size_t>  & existing_global_sum ) const
{
  bool bad_request = m_span_count != requests.size();

  std::ostringstream error_msg ;

  error_msg
  << "stk::parallel::DistributedIndex::generate_new_keys_global_counts( " ;

  std::vector<unsigned long>
    local_counts( 2 * m_span_count + 1 , (unsigned long) 0 ),
    global_counts( 2 * m_span_count + 1 , (unsigned long) 0 );

  // Count unique keys in each span

  {
    std::vector<KeyProc>::const_iterator j = m_key_usage.begin();

    for ( size_t i = 0 ; i < m_span_count && j != m_key_usage.end() ; ++i ) {
      const KeyType key_span_last = m_key_span[i].second ;
      size_t count = 0 ;
      while ( j != m_key_usage.end() && j->first <= key_span_last ) {
        const KeyType key = j->first ;
        while ( j != m_key_usage.end() && key == j->first ) { ++j ; }
        ++count ;
      }
      local_counts[i] = count ;
    }

    for ( size_t i = 0 ; i < m_span_count ; ++i ) {
      local_counts[i + m_span_count] = i < requests.size() ? requests[i] : 0 ;
    }
  }

  // Append the error check to this communication to avoid
  // an extra reduction operation.
  local_counts[ 2 * m_span_count ] = m_span_count != requests.size();

#if defined( STK_HAS_MPI )
  MPI_Allreduce( & local_counts[0] , & global_counts[0] ,
                 2 * m_span_count + 1 , MPI_UNSIGNED_LONG ,
                 MPI_SUM , m_comm );
#else
  global_counts = local_counts ;
#endif

  bad_request = global_counts[2*m_span_count] != 0 ;

  if ( bad_request ) {
    if ( m_span_count != requests.size() ) {
      error_msg << " requests.size() = " << requests.size()
                << " != " << m_span_count << " )" ;
    }
  }

  if ( ! bad_request ) {
    for ( unsigned i = 0 ; i < m_span_count ; ++i ) {
      const size_t span_available =
        ( 1 + m_key_span[i].second - m_key_span[i].first ) - global_counts[i] ;

      const size_t span_requested = global_counts[ i + m_span_count ];

      if ( span_available < span_requested ) {
        bad_request = true ;
        error_msg << " global_sum( request[" << i << "] ) = "
                  << span_requested
                  << " > global_sum( span_available ) = "
                  << span_available ;
      }
    }
  }

  if ( bad_request ) {
    throw std::runtime_error( error_msg.str() );
  }

  existing_global_sum.assign( global_counts.begin() ,
                              global_counts.begin() + m_span_count );

  requests_global_sum.assign( global_counts.begin() + m_span_count ,
                              global_counts.begin() + m_span_count * 2 );
}

//--------------------------------------------------------------------
//--------------------------------------------------------------------

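// Walk this process' chunks within each span and collect keys that are
// not already in use.  Up to requests_local[i] of them are kept for
// this process; the surplus stays in 'contrib_keys' for donation.
// new_request[i] ends up positive when this process still needs keys
// and negative when it has keys to donate.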
void DistributedIndex::generate_new_keys_local_planning(
  const std::vector<size_t>   & existing_global_sum ,
  const std::vector<size_t>   & requests_global_sum ,
  const std::vector<size_t>   & requests_local ,
        std::vector<long>     & new_request ,
        std::vector<KeyType>  & requested_keys ,
        std::vector<KeyType>  & contrib_keys ) const
{
  new_request.assign( m_span_count , long(0) );

  contrib_keys.clear();

  std::vector<KeyProc>::const_iterator j = m_key_usage.begin();

  for ( size_t i = 0 ; i < m_span_count ; ++i ) {
    const size_t final_key_count =
      existing_global_sum[i] + requests_global_sum[ i ];

    const KeyType key_span_first = m_key_span[i].first ;
    const KeyType key_global_max = key_span_first + final_key_count - 1 ;

    const size_t init_size = contrib_keys.size();

    const size_t chunk_inc = m_comm_size * DISTRIBUTED_INDEX_CHUNK_SIZE ;

    for ( KeyType key_begin = key_span_first +
                  m_chunk_first[i] * DISTRIBUTED_INDEX_CHUNK_SIZE ;
          key_begin <= key_global_max ; key_begin += chunk_inc ) {

      // The first key of this chunk:
      KeyType key_iter = key_begin ;

      // The last key belonging to this process' chunk:
      const KeyType key_last =
        std::min( key_begin + DISTRIBUTED_INDEX_CHUNK_SIZE - 1 , key_global_max );

      // Jump into the sorted used-key vector to the first key
      // which may be contributed.

      j = std::lower_bound( j, m_key_usage.end(), key_iter, KeyProcLess() );
      // Now known:  j == m_key_usage.end() OR key_iter <= j->first

      for ( ; key_iter <= key_last ; ++key_iter ) {
        if ( j == m_key_usage.end() || key_iter < j->first ) {
          // The current attempt 'key_iter' is not used, contribute it.
          contrib_keys.push_back( key_iter );
        }
        else { // j != m_key_usage.end() && key_iter == j->first
          // The current attempt 'key_iter' is already used,
          // increment the used-iterator to its next key value.
          while ( j != m_key_usage.end() && key_iter == j->first ) {
            ++j ;
          }
        }
      }
    }

    // Determine which local keys will be contributed,
    // keeping what this process could use from the contribution.
    // This can reduce the subsequent communication load when
    // donating keys to another process.

    const size_t this_contrib = contrib_keys.size() - init_size ;

    // How many keys this process will keep:
    const size_t keep = std::min( requests_local[i] , this_contrib );

    // Take the kept keys from the contributed key vector.
    requested_keys.insert( requested_keys.end() ,
                           contrib_keys.end() - keep ,
                           contrib_keys.end() );

    contrib_keys.erase( contrib_keys.end() - keep ,
                        contrib_keys.end() );

    // New request is positive for needed keys or negative for donated keys
    new_request[i] = requests_local[i] - this_contrib ;
  }
}

//----------------------------------------------------------------------

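// Match donors to receivers by prefix sums over the allgathered plan:
// donations and receipts are both consumed in rank order, so every
// process computes the same donation assignment without any further
// communication.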
void DistributedIndex::generate_new_keys_global_planning(
  const std::vector<KeyType> & contrib_keys ,
  const std::vector<long>    & new_request ,
        std::vector<long>    & my_donations ) const
{
  my_donations.assign( m_comm_size * m_span_count , long(0) );

  // Gather the global request plan for receiving and donating keys.
  // Positive values for receiving, negative values for donating.

  std::vector<long> new_request_global( m_comm_size * m_span_count );

#if defined( STK_HAS_MPI )

  { // Gather requests into per-process spans.
    // MPI doesn't do 'const' in its interface, but the send buffer is const.
    void * send_buf = const_cast<void*>( (void *)( & new_request[0] ));
    void * recv_buf = & new_request_global[0] ;
    MPI_Allgather( send_buf , m_span_count , MPI_LONG ,
                   recv_buf , m_span_count , MPI_LONG , m_comm );
  }
#else
  new_request_global = new_request ;
#endif

  // Now have the global receive & donate plan.
  //--------------------------------------------------------------------
  // Generate my donate plan from the global receive & donate plan.

  for ( unsigned i = 0 ; i < m_span_count ; ++i ) {

    if ( new_request[i] < 0 ) { // This process is donating on this span
      long my_total_donate = - new_request[i] ;

      long previous_donate = 0 ;

      // Count what previous processes have donated:
      for ( int p = 0 ; p < m_comm_rank ; ++p ) {
        const long new_request_p = new_request_global[ p * m_span_count + i ] ;
        if ( new_request_p < 0 ) {
          previous_donate -= new_request_p ;
        }
      }

      // What the donation count will be with my donation:
      long end_donate = previous_donate + my_total_donate ;

      long previous_receive = 0 ;

      // Determine my donation to other processes (one to many).

      for ( int p = 0 ; p < m_comm_size && 0 < my_total_donate ; ++p ) {

        const long new_request_p = new_request_global[ p * m_span_count + i ];

        if ( 0 < new_request_p ) { // Process 'p' receives keys

          // Accumulation of requests:

          previous_receive += new_request_p ;

          if ( previous_donate < previous_receive ) {
            // I am donating to process 'p'
            const long n = std::min( previous_receive , end_donate )
                           - previous_donate ;

            my_donations[ p * m_span_count + i ] = n ;
            previous_donate += n ;
            my_total_donate -= n ;
          }
        }
      }
    }
  }
}

//--------------------------------------------------------------------

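// Generate globally unique keys in four steps: (1) reduce the global
// counts and verify the request can be satisfied, (2) plan locally
// which unused keys to keep or donate, (3) plan globally who donates
// to whom, then (4) communicate the donated keys and return the new
// keys sorted per span.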
void DistributedIndex::generate_new_keys(
  const std::vector<size_t>                 & requests ,
        std::vector< std::vector<KeyType> > & requested_keys )
{
  //--------------------------------------------------------------------
  // Develop the plan:

  std::vector<size_t>  requests_global_sum ;
  std::vector<size_t>  existing_global_sum ;
  std::vector<long>    new_request ;
  std::vector<long>    my_donations ;
  std::vector<KeyType> contrib_keys ;
  std::vector<KeyType> new_keys ;

  // Verify input and generate global sum of
  // current key usage and requested new keys.
  // Throw a parallel consistent exception if the input is bad.

  generate_new_keys_global_counts( requests ,
                                   requests_global_sum ,
                                   existing_global_sum );

  // No exception thrown means all inputs are good and parallel consistent.

  // Determine which local keys will be contributed,
  // keeping what this process could use from the contribution.
  // This can reduce the subsequent communication load when
  // donating keys to another process.

  generate_new_keys_local_planning( existing_global_sum ,
                                    requests_global_sum ,
                                    requests ,
                                    new_request ,
                                    new_keys ,
                                    contrib_keys );

  // Determine where this process will be donating 'contrib_keys'.
  generate_new_keys_global_planning( contrib_keys, new_request, my_donations );

  // Plan is done, communicate the new keys.
  //--------------------------------------------------------------------
  // Add both the kept and the donated keys to this process' key index.
  // The key index is no longer properly ordered afterward; it must be
  // sorted before completion, but not until both additions are done.

  for ( std::vector<KeyType>::iterator
        ik  = new_keys.begin() ;
        ik != new_keys.end() ; ++ik ) {
    m_key_usage.push_back( KeyProc( *ik , m_comm_rank ) );
  }

  // Record the donated keys under their receiving process.
  {
    size_t n = 0 ;
    for ( size_t i = 0 ; i < m_span_count ; ++i ) {
      for ( int p = 0 ; p < m_comm_size ; ++p ) {
        const size_t n_to_p = my_donations[ p * m_span_count + i ];
        if ( n_to_p ) {
          for ( size_t ik = 0; ik < n_to_p ; ++ik , ++n ) {
            m_key_usage.push_back( KeyProc( contrib_keys[n] , p ) );
          }
        }
      }
    }
  }

  std::sort( m_key_usage.begin() , m_key_usage.end() , KeyProcLess() );

  //--------------------------------------------------------------------

  CommAll all( m_comm );

  // Sizing

  for ( size_t i = 0 ; i < m_span_count ; ++i ) {
    for ( int p = 0 ; p < m_comm_size ; ++p ) {
      const size_t n_to_p = my_donations[ p * m_span_count + i ];
      if ( 0 < n_to_p ) {
        all.send_buffer(p).skip<KeyType>( n_to_p );
      }
    }
  }

  all.allocate_buffers( m_comm_size / 4 , false );

  // Packing

  {
    size_t n = 0 ;
    for ( size_t i = 0 ; i < m_span_count ; ++i ) {
      for ( int p = 0 ; p < m_comm_size ; ++p ) {
        const size_t n_to_p = my_donations[ p * m_span_count + i ];
        if ( 0 < n_to_p ) {
          all.send_buffer(p).pack<KeyType>( & contrib_keys[n] , n_to_p );
          n += n_to_p ;
        }
      }
    }
  }

  all.communicate();

  // Unpacking

  for ( int p = 0 ; p < m_comm_size ; ++p ) {
    CommBuffer & buf = all.recv_buffer( p );
    while ( buf.remaining() ) {
      KeyType key ;
      buf.unpack<KeyType>( key );
      new_keys.push_back( key );
    }
  }

  std::sort( new_keys.begin() , new_keys.end() );

  requested_keys.resize( m_span_count );

  size_t n = 0 ;
  for ( size_t i = 0 ; i < m_span_count ; ++i ) {
    requested_keys[i].assign( new_keys.begin() + n ,
                              new_keys.begin() + n + requests[i] );
    n += requests[i] ;
  }
}

//----------------------------------------------------------------------

} // namespace parallel
} // namespace stk