UltraScan III
parallel_masters.cpp
1 #include "us_mpi_analysis.h"
2 #include "us_math2.h"
3 #include "us_settings.h"
4 #include "us_tar.h"
5 #include "us_sleep.h"
6 #include "mpi.h"
7 
8 // Analysis jobs with Supervisor and 2 or more Master+Workers parallel groups
9 void US_MPI_Analysis::pmasters_start()
10 {
11  // Determine group (0,...), worker count in group, rank within group
12  my_group = my_rank / gcores_count;
13  my_workers = ( my_group == 0 ) ? gcores_count - 2 : gcores_count - 1;
14  group_rank = my_rank - my_group * gcores_count;
15 
16  if ( my_rank == 0 )
17  { // Supervisor is its own group
18  my_group = MPI_UNDEFINED;
19  group_rank = 0;
20  }
21 
22  else if ( my_group == 0 )
23  { // Group 0 Master(1) and workers(2,...): set group rank 0, 1,...
24  group_rank--;
25  }
26 
27 DbgLv(1) << "my_group" << my_group << "my_workers" << my_workers
28  << "group_rank" << group_rank << "my_rank" << my_rank
29  << "HOST=" << QHostInfo::localHostName();
30 
31 DbgLv(1) << "MPI_Barrier" << my_rank;
32  // Synch up everyone then split into communicator groups
33  MPI_Barrier( MPI_COMM_WORLD );
34  int sstat = MPI_Comm_split( MPI_COMM_WORLD, my_group, group_rank,
35  &my_communicator );
36 DbgLv(1) << "COMM_SPLIT (g r m)" << my_group << group_rank << my_rank
37  << "stat" << sstat;
38 
39  if ( my_rank == 0 )
40  { // Run parallel-masters supervisor (world rank 0)
41  pmasters_supervisor();
42  }
43 
44  else if ( group_rank == 0 )
45  { // Run parallel-masters master within a group (group rank 0)
46  pmasters_master();
47  }
48 
49  else
50  { // Run parallel-masters worker within a group (group rank 1,...)
51  pmasters_worker();
52  }
53 
54 DbgLv(1) << "Final-my_rank" << my_rank << " msecs=" << startTime.msecsTo(QDateTime::currentDateTime());
55  MPI_Finalize();
56  exit( 0 );
57 }
58 
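As a side note, the rank-to-group bookkeeping above can be exercised on its own. A minimal standalone sketch, with an invented world size and cores-per-group count (the real values come from the MPI runtime and the job parameters):

#include <cstdio>

// Illustrative mapping: world rank -> (group, group rank, worker count).
// Mirrors the arithmetic above; gcores_count is assumed to divide the world size.
int main()
{
   const int gcores_count = 4;     // cores per group (assumed)
   const int total_ranks  = 12;    // world size (assumed)

   for ( int my_rank = 0; my_rank < total_ranks; my_rank++ )
   {
      int my_group   = my_rank / gcores_count;
      int my_workers = ( my_group == 0 ) ? gcores_count - 2 : gcores_count - 1;
      int group_rank = my_rank - my_group * gcores_count;

      if ( my_rank == 0 )
      {  // Supervisor is its own group
         my_group   = -1;          // stands in for MPI_UNDEFINED
         group_rank = 0;
      }
      else if ( my_group == 0 )
      {  // Group 0: world rank 1 is the master, so shift group ranks down by one
         group_rank--;
      }

      printf( "rank %2d -> group %2d  group_rank %d  workers %d\n",
              my_rank, my_group, group_rank, my_workers );
   }
   return 0;
}

Group 0 reports gcores_count - 2 workers because the supervisor occupies one of its slots in addition to the group master; every other group has one master and gcores_count - 1 workers.
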
59 // Parse Task XML, mainly for max wall-time and master-groups-count
60 void US_MPI_Analysis::task_parse( const QString& xmlfile )
61 {
62  if ( xmlfile.isEmpty() )
63  {
64  if ( my_rank == 0 ) DbgLv(0) << "No separate jobxmlfile";
65  return;
66  }
67 
68 if (my_rank==0) DbgLv(0) << "task_parse xmlfile" << xmlfile;
69  QFile file ( xmlfile );
70  task_params[ "walltime" ] = "1440";
71  task_params[ "mgroupcount" ] = "1";
72 
73  if ( ! file.open( QIODevice::ReadOnly | QIODevice::Text) )
74  { // If no job xml or us3.pbs, return now
75  if ( my_rank == 0 ) DbgLv(0) << "Cannot open file " << xmlfile;
76 
77  if ( proc_count > 64 )
78  {
79  task_params[ "mgroupcount" ] = parameters[ "req_mgroupcount" ];
80  if ( my_rank == 0 ) DbgLv(0) << " Value of mgroupcount set to"
81  << parameters[ "req_mgroupcount" ];
82  }
83  return;
84  }
85 
86  if ( xmlfile == "us3.pbs" )
87  { // If no jobxmlfile.xml, but us3.pbs (-local), get values from it
88  QTextStream ts( &file );
89 
90  while ( ! ts.atEnd() )
91  { // Read us3.pbs lines
92  QString line = ts.readLine();
93  int jwt = line.indexOf( "walltime=" );
94  int jgr = line.indexOf( "pmgroups=" );
95 
96  if ( jwt > 0 )
97  { // Parse PBS line with walltime=
98  QString svalu = line.mid( jwt ).section( "=", 1, 1 );
99  int valu1 = svalu.section( ":", 0, 0 ).toInt();
100  int valu2 = svalu.section( ":", 1, 1 ).toInt();
101  svalu = QString::number( valu1 * 60 + valu2 );
102  task_params[ "walltime" ] = svalu;
103 if (my_rank==0) DbgLv(0) << "h m" << valu1 << valu2 << "svalu" << svalu;
104  }
105 
106  if ( jgr > 0 )
107  { // Parse comment line with pmgroups=
108  QString svalu = line.mid( jgr ).section( "=", 1, 1 );
109  int valu1 = svalu.toInt();
110  valu1 = valu1 < 0 ? 1 : valu1;
111  svalu = QString::number( valu1 );
112  task_params[ "mgroupcount" ] = svalu;
113 if (my_rank==0) DbgLv(0) << "group" << svalu << "valu1" << valu1;
114  }
115  }
116 
117  file.close();
118 if ( my_rank == 0 ) DbgLv(0) << "walltime=" << task_params["walltime"]
119  << "mgroupcount=" << task_params["mgroupcount"];
120  return;
121  }
122 
123  QXmlStreamReader xml( &file );
124 
125  while ( ! xml.atEnd() )
126  { // Parse the *jobxmlfile.xml file (mainly for walltime,mgroupcount)
127  xml.readNext();
128 
129  if ( xml.isStartElement() )
130  {
131  QString name = xml.name().toString();
132 
133  if ( name != "Message" && name != "Header" )
134  {
135  QString value = xml.readElementText();
136 
137  task_params[ name ] = value;
138  }
139  }
140 
141  if ( xml.isEndElement() && xml.name() == "Header" )
142  break;
143  }
144 
145  file.close();
146 if ( my_rank == 0 ) DbgLv(1) << "walltime=" << task_params["walltime"]
147  << "mgroupcount=" << task_params["mgroupcount"];
148 }
149 
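A small self-contained sketch of the us3.pbs arithmetic above: a walltime=HH:MM:SS field is reduced to whole minutes (hours * 60 + minutes, seconds dropped). The helper name and the sample PBS line are illustrative only:

#include <QString>

// Convert a "...walltime=HH:MM:SS..." line to whole minutes, as task_parse() does.
static int walltimeMinutes( const QString& line )
{
   int jwt = line.indexOf( "walltime=" );
   if ( jwt < 0 )  return -1;                       // no walltime on this line

   QString svalu = line.mid( jwt ).section( "=", 1, 1 );
   int     hours = svalu.section( ":", 0, 0 ).toInt();
   int     mins  = svalu.section( ":", 1, 1 ).toInt();
   return hours * 60 + mins;                        // seconds are dropped
}

// Example (hypothetical PBS line):
//   walltimeMinutes( "#PBS -l walltime=24:00:00" )  returns  1440
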
150 // Parallel-masters supervisor
151 void US_MPI_Analysis::pmasters_supervisor()
152 {
153  // Initialize masters' states to READY
154  QVector< int > mstates( mgroup_count, READY );
155  QByteArray msg;
156  MPI_Status status;
157  long int maxrssma = 0L;
158 
159  mc_iteration = 0;
160  int master = 1;
161  int iwork = 0;
162  int tag;
163  max_rss();
164 
165  // Start off all masters doing iteration 1.
166  // Send each group's master the analysis date for use with the model
167  // description. Then, send the iteration index as a trigger to begin
168  // an iteration loop; all groups do iteration 1.
169 
170  for ( int ii = 0; ii < mgroup_count; ii++ )
171  {
172  mc_iteration++;
173  master = ( ii == 0 ) ? 1 : ( ii * gcores_count ); // Master world rank
174 
175 DbgLv(1) << "SUPER: master msgs" << master << "analysisDate" << analysisDate;
176  msg = analysisDate.toAscii();
177  iwork = msg.size();
178  MPI_Send( &iwork,
179  1,
180  MPI_INT,
181  master,
182  ADATESIZE,
183  MPI_COMM_WORLD );
184 
185  MPI_Send( msg.data(),
186  iwork,
187  MPI_BYTE,
188  master,
189  ADATE,
190  MPI_COMM_WORLD );
191 
192  MPI_Send( &mc_iteration,
193  1,
194  MPI_INT,
195  master,
196  STARTITER,
197  MPI_COMM_WORLD );
198 
199  mstates[ ii ] = WORKING; // Mark all masters busy
200  }
201 
202  // Loop waiting on masters' results.
203  // Each time through this loop, wait for an integer sent
204  // from a master.
205  // Three types (tags) may be received:
206  // (1) UDP message size (message itself will follow);
207  // (2) Iteration-done;
208  // (3) Last-iteration-done (integer received is max group memory used).
209 
210  while ( true )
211  {
212  max_rss();
213 
214  // Get UDP message or iteration-done flag
215 DbgLv(1) << "SUPER: wait on iter done/udp";
216  MPI_Recv( &iwork,
217  1,
218  MPI_INT,
219  MPI_ANY_SOURCE,
220  MPI_ANY_TAG,
221  MPI_COMM_WORLD,
222  &status );
223  bool udpmsg = false;
224  bool islast = false;
225  int isize = 0;
226  master = status.MPI_SOURCE;
227  tag = status.MPI_TAG;
228  QByteArray msg;
229 DbgLv(1) << "SUPER: wait recv'd iwork" << iwork << "master tag" << master << tag;
230 
231  switch( tag )
232  {
233  case UDPSIZE: // UDP message size: get the following UDP message string
234  isize = iwork;
235  msg.resize( isize );
236 
237 DbgLv(1) << "SUPER: wait on UDPmsg - master" << master;
238  MPI_Recv( msg.data(),
239  isize,
240  MPI_BYTE,
241  master,
242  UDPMSG,
243  MPI_COMM_WORLD,
244  &status );
245 
246 DbgLv(1) << "SUPER: UDPmsg:" << QString(msg);
247  send_udp( QString( msg ) ); // Send forwarded UDP message
248 
249  udpmsg = true;
250  break;
251 
252  case DONELAST: // Iteration done and it was the last in group
253  islast = true;
254 
255  case DONEITER: // Iteration done
256  islast = ( mc_iteration >= mc_iterations ) ? true : islast;
257  break;
258 
259  default:
260  DbgLv(0) << "Unknown message type in supervisor" << tag;
261  break;
262  }
263 
264  if ( udpmsg ) continue; // Loop for next master message
265 
266  // If here, the message was a signal that an MC iteration is complete.
267  // Most commonly, the group master that just completed an iteration
268  // is sent the next iteration to begin.
269  // As the total iterations nears the limit, the message tag used may be
270  // one that signals that the master should treat the iteration as the
271  // last one for the group; the group next alerted may not be the same
272  // as the one that just finished (since it may have shut down).
273  // The data received with a done-iteration message is the max memory
274  // used in the master group.
275 
276  master = status.MPI_SOURCE; // Master that has completed
277  int kgroup = master / gcores_count; // Group to which master belongs
278  int mgroup = mstates.indexOf( READY ); // Find next ready master
279 
280  int jgroup = ( mgroup < 0 ) ? kgroup : mgroup; // Ready master index
281  mstates[ kgroup ] = READY; // Mark current as ready
282  int nileft = mstates.count( WORKING ); // Iterations left working
283 DbgLv(1) << "SUPER: mgr kgr jgr" << mgroup << kgroup << jgroup
284  << "left iter iters" << nileft << mc_iteration << mc_iterations;
285 
286  if ( islast )
287  { // Just finished was last iteration for that group
288  mstates[ kgroup ] = INIT; // Mark as finished
289  maxrssma += (long)( iwork ); // Sum in master maxrss
290 DbgLv(1) << "SUPER: (A)maxrssma" << maxrssma << "iwork" << iwork;
291 
292  if ( nileft == 0 )
293  {
294  mc_iterations = mc_iteration; // All are complete
295  break;
296  }
297 
298  if ( mgroup < 0 )
299  continue; // Some iters are still working
300  }
301 
302 
303  // Alert the next available master to do an iteration. Use a
304  // different tag when it is to be the last iteration for a group.
305  // This enables the master and its workers to do normal shutdown.
306  int iter_wk = ++mc_iteration; // Next iteration to do
307  tag = STARTITER; // Flag as normal iteration
308 
309  if ( mc_iteration > ( mc_iterations - mgroup_count ) )
310  tag = STARTLAST; // Flag as last iter for group
311 
312  master = ( jgroup == 0 ) ? 1 : ( jgroup * gcores_count );
313 
314 DbgLv(1) << "SUPER: Send next iter" << iter_wk << "gr ma tg" << jgroup << master << tag;
315  MPI_Send( &iter_wk,
316  1,
317  MPI_INT,
318  master,
319  tag,
320  MPI_COMM_WORLD );
321 
322  mstates[ jgroup ] = WORKING; // Mark group as busy
323  max_rss(); // Memory use of supervisor
324  }
325 
326  // In the parallel-masters case, the supervisor handles end-of-job
327  // messages and file creation.
328 
329  // Get job end time (after waiting, so it has the greatest time stamp)
330  US_Sleep::msleep( 900 );
331  QDateTime endTime = QDateTime::currentDateTime();
332 
333  // Send message and build file with run-time statistics
334  max_rss(); // Max memory for supervisor
335 DbgLv(1) << "SUPER: maxrss maxrssma" << maxrss << maxrssma;
336  maxrss += maxrssma; // Sum in master groups' use
337 
338  int walltime = qRound(
339  submitTime.msecsTo( endTime ) / 1000.0 );
340  int cputime = qRound(
341  startTime .msecsTo( endTime ) / 1000.0 );
342  int maxrssmb = qRound( (double)maxrss / 1024.0 );
343  int kc_iters = parameters[ "mc_iterations" ].toInt();
344 
345  // Output job statistics
346  stats_output( walltime, cputime, maxrssmb,
347  submitTime, startTime, endTime );
348 
349  // Create output archive and remove other output files
350  update_outputs( true );
351 
352  // Send 'Finished' message
353  int wt_hr = walltime / 3600;
354  int wt_min = ( walltime - wt_hr * 3600 ) / 60;
355  int wt_sec = walltime - wt_hr * 3600 - wt_min * 60;
356  int ct_hr = cputime / 3600;
357  int ct_min = ( cputime - ct_hr * 3600 ) / 60;
358  int ct_sec = cputime - ct_hr * 3600 - ct_min * 60;
359  printf( "Us_Mpi_Analysis has finished successfully"
360  " (Wall=%d:%02d:%02d Cpu=%d:%02d:%02d).\n"
361  , wt_hr, wt_min, wt_sec, ct_hr, ct_min, ct_sec );
362  fflush( stdout );
363 
364  if ( mc_iterations < kc_iters )
365  {
366  send_udp( "Finished: maxrss " + QString::number( maxrssmb )
367  + " MB, total run seconds " + QString::number( cputime )
368  + " (Reduced MC Iterations)" );
369  DbgLv(0) << "Finished: maxrss " << maxrssmb
370  << "MB, total run seconds " << cputime
371  << " (Reduced MC Iterations)";
372  }
373 
374  else
375  {
376  send_udp( "Finished: maxrss " + QString::number( maxrssmb )
377  + " MB, total run seconds " + QString::number( cputime ) );
378  DbgLv(0) << "Finished: maxrss " << maxrssmb
379  << "MB, total run seconds " << cputime;
380  }
381 }
382 
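Stripped of the MPI traffic, the supervisor's dispatch decision reduces to bookkeeping over per-group states. A minimal sketch of that selection step, assuming the same READY/WORKING/INIT states used above (the helper name and the sample values are invented):

#include <QVector>

enum GroupState { INIT, READY, WORKING };

// Pick the group that should run the next MC iteration, mirroring the
// supervisor loop above: prefer another READY group, else reuse the
// group that just reported in.
static int nextGroup( QVector< int >& mstates, int reporting_group )
{
   int mgroup = mstates.indexOf( READY );          // some other ready master?
   int jgroup = ( mgroup < 0 ) ? reporting_group : mgroup;
   mstates[ reporting_group ] = READY;             // reporter is now idle
   return jgroup;
}

// Usage sketch: with mstates = { WORKING, READY, WORKING } and group 0
// reporting in, nextGroup() returns 1 and marks group 0 READY again.
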
383 // Parallel-masters master within a group
384 void US_MPI_Analysis::pmasters_master()
385 {
386  MPI_Status status;
387  QByteArray msg;
388 
389  // Get analysis date for model descriptions from supervisor
390 DbgLv(1) << " MASTER Recv from super. my_rank" << my_rank
391  << "group" << my_group;
392  int super = 0;
393  int isize = 0;
394 
395  MPI_Recv( &isize,
396  1,
397  MPI_INT,
398  super,
399  ADATESIZE,
400  MPI_COMM_WORLD,
401  &status );
402 DbgLv(1) << " MASTER: wait recv'd isize" << isize;
403 
404  msg.resize( isize );
405 
406  MPI_Recv( msg.data(),
407  isize,
408  MPI_BYTE,
409  super,
410  ADATE,
411  MPI_COMM_WORLD,
412  &status );
413 
414  analysisDate = QString( msg );
415 DbgLv(1) << " MASTER: Recv'd from super. (g r m)" << my_group << group_rank
416  << my_rank << "analysisDate" << analysisDate << "atype" << analysis_type;
417 
418  // Do the master loop for MC 2DSA or GA
419  if ( analysis_type.startsWith( "2DSA" ) )
420  {
421  pm_2dsa_master();
422  }
423 
424  else if ( analysis_type.startsWith( "GA" ) )
425  {
426  pm_ga_master();
427  }
428 
429  else if ( analysis_type.startsWith( "DMGA" ) )
430  {
431  pm_dmga_master();
432  }
433 }
434 
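The supervisor-to-master exchange above uses a common two-step pattern: send the byte count under one tag, then the payload under another, so the receiver can size its buffer before the second receive. A sketch of that pattern with Qt strings; the tag values and helper names are illustrative, not UltraScan's:

#include <mpi.h>
#include <QByteArray>
#include <QString>

static const int SIZE_TAG = 71;   // illustrative tags
static const int DATA_TAG = 72;

// Sender side: first the byte count, then the raw bytes
static void sendString( const QString& text, int dest )
{
   QByteArray msg   = text.toUtf8();
   int        isize = msg.size();
   MPI_Send( &isize,     1,     MPI_INT,  dest, SIZE_TAG, MPI_COMM_WORLD );
   MPI_Send( msg.data(), isize, MPI_BYTE, dest, DATA_TAG, MPI_COMM_WORLD );
}

// Receiver side: size first, resize the buffer, then the bytes
static QString recvString( int source )
{
   MPI_Status status;
   int        isize = 0;
   MPI_Recv( &isize, 1, MPI_INT, source, SIZE_TAG, MPI_COMM_WORLD, &status );

   QByteArray msg;
   msg.resize( isize );
   MPI_Recv( msg.data(), isize, MPI_BYTE, source, DATA_TAG, MPI_COMM_WORLD, &status );
   return QString::fromUtf8( msg );
}
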
435 // Parallel-masters worker within a group
436 void US_MPI_Analysis::pmasters_worker()
437 {
438  if ( analysis_type.startsWith( "2DSA" ) )
439  { // Standard 2DSA worker
440  _2dsa_worker();
441  }
442 
443  else if ( analysis_type.startsWith( "GA" ) )
444  { // Standard GA worker
445  ga_worker();
446  }
447 
448  else if ( analysis_type.startsWith( "DMGA" ) )
449  {
450  dmga_worker();
451  }
452 
453  else
454  { // What??? Should not get here
455  DbgLv(0) << "INVALID ANALYSIS TYPE" << analysis_type;
456  }
457 }
458 
459 // Test time for MC iterations left; compare to walltime
460 void US_MPI_Analysis::time_mc_iterations()
461 {
462  if ( mc_iteration < ( mgroup_count * 4 ) )
463  return; // Don't check until at least 4 MC iterations per group have run
464 
465  if ( is_composite_job )
466  return; // Don't check MC iteration if composite
467 
468  QDateTime currTime = QDateTime::currentDateTime();
469  int mins_so_far = ( startTime.secsTo( currTime ) + 59 ) / 60;
470  int mins_left_allow = max_walltime - mins_so_far;
471  int mc_iters_left = ( mins_left_allow * mc_iteration ) / mins_so_far;
472  mc_iters_left = ( mc_iters_left / mgroup_count ) * mgroup_count;
473  int mc_iters_estim = mc_iteration + mc_iters_left;
474 
475  if ( mc_iters_estim < mc_iterations && mc_iters_left < 4 )
476  { // In danger of exceeding allowed time: reduce MC iterations
477  int old_mciters = mc_iterations;
478  mc_iterations = qMax( mc_iteration, mc_iters_estim - 2 );
479  int ac_iters_left = mc_iterations - mc_iteration;
480  ac_iters_left = ( ac_iters_left / mgroup_count ) * mgroup_count;
481  mc_iterations = mc_iteration + ac_iters_left;
482 
483  QString msg = tr( "MC Iterations reduced from %1 to %2, "
484  "due to max. time restrictions." )
485  .arg( old_mciters ).arg( mc_iterations );
486  send_udp( msg );
487 
488  DbgLv(0) << " Specified Maximum Wall-time minutes:" << max_walltime;
489  DbgLv(0) << " Number of minutes used so far: " << mins_so_far;
490  DbgLv(0) << " Allowed minutes remaining: " << mins_left_allow;
491  DbgLv(0) << " MC iterations run so far: " << mc_iteration;
492  DbgLv(0) << " Estimated allowed iterations left: " << mc_iters_left;
493  DbgLv(0) << " Actual adjusted iterations left: " << ac_iters_left;
494  DbgLv(0) << "MC Iterations reduced from" << old_mciters << "to"
495  << mc_iterations << ", due to max. time restrictions.";
496 
497  // Just to be sure, create tar file right now
498  if ( my_group == 0 )
499  update_outputs();
500  }
501 
502  return;
503 }
504 
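The reduction logic above is plain integer arithmetic: extrapolate the iterations finished so far against the minutes already used, round the remaining allowance down to a multiple of the group count, and shrink the requested total when the estimate falls short. A standalone sketch with made-up numbers:

#include <cstdio>
#include <algorithm>

int main()
{
   // Hypothetical job state
   int max_walltime  = 300;   // allowed minutes
   int mins_so_far   = 280;   // minutes used so far
   int mgroup_count  = 2;     // parallel master groups
   int mc_iteration  = 40;    // MC iterations finished
   int mc_iterations = 100;   // MC iterations requested

   int mins_left_allow = max_walltime - mins_so_far;
   int mc_iters_left   = ( mins_left_allow * mc_iteration ) / mins_so_far;
   mc_iters_left       = ( mc_iters_left / mgroup_count ) * mgroup_count;
   int mc_iters_estim  = mc_iteration + mc_iters_left;

   if ( mc_iters_estim < mc_iterations && mc_iters_left < 4 )
   {  // In danger of exceeding the allowance: shrink the total,
      // keeping the remaining count a multiple of mgroup_count
      int reduced       = std::max( mc_iteration, mc_iters_estim - 2 );
      int ac_iters_left = ( ( reduced - mc_iteration ) / mgroup_count ) * mgroup_count;
      mc_iterations     = mc_iteration + ac_iters_left;
   }

   printf( "estimated iterations left %d, adjusted total %d\n",
           mc_iters_left, mc_iterations );
   return 0;
}
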
505 // Parallel-masters version of group 2DSA master
506 void US_MPI_Analysis::pm_2dsa_master()
507 {
508 DbgLv(1) << "master start 2DSA" << startTime;
509  init_solutes();
510  fill_queue();
511 
512  work_rss.resize( gcores_count );
513 
514  current_dataset = 0;
515  datasets_to_process = 1; // Process one dataset at a time for now
516 
517  int iter = 1;
518  int super = 0;
519  MPI_Status status;
520 
521  // Get 1st iteration (1) from supervisor
522  MPI_Recv( &iter,
523  1,
524  MPI_INT,
525  super,
526  MPI_ANY_TAG,
527  MPI_COMM_WORLD,
528  &status );
529 
530  int tag = status.MPI_TAG;
531  mc_iteration = iter;
532 
533  while ( true )
534  {
535  int worker;
536 //if ( max_depth > 1 )
537 // DbgLv(1) << " master loop-TOP: jq-empty?" << job_queue.isEmpty() << " areReady?" << worker_status.contains(READY)
538 // << " areWorking?" << worker_status.contains(WORKING);
539 
540  // Give the jobs to the workers
541  while ( ! job_queue.isEmpty() && worker_status.contains( READY ) )
542  {
543  worker = ready_worker();
544 
545  Sa_Job job = job_queue.takeFirst();
546  submit( job, worker );
547  worker_depth [ worker ] = job.mpi_job.depth;
548  worker_status[ worker ] = WORKING;
549  }
550 
551  // All done with the pass if no jobs are ready or running
552  if ( job_queue.isEmpty() && ! worker_status.contains( WORKING ) )
553  {
554  QString progress =
555  "Iteration: " + QString::number( iterations ) +
556  "; Dataset: " + QString::number( current_dataset + 1 ) +
557  "; Meniscus: (Run 1 of 1)" +
558  "; MonteCarlo: " + QString::number( mc_iteration );
559 
560  send_udp( progress );
561 
562  // Manage multiple data sets
563  if ( data_sets.size() > 1 && datasets_to_process == 1 )
564  {
565  global_fit();
566  }
567 
568  if ( ! job_queue.isEmpty() ) continue;
569 
570  // Write out the model
571  max_rss();
572 DbgLv(1) << "2dMast: mc_iter" << mc_iteration
573  << "variance" << simulation_values.variance << "my_group" << my_group;
574 
575  qSort( simulation_values.solutes );
576 
578 
579  if ( mc_iteration >= mc_iterations )
580  {
581  for ( int jj = 1; jj <= my_workers; jj++ )
582  maxrss += work_rss[ jj ];
583  }
584 
585  // Tell the supervisor that an iteration is done
586  iter = (int)maxrss;
587  tag = ( mc_iteration < mc_iterations ) ?
588  DONEITER : DONELAST;
589 
590  MPI_Send( &iter,
591  1,
592  MPI_INT,
593  super,
594  tag,
595  MPI_COMM_WORLD );
596 
597  if ( mc_iteration < mc_iterations )
598  {
600 
601  if ( mc_iteration < mc_iterations )
602  {
603  set_monteCarlo();
604 
605  // Get new Monte Carlo iteration index from supervisor
606  MPI_Recv( &iter,
607  1,
608  MPI_INT,
609  super,
610  MPI_ANY_TAG,
611  MPI_COMM_WORLD,
612  &status );
613 
614  tag = status.MPI_TAG;
615 
616  if ( tag == STARTLAST )
617  mc_iterations = iter;
618 
619  else if ( tag != STARTITER )
620  {
621  DbgLv(0) << "Unexpected tag in PMG 2DSA Master" << tag;
622  continue;
623  }
624 
625  mc_iteration = iter;
626  }
627  }
628 
629  if ( ! job_queue.isEmpty() ) continue;
630 
631  shutdown_all(); // All done
632  break; // Break out of main loop.
633  }
634 
635  // Wait for worker to send a message
636  int sizes[ 4 ];
637 
638  MPI_Recv( sizes,
639  4,
640  MPI_INT,
641  MPI_ANY_SOURCE,
642  MPI_ANY_TAG,
643  my_communicator,
644  &status);
645 
646  worker = status.MPI_SOURCE;
647 
648 //if ( max_depth > 0 )
649 // DbgLv(1) << " PMG master loop-BOTTOM: status TAG" << status.MPI_TAG
650 // << MPI_Job::READY << MPI_Job::RESULTS << " source" << status.MPI_SOURCE;
651  switch( status.MPI_TAG )
652  {
653  case MPI_Job::READY: // Ready for work
654  worker_status[ worker ] = READY;
655  break;
656 
657  case MPI_Job::RESULTS: // Return solute data
658  process_results( worker, sizes );
659  work_rss[ worker ] = sizes[ 3 ];
660  break;
661 
662  default: // Should never happen
663  QString msg = "Master 2DSA: Received invalid status " +
664  QString::number( status.MPI_TAG );
665  abort( msg );
666  break;
667  }
668 
669  max_rss();
670  }
671 }
672 
673 // Parallel-masters version of GA group master
674 void US_MPI_Analysis::pm_ga_master()
675 {
676  current_dataset = 0;
678  max_depth = 0;
679  calculated_solutes.clear();
680 //DbgLv(1) << "master start GA" << startTime;
681 
682  // Set noise and debug flags
683  simulation_values.noisflag = 0;
684  simulation_values.dbg_level = dbg_level;
685  simulation_values.dbg_timing = dbg_timing;
686 DbgLv(0) << "DEBUG_LEVEL" << simulation_values.dbg_level;
687 
688  // Initialize best fitness
689  best_genes .reserve( gcores_count );
690  best_fitness.reserve( gcores_count );
691 
692  Fitness empty_fitness;
693  empty_fitness.fitness = LARGE;
694 
695  Gene working_gene( buckets.count(), US_Solute() );
696 
697  // Initialize arrays
698  for ( int i = 0; i < gcores_count; i++ )
699  {
700  best_genes << working_gene;
701 
702  empty_fitness.index = i;
703  best_fitness << empty_fitness;
704  }
705 
706  QDateTime time = QDateTime::currentDateTime();
707 
708  // Handle Monte Carlo iterations. There will always be at least 1.
709  while ( true )
710  {
711  // Get MC iteration index from supervisor
712  int iter = 1;
713  int super = 0;
714  MPI_Status status;
715 
716  MPI_Recv( &iter,
717  1,
718  MPI_INT,
719  super,
720  MPI_ANY_TAG,
721  MPI_COMM_WORLD,
722  &status );
723 
724  int tag = status.MPI_TAG;
725 DbgLv(1) << " MASTER: iter" << iter << "gr" << my_group << "tag" << tag;
726  switch ( tag )
727  {
728  case STARTLAST:
729  mc_iterations = iter;
730 
731  case STARTITER:
732  mc_iteration = iter;
733  break;
734 
735  default:
736  DbgLv(0) << "Unknown message to PMG Master" << tag;
737  break;
738  }
739 
740  ga_master_loop();
741 
742  qSort( best_fitness );
743  simulation_values.solutes = best_genes[ best_fitness[ 0 ].index ];
744 
745  int nisols = simulation_values.solutes.size();
746 
747  solutes_from_gene( simulation_values.solutes, nisols );
748 
749 DbgLv(1) << "GaMast: sols size" << simulation_values.solutes.size()
750  << "buck size" << buckets.size();
751 DbgLv(1) << "GaMast: dset size" << data_sets.size();
752 DbgLv(1) << "GaMast: sol0.s" << simulation_values.solutes[0].s;
753  calc_residuals( 0, data_sets.size(), simulation_values );
754 
755 DbgLv(1) << "GaMast: calc_resids return";
756 
757  // Write out the model
758 DbgLv(1) << "2dMast: mc_iter" << mc_iteration
759  << "variance" << simulation_values.variance << "my_group" << my_group;
760 
761  qSort( simulation_values.solutes );
762 
763  // Convert given solute points to s,k for model output
764  double vbar20 = data_sets[ 0 ]->vbar20;
765  QList< int > attrxs;
766  attrxs << attr_x << attr_y << attr_z;
767  bool have_s = ( attrxs.indexOf( ATTR_S ) >= 0 );
768  bool have_k = ( attrxs.indexOf( ATTR_K ) >= 0 );
769  bool have_w = ( attrxs.indexOf( ATTR_W ) >= 0 );
770  bool have_d = ( attrxs.indexOf( ATTR_D ) >= 0 );
771  bool have_f = ( attrxs.indexOf( ATTR_F ) >= 0 );
772  bool vary_v = ( attr_z != ATTR_V );
773 
774  for ( int gg = 0; gg < simulation_values.solutes.size(); gg++ )
775  {
776  US_Solute* solu = &simulation_values.solutes[ gg ];
777  US_Model::SimulationComponent mcomp;
778  mcomp.s = have_s ? solu->s : 0.0;
779  mcomp.f_f0 = have_k ? solu->k : 0.0;
780  mcomp.mw = have_w ? solu->d : 0.0;
781  mcomp.vbar20 = vary_v ? solu->v : vbar20;
782  mcomp.D = have_d ? solu->d : 0.0;
783  mcomp.f = have_f ? solu->d : 0.0;
784 
785  US_Model::calc_coefficients( mcomp );
786 
787  solu->s = mcomp.s;
788  solu->k = mcomp.f_f0;
789  solu->v = mcomp.vbar20;
790  }
791 
792  calculated_solutes.clear();
794 
795  if ( data_sets.size() == 1 )
796  {
797  write_output();
798  }
799  else
800  {
801  write_global();
802  }
803 
804 #if 0
805  if ( my_group == 0 && ( mc_iteration + mgroup_count ) < mc_iterations )
806  { // Update the tar file of outputs in case of an aborted run
807  update_outputs();
808  }
809 #endif
810 
811  if ( mc_iteration < mc_iterations )
812  { // Before last iteration: check if max is reset based on time limit
813  time_mc_iterations(); // Test if near time limit
814 
815  tag = ( mc_iteration < mc_iterations ) ?
816  DONEITER : DONELAST;
817  }
818 
819  else
820  { // Mark that max iterations reached
821  tag = DONELAST;
822  }
823 
824  // Tell the supervisor that an iteration is done
825  iter = (int)maxrss;
826 DbgLv(1) << "GaMast: iter done: maxrss" << iter << "tag" << tag << DONELAST;
827 
828  MPI_Send( &iter,
829  1,
830  MPI_INT,
831  super,
832  tag,
833  MPI_COMM_WORLD );
834 
835 DbgLv(1) << "GaMast: mc_iter iters" << mc_iteration << mc_iterations;
836  if ( mc_iteration < mc_iterations )
837  { // Set up for next iteration
838  if ( mc_iteration <= mgroup_count )
839  { // Set scaled_data the first time
840  scaled_data = simulation_values.sim_data;
841  }
842 
844  }
845 
846  else // Break out of the loop if all iterations have been done
847  break;
848 
849  } // END: MC iterations loop
850 
851  MPI_Job job;
852 
853  // Send finish to workers ( in the tag )
854  for ( int worker = 1; worker <= my_workers; worker++ )
855  {
856  MPI_Send( &job, // MPI #0
857  sizeof( job ),
858  MPI_BYTE,
859  worker,
860  FINISHED,
861  my_communicator );
862  }
863 }
864 
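After the GA loop, the best gene is simply the entry with the lowest fitness once the fitness records are sorted. A minimal sketch, assuming the Fitness record holds an index and a fitness value as its use above suggests:

#include <QVector>
#include <QtAlgorithms>

// Assumed shape of the fitness record used above
struct Fitness
{
   int    index;      // worker/gene index
   double fitness;    // lower is better
   bool operator<( const Fitness& other ) const
   {  return fitness < other.fitness; }
};

// Return the index of the best (lowest-fitness) entry
static int bestGeneIndex( QVector< Fitness > best_fitness )
{
   qSort( best_fitness );             // same sort call the master uses
   return best_fitness[ 0 ].index;    // best entry is now first
}
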
865 // Parallel-masters version of DMGA group master
866 void US_MPI_Analysis::pm_dmga_master()
867 {
868  current_dataset = 0;
870  max_depth = 0;
871  calculated_solutes.clear();
872 //DbgLv(1) << "master start DMGA" << startTime;
873 
874  // Set noise and debug flags
875  simulation_values.noisflag = 0;
876  simulation_values.dbg_level = dbg_level;
877  simulation_values.dbg_timing = dbg_timing;
878 DbgLv(0) << "DEBUG_LEVEL" << simulation_values.dbg_level;
879 
880  // Build up the base structure of simulations and residuals
881  if ( data_sets.size() == 1 )
882  {
883  US_AstfemMath::initSimData( simulation_values.sim_data,
884  data_sets[ 0 ]->run_data, 0.0 );
885  US_AstfemMath::initSimData( simulation_values.residuals,
886  data_sets[ 0 ]->run_data, 0.0 );
887  }
888  else
889  {
890  int ntscan = data_sets[ 0 ]->run_data.scanCount();
891  for ( int ii = 1; ii < data_sets.size(); ii++ )
892  ntscan += data_sets[ ii ]->run_data.scanCount();
893 
894  simulation_values.sim_data .scanData.resize( ntscan );
895  simulation_values.residuals.scanData.resize( ntscan );
896  }
897 
898  // Initialize best fitness
899  best_dgenes .reserve( gcores_count );
900  best_fitness.reserve( gcores_count );
901 
902  // Read in the constraints model and build constraints
903  QString cmfname = "../" + parameters[ "DC_model" ];
904  wmodel.load( cmfname ); // Load the constraints model
905  constraints.load_constraints( &wmodel ); // Build the constraints object
906  constraints.get_work_model ( &wmodel ); // Get the base work model
907 
908  Fitness empty_fitness;
909  empty_fitness.fitness = LARGE;
910  dgene = wmodel;
912  nfvari = ( 1 << nfloatc ) - 1;
913  dgmarker.resize( nfloatc );
914  do_astfem = ( wmodel.components[ 0 ].sigma == 0.0 &&
915  wmodel.components[ 0 ].delta == 0.0 &&
916  wmodel.coSedSolute < 0 &&
917  data_sets[ 0 ]->compress == 0.0 );
918 
919  // Initialize arrays
920  for ( int ii = 0; ii < gcores_count; ii++ )
921  {
922  best_dgenes << dgene;
923 
924  empty_fitness.index = ii;
925  best_fitness << empty_fitness;
926  }
927 
928  QDateTime time = QDateTime::currentDateTime();
929 
930  // Handle Monte Carlo iterations. There will always be at least 1.
931  while ( true )
932  {
933  // Get MC iteration index from supervisor
934  int iter = 1;
935  int super = 0;
936  MPI_Status status;
937 
938  MPI_Recv( &iter,
939  1,
940  MPI_INT,
941  super,
942  MPI_ANY_TAG,
943  MPI_COMM_WORLD,
944  &status );
945 
946  int tag = status.MPI_TAG;
947 DbgLv(1) << " MASTER: iter" << iter << "gr" << my_group << "tag" << tag;
948  switch ( tag )
949  {
950  case STARTLAST:
951  mc_iterations = iter;
952 
953  case STARTITER:
954  mc_iteration = iter;
955  break;
956 
957  default:
958  DbgLv(0) << "Unknown message to PMG Master" << tag;
959  break;
960  }
961 
962  // Compute all generations of an MC iteration
963 
965 
966  // Get the best-fit gene
967 
968  qSort( best_fitness );
969  dgene = best_dgenes[ best_fitness[ 0 ].index ];
970 
971  // Compute the variance (fitness) for the final best-fit model
972 
974 
975  // Write out the model
976 
977  if ( data_sets.size() == 1 )
978  { // Output the single-data model
979  write_output();
980  }
981  else
982  { // Output the global model
983  write_global();
984  }
985 
986 #if 0
987  if ( my_group == 0 )
988  { // Update the tar file of outputs in case of an aborted run
989  update_outputs();
990  }
991 #endif
992 
993  if ( mc_iteration < mc_iterations )
994  { // Before last iteration: check if max is reset based on time limit
995  time_mc_iterations(); // Test if near time limit
996 
997  tag = ( mc_iteration < mc_iterations ) ?
998  DONEITER : DONELAST;
999  }
1000 
1001  else
1002  { // Mark that max iterations reached
1003  tag = DONELAST;
1004  }
1005 
1006  // Tell the supervisor that an iteration is done
1007  iter = (int)maxrss;
1008 DbgLv(1) << "GaMast: iter done: maxrss" << iter << "tag" << tag << DONELAST;
1009 
1010  MPI_Send( &iter,
1011  1,
1012  MPI_INT,
1013  super,
1014  tag,
1015  MPI_COMM_WORLD );
1016 
1017 DbgLv(1) << "GaMast: mc_iter iters" << mc_iteration << mc_iterations;
1018  if ( mc_iteration < mc_iterations )
1019  { // Set up for next iteration
1020  if ( mc_iteration <= mgroup_count )
1021  { // Set scaled_data the first time
1022  scaled_data = simulation_values.sim_data;
1023  }
1024 
1026  }
1027 
1028  else // Break out of the loop if all iterations have been done
1029  break;
1030 
1031  } // END: MC iterations loop
1032 
1033 
1034  MPI_Job job;
1035 
1036  // Send finish to workers ( in the tag )
1037  for ( int worker = 1; worker <= my_workers; worker++ )
1038  {
1039  MPI_Send( &job, // MPI #0
1040  sizeof( job ),
1041  MPI_BYTE,
1042  worker,
1043  FINISHED,
1044  my_communicator );
1045  }
1046 }
1047
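Throughout this file the MPI tag, rather than the payload, carries the meaning of a message (STARTITER, DONEITER, FINISHED, and so on), and receivers branch on status.MPI_TAG after an MPI_ANY_TAG receive. A stripped-down, runnable example of that pattern; the tag value and message layout are illustrative, not UltraScan's:

#include <mpi.h>
#include <cstdio>

static const int FINISH_TAG = 999;   // illustrative tag value

int main( int argc, char** argv )
{
   MPI_Init( &argc, &argv );

   int rank, size;
   MPI_Comm_rank( MPI_COMM_WORLD, &rank );
   MPI_Comm_size( MPI_COMM_WORLD, &size );

   if ( rank == 0 )
   {  // "Master": tell every worker to finish; the tag alone carries the meaning
      int dummy = 0;
      for ( int worker = 1; worker < size; worker++ )
         MPI_Send( &dummy, 1, MPI_INT, worker, FINISH_TAG, MPI_COMM_WORLD );
   }
   else
   {  // "Worker": accept any tag, then branch on what arrived
      int        dummy;
      MPI_Status status;
      MPI_Recv( &dummy, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status );

      if ( status.MPI_TAG == FINISH_TAG )
         printf( "worker %d: finished\n", rank );
   }

   MPI_Finalize();
   return 0;
}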