UltraScan III
pcsa_worker.cpp
Go to the documentation of this file.
1 #include "us_mpi_analysis.h"
2 #include "us_constants.h"
3 #include "us_math2.h"
4 #include "us_astfem_rsa.h"
5 #include "us_simparms.h"
6 #include "us_sleep.h"
7 
9 {
10  bool repeat_loop = true;
11  MPI_Job job;
12  MPI_Status status;
13 DbgLv(1) << "w:" << my_rank << ": pcsa_worker IN";
14 
15  // Use 4 here because the master will be reading 4 with the
16  // same instruction when reading ::READY or ::RESULTS.
17  int x[ 4 ] = { 0, 0, 0, 0 };
18 
19  while ( repeat_loop )
20  {
21  MPI_Send( x, // Basically don't care
22  4,
23  MPI_INT,
26  my_communicator ); // let master know we are ready
27 //if(my_rank==1)
28 DbgLv(1) << "w:" << my_rank << ": ready sent";
29 
30  // Blocking -- Wait for instructions
31  MPI_Recv( &job, // get masters' response
32  sizeof( job ),
33  MPI_BYTE,
37  &status ); // status not used
38 //if(my_rank==1)
39 DbgLv(1) << "w:" << my_rank << ": job_recvd length" << job.length
40  << "command" << job.command;
41 
42  int offset = job.dataset_offset;
43  int dataset_count = job.dataset_count;
44  int job_length = job.length;
45  int mc_iter = job.solution;
46  US_DataIO::EditedData* edata = &data_sets[ offset ]->run_data;
47  meniscus_value = edata->meniscus;
48 DbgLv(1) << "w:" << my_rank << ": offs cnt" << offset << dataset_count
49  << "menisc" << meniscus_value;
50  int scan_count = edata->scanCount();
51  int radius_points = edata->pointCount();
52  int ds_points = scan_count * radius_points;
53 
54  data_sets[ offset ]->simparams.meniscus = meniscus_value;
55 
56  switch( job.command )
57  {
58  case MPI_Job::PROCESS: // Process solutes
59  {
60 DbgLv(1) << "w:" << my_rank << ":Recv:PROCESS";
62  simulation_values.noisflag =
63  ( parameters[ "tinoise_option" ].toInt() > 0 ? 1 : 0 )
64  + ( parameters[ "rinoise_option" ].toInt() > 0 ? 2 : 0 );
65  simulation_values.dbg_level = dbg_level;
66  simulation_values.dbg_timing = dbg_timing;
67 
68 //if(my_rank==1)
69 DbgLv(1) << "w:" << my_rank << ": sols size" << job.length;
70  simulation_values.zsolutes.resize( job.length );
71 
72  MPI_Recv( simulation_values.zsolutes.data(), // Get solutes
73  job_length * zsolut_doubles,
74  MPI_DOUBLE,
78  &status );
79 
80  max_rss();
81 //*DEBUG*
82 //if(dbg_level>0 && my_rank==1)
83 //if(my_rank==1)
84 {
85  int nn = simulation_values.zsolutes.size() - 1;
86  int mm = nn/2;
87  DbgLv(1) << "w:" << my_rank << ": offs dscnt" << offset << dataset_count
88  << "vbar s20wc bott"
89  << data_sets[offset]->vbar20
90  << data_sets[offset]->s20w_correction
91  << data_sets[offset]->centerpiece_bottom;
92  DbgLv(1) << "w:" << my_rank << ": sol0 solm soln"
93  << simulation_values.zsolutes[0].x << simulation_values.zsolutes[0].y
94  << simulation_values.zsolutes[mm].x << simulation_values.zsolutes[mm].y
95  << simulation_values.zsolutes[nn].x << simulation_values.zsolutes[nn].y;
96 }
97 //*DEBUG*
98 
99  calc_residuals( offset, dataset_count, simulation_values );
100 
101 //*DEBUG*
102 //if(my_rank==1)
103 {
104  int nn = simulation_values.zsolutes.size() - 1;
105  int mm = nn/2;
106  DbgLv(1) << "w:" << my_rank << ": nso"
107  << simulation_values.zsolutes.size() << "c:sol0 solm soln"
108  << simulation_values.zsolutes[ 0].x << simulation_values.zsolutes[ 0].y
109  << simulation_values.zsolutes[mm].x << simulation_values.zsolutes[mm].y
110  << simulation_values.zsolutes[nn].x << simulation_values.zsolutes[nn].y;
111 }
112 //*DEBUG*
113  // Tell master we are sending back results
114  int sizes[ 4 ] = { simulation_values.zsolutes.size(),
115  simulation_values.ti_noise.size(),
116  simulation_values.ri_noise.size(),
117  max_rss() };
118 
119 DbgLv(1) << "w:" << my_rank << ": result sols size" << sizes[0]
120  << "max_rss" << sizes[ 3 ];
121 //*DEBUG*
122 if(dbg_level==0 && my_rank==1) {
123 DbgLv(1) << "w:" << my_rank << ": result sols size" << sizes[0]
124  << "nsscan" << simulation_values.sim_data.scanCount();
125 }
126 //*DEBUG*
127  MPI_Send( sizes,
128  4,
129  MPI_INT,
130  MPI_Job::MASTER,
132  my_communicator );
133 
134  // Send back to master all of simulation_values
135  MPI_Send( simulation_values.zsolutes.data(),
136  simulation_values.zsolutes.size() * zsolut_doubles,
137  MPI_DOUBLE,
140  my_communicator );
141 
142  MPI_Send( &simulation_values.variance,
143  1,
144  MPI_DOUBLE,
145  MPI_Job::MASTER,
146  MPI_Job::TAG0,
147  my_communicator );
148 
149  MPI_Send( simulation_values.variances.data(),
150  dataset_count,
151  MPI_DOUBLE,
154  my_communicator );
155 
156  MPI_Send( simulation_values.ti_noise.data(),
157  simulation_values.ti_noise.size(),
158  MPI_DOUBLE,
161  my_communicator );
162 
163  MPI_Send( simulation_values.ri_noise.data(),
164  simulation_values.ri_noise.size(),
165  MPI_DOUBLE,
168  my_communicator );
169  }
170 
171  break;
172 
173  case MPI_Job::PROCESS_MC: // Process solutes for monte carlo
174  {
175 DbgLv(1) << "w:" << my_rank << ":Recv:PROCESS_MC" << "mc_iter" << mc_iter;
176  double varrmsd = 0.0;
178  simulation_values.noisflag = 0;
179  simulation_values.dbg_level = dbg_level;
180  simulation_values.dbg_timing = dbg_timing;
181 
182 //if(my_rank==1)
183 DbgLv(1) << "w:" << my_rank << ": sols size" << job.length;
184  simulation_values.zsolutes.resize( job.length );
185 
186  MPI_Recv( simulation_values.zsolutes.data(), // Get solutes
187  job_length * zsolut_doubles,
188  MPI_DOUBLE,
192  &status );
193 
194  // Construct a new MC data set with randomized noise
195  int index = 0;
196  for ( int ss = 0; ss < scan_count; ss++ )
197  {
198  for ( int rr = 0; rr < radius_points; rr++, index++ )
199  {
200  double vari = US_Math2::box_muller( 0.0, sigmas[ index ] );
201  double datout = sim_data->value( ss, rr ) + vari;
202  varrmsd += sq( vari );
203 
204  edata->setValue( ss, rr, datout );
205  }
206  }
207 
208  varrmsd = sqrt( varrmsd / (double)ds_points );
209  qDebug() << " Box_Muller Variation RMSD"
210  << QString::number( varrmsd, 'f', 7 )
211  << " for MC_Iteration" << mc_iter;
212 
213  max_rss();
214 //*DEBUG*
215 //if(dbg_level>0 && my_rank==1)
216 //if(my_rank==1)
217 {
218  int nn = simulation_values.zsolutes.size() - 1;
219  int mm = nn/2;
220  DbgLv(1) << "w:" << my_rank << ": offs dscnt" << offset << dataset_count
221  << "vbar s20wc bott"
222  << data_sets[offset]->vbar20
223  << data_sets[offset]->s20w_correction
224  << data_sets[offset]->centerpiece_bottom;
225  DbgLv(1) << "w:" << my_rank << ": sol0 solm soln"
226  << simulation_values.zsolutes[ 0].x << simulation_values.zsolutes[ 0].y
227  << simulation_values.zsolutes[mm].x << simulation_values.zsolutes[mm].y
228  << simulation_values.zsolutes[nn].x << simulation_values.zsolutes[nn].y;
229 }
230 //*DEBUG*
231 
232  calc_residuals( offset, dataset_count, simulation_values );
233 
234  qDebug() << "Base-Sim RMSD" << sqrt( simulation_values.variance )
235  << " for MC_Iteration" << mc_iter;
236 
237 //*DEBUG*
238 //if(my_rank==1)
239 {
240  int nn = simulation_values.zsolutes.size() - 1;
241  int mm = nn/2;
242  DbgLv(1) << "w:" << my_rank << ": nso"
243  << simulation_values.zsolutes.size() << "c:sol0 solm soln"
244  << simulation_values.zsolutes[ 0].x << simulation_values.zsolutes[ 0].y
245  << simulation_values.zsolutes[mm].x << simulation_values.zsolutes[mm].y
246  << simulation_values.zsolutes[nn].x << simulation_values.zsolutes[nn].y;
247 }
248 //*DEBUG*
249  // Tell master we are sending back results
250  int sizes[ 4 ] = { simulation_values.zsolutes.size(),
251  0,
252  0,
253  max_rss() };
254 
255 DbgLv(1) << "w:" << my_rank << ": result sols size" << sizes[0]
256  << "max_rss" << sizes[ 3 ];
257 //*DEBUG*
258 if(dbg_level==0 && my_rank==1) {
259 DbgLv(1) << "w:" << my_rank << ": result sols size" << sizes[0]
260  << "nsscan" << simulation_values.sim_data.scanCount();
261 }
262 //*DEBUG*
263 DbgLv(1) << "w:" << my_rank << ":Send:RESULTS_MC sizes"
264  << sizes[0] << sizes[1] << sizes[2] << sizes[3];
265  MPI_Send( sizes,
266  4,
267  MPI_INT,
268  MPI_Job::MASTER,
270  my_communicator );
271 
272  // Send back to master all of simulation_values
273  MPI_Send( simulation_values.zsolutes.data(),
274  sizes[ 0 ] * zsolut_doubles,
275  MPI_DOUBLE,
278  my_communicator );
279 
280  MPI_Send( &simulation_values.variance,
281  1,
282  MPI_DOUBLE,
283  MPI_Job::MASTER,
284  MPI_Job::TAG0,
285  my_communicator );
286 
287  MPI_Send( simulation_values.variances.data(),
288  dataset_count,
289  MPI_DOUBLE,
292  my_communicator );
293  }
294  break;
295 
296  case MPI_Job::NEWDATA: // Reset data for Monte Carlo or global fit
297  {
298 //if(my_rank==1)
299 DbgLv(1) << "w:" << my_rank << ":Recv:NEWDATA joblen" << job_length;
300  if ( is_global_fit && mc_iter < 3 && my_rank < 3 )
301  { // For global fits, check the memory requirements
302  long memused = max_rss();
303  long memdata = job_length * sizeof( double );
304  int grid_reps = qMax( parameters[ "uniform_grid" ].toInt(), 1 );
305  double s_pts = 60.0;
306  double ff0_pts = 60.0;
307  if ( parameters.contains( "s_grid_points" ) )
308  s_pts = parameters[ "s_grid_points" ].toDouble();
309  else if ( parameters.contains( "s_resolution" ) )
310  s_pts = parameters[ "s_resolution" ].toDouble() * grid_reps;
311  if ( parameters.contains( "ff0_grid_points" ) )
312  ff0_pts = parameters[ "ff0_grid_points" ].toDouble();
313  else if ( parameters.contains( "ff0_resolution" ) )
314  ff0_pts = parameters[ "ff0_resolution" ].toDouble() * grid_reps;
315  int nsstep = (int)( s_pts );
316  int nkstep = (int)( ff0_pts );
317  int maxsols = nsstep * nkstep;
318  long memamatr = memdata * maxsols;
319  long membmatr = memdata;
320  long memneed = memdata + memamatr + membmatr;
321  const double mb_bytes = ( 1024. * 1024. );
322  const double gb_bytes = ( mb_bytes * 1024. );
323  double gb_need = (double)memneed / gb_bytes;
324  gb_need = qRound( gb_need * 1000.0 ) * 0.001;
325  double gb_used = (double)memused / mb_bytes;
326  gb_used = qRound( gb_used * 1000.0 ) * 0.001;
327  long pgavail = sysconf( _SC_PHYS_PAGES );
328  long pgsize = sysconf( _SC_PAGE_SIZE );
329  long memavail = pgavail * pgsize;
330  double gb_avail = (double)memavail / gb_bytes;
331  gb_avail = qRound( gb_avail * 1000.0 ) * 0.001;
332  long pgcurav = sysconf( _SC_AVPHYS_PAGES );
333  long memcurav = pgcurav * pgsize;
334  double gb_curav = (double)memcurav / gb_bytes;
335  gb_curav = qRound( gb_curav * 1000.0 ) * 0.001;
336 
337  qDebug() << "++ Worker" << my_rank << ": MC iteration"
338  << mc_iter << ": Memory Profile :"
339  << "\n Maximum memory used to this point" << memused
340  << "\n Composite data memory needed" << memdata
341  << "\n Maximum subgrid solute count" << maxsols
342  << "\n NNLS A matrix memory needed" << memamatr
343  << "\n NNLS B matrix memory needed" << membmatr
344  << "\n Total memory (GB) used" << gb_used
345  << "\n Total memory (GB) needed" << gb_need
346  << "\n Total memory (GB) available" << gb_avail
347  << "\n Memory (GB) currently available" << gb_curav;
348  }
349 
350  mc_data.resize( job_length );
351 
352  if ( mc_data.size() != job_length )
353  {
354  DbgLv(0) << "*ERROR* mc_data.size() job_length"
355  << mc_data.size() << job_length;
356  }
357 
358  MPI_Barrier( my_communicator );
359 
360 if(my_rank==1 || my_rank==11)
361 DbgLv(1) << "newD:" << my_rank << " scld/newdat rcv : offs dsknt"
362  << offset << dataset_count << "joblen" << job_length;
363 double dsum=0.0;
364  // This is a receive
365  MPI_Bcast( mc_data.data(),
366  job_length,
367  MPI_DOUBLE,
369  my_communicator );
370 
371 
372  if ( is_global_fit && dataset_count == 1 )
373  { // For global update to scaled data, extra value is new ODlimit
374  job_length--;
375  data_sets[ offset ]->run_data.ODlimit = mc_data[ job_length ];
376 if( (my_rank==1||my_rank==11) )
377 DbgLv(1) << "newD:" << my_rank << ":offset ODlimit" << offset
378  << data_sets[ offset ]->run_data.ODlimit;
379  }
380 
381  bool is_simdat = ( mc_iter >= 10000 );
382  int index = 0;
383 DbgLv(1) << "newD:" << my_rank << ": is_simdat" << is_simdat;
384 
385  if ( is_simdat )
386  { // If simulation,residuals for MC; save and get sigmas
387  US_DataIO::EditedData* edata = &data_sets[ offset ]->run_data;
388  sim_data = &sim_data1;
389  US_AstfemMath::initSimData( sim_data1, *edata, 0.0 );
390  int scan_count = edata->scanCount();
391  int radius_points = edata->pointCount();
392  sigmas.clear();
393 
394  for ( int ss = 0; ss < scan_count; ss++ ) // Save sim_data
395  for ( int rr = 0; rr < radius_points; rr++ )
396  sim_data->setValue( ss, rr, mc_data[ index++ ] );
397 
398  for ( int ss = 0; ss < scan_count; ss++ ) // Get sigmas
399  for ( int rr = 0; rr < radius_points; rr++ )
400  sigmas << qAbs( mc_data[ index++ ] );
401 DbgLv(1) << "newD:" << my_rank << ": index mcsize" << index << mc_data.size();
402 
403  MPI_Barrier( my_communicator );
404 
405  // Stagger restart of processing for each worker
406  US_Sleep::msleep( my_rank * 200 );
407  }
408 
409  else
410  { // Straight replacement of experiment data
411  for ( int ee = offset; ee < offset + dataset_count; ee++ )
412  {
413  US_DataIO::EditedData* edata = &data_sets[ ee ]->run_data;
414 
415  int scan_count = edata->scanCount();
416  int radius_points = edata->pointCount();
417 
418 int indxh=((scan_count/2)*radius_points)+(radius_points/2);
419  for ( int ss = 0; ss < scan_count; ss++ )
420  {
421  for ( int rr = 0; rr < radius_points; rr++, index++ )
422  {
423  edata->setValue( ss, rr, mc_data[ index ] );
424 dsum+=edata->value(ss,rr);
425 if( (my_rank==1||my_rank==11)
426  && (index<5 || index>(job_length-6) || (index>(indxh-4)&&index<(indxh+3))) )
427 DbgLv(1) << "newD:" << my_rank << ":index" << index << "edat" << edata->value(ss,rr)
428  << "ee" << ee;
429  }
430  }
431  }
432  }
433 if(my_rank==1 || my_rank==11)
434 DbgLv(1) << "newD:" << my_rank << " length index" << job_length << index
435  << "dsum" << dsum;
436  }
437 
438  break;
439 
440  default:
441  repeat_loop = false;
442  break;
443  } // switch
444  } // repeat_loop
445 }
446