UltraScan III
ga_master.cpp
Go to the documentation of this file.
1 #include "us_mpi_analysis.h"
2 #include "us_util.h"
3 #include "us_math2.h"
4 #include "us_settings.h"
5 
7 {
8  current_dataset = 0;
10  max_depth = 0;
11  calculated_solutes.clear();
12 
13  // Set noise and debug flags
14  simulation_values.noisflag = parameters[ "tinoise_option" ].toInt() > 0 ?
15  1 : 0;
16  simulation_values.noisflag += parameters[ "rinoise_option" ].toInt() > 0 ?
17  2 : 0;
18  simulation_values.dbg_level = dbg_level;
19  simulation_values.dbg_timing = dbg_timing;
20 DbgLv(0) << "DEBUG_LEVEL" << simulation_values.dbg_level;
21 
22  // Initialize best fitness
23  best_genes .reserve( gcores_count );
24  best_fitness.reserve( gcores_count );
25 
26  Fitness empty_fitness;
27  empty_fitness.fitness = LARGE;
28 
29  Gene working_gene( buckets.count(), US_Solute() );
30 
31  // Initialize arrays
32  for ( int i = 0; i < gcores_count; i++ )
33  {
34  best_genes << working_gene;
35 
36  empty_fitness.index = i;
37  best_fitness << empty_fitness;
38  }
39 
40  QDateTime time = QDateTime::currentDateTime();
41 
42  // Handle Monte Carlo iterations. There will always be at least 1.
43  while ( true )
44  {
46 
47  qSort( best_fitness );
48  simulation_values.solutes = best_genes[ best_fitness[ 0 ].index ];
49  int nisols = simulation_values.solutes.size();
50 DbgLv(1) << "GaMast: sols size" << nisols << "buck size" << buckets.size()
51  << "dset size" << data_sets.size();
52 DbgLv(1) << "GaMast: sol0.s .k .v .d" << simulation_values.solutes[0].s
53  << simulation_values.solutes[0].k << simulation_values.solutes[0].v
54  << simulation_values.solutes[0].d;
55 
56  solutes_from_gene( simulation_values.solutes, nisols );
57 DbgLv(1) << "GaMast: osol0.s .k .v .d" << simulation_values.solutes[0].s
58  << simulation_values.solutes[0].k << simulation_values.solutes[0].v
59  << simulation_values.solutes[0].d;
60 
62 DbgLv(1) << "GaMast: calc_resids return - calcsize vari"
63  << simulation_values.solutes.size() << simulation_values.variance;
64 DbgLv(1) << "GaMast: csol0.s .k .v .d" << simulation_values.solutes[0].s
65  << simulation_values.solutes[0].k << simulation_values.solutes[0].v
66  << simulation_values.solutes[0].d;
67 
68  qSort( simulation_values.solutes );
69 
70  // Convert given solute points to s,k for model output
71  double vbar20 = data_sets[ 0 ]->vbar20;
72  QList< int > attrxs;
73  attrxs << attr_x << attr_y << attr_z;
74  bool have_s = ( attrxs.indexOf( ATTR_S ) >= 0 );
75  bool have_k = ( attrxs.indexOf( ATTR_K ) >= 0 );
76  bool have_w = ( attrxs.indexOf( ATTR_W ) >= 0 );
77  bool have_d = ( attrxs.indexOf( ATTR_D ) >= 0 );
78  bool have_f = ( attrxs.indexOf( ATTR_F ) >= 0 );
79  bool vary_v = ( attr_z != ATTR_V );
80 
81  for ( int gg = 0; gg < simulation_values.solutes.size(); gg++ )
82  {
83  US_Solute* solu = &simulation_values.solutes[ gg ];
85  mcomp.s = have_s ? solu->s : 0.0;
86  mcomp.f_f0 = have_k ? solu->k : 0.0;
87  mcomp.mw = have_w ? solu->d : 0.0;
88  mcomp.vbar20 = vary_v ? solu->v : vbar20;
89  mcomp.D = have_d ? solu->d : 0.0;
90  mcomp.f = have_f ? solu->d : 0.0;
91 
93 
94  solu->s = mcomp.s;
95  solu->k = mcomp.f_f0;
96  solu->v = mcomp.vbar20;
97  }
98 
99  calculated_solutes.clear();
101 
102  if ( is_global_fit )
103  {
104  write_global();
105  }
106  else
107  {
108  write_output();
109  }
110 
111 DbgLv(1) << "GaMast: mc_iter iters" << mc_iteration << mc_iterations;
112  mc_iteration++;
113  if ( mc_iterations > 1 )
114  {
115  qDebug() << "Fit RMSD" << sqrt( simulation_values.variance )
116  << " of MC_Iteration" << mc_iteration;
117  if ( mc_iteration < mc_iterations )
118  {
119  // Set scaled_data the first time
120  if ( mc_iteration <= mgroup_count )
121  {
122  scaled_data = simulation_values.sim_data;
123  }
124 
126 
127 DbgLv(1) << "GaMast: set_gaMC call";
129 DbgLv(1) << "GaMast: set_gaMC return";
130  }
131  else
132  break;
133  }
134  else
135  {
136  qDebug() << "Final Fit RMSD" << sqrt( simulation_values.variance );
137 
138  if ( is_composite_job )
139  {
140  DbgLv(0) << my_rank << ": Dataset" << current_dataset + 1
141  << " : model output.";
142  send_udp( "Dataset " + QString::number( current_dataset + 1 )
143  + " : analysis complete." );
144 
145  update_outputs();
146 
147  current_dataset++;
148  if ( current_dataset < count_datasets )
149  {
150  continue;
151  }
152  }
153  break;
154  }
155  }
156 
157  DbgLv(0) << my_rank << ": Master signalling FINISHED to all Demes";
158 
159  MPI_Job job;
160 
161  // Send finish to workers ( in the tag )
162  for ( int worker = 1; worker <= my_workers; worker++ )
163  {
164  MPI_Send( &job, // MPI #0
165  sizeof( job ),
166  MPI_BYTE,
167  worker,
168  FINISHED,
169  my_communicator );
170  }
171 }
172 
174 {
175 // static const double fit_div = 1.0e-9;
176 // static const double fit_mul = 1.0e+9;
177  static const double DIGIT_FIT = 1.0e+4;
178  static const int max_same_count = my_workers * 5;
179  static const int min_generation = 10;
180  int avg_generation = 0;
181  bool early_termination = false;
182  int fitness_same_count = 0;
183  double best_overall_fitness = LARGE;
184  int tag;
185  int workers = my_workers;
186 DbgLv(1) << "ga_master start loop: gcores_count fitsize" << gcores_count
187  << best_fitness.size() << "best_overall" << best_overall_fitness;
188 
189  // Reset best fitness for each worker
190  for ( int i = 0; i < gcores_count; i++ )
191  {
192  best_fitness[ i ].fitness = LARGE;
193  best_fitness[ i ].index = i;
194  }
195 
196  QList < Gene > emigres; // Holds genes passed as emmigrants
197  QVector< int > v_generations( gcores_count, 0 );
198  int sum = 0;
199  int avg = 0;
200  long rsstotal = 0L;
201  double fit_power = 5;
202  double fit_digit = 1.0e4;
203  double fitness_round = 1.0e5;
204 
205 
206  while ( workers > 0 )
207  {
208  MPI_GA_MSG msg;
209  MPI_Status status;
210  int worker;
211 
212  MPI_Recv( &msg, // Get a message MPI #1
213  sizeof( msg ),
214  MPI_BYTE,
215  MPI_ANY_SOURCE,
216  MPI_ANY_TAG,
218  &status );
219 
220  worker = status.MPI_SOURCE;
221 
222 QString g;
223 QString s;
224 
225  max_rss();
226 
227  switch ( status.MPI_TAG )
228  {
229  case GENERATION:
230  v_generations[ worker ] = msg.generation;
231 
232  sum = 0;
233  for ( int i = 1; i <= my_workers; i++ )
234  sum += v_generations[ i ];
235 
236  avg = qRound( sum / my_workers ) + 1;
237 
238  if ( avg > avg_generation )
239  {
240  avg_generation = avg;
241  int mc_iter = mgroup_count < 2 ? ( mc_iteration + 1 )
242  : mc_iteration;
243 
244  QString progress =
245  "Avg. Generation: " + QString::number( avg_generation );
246 
247  if ( count_datasets > 1 )
248  {
249  if ( datasets_to_process == 1 )
250  progress += "; Dataset: "
251  + QString::number( current_dataset + 1 )
252  + " of " + QString::number( count_datasets );
253  else
254  progress += "; Datasets: "
255  + QString::number( datasets_to_process );
256  }
257 
258  progress += "; MonteCarlo: " + QString::number( mc_iter );
259 
260  send_udp( progress );
261  }
262 
263  // Get the best gene for the current generation from the worker
264  MPI_Recv( best_genes[ worker ].data(), // MPI #2
265  buckets.size() * solute_doubles,
266  MPI_DOUBLE,
267  worker,
268  GENE,
270  MPI_STATUS_IGNORE );
271 
272  max_rss();
273 
274  // Compute a current-deme best fitness value that is rounded
275  // to 4 significant digits
276  fit_power = (double)qRound( log10( msg.fitness ) );
277  fit_digit = pow( 10.0, -fit_power ) * DIGIT_FIT;
278  fitness_round = (double)qRound64( msg.fitness * fit_digit )
279  / fit_digit;
280 
281 DbgLv(1) << " MAST: work" << worker << "fit msg,round,bestw,besto"
282  << msg.fitness << fitness_round << best_fitness[worker].fitness
283  << best_overall_fitness;
284  // Set deme's best fitness
285  if ( fitness_round < best_fitness[ worker ].fitness )
286  best_fitness[ worker ].fitness = fitness_round;
287 g = "";
288 for ( int i = 0; i < buckets.size(); i++ )
289  g += s.sprintf( "(%.3f,%.3f)", best_genes[ worker ][ i ].s, best_genes[ worker ][ i ].k);
290 DbgLv(1) << "master: worker/fitness/best gene" << worker << msg.fitness << g;
291 
292  if ( ! early_termination )
293  { // Handle normal pre-early-termination updates
294  if ( avg_generation == 1 && mc_iterations == 1 &&
295  best_overall_fitness == LARGE )
296  { // Report first best-fit RMSD
297  DbgLv(0) << "First Best Fit RMSD" << sqrt( fitness_round );
298  }
299 DbgLv(1) << " MAST: work" << worker << "fit besto,round" << best_overall_fitness << fitness_round
300  << "fit_power fit_digit msgfit" << fit_power << fit_digit << msg.fitness;
301 
302  if ( fitness_round < best_overall_fitness )
303  { // Update over-all best fitness value (rounded)
304  best_overall_fitness = fitness_round;
305  fitness_same_count = 0;
306  }
307  else
308  { // Bump the count of consecutive same best overall fitness
309  fitness_same_count++;
310  }
311 
312 
313  if ( fitness_same_count > max_same_count &&
314  avg_generation > min_generation )
315  { // Mark early termination at threshold same-fitness count
316  DbgLv(0) << "Fitness has not improved in the last"
317  << fitness_same_count
318  << "deme results - Early Termination.";
319  early_termination = true;
320  }
321 
322  }
323 DbgLv(1) << " best_overall_fitness" << best_overall_fitness
324  << "fitness_same_count" << fitness_same_count
325  << " early_term?" << early_termination;
326 
327  // Tell the worker to either stop or continue
328  tag = early_termination ? FINISHED : GENERATION;
329 
330  MPI_Send( &msg, // MPI #3
331  0, // Only interested in the tag
332  MPI_BYTE,
333  worker,
334  tag,
335  my_communicator );
336  break;
337 
338  case FINISHED:
339  rsstotal += (long)msg.size;
340  workers--;
341  break;
342 
343  case EMMIGRATE:
344  {
345  // First get a set of genes as a concatenated vector.
346  int gene_count = msg.size;
347  int doubles_count = gene_count * buckets.size() *
349  QVector< double > emmigrants( doubles_count ) ;
350 
351  MPI_Recv( emmigrants.data(), // MPI #4
352  doubles_count,
353  MPI_DOUBLE,
354  worker,
355  EMMIGRATE,
357  MPI_STATUS_IGNORE );
358 
359  // Add the genes to the emmigres list
360  int solute = 0;
361  int solinc = solute_doubles - 2;
362 
363  for ( int i = 0; i < gene_count; i++ )
364  {
365  Gene gene;
366 
367  for ( int b = 0; b < buckets.size(); b++ )
368  {
369  double s = emmigrants[ solute++ ];
370  double k = emmigrants[ solute++ ];
371  gene << US_Solute( s, k );
372  solute += solinc; // Concentration, Vbar, DiffCoeff
373  }
374 
375  emigres << gene;
376  }
377 //*DEBUG*
378 //if(emigres[0][0].s<0.0)
379 // DbgLv(0) << "MAST: **GENE s" << emigres[0][0].s << " Emigrant";
380 //*DEBUG*
381 
382  max_rss();
383 
384  // Don't send any back if the pool is too small
385  if ( emigres.size() < gene_count * 5 ) doubles_count = 0;
386 
387  // Get immigrants from emmigres
388  QVector< US_Solute > immigrants;
389 
390  if ( doubles_count > 0 )
391  {
392  // Prepare a vector of concatenated genes from the emmigrant list
393  for ( int i = 0; i < gene_count; i++ )
394  immigrants += emigres.takeAt( u_random( emigres.size() ) );
395  }
396 
397  MPI_Send( immigrants.data(), // MPI #5
398  doubles_count,
399  MPI_DOUBLE,
400  worker,
401  IMMIGRATE,
402  my_communicator );
403 //*DEBUG*
404 //if(immigrants[0].s<0.0)
405 // DbgLv(0) << "MAST: **GENE s" << immigrants[0].s << " Immigrant-to-send";
406 //*DEBUG*
407  break;
408  }
409  }
410 
411  max_rss();
412  }
413 
414 DbgLv(1) << "Master maxrss" << maxrss << " worker total rss" << rsstotal
415  << "rank" << my_rank;
416  maxrss += rsstotal;
417 
418  if ( early_termination )
419  { // Report when we have reached early termination of generations
420  int mc_iter = mgroup_count < 2 ? ( mc_iteration + 1 ) : mc_iteration;
421  DbgLv(0) << "Early termination at average generation" << avg
422  << ", MC" << mc_iter;
423  }
424 }
425 
427 {
428  // This is almost the same as 2dsa global_fit.
429  double concentration = 0.0;
430 
431  // The first dataset is done automatically.
432  for ( int solute = 0; solute < simulation_values.solutes.size(); solute++ )
433  {
434  concentration += simulation_values.solutes[ solute ].c;
435  }
436 
437  // Point to current dataset
438  US_DataIO::EditedData* data = &data_sets[ current_dataset ]->run_data;
439 
440  int scan_count = data->scanCount();
441  int radius_points = data->pointCount();
442  int index = 0;
443 
444  QVector< double > scaled_data( scan_count * radius_points );
445 
446  // Scale the data
447  for ( int s = 0; s < scan_count; s++ )
448  {
449  for ( int r = 0; r < radius_points; r++ )
450  {
451  scaled_data[ index++ ] = data->value( s, r ) / concentration;
452  }
453  }
454 
455  // Send the scaled data to the workers
456  MPI_Job job;
457  job.length = scaled_data.size();
459  job.dataset_count = 1;
460 
461  // Tell each worker that new data coming
462  // Can't use a broadcast because the worker is expecting a Send
463 
464  for ( int worker = 1; worker <= my_workers; worker++ )
465  {
466  MPI_Send( &job, // MPI #7
467  sizeof( MPI_Job ),
468  MPI_BYTE,
469  worker,
470  UPDATE,
471  my_communicator );
472  }
473 
474  // Get everybody synced up
475  MPI_Barrier( my_communicator );
476 
477  MPI_Bcast( scaled_data.data(), // MPI #8
478  scaled_data.size(),
479  MPI_DOUBLE,
481  my_communicator );
482 
483  // Go to the next dataset
484  current_dataset++;
485 
486  // If all datasets have been scaled, do all datasets from now on
487  if ( current_dataset >= count_datasets )
488  {
490  current_dataset = 0;
491  }
492 }
493 
495 {
496 DbgLv(1) << "sgMC: mciter" << mc_iteration;
497  // This is almost the same as 2dsa set_monteCarlo
498  if ( mc_iteration <= mgroup_count )
499  {
500  //meniscus_values << -1.0;
501  max_depth = 0; // Make the datasets compatible
502  calculated_solutes.clear();
503  calculated_solutes << best_genes[ best_fitness[ 0 ].index ];
504  int ncsols = calculated_solutes[ 0 ].size();
505 DbgLv(1) << "sgMC: bfgenes stored" << ncsols;
506 
507  solutes_from_gene( calculated_solutes[ 0 ], ncsols );
508 
509 DbgLv(1) << "sgMC: sol0 s" << calculated_solutes[0][0].s;
510  set_gaussians();
511 DbgLv(1) << "sgMC: gaussians set";
512  }
513 
514  mc_data.resize( total_points );
515  int index = 0;
516 
517  // Get a randomized variation of the concentrations
518  // Use a gaussian distribution with the residual as the standard deviation
519  for ( int e = 0; e < count_datasets; e++ )
520  {
521  US_DataIO::EditedData* data = &data_sets[ e ]->run_data;
522 
523  int scan_count = data->scanCount();
524  int radius_points = data->pointCount();
525 
526  for ( int s = 0; s < scan_count; s++ )
527  {
528  for ( int r = 0; r < radius_points; r++ )
529  {
530  double variation = US_Math2::box_muller( 0.0, sigmas[ index ] );
531  mc_data[ index ] = scaled_data.value( s, r ) + variation;
532  index++;
533  }
534  }
535  }
536 DbgLv(1) << "sgMC: mc_data set index" << index;
537 
538  // Broadcast Monte Carlo data to all workers
539  MPI_Job job;
541  job.length = total_points;
542  job.dataset_offset = 0;
544 DbgLv(1) << "sgMC: MPI send my_workers" << my_workers;
545 
546  // Tell each worker that new data coming
547  // Can't use a broadcast because the worker is expecting a Send
548  for ( int worker = 1; worker <= my_workers; worker++ )
549  {
550  MPI_Send( &job, // MPI #9
551  sizeof( job ),
552  MPI_BYTE,
553  worker,
554  UPDATE,
555  my_communicator );
556  }
557 
558  // Get everybody synced up
559 DbgLv(1) << "sgMC: MPI Barrier";
560  MPI_Barrier( my_communicator );
561 
562 DbgLv(1) << "sgMC: MPI Bcast";
563  MPI_Bcast( mc_data.data(), // MPI #10
564  total_points,
565  MPI_DOUBLE,
567  my_communicator );
568 }
569