FD.io VPP  v18.11-rc0-18-g2a3fb1a
Vector Packet Processing
message_queue.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2018 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Unidirectional shared-memory multi-ring message queue
18  */
19 
20 #ifndef SRC_SVM_MESSAGE_QUEUE_H_
21 #define SRC_SVM_MESSAGE_QUEUE_H_
22 
23 #include <vppinfra/clib.h>
24 #include <vppinfra/error.h>
25 #include <svm/queue.h>
26 
27 typedef struct svm_msg_q_ring_
28 {
29  volatile u32 cursize; /**< current size of the ring */
30  u32 nitems; /**< max size of the ring */
31  volatile u32 head; /**< current head (for dequeue) */
32  volatile u32 tail; /**< current tail (for enqueue) */
33  u32 elsize; /**< size of an element */
34  u8 *data; /**< chunk of memory for msg data */
36 
37 typedef struct svm_msg_q_
38 {
39  svm_queue_t *q; /**< queue for exchanging messages */
40  svm_msg_q_ring_t *rings; /**< rings with message data*/
41 } svm_msg_q_t;
42 
43 typedef struct svm_msg_q_ring_cfg_
44 {
47  void *data;
49 
50 typedef struct svm_msg_q_cfg_
51 {
52  int consumer_pid; /**< pid of msg consumer */
53  u32 q_nitems; /**< msg queue size (not rings) */
54  u32 n_rings; /**< number of msg rings */
55  svm_msg_q_ring_cfg_t *ring_cfgs; /**< array of ring cfgs */
57 
58 typedef union
59 {
60  struct
61  {
62  u32 ring_index; /**< ring index, could be u8 */
63  u32 elt_index; /**< index in ring */
64  };
67 
68 #define SVM_MQ_INVALID_MSG { .as_u64 = ~0 }
69 /**
70  * Allocate message queue
71  *
72  * Allocates a message queue on the heap. Based on the configuration options,
73  * apart from the message queue this also allocates (one or multiple)
74  * shared-memory rings for the messages.
75  *
76  * @param cfg configuration options: queue len, consumer pid,
77  * ring configs
78  * @return message queue
79  */
81 
82 /**
83  * Free message queue
84  *
85  * @param mq message queue to be freed
86  */
87 void svm_msg_q_free (svm_msg_q_t * mq);
88 
89 /**
90  * Allocate message buffer
91  *
92  * Message is allocated on the first available ring capable of holding
93  * the requested number of bytes.
94  *
95  * @param mq message queue
96  * @param nbytes number of bytes needed for message
97  * @return message structure pointing to the ring and position
98  * allocated
99  */
101 
102 /**
103  * Allocate message buffer on ring
104  *
105  * Message is allocated, on requested ring. The caller MUST check that
106  * the ring is not full.
107  *
108  * @param mq message queue
109  * @param ring_index ring on which the allocation should occur
110  * @return message structure pointing to the ring and position
111  * allocated
112  */
114 
115 /**
116  * Lock message queue and allocate message buffer on ring
117  *
118  * This should be used when multiple writers/readers are expected to
119  * compete for the rings/queue. Message should be enqueued by calling
120  * @ref svm_msg_q_add_w_lock and the caller MUST unlock the queue once
121  * the message in enqueued.
122  *
123  * @param mq message queue
124  * @param ring_index ring on which the allocation should occur
125  * @param noblock flag that indicates if request should block
126  * @param msg pointer to message to be filled in
127  * @return 0 on success, negative number otherwise
128  */
130  u8 noblock, svm_msg_q_msg_t * msg);
131 
132 /**
133  * Free message buffer
134  *
135  * Marks message buffer on ring as free.
136  *
137  * @param mq message queue
138  * @param msg message to be freed
139  */
141 
142 /**
143  * Producer enqueue one message to queue
144  *
145  * Prior to calling this, the producer should've obtained a message buffer
146  * from one of the rings by calling @ref svm_msg_q_alloc_msg.
147  *
148  * @param mq message queue
149  * @param msg message (pointer to ring position) to be enqueued
150  * @param nowait flag to indicate if request is blocking or not
151  * @return success status
152  */
153 int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait);
154 
155 /**
156  * Producer enqueue one message to queue with mutex held
157  *
158  * Prior to calling this, the producer should've obtained a message buffer
159  * from one of the rings by calling @ref svm_msg_q_alloc_msg. It assumes
160  * the queue mutex is held.
161  *
162  * @param mq message queue
163  * @param msg message (pointer to ring position) to be enqueued
164  * @return success status
165  */
167 
168 /**
169  * Consumer dequeue one message from queue
170  *
171  * This returns the message pointing to the data in the message rings.
172  * The consumer is expected to call @ref svm_msg_q_free_msg once it
173  * finishes processing/copies the message data.
174  *
175  * @param mq message queue
176  * @param msg pointer to structure where message is to be received
177  * @param cond flag that indicates if request should block or not
178  * @param time time to wait if condition it SVM_Q_TIMEDWAIT
179  * @return success status
180  */
181 int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
182  svm_q_conditional_wait_t cond, u32 time);
183 
184 /**
185  * Consumer dequeue one message from queue with mutex held
186  *
187  * Returns the message pointing to the data in the message rings under the
188  * assumption that the message queue lock is already held. The consumer is
189  * expected to call @ref svm_msg_q_free_msg once it finishes
190  * processing/copies the message data.
191  *
192  * @param mq message queue
193  * @param msg pointer to structure where message is to be received
194  * @return success status
195  */
197 
198 /**
199  * Get data for message in queue
200  *
201  * @param mq message queue
202  * @param msg message for which the data is requested
203  * @return pointer to data
204  */
206 
207 /**
208  * Get message queue ring
209  *
210  * @param mq message queue
211  * @param ring_index index of ring
212  * @return pointer to ring
213  */
214 svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
215 
216 /**
217  * Check if message queue is full
218  */
219 static inline u8
221 {
222  return (mq->q->cursize == mq->q->maxsize);
223 }
224 
225 static inline u8
227 {
228  ASSERT (ring_index < vec_len (mq->rings));
229  return (mq->rings[ring_index].cursize == mq->rings[ring_index].nitems);
230 }
231 
232 /**
233  * Check if message queue is empty
234  */
235 static inline u8
237 {
238  return (mq->q->cursize == 0);
239 }
240 
241 /**
242  * Check length of message queue
243  */
244 static inline u32
246 {
247  return mq->q->cursize;
248 }
249 
250 /**
251  * Check if message is invalid
252  */
253 static inline u8
255 {
256  return (msg->as_u64 == (u64) ~ 0);
257 }
258 
259 /**
260  * Try locking message queue
261  */
262 static inline int
264 {
265  return pthread_mutex_trylock (&mq->q->mutex);
266 }
267 
268 /**
269  * Lock, or block trying, the message queue
270  */
271 static inline int
273 {
274  return pthread_mutex_lock (&mq->q->mutex);
275 }
276 
277 static inline void
279 {
280  pthread_cond_wait (&mq->q->condvar, &mq->q->mutex);
281 }
282 
283 /**
284  * Unlock message queue
285  */
286 static inline void
288 {
289  /* The other side of the connection is not polling */
290  if (mq->q->cursize < (mq->q->maxsize / 8))
291  (void) pthread_cond_broadcast (&mq->q->condvar);
292  pthread_mutex_unlock (&mq->q->mutex);
293 }
294 
295 #endif /* SRC_SVM_MESSAGE_QUEUE_H_ */
296 
297 /*
298  * fd.io coding-style-patch-verification: ON
299  *
300  * Local Variables:
301  * eval: (c-set-style "gnu")
302  * End:
303  */
static u8 svm_msg_q_msg_is_invalid(svm_msg_q_msg_t *msg)
Check if message is invalid.
void svm_msg_q_sub_w_lock(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Consumer dequeue one message from queue with mutex held.
svm_msg_q_ring_t * rings
rings with message data
Definition: message_queue.h:40
static u8 svm_msg_q_ring_is_full(svm_msg_q_t *mq, u32 ring_index)
unsigned long u64
Definition: types.h:89
static u8 svm_msg_q_is_empty(svm_msg_q_t *mq)
Check if message queue is empty.
volatile u32 head
current head (for dequeue)
Definition: message_queue.h:31
svm_msg_q_msg_t svm_msg_q_alloc_msg(svm_msg_q_t *mq, u32 nbytes)
Allocate message buffer.
unsigned char u8
Definition: types.h:56
void * svm_msg_q_msg_data(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Get data for message in queue.
struct svm_msg_q_ svm_msg_q_t
struct svm_msg_q_ring_ svm_msg_q_ring_t
volatile u32 tail
current tail (for enqueue)
Definition: message_queue.h:32
svm_msg_q_t * svm_msg_q_alloc(svm_msg_q_cfg_t *cfg)
Allocate message queue.
Definition: message_queue.c:39
unsigned int u32
Definition: types.h:88
struct svm_msg_q_cfg_ svm_msg_q_cfg_t
static void svm_msg_q_wait(svm_msg_q_t *mq)
volatile u32 cursize
current size of the ring
Definition: message_queue.h:29
struct svm_msg_q_ring_cfg_ svm_msg_q_ring_cfg_t
u32 n_rings
number of msg rings
Definition: message_queue.h:54
svm_q_conditional_wait_t
Definition: queue.h:39
static void svm_msg_q_unlock(svm_msg_q_t *mq)
Unlock message queue.
svm_queue_t * q
queue for exchanging messages
Definition: message_queue.h:39
int svm_msg_q_sub(svm_msg_q_t *mq, svm_msg_q_msg_t *msg, svm_q_conditional_wait_t cond, u32 time)
Consumer dequeue one message from queue.
u32 elt_index
index in ring
Definition: message_queue.h:63
void svm_msg_q_add_and_unlock(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Producer enqueue one message to queue with mutex held.
static int svm_msg_q_try_lock(svm_msg_q_t *mq)
Try locking message queue.
u32 ring_index
ring index, could be u8
Definition: message_queue.h:62
#define ASSERT(truth)
u8 * data
chunk of memory for msg data
Definition: message_queue.h:34
svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring(svm_msg_q_t *mq, u32 ring_index)
Allocate message buffer on ring.
Definition: message_queue.c:85
int svm_msg_q_add(svm_msg_q_t *mq, svm_msg_q_msg_t *msg, int nowait)
Producer enqueue one message to queue.
svm_msg_q_ring_cfg_t * ring_cfgs
array of ring cfgs
Definition: message_queue.h:55
u32 q_nitems
msg queue size (not rings)
Definition: message_queue.h:53
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
void svm_msg_q_free_msg(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Free message buffer.
struct _svm_queue svm_queue_t
u32 elsize
size of an element
Definition: message_queue.h:33
int svm_msg_q_lock_and_alloc_msg_w_ring(svm_msg_q_t *mq, u32 ring_index, u8 noblock, svm_msg_q_msg_t *msg)
Lock message queue and allocate message buffer on ring.
Definition: message_queue.c:99
static u32 svm_msg_q_size(svm_msg_q_t *mq)
Check length of message queue.
svm_msg_q_ring_t * svm_msg_q_ring(svm_msg_q_t *mq, u32 ring_index)
Get message queue ring.
Definition: message_queue.c:26
int consumer_pid
pid of msg consumer
Definition: message_queue.h:52
static int svm_msg_q_lock(svm_msg_q_t *mq)
Lock, or block trying, the message queue.
static u8 svm_msg_q_is_full(svm_msg_q_t *mq)
Check if message queue is full.
void svm_msg_q_free(svm_msg_q_t *mq)
Free message queue.
Definition: message_queue.c:72
u32 nitems
max size of the ring
Definition: message_queue.h:30