FD.io VPP  v18.10-32-g1161dda
Vector Packet Processing
queue.c
Go to the documentation of this file.
1 /*
2  *------------------------------------------------------------------
3  * svm_queue.c - unidirectional shared-memory queues
4  *
5  * Copyright (c) 2009 Cisco and/or its affiliates.
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at:
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *------------------------------------------------------------------
18  */
19 
20 
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <pthread.h>
25 #include <vppinfra/mem.h>
26 #include <vppinfra/format.h>
27 #include <vppinfra/cache.h>
28 #include <svm/queue.h>
29 #include <vppinfra/time.h>
30 #include <vppinfra/lock.h>
31 
33 svm_queue_init (void *base, int nels, int elsize)
34 {
35  svm_queue_t *q;
36  pthread_mutexattr_t attr;
37  pthread_condattr_t cattr;
38 
39  q = (svm_queue_t *) base;
40  memset (q, 0, sizeof (*q));
41 
42  q->elsize = elsize;
43  q->maxsize = nels;
44  q->producer_evtfd = -1;
45  q->consumer_evtfd = -1;
46 
47  memset (&attr, 0, sizeof (attr));
48  memset (&cattr, 0, sizeof (cattr));
49 
50  if (pthread_mutexattr_init (&attr))
51  clib_unix_warning ("mutexattr_init");
52  if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
53  clib_unix_warning ("pthread_mutexattr_setpshared");
54  if (pthread_mutex_init (&q->mutex, &attr))
55  clib_unix_warning ("mutex_init");
56  if (pthread_mutexattr_destroy (&attr))
57  clib_unix_warning ("mutexattr_destroy");
58  if (pthread_condattr_init (&cattr))
59  clib_unix_warning ("condattr_init");
60  /* prints funny-looking messages in the Linux target */
61  if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
62  clib_unix_warning ("condattr_setpshared");
63  if (pthread_cond_init (&q->condvar, &cattr))
64  clib_unix_warning ("cond_init1");
65  if (pthread_condattr_destroy (&cattr))
66  clib_unix_warning ("cond_init2");
67 
68  return (q);
69 }
70 
72 svm_queue_alloc_and_init (int nels, int elsize, int consumer_pid)
73 {
74  svm_queue_t *q;
75 
77  + nels * elsize, CLIB_CACHE_LINE_BYTES);
78  memset (q, 0, sizeof (*q));
79  q = svm_queue_init (q, nels, elsize);
80  q->consumer_pid = consumer_pid;
81 
82  return q;
83 }
84 
85 /*
86  * svm_queue_free
87  */
88 void
90 {
91  (void) pthread_mutex_destroy (&q->mutex);
92  (void) pthread_cond_destroy (&q->condvar);
93  clib_mem_free (q);
94 }
95 
96 void
98 {
99  pthread_mutex_lock (&q->mutex);
100 }
101 
102 void
104 {
105  pthread_mutex_unlock (&q->mutex);
106 }
107 
108 int
110 {
111  return q->cursize == q->maxsize;
112 }
113 
114 static inline void
116 {
117  if (q->producer_evtfd == -1)
118  {
119  (void) pthread_cond_broadcast (&q->condvar);
120  }
121  else
122  {
123  int __clib_unused rv, fd;
124  u64 data = 1;
125  ASSERT (q->consumer_evtfd > 0 && q->producer_evtfd > 0);
126  fd = is_prod ? q->producer_evtfd : q->consumer_evtfd;
127  rv = write (fd, &data, sizeof (data));
128  }
129 }
130 
131 static inline void
133 {
134  if (q->producer_evtfd == -1)
135  {
136  pthread_cond_wait (&q->condvar, &q->mutex);
137  }
138  else
139  {
140  /* Fake a wait for event. We could use epoll but that would mean
141  * using yet another fd. Should do for now */
142  u32 cursize = q->cursize;
143  pthread_mutex_unlock (&q->mutex);
144  while (q->cursize == cursize)
145  CLIB_PAUSE ();
146  pthread_mutex_lock (&q->mutex);
147  }
148 }
149 
150 void
152 {
154 }
155 
156 static inline int
158 {
159  struct timespec ts;
160  ts.tv_sec = unix_time_now () + (u32) timeout;
161  ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
162 
163  if (q->producer_evtfd == -1)
164  {
165  return pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
166  }
167  else
168  {
169  double max_time = unix_time_now () + timeout;
170  u32 cursize = q->cursize;
171  int rv;
172 
173  pthread_mutex_unlock (&q->mutex);
174  while (q->cursize == cursize && unix_time_now () < max_time)
175  CLIB_PAUSE ();
176  rv = unix_time_now () < max_time ? 0 : ETIMEDOUT;
177  pthread_mutex_lock (&q->mutex);
178  return rv;
179  }
180 }
181 
182 int
183 svm_queue_timedwait (svm_queue_t * q, double timeout)
184 {
185  return svm_queue_timedwait_inline (q, timeout);
186 }
187 
188 /*
189  * svm_queue_add_nolock
190  */
191 int
193 {
194  i8 *tailp;
195  int need_broadcast = 0;
196 
197  if (PREDICT_FALSE (q->cursize == q->maxsize))
198  {
199  while (q->cursize == q->maxsize)
201  }
202 
203  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
204  clib_memcpy (tailp, elem, q->elsize);
205 
206  q->tail++;
207  q->cursize++;
208 
209  need_broadcast = (q->cursize == 1);
210 
211  if (q->tail == q->maxsize)
212  q->tail = 0;
213 
214  if (need_broadcast)
215  svm_queue_send_signal (q, 1);
216  return 0;
217 }
218 
219 void
221 {
222  i8 *tailp;
223 
224  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
225  clib_memcpy (tailp, elem, q->elsize);
226 
227  q->tail = (q->tail + 1) % q->maxsize;
228  q->cursize++;
229 
230  if (q->cursize == 1)
231  svm_queue_send_signal (q, 1);
232 }
233 
234 
235 /*
236  * svm_queue_add
237  */
238 int
239 svm_queue_add (svm_queue_t * q, u8 * elem, int nowait)
240 {
241  i8 *tailp;
242  int need_broadcast = 0;
243 
244  if (nowait)
245  {
246  /* zero on success */
247  if (pthread_mutex_trylock (&q->mutex))
248  {
249  return (-1);
250  }
251  }
252  else
253  pthread_mutex_lock (&q->mutex);
254 
255  if (PREDICT_FALSE (q->cursize == q->maxsize))
256  {
257  if (nowait)
258  {
259  pthread_mutex_unlock (&q->mutex);
260  return (-2);
261  }
262  while (q->cursize == q->maxsize)
264  }
265 
266  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
267  clib_memcpy (tailp, elem, q->elsize);
268 
269  q->tail++;
270  q->cursize++;
271 
272  need_broadcast = (q->cursize == 1);
273 
274  if (q->tail == q->maxsize)
275  q->tail = 0;
276 
277  if (need_broadcast)
278  svm_queue_send_signal (q, 1);
279 
280  pthread_mutex_unlock (&q->mutex);
281 
282  return 0;
283 }
284 
285 /*
286  * svm_queue_add2
287  */
288 int
289 svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait)
290 {
291  i8 *tailp;
292  int need_broadcast = 0;
293 
294  if (nowait)
295  {
296  /* zero on success */
297  if (pthread_mutex_trylock (&q->mutex))
298  {
299  return (-1);
300  }
301  }
302  else
303  pthread_mutex_lock (&q->mutex);
304 
305  if (PREDICT_FALSE (q->cursize + 1 == q->maxsize))
306  {
307  if (nowait)
308  {
309  pthread_mutex_unlock (&q->mutex);
310  return (-2);
311  }
312  while (q->cursize + 1 == q->maxsize)
314  }
315 
316  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
317  clib_memcpy (tailp, elem, q->elsize);
318 
319  q->tail++;
320  q->cursize++;
321 
322  if (q->tail == q->maxsize)
323  q->tail = 0;
324 
325  need_broadcast = (q->cursize == 1);
326 
327  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
328  clib_memcpy (tailp, elem2, q->elsize);
329 
330  q->tail++;
331  q->cursize++;
332 
333  if (q->tail == q->maxsize)
334  q->tail = 0;
335 
336  if (need_broadcast)
337  svm_queue_send_signal (q, 1);
338 
339  pthread_mutex_unlock (&q->mutex);
340 
341  return 0;
342 }
343 
344 /*
345  * svm_queue_sub
346  */
347 int
349  u32 time)
350 {
351  i8 *headp;
352  int need_broadcast = 0;
353  int rc = 0;
354 
355  if (cond == SVM_Q_NOWAIT)
356  {
357  /* zero on success */
358  if (pthread_mutex_trylock (&q->mutex))
359  {
360  return (-1);
361  }
362  }
363  else
364  pthread_mutex_lock (&q->mutex);
365 
366  if (PREDICT_FALSE (q->cursize == 0))
367  {
368  if (cond == SVM_Q_NOWAIT)
369  {
370  pthread_mutex_unlock (&q->mutex);
371  return (-2);
372  }
373  else if (cond == SVM_Q_TIMEDWAIT)
374  {
375  while (q->cursize == 0 && rc == 0)
376  rc = svm_queue_timedwait_inline (q, time);
377 
378  if (rc == ETIMEDOUT)
379  {
380  pthread_mutex_unlock (&q->mutex);
381  return ETIMEDOUT;
382  }
383  }
384  else
385  {
386  while (q->cursize == 0)
388  }
389  }
390 
391  headp = (i8 *) (&q->data[0] + q->elsize * q->head);
392  clib_memcpy (elem, headp, q->elsize);
393 
394  q->head++;
395  /* $$$$ JFC shouldn't this be == 0? */
396  if (q->cursize == q->maxsize)
397  need_broadcast = 1;
398 
399  q->cursize--;
400 
401  if (q->head == q->maxsize)
402  q->head = 0;
403 
404  if (need_broadcast)
405  svm_queue_send_signal (q, 0);
406 
407  pthread_mutex_unlock (&q->mutex);
408 
409  return 0;
410 }
411 
412 int
414 {
415  int need_broadcast;
416  i8 *headp;
417 
418  pthread_mutex_lock (&q->mutex);
419  if (q->cursize == 0)
420  {
421  pthread_mutex_unlock (&q->mutex);
422  return -1;
423  }
424 
425  headp = (i8 *) (&q->data[0] + q->elsize * q->head);
426  clib_memcpy (elem, headp, q->elsize);
427 
428  q->head++;
429  need_broadcast = (q->cursize == q->maxsize / 2);
430  q->cursize--;
431 
432  if (PREDICT_FALSE (q->head == q->maxsize))
433  q->head = 0;
434  pthread_mutex_unlock (&q->mutex);
435 
436  if (need_broadcast)
437  svm_queue_send_signal (q, 0);
438 
439  return 0;
440 }
441 
442 int
444 {
445  i8 *headp;
446 
447  if (PREDICT_FALSE (q->cursize == 0))
448  {
449  while (q->cursize == 0)
450  ;
451  }
452 
453  headp = (i8 *) (&q->data[0] + q->elsize * q->head);
454  clib_memcpy (elem, headp, q->elsize);
455 
456  q->head = (q->head + 1) % q->maxsize;
457  q->cursize--;
458 
459  return 0;
460 }
461 
462 void
464 {
465  q->producer_evtfd = fd;
466 }
467 
468 void
470 {
471  q->consumer_evtfd = fd;
472 }
473 
474 /*
475  * fd.io coding-style-patch-verification: ON
476  *
477  * Local Variables:
478  * eval: (c-set-style "gnu")
479  * End:
480  */
#define CLIB_PAUSE()
Definition: lock.h:22
void svm_queue_add_raw(svm_queue_t *q, u8 *elem)
Add element to queue with mutex held.
Definition: queue.c:220
int svm_queue_is_full(svm_queue_t *q)
Definition: queue.c:109
int svm_queue_add(svm_queue_t *q, u8 *elem, int nowait)
Definition: queue.c:239
static void svm_queue_send_signal(svm_queue_t *q, u8 is_prod)
Definition: queue.c:115
unsigned long u64
Definition: types.h:89
void svm_queue_set_producer_event_fd(svm_queue_t *q, int fd)
Set producer&#39;s event fd.
Definition: queue.c:463
static int svm_queue_timedwait_inline(svm_queue_t *q, double timeout)
Definition: queue.c:157
int svm_queue_sub2(svm_queue_t *q, u8 *elem)
Definition: queue.c:413
unsigned char u8
Definition: types.h:56
void svm_queue_unlock(svm_queue_t *q)
Definition: queue.c:103
blocking call, returns on signal or time-out - best used in combination with condvars, with eventfds we don&#39;t yield the cpu
Definition: queue.h:46
memset(h->entries, 0, sizeof(h->entries[0])*entries)
int svm_queue_sub(svm_queue_t *q, u8 *elem, svm_q_conditional_wait_t cond, u32 time)
Definition: queue.c:348
static f64 unix_time_now(void)
Definition: time.h:238
unsigned int u32
Definition: types.h:88
void svm_queue_wait(svm_queue_t *q)
Wait for queue event.
Definition: queue.c:151
int svm_queue_add2(svm_queue_t *q, u8 *elem, u8 *elem2, int nowait)
Definition: queue.c:289
void svm_queue_lock(svm_queue_t *q)
Definition: queue.c:97
#define PREDICT_FALSE(x)
Definition: clib.h:107
signed char i8
Definition: types.h:45
svm_q_conditional_wait_t
Definition: queue.h:40
#define clib_memcpy(a, b, c)
Definition: string.h:75
static void svm_queue_wait_inline(svm_queue_t *q)
Definition: queue.c:132
void svm_queue_free(svm_queue_t *q)
Definition: queue.c:89
void svm_queue_set_consumer_event_fd(svm_queue_t *q, int fd)
Set consumer&#39;s event fd.
Definition: queue.c:469
#define ASSERT(truth)
svm_queue_t * svm_queue_alloc_and_init(int nels, int elsize, int consumer_pid)
Allocate and initialize svm queue.
Definition: queue.c:72
static void clib_mem_free(void *p)
Definition: mem.h:205
int svm_queue_sub_raw(svm_queue_t *q, u8 *elem)
Definition: queue.c:443
#define clib_unix_warning(format, args...)
Definition: error.h:68
svm_queue_t * svm_queue_init(void *base, int nels, int elsize)
Definition: queue.c:33
int svm_queue_add_nolock(svm_queue_t *q, u8 *elem)
Definition: queue.c:192
struct _svm_queue svm_queue_t
static void * clib_mem_alloc_aligned(uword size, uword align)
Definition: mem.h:140
int svm_queue_timedwait(svm_queue_t *q, double timeout)
Timed wait for queue event.
Definition: queue.c:183
#define CLIB_CACHE_LINE_BYTES
Definition: cache.h:59
non-blocking call - works with both condvar and eventfd signaling
Definition: queue.h:44