Sierra Toolkit Version of the Day
ParallelInputStream.cpp
00001 /*------------------------------------------------------------------------*/
00002 /*                 Copyright 2010 Sandia Corporation.                     */
00003 /*  Under terms of Contract DE-AC04-94AL85000, there is a non-exclusive   */
00004 /*  license for use of this work by or on behalf of the U.S. Government.  */
00005 /*  Export of this program may require a license from the                 */
00006 /*  United States Government.                                             */
00007 /*------------------------------------------------------------------------*/
00008 
00009 #include <cstdio>
00010 #include <stdexcept>
00011 #include <stk_util/parallel/ParallelInputStream.hpp>
00012 
00013 /*--------------------------------------------------------------------*/
00014 
00015 namespace stk {
00016 namespace {
00017 
00018 #if defined( STK_HAS_MPI )
00019 
00020 void broadcast( ParallelMachine comm , void * buf , int n )
00021 { MPI_Bcast( buf , n , MPI_BYTE , 0 , comm ); }
00022 
00023 #else
00024 
00025 void broadcast( ParallelMachine , void * , int ) {}
00026 
00027 #endif
00028 
00029 }
00030 }
00031 
00032 /*--------------------------------------------------------------------*/
00033 
00034 namespace stk {
00035 namespace {
00036 
00037 //----------------------------------------------------------------------
00038 
00039 class ParInBuf : public std::streambuf {
00040 public:
00041   enum { BUFFER_LENGTH  = 0x010000 /* 64k bytes */ };
00042   enum { BUFFER_PUTBACK = 0x000010 /*  16 bytes */ };
00043   enum { MAX_READ       = BUFFER_LENGTH - BUFFER_PUTBACK };
00044   ParInBuf( ParallelMachine , const char * const );
00045   virtual ~ParInBuf();
00046 
00047 protected:
00048    virtual int underflow(); // refill the input buffer
00049    virtual int overflow( int c = EOF ); // Not called
00050    virtual int sync(); // No-op
00051    virtual std::streambuf * setbuf( char * , std::streamsize ); // No-op
00052 
00053 private:
00054   void close();
00055 
00056   ParallelMachine m_comm ;
00058   std::FILE     * m_root_fp ;
00059   char            m_buffer[ BUFFER_LENGTH ];
00060 };
00061 
00062 ParInBuf::ParInBuf( ParallelMachine comm , const char * const file_name )
00063   : m_comm( comm ), m_root_fp( NULL )
00064 {
00065   int result = 1 ;
00066 
00067   if ( 0 == parallel_machine_rank( comm ) && NULL != file_name ) {
00068     result = NULL != ( m_root_fp = std::fopen( file_name , "r" ) );
00069   }
00070 
00071   broadcast( m_comm , & result , sizeof(int) );
00072 
00073   if ( ! result ) {
00074     std::string msg;
00075     msg.append("stk::ParallelInputStream( " );
00076     if ( 0 == parallel_machine_rank( comm ) && NULL != file_name ) {
00077       msg.append( file_name );
00078     }
00079     else {
00080       msg.append( "<NULL>" );
00081     }
00082     msg.append( " ) FAILED" );
00083     throw std::runtime_error(msg);
00084   }
00085 }
00086 
00087 void ParInBuf::close()
00088 {
00089   if ( NULL != m_root_fp ) { std::fclose( m_root_fp ); m_root_fp = NULL ; }
00090   setg(NULL,NULL,NULL);
00091 }
00092 
00093 ParInBuf::~ParInBuf()
00094 { close(); }
00095 
00096 int ParInBuf::underflow()
00097 {
00098   char * const buf = m_buffer + BUFFER_PUTBACK ;
00099   int nread = 0 ;
00100 
00101   if ( gptr() == NULL || egptr() <= gptr() ) {
00102     if ( NULL != m_root_fp ) { nread = std::fread(buf,1,MAX_READ,m_root_fp); }
00103     broadcast( m_comm , & nread , sizeof(int) );
00104   }
00105 
00106   if ( 0 < nread ) {
00107     broadcast( m_comm , buf , nread );
00108     setg( m_buffer , buf , buf + nread );
00109   }
00110   else {
00111     close();
00112   }
00113 
00114   return 0 < nread ? *buf : EOF ;
00115 }
00116 
namespace {

// Report an attempt to write through the input-only stream buffer.
// Always throws std::runtime_error; never returns normally.
void throw_overflow()
{
  const std::string what_arg(
    "stk::ParallelInputStream::overflow CALL IS ERRONEOUS" );
  throw std::runtime_error( what_arg );
}

}
00127 
00128 int ParInBuf::overflow( int )
00129 { throw_overflow(); return EOF ; }
00130 
00131 int ParInBuf::sync()
00132 { return 0 ; }
00133 
00134 std::streambuf * ParInBuf::setbuf( char * , std::streamsize )
00135 {
00136   return this ;
00137 }
00138 
00139 //----------------------------------------------------------------------
00140 
00141 } // namespace
00142 
00143 
00144 ParallelInputStream::ParallelInputStream(
00145   ParallelMachine comm ,
00146   const char * const file_name )
00147   : std::istream( new ParInBuf( comm , file_name ) )
00148 {}
00149 
00150 ParallelInputStream::~ParallelInputStream()
00151 { delete rdbuf(); }
00152 
00153 } // namespace stk
00154 
00155 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines