UltraScan III
us_mpi_analysis.cpp
1 #include "us_mpi_analysis.h"
2 #include "us_math2.h"
3 #include "us_tar.h"
4 #include "us_memory.h"
5 #include "us_sleep.h"
6 #include "us_util.h"
7 #include "us_revision.h"
8 
9 #include <mpi.h>
10 #include <sys/user.h>
11 #include <cstdio>
12 
13 int main( int argc, char* argv[] )
14 {
15  MPI_Init( &argc, &argv );
16  QCoreApplication application( argc, argv );
17 
18  QStringList cmdargs;
19 
20  for ( int jj = 0; jj < argc; jj++ )
21  cmdargs << argv[ jj ];
22 
23  new US_MPI_Analysis( argc, cmdargs );
24 }
25 
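// Note: the US_MPI_Analysis object constructed in main() drives the entire
// analysis from its constructor; the run normally ends in an exit() call
// (see start() and abort() below), so control never returns to main().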
26 // Constructor
27 US_MPI_Analysis::US_MPI_Analysis( int nargs, QStringList& cmdargs ) : QObject()
28 {
29  // Command line special parameter keys
30  const QString wallkey ( "-walltime" );
31  const QString pmgckey ( "-mgroupcount" );
32  // Alternate versions of those keys
33  const QString wallkey2( "-WallTimeLimit" );
34  const QString pmgckey2( "-MGroupCount" );
35  const QString wallkey3( "-wallTimeLimit" );
36  const QString pmgckey3( "-pmgc" );
37 
38  MPI_Comm_size( MPI_COMM_WORLD, &proc_count );
39  MPI_Comm_rank( MPI_COMM_WORLD, &my_rank );
40 
41  dbg_level = 0;
42  dbg_timing = FALSE;
43  maxrss = 0L;
44  minimize_opt = 2;
45  in_gsm = FALSE;
46  QString tarfile;
47  QString jxmlfili;
48  task_params[ "walltime" ] = "1440";
49  task_params[ "mgroupcount" ] = "1";
50 
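// The loop below accepts both "-key=value" and "-key value" forms for the
// special keys above; the first non-keyed argument is taken as the input
// tar file and the second as an optional job-XML file.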
51  // Get some task parameters from the command line
52  for ( int jj = 1; jj < nargs; jj++ )
53  {
54  QString cmdarg = cmdargs[ jj ];
55 if(my_rank==0)
56 DbgLv(0) << "CmdArg: jj" << jj << "cmdarg" << cmdarg;
57 
58  if ( cmdarg.startsWith( "-" ) )
59  { // Argument pair is a keyed set (e.g., "-maxwall <n>")
60  QString cmdval = "";
61  int valx = cmdarg.indexOf( "=" );
62 
63  if ( valx > 0 )
64  { // Syntax is "-key=value"
65  cmdval = cmdarg.mid( valx + 1 ).simplified();
66  }
67 
68  else if ( ( jj + 1 ) < nargs )
69  { // Syntax is "-key value"
70  cmdval = cmdargs[ ++jj ];
71  }
72 
73  if ( cmdarg.contains( wallkey ) ||
74  cmdarg.contains( wallkey2 ) ||
75  cmdarg.contains( wallkey3 ) )
76  { // Get maximum wall time in minutes
77  task_params[ "walltime" ] = cmdval;
78  }
79 
80  else if ( cmdarg.contains( pmgckey ) ||
81  cmdarg.contains( pmgckey2 ) ||
82  cmdarg.contains( pmgckey3 ) )
83  { // Get number of parallel masters groups
84  task_params[ "mgroupcount" ] = cmdval;
85  }
86 if(my_rank==0)
87 DbgLv(0) << "CmdArg: valx" << valx << "cmdval" << cmdval;
88  }
89 
90  else if ( tarfile.isEmpty() )
91  { // First non-keyed argument is tar file path
92  tarfile = cmdarg;
93  }
94 
95  else
96  { // Second non-keyed argument is jobxmlfile
97  jxmlfili = cmdarg;
98  }
99  }
100 if(my_rank==0) {
101 DbgLv(0) << "CmdArg: walltime" << task_params["walltime"];
102 DbgLv(0) << "CmdArg: mgroupcount" << task_params["mgroupcount"];
103 DbgLv(0) << "CmdArg: tarfile" << tarfile;
104 DbgLv(0) << "CmdArg: jxmlfili" << jxmlfili;
105 }
106 
107  attr_x = ATTR_S;
108  attr_y = ATTR_K;
109  attr_z = ATTR_V;
110 
111  if ( my_rank == 0 )
112  socket = new QUdpSocket( this );
113 
114  QStringList ffilt;
115  QDir wkdir = QDir::current();
116  QString work_dir = QDir::currentPath();
117  QString output_dir = work_dir + "/output";
118  QString input_dir = work_dir + "/input";
119 
120  if ( my_rank == 0 )
121  {
122  DbgLv(0) << "Us_Mpi_Analysis " << REVISION;
123 
124  // Unpack the input tarfile
125  US_Tar tar;
126 
127  int result = tar.extract( tarfile );
128 
129  if ( result != TAR_OK ) abort( "Could not unpack " + tarfile );
130 
131  // Create a dedicated output directory and make sure it's empty
132  // During testing, it may not always be empty
133  wkdir.mkdir ( output_dir );
134 
135  QDir odir( output_dir );
136  ffilt.clear();
137  ffilt << "*";
138  QStringList files = odir.entryList( ffilt, QDir::Files );
139  QString file;
140 
141  foreach( file, files ) odir.remove( file );
142  DbgLv(0) << "Start: processor_count" << proc_count;
143  }
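// Only rank 0 unpacks the input archive and clears ./output; the barrier
// below keeps the other ranks from proceeding until those files exist.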
144 
145  MPI_Barrier( MPI_COMM_WORLD ); // Sync everybody up
146 
147 
148  startTime = QDateTime::currentDateTime();
149  analysisDate = startTime.toUTC().toString( "yyMMddhhmm" );
150  set_count = 0;
151  iterations = 1;
152 
153  previous_values.variance = 1.0e39; // A large number
154 
155  data_sets .clear();
156  parameters.clear();
157  buckets .clear();
158  maxods .clear();
159 
160  QString xmlfile;
161  ffilt.clear();
162  ffilt << "hpc*.xml";
163  QStringList files = wkdir.entryList( ffilt );
164  if ( files.size() == 0 )
165  {
166  files = QDir( input_dir ).entryList( ffilt );
167  if ( files.size() == 1 )
168  xmlfile = input_dir + "/" + files[ 0 ];
169  }
170  else
171  xmlfile = files[ 0 ];
172 
173  if ( files.size() != 1 ) abort( "Could not find unique hpc input file." );
174 
175  // Parse analysis parameters
176  parse( xmlfile );
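// parse() (implemented in a separate source file) is expected to fill
// analysis_type, parameters, data_sets, buckets and the other members
// referenced throughout the remainder of this constructor.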
177 
178  uint seed = 0;
179 
180  if ( parameters.keys().contains( "seed" ) )
181  {
182  seed = parameters[ "seed" ].toUInt();
183  qsrand( seed + my_rank ); // Set system random sequence
184  }
185  else
186  US_Math2::randomize(); // No seed parameter: use default randomization
187 
188  QString jxmlfile = jxmlfili;
189 
190  // Parse task xml file if present or needed (input argument or detected file)
191  if ( jxmlfile.isEmpty() )
192  { // Not on command line: just look for its presence in the work directory
193  ffilt.clear();
194  ffilt << "*jobxmlfile.xml";
195  ffilt << "jobxmlfile.xml";
196  QStringList jfiles = QDir( input_dir ).entryList( ffilt, QDir::Files );
197 if(my_rank==0) DbgLv(0) << " jfiles size" << jfiles.size();
198 
199  if ( jfiles.size() > 0 )
200  { // Files found in ./input
201  jxmlfile = input_dir + "/" + jfiles[ 0 ];
202  }
203 
204  else
205  { // Files not found in ./input, so try base work directory
206  ffilt << "us3.pbs";
207  jfiles = wkdir.entryList( ffilt, QDir::Files );
208  jxmlfile = jfiles.size() > 0 ? jfiles[ 0 ] : "";
209  }
210 if(my_rank==0)
211 DbgLv(0) << " jfiles size" << jfiles.size() << "jxmlfile" << jxmlfile;
212  }
213 
214  task_parse( jxmlfile );
215 
216  if ( my_rank == 0 )
217  { // Save submit time
218  submitTime = QFileInfo( tarfile ).lastModified();
219 DbgLv(0) << "submitTime " << submitTime
220  << " mgroupcount" << task_params["mgroupcount"].toInt()
221  << " walltime" << task_params["walltime"].toInt();
222 
223  printf( "Us_Mpi_Analysis %s has started.\n", REVISION );
224  }
225 
226  group_rank = my_rank; // Temporary setting for send_udp
227 
228  QString msg_start = QString( "Starting -- " ) + QString( REVISION );
229  send_udp( msg_start ); // Can't send udp message until xmlfile is parsed
230 
231  // Read data
232  for ( int ii = 0; ii < data_sets.size(); ii++ )
233  {
234  US_SolveSim::DataSet* dset = data_sets[ ii ];
235 
236  try
237  {
238  int result = US_DataIO::loadData( ".", dset->edit_file,
239  dset->run_data );
240 
241  if ( result != US_DataIO::OK ) throw result;
242  }
243  catch ( int error )
244  {
245  QString msg = "Bad data file " + dset->auc_file + " "
246  + dset->edit_file;
247 DbgLv(0) << "BAD DATA. error" << error << "rank" << my_rank;
248  abort( msg, error );
249  }
250  catch ( US_DataIO::ioError error )
251  {
252  QString msg = "Bad data file " + dset->auc_file + " "
253  + dset->edit_file;
254 DbgLv(0) << "BAD DATA. ioError" << error << "rank" << my_rank << proc_count;
255 //if(proc_count!=16)
256  abort( msg, error );
257  }
258 //DbgLv(0) << "Good DATA. rank" << my_rank;
259 
260  for ( int jj = 0; jj < dset->noise_files.size(); jj++ )
261  {
262  US_Noise noise;
263 
264  int err = noise.load( dset->noise_files[ jj ] );
265 
266  if ( err != 0 )
267  {
268  QString msg = "Bad noise file " + dset->noise_files[ jj ];
269  abort( msg );
270  }
271 
272  if ( noise.apply_to_data( dset->run_data ) != 0 )
273  {
274  QString msg = "Bad noise file " + dset->noise_files[ jj ];
275  abort( msg );
276  }
277  }
278 
279  dset->temperature = dset->run_data.average_temperature();
280  dset->vbartb = US_Math2::calcCommonVbar( dset->solution_rec,
281  dset->temperature );
282 
283  if ( dset->centerpiece_bottom == 7.3 )
284  abort( "The bottom is set to the invalid default value of 7.3" );
285  }
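// At this point every data set has its edited data loaded, any associated
// noise records subtracted, and its average temperature computed; a
// centerpiece bottom left at the 7.3 default is treated as a fatal error.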
286 
287  // After reading all input, set the working directory for file output.
288  QDir::setCurrent( output_dir );
289 
290  // Set some minimums
291  max_iterations = parameters[ "max_iterations" ].toInt();
292  max_iterations = qMax( max_iterations, 1 );
293 
294  mc_iterations = parameters[ "mc_iterations" ].toInt();
295  mc_iterations = qMax( mc_iterations, 1 );
296 
297  meniscus_range = parameters[ "meniscus_range" ].toDouble();
298  meniscus_points = parameters[ "meniscus_points" ].toInt();
299  meniscus_points = qMax( meniscus_points, 1 );
301 
302  // Do some parameter checking
303  count_datasets = data_sets.size();
304  is_global_fit = US_Util::bool_flag( parameters[ "global_fit" ] );
306  if ( my_rank == 0 )
307  {
308  DbgLv(0) << " count_datasets " << count_datasets;
309  DbgLv(0) << " is_global_fit " << is_global_fit;
310  DbgLv(0) << " is_composite_job " << is_composite_job;
311  }
312 
313  if ( is_global_fit && meniscus_points > 1 )
314  {
315  abort( "Meniscus fit is not compatible with global fit" );
316  }
317 
318  if ( meniscus_points > 1 && mc_iterations > 1 )
319  {
320  abort( "Meniscus fit is not compatible with Monte Carlo analysis" );
321  }
322 
323  bool noise = parameters[ "tinoise_option" ].toInt() > 0 ||
324  parameters[ "rinoise_option" ].toInt() > 0;
325 
326  if ( ! analysis_type.startsWith( "PCSA" ) &&
327  mc_iterations > 1 && noise )
328  {
329  abort( "Monte Carlo iteration is not compatible with noise computation" );
330  }
331 
332  if ( is_global_fit && noise )
333  {
334  abort( "Global fit is not compatible with noise computation" );
335  }
336 
337 
338  // Calculate meniscus values
339  meniscus_values.resize( meniscus_points );
340 
341  double meniscus_start = data_sets[ 0 ]->run_data.meniscus
342  - meniscus_range / 2.0;
343 
344  double dm = ( meniscus_points > 1 )
345  ? meniscus_range / ( meniscus_points - 1 ): 0.0;
346 
347  for ( int i = 0; i < meniscus_points; i++ )
348  {
349  meniscus_values[ i ] = meniscus_start + dm * i;
350  }
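// The meniscus grid is centered on the edited meniscus. For illustration
// only: meniscus_range 0.03 cm with meniscus_points 11 gives dm = 0.003 cm
// and candidate values running from meniscus-0.015 to meniscus+0.015.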
351 
352  // Get lower limit of data and last (largest) meniscus value
353  double start_range = data_sets[ 0 ]->run_data.radius( 0 );
354  double last_meniscus = meniscus_values[ meniscus_points - 1 ];
355 
356  if ( last_meniscus >= start_range )
357  {
358  if ( my_rank == 0 )
359  {
360  qDebug() << "*ERROR* Meniscus value extends into data";
361  qDebug() << " data meniscus" << data_sets[0]->run_data.meniscus;
362  qDebug() << " meniscus_start" << meniscus_start;
363  qDebug() << " meniscus_range" << meniscus_range;
364  qDebug() << " meniscus_points" << meniscus_points;
365  qDebug() << " dm" << dm;
366  qDebug() << " last_meniscus" << last_meniscus;
367  qDebug() << " left_data" << start_range;
368  }
369  abort( "Meniscus value extends into data" );
370  }
371 
372  population = parameters[ "population" ].toInt();
373  generations = parameters[ "generations" ].toInt();
374  crossover = parameters[ "crossover" ].toInt();
375  mutation = parameters[ "mutation" ].toInt();
376  plague = parameters[ "plague" ].toInt();
377  migrate_count = parameters[ "migration" ].toInt();
378  elitism = parameters[ "elitism" ].toInt();
379  mutate_sigma = parameters[ "mutate_sigma" ].toDouble();
380  p_mutate_s = parameters[ "p_mutate_s" ].toDouble();
381  p_mutate_k = parameters[ "p_mutate_k" ].toDouble();
382  p_mutate_sk = parameters[ "p_mutate_sk" ].toDouble();
383  regularization = parameters[ "regularization" ].toDouble();
384  concentration_threshold = parameters[ "conc_threshold" ].toDouble();
385  minimize_opt = parameters[ "minimize_opt" ].toInt();
387  total_points = 0;
388 
389  // Calculate s, D corrections for calc_residuals; simulation parameters
390  for ( int ee = 0; ee < data_sets.size(); ee++ )
391  {
392  US_SolveSim::DataSet* ds = data_sets[ ee ];
393  US_DataIO::EditedData* edata = &ds->run_data;
394 
395  // Convert to a different structure and calculate the s and D corrections
396  US_Math2::SolutionData sd;
397  sd.density = ds->density;
398  sd.viscosity = ds->viscosity;
399  sd.manual = ds->manual;
400  sd.vbar20 = ds->vbar20;
401  sd.vbar = ds->vbartb;
402 if ( my_rank == 0 )
403  DbgLv(0) << "density/viscosity/comm vbar20/commvbar/compress"
404  << sd.density << sd.viscosity << sd.vbar20 << sd.vbar << ds->compress;
405 
406  US_Math2::data_correction( ds->temperature, sd );
407 
408  ds->s20w_correction = sd.s20w_correction;
409  ds->D20w_correction = sd.D20w_correction;
410 if ( my_rank == 0 )
411  DbgLv(0) << "s20w_correction/D20w_correction"
412  << sd.s20w_correction << sd.D20w_correction;
413 
414  // Set up simulation parameters for the data set
415 
416  bool incl_speed = ( ds->simparams.speed_step.count() < 1 );
417  ds->simparams.initFromData( NULL, *edata, incl_speed );
418 
419  ds->simparams.rotorcoeffs[ 0 ] = ds->rotor_stretch[ 0 ];
420  ds->simparams.rotorcoeffs[ 1 ] = ds->rotor_stretch[ 1 ];
421  ds->simparams.bottom_position = ds->centerpiece_bottom;
422  ds->simparams.bottom = ds->centerpiece_bottom;
423  ds->simparams.band_forming = ds->simparams.band_volume != 0.0;
424 
425  // Accumulate total points and set dataset index,points
426  int npoints = edata->scanCount() * edata->pointCount();
427  ds_startx << total_points;
428  ds_points << npoints;
429  total_points += npoints;
430 
431  // Initialize concentrations vector in case of global fit
432  concentrations << 1.0;
433 
434  // Accumulate maximum OD for each dataset
435  double odlim = edata->ODlimit;
436  double odmax = 0.0;
437 
438  for ( int ss = 0; ss < edata->scanCount(); ss++ )
439  for ( int rr = 0; rr < edata->pointCount(); rr++ )
440  odmax = qMax( odmax, edata->value( ss, rr ) );
441 
442  odmax = qMin( odmax, odlim );
443 DbgLv(2) << "ee" << ee << "odlim odmax" << odlim << odmax;
444 
445  maxods << odmax;
446  }
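// The loop above gives each data set its hydrodynamic corrections and
// simulation parameters, records the per-data-set point counts used later
// to carve up the global NNLS matrices, and stores each set's maximum OD
// clipped to its ODlimit.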
447 
448  double s_max = parameters[ "s_max" ].toDouble() * 1.0e-13;
449  double x_max = parameters[ "x_max" ].toDouble() * 1.0e-13;
450  s_max = ( s_max == 0.0 ) ? x_max : s_max;
451  double y_max = 1e-39;
452 
453  // Check GA buckets
454  if ( analysis_type == "GA" )
455  {
456  if ( buckets.size() < 1 )
457  abort( "No buckets defined" );
458 
459  QList< QRectF > bucket_rects;
460  x_max = buckets[ 0 ].x_max;
461 
462  // Put into Qt rectangles (upper left, lower right points)
463  for ( int i = 0; i < buckets.size(); i++ )
464  {
465  limitBucket( buckets[ i ] );
466 
467  bucket_rects << QRectF(
468  QPointF( buckets[ i ].x_min, buckets[ i ].y_max ),
469  QPointF( buckets[ i ].x_max, buckets[ i ].y_min ) );
470 
471  x_max = qMax( x_max, buckets[ i ].x_max );
472  y_max = qMax( y_max, buckets[ i ].y_max );
473  }
474 
475  x_max *= 1.0e-13;
476 
477  for ( int i = 0; i < bucket_rects.size() - 1; i++ )
478  {
479  for ( int j = i + 1; j < bucket_rects.size(); j++ )
480  {
481  if ( bucket_rects[ i ].intersects( bucket_rects[ j ] ) )
482  {
483  QRectF bukov = bucket_rects[i].intersected( bucket_rects[j] );
484  double sdif = bukov.width();
485  double fdif = bukov.height();
486 
487  if ( my_rank == 0 )
488  {
489  DbgLv(0) << "Bucket" << i << "overlaps bucket" << j;
490  DbgLv(0) << " Overlap: st w h" << bukov.topLeft()
491  << sdif << fdif;
492  DbgLv(0) << " Bucket i" << bucket_rects[i].topLeft()
493  << bucket_rects[i].bottomRight();
494  DbgLv(0) << " Bucket j" << bucket_rects[j].topLeft()
495  << bucket_rects[j].bottomRight();
496  DbgLv(0) << " Bucket i right "
497  << QString().sprintf( "%22.18f", bucket_rects[i].right() );
498  DbgLv(0) << " Bucket j left "
499  << QString().sprintf( "%22.18f", bucket_rects[j].left() );
500  DbgLv(0) << " Bucket i bottom"
501  << QString().sprintf( "%22.18f", bucket_rects[i].bottom() );
502  DbgLv(0) << " Bucket j top "
503  << QString().sprintf( "%22.18f", bucket_rects[j].top() );
504  }
505 
506  if ( qMin( sdif, fdif ) < 1.e-6 )
507  { // Ignore the overlap if it is trivial
508  if ( my_rank == 0 )
509  DbgLv(0) << "Trivial overlap is ignored.";
510  continue;
511  }
512 
513  abort( "Buckets overlap" );
514  }
515  }
516  }
517 
518  s_max = x_max; // By default s_max is x_max for GA
519  }
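// GA buckets are validated by converting each to a QRectF and testing every
// pair for intersection; overlaps whose width or height is below 1e-6 are
// logged and ignored, anything larger aborts the job.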
520 
521  // Do a check of implied grid size
522  QString smsg;
523  double zval = parameters[ "bucket_fixed" ].toDouble();
524  if ( analysis_type.startsWith( "PCSA" ) )
525  zval = parameters[ "z_value" ].toDouble();
526 
527  if ( attr_x != ATTR_S )
528  { // We will need to determine the maximum s value
529  if ( attr_y == ATTR_S ) // s_max may be y_max
530  s_max = y_max * 1.0e-13;
531  else if ( attr_z == ATTR_S ) // s_max may be fixed value
532  s_max = zval * 1.0e-13;
533  else if ( analysis_type == "GA" )
534  { // s_max may need computing
535  s_max = 1e-39;
536  zval = ( attr_z == ATTR_V ) ? data_sets[ 0 ]->vbar20 : zval;
537  zval = ( zval == 0.0 ) ? data_sets[ 0 ]->vbar20 : zval;
538  US_Model::SimulationComponent zcomp;
539  zcomp.s = 0.0;
540  zcomp.f_f0 = 0.0;
541  zcomp.mw = 0.0;
542  zcomp.vbar20 = 0.0;
543  zcomp.D = 0.0;
544  zcomp.f = 0.0;
545 
546  for ( int jj = 0; jj < buckets.size(); jj++ )
547  { // Compute s using given x,y,fixed and find the max
548  double xval = buckets[ jj ].x_max;
549  double yval = buckets[ jj ].y_max;
550  US_Model::SimulationComponent mcomp = zcomp;
551  set_comp_attrib( mcomp, xval, attr_x );
552  set_comp_attrib( mcomp, yval, attr_y );
553  set_comp_attrib( mcomp, zval, attr_z );
554  US_Model::calc_coefficients( mcomp );
555 if (my_rank==0) DbgLv(0) << "ckGrSz: buck" << jj << "xv yv zv"
556  << xval << yval << zval << "mc s k w v d f"
557  << mcomp.s << mcomp.f_f0 << mcomp.mw << mcomp.vbar20 << mcomp.D << mcomp.f;
558  s_max = qMax( s_max, mcomp.s );
559  }
560  }
561 
562  }
563 
564  s_max = ( s_max == 0.0 ) ? 10e-13 : s_max;
565 if (my_rank==0) DbgLv(0) << "ckGrSz: s_max" << s_max << "attr_x,y,z"
566  << attr_x << attr_y << attr_z;
567 
568  if ( US_SolveSim::checkGridSize( data_sets, s_max, smsg ) )
569  {
570  if ( my_rank == 0 )
571  qDebug() << smsg;
572  abort( "Implied Grid Size is Too Large!" );
573  }
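// The s_max estimate built above (from parameters, buckets, or computed GA
// components) is passed to US_SolveSim::checkGridSize, which aborts the run
// if the simulation grid implied by that s value would be too large.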
574 //else
575 // qDebug() << "check_grid_size FALSE x_max" << x_max
576 // << "rpm" << data_sets[0]->simparams.speed_step[0].rotorspeed;
577 
578  // Set some defaults
579  if ( ! parameters.contains( "mutate_sigma" ) )
580  parameters[ "mutate_sigma" ] = "2.0";
581 
582  if ( ! parameters.contains( "p_mutate_s" ) )
583  parameters[ "p_mutate_s" ] = "20";
584 
585  if ( ! parameters.contains( "p_mutate_k" ) )
586  parameters[ "p_mutate_k" ] = "20";
587 
588  if ( ! parameters.contains( "p_mutate_sk" ) )
589  parameters[ "p_mutate_sk" ] = "20";
590 
591  count_calc_residuals = 0; // Internal instrumentation
592  meniscus_run = 0;
593  mc_iteration = 0;
594 
595  // Determine masters-group count and related controls
596  mgroup_count = task_params[ "mgroupcount" ].toInt();
597  max_walltime = task_params[ "walltime" ].toInt();
598 
599  if ( is_composite_job )
600  {
601  if ( count_datasets < 2 || mgroup_count > ( count_datasets + 2 ) )
602  mgroup_count = 1;
603  }
604  else
605  {
606  if ( mc_iterations < 2 || mgroup_count > ( mc_iterations + 2 ) )
607  mgroup_count = 1;
608  }
609 
610  mgroup_count = qMax( 1, mgroup_count );
612 
613  if ( mgroup_count < 2 )
614  start(); // Start standard job
615 
616  else if ( is_composite_job )
617  pm_cjobs_start(); // Start parallel-masters job (composite)
618 
619  else
620  pmasters_start(); // Start parallel-masters job (mc_iters)
621 }
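// Dispatch summary: with a single masters group the standard start() path
// below runs; with multiple groups the job is handed to the parallel-masters
// drivers, pm_cjobs_start() for composite-dataset jobs and pmasters_start()
// for Monte Carlo iteration jobs.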
622 
623 // Main function (single master group)
624 void US_MPI_Analysis::start( void )
625 {
626  my_communicator = MPI_COMM_WORLD;
627  my_workers = proc_count - 1;
630 
631  // Real processing goes here
632  if ( analysis_type.startsWith( "2DSA" ) )
633  {
634  iterations = parameters[ "montecarlo_value" ].toInt();
635 
636  if ( iterations < 1 ) iterations = 1;
637 
638  if ( my_rank == 0 )
639  _2dsa_master();
640  else
641  _2dsa_worker();
642  }
643 
644  else if ( analysis_type.startsWith( "GA" ) )
645  {
646  if ( my_rank == 0 )
647  ga_master();
648  else
649  ga_worker();
650  }
651 
652  else if ( analysis_type.startsWith( "DMGA" ) )
653  {
654  if ( my_rank == 0 )
655  dmga_master();
656  else
657  dmga_worker();
658  }
659 
660  else if ( analysis_type.startsWith( "PCSA" ) )
661  {
662  if ( my_rank == 0 )
663  pcsa_master();
664  else
665  pcsa_worker();
666  }
667 
668  int exit_status = 0;
669 
670  // Pack results
671  if ( my_rank == 0 )
672  {
673  // Get job end time (after waiting so it has greatest time stamp)
674  US_Sleep::msleep( 900 );
675  QDateTime endTime = QDateTime::currentDateTime();
676  bool reduced_iter = false;
677 
678  if ( mc_iterations > 0 )
679  {
680  int kc_iterations = parameters[ "mc_iterations" ].toInt();
681 
682  if ( mc_iterations < kc_iterations )
683  {
684  reduced_iter = true;
685  exit_status = 99;
686  }
687  }
688 
689  // Build file with run-time statistics
690  int walltime = qRound(
691  submitTime.msecsTo( endTime ) / 1000.0 );
692  int cputime = qRound(
693  startTime .msecsTo( endTime ) / 1000.0 );
694  int maxrssmb = qRound( (double)maxrss / 1024.0 );
695  int kc_iters = data_sets.size();
696 
697  stats_output( walltime, cputime, maxrssmb,
698  submitTime, startTime, endTime );
699 
700  // Create archive file of outputs and remove other output files
701  update_outputs( true );
702 
703  // Send "Finished" message.
704  int wt_hr = walltime / 3600;
705  int wt_min = ( walltime - wt_hr * 3600 ) / 60;
706  int wt_sec = walltime - wt_hr * 3600 - wt_min * 60;
707  int ct_hr = cputime / 3600;
708  int ct_min = ( cputime - ct_hr * 3600 ) / 60;
709  int ct_sec = cputime - ct_hr * 3600 - ct_min * 60;
710  printf( "Us_Mpi_Analysis has finished successfully"
711  " (Wall=%d:%02d:%02d Cpu=%d:%02d:%02d).\n"
712  , wt_hr, wt_min, wt_sec, ct_hr, ct_min, ct_sec );
713  fflush( stdout );
714 
715  if ( count_datasets < kc_iters )
716  {
717  send_udp( "Finished: maxrss " + QString::number( maxrssmb )
718  + " MB, total run seconds " + QString::number( cputime )
719  + " (Reduced Datasets Count)" );
720  DbgLv(0) << "Finished: maxrss " << maxrssmb
721  << "MB, total run seconds " << cputime
722  << " (Reduced Datasets Count)";
723  }
724 
725  else if ( reduced_iter )
726  {
727  send_udp( "Finished: maxrss " + QString::number( maxrssmb )
728  + " MB, total run seconds " + QString::number( cputime )
729  + " (Reduced MC Iterations)" );
730  DbgLv(0) << "Finished: maxrss " << maxrssmb
731  << "MB, total run seconds " << cputime
732  << " (Reduced MC Iterations)";
733 
734  }
735 
736  else
737  {
738  send_udp( "Finished: maxrss " + QString::number( maxrssmb )
739  + " MB, total run seconds " + QString::number( cputime ) );
740  DbgLv(0) << "Finished: maxrss " << maxrssmb
741  << "MB, total run seconds " << cputime;
742  }
743  }
744 
745  MPI_Finalize();
746  exit( exit_status );
747 }
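// Note on exit codes: start() exits with status 0 on a normal run and with
// 99 when the Monte Carlo iteration count was reduced below the requested
// value.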
748 
749 // Send udp
750 void US_MPI_Analysis::send_udp( const QString& message )
751 {
752  QByteArray msg;
753 
754  if ( my_rank == 0 )
755  { // Send UDP message from supervisor (or single-group master)
757 //if(mgroup_count>1) return; //*DEBUG*
759  QString jobid = db_name + "-" + requestID + ": ";
760  msg = QString( jobid + message ).toAscii();
761  socket->writeDatagram( msg.data(), msg.size(), server, port );
762  }
763 
764  else if ( group_rank == 0 )
765  { // For pm group master, forward message to supervisor
766  int super = 0;
767  QString gpfix = QString( "(pmg %1) " ).arg( my_group );
768  msg = QString( gpfix + message ).toAscii();
769  int size = msg.size();
770 
771  MPI_Send( &size,
772  sizeof( int ),
773  MPI_BYTE,
774  super,
775  UDPSIZE,
776  MPI_COMM_WORLD );
777 
778  MPI_Send( msg.data(),
779  size,
780  MPI_BYTE,
781  super,
782  UDPMSG,
783  MPI_COMM_WORLD );
784  }
785 }
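// Only rank 0 owns the UDP socket, so a parallel-masters group master
// forwards its message to the supervisor (rank 0) as two MPI messages:
// the byte count tagged UDPSIZE followed by the text tagged UDPMSG.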
786 
787 long int US_MPI_Analysis::max_rss( void )
788 {
789  return US_Memory::rss_max( maxrss );
790 }
791 
792 void US_MPI_Analysis::abort( const QString& message, int error )
793 {
794  if ( my_rank == 0 )
795  { // Send abort message to both stdout and udp
796  US_Sleep::msleep( 1100 ); // Delay a bit so rank 0 completes first
797  printf( "\n ***ABORTED***: %s\n\n", message.toAscii().data() );
798  fflush( stdout );
799  send_udp( "Abort. " + message );
800  }
801 
802  MPI_Barrier( MPI_COMM_WORLD ); // Sync everybody up so stdout msg first
803 
804  if ( my_rank == 0 )
805  { // Create archive file of outputs and remove other output files
806  update_outputs( true );
807  }
808 
809  DbgLv(0) << my_rank << ": " << message;
810  MPI_Abort( MPI_COMM_WORLD, error );
811  exit( error );
812 }
813 
814 // Create output statistics file
815 void US_MPI_Analysis::stats_output( int walltime, int cputime, int maxrssmb,
816  QDateTime submitTime, QDateTime startTime, QDateTime endTime )
817 {
818  QString fname = "job_statistics.xml";
819  QFile file( fname );
820 
821  if ( ! file.open( QIODevice::WriteOnly | QIODevice::Text ) )
822  {
823  DbgLv(0) << "*WARNING* Unable to open job_statistics.xml file for write.";
824  return;
825  }
826 
827  QString sSubmitTime = submitTime.toString( Qt::ISODate ).replace( "T", " " );
828  QString sStartTime = startTime .toString( Qt::ISODate ).replace( "T", " " );
829  QString sEndTime = endTime .toString( Qt::ISODate ).replace( "T", " " );
830 
831  QXmlStreamWriter xml( &file );
832 
833  xml.setAutoFormatting( true );
834  xml.writeStartDocument();
835  xml.writeDTD ( "<!DOCTYPE US_Statistics>" );
836  xml.writeStartElement ( "US_JobStatistics" );
837  xml.writeAttribute ( "version", "1.0" );
838  xml.writeStartElement ( "statistics" );
839  xml.writeAttribute ( "walltime", QString::number( walltime ) );
840  xml.writeAttribute ( "cputime", QString::number( cputime ) );
841  xml.writeAttribute ( "cpucount", QString::number( proc_count ) );
842  xml.writeAttribute ( "maxmemory", QString::number( maxrssmb ) );
843  xml.writeAttribute ( "cluster", cluster );
844  xml.writeAttribute ( "analysistype", analysis_type );
845  xml.writeEndElement (); // statistics
846 
847  xml.writeStartElement ( "id" );
848  xml.writeAttribute ( "requestguid", requestGUID );
849  xml.writeAttribute ( "dbname", db_name );
850  xml.writeAttribute ( "requestid", requestID );
851  xml.writeAttribute ( "submittime", sSubmitTime );
852  xml.writeAttribute ( "starttime", sStartTime );
853  xml.writeAttribute ( "endtime", sEndTime );
854  xml.writeAttribute ( "maxwalltime", QString::number( max_walltime ) );
855  xml.writeAttribute ( "groupcount", QString::number( mgroup_count ) );
856  xml.writeEndElement (); // id
857  xml.writeEndElement (); // US_JobStatistics
858  xml.writeEndDocument ();
859 
860  file.close();
861 
862  // Add the file name of the statistics file to the output list
863  QFile f( "analysis_files.txt" );
864  if ( ! f.open( QIODevice::WriteOnly | QIODevice::Text | QIODevice::Append ) )
865  {
866  abort( "Could not open 'analysis_files.txt' for writing", -1 );
867  return;
868  }
869 
870  QTextStream out( &f );
871  out << fname << "\n";
872  f.close();
873  return;
874 }
875 
876 // Ensure vertices of a bucket do not exceed physically possible limits
877 void US_MPI_Analysis::limitBucket( Bucket& buk )
878 {
879  if ( buk.x_min > 0.0 )
880  { // All-positive s's start at 0.1 at least (other attribs are different)
881  double xmin = 0.1;
882  xmin = ( attr_x == ATTR_K ) ? 1.0 : xmin;
883  xmin = ( attr_x == ATTR_V ) ? 0.01 : xmin;
884  xmin = ( attr_x == ATTR_D ) ? 1.e-9 : xmin;
885  xmin = ( attr_x == ATTR_F ) ? 1.e-9 : xmin;
886  double xinc = xmin / 1000.0;
887  buk.x_min = qMax( xmin, buk.x_min );
888  buk.x_max = qMax( ( buk.x_min + xinc ), buk.x_max );
889  }
890 
891  else if ( buk.x_max <= 0.0 )
892  { // All-negative s's end at -0.1 at most
893  buk.x_max = qMin( -0.1, buk.x_max );
894  buk.x_min = qMin( ( buk.x_max - 0.0001 ), buk.x_min );
895  }
896 
897  else if ( ( buk.x_min + buk.x_max ) >= 0.0 )
898  { // Mostly positive clipped to all positive starting at 0.1
899  buk.x_min = 0.1;
900  buk.x_max = qMax( 0.2, buk.x_max );
901  }
902 
903  else
904  { // Mostly negative clipped to all negative ending at -0.1
905  buk.x_min = qMin( -0.2, buk.x_min );
906  buk.x_max = -0.1;
907  }
908 
909  if ( attr_y == ATTR_K )
910  { // If y-type is "ff0", ensure minimum is at least 1.0
911  buk.y_min = qMax( 1.0, buk.y_min );
912  buk.y_max = qMax( ( buk.y_min + 0.0001 ), buk.y_max );
913  }
914 }
915 
916 // Get the A,b matrices for a data set
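// The global A matrix is stored with each solute's column holding all data
// sets' points stacked in data-set order: ds_startx[dsx] gives this set's
// starting offset within a column, ds_points[dsx] its point count, and
// inccx skips the rows belonging to the other data sets. The b entries are
// multiplied by the data set's concentration factor to restore their scaling.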
917 void US_MPI_Analysis::dset_matrices( int dsx, int nsolutes,
918  QVector< double >& nnls_a, QVector< double >& nnls_b )
919 {
920  int colx = ds_startx[ dsx ];
921  int ndspts = ds_points[ dsx ];
922  double concen = concentrations[ dsx ];
923  int kk = 0;
924  int jj = colx;
925  int inccx = total_points - ndspts;
926 
927  // Copy the data set portion of the global A matrix
928  for ( int cc = 0; cc < nsolutes; cc++ )
929  {
930  for ( int pp = 0; pp < ndspts; pp++, kk++, jj++ )
931  {
932  nnls_a[ kk ] = gl_nnls_a[ jj ];
933  }
934 
935  jj += inccx;
936  }
937 
938  // Copy and restore scaling for data set portion of the global b matrix
939  jj = colx;
940 
941  for ( kk = 0; kk < ndspts; kk++, jj++ )
942  {
943  nnls_b[ kk ] = gl_nnls_b[ jj ] * concen;
944  }
945 }
946