Sierra Toolkit Version of the Day
ParallelIndex.hpp
00001 /*------------------------------------------------------------------------*/
00002 /*                 Copyright 2010 Sandia Corporation.                     */
00003 /*  Under terms of Contract DE-AC04-94AL85000, there is a non-exclusive   */
00004 /*  license for use of this work by or on behalf of the U.S. Government.  */
00005 /*  Export of this program may require a license from the                 */
00006 /*  United States Government.                                             */
00007 /*------------------------------------------------------------------------*/
00008 
00009 #ifndef stk_util_parallel_ParallelIndex_hpp
00010 #define stk_util_parallel_ParallelIndex_hpp
00011 
00012 #include <utility>
00013 #include <vector>
00014 #include <algorithm>
00015 
00016 #include <stdint.h>
00017 #include <stk_util/parallel/Parallel.hpp>
00018 #include <stk_util/parallel/ParallelComm.hpp>
00019 #include <stk_util/util/PairIter.hpp>
00020 
00021 namespace stk {
00022 namespace util {
00023 
/** \brief  Default decomposition functor for ParallelIndex.
 *
 *  Maps a key to the processor responsible for indexing it.
 *
 *  \tparam K  Key type; must support <b>operator>></b> and <b>operator%</b>.
 *  \tparam P  Processor rank type.
 */
template <class K, class P>
struct ParallelIndexDecomp
{
  typedef K Key ;
  typedef P Proc ;

  /** \brief  Processor assigned to \c key among \c proc_size processors.
   *
   *  The low eight bits of the key are discarded before the modulus is
   *  taken, so keys that differ only in those bits land on the same
   *  processor.
   *
   *  \param proc_size  Number of processors; must be non-zero, since it
   *                    is used as the divisor of a modulus.
   *  \param key        Key to be assigned.
   *  \return           <b>( key >> 8 ) % proc_size</b>, converted to Proc.
   */
  Proc operator()( const Proc proc_size, const Key key ) const {
    return static_cast<Proc>( ( key >> 8 ) % proc_size );
  }
};
00034 
00035 
00045 template <class K = uint64_t, class P = unsigned, class D = ParallelIndexDecomp<K, P> >
00046 class ParallelIndex {
00047 public:
00048   typedef K Key ;
00049   typedef P Proc ;
00050   typedef D Decomp ;
00051   typedef std::pair<Key, Proc> KeyProc ;
00052 
00053 #ifndef DOXYGEN_COMPILE
00054   struct LessKeyProc {
00055     bool operator()( const KeyProc & lhs, const KeyProc & rhs ) const {
00056       return lhs < rhs ;
00057     }
00058 
00059     bool operator()( const KeyProc & lhs, const Key rhs ) const {
00060       return lhs.first < rhs ;
00061     }
00062   };
00063 
00064   struct EqualKeyProc {
00065     bool operator()( const KeyProc & lhs, const KeyProc & rhs ) const {
00066       return lhs == rhs ;
00067     }
00068   };
00069 #endif /* DOXYGEN_COMPILE */
00070 
00071 
00073   ParallelIndex( ParallelMachine comm, 
00074                  const std::vector<Key> & local )
00075     : m_comm( comm ), 
00076       m_key_proc(), 
00077       m_decomp()
00078   {
00079     const unsigned p_size = parallel_machine_size( comm );
00080 
00081     CommAll all( comm );
00082 
00083     pack_map( all, local );
00084 
00085     all.allocate_buffers( p_size / 4, false );
00086   
00087     pack_map( all, local );
00088 
00089     all.communicate();
00090 
00091     unpack_map( all, m_key_proc );
00092 
00093     sort_unique( m_key_proc );
00094   }
00095 
00096   ~ParallelIndex()
00097   {}
00098   
00099 
00103   void query(std::vector<KeyProc> & global ) const
00104   {
00105     const unsigned p_size = parallel_machine_size( m_comm );
00106 
00107     CommAll all( m_comm );
00108 
00109     pack_query( all, m_key_proc );
00110 
00111     all.allocate_buffers( p_size / 4, false );
00112 
00113     pack_query( all, m_key_proc );
00114 
00115     all.communicate();
00116 
00117     unpack_query( all, global );
00118 
00119     sort_unique( global );
00120   }
00121 
00122 
00127   void query(const std::vector<Key> & local, 
00128              std::vector<KeyProc> & global ) const
00129   {
00130     const unsigned p_size = parallel_machine_size( m_comm );
00131 
00132     std::vector<KeyProc> tmp ;
00133 
00134     {
00135       CommAll all( m_comm );
00136 
00137       pack_map( all, local );
00138 
00139       all.allocate_buffers( p_size / 4, false );
00140 
00141       pack_map( all, local );
00142 
00143       all.communicate();
00144 
00145       unpack_map( all, tmp ); // { ( key, querying_processor ) }
00146 
00147       sort_unique( tmp );
00148     }
00149 
00150     {
00151       CommAll all( m_comm );
00152 
00153       pack_query( all, m_key_proc, tmp );
00154 
00155       all.allocate_buffers( p_size / 4, false );
00156 
00157       pack_query( all, m_key_proc, tmp );
00158 
00159       all.communicate();
00160 
00161       unpack_query( all, global );
00162 
00163       sort_unique( global );
00164     }
00165   }
00166 
00167 private:
00168   void sort_unique( std::vector<KeyProc> & key_proc ) const
00169   {
00170     typename std::vector<KeyProc>::iterator i = key_proc.begin();
00171     typename  std::vector<KeyProc>::iterator j = key_proc.end();
00172 
00173     std::sort( i, j, LessKeyProc() );
00174     i = std::unique( i, j, EqualKeyProc() );
00175     key_proc.erase( i, j );
00176   }
00177 
00178   void pack_map( CommAll & all, const std::vector<Key> & local ) const
00179   {
00180     const unsigned p_size = all.parallel_size();
00181 
00182     typename std::vector<Key>::const_iterator i ;
00183 
00184     for ( i = local.begin() ; i != local.end() ; ++i ) {
00185       const Key value = *i ;
00186       const unsigned proc = m_decomp( p_size, value );
00187       CommBuffer & buf = all.send_buffer(proc);
00188       buf.pack<Key>( value );
00189     }
00190   }
00191 
00192   void unpack_map( CommAll & all, std::vector< KeyProc > & key_proc ) const
00193   {
00194     const unsigned p_size = all.parallel_size();
00195 
00196     unsigned count = 0 ;
00197     for ( unsigned p = 0 ; p < p_size ; ++p ) {
00198       count += all.recv_buffer( p ).capacity() / sizeof(Key);
00199     }
00200 
00201     key_proc.clear();
00202     key_proc.reserve( count );
00203 
00204     KeyProc value ;
00205 
00206     for ( unsigned p = 0 ; p < p_size ; ++p ) {
00207       CommBuffer & buf = all.recv_buffer( p );
00208       value.second = p ;
00209       while ( buf.remaining() ) {
00210         buf.unpack<Key>( value.first );
00211         key_proc.push_back( value );
00212       }
00213     }
00214   }
00215 
00216   void pack_query( CommAll & all, const std::vector< KeyProc > & key_proc ) const
00217   {
00218     KeyProc value ;
00219 
00220     typename std::vector< KeyProc >::const_iterator i ;
00221 
00222     for ( i = key_proc.begin() ; i != key_proc.end() ; ) {
00223       value.first = i->first ;
00224 
00225       const typename std::vector< KeyProc >::const_iterator i_beg = i ;
00226 
00227       for ( ; i != key_proc.end() && value.first == i->first ; ++i )
00228         ;
00229 
00230       const typename std::vector< KeyProc >::const_iterator i_end = i ;
00231     
00232       for ( i = i_beg ; i != i_end ; ++i ) {
00233         CommBuffer & buf = all.send_buffer( i->second );
00234 
00235         typename std::vector< KeyProc >::const_iterator j ;
00236 
00237         for ( j = i_beg ; j != i_end ; ++j ) {
00238           if ( j != i ) {
00239             value.second = j->second ;
00240             buf.pack<KeyProc>( value );
00241           }
00242         }
00243       }
00244     }
00245   }
00246 
00247   void pack_query( CommAll & all, 
00248                    const std::vector< KeyProc > & key_proc_map, 
00249                    const std::vector< KeyProc > & query ) const
00250   {
00251     KeyProc value ;
00252 
00253     for (typename std::vector< KeyProc >::const_iterator i = query.begin() ; i != query.end() ; ) {
00254       value.first = i->first ;
00255 
00256       typename std::vector< KeyProc >::const_iterator key_begin = std::lower_bound( key_proc_map.begin(), key_proc_map.end(), value.first, LessKeyProc() );
00257 
00258       typename std::vector< KeyProc >::const_iterator key_end = key_begin;
00259       while ( key_end != key_proc_map.end() && key_end->first == value.first)
00260         ++key_end;
00261 
00262       for ( ; i != query.end() && value.first == i->first ; ++i ) {
00263         CommBuffer & buf = all.send_buffer( i->second );
00264 
00265         for ( typename std::vector< KeyProc >::const_iterator j = key_begin ; j != key_end ; ++j ) {
00266           value.second = j->second ;
00267           buf.pack<KeyProc>( value );
00268         }
00269       }
00270     }
00271   }
00272 
00273   void unpack_query( CommAll & all, std::vector< KeyProc > & key_proc ) const
00274   {
00275     const unsigned p_size = all.parallel_size();
00276 
00277     KeyProc entry ;
00278 
00279     for ( unsigned p = 0 ; p < p_size ; ++p ) {
00280       CommBuffer & buf = all.recv_buffer( p );
00281       while ( buf.remaining() ) {
00282         buf.unpack<KeyProc>( entry );
00283         key_proc.push_back( entry );
00284       }
00285     }
00286   }
00287 
00288 private:
00289   ParallelIndex();
00290   ParallelIndex( const ParallelIndex & );
00291   ParallelIndex & operator = ( const ParallelIndex & );
00292 
00293 private:
00294   ParallelMachine               m_comm ;
00295   std::vector<KeyProc>          m_key_proc ;
00296   Decomp                        m_decomp;
00297 };
00298 
00299 } // namespace util
00300 } // namespace stk
00301 
00302 //----------------------------------------------------------------------
00303 
00304 #endif
00305 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines