OpenMPI  0.1.1
vprotocol_pessimist_sender_based.h
1 /*
2  * Copyright (c) 2004-2007 The Trustees of the University of Tennessee.
3  * All rights reserved.
4  * $COPYRIGHT$
5  *
6  * Additional copyrights may follow
7  *
8  * $HEADER$
9  */
10 
11 #ifndef __VPROTOCOL_PESSIMIST_SENDERBASED_H__
12 #define __VPROTOCOL_PESSIMIST_SENDERBASED_H__
13 
14 #include "ompi_config.h"
16 #include "ompi/mca/pml/v/pml_v_output.h"
17 #include "vprotocol_pessimist_sender_based_types.h"
18 #include "vprotocol_pessimist_request.h"
19 #include "vprotocol_pessimist.h"
20 
21 BEGIN_C_DECLS
22 
23 /** Prepare for using the sender based storage.
 *
 * @param mmapfile  presumably the name of the file backing the mmap (inferred
 *                  from the parameter name only -- confirm in the .c file)
 * @param size      size handed to the implementation; its exact meaning is
 *                  not visible in this header
 * @return          an int status; the value convention is defined by the
 *                  implementation and not visible in this header
 */
25 int vprotocol_pessimist_sender_based_init(const char *mmapfile, size_t size);
26 
27 /** Cleanup mmap etc.
 *
 * Takes no argument and returns nothing. What exactly is released is decided
 * by the implementation, which is not visible in this header.
 */
29 void vprotocol_pessimist_sender_based_finalize(void);
30 
31 /** Manage mmap floating window, allocating enough memory for the message to be
 * asynchronously copied to disk.
 *
 * vprotocol_pessimist_sender_based_copy_start() calls this with the packed
 * byte count of the request when sb_available is too small for the message.
 *
 * @param len  number of bytes requested by the caller
 */
34 void vprotocol_pessimist_sender_based_alloc(size_t len);
35 
36 
37 /*******************************************************************************
38  * Convertor pack (blocking) method (good latency, bad bandwidth)
39  */
40 #if defined(SB_USE_PACK_METHOD)
41 static inline void __SENDER_BASED_METHOD_COPY(mca_pml_base_send_request_t *pmlreq)
42 {
43  if(0 != pmlreq->req_bytes_packed)
44  {
45  opal_convertor_t conv;
46  size_t max_data;
47  size_t zero = 0;
48  unsigned int iov_count = 1;
49  struct iovec iov;
50 
51  max_data = iov.iov_len = pmlreq->req_bytes_packed;
52  iov.iov_base = (IOVBASE_TYPE *) VPESSIMIST_SEND_FTREQ(pmlreq)->sb.cursor;
53  opal_convertor_clone_with_position( &pmlreq->req_base.req_convertor,
54  &conv, 0, &zero );
55  opal_convertor_pack(&conv, &iov, &iov_count, &max_data);
56  }
57 }
58 
/* Pack method: flush expands to nothing. The pack-method copy above performs
 * the whole opal_convertor_pack() call itself, so presumably nothing is left
 * pending when flush is requested. */
59 #define __SENDER_BASED_METHOD_FLUSH(REQ)
60 
61 
62 /*******************************************************************************
63  * Convertor replacement (non blocking) method (under testing)
64  */
65 #elif defined(SB_USE_CONVERTOR_METHOD)
/* Replacement convertor advance function. __SENDER_BASED_METHOD_COPY (convertor
 * method) stores its address in the request convertor's fAdvance field after
 * saving the previous value. The parameters are unnamed here; their types are
 * a convertor, an iovec array, a uint32_t pointer and a size_t pointer
 * (presumably iovec count and max data, as for opal_convertor_pack -- confirm
 * in the implementation). */
66 int32_t vprotocol_pessimist_sender_based_convertor_advance(opal_convertor_t*,
67  struct iovec*,
68  uint32_t*,
69  size_t*);
70 
/* Convertor method: hook the request's convertor instead of copying now.
 *
 * The convertor's current flags and fAdvance pointer are saved in the
 * request's sb.conv_flags / sb.conv_advance, then CONVERTOR_NO_OP is cleared
 * and fAdvance is redirected to
 * vprotocol_pessimist_sender_based_convertor_advance.
 *
 * REQ is evaluated twice; pass an expression without side effects. */
#define __SENDER_BASED_METHOD_COPY(REQ) do {                                   \
    opal_convertor_t *sb_conv_ = &(REQ)->req_base.req_convertor;               \
    mca_vprotocol_pessimist_send_request_t *sb_req_ =                          \
        VPESSIMIST_SEND_FTREQ(REQ);                                            \
                                                                               \
    /* remember the original convertor state */                                \
    sb_req_->sb.conv_flags = sb_conv_->flags;                                  \
    sb_req_->sb.conv_advance = sb_conv_->fAdvance;                             \
                                                                               \
    /* install the sender-based advance function */                            \
    sb_conv_->flags &= ~CONVERTOR_NO_OP;                                       \
    sb_conv_->fAdvance = vprotocol_pessimist_sender_based_convertor_advance;   \
} while(0)
83 
/* Convertor method: flush expands to nothing. */
84 #define __SENDER_BASED_METHOD_FLUSH(REQ)
85 
/* Map a convertor to a pessimist send request pointer: the convertor's
 * clone_of pointer is converted to an integer, sb_conv_to_pessimist_offset is
 * added to it, and the sum is cast to mca_vprotocol_pessimist_send_request_t *.
 * NOTE(review): this relies on clone_of pointing at a convertor embedded in a
 * request at a fixed distance from the pessimist part -- confirm where the
 * offset is computed. */
86 #define VPESSIMIST_CONV_REQ(CONV) ((mca_vprotocol_pessimist_send_request_t *) \
87  (mca_vprotocol_pessimist.sender_based.sb_conv_to_pessimist_offset + \
88  (uintptr_t) ((CONV)->clone_of)))
89 
90 
91 /*******************************************************************************
92  * progress method
93  */
94 #elif defined(SB_USE_PROGRESS_METHOD)
95 static inline void __SENDER_BASED_METHOD_COPY(mca_pml_base_send_request_t *req)
96 {
97  if(req->req_bytes_packed)
98  {
99  mca_vprotocol_pessimist_send_request_t *ftreq = VPESSIMIST_SEND_FTREQ(req);
100  ftreq->sb.bytes_progressed = 0;
101  opal_list_append(&mca_vprotocol_pessimist.sender_based.sb_sendreq,
102  &ftreq->list_item);
103  }
104 }
105 
106 static inline int vprotocol_pessimist_sb_progress_req(mca_pml_base_send_request_t *req)
107 {
108  mca_vprotocol_pessimist_request_t *ftreq = VPESSIMIST_SEND_FTREQ(req);
109  size_t max_data = 0;
110 
111  if(ftreq->sb.bytes_progressed < req->req_bytes_packed)
112  {
113  opal_convertor_t conv;
114  unsigned int iov_count = 1;
115  struct iovec iov;
116  uintptr_t position = ftreq->sb.bytes_progressed;
117  max_data = req->req_bytes_packed - ftreq->sb.bytes_progressed;
118  iov.iov_len = max_data;
119  iov.iov_base = (IOVBASE_TYPE *) (ftreq->sb.cursor + position);
120 
121  V_OUTPUT_VERBOSE(80, "pessimist:\tsb\tprgress\t%"PRIpclock"\tsize %lu from position %lu", ftreq->reqid, max_data, position);
122  opal_convertor_clone_with_position(&req->req_base.req_convertor,
123  &conv, 0, &position );
124  opal_convertor_pack(&conv, &iov, &iov_count, &max_data);
125  ftreq->sb.bytes_progressed += max_data;
126  }
127  return max_data;
128 }
129 
130 static inline int vprotocol_pessimist_sb_progress_all_reqs(void)
131 {
132  int ret = 0;
133 
134  /* progress any waiting Sender Based copy */
135  if(!opal_list_is_empty(&mca_vprotocol_pessimist.sender_based.sb_sendreq))
136  {
138  opal_list_remove_first(&mca_vprotocol_pessimist.sender_based.sb_sendreq);
139  if(vprotocol_pessimist_sb_progress_req(VPROTOCOL_SEND_REQ(ftreq)))
140  ret = 1;
141  opal_list_append(&mca_vprotocol_pessimist.sender_based.sb_sendreq,
142  &ftreq->list_item);
143  }
144  return ret;
145 }
146 
147 static inline void __SENDER_BASED_METHOD_FLUSH(ompi_request_t *req)
148 {
150 
151  if((pmlreq->req_base.req_type == MCA_PML_REQUEST_SEND) &&
152  pmlreq->req_bytes_packed)
153  {
154  mca_vprotocol_pessimist_request_t *ftreq = VPESSIMIST_SEND_FTREQ(req);
155  assert(!opal_list_is_empty(&mca_vprotocol_pessimist.sender_based.sb_sendreq));
156  opal_list_remove_item(&mca_vprotocol_pessimist.sender_based.sb_sendreq,
157  (opal_list_item_t *) ftreq);
158  vprotocol_pessimist_sb_progress_req(pmlreq);
159  assert(pmlreq->req_bytes_packed == ftreq->sb.bytes_progressed);
160  }
161 }
162 
163 #endif /* SB_USE_*_METHOD */
164 
165 
166 /** Copy data associated to a pml_base_send_request_t to the sender based
167  * message payload buffer
168  */
169 static inline void vprotocol_pessimist_sender_based_copy_start(ompi_request_t *req)
170 {
172  mca_vprotocol_pessimist_request_t *ftreq = VPESSIMIST_SEND_FTREQ(req);
174 
175  /* Allocate enough sender-based space to hold the message */
176  if(mca_vprotocol_pessimist.sender_based.sb_available <
177  pmlreq->req_bytes_packed +
179  {
180  vprotocol_pessimist_sender_based_alloc(pmlreq->req_bytes_packed);
181  }
182 
183  /* Copy message header to the sender-based space */
184  /* /!\ This is NOT thread safe */
185  ftreq->sb.cursor = mca_vprotocol_pessimist.sender_based.sb_cursor;
186 #if 1
187  mca_vprotocol_pessimist.sender_based.sb_cursor +=
189  pmlreq->req_bytes_packed;
190  mca_vprotocol_pessimist.sender_based.sb_available -=
192  pmlreq->req_bytes_packed;
193 #endif
194  sbhdr = (vprotocol_pessimist_sender_based_header_t *) ftreq->sb.cursor;
195  sbhdr->size = pmlreq->req_bytes_packed;
196  sbhdr->dst = pmlreq->req_base.req_peer;
197  sbhdr->tag = pmlreq->req_base.req_tag;
198  sbhdr->contextid = pmlreq->req_base.req_comm->c_contextid;
199  sbhdr->sequence = pmlreq->req_base.req_sequence;
200  ftreq->sb.cursor += sizeof(vprotocol_pessimist_sender_based_header_t);
201  V_OUTPUT_VERBOSE(70, "pessimist:\tsb\tsend\t%"PRIpclock"\tsize %lu (+%lu header)", VPESSIMIST_FTREQ(req)->reqid, (long unsigned)pmlreq->req_bytes_packed, (long unsigned)sizeof(vprotocol_pessimist_sender_based_header_t));
202 
203  /* Use one of the previous data copy method */
204  __SENDER_BASED_METHOD_COPY(pmlreq);
205 }
206 
207 /** Ensure sender based is finished before allowing user to touch send buffer.
 *
 * Expands to __SENDER_BASED_METHOD_FLUSH(REQ), i.e. to whichever flush the
 * selected SB_USE_*_METHOD defines: nothing for the pack and convertor
 * methods, a static inline function taking an ompi_request_t * for the
 * progress method.
 */
209 #define vprotocol_pessimist_sender_based_flush(REQ) __SENDER_BASED_METHOD_FLUSH(REQ)
210 
211 END_C_DECLS
212 
213 #endif
214 
static opal_list_item_t * opal_list_remove_item(opal_list_t *list, opal_list_item_t *item)
Remove an item from a list.
Definition: opal_list.h:348
static bool opal_list_is_empty(opal_list_t *list)
Check for empty list.
Definition: opal_list.h:174
struct ompi_communicator_t * req_comm
communicator pointer
Definition: pml_base_request.h:63
Definition: vprotocol_pessimist_sender_based_types.h:51
int32_t req_peer
peer process - rank w/in this communicator
Definition: pml_base_request.h:71
Definition: opal_list.h:98
opal_convertor_t req_convertor
always need the convertor
Definition: pml_base_request.h:66
mca_pml_base_request_t req_base
base request type - common data structure for use by wait/test
Definition: pml_base_sendreq.h:38
#define opal_list_append(l, i)
Append an item to the end of the list.
Definition: opal_list.h:410
int32_t req_tag
user defined tag
Definition: pml_base_request.h:72
Base type for send requests.
Definition: pml_base_sendreq.h:37
Definition: ompi_uio.h:29
Definition: opal_convertor.h:90
mca_pml_base_request_type_t req_type
MPI request type - used for test.
Definition: pml_base_request.h:62
static opal_list_item_t * opal_list_remove_first(opal_list_t *list)
Remove the first item from the list and return it.
Definition: opal_list.h:522
Definition: vprotocol_pessimist_request.h:22
size_t req_bytes_packed
packed size of a message given the datatype and count
Definition: pml_base_sendreq.h:40
Main top-level request struct definition.
Definition: request.h:100
uint64_t req_sequence
sequence number for MPI pt-2-pt ordering
Definition: pml_base_request.h:74