Classdesc 3.44
classdescMP.h
1/*
2 @copyright Russell Standish 2000-2013
3 @author Russell Standish
4 This file is part of Classdesc
5
6 Open source licensed under the MIT license. See LICENSE for details.
7*/
8
12#ifndef CLASSDESCMP_H
13#define CLASSDESCMP_H
14
15#include "pack_base.h"
16
17#undef HAVE_MPI_CPP
18//#undef SEEK_SET
19//#undef SEEK_CUR
20//#undef SEEK_END
21#include <mpi.h>
22#include <stdexcept>
23#include <cstdio>
24#include <vector>
25
26namespace classdesc
27{
28
29#ifdef HETERO
30 /* Use XDR machine-independent packing if the cluster is heterogeneous */
31 typedef xdr_pack MPIbuf_base;
32#else
33 typedef pack_t MPIbuf_base;
34#endif
35
37 class send
38 {
39 send();
40 public:
41 int proc, tag;
42 send(int proc, int tag=0): proc(proc), tag(tag) {}
43 };
44
45
47 class isend
48 {
49 isend();
50 public:
51 int proc, tag;
53 isend(int proc, int tag=0): proc(proc), tag(tag) {}
54 };
55
56
58 class bcast
59 {
60 bcast();
61 public:
62 int root;
64 bcast(int root): root(root) {}
65 };
66
68 class mark {};
69
70 class MPIbuf_array;
71
73
75 class MPIbuf: public MPIbuf_base
76 {
77 int *offsets;
78 unsigned offsctr;
79 /* MPI_Finalized only available in MPI-2 standard */
80 bool MPI_running()
81 {
82 int fi, ff=0; MPI_Initialized(&fi);
83#if (defined(MPI_VERSION) && MPI_VERSION>1 || defined(MPICH_NAME))
84 MPI_Finalized(&ff);
85#endif
86 return fi&&!ff;
87 }
88 MPI_Request request;
89 friend class MPIbuf_array;
90 public:
92 MPI_Comm Communicator;
93
94 unsigned myid();
95 unsigned nprocs();
97
99 bool const_buffer;
100 int proc, tag; /* store status of receives */
101
102 MPIbuf()
103 {
104 request=MPI_REQUEST_NULL;
105 Communicator=MPI_COMM_WORLD; const_buffer=0;
106 offsets=new int[nprocs()+1]; offsctr=1; offsets[0]=0;
107 }
108 MPIbuf(const MPIbuf& x): offsets(NULL) {*this=x;}
109 ~MPIbuf()
110 {
111 if (request!=MPI_REQUEST_NULL) wait(); //MPI_Request_free(&request);
112 delete [] offsets;
113 }
114 const MPIbuf& operator=(const MPIbuf& x)
115 {
116 Communicator=x.Communicator;
117 delete [] offsets;
118 offsets=new int[nprocs()+1];
119 offsctr=x.offsctr;
120 for (unsigned i=0; i<offsctr; i++) offsets[i]=x.offsets[i];
121 request=MPI_REQUEST_NULL;
122 packraw(x.data(),x.size());
123 return *this;
124 }
125
126
128 bool sent() {int is_sent; MPI_Status s; MPI_Test(&request,&is_sent,&s); return is_sent;}
130 void wait() {MPI_Status s; MPI_Wait(&request,&s);}
131
133 void send(unsigned dest, int tag);
135 void isend(unsigned dest, int tag);
136
138 MPIbuf& get(int p=MPI_ANY_SOURCE, int t=MPI_ANY_TAG);
139
141 void send_recv(unsigned dest, int sendtag, int source, int recvtag);
142
144 MPIbuf& bcast(unsigned root);
145
147 MPIbuf& gather(unsigned root);
149 MPIbuf& scatter(unsigned root);
151 MPIbuf& reset() {reseti(); reseto(); tag=1; return *this;}
153 bool msg_waiting(int source=MPI_ANY_SOURCE, int tag=MPI_ANY_TAG);
154
155 template <class T> MPIbuf& operator<<(const T& x);
156
157 /* Manipulators */
158 MPIbuf& operator<<(classdesc::send s)
159 {send(s.proc,s.tag); return *this;}
160 MPIbuf& operator<<(classdesc::isend s)
161 {isend(s.proc,s.tag); return *this;}
162 MPIbuf& operator<<(classdesc::bcast s)
163 {bcast(s.root); return *this;}
164 /* Mark a processor boundary for scatterv */
165 MPIbuf& operator<<(mark s)
166 {offsets[offsctr++]=size(); return *this;}
167
168 // template <class T> inline MPIbuf& operator<<(const T& x);
169 // {pack(this,string(),x); return *this;}
170
171 };
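  // --------------------------------------------------------------------
  // Editorial usage sketch, not part of the original header: a minimal
  // point-to-point exchange with MPIbuf.  Assumes MPI has already been
  // initialised and that pack()/unpack() overloads are available for the
  // types used (fundamental types here, via the usual classdesc
  // machinery).  The function name is hypothetical.
  inline void example_point_to_point()
  {
    MPIbuf b;
    if (b.myid()==0)
      {
        double payload=3.14;
        b << payload << send(1);      // pack, then ship to process 1 (tag 0)
      }
    else if (b.myid()==1)
      {
        double received;
        b.get();                      // block until a message arrives
        unpack(b,string(),received);  // unpack in the order it was packed
      }
  }
  // --------------------------------------------------------------------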
172
174 class MPIbuf_array
175 {
176 std::vector<MPIbuf> bufs;
177 std::vector<MPI_Request> requests;
178 public:
179
180 MPIbuf_array(unsigned n): bufs(n), requests(n) {}
181
182 MPIbuf& operator[](unsigned i) {return bufs[i];}
183
185 bool testall()
186 {
187 int flag;
188 for (size_t i=0; i<bufs.size(); i++) requests[i]=bufs[i].request;
189 MPI_Testall(bufs.size(),requests.data(),&flag,MPI_STATUSES_IGNORE);
190 return flag;
191 }
192
193 int testany()
194 {
195 int flag,index;
196 for (size_t i=0; i<bufs.size(); i++) requests[i]=bufs[i].request;
197 MPI_Testany(bufs.size(),requests.data(),&index,&flag,MPI_STATUS_IGNORE);
198 return index;
199 }
200
201 std::vector<int> testsome()
202 {
203 int count;
204 std::vector<int> index(bufs.size());
205 for (size_t i=0; i<bufs.size(); i++) requests[i]=bufs[i].request;
206 MPI_Testsome(bufs.size(),requests.data(),&count,index.data(),MPI_STATUSES_IGNORE);
207 return std::vector<int>(index.begin(),index.begin()+count);
208 }
209
210 void waitall()
211 {
212 for (size_t i=0; i<bufs.size(); i++) requests[i]=bufs[i].request;
213 MPI_Waitall(bufs.size(),requests.data(),MPI_STATUSES_IGNORE);
214 }
215
216 int waitany()
217 {
218 int index;
219 for (size_t i=0; i<bufs.size(); i++) requests[i]=bufs[i].request;
220 MPI_Waitany(bufs.size(),requests.data(),&index,MPI_STATUSES_IGNORE);
221 return index;
222 }
223
224 std::vector<int> waitsome()
225 {
226 int count;
227 std::vector<int> index(bufs.size());
228 for (size_t i=0; i<bufs.size(); i++) requests[i]=bufs[i].request;
229 MPI_Waitsome(bufs.size(),requests.data(),&count,index.data(),MPI_STATUS_IGNORE);
230 return std::vector<int>(index.begin(),index.begin()+count);
231 }
232 };
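  // --------------------------------------------------------------------
  // Editorial usage sketch, not part of the original header: driving
  // several non-blocking sends with MPIbuf_array and waiting for all of
  // them to complete.  Assumes MPI has been initialised and that each
  // destination process posts a matching get(); the function name is
  // hypothetical.
  inline void example_fan_out(int work_item)
  {
    MPIbuf info;                         // only used to query myid()/nprocs()
    if (info.myid()!=0) return;          // the root drives the sends in this sketch
    MPIbuf_array msgs(info.nprocs());
    for (unsigned i=1; i<info.nprocs(); ++i)
      {
        msgs[i] << work_item << int(i);  // pack the payload for process i
        msgs[i].isend(i,0);              // start a non-blocking send
      }
    msgs.waitall();                      // all isend requests have now completed
  }
  // --------------------------------------------------------------------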
233
234
235 inline unsigned MPIbuf::myid()
236 {
237 int m;
238 if (MPI_running()) MPI_Comm_rank(Communicator,&m);
239 else m=0;
240 return m;
241 }
242
243 inline unsigned MPIbuf::nprocs()
244 {
245 int m;
246 if (MPI_running()) MPI_Comm_size(Communicator,&m);
247 else m=1;
248 return m;
249 }
250
251 inline void MPIbuf::send(unsigned dest, int tag=0)
252 {
253 if (dest==myid()) return; /* nothing to be done */
254 MPI_Send(data(),size(),MPI_CHAR,dest,tag,Communicator); reseti();
255 }
256
257 inline void MPIbuf::isend(unsigned dest, int tag=0)
258 {
259 if (dest==myid()) return; /* nothing to be done */
260 MPI_Isend(data(),size(),MPI_CHAR,dest,tag,Communicator,&request); reseti();
261 }
262
263 inline MPIbuf& MPIbuf::get(int p, int t)
264 {
265 MPI_Status status;
266 MPI_Probe(p,t,Communicator,&status);
267 int sz;
268 MPI_Get_count(&status,MPI_CHAR,&sz); //this is berserk, but MPI must have ints!
269 m_size=sz;
270 realloc(m_size);
271 proc=status.MPI_SOURCE;
272 tag=status.MPI_TAG;
273 MPI_Recv(data(),size(),MPI_CHAR,proc,tag,Communicator,&status);
274 reseto();
275 return *this;
276 }
277
278 // 2002-05-16 - asynchronous send and receive modification.
279 // 2002-05-22 - use MPI_Isend, MPI_Probe and MPI_Recv
280 //
281 inline void MPIbuf::send_recv(unsigned dest, int sendtag,
282 int source=MPI_ANY_SOURCE,
283 int recvtag=MPI_ANY_TAG)
284 {
285 if (dest==myid()) return; /* nothing to be done */
286 /* send sizes first */
287 int tempsize;
288 MPI_Status status;
289 MPI_Request r1;
290
291 MPI_Isend(data(), size(), MPI_CHAR, dest, sendtag, Communicator, &r1);
292 MPI_Probe(source, sendtag, Communicator, &status);
293 MPI_Get_count(&status, MPI_CHAR, &tempsize);
294
295 char *tempdata=realloc(NULL, tempsize);
296 MPI_Recv(tempdata,tempsize, MPI_CHAR,source,recvtag,Communicator,&status);
297
298 MPI_Wait(&r1,&status); // ensure data is actually sent before deleting storage
299 realloc(0); m_data=tempdata; m_size=tempsize;
300 proc=status.MPI_SOURCE;
301 tag=status.MPI_TAG;
302 reseto();
303 }
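  // --------------------------------------------------------------------
  // Editorial usage sketch, not part of the original header: a symmetric
  // exchange between two processes with send_recv.  Assumes exactly two
  // processes and an initialised MPI environment; the function name is
  // hypothetical.
  inline int example_pairwise_exchange(int my_value)
  {
    MPIbuf b;
    if (b.nprocs()!=2) return my_value;  // sketch is written for a two-process run
    b << my_value;
    unsigned partner=1-b.myid();         // 0 exchanges with 1 and vice versa
    b.send_recv(partner,0,partner,0);    // send and receive in a single call
    int partners_value;
    unpack(b,string(),partners_value);
    return partners_value;
  }
  // --------------------------------------------------------------------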
304
305 inline MPIbuf& MPIbuf::bcast(unsigned root)
306 {
307 int myid;
308 if (!const_buffer)
309 {
310 int sz=size();
311 MPI_Bcast(&sz,1,MPI_INT,root,Communicator);
312 m_size=sz;
313 }
314 MPI_Comm_rank(Communicator,&myid);
315 if (myid!=int(root)) realloc(m_size);
316 MPI_Bcast(data(),size(),MPI_CHAR,root,Communicator);
317 reseto();
318 return *this;
319 }
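  // --------------------------------------------------------------------
  // Editorial usage sketch, not part of the original header: broadcasting
  // a parameter from process 0 to every process.  All processes construct
  // the buffer and call bcast(); only the root packs beforehand.  Assumes
  // MPI has been initialised; the function name is hypothetical.
  inline double example_broadcast_parameter(double value_on_root)
  {
    MPIbuf b;
    if (b.myid()==0) b << value_on_root;  // only the root has data to pack
    b.bcast(0);                           // size, then contents, are broadcast
    double v;
    unpack(b,string(),v);                 // every process, including the root, unpacks
    return v;
  }
  // --------------------------------------------------------------------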
320
321 inline MPIbuf& MPIbuf::gather(unsigned root)
322 {
323 int rootsz=0;
324 unsigned i;
325 char *rootdata=NULL;
326 int *sizes=NULL, *offsets=NULL;
327 if (!const_buffer)
328 {
329 if (myid()==root)
330 {
331 sizes=new int[nprocs()];
332 offsets=new int[nprocs()+1];
333 }
334 int sz=m_size;
335 MPI_Gather(&sz,1,MPI_INT,sizes,1,MPI_INT,root,Communicator);
336 if (myid()==root)
337 {
338 for (offsets[0]=0, i=0; i<nprocs(); i++)
339 offsets[i+1]=offsets[i]+sizes[i];
340 rootsz=offsets[nprocs()];
341 rootdata=realloc(NULL,rootsz);
342 }
343 MPI_Gatherv(data(),size(),MPI_CHAR,rootdata,sizes,offsets,
344 MPI_CHAR,root,Communicator);
345 if (myid()==root)
346 {delete [] sizes; delete [] offsets;}
347 }
348 else
349 {
350 rootsz=size();
351 if (myid()==root) rootdata=realloc(NULL,size()*nprocs());
352 MPI_Gather(data(),size(),MPI_CHAR,rootdata,size(),
353 MPI_CHAR,root,Communicator);
354 }
355 if (myid()==root) {free(m_data); m_data=rootdata; reseto(); m_size=rootsz;}
356 else reseti();
357 return *this;
358 }
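  // --------------------------------------------------------------------
  // Editorial usage sketch, not part of the original header: gathering one
  // value from every process into the root's buffer.  Assumes MPI has been
  // initialised; the function name is hypothetical.
  inline void example_gather_results(double local_result)
  {
    MPIbuf b;
    b << local_result;            // every process packs its own contribution
    b.gather(0);                  // the root ends up with the concatenated buffers
    if (b.myid()==0)
      for (unsigned i=0; i<b.nprocs(); ++i)
        {
          double r;
          unpack(b,string(),r);   // contributions come out in rank order
          // ... use r ...
        }
  }
  // --------------------------------------------------------------------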
359
360 inline MPIbuf& MPIbuf::scatter(unsigned root)
361 {
362 int *sizes=NULL, np=nprocs(), amroot=myid()==root;
363 char *rootdata=NULL;
364 if (amroot)
365 {
366 rootdata=m_data;
367 sizes=new int[np];
368 for (int i=0; i<np; i++) sizes[i]=offsets[i+1]-offsets[i];
369 m_data=realloc(NULL,sizes[root]);
370 }
371 /* scatter each chunk's size to its destination process */
372 int sz;
373 MPI_Scatter(sizes,1,MPI_INT,&sz,1,MPI_INT,root,Communicator);
374 if (sz>=0)
375 {
376 m_size=sz;
377 realloc(m_size);
378 MPI_Scatterv(rootdata,sizes,offsets,MPI_CHAR,data(),size(),MPI_CHAR,
379 root,Communicator);
380 }
381 if (amroot)
382 {
383 free(rootdata); delete [] sizes;
384 for (int i=0; i<np; offsets[i++]=0);
385 }
386 return *this;
387 }
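  // --------------------------------------------------------------------
  // Editorial usage sketch, not part of the original header: the root
  // packs one chunk per process, separating the chunks with the mark
  // manipulator, then scatter() delivers chunk i to process i (the root
  // keeps its own chunk).  Assumes MPI has been initialised; the function
  // name is hypothetical.
  inline int example_marked_scatter()
  {
    MPIbuf b;
    if (b.myid()==0)
      for (unsigned i=0; i<b.nprocs(); ++i)
        b << int(100+i) << mark();   // one chunk per process, then a boundary
    b.scatter(0);                    // chunk i is delivered to process i
    b.reseto();                      // position the read cursor at the start of the chunk
    int mine;
    unpack(b,string(),mine);
    return mine;
  }
  // --------------------------------------------------------------------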
388
389 inline bool MPIbuf::msg_waiting(int source,int tag)
390 {
391 MPI_Status status;
392 int waiting;
393 MPI_Iprobe(source,tag,Communicator,&waiting,&status);
394 return waiting;
395 }
396
398 template<class S>
399 class MPIslave: public MPIbuf
400 {
401 S slave;
402 void method(MPIbuf& buffer);
403 public:
404 std::vector<int> idle;
405 MPIslave() {init();}
406 ~MPIslave() {finalize();}
407 void init();
408 void finalize();
409 MPIbuf& operator<<(void (S::*m)(MPIbuf&))
410 {
411 reseti();
412 pack(*this,string(),m);
413 // ::pack(cmd,string(),is_array(),*(char*)&m,1,sizeof(m));
414 return *this;
415 }
416 template <class T> MPIbuf& operator<<(const T& x)
417 {reseti(); return MPIbuf::operator<<(x);} // forward to MPIbuf's operator<< rather than recursing
419 void exec(MPIbuf& x) {x.send(idle.back(),0); idle.pop_back();}
421 MPIbuf& get_returnv(){get(); idle.push_back(proc); return *this;}
423 bool all_idle() {return idle.size()==nprocs()-1;}
425 void wait_all_idle() {while (!all_idle()) get_returnv();}
427 void bcast(MPIbuf& c);
428 };
429
430 template <class S>
431 inline void MPIslave<S>::init()
432 {
433#if MPI_DEBUG
434 /* enable this piece of code for debugging under gdb */
435 if (myid()==0) std::getchar();
436 MPI_Barrier(MPI_COMM_WORLD);
437#endif
438 if (myid()>0)
439 {
440 /* slave loop */
441 MPIbuf buffer;
442 for (buffer.get(); buffer.tag==0; buffer.get())
443 method(buffer);
444 MPI_Finalize();
445 exit(0);
446 }
447 else
448 for (unsigned i=1; i<nprocs(); i++) idle.push_back(i);
449 }
450
451 template <class S>
452 void MPIslave<S>::finalize()
453 {
454 if (myid()==0)
455 for (unsigned i=1; i<nprocs(); i++) send(i,1);
456 }
457
458 template <class S>
459 inline void MPIslave<S>::method(MPIbuf& buffer)
460 {
461 void (S::*m)(MPIbuf&);
462 // buffer.unpackraw((char*)&m,sizeof(m));
463 unpack(buffer,string(),m);
464 buffer.tag=0;
465 (slave.*m)(buffer);
466 if (buffer.tag) buffer.send(buffer.proc,0);
467 }
468
469 template <class S>
470 inline void MPIslave<S>::bcast(MPIbuf& c)
471 {
472 for (unsigned i=1; i<nprocs(); i++)
473 {MPI_Send(data(),size(),MPI_CHAR,i,0,c.Communicator);}
474 c.reseti();
475 }
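  // --------------------------------------------------------------------
  // Editorial usage sketch, not part of the original header: a small
  // master/slave computation with MPIslave.  The Worker class, its method
  // and the driver function are hypothetical.  The sketch assumes at least
  // two MPI processes, that MPI has been initialised on every process
  // (e.g. via MPISPMD below), and that pack()/unpack() handle member
  // function pointers, as the slave loop above already requires.
  struct Worker
  {
    void square(MPIbuf& args)
    {
      int x;
      unpack(args,string(),x);
      args.reset() << x*x;       // reset() sets tag!=0, so the result is sent back
    }
  };

  inline void example_master_slave()
  {
    MPIslave<Worker> master;     // slave processes enter their service loop here
    MPIbuf job;
    for (int i=0; i<10; ++i)
      {
        if (master.idle.empty())
          master.get_returnv();  // reclaim a finished slave (its result could be unpacked from master here)
        job.reset() << &Worker::square << i;  // method pointer first, then its argument
        master.exec(job);                     // dispatch to the next idle slave
      }
    master.wait_all_idle();      // drain the remaining results
  }                              // ~MPIslave tells the slaves to exit
  // --------------------------------------------------------------------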
476
478 class MPISPMD
479 {
480 public:
481 int nprocs, myid;
482 MPISPMD() {nprocs=1, myid=0;}
483 MPISPMD(int& argc, char**& argv) {init(argc,argv);};
484 ~MPISPMD() {finalize();}
485 void init(int& argc, char**& argv);
486 void finalize() {MPI_Finalize();}
487 };
488
489 inline void MPISPMD::init(int& argc, char**& argv)
490 {
491 MPI_Init(&argc,&argv);
492 MPI_Comm_rank(MPI_COMM_WORLD,&myid);
493 MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
494#if MPI_DEBUG
495 /* enable this piece of code for debugging under gdb */
496 if (myid==0) std::getchar();
497 MPI_Barrier(MPI_COMM_WORLD);
498#endif
499 }
500
501}
502
503#endif /* CLASSDESCMP_H */
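// ----------------------------------------------------------------------
// Editorial usage sketch, not part of the original header: a minimal SPMD
// program built on MPISPMD.  MPI_Init and MPI_Finalize are handled by the
// object's constructor and destructor; the program below is hypothetical.
//
//   #include "classdescMP.h"
//   #include <iostream>
//
//   int main(int argc, char** argv)
//   {
//     classdesc::MPISPMD mpi(argc,argv);   // calls MPI_Init, fills nprocs and myid
//     std::cout<<"process "<<mpi.myid<<" of "<<mpi.nprocs<<std::endl;
//     return 0;                            // ~MPISPMD calls MPI_Finalize
//   }
// ----------------------------------------------------------------------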
504
505
506
classdesc::MPIbuf_array
used for managing groups of messages
Definition classdescMP.h:175
int testany()
return the index of any request that has completed, or MPI_UNDEFINED if none
Definition classdescMP.h:193
bool testall()
return true if all messages have been completed
Definition classdescMP.h:185
std::vector< int > waitsome()
wait for some outstanding requests to complete, returning an array of request indices
Definition classdescMP.h:224
int waitany()
wait for any outstanding request to complete, returning the index of the completed request
Definition classdescMP.h:216
void waitall()
wait for all outstanding requests to complete
Definition classdescMP.h:210
std::vector< int > testsome()
return the indices of the requests that have completed
Definition classdescMP.h:201
classdesc::MPIbuf
buffer object providing MPI functionality
Definition classdescMP.h:76
MPIbuf & bcast(unsigned root)
broadcast data from root
Definition classdescMP.h:305
bool msg_waiting(int source=MPI_ANY_SOURCE, int tag=MPI_ANY_TAG)
is there a message waiting to be received into the buffer?
Definition classdescMP.h:389
MPIbuf & get(int p=MPI_ANY_SOURCE, int t=MPI_ANY_TAG)
receive a message from p (MPI_ANY_SOURCE) with tag t (MPI_ANY_TAG)
Definition classdescMP.h:263
bool const_buffer
buffer size is the same on all processes in a collective communication
Definition classdescMP.h:99
void send(unsigned dest, int tag)
send data to dest with tag tag
Definition classdescMP.h:251
MPIbuf & gather(unsigned root)
gather data (concatenated) into root's buffer
Definition classdescMP.h:321
void send_recv(unsigned dest, int sendtag, int source, int recvtag)
perform a simultaneous send and receive between a pair of processes
Definition classdescMP.h:281
void isend(unsigned dest, int tag)
asynchronously send data to dest with tag tag
Definition classdescMP.h:257
MPIbuf & reset()
reset the buffer to send or receive a new message
Definition classdescMP.h:151
void wait()
wait for the previous asynchronous call to complete
Definition classdescMP.h:130
bool sent()
returns true if the previous asynchronous send has completed
Definition classdescMP.h:128
MPI_Comm Communicator
The MPI communicator to be used for subsequent communications.
Definition classdescMP.h:92
unsigned myid()
the current process's taskID
Definition classdescMP.h:235
unsigned nprocs()
number of processes
Definition classdescMP.h:243
MPIbuf & scatter(unsigned root)
scatter root's data (that has been marked with mark)
Definition classdescMP.h:360
classdesc::MPIslave< S >
Master/slave manager.
Definition classdescMP.h:400
void wait_all_idle()
wait until all slaves are idle
Definition classdescMP.h:425
void exec(MPIbuf &x)
send a request to the next available slave
Definition classdescMP.h:419
bool all_idle()
true if all slaves are idle
Definition classdescMP.h:423
MPIbuf & get_returnv()
process a return value
Definition classdescMP.h:421
std::vector< int > idle
list of waiting slaves, valid on the master
Definition classdescMP.h:404
void bcast(MPIbuf &c)
broadcast a request to all slaves.
Definition classdescMP.h:470
classdesc::bcast::bcast(int root)
root is the taskID of the source data
Definition classdescMP.h:64
classdesc::isend
MPIbuf manipulator to asynchronously send the MPIbuf's contents to a remote process.
Definition classdescMP.h:48
isend(int proc, int tag=0)
proc is the MPI taskID to send the message to
Definition classdescMP.h:53
classdesc::mark
A manipulator to mark a processor boundary for scatterv.
Definition classdescMP.h:68
classdesc::pack_t
serialisation descriptor
Definition pack_base.h:138
size_t m_size
size of buffer
Definition pack_base.h:160
size_t size() const
size of buffer
Definition pack_base.h:170
char * m_data
actual buffer
Definition pack_base.h:159
const char * data() const
actual buffer
Definition pack_base.h:168
classdesc::send
MPIbuf manipulator to send the MPIbuf's contents to a remote process.
Definition classdescMP.h:38
classdesc
Contains definitions related to classdesc functionality.
void pack(pack_t &targ, const string &desc, is_treenode dum, const T *const &arg)
serialise a tree (or DAG)
Definition pack_graph.h:28
void unpack(unpack_t &targ, const string &desc, is_treenode dum, T *&arg)
unserialise a tree.
Definition pack_graph.h:44