FD.io VPP  v17.04-9-g99c0734
Vector Packet Processing
unix_shared_memory_queue.c
Go to the documentation of this file.
1 /*
2  *------------------------------------------------------------------
3  * unix_shared_memory_queue.c - unidirectional shared-memory queues
4  *
5  * Copyright (c) 2009 Cisco and/or its affiliates.
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at:
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *------------------------------------------------------------------
18  */
19 
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <pthread.h>
24 #include <vppinfra/mem.h>
25 #include <vppinfra/format.h>
26 #include <vppinfra/cache.h>
28 #include <signal.h>
29 
30 /*
31  * unix_shared_memory_queue_init
32  *
33  * nels = number of elements on the queue
34  * elsize = element size, presumably 4 and cacheline-size will
35  * be popular choices.
36  * pid = consumer pid
37  *
38  * The idea is to call this function in the queue consumer,
39  * and e-mail the queue pointer to the producer(s).
40  *
41  * The vpp process / main thread allocates one of these
42  * at startup; its main input queue. The vpp main input queue
43  * has a pointer to it in the shared memory segment header.
44  *
45  * You probably want to be on an svm data heap before calling this
46  * function.
47  */
50  int elsize,
51  int consumer_pid,
52  int signal_when_queue_non_empty)
53 {
55  pthread_mutexattr_t attr;
56  pthread_condattr_t cattr;
57 
59  + nels * elsize, CLIB_CACHE_LINE_BYTES);
60  memset (q, 0, sizeof (*q));
61 
62  q->elsize = elsize;
63  q->maxsize = nels;
64  q->consumer_pid = consumer_pid;
65  q->signal_when_queue_non_empty = signal_when_queue_non_empty;
66 
67  memset (&attr, 0, sizeof (attr));
68  memset (&cattr, 0, sizeof (cattr));
69 
70  if (pthread_mutexattr_init (&attr))
71  clib_unix_warning ("mutexattr_init");
72  if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
73  clib_unix_warning ("pthread_mutexattr_setpshared");
74  if (pthread_mutex_init (&q->mutex, &attr))
75  clib_unix_warning ("mutex_init");
76  if (pthread_mutexattr_destroy (&attr))
77  clib_unix_warning ("mutexattr_destroy");
78  if (pthread_condattr_init (&cattr))
79  clib_unix_warning ("condattr_init");
80  /* prints funny-looking messages in the Linux target */
81  if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
82  clib_unix_warning ("condattr_setpshared");
83  if (pthread_cond_init (&q->condvar, &cattr))
84  clib_unix_warning ("cond_init1");
85  if (pthread_condattr_destroy (&cattr))
86  clib_unix_warning ("cond_init2");
87 
88  return (q);
89 }
90 
91 /*
92  * unix_shared_memory_queue_free
93  */
94 void
96 {
97  (void) pthread_mutex_destroy (&q->mutex);
98  (void) pthread_cond_destroy (&q->condvar);
99  clib_mem_free (q);
100 }
101 
102 void
104 {
105  pthread_mutex_lock (&q->mutex);
106 }
107 
108 void
110 {
111  pthread_mutex_unlock (&q->mutex);
112 }
113 
114 int
116 {
117  return q->cursize == q->maxsize;
118 }
119 
120 /*
121  * unix_shared_memory_queue_add_nolock
122  */
123 int
125  u8 * elem)
126 {
127  i8 *tailp;
128  int need_broadcast = 0;
129 
130  if (PREDICT_FALSE (q->cursize == q->maxsize))
131  {
132  while (q->cursize == q->maxsize)
133  {
134  (void) pthread_cond_wait (&q->condvar, &q->mutex);
135  }
136  }
137 
138  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
139  clib_memcpy (tailp, elem, q->elsize);
140 
141  q->tail++;
142  q->cursize++;
143 
144  need_broadcast = (q->cursize == 1);
145 
146  if (q->tail == q->maxsize)
147  q->tail = 0;
148 
149  if (need_broadcast)
150  {
151  (void) pthread_cond_broadcast (&q->condvar);
152  if (q->signal_when_queue_non_empty)
153  kill (q->consumer_pid, q->signal_when_queue_non_empty);
154  }
155  return 0;
156 }
157 
158 int
160 {
161  i8 *tailp;
162 
163  if (PREDICT_FALSE (q->cursize == q->maxsize))
164  {
165  while (q->cursize == q->maxsize)
166  ;
167  }
168 
169  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
170  clib_memcpy (tailp, elem, q->elsize);
171 
172  q->tail++;
173  q->cursize++;
174 
175  if (q->tail == q->maxsize)
176  q->tail = 0;
177  return 0;
178 }
179 
180 
181 /*
182  * unix_shared_memory_queue_add
183  */
184 int
186  u8 * elem, int nowait)
187 {
188  i8 *tailp;
189  int need_broadcast = 0;
190 
191  if (nowait)
192  {
193  /* zero on success */
194  if (pthread_mutex_trylock (&q->mutex))
195  {
196  return (-1);
197  }
198  }
199  else
200  pthread_mutex_lock (&q->mutex);
201 
202  if (PREDICT_FALSE (q->cursize == q->maxsize))
203  {
204  if (nowait)
205  {
206  pthread_mutex_unlock (&q->mutex);
207  return (-2);
208  }
209  while (q->cursize == q->maxsize)
210  {
211  (void) pthread_cond_wait (&q->condvar, &q->mutex);
212  }
213  }
214 
215  tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
216  clib_memcpy (tailp, elem, q->elsize);
217 
218  q->tail++;
219  q->cursize++;
220 
221  need_broadcast = (q->cursize == 1);
222 
223  if (q->tail == q->maxsize)
224  q->tail = 0;
225 
226  if (need_broadcast)
227  {
228  (void) pthread_cond_broadcast (&q->condvar);
229  if (q->signal_when_queue_non_empty)
230  kill (q->consumer_pid, q->signal_when_queue_non_empty);
231  }
232  pthread_mutex_unlock (&q->mutex);
233 
234  return 0;
235 }
236 
237 /*
238  * unix_shared_memory_queue_sub
239  */
240 int
242  u8 * elem, int nowait)
243 {
244  i8 *headp;
245  int need_broadcast = 0;
246 
247  if (nowait)
248  {
249  /* zero on success */
250  if (pthread_mutex_trylock (&q->mutex))
251  {
252  return (-1);
253  }
254  }
255  else
256  pthread_mutex_lock (&q->mutex);
257 
258  if (PREDICT_FALSE (q->cursize == 0))
259  {
260  if (nowait)
261  {
262  pthread_mutex_unlock (&q->mutex);
263  return (-2);
264  }
265  while (q->cursize == 0)
266  {
267  (void) pthread_cond_wait (&q->condvar, &q->mutex);
268  }
269  }
270 
271  headp = (i8 *) (&q->data[0] + q->elsize * q->head);
272  clib_memcpy (elem, headp, q->elsize);
273 
274  q->head++;
275  /* $$$$ JFC shouldn't this be == 0? */
276  if (q->cursize == q->maxsize)
277  need_broadcast = 1;
278 
279  q->cursize--;
280 
281  if (q->head == q->maxsize)
282  q->head = 0;
283 
284  if (need_broadcast)
285  (void) pthread_cond_broadcast (&q->condvar);
286 
287  pthread_mutex_unlock (&q->mutex);
288 
289  return 0;
290 }
291 
292 int
294 {
295  i8 *headp;
296 
297  if (PREDICT_FALSE (q->cursize == 0))
298  {
299  while (q->cursize == 0)
300  ;
301  }
302 
303  headp = (i8 *) (&q->data[0] + q->elsize * q->head);
304  clib_memcpy (elem, headp, q->elsize);
305 
306  q->head++;
307  q->cursize--;
308 
309  if (q->head == q->maxsize)
310  q->head = 0;
311  return 0;
312 }
313 
314 /*
315  * fd.io coding-style-patch-verification: ON
316  *
317  * Local Variables:
318  * eval: (c-set-style "gnu")
319  * End:
320  */
int unix_shared_memory_queue_is_full(unix_shared_memory_queue_t *q)
void unix_shared_memory_queue_free(unix_shared_memory_queue_t *q)
char i8
Definition: types.h:45
int unix_shared_memory_queue_add_nolock(unix_shared_memory_queue_t *q, u8 *elem)
int unix_shared_memory_queue_add(unix_shared_memory_queue_t *q, u8 *elem, int nowait)
#define PREDICT_FALSE(x)
Definition: clib.h:97
int unix_shared_memory_queue_sub_raw(unix_shared_memory_queue_t *q, u8 *elem)
void unix_shared_memory_queue_lock(unix_shared_memory_queue_t *q)
int unix_shared_memory_queue_sub(unix_shared_memory_queue_t *q, u8 *elem, int nowait)
#define clib_memcpy(a, b, c)
Definition: string.h:69
unix_shared_memory_queue_t * unix_shared_memory_queue_init(int nels, int elsize, int consumer_pid, int signal_when_queue_non_empty)
static void clib_mem_free(void *p)
Definition: mem.h:176
int unix_shared_memory_queue_add_raw(unix_shared_memory_queue_t *q, u8 *elem)
void unix_shared_memory_queue_unlock(unix_shared_memory_queue_t *q)
unsigned char u8
Definition: types.h:56
#define clib_unix_warning(format, args...)
Definition: error.h:68
static void * clib_mem_alloc_aligned(uword size, uword align)
Definition: mem.h:117
#define CLIB_CACHE_LINE_BYTES
Definition: cache.h:67
struct _unix_shared_memory_queue unix_shared_memory_queue_t