Sierra Toolkit Version of the Day
mpi_filebuf.cpp
00001 
#include <cstdlib>
#include <cstring>
#include <stk_util/parallel/mpi_filebuf.hpp>
#include <assert.h>
00013 
// Default length, in bytes, of each process's buffer; changeable with
// mpi_filebuf::set_buffer_length() while the buffer is closed.
static const int buffer_default_length = 4096 ;

// Bytes kept at the front of the read buffer so that characters can be
// put back (the read area starts this many bytes into the buffer).
static const int buffer_putback_length = 16 ;
00016 
00017 /*--------------------------------------------------------------------*/
00018 
00019 mpi_filebuf::mpi_filebuf()
00020   : std::streambuf(),
00021     comm( MPI_COMM_NULL ),
00022     comm_root( -1 ),
00023     comm_root_fp( NULL ),
00024     comm_output( 0 ),
00025     comm_buffer( NULL ),
00026     comm_buffer_len( buffer_default_length ),
00027     comm_time(0.0)
00028 {}
00029 
00030 mpi_filebuf::~mpi_filebuf()
00031 {
00032   close();
00033 }
00034 
00035 /*--------------------------------------------------------------------*/
00036 
00037 mpi_filebuf * mpi_filebuf::set_buffer_length( const size_t len )
00038 {
00039   // If already open then abort
00040   if ( NULL != comm_buffer ) return (mpi_filebuf *) NULL ;
00041 
00042   // Wait and verify upon the attempt to open
00043   comm_buffer_len = buffer_putback_length < len ? len : buffer_putback_length ;
00044 
00045   return this ;
00046 }
00047 
00048 /*--------------------------------------------------------------------*/
00049 
00050 mpi_filebuf * mpi_filebuf::open(
00051   MPI_Comm       communicator ,
00052   const int            root_processor ,
00053   const std::ios_base::openmode file_mode ,
00054   const char * const   file_name )
00055 {
00056   const double start_time = MPI_Wtime();
00057 
00058   // If already open then abort
00059   if ( NULL != comm_buffer ) return (mpi_filebuf *) NULL ;
00060 
00061   const int mode =
00062     ( std::ios::in  == file_mode ) ? 'r' : (
00063     ( std::ios::out == file_mode ) ? 'w' : (
00064     ( std::ios::app == file_mode ) ? 'a' : -1 ) );
00065 
00066   int err ;
00067   int rank ;
00068   int local, global ;
00069   int data[3] ;
00070 
00071   // Broadcast the selected root processor and 'C' file mode
00072 
00073   data[0] = root_processor ;
00074   data[1] = mode ;
00075   data[2] = comm_buffer_len ;
00076 
00077   if ( MPI_SUCCESS != ( err = MPI_Bcast(data,3,MPI_INT,0,communicator) ) )
00078     MPI_Abort( communicator , err );
00079 
00080   // Verify that all processors have the same root, mode, and buffer length:
00081 
00082   local = data[0] != root_processor || data[1] != mode || data[2] != (signed) comm_buffer_len ;
00083 
00084   if ( MPI_SUCCESS != ( err =
00085        MPI_Allreduce(&local,&global,1,MPI_INT,MPI_BOR,communicator) ) )
00086     MPI_Abort( communicator , err );
00087 
00088   if ( global ) {
00089     comm_time += MPI_Wtime() - start_time ;
00090     return (mpi_filebuf *) NULL ;
00091   }
00092 
00093   //--------------------------------------------------------------------
00094   // Root processor and mode are consistent.
00095   // All processors try to allocate buffers and the
00096   // root processor tries to open the file.
00097 
00098   if ( MPI_SUCCESS != ( err =  MPI_Comm_rank( communicator , &rank ) ) )
00099     MPI_Abort( communicator , err );
00100 
00101   char * const tmp_buf = (char *) std::malloc( comm_buffer_len );
00102   std::FILE *       tmp_fp  = NULL ;
00103 
00104   local = tmp_buf == NULL ; // Failed allocation ?
00105 
00106   if ( root_processor == rank && ! local ) {
00107     tmp_fp = std::fopen( file_name , ( ( ( mode == 'r' ) ? "r" :
00108             ( mode == 'w' ) ? "w" : "a" ) ) );
00109 #ifdef REDSTORM_SETVBUF
00110     if (tmp_fp) {
00111       if (std::setvbuf(tmp_fp, NULL, _IOFBF, 32768) != 0) {
00112   std::fclose(tmp_fp);
00113   tmp_fp = 0;
00114       }
00115     }
00116 #endif
00117     local = NULL == tmp_fp ;
00118   }
00119 
00120   if ( MPI_SUCCESS != ( err =
00121        MPI_Allreduce(&local,&global,1,MPI_INT,MPI_BOR,communicator) ) )
00122     MPI_Abort( communicator , err );
00123 
00124   if ( global ) {
00125     if ( NULL != tmp_buf ) std::free(   tmp_buf ); // Deallocate
00126     if ( NULL != tmp_fp  ) std::fclose( tmp_fp );  // Close the file
00127     comm_time += MPI_Wtime() - start_time ;
00128     return (mpi_filebuf *) NULL ;
00129   }
00130 
00131   //--------------------------------------------------------------------
00132   // All memory allocated and root processor openned the file
00133   // Update the internal members accordingly.
00134 
00135   comm         = communicator ;
00136   comm_root    = root_processor ;
00137   comm_root_fp = tmp_fp ;
00138   comm_buffer  = tmp_buf ;
00139   comm_output  = mode != 'r' ;
00140 
00141   // If output then set up put-buffer
00142 
00143   if ( comm_output ) setp( comm_buffer, comm_buffer + comm_buffer_len );
00144 
00145   comm_time += MPI_Wtime() - start_time ;
00146 
00147   return this ;
00148 }
00149 
00150 /*--------------------------------------------------------------------*/
00151 
00152 mpi_filebuf * mpi_filebuf::close()
00153 {
00154   mpi_filebuf * tmp = NULL ;
00155 
00156   if ( NULL != comm_buffer ) {
00157 
00158     flush(); // Flush the buffers
00159 
00160     if ( NULL != comm_root_fp ) std::fclose( comm_root_fp ); // Close the file
00161 
00162     std::free( comm_buffer ); // Free the buffer
00163 
00164     if ( comm_output ) setp(NULL,NULL);
00165     else               setg(NULL,NULL,NULL);
00166 
00167     // Reset the members:
00168 
00169     comm         = MPI_COMM_NULL ;
00170     comm_root    = -1 ;
00171     comm_root_fp = NULL ;
00172     comm_output  = 0 ;
00173     comm_buffer  = NULL ;
00174 
00175     tmp = this ;
00176   }
00177 
00178   return tmp ;
00179 }
00180 
00181 /*--------------------------------------------------------------------*/
00182 /* Underflow, a global call.
00183    Read more data from the root processor's file and
00184    broadcast it to all processors.
00185 */
00186 
00187 int mpi_filebuf::underflow()
00188 {
00189   const double start_time = MPI_Wtime();
00190 
00191   if ( NULL != comm_buffer && ! comm_output &&     // Open for read
00192        ( gptr() == NULL || gptr() >= egptr() ) ) { // valid get buffer
00193 
00194 
00195     // Length of the buffer, consistent on all processors
00196     // Entire buffer is offset to accomodate putbacks
00197 
00198     const size_t size = comm_buffer_len - buffer_putback_length ;
00199     char * const buf  = comm_buffer     + buffer_putback_length ;
00200 
00201     int nread ;
00202     int err ;
00203 
00204     // Root processor reads from the file and broadcasts the result
00205 
00206     if ( NULL != comm_root_fp ) nread = std::fread(buf,1,size,comm_root_fp);
00207 
00208     if ( MPI_SUCCESS != ( err =
00209    MPI_Bcast( &nread, 1, MPI_INT, comm_root, comm ) ) )
00210       MPI_Abort(comm,err);
00211 
00212     // If the read is successfull then update the get buffer pointers:
00213 
00214     if ( 0 < nread ) {
00215 
00216       // Broadcast the read buffer to all processors:
00217 
00218       if ( MPI_SUCCESS != ( err =
00219      MPI_Bcast( buf, nread, MPI_BYTE, comm_root, comm ) ) )
00220   MPI_Abort(comm,err);
00221 
00222       // Set the get buffer:
00223 
00224       setg( comm_buffer, buf, buf + nread );
00225 
00226       // Return the next character from the file:
00227 
00228       comm_time += MPI_Wtime() - start_time ;
00229 
00230       return *buf ;
00231     }
00232   }
00233 
00234   // Failed: set the get buffer to NULL and return EOF
00235   setg(NULL, NULL, NULL);
00236 
00237   comm_time += MPI_Wtime() - start_time ;
00238 
00239   return EOF;
00240 }
00241 
00242 /*--------------------------------------------------------------------*/
00243 /* Overflow, a local call.
00244     Output complete lines of data on the root processor.
00245     Increase the buffer size on all other processors.
00246 */
00247 
00248 int mpi_filebuf::overflow( int c )
00249 {
00250   if ( NULL != comm_buffer && comm_output ) { // open for write
00251 
00252     // Determine current offset and length:
00253     char * cur_buffer = comm_buffer ;
00254     size_t cur_offset = pptr()  - cur_buffer ;
00255     size_t cur_length = epptr() - cur_buffer ;
00256 
00257     assert( cur_offset <= cur_length /* detecting abuse by 'ostream' */ );
00258 
00259     if ( NULL != comm_root_fp ) {
00260       if ( std::fwrite(cur_buffer,1,cur_offset,comm_root_fp) != cur_offset ) {
00261   return EOF ; // Write failed
00262       }
00263       cur_offset = 0 ;
00264     }
00265     else if ( cur_length <= cur_offset ) {
00266       // Not root processor, ran out of buffer space and
00267       // cannot write so increase the buffer size:
00268       cur_buffer = (char *) std::realloc( cur_buffer , cur_length *= 2 );
00269     }
00270 
00271     // If buffer is still good then reset the put-buffer
00272 
00273     if ( NULL != cur_buffer ) {
00274 
00275       comm_buffer = cur_buffer ;
00276 
00277       setp( cur_buffer + cur_offset, cur_buffer + cur_length );
00278 
00279       if ( c != EOF ) {
00280 
00281   sputc(c);
00282   return c;
00283       }
00284       else {
00285   return 0;
00286       }
00287     }
00288   }
00289   return EOF ;
00290 }
00291 
00292 /*--------------------------------------------------------------------*/
00293 /* Send output buffers to root processor and
00294    write them to the output file.
00295 */
00296 
00297 mpi_filebuf * mpi_filebuf::flush()
00298 {
00299   const double start_time = MPI_Wtime();
00300 
00301   int result = -1 ; // Failure return value
00302 
00303   if ( NULL != comm_buffer && comm_output ) { // Open for write
00304 
00305     int err ;
00306 
00307     result = 0 ;
00308 
00309     // Determine the local length:
00310 
00311     char * cur_buf = comm_buffer ;
00312     unsigned int cur_len = pptr() - cur_buf ;
00313 
00314     // Determine the global lengths
00315 
00316     char * recv_buf  = NULL ;
00317     int  * recv_len  = NULL ;
00318     int  * recv_disp = NULL ;
00319 
00320     int nproc = 0 ;
00321 
00322 
00323 //  if ( NULL != comm_root_fp ) {
00324 
00325 //  It should no be neccessary to allocate recv_len on non-root
00326 //  nodes, but the MPI_Gatherv on Janus always accesses recv_len
00327 //  even on non-root processors which causes a segmentaion
00328 //  violation if recv_len is set to NULL.
00329 
00330     if ( MPI_SUCCESS != ( err = MPI_Comm_size(comm,&nproc) ) )
00331       MPI_Abort( comm , err );
00332 
00333     recv_len = (int*) std::malloc( sizeof(int) * nproc );
00334 
00335     if ( NULL == recv_len ) MPI_Abort( comm , MPI_ERR_UNKNOWN );
00336 
00337     for (int j = 0 ; j < nproc ; ++j )
00338       recv_len[j] = 0;
00339 //  }
00340 
00341     // Gather buffer lengths on the root processor
00342 
00343     if ( MPI_SUCCESS != ( err =
00344    MPI_Gather(&cur_len,1,MPI_INT,recv_len,1,MPI_INT,comm_root,comm)))
00345       MPI_Abort( comm , err );
00346 
00347     // Root processor must allocate enough buffer space:
00348 
00349     if ( NULL != comm_root_fp ) {
00350 
00351       recv_len[ comm_root ] = 0 ; // Don't send to self
00352 
00353       int i ;
00354 
00355       if ( NULL == ( recv_disp = (int*) std::malloc( sizeof(int) * (nproc + 1) ) ) )
00356   result = -1 ;
00357 
00358       if ( 0 == result ) { // Allocation succeeded
00359 
00360   recv_disp[0] = 0 ;
00361 
00362   for ( i = 0 ; i < nproc ; ++i )
00363     recv_disp[i+1] = recv_disp[i] + recv_len[i] ;
00364 
00365   if ( 0 < recv_disp[nproc] ) {
00366     if ( NULL == ( recv_buf = (char*) std::malloc( recv_disp[nproc] ) ) )
00367       result = -1 ;
00368   }
00369   else {
00370     result = 1 ; // No need to gather!
00371   }
00372 
00373   if ( -1 != result ) {
00374 
00375     // Write the root processor's buffer
00376 
00377     if ( 0 < cur_len ) {
00378       if ( std::fwrite(cur_buf,1,cur_len,comm_root_fp) != cur_len )
00379         result = -1 ; // Write failed
00380 
00381       cur_len = 0 ; // Wrote this buffer
00382     }
00383   }
00384       }
00385       std::fflush( comm_root_fp );
00386     }
00387 
00388     // Root process broadcasts that all is well with the allocation
00389 
00390     if ( MPI_SUCCESS != ( err = MPI_Bcast(&result,1,MPI_INT,comm_root,comm)))
00391       MPI_Abort( comm , err );
00392 
00393     if ( 0 == result ) { // All-is-well, need to gather and write
00394 
00395       // Gather the buffers to the root processor
00396 
00397       if ( MPI_SUCCESS != ( err =
00398      MPI_Gatherv(cur_buf,  cur_len,             MPI_BYTE,
00399            recv_buf, recv_len, recv_disp, MPI_BYTE,
00400            comm_root, comm ) ) )
00401   MPI_Abort( comm , err );
00402 
00403        // Output the buffers, beginning with 'comm_root'
00404 
00405       if ( NULL != comm_root_fp ) {
00406 
00407   int i ;
00408 
00409   for ( i = 1 ; i < nproc && 0 == result ; ++i ) {
00410     const int j   = ( i + comm_root ) % nproc ;
00411     const unsigned int len = recv_len[j] ;
00412 
00413     if ( 0 < len )
00414       if ( std::fwrite(recv_buf+recv_disp[j],1,len,comm_root_fp) != len )
00415         result = -1 ; // Write failed
00416   }
00417 
00418   std::fflush( comm_root_fp );
00419       }
00420 
00421       // Broadcast that the write succeeded
00422 
00423       if ( MPI_SUCCESS != ( err = MPI_Bcast(&result,1,MPI_INT,comm_root,comm)))
00424   MPI_Abort( comm , err );
00425     }
00426     else if ( 1 == result ) {
00427       // Did not need to gather
00428 
00429       result = 0 ;
00430     }
00431 
00432     // Reset the output buffer
00433 
00434     setp( comm_buffer , epptr() );
00435 
00436     // Clean up allocated memory
00437 
00438     if ( NULL != recv_buf  ) std::free( recv_buf );
00439     if ( NULL != recv_len  ) std::free( recv_len );
00440     if ( NULL != recv_disp ) std::free( recv_disp );
00441   }
00442 
00443   comm_time += MPI_Wtime() - start_time ;
00444 
00445   return -1 == result ? (mpi_filebuf *) NULL : this ;
00446 }
00447 
00448 /*--------------------------------------------------------------------*/
00449 
00450 int mpi_filebuf::sync()
00451 {
00452   // The root processor will push to file, all others ignore
00453 
00454   if ( NULL != comm_root_fp ) {
00455 
00456     // Determine the local length:
00457 
00458     char * cur_buf = comm_buffer ;
00459     int    cur_len = pptr() - cur_buf ;
00460 
00461     if ( 0 < cur_len ) std::fwrite(cur_buf,1,cur_len,comm_root_fp);
00462 
00463     std::fflush( comm_root_fp );
00464 
00465     setp( comm_buffer , epptr() );
00466   }
00467 
00468   return 0 ;
00469 }
00470 
00471 
00472 std::streambuf * mpi_filebuf::setbuf( char * s , std::streamsize n )
00473 {
00474   return this ;
00475 }
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines