Sierra Toolkit Version of the Day
RuntimeMessage.cpp
00001 /*------------------------------------------------------------------------*/
00002 /*                 Copyright 2010 Sandia Corporation.                     */
00003 /*  Under terms of Contract DE-AC04-94AL85000, there is a non-exclusive   */
00004 /*  license for use of this work by or on behalf of the U.S. Government.  */
00005 /*  Export of this program may require a license from the                 */
00006 /*  United States Government.                                             */
00007 /*------------------------------------------------------------------------*/
00008 
00009 #include <list>
00010 #include <string>
00011 #include <sstream>
00012 #include <utility>
00013 #include <vector>
00014 #include <boost/unordered_map.hpp>
00015 
00016 #include <stk_util/environment/RuntimeMessage.hpp>
00017 #include <stk_util/environment/ReportHandler.hpp>
00018 #include <stk_util/util/Bootstrap.hpp>
00019 #include <stk_util/util/Marshal.hpp>
00020 
00021 namespace stk {
00022 
00023 MessageCode
00024 MessageCode::s_defaultMessageCode(100000000);
00025 
00026 namespace {
00027 
00028 void bootstrap()
00029 {
00030   register_message_type(MSG_WARNING, 10000000, "warning");
00031   register_message_type(MSG_DOOMED, 10000000, "error");
00032   register_message_type(MSG_EXCEPTION, 1000000, "exception");
00033   register_message_type(MSG_INFORMATION, 1000000, "informational");
00034 }
00035 
00036 stk::Bootstrap x(bootstrap);
00037 
00038 typedef std::pair<MessageId, std::string> MessageKey;
00039 
00040 typedef boost::unordered_map<MessageKey, Throttle> MessageIdMap;
00041 
00042 MessageIdMap s_messageIdMap;
00043 
00044 MessageIdMap s_deferredMessageIdMap;
00045 
00046 struct DeferredMessage 
00047 {
00048   DeferredMessage()
00049   {}
00050   
00051   DeferredMessage(
00052     size_t              type,
00053     MessageId           message_id,
00054     size_t              throttle_cutoff,
00055     int                 throttle_group,
00056     const std::string & header,
00057     const std::string & aggregate)
00058     : m_type(type),
00059       m_messageId(message_id),
00060       m_rank(0),
00061       m_throttleCutoff(throttle_cutoff),
00062       m_throttleGroup(throttle_group),
00063       m_header(header),
00064       m_aggregate(aggregate)
00065   {}
00066   
00067   size_t                m_type;
00068   MessageId             m_messageId;
00069   int                   m_rank;
00070   size_t                m_throttleCutoff;
00071   int                   m_throttleGroup;
00072   std::string           m_header;
00073   std::string           m_aggregate;
00074 };
00075 
00076 typedef std::vector<DeferredMessage> DeferredMessageVector;
00077 
00078 struct DeferredMessageLess : public std::binary_function<DeferredMessage, DeferredMessage, bool>
00079 {
00080   bool operator()(const DeferredMessage &key_1, const DeferredMessage &key_2) const {
00081     return (key_1.m_type < key_2.m_type)
00082       || (!(key_2.m_type < key_1.m_type) && key_1.m_messageId < key_2.m_messageId)
00083       || (!(key_2.m_type < key_1.m_type) && !(key_2.m_messageId < key_1.m_messageId) && key_1.m_header < key_2.m_header);
00084   }
00085 };
00086       
00087 DeferredMessageVector s_deferredMessageVector;
00088 
00089 struct MessageTypeInfo
00090 {
00091   MessageTypeInfo()
00092     : m_count(0),
00093       m_maxCount(10000000),
00094       m_name("unknown")
00095   {}
00096   
00097   unsigned              m_count;
00098   unsigned              m_maxCount;
00099   std::string           m_name;
00100 };
00101 
00102 typedef boost::unordered_map<unsigned, MessageTypeInfo> MessageTypeInfoMap;
00103 
00104 MessageTypeInfoMap s_messageTypeInfo;
00105 
00106 MessageTypeInfo &
00107 get_message_type_info(
00108   unsigned              type)
00109 {
00110   MessageTypeInfoMap::iterator it = s_messageTypeInfo.find(type & MSG_TYPE_MASK);
00111   if (it != s_messageTypeInfo.end())
00112     return (*it).second;
00113   else
00114     return s_messageTypeInfo[type & MSG_TYPE_MASK];
00115 }
00116 
00117 
00118 enum CutoffStatus {
00119   MSG_DISPLAY           = 0,
00120   MSG_CUTOFF            = 1,
00121   MSG_CUTOFF_EXCEEDED   = 2
00122 };
00123 
00124 
00125 CutoffStatus
00126 count_message(
00127   MessageId             message_id,
00128   const char *          message, 
00129   const Throttle &      throttle)
00130 {
00131   std::pair<MessageIdMap::iterator, bool> res = s_messageIdMap.insert(MessageIdMap::value_type(MessageIdMap::key_type(message_id, message), throttle));
00132   size_t count = ++(*res.first).second.m_count;
00133 
00134   if (count < (*res.first).second.m_cutoff)
00135     return MSG_DISPLAY;
00136   else if (count == (*res.first).second.m_cutoff)
00137     return MSG_CUTOFF;
00138   else
00139     return MSG_CUTOFF_EXCEEDED;
00140 }
00141 
00142 Marshal &operator<<(Marshal &mout, const DeferredMessage &s)  {
00143   mout << s.m_type << s.m_messageId << s.m_rank << s.m_throttleGroup << s.m_throttleCutoff << s.m_header << s.m_aggregate;
00144   return mout;
00145 }
00146 
00147 Marshal &operator>>(Marshal &min, DeferredMessage &s)  {
00148   min >> s.m_type >> s.m_messageId >> s.m_rank >> s.m_throttleGroup >> s.m_throttleCutoff >> s.m_header >> s.m_aggregate;
00149   return min;
00150 }
00151 
00152 } // namespace <empty>
00153 
00154 
00155 void
00156 register_message_type(
00157   unsigned              message_type,
00158   unsigned              max_count,
00159   const char *          name)
00160 {
00161   MessageTypeInfo &message_info = get_message_type_info(message_type);
00162 
00163   message_info.m_maxCount = max_count;
00164   message_info.m_name = name;
00165 }
00166 
00167   
00168 unsigned
00169 get_message_count(
00170   unsigned              message_type)
00171 {
00172   return get_message_type_info(message_type).m_count;
00173 }
00174 
00175 
00176 unsigned
00177 increment_message_count(
00178   unsigned              message_type)
00179 {
00180   return ++get_message_type_info(message_type).m_count;
00181 }
00182 
00183 
00184 void
00185 reset_message_count(
00186   unsigned              message_type)
00187 {
00188   get_message_type_info(message_type).m_count = 0;
00189 }
00190 
00191 
00192 const std::string &
00193 get_message_name(
00194   unsigned              message_type)
00195 {
00196   return get_message_type_info(message_type).m_name;
00197 }
00198 
00199 
00200 void
00201 set_max_message_count(
00202   unsigned              message_type,
00203   unsigned              max_count)
00204 {
00205   get_message_type_info(message_type).m_maxCount = max_count;
00206 }
00207 
00208 
00209 unsigned
00210 get_max_message_count(
00211   unsigned              message_type)
00212 {
00213   return get_message_type_info(message_type).m_maxCount;
00214 }
00215 
00216 
00217 void
00218 report_message(
00219   const char *          message,
00220   unsigned              message_type, 
00221   const MessageCode &   message_code)
00222 {
00223   if (message_type & MSG_DEFERRED)
00224     report(message, message_type);
00225   
00226   else { 
00227     unsigned count = increment_message_count(message_type);
00228     unsigned max_count = get_max_message_count(message_type); 
00229   
00230     if (count == max_count) {
00231       report(message, message_type);
00232 
00233       std::ostringstream s;
00234       s << "Maximum " << get_message_name(message_type) << " count has been exceeded and will no longer be displayed";
00235       report(s.str().c_str(), MSG_WARNING | MSG_SYMMETRIC);
00236     }
00237 
00238     else if (count < max_count) {
00239       CutoffStatus cutoff = count_message(message_code.m_id, "", message_code.m_throttle);
00240     
00241       if (cutoff == MSG_CUTOFF) {
00242         report(message, message_type);
00243 
00244         std::ostringstream s;
00245         s << "Maximum count for this " << get_message_name(message_type) << " has been exceeded and will no longer be displayed";
00246         report(s.str().c_str(), MSG_WARNING | MSG_SYMMETRIC);
00247       }
00248     
00249       else if (cutoff == MSG_DISPLAY)
00250         report(message, message_type);
00251     }
00252   }
00253 }
00254 
00255 
00256 void
00257 reset_throttle_group(
00258   int                   throttle_group)
00259 {
00260   for (MessageIdMap::iterator it = s_messageIdMap.begin(); it != s_messageIdMap.end(); ++it)
00261     if ((*it).second.m_group == throttle_group)
00262       (*it).second.m_count = 0;
00263 }
00264 
00265 
00266 void
00267 add_deferred_message(
00268   int                   message_type,
00269   MessageId             message_id,
00270   size_t                throttle_cutoff,
00271   int                   throttle_group,
00272   const char *          header,
00273   const char *          aggegrate)
00274 {
00275   std::ostringstream s;
00276   s << header << " " << aggegrate;
00277   
00278   report(s.str().c_str(), message_type | MSG_DEFERRED);
00279 
00280   std::pair<MessageIdMap::iterator, bool> res = s_deferredMessageIdMap.insert(MessageIdMap::value_type(MessageIdMap::key_type(message_id, header), Throttle(throttle_cutoff, throttle_group)));
00281   size_t count = ++(*res.first).second.m_count;
00282 
00283   if (count <= throttle_cutoff)
00284     s_deferredMessageVector.push_back(DeferredMessage(message_type, message_id, throttle_cutoff, throttle_group, header, aggegrate));
00285 }
00286 
00287 
00289 
00290 void
00291 report_deferred_messages(
00292   ParallelMachine       comm)
00293 {
00294 #ifdef STK_HAS_MPI
00295   const int p_root = 0 ;
00296   const int p_size = parallel_machine_size(comm);
00297   const int p_rank = parallel_machine_rank(comm);
00298 
00299   for (DeferredMessageVector::iterator it = s_deferredMessageVector.begin(); it != s_deferredMessageVector.end(); ++it)
00300     (*it).m_rank = p_rank;
00301   
00302   Marshal mout;
00303   mout << s_deferredMessageVector;
00304 
00305   DeferredMessageVector deferred_message_vector;
00306 
00307   // Gather the send counts on root processor
00308   std::string send_string(mout.stream.str());
00309   int send_count = send_string.size();
00310   std::vector<int> recv_count(p_size, 0);
00311   int * const recv_count_ptr = &recv_count[0] ;
00312 
00313   int result = MPI_Gather(&send_count, 1, MPI_INT,
00314                           recv_count_ptr, 1, MPI_INT,
00315                           p_root, comm);
00316   if (MPI_SUCCESS != result) {
00317     std::ostringstream message ;
00318     message << "stk::report_deferred_messages FAILED: MPI_Gather = " << result ;
00319     throw std::runtime_error(message.str());
00320   }
00321 
00322   // Receive counts are only non-zero on the root processor:
00323   std::vector<int> recv_displ(p_size + 1, 0);
00324 
00325   for (int i = 0 ; i < p_size ; ++i) {
00326     recv_displ[i + 1] = recv_displ[i] + recv_count[i] ;
00327   }
00328 
00329   const int recv_size = recv_displ[p_size] ;
00330  
00331   std::vector<char> buffer(recv_size);
00332 
00333   {
00334     const char * const send_ptr = send_string.data();
00335     char * const recv_ptr = recv_size ? & buffer[0] : (char *) NULL ;
00336     int * const recv_displ_ptr = & recv_displ[0] ;
00337 
00338     result = MPI_Gatherv((void *) send_ptr, send_count, MPI_CHAR,
00339                          recv_ptr, recv_count_ptr, recv_displ_ptr, MPI_CHAR,
00340                          p_root, comm);
00341     if (MPI_SUCCESS != result) {
00342       std::ostringstream message ;
00343       message << "stk::report_deferred_messages FAILED: MPI_Gatherv = " << result ;
00344       throw std::runtime_error(message.str());
00345     }
00346 
00347 
00348     if (p_rank == p_root) {
00349       for (int i = 0; i < p_size; ++i) {
00350         Marshal min(std::string(recv_ptr + recv_displ[i], recv_ptr + recv_displ[i + 1]));
00351         min >> deferred_message_vector;
00352       }
00353 
00354       std::stable_sort(deferred_message_vector.begin(), deferred_message_vector.end(), DeferredMessageLess());      
00355 
00356       DeferredMessageVector::const_iterator current_message_it = deferred_message_vector.begin();
00357       while (current_message_it != deferred_message_vector.end()) {
00358         const DeferredMessage &current_message = (*current_message_it);
00359         
00360         DeferredMessageVector::const_iterator end = current_message_it + 1;
00361         while (end != deferred_message_vector.end()
00362                && current_message.m_messageId == (*end).m_messageId
00363                && current_message.m_header == (*end).m_header)
00364           ++end;
00365         
00366         std::ostringstream s;
00367         
00368         s << current_message.m_header << current_message.m_aggregate;
00369 
00370         for (DeferredMessageVector::const_iterator it1 = current_message_it + 1; it1 != end; ++it1) {  
00371           bool print = true;
00372           for (DeferredMessageVector::const_iterator it2 = current_message_it; it2 != it1; ++it2)
00373             if ((*it1).m_aggregate == (*it2).m_aggregate) {
00374               print = false;
00375               break;
00376             }
00377           if (print) {
00378             if (!(*it1).m_aggregate.find('\n'))
00379               s << ", ";
00380             s << (*it1).m_aggregate;
00381           }
00382         }
00383 
00384         report_message(s.str().c_str(), current_message.m_type | stk::MSG_SYMMETRIC, MessageCode(current_message.m_messageId, current_message.m_throttleCutoff, current_message.m_throttleGroup));
00385         
00386         current_message_it = end;
00387       }
00388     }
00389   }
00390 
00391   s_deferredMessageIdMap.clear();
00392   s_deferredMessageVector.clear();
00393 #endif
00394 }
00395 
00396 
00397 void
00398 aggregate_messages(
00399   ParallelMachine       comm,
00400   std::ostringstream &  os,
00401   const char *          separator)
00402 {
00403 #ifdef STK_HAS_MPI
00404   std::string message = os.str();
00405   os.str("");
00406   
00407   const int p_root = 0 ;
00408   const int p_size = parallel_machine_size(comm);
00409   const int p_rank = parallel_machine_rank(comm);
00410   
00411   int result ;
00412 
00413   // Gather the send counts on root processor
00414 
00415   int send_count = message.size();
00416 
00417   std::vector<int> recv_count(p_size, 0);
00418 
00419   int * const recv_count_ptr = & recv_count[0] ;
00420 
00421   result = MPI_Gather(& send_count, 1, MPI_INT,
00422                       recv_count_ptr, 1, MPI_INT,
00423                       p_root, comm);
00424 
00425   if (MPI_SUCCESS != result) {
00426     std::ostringstream s;
00427     s << "stk::all_write FAILED: MPI_Gather = " << result ;
00428     throw std::runtime_error(s.str());
00429   }
00430 
00431   // Receive counts are only non-zero on the root processor:
00432   std::vector<int> recv_displ(p_size + 1, 0);
00433 
00434   for (int i = 0 ; i < p_size ; ++i) {
00435     recv_displ[i + 1] = recv_displ[i] + recv_count[i] ;
00436   }
00437 
00438   const int recv_size = recv_displ[ p_size ] ;
00439  
00440   std::vector<char> buffer(recv_size);
00441 
00442   {
00443     const char * const send_ptr = message.c_str();
00444     char * const recv_ptr = recv_size ? & buffer[0] : (char *) NULL ;
00445     int * const recv_displ_ptr = & recv_displ[0] ;
00446 
00447     result = MPI_Gatherv((void*) send_ptr, send_count, MPI_CHAR,
00448                          recv_ptr, recv_count_ptr, recv_displ_ptr, MPI_CHAR,
00449                          p_root, comm);
00450   }
00451 
00452   if (MPI_SUCCESS != result) {
00453     std::ostringstream s ;
00454     s << "stk::all_write FAILED: MPI_Gatherv = " << result ;
00455     throw std::runtime_error(s.str());
00456   }
00457 
00458   if (p_root == (int) p_rank) {
00459     bool first = true;
00460     for (int i = 0 ; i < p_size ; ++i) {
00461       if (recv_count[i]) {
00462         if (!first)
00463           os << separator;
00464         first = false;
00465         char * const ptr = & buffer[ recv_displ[i] ];
00466         os.write(ptr, recv_count[i]);
00467       }
00468     }
00469     os.flush();
00470   }
00471   else
00472     os << message;
00473 #endif
00474 }
00475 
00476 
00477 std::ostream &
00478 operator<<(
00479   std::ostream &        os,
00480   const MessageType &   message_type) 
00481 {
00482 //   if (message_type & MSG_SYMMETRIC)
00483 //     os << "parallel ";
00484   os << get_message_type_info(message_type).m_name;
00485 
00486   return os;
00487 }
00488 
00489 } // namespace stk
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines