jcs's openbsd hax
openbsd
1/* $OpenBSD: evbuffer.c,v 1.17 2014/10/30 16:45:37 bluhm Exp $ */
2
3/*
4 * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * 3. The name of the author may not be used to endorse or promote products
16 * derived from this software without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
19 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
20 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
21 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
22 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
23 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
27 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 */
29
#include <sys/types.h>
#include <sys/time.h>

#include <errno.h>
#include <limits.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "event.h"
40
41/* prototypes */
42
43void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *);
44
45static int
46bufferevent_add(struct event *ev, int timeout)
47{
48 struct timeval tv, *ptv = NULL;
49
50 if (timeout) {
51 timerclear(&tv);
52 tv.tv_sec = timeout;
53 ptv = &tv;
54 }
55
56 return (event_add(ev, ptv));
57}
58
59/*
60 * This callback is executed when the size of the input buffer changes.
61 * We use it to apply back pressure on the reading side.
62 */
63
64void
65bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
66 void *arg) {
67 struct bufferevent *bufev = arg;
68 /*
69 * If we are below the watermark then reschedule reading if it's
70 * still enabled.
71 */
72 if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
73 evbuffer_setcb(buf, NULL, NULL);
74
75 if (bufev->enabled & EV_READ)
76 bufferevent_add(&bufev->ev_read, bufev->timeout_read);
77 }
78}
79
80static void
81bufferevent_readcb(int fd, short event, void *arg)
82{
83 struct bufferevent *bufev = arg;
84 int res = 0;
85 short what = EVBUFFER_READ;
86 size_t len;
87 int howmuch = -1;
88
89 if (event == EV_TIMEOUT) {
90 what |= EVBUFFER_TIMEOUT;
91 goto error;
92 }
93
94 /*
95 * If we have a high watermark configured then we don't want to
96 * read more data than would make us reach the watermark.
97 */
98 if (bufev->wm_read.high != 0) {
99 howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input);
100 /* we might have lowered the watermark, stop reading */
101 if (howmuch <= 0) {
102 struct evbuffer *buf = bufev->input;
103 event_del(&bufev->ev_read);
104 evbuffer_setcb(buf,
105 bufferevent_read_pressure_cb, bufev);
106 return;
107 }
108 }
109
110 res = evbuffer_read(bufev->input, fd, howmuch);
111 if (res == -1) {
112 if (errno == EAGAIN || errno == EINTR)
113 goto reschedule;
114 /* error case */
115 what |= EVBUFFER_ERROR;
116 } else if (res == 0) {
117 /* eof case */
118 what |= EVBUFFER_EOF;
119 }
120
121 if (res <= 0)
122 goto error;
123
124 bufferevent_add(&bufev->ev_read, bufev->timeout_read);
125
126 /* See if this callbacks meets the water marks */
127 len = EVBUFFER_LENGTH(bufev->input);
128 if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
129 return;
130 if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {
131 struct evbuffer *buf = bufev->input;
132 event_del(&bufev->ev_read);
133
134 /* Now schedule a callback for us when the buffer changes */
135 evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
136 }
137
138 /* Invoke the user callback - must always be called last */
139 if (bufev->readcb != NULL)
140 (*bufev->readcb)(bufev, bufev->cbarg);
141 return;
142
143 reschedule:
144 bufferevent_add(&bufev->ev_read, bufev->timeout_read);
145 return;
146
147 error:
148 (*bufev->errorcb)(bufev, what, bufev->cbarg);
149}
150
151static void
152bufferevent_writecb(int fd, short event, void *arg)
153{
154 struct bufferevent *bufev = arg;
155 int res = 0;
156 short what = EVBUFFER_WRITE;
157
158 if (event == EV_TIMEOUT) {
159 what |= EVBUFFER_TIMEOUT;
160 goto error;
161 }
162
163 if (EVBUFFER_LENGTH(bufev->output)) {
164 res = evbuffer_write(bufev->output, fd);
165 if (res == -1) {
166 if (errno == EAGAIN ||
167 errno == EINTR ||
168 errno == EINPROGRESS)
169 goto reschedule;
170 /* error case */
171 what |= EVBUFFER_ERROR;
172 } else if (res == 0) {
173 /* eof case */
174 what |= EVBUFFER_EOF;
175 }
176 if (res <= 0)
177 goto error;
178 }
179
180 if (EVBUFFER_LENGTH(bufev->output) != 0)
181 bufferevent_add(&bufev->ev_write, bufev->timeout_write);
182
183 /*
184 * Invoke the user callback if our buffer is drained or below the
185 * low watermark.
186 */
187 if (bufev->writecb != NULL &&
188 EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
189 (*bufev->writecb)(bufev, bufev->cbarg);
190
191 return;
192
193 reschedule:
194 if (EVBUFFER_LENGTH(bufev->output) != 0)
195 bufferevent_add(&bufev->ev_write, bufev->timeout_write);
196 return;
197
198 error:
199 (*bufev->errorcb)(bufev, what, bufev->cbarg);
200}
201
202/*
203 * Create a new buffered event object.
204 *
205 * The read callback is invoked whenever we read new data.
206 * The write callback is invoked whenever the output buffer is drained.
207 * The error callback is invoked on a write/read error or on EOF.
208 *
209 * Both read and write callbacks maybe NULL. The error callback is not
210 * allowed to be NULL and have to be provided always.
211 */
212
213struct bufferevent *
214bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
215 everrorcb errorcb, void *cbarg)
216{
217 struct bufferevent *bufev;
218
219 if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL)
220 return (NULL);
221
222 if ((bufev->input = evbuffer_new()) == NULL) {
223 free(bufev);
224 return (NULL);
225 }
226
227 if ((bufev->output = evbuffer_new()) == NULL) {
228 evbuffer_free(bufev->input);
229 free(bufev);
230 return (NULL);
231 }
232
233 event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
234 event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
235
236 bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg);
237
238 /*
239 * Set to EV_WRITE so that using bufferevent_write is going to
240 * trigger a callback. Reading needs to be explicitly enabled
241 * because otherwise no data will be available.
242 */
243 bufev->enabled = EV_WRITE;
244
245 return (bufev);
246}
247
248void
249bufferevent_setcb(struct bufferevent *bufev,
250 evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg)
251{
252 bufev->readcb = readcb;
253 bufev->writecb = writecb;
254 bufev->errorcb = errorcb;
255
256 bufev->cbarg = cbarg;
257}
258
259void
260bufferevent_setfd(struct bufferevent *bufev, int fd)
261{
262 event_del(&bufev->ev_read);
263 event_del(&bufev->ev_write);
264
265 event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
266 event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
267 if (bufev->ev_base != NULL) {
268 event_base_set(bufev->ev_base, &bufev->ev_read);
269 event_base_set(bufev->ev_base, &bufev->ev_write);
270 }
271
272 /* might have to manually trigger event registration */
273}
274
275int
276bufferevent_priority_set(struct bufferevent *bufev, int priority)
277{
278 if (event_priority_set(&bufev->ev_read, priority) == -1)
279 return (-1);
280 if (event_priority_set(&bufev->ev_write, priority) == -1)
281 return (-1);
282
283 return (0);
284}
285
286/* Closing the file descriptor is the responsibility of the caller */
287
288void
289bufferevent_free(struct bufferevent *bufev)
290{
291 event_del(&bufev->ev_read);
292 event_del(&bufev->ev_write);
293
294 evbuffer_free(bufev->input);
295 evbuffer_free(bufev->output);
296
297 free(bufev);
298}
299
300/*
301 * Returns 0 on success;
302 * -1 on failure.
303 */
304
305int
306bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
307{
308 int res;
309
310 res = evbuffer_add(bufev->output, data, size);
311
312 if (res == -1)
313 return (res);
314
315 /* If everything is okay, we need to schedule a write */
316 if (size > 0 && (bufev->enabled & EV_WRITE))
317 bufferevent_add(&bufev->ev_write, bufev->timeout_write);
318
319 return (res);
320}
321
322int
323bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
324{
325 int res;
326
327 res = bufferevent_write(bufev, buf->buffer, buf->off);
328 if (res != -1)
329 evbuffer_drain(buf, buf->off);
330
331 return (res);
332}
333
334size_t
335bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
336{
337 struct evbuffer *buf = bufev->input;
338
339 if (buf->off < size)
340 size = buf->off;
341
342 /* Copy the available data to the user buffer */
343 memcpy(data, buf->buffer, size);
344
345 if (size)
346 evbuffer_drain(buf, size);
347
348 return (size);
349}
350
351int
352bufferevent_enable(struct bufferevent *bufev, short event)
353{
354 if (event & EV_READ) {
355 if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
356 return (-1);
357 }
358 if (event & EV_WRITE) {
359 if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
360 return (-1);
361 }
362
363 bufev->enabled |= event;
364 return (0);
365}
366
367int
368bufferevent_disable(struct bufferevent *bufev, short event)
369{
370 if (event & EV_READ) {
371 if (event_del(&bufev->ev_read) == -1)
372 return (-1);
373 }
374 if (event & EV_WRITE) {
375 if (event_del(&bufev->ev_write) == -1)
376 return (-1);
377 }
378
379 bufev->enabled &= ~event;
380 return (0);
381}
382
383/*
384 * Sets the read and write timeout for a buffered event.
385 */
386
387void
388bufferevent_settimeout(struct bufferevent *bufev,
389 int timeout_read, int timeout_write) {
390 bufev->timeout_read = timeout_read;
391 bufev->timeout_write = timeout_write;
392
393 if (event_pending(&bufev->ev_read, EV_READ, NULL))
394 bufferevent_add(&bufev->ev_read, timeout_read);
395 if (event_pending(&bufev->ev_write, EV_WRITE, NULL))
396 bufferevent_add(&bufev->ev_write, timeout_write);
397}
398
399/*
400 * Sets the water marks
401 */
402
403void
404bufferevent_setwatermark(struct bufferevent *bufev, short events,
405 size_t lowmark, size_t highmark)
406{
407 if (events & EV_READ) {
408 bufev->wm_read.low = lowmark;
409 bufev->wm_read.high = highmark;
410 }
411
412 if (events & EV_WRITE) {
413 bufev->wm_write.low = lowmark;
414 bufev->wm_write.high = highmark;
415 }
416
417 /* If the watermarks changed then see if we should call read again */
418 bufferevent_read_pressure_cb(bufev->input,
419 0, EVBUFFER_LENGTH(bufev->input), bufev);
420}
421
422int
423bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
424{
425 int res;
426
427 bufev->ev_base = base;
428
429 res = event_base_set(base, &bufev->ev_read);
430 if (res == -1)
431 return (res);
432
433 res = event_base_set(base, &bufev->ev_write);
434 return (res);
435}