UltraScan III
pmasters_compjob.cpp
1 #include "us_mpi_analysis.h"
2 #include "us_math2.h"
3 #include "us_settings.h"
4 #include "us_tar.h"
5 #include "us_sleep.h"
6 #include "mpi.h"
7 
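// Message flow used throughout this file (a summary of the exchanges coded below):
//   supervisor   -> group master : ADATESIZE, ADATE       analysis-date length, then its bytes
//   supervisor   -> group master : STARTITER / STARTLAST  dataset index to process
//                                  (STARTLAST marks the group's final dataset)
//   group master -> supervisor   : UDPSIZE, UDPMSG        progress text to forward via UDP
//   group master -> supervisor   : DONEITER / DONELAST    dataset finished; the integer sent
//                                  is the group's maximum resident memory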
8 // Analysis jobs with Supervisor and 2 or more Master+Workers parallel groups
9 void US_MPI_Analysis::pm_cjobs_start( void )
10 {
11  // Determine group (0,...), worker count in group, rank within group
12  my_group = my_rank / gcores_count;
13  my_workers = ( my_group == 0 ) ? gcores_count - 2 : gcores_count - 1;
14  group_rank = my_rank - my_group * gcores_count;
15 
16  if ( my_rank == 0 )
17  { // Supervisor is its own group
18  my_group = MPI_UNDEFINED;
19  group_rank = 0;
20  }
21 
22  else if ( my_group == 0 )
23  { // Group 0 Master(1) and workers(2,...): set group rank 0, 1,...
24  group_rank--;
25  }
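// For example (illustrative counts only): with gcores_count = 16 and
// mgroup_count = 3, the 48 processes are laid out as
//   world rank 0      : supervisor (its own "group")
//   world ranks 1-15  : group 0 -- master at rank 1, 14 workers
//   world ranks 16-31 : group 1 -- master at rank 16, 15 workers
//   world ranks 32-47 : group 2 -- master at rank 32, 15 workers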
26 
27 DbgLv(1) << "my_group" << my_group << "my_workers" << my_workers
28  << "group_rank" << group_rank << "my_rank" << my_rank
29  << "HOST=" << QHostInfo::localHostName();
30 
31 DbgLv(1) << "MPI_Barrier" << my_rank;
32  // Synch up everyone then split into communicator groups
33  MPI_Barrier( MPI_COMM_WORLD );
34  int sstat = MPI_Comm_split( MPI_COMM_WORLD, my_group, group_rank,
35  &my_communicator );
36 DbgLv(1) << "COMM_SPLIT (g r m)" << my_group << group_rank << my_rank
37  << "stat" << sstat;
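// MPI_Comm_split uses my_group as the "color" and group_rank as the ordering
// "key": all processes passing the same color land in one new communicator,
// ranked in key order.  The supervisor passed MPI_UNDEFINED as its color, so
// its my_communicator comes back as MPI_COMM_NULL and it stays outside every
// Master+Workers group.  A minimal sketch of the same idiom (variable names
// here are illustrative, not from this file):
//
//   int color = ( world_rank == 0 ) ? MPI_UNDEFINED : world_rank / cores_per_group;
//   int key   = world_rank % cores_per_group;
//   MPI_Comm group_comm;
//   MPI_Comm_split( MPI_COMM_WORLD, color, key, &group_comm );
//   // world rank 0 now holds MPI_COMM_NULL; every other rank holds its group's communicator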
38 
39  if ( my_rank == 0 )
40  { // Run parallel-masters supervisor (world rank 0)
41  pm_cjobs_supervisor();
42  }
43 
44  else if ( group_rank == 0 )
45  { // Run parallel-masters master within a group (group rank 0)
46  pm_cjobs_master();
47  }
48 
49  else
50  { // Run parallel-masters worker within a group (group rank 1,...)
51  pm_cjobs_worker();
52  }
53 
54 DbgLv(1) << "Final-my_rank" << my_rank << " msecs=" << startTime.msecsTo(QDateTime::currentDateTime());
55  MPI_Finalize();
56  exit( 0 );
57 }
58 
59 // Parallel-masters supervisor
60 void US_MPI_Analysis::pm_cjobs_supervisor( void )
61 {
62  // Initialize masters states to READY
63  QVector< int > mstates( mgroup_count, READY );
64  QByteArray msg;
65  MPI_Status status;
66  long int maxrssma = 0L;
67 
68  current_dataset = 0;
69  int master = 1;
70  int iwork = 0;
71  int tag;
72  max_rss();
73 
74  // Start off all masters doing each one's first dataset.
75  // Send each group's master the analysis date for use with the model
76  // description. Then, send the dataset index as a trigger to begin
77  // a dataset loop.
78 
79  for ( int ii = 0; ii < mgroup_count; ii++ )
80  {
81  master = ( ii == 0 ) ? 1 : ( ii * gcores_count ); // Master world rank
82 
83 DbgLv(1) << "SUPER: master msgs" << master << "analysisDate" << analysisDate;
84  msg = analysisDate.toAscii();
85  iwork = msg.size();
86  MPI_Send( &iwork,
87  1,
88  MPI_INT,
89  master,
90  ADATESIZE,
91  MPI_COMM_WORLD );
92 
93  MPI_Send( msg.data(),
94  iwork,
95  MPI_BYTE,
96  master,
97  ADATE,
98  MPI_COMM_WORLD );
99 
100  MPI_Send( &current_dataset,
101  1,
102  MPI_INT,
103  master,
104  STARTITER,
105  MPI_COMM_WORLD );
106 
107  mstates[ ii ] = WORKING; // Mark all masters busy
108  current_dataset++;
109  }
110 
111  // Loop waiting on masters' results.
112  // Each time through this loop, wait for an integer sent
113  // from a group master.
114  // Three types (tags) may be received:
115  // (1) UDP message size (message itself will follow);
116  // (2) Iteration-done;
117  // (3) Last-iteration-done (integer received is max group memory used).
118 
119  while ( true )
120  {
121  max_rss();
122 
123  // Get UDP message or dataset-done flag
124 DbgLv(1) << "SUPER: wait on iter done/udp";
125  MPI_Recv( &iwork,
126  1,
127  MPI_INT,
128  MPI_ANY_SOURCE,
129  MPI_ANY_TAG,
130  MPI_COMM_WORLD,
131  &status );
132  bool udpmsg = false;
133  bool islast = false;
134  int isize = 0;
135  int ittest = current_dataset + mgroup_count;
136  master = status.MPI_SOURCE;
137  tag = status.MPI_TAG;
138  QByteArray msg;
139 DbgLv(1) << "SUPER: wait recv'd iwork" << iwork << "master tag" << master << tag;
140 
141  switch( tag )
142  {
143  case UDPSIZE: // UDP message size: get the following UDP message string
144  isize = iwork;
145  msg.resize( isize );
146 
147 DbgLv(1) << "SUPER: wait on UDPmsg - master" << master;
148  MPI_Recv( msg.data(),
149  isize,
150  MPI_BYTE,
151  master,
152  UDPMSG,
153  MPI_COMM_WORLD,
154  &status );
155 
156 DbgLv(1) << "SUPER: UDPmsg:" << QString(msg);
157  send_udp( QString( msg ) ); // Send forwarded UDP message
158 
159  udpmsg = true;
160  break;
161 
162  case DONELAST: // Dataset done and it was the last in group
163  islast = true;
164 DbgLv(1) << "SUPER: DONELAST";
165  break;
166 
167  case DONEITER: // Dataset done
168 // islast = ( ittest >= count_datasets ) ? true : islast;
169 DbgLv(1) << "SUPER: DONEITER islast" << islast << "ittest currds cntds"
170  << ittest << current_dataset << count_datasets;
171  break;
172 
173  default:
174  DbgLv(0) << "Unknown message type in supervisor" << tag;
175  break;
176  }
177 
178  if ( udpmsg ) continue; // Loop for next master message
179 
180  // If here, the message was a signal that a group iteration is complete.
181  // Most commonly, the group master that just completed an iteration
182  // is sent the next iteration to begin.
183  // As the total iterations nears the limit, the message tag used may be
184  // one that signals that the master should treat the iteration as the
185  // last one for the group; the group next alerted may not be the same
186  // as the one that just finished (since it may have shut down).
187  // The data received with a done-iteration message is the max memory
188  // used in the master group.
189 
190  master = status.MPI_SOURCE; // Master that has completed
191  int kgroup = master / gcores_count; // Group to which master belongs
192  int mgroup = mstates.indexOf( READY ); // Find next ready master
193 
194  int jgroup = ( mgroup < 0 ) ? kgroup : mgroup; // Ready master index
195  mstates[ kgroup ] = READY; // Mark current as ready
196  int nileft = mstates.count( WORKING ); // Masters left working
197 DbgLv(1) << "SUPER: mgr kgr jgr" << mgroup << kgroup << jgroup
198  << "left dsets dset" << nileft << count_datasets << current_dataset;
199  int kdset = ( current_dataset / mgroup_count ) * mgroup_count;
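// For example, with three groups: if group 1's master just reported and group 2's
// master is already READY, then kgroup = 1, mgroup = 2, jgroup = 2, so the next
// dataset goes to group 2 while group 1 is marked READY for a later pass.  If no
// master is READY (mgroup < 0), the finisher itself (kgroup) is re-used.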
200 
201  if ( islast )
202  { // Just finished was last dataset for that group
203  mstates[ kgroup ] = INIT; // Mark as finished
204  maxrssma += (long)( iwork ); // Sum in master maxrss
205 DbgLv(1) << "SUPER: (A)maxrssma" << maxrssma << "iwork" << iwork;
206 
207  if ( nileft == 0 )
208  { // All are complete
209  count_datasets = kdset + mgroup_count;
210 DbgLv(1) << "SUPER: nileft==0 count_datasets" << count_datasets;
211  break;
212  }
213 
214  if ( mgroup < 0 )
215  continue; // Some iters are still working
216  }
217 
218 
219  // Alert the next available master to do an iteration. Use a
220  // different tag when it is to be the last iteration for a group.
221  // This enables the master and its workers to do normal shutdown.
222  ittest = current_dataset + mgroup_count;
223  tag = ( ittest < count_datasets ) ? STARTITER : STARTLAST;
224  master = ( jgroup == 0 ) ? 1 : ( jgroup * gcores_count );
225 
226 DbgLv(1) << "SUPER: Send curr_ds cnt_ds" << current_dataset << count_datasets
227  << "gr ma tg" << jgroup << master << tag;
228  MPI_Send( &current_dataset,
229  1,
230  MPI_INT,
231  master,
232  tag,
233  MPI_COMM_WORLD );
234 
235  mstates[ jgroup ] = WORKING; // Mark group as busy
236  max_rss(); // Memory use of supervisor
237 
238  current_dataset++;
239  }
240 
241  // In the parallel-masters case, the supervisor handles end-of-job
242  // messages and file creation.
243 
244  // Get job end time (after waiting, so it has the greatest time stamp)
245  US_Sleep::msleep( 900 );
246  QDateTime endTime = QDateTime::currentDateTime();
247 
248  // Send message and build file with run-time statistics
249  max_rss(); // Max memory for supervisor
250 DbgLv(1) << "SUPER: maxrss maxrssma" << maxrss << maxrssma;
251  maxrss += maxrssma; // Sum in master groups' use
252 
253  int walltime = qRound(
254  submitTime.msecsTo( endTime ) / 1000.0 );
255  int cputime = qRound(
256  startTime .msecsTo( endTime ) / 1000.0 );
257  int maxrssmb = qRound( (double)maxrss / 1024.0 );
258  int kc_iters = data_sets.size();
259 
260  stats_output( walltime, cputime, maxrssmb,
261  submitTime, startTime, endTime );
262 
263  // Create output archive file and remove other output files
264  update_outputs( true );
265 
266  // Send 'Finished' message.
267  int wt_hr = walltime / 3600;
268  int wt_min = ( walltime - wt_hr * 3600 ) / 60;
269  int wt_sec = walltime - wt_hr * 3600 - wt_min * 60;
270  int ct_hr = cputime / 3600;
271  int ct_min = ( cputime - ct_hr * 3600 ) / 60;
272  int ct_sec = cputime - ct_hr * 3600 - ct_min * 60;
273  printf( "Us_Mpi_Analysis has finished successfully"
274  " (Wall=%d:%02d:%02d Cpu=%d:%02d:%02d).\n"
275  , wt_hr, wt_min, wt_sec, ct_hr, ct_min, ct_sec );
276  fflush( stdout );
277 
278  if ( count_datasets < kc_iters )
279  {
280  send_udp( "Finished: maxrss " + QString::number( maxrssmb )
281  + " MB, total run seconds " + QString::number( cputime )
282  + " (Reduced Datasets Count)" );
283  DbgLv(0) << "Finished: maxrss " << maxrssmb
284  << "MB, total run seconds " << cputime
285  << " (Reduced Datasets Count)";
286  }
287 
288  else
289  {
290  send_udp( "Finished: maxrss " + QString::number( maxrssmb )
291  + " MB, total run seconds " + QString::number( cputime ) );
292  DbgLv(0) << "Finished: maxrss " << maxrssmb
293  << "MB, total run seconds " << cputime;
294  }
295 }
296 
297 // Parallel-masters master within a group
298 void US_MPI_Analysis::pm_cjobs_master( void )
299 {
300  MPI_Status status;
301  QByteArray msg;
302 
303  // Get analysis date for model descriptions from supervisor
304 DbgLv(1) << " MASTER Recv from super. my_rank" << my_rank
305  << "group" << my_group;
306  int super = 0;
307  int isize = 0;
308 
309  MPI_Recv( &isize,
310  1,
311  MPI_INT,
312  super,
313  ADATESIZE,
314  MPI_COMM_WORLD,
315  &status );
316 DbgLv(1) << " MASTER: wait recv'd isize" << isize;
317 
318  msg.resize( isize );
319 
320  MPI_Recv( msg.data(),
321  isize,
322  MPI_BYTE,
323  super,
324  ADATE,
325  MPI_COMM_WORLD,
326  &status );
327 
328  analysisDate = QString( msg );
329 DbgLv(1) << " MASTER: Recv'd from super. (g r m)" << my_group << group_rank
330  << my_rank << "analysisDate" << analysisDate << "atype" << analysis_type;
331 
332  // Do the master loop for MC 2DSA or GA
333  if ( analysis_type.startsWith( "2DSA" ) )
334  {
335  pm_2dsa_cjmast();
336  }
337 
338  else if ( analysis_type.startsWith( "GA" ) )
339  {
340  pm_ga_cjmast();
341  }
342 
343  else if ( analysis_type.startsWith( "DMGA" ) )
344  {
345  pm_dmga_cjmast();
346  }
347 
348  else if ( analysis_type.startsWith( "PCSA" ) )
349  {
350  pm_pcsa_cjmast();
351  }
352 }
353 
354 // Parallel-masters worker within a group
355 void US_MPI_Analysis::pm_cjobs_worker( void )
356 {
357  if ( analysis_type.startsWith( "2DSA" ) )
358  { // Standard 2DSA worker
359  _2dsa_worker();
360  }
361 
362  else if ( analysis_type.startsWith( "GA" ) )
363  { // Standard GA worker
364  ga_worker();
365  }
366 
367  else if ( analysis_type.startsWith( "DMGA" ) )
368  {
369  dmga_worker();
370  }
371 
372  else if ( analysis_type.startsWith( "PCSA" ) )
373  {
374  pcsa_worker();
375  }
376 
377  else
378  { // What??? Should not get here
379  DbgLv(0) << "INVALID ANALYSIS TYPE" << analysis_type;
380  }
381 }
382 
383 // Test time for datasets left; compare to walltime
384 void US_MPI_Analysis::time_datasets_left( void )
385 {
386  if ( current_dataset < 4 )
387  return; // Don't bother until current_dataset 4
388 
389  QDateTime currTime = QDateTime::currentDateTime();
390  int curr_iter = current_dataset + 1;
391  int mins_so_far = ( startTime.secsTo( currTime ) + 59 ) / 60;
392  int mins_left_allow = max_walltime - mins_so_far;
393  int ds_iters_left = ( mins_left_allow * curr_iter ) / mins_so_far;
394  ds_iters_left = ( ds_iters_left / mgroup_count ) * mgroup_count;
395  int ds_iters_estim = curr_iter + ds_iters_left;
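// Worked example (illustrative numbers): max_walltime = 300 minutes, 240 minutes
// used while working on dataset index 4 (curr_iter = 5), count_datasets = 12,
// mgroup_count = 2:
//   mins_left_allow = 300 - 240        = 60
//   ds_iters_left   = ( 60 * 5 ) / 240 = 1, then 0 after rounding down to a
//                                           multiple of mgroup_count
//   ds_iters_estim  = 5 + 0            = 5
// Since 5 < 12 and 0 < 4, the block below trims count_datasets so the job ends
// with the datasets already processed instead of overrunning the wall time.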
396 
397  if ( ds_iters_estim < count_datasets && ds_iters_left < 4 )
398  { // In danger of exceeding allowed time: reduce count of datasets
399  int old_dsiters = count_datasets;
400  count_datasets = qMax( curr_iter, ds_iters_estim - 2 );
401  int ac_iters_left = count_datasets - curr_iter;
402  ac_iters_left = ( ac_iters_left / mgroup_count ) * mgroup_count;
403  count_datasets = curr_iter + ac_iters_left;
404 
405  QString msg = tr( "Dataset count reduced from %1 to %2, "
406  "due to max. time restrictions." )
407  .arg( old_dsiters ).arg( count_datasets );
408  send_udp( msg );
409 
410  DbgLv(0) << " Specified Maximum Wall-time minutes:" << max_walltime;
411  DbgLv(0) << " Number of minutes used so far: " << mins_so_far;
412  DbgLv(0) << " Allowed minutes remaining: " << mins_left_allow;
413  DbgLv(0) << " Datasets processed so far: " << curr_iter;
414  DbgLv(0) << " Estimated allowed datasets left: " << ds_iters_left;
415  DbgLv(0) << " Actual adjusted datasets left: " << ac_iters_left;
416  DbgLv(0) << "Datasets reduced from" << old_dsiters << "to"
417  << count_datasets << ", due to max. time restrictions.";
418 
419  // Just to be sure, create tar file right now
420  if ( my_group == 0 )
421  update_outputs();
422  }
423 
424  return;
425 }
426 
427 // Parallel-masters version of group 2DSA master
428 void US_MPI_Analysis::pm_2dsa_cjmast( void )
429 {
430 DbgLv(1) << "master start 2DSA" << startTime;
431  init_solutes();
432  fill_queue();
433 
434  work_rss.resize( gcores_count );
435 
436  current_dataset = 0;
437  datasets_to_process = 1; // Process one dataset at a time for now
438 
439  int super = 0;
440  int iter = current_dataset;
441  MPI_Status status;
442 
443  // Get 1st dataset from supervisor
444  MPI_Recv( &current_dataset,
445  1,
446  MPI_INT,
447  super,
448  MPI_ANY_TAG,
449  MPI_COMM_WORLD,
450  &status );
451 
452  int tag = status.MPI_TAG;
453  int ittest = current_dataset + mgroup_count;
454 
455  if ( meniscus_points > 1 )
456  { // Reset the range of fit-meniscus points for this data set
457  US_DataIO::EditedData* edata = &data_sets[ current_dataset ]->run_data;
458  double men_str = edata->meniscus - meniscus_range / 2.0;
459  double men_inc = meniscus_range / ( meniscus_points - 1.0 );
460  double dat_str = edata->radius( 0 );
461  double men_end = men_str + meniscus_range - men_inc;
462  if ( men_end >= dat_str )
463  { // Adjust first meniscus so range remains below data range
464  men_end = dat_str - men_inc / 2.0;
465  men_str = men_end - meniscus_range + men_inc;
466  }
467  for ( int ii = 0; ii < meniscus_points; ii++ )
468  meniscus_values[ ii ] = men_str + men_inc * ii;
469  }
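// Worked example (illustrative numbers): an edited meniscus of 5.800 cm with
// meniscus_range = 0.03 and meniscus_points = 7 gives men_inc = 0.005 and
// men_str = 5.785, so the candidate menisci are 5.785, 5.790, ..., 5.815.
// If that span would reach the first data radius, the whole grid is shifted
// down so it stays below the start of the data.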
470 
471  while ( true )
472  {
473  int worker;
474  meniscus_value = meniscus_values.size() == 1 ?
475  data_sets[ current_dataset ]->run_data.meniscus :
476  meniscus_values[ meniscus_run ];
477 //if ( max_depth > 1 )
478 // DbgLv(1) << " master loop-TOP: jq-empty?" << job_queue.isEmpty() << " areReady?" << worker_status.contains(READY)
479 // << " areWorking?" << worker_status.contains(WORKING);
480 
481  // Give the jobs to the workers
482  while ( ! job_queue.isEmpty() && worker_status.contains( READY ) )
483  {
484  worker = ready_worker();
485 
486  Sa_Job job = job_queue.takeFirst();
487 
488  submit( job, worker );
489 
490  worker_depth [ worker ] = job.mpi_job.depth;
491  worker_status[ worker ] = WORKING;
492  }
493 
494  // All done with the pass if no jobs are ready or running
495  if ( job_queue.isEmpty() && ! worker_status.contains( WORKING ) )
496  {
497  US_DataIO::EditedData* edata = &data_sets[ current_dataset ]->run_data;
498  QString tripleID = edata->cell + edata->channel + edata->wavelength;
499  int menisc_size = meniscus_values.size();
500  QString progress =
501  "Iteration: " + QString::number( iterations ) +
502  "; Dataset: " + QString::number( current_dataset + 1 ) +
503  " (" + tripleID + ") of " + QString::number( count_datasets );
504 
505  if ( mc_iterations > 1 )
506  progress += "; MonteCarlo: "
507  + QString::number( mc_iteration + 1 );
508 
509  else if ( menisc_size > 1 )
510  progress += "; Meniscus: "
511  + QString::number( meniscus_value, 'f', 3 )
512  + tr( " (%1 of %2)" ).arg( meniscus_run + 1 )
513  .arg( menisc_size );
514 
515  else
516  progress += "; RMSD: "
517  + QString::number( sqrt( simulation_values.variance ) );
518 
519  send_udp( progress );
520 
521  // Iterative refinement
522  if ( max_iterations > 1 )
523  {
524  if ( iterations == 1 )
525  qDebug() << " == Refinement Iterations for Dataset"
526  << current_dataset + 1 << "==";
527 
528  qDebug() << "Iterations:" << iterations << " Variance:"
529  << simulation_values.variance << "RMSD:"
530  << sqrt( simulation_values.variance );
531 
532  iterate();
533  }
534 
535  if ( ! job_queue.isEmpty() ) continue;
536 
537  // Write out the model and, possibly, noise(s)
538  max_rss();
539 
540  write_output();
541 
542  // Fit meniscus
543  if ( ( meniscus_run + 1 ) < meniscus_values.size() )
544  {
545  set_meniscus();
546  }
547 
548  if ( ! job_queue.isEmpty() ) continue;
549 
550  // Monte Carlo
551  if ( mc_iterations > 1 )
552  { // Recompute final fit to get simulation and residual
553  mc_iteration++;
554 
557 
559 
561 
562  if ( mc_iteration < mc_iterations )
563  {
564  set_monteCarlo();
565  }
566  }
567 
568  if ( ! job_queue.isEmpty() ) continue;
569 
570  ittest = current_dataset + mgroup_count;
571 
572  if ( ittest >= count_datasets )
573  {
574  for ( int jj = 1; jj <= my_workers; jj++ )
575  maxrss += work_rss[ jj ];
576  }
577 
578  // Tell the supervisor that an iteration is done
579  iter = (int)maxrss;
580  tag = ( ittest < count_datasets ) ? DONEITER : DONELAST;
581 
582  MPI_Send( &iter,
583  1,
584  MPI_INT,
585  super,
586  tag,
587  MPI_COMM_WORLD );
588 
590  {
591  if ( my_group == 0 && ittest < count_datasets )
592  { // If group 0 master, create an intermediate archive
593  update_outputs();
594  DbgLv(0) << my_rank << ": Dataset" << current_dataset + 1
595  << " : Intermediate archive was created.";
596  }
597 
598  US_DataIO::EditedData* edata
599  = &data_sets[ current_dataset ]->run_data;
600  QString tripleID = edata->cell + edata->channel + edata->wavelength;
601 
602  if ( simulation_values.noisflag == 0 )
603  {
604  DbgLv(0) << my_rank << ": Dataset" << current_dataset + 1
605  << "(" << tripleID << ")"
606  << " : model was output.";
607  }
608  else
609  {
610  DbgLv(0) << my_rank << ": Dataset" << current_dataset + 1
611  << "(" << tripleID << ")"
612  << " : model/noise(s) were output.";
613  }
614 
616 
617  if ( ittest < count_datasets )
618  {
619  // Get new dataset index from supervisor
620  MPI_Recv( &iter,
621  1,
622  MPI_INT,
623  super,
624  MPI_ANY_TAG,
625  MPI_COMM_WORLD,
626  &status );
627 
628  tag = status.MPI_TAG;
629 DbgLv(1) << "CJ_MAST Recv tag" << tag << "iter" << iter;
630 
631  if ( tag == STARTLAST )
632  count_datasets = iter + 1;
633 
634  else if ( tag != STARTITER )
635  {
636  DbgLv(0) << "Unexpected tag in PMG 2DSA Master" << tag;
637  continue;
638  }
639 
640  current_dataset = iter;
641  mc_iteration = 0;
642  iterations = 1;
643  meniscus_run = 0;
644 
645  if ( meniscus_points > 1 )
646  { // Reset the range of fit-meniscus points for this data set
647  US_DataIO::EditedData* edata
648  = &data_sets[ current_dataset ]->run_data;
649  double men_str = edata->meniscus - meniscus_range / 2.0;
650  double men_inc = meniscus_range / ( meniscus_points - 1.0 );
651  double dat_str = edata->radius( 0 );
652  double men_end = men_str + meniscus_range - men_inc;
653  if ( men_end >= dat_str )
654  { // Adjust first meniscus so range remains below data range
655  men_end = dat_str - men_inc / 2.0;
656  men_str = men_end - meniscus_range + men_inc;
657  }
658  for ( int ii = 0; ii < meniscus_points; ii++ )
659  meniscus_values[ ii ] = men_str + men_inc * ii;
660  }
661 
662  for ( int ii = 1; ii <= my_workers; ii++ )
663  worker_status[ ii ] = READY;
664 
665  fill_queue();
666 
667  for ( int ii = 0; ii < calculated_solutes.size(); ii++ )
668  calculated_solutes[ ii ].clear();
669 
670  continue;
671  }
672  }
673 
674  if ( ! job_queue.isEmpty() ) continue;
675 
676  shutdown_all(); // All done
677  break; // Break out of main loop.
678  }
679 
680  // Wait for worker to send a message
681  int sizes[ 4 ];
682 
683  MPI_Recv( sizes,
684  4,
685  MPI_INT,
686  MPI_ANY_SOURCE,
687  MPI_ANY_TAG,
688  my_communicator,
689  &status);
690 
691  worker = status.MPI_SOURCE;
692 
693 //if ( max_depth > 0 )
694 // DbgLv(1) << " PMG master loop-BOTTOM: status TAG" << status.MPI_TAG
695 // << MPI_Job::READY << MPI_Job::RESULTS << " source" << status.MPI_SOURCE;
696  switch( status.MPI_TAG )
697  {
698  case MPI_Job::READY: // Ready for work
699  worker_status[ worker ] = READY;
700  break;
701 
702  case MPI_Job::RESULTS: // Return solute data
703  process_results( worker, sizes );
704  work_rss[ worker ] = sizes[ 3 ];
705  break;
706 
707  default: // Should never happen
708  QString msg = "Master 2DSA: Received invalid status " +
709  QString::number( status.MPI_TAG );
710  abort( msg );
711  break;
712  }
713 
714  max_rss();
715  }
716 }
717 
718 // Parallel-masters version of GA group master
719 void US_MPI_Analysis::pm_ga_cjmast( void )
720 {
721  current_dataset = 0;
722  datasets_to_process = data_sets.size();
723  max_depth = 0;
724  calculated_solutes.clear();
725 //DbgLv(1) << "master start GA" << startTime;
726 
727  // Set noise and debug flags
728  simulation_values.noisflag = 0;
729  simulation_values.dbg_level = dbg_level;
730  simulation_values.dbg_timing = dbg_timing;
731 DbgLv(0) << "DEBUG_LEVEL" << simulation_values.dbg_level;
732 
733  // Initialize best fitness
734  best_genes .reserve( gcores_count );
735  best_fitness.reserve( gcores_count );
736 
737  Fitness empty_fitness;
738  empty_fitness.fitness = LARGE;
739 
740  Gene working_gene( buckets.count(), US_Solute() );
741 
742  // Initialize arrays
743  for ( int i = 0; i < gcores_count; i++ )
744  {
745  best_genes << working_gene;
746 
747  empty_fitness.index = i;
748  best_fitness << empty_fitness;
749  }
750 
751  QDateTime time = QDateTime::currentDateTime();
752 
753  // Handle Monte Carlo iterations. There will always be at least 1.
754  while ( true )
755  {
756  // Get MC iteration index from supervisor
757  int iter = 1;
758  int super = 0;
759  MPI_Status status;
760 
761  MPI_Recv( &iter,
762  1,
763  MPI_INT,
764  super,
765  MPI_ANY_TAG,
766  MPI_COMM_WORLD,
767  &status );
768 
769  int tag = status.MPI_TAG;
770 DbgLv(1) << " MASTER: iter" << iter << "gr" << my_group << "tag" << tag;
771  switch ( tag )
772  {
773  case STARTLAST:
774  mc_iterations = iter;   // STARTLAST: record the final count, then fall through
775 
776  case STARTITER:
777  mc_iteration = iter;
778  break;
779 
780  default:
781  DbgLv(0) << "Unknown message to PMG Master" << tag;
782  break;
783  }
784 
785  ga_master_loop();
786 
787  qSort( best_fitness );
788  simulation_values.solutes = best_genes[ best_fitness[ 0 ].index ];
789 
790  int nisols = simulation_values.solutes.size();
791 
792  solutes_from_gene( simulation_values.solutes, nisols );
793 
794 DbgLv(1) << "GaMast: sols size" << simulation_values.solutes.size()
795  << "buck size" << buckets.size();
796 DbgLv(1) << "GaMast: dset size" << data_sets.size();
797 DbgLv(1) << "GaMast: sol0.s" << simulation_values.solutes[0].s;
798  calc_residuals( 0, data_sets.size(), simulation_values );
799 
800 DbgLv(1) << "GaMast: calc_resids return";
801 
802  // Write out the model, but skip if not 1st of iteration 1
803  bool do_write = ( mc_iteration > 1 ) ||
804  ( mc_iteration == 1 && my_group == 0 );
805 DbgLv(1) << "2dMast: do_write" << do_write << "mc_iter" << mc_iteration
806  << "variance" << simulation_values.variance << "my_group" << my_group;
807 
808  qSort( simulation_values.solutes );
809 
810  // Convert given solute points to s,k for model output
811  double vbar20 = data_sets[ 0 ]->vbar20;
812  QList< int > attrxs;
813  attrxs << attr_x << attr_y << attr_z;
814  bool have_s = ( attrxs.indexOf( ATTR_S ) >= 0 );
815  bool have_k = ( attrxs.indexOf( ATTR_K ) >= 0 );
816  bool have_w = ( attrxs.indexOf( ATTR_W ) >= 0 );
817  bool have_d = ( attrxs.indexOf( ATTR_D ) >= 0 );
818  bool have_f = ( attrxs.indexOf( ATTR_F ) >= 0 );
819  bool vary_v = ( attr_z != ATTR_V );
820 
821  for ( int gg = 0; gg < simulation_values.solutes.size(); gg++ )
822  {
823  US_Solute* solu = &simulation_values.solutes[ gg ];
824  US_Model::SimulationComponent mcomp;
825  mcomp.s = have_s ? solu->s : 0.0;
826  mcomp.f_f0 = have_k ? solu->k : 0.0;
827  mcomp.mw = have_w ? solu->d : 0.0;
828  mcomp.vbar20 = vary_v ? solu->v : vbar20;
829  mcomp.D = have_d ? solu->d : 0.0;
830  mcomp.f = have_f ? solu->d : 0.0;
831 
832  US_Model::calc_coefficients( mcomp );
833 
834  solu->s = mcomp.s;
835  solu->k = mcomp.f_f0;
836  solu->v = mcomp.vbar20;
837  }
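// The gene only carries the attributes being varied (the x, y and z axes tested
// above); they are copied into a simulation component and the solute's s, f/f0
// and vbar are read back for model output, with any remaining coefficients left
// to be derived from the ones provided.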
838 
839  calculated_solutes.clear();
840  calculated_solutes << simulation_values.solutes;
841 
842  if ( do_write )
843  {
844 
845  if ( data_sets.size() == 1 )
846  {
847  write_output();
848  }
849  else
850  {
851  write_global();
852  }
853  }
854 
855  if ( my_group == 0 )
856  { // Update the tar file of outputs in case of an aborted run
857  update_outputs();
858  }
859 
860  if ( mc_iteration < mc_iterations )
861  { // Before last iteration: check if max is reset based on time limit
862  time_datasets_left(); // Test if near time limit
863 
864  tag = ( current_dataset < count_datasets ) ?
865  DONEITER : DONELAST;
866  }
867 
868  else
869  { // Mark that max iterations reached
870  tag = DONELAST;
871  }
872 
873  // Tell the supervisor that an iteration is done
874  iter = (int)maxrss;
875 DbgLv(1) << "GaMast: iter done: maxrss" << iter << "tag" << tag << DONELAST;
876 
877  MPI_Send( &iter,
878  1,
879  MPI_INT,
880  super,
881  tag,
882  MPI_COMM_WORLD );
883 
884 DbgLv(1) << "GaMast: mc_iter iters" << mc_iteration << mc_iterations;
885  if ( mc_iteration < mc_iterations )
886  { // Set up for next iteration
887  if ( mc_iteration == 1 )
888  { // Set scaled_data the first time
889  scaled_data = simulation_values.sim_data;
890  }
891 
892  set_gaMonteCarlo();
893 
894 
895  else // Break out of the loop if all iterations have been done
896  break;
897 
898  } // END: MC iterations loop
899 
900 
901  MPI_Job job;
902 
903  // Send finish to workers ( in the tag )
904  for ( int worker = 1; worker <= my_workers; worker++ )
905  {
906  MPI_Send( &job, // MPI #0
907  sizeof( job ),
908  MPI_BYTE,
909  worker,
910  FINISHED,
911  my_communicator );
912  }
913 }
914 
915 // Parallel-masters version of DMGA group master
916 void US_MPI_Analysis::pm_dmga_cjmast( void )
917 {
918  current_dataset = 0;
919  datasets_to_process = data_sets.size();
920  max_depth = 0;
921  calculated_solutes.clear();
922 //DbgLv(1) << "master start DMGA" << startTime;
923 
924  // Set noise and debug flags
925  simulation_values.noisflag = 0;
926  simulation_values.dbg_level = dbg_level;
927  simulation_values.dbg_timing = dbg_timing;
928 DbgLv(0) << "DEBUG_LEVEL" << simulation_values.dbg_level;
929 
930  // Build up the base structure of simulations and residuals
931  if ( data_sets.size() == 1 )
932  {
933  US_AstfemMath::initSimData( simulation_values.sim_data,
934  data_sets[ 0 ]->run_data, 0.0 );
935  US_AstfemMath::initSimData( simulation_values.residuals,
936  data_sets[ 0 ]->run_data, 0.0 );
937  }
938  else
939  {
940  int ntscan = data_sets[ 0 ]->run_data.scanCount();
941  for ( int ii = 1; ii < data_sets.size(); ii++ )
942  ntscan += data_sets[ ii ]->run_data.scanCount();
943 
944  simulation_values.sim_data .scanData.resize( ntscan );
945  simulation_values.residuals.scanData.resize( ntscan );
946  }
947 
948  // Initialize best fitness
949  best_dgenes .reserve( gcores_count );
950  best_fitness.reserve( gcores_count );
951 
952  // Read in the constraints model and build constraints
953  QString cmfname = "../" + parameters[ "DC_model" ];
954  wmodel.load( cmfname ); // Load the constraints model
955  constraints.load_constraints( &wmodel ); // Build the constraints object
956  constraints.get_work_model ( &wmodel ); // Get the base work model
957 
958  Fitness empty_fitness;
959  empty_fitness.fitness = LARGE;
960  dgene = wmodel;
961  nfloatc = constraints.float_constraints( &cns_flt );
962  nfvari = ( 1 << nfloatc ) - 1;
963  dgmarker.resize( nfloatc );
964  do_astfem = ( wmodel.components[ 0 ].sigma == 0.0 &&
965  wmodel.components[ 0 ].delta == 0.0 &&
966  wmodel.coSedSolute < 0 &&
967  data_sets[ 0 ]->compress == 0.0 );
968 
969  // Initialize arrays
970  for ( int ii = 0; ii < gcores_count; ii++ )
971  {
972  best_dgenes << dgene;
973 
974  empty_fitness.index = ii;
975  best_fitness << empty_fitness;
976  }
977 
978  QDateTime time = QDateTime::currentDateTime();
979 
980  // Handle Monte Carlo iterations. There will always be at least 1.
981  while ( true )
982  {
983  // Get MC iteration index from supervisor
984  int iter = 1;
985  int super = 0;
986  MPI_Status status;
987 
988  MPI_Recv( &iter,
989  1,
990  MPI_INT,
991  super,
992  MPI_ANY_TAG,
993  MPI_COMM_WORLD,
994  &status );
995 
996  int tag = status.MPI_TAG;
997 DbgLv(1) << " MASTER: iter" << iter << "gr" << my_group << "tag" << tag;
998  switch ( tag )
999  {
1000  case STARTLAST:
1001  mc_iterations = iter;   // STARTLAST: record the final count, then fall through
1002 
1003  case STARTITER:
1004  mc_iteration = iter;
1005  break;
1006 
1007  default:
1008  DbgLv(0) << "Unknown message to PMG Master" << tag;
1009  break;
1010  }
1011 
1012  // Compute all generations of an MC iteration
1013 
1014  dmga_master_loop();
1015 
1016  // Get the best-fit gene
1017 
1018  qSort( best_fitness );
1019  dgene = best_dgenes[ best_fitness[ 0 ].index ];
1020 
1021  // Compute the variance (fitness) for the final best-fit model
1022 
1024 
1025  // Output the model
1026 
1027  // Write out the model, but skip if not 1st of iteration 1
1028  bool do_write = ( mc_iteration > 1 ) ||
1029  ( mc_iteration == 1 && my_group == 0 );
1030 
1031  if ( do_write )
1032  {
1033  if ( data_sets.size() == 1 )
1034  { // Output the single-data model
1035  write_output();
1036  }
1037  else
1038  { // Output the global model
1039  write_global();
1040  }
1041  }
1042 
1043  if ( my_group == 0 )
1044  { // Update the tar file of outputs in case of an aborted run
1045  update_outputs();
1046  }
1047 
1048  if ( mc_iteration < mc_iterations )
1049  { // Before last iteration: check if max is reset based on time limit
1050  time_datasets_left(); // Test if near time limit
1051 
1052  tag = ( current_dataset < count_datasets ) ?
1053  DONEITER : DONELAST;
1054  }
1055 
1056  else
1057  { // Mark that max iterations reached
1058  tag = DONELAST;
1059  }
1060 
1061  // Tell the supervisor that an iteration is done
1062  iter = (int)maxrss;
1063 DbgLv(1) << "GaMast: iter done: maxrss" << iter << "tag" << tag << DONELAST;
1064 
1065  MPI_Send( &iter,
1066  1,
1067  MPI_INT,
1068  super,
1069  tag,
1070  MPI_COMM_WORLD );
1071 
1072 DbgLv(1) << "GaMast: mc_iter iters" << mc_iteration << mc_iterations;
1073  if ( mc_iteration < mc_iterations )
1074  { // Set up for next iteration
1075  if ( mc_iteration == 1 )
1076  { // Set scaled_data the first time
1077  scaled_data = simulation_values.sim_data;
1078  }
1079 
1080  set_dmga_MonteCarlo();
1081  }
1082 
1083  else // Break out of the loop if all iterations have been done
1084  break;
1085 
1086  } // END: MC iterations loop
1087 
1088 
1089  MPI_Job job;
1090 
1091  // Send finish to workers ( in the tag )
1092  for ( int worker = 1; worker <= my_workers; worker++ )
1093  {
1094  MPI_Send( &job, // MPI #0
1095  sizeof( job ),
1096  MPI_BYTE,
1097  worker,
1098  FINISHED,
1099  my_communicator );
1100  }
1101 }
1102 
1103 // Parallel-masters version of group PCSA master
1104 void US_MPI_Analysis::pm_pcsa_cjmast( void )
1105 {
1106 DbgLv(1) << my_rank << ": master start PCSA" << startTime;
1107  current_dataset = 0;
1108  datasets_to_process = 1; // Process one dataset at a time for now
1109 
1110  work_rss.resize( gcores_count );
1111 
1112  int super = 0;
1113  int kcurve = 0;
1114  int iter = current_dataset;
1115  alpha = 0.0;
1116  mc_iterations = 0;
1117  max_iterations = parameters[ "gfit_iterations" ].toInt();
1118  MPI_Status status;
1119 
1120  // Get 1st dataset from supervisor
1121  MPI_Recv( &current_dataset,
1122  1,
1123  MPI_INT,
1124  super,
1125  MPI_ANY_TAG,
1126  MPI_COMM_WORLD,
1127  &status );
1128 
1129  int tag = status.MPI_TAG;
1130  int ittest = current_dataset + mgroup_count;
1131 
1132  init_pcsa_solutes();
1133 DbgLv(1) << my_rank << ": init sols return";
1134  fill_pcsa_queue();
1135 DbgLv(1) << my_rank << ": fill queue return";
1136 
1137 DbgLv(1) << my_rank << ": recvd ds" << current_dataset << "mgc ittest" << mgroup_count << ittest;
1138 
1139  while ( true )
1140  {
1141  int worker;
1142  meniscus_value = data_sets[ current_dataset ]->run_data.meniscus;
1143 //if ( max_depth > 1 )
1144 // DbgLv(1) << " master loop-TOP: jq-empty?" << job_queue.isEmpty() << " areReady?" << worker_status.contains(READY)
1145 // << " areWorking?" << worker_status.contains(WORKING);
1146 
1147  // Give the jobs to the workers
1148  while ( ! job_queue.isEmpty() && worker_status.contains( READY ) )
1149  {
1150  worker = ready_worker();
1151 DbgLv(1) << my_rank << ": submit worker" << worker;
1152 
1153  Sa_Job job = job_queue.takeFirst();
1154  job.mpi_job.depth = kcurve++;
1155 
1156  submit_pcsa( job, worker );
1157  }
1158 
1159  // All done with the pass if no jobs are ready or running
1160  if ( job_queue.isEmpty() && ! worker_status.contains( WORKING ) )
1161  {
1162  kcurve = 0;
1163  qSort( mrecs );
1164  US_DataIO::EditedData* edata = &data_sets[ current_dataset ]->run_data;
1165  QString tripleID = edata->cell + edata->channel + edata->wavelength;
1166 
1167  simulation_values.variance = mrecs[ 0 ].variance;
1168  simulation_values.zsolutes = mrecs[ 0 ].csolutes;
1169  simulation_values.ti_noise = mrecs[ 0 ].ti_noise;
1170  simulation_values.ri_noise = mrecs[ 0 ].ri_noise;
1171 
1172  QString progress =
1173  "Iteration: " + QString::number( iterations );
1174 
1175  if ( datasets_to_process > 1 )
1176  progress += "; Datasets: "
1177  + QString::number( datasets_to_process );
1178  else
1179  progress += "; Dataset: "
1180  + QString::number( current_dataset + 1 )
1181  + " (" + tripleID + ") of "
1182  + QString::number( count_datasets );
1183 
1184  if ( mc_iterations > 1 )
1185  progress += "; MonteCarlo: "
1186  + QString::number( mc_iteration + 1 );
1187 
1188  else
1189  progress += "; RMSD: "
1190  + QString::number( mrecs[ 0 ].rmsd );
1191 
1192  send_udp( progress );
1193 
1194  // Iterative refinement
1195  if ( max_iterations > 1 )
1196  {
1197  if ( data_sets.size() > 1 && iterations == 1 )
1198  {
1199  if ( datasets_to_process == 1 )
1200  {
1201  qDebug() << " == Grid-Fit Iterations for Dataset"
1202  << current_dataset + 1 << "==";
1203  }
1204  else
1205  {
1206  qDebug() << " == Grid-Fit Iterations for Datasets 1 to"
1207  << datasets_to_process << "==";
1208  }
1209  }
1210 
1211  qDebug() << "Iteration:" << iterations
1212  << " Variance:" << mrecs[ 0 ].variance
1213  << "RMSD:" << mrecs[ 0 ].rmsd;
1214 
1215  iterate_pcsa();
1216  }
1217 
1218  if ( ! job_queue.isEmpty() ) continue;
1219 
1220  iterations = 1;
1221  max_iterations = 1;
1222 
1223  // Clean up mrecs of any empty-calculated-solutes records
1224  clean_mrecs( mrecs );
1225 
1226  // Write out the model and, possibly, noise(s)
1227  write_output();
1228 
1229  if ( ! job_queue.isEmpty() ) continue;
1230 
1231  // Save information from best model
1232  pcsa_best_model();
1233 
1234  // Tikhonov Regularization
1235  tikreg_pcsa();
1236 
1237  // Monte Carlo
1238  montecarlo_pcsa();
1239 
1240  ittest = current_dataset + mgroup_count;
1241 
1242  if ( ittest >= count_datasets )
1243  {
1244  for ( int jj = 1; jj <= my_workers; jj++ )
1245  maxrss += work_rss[ jj ];
1246  }
1247 
1248  // Tell the supervisor that an iteration is done
1249  iter = (int)maxrss;
1250  tag = ( ittest < count_datasets ) ? DONEITER : DONELAST;
1251 
1252  MPI_Send( &iter,
1253  1,
1254  MPI_INT,
1255  super,
1256  tag,
1257  MPI_COMM_WORLD );
1258 
1260  {
1261  if ( my_group == 0 && ittest < count_datasets )
1262  { // If group 0 master, create an intermediate archive
1263  update_outputs();
1264  DbgLv(0) << my_rank << ": Dataset" << current_dataset + 1
1265  << " : Intermediate archive was created.";
1266  }
1267 
1268  US_DataIO::EditedData* edata
1269  = &data_sets[ current_dataset ]->run_data;
1270  QString tripleID = edata->cell + edata->channel + edata->wavelength;
1271 
1272  if ( simulation_values.noisflag == 0 )
1273  {
1274  DbgLv(0) << my_rank << ": Dataset" << current_dataset + 1
1275  << "(" << tripleID << ")"
1276  << " : model was output.";
1277  }
1278  else
1279  {
1280  DbgLv(0) << my_rank << ": Dataset" << current_dataset + 1
1281  << "(" << tripleID << ")"
1282  << " : model/noise(s) were output.";
1283  }
1284 
1286 
1287  if ( ittest < count_datasets )
1288  {
1289  // Get new dataset index from supervisor
1290  MPI_Recv( &iter,
1291  1,
1292  MPI_INT,
1293  super,
1294  MPI_ANY_TAG,
1295  MPI_COMM_WORLD,
1296  &status );
1297 
1298  tag = status.MPI_TAG;
1299 DbgLv(1) << "CJ_MAST Recv tag" << tag << "iter" << iter;
1300 
1301  if ( tag == STARTLAST )
1302  count_datasets = iter + 1;
1303 
1304  else if ( tag != STARTITER )
1305  {
1306  DbgLv(0) << "Unexpected tag in PMG 2DSA Master" << tag;
1307  continue;
1308  }
1309 
1310  current_dataset = iter;
1311  iterations = 1;
1312  mc_iteration = 0;
1313  mc_iterations = 0;
1314  alpha = 0.0;
1315  max_iterations = parameters[ "gfit_iterations" ].toInt();
1316  kcurve = 0;
1317 
1318  fill_queue();
1319 
1320  for ( int ii = 1; ii <= my_workers; ii++ )
1321  worker_status[ ii ] = READY;
1322 
1323  continue;
1324  }
1325  }
1326 
1327  shutdown_all(); // All done
1328  break; // Break out of main loop.
1329  }
1330 
1331  // Wait for worker to send a message
1332  int sizes[ 4 ];
1333 
1334  MPI_Recv( sizes,
1335  4,
1336  MPI_INT,
1337  MPI_ANY_SOURCE,
1338  MPI_ANY_TAG,
1339  my_communicator,
1340  &status );
1341 
1342  worker = status.MPI_SOURCE;
1343 
1344 //if ( max_depth > 0 )
1345  DbgLv(1) << my_rank << ": PMG master loop-BOTTOM: status TAG" << status.MPI_TAG
1346  << MPI_Job::READY << MPI_Job::RESULTS << " source" << status.MPI_SOURCE;
1347  switch( status.MPI_TAG )
1348  {
1349  case MPI_Job::READY: // Ready for work
1350  worker_status[ worker ] = READY;
1351  break;
1352 
1353  case MPI_Job::RESULTS: // Return solute data
1354  process_pcsa_results( worker, sizes );
1355  work_rss[ worker ] = sizes[ 3 ];
1356  break;
1357 
1358  default: // Should never happen
1359  QString msg = "Master 2DSA: Received invalid status " +
1360  QString::number( status.MPI_TAG );
1361  abort( msg );
1362  break;
1363  }
1364 
1365  max_rss();
1366  }
1367 }
1368