From 3ba48dcfb5f4231dfa3147d5d63d9dda9d993ad5 Mon Sep 17 00:00:00 2001
From: Martin Blix Grydeland
Date: Thu, 15 Dec 2011 15:09:47 +0100
Subject: [PATCH 07/10] Rework RES_StreamPoll to use the VBO_StreamData and VBO_StreamSync functions, and make the object data access thread safe (given proper locking in VBO_StreamData and VBO_StreamSync that is coming in a later patch).

---
 bin/varnishd/cache/cache.h          |    4 +-
 bin/varnishd/cache/cache_response.c |   83 +++++++++++++++++++++-------------
 2 files changed, 53 insertions(+), 34 deletions(-)

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index fc43010..320f9ca 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -271,7 +271,7 @@ struct stream_ctx {
 	/* Next byte we will take from storage */
 	ssize_t stream_next;
 
-	/* First byte of storage if we free it as we go (pass) */
+	/* Point in storage chunk chain we have reached */
 	ssize_t stream_front;
 	struct storage *stream_frontchunk;
 
@@ -950,7 +950,7 @@ void RES_BuildHttp(const struct sess *sp);
 void RES_WriteObj(struct sess *sp);
 void RES_StreamStart(struct sess *sp);
 void RES_StreamEnd(struct sess *sp);
-void RES_StreamPoll(struct worker *);
+void RES_StreamPoll(struct worker *wrk);
 
 /* cache_vary.c */
 struct vsb *VRY_Create(const struct sess *sp, const struct http *hp);
diff --git a/bin/varnishd/cache/cache_response.c b/bin/varnishd/cache/cache_response.c
index 7c63d51..5038395 100644
--- a/bin/varnishd/cache/cache_response.c
+++ b/bin/varnishd/cache/cache_response.c
@@ -357,55 +357,74 @@ RES_StreamStart(struct sess *sp)
 }
 
 void
-RES_StreamPoll(struct worker *w)
+RES_StreamPoll(struct worker *wrk)
 {
 	struct stream_ctx *sctx;
 	struct storage *st;
-	ssize_t l, l2;
+	struct object *fetch_obj;
+	ssize_t l, l2, stlen;
 	void *ptr;
 
-	CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);
-	CHECK_OBJ_NOTNULL(w->busyobj->fetch_obj, OBJECT_MAGIC);
-	sctx = w->sctx;
+	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+	CHECK_OBJ_NOTNULL(wrk->obj, OBJECT_MAGIC);
+	sctx = wrk->sctx;
 	CHECK_OBJ_NOTNULL(sctx, STREAM_CTX_MAGIC);
-	if (w->busyobj->fetch_obj->len == sctx->stream_next)
+
+	VBO_StreamData(wrk->busyobj);
+	VBO_StreamSync(wrk);
+
+	if (sctx->stream_max == sctx->stream_next)
 		return;
-	assert(w->busyobj->fetch_obj->len > sctx->stream_next);
+	assert(sctx->stream_max > sctx->stream_next);
+
 	l = sctx->stream_front;
-	VTAILQ_FOREACH(st, &w->busyobj->fetch_obj->store, list) {
-		if (st->len + l <= sctx->stream_next) {
-			l += st->len;
+	st = sctx->stream_frontchunk;
+	if (st == NULL)
+		st = VTAILQ_FIRST(&wrk->obj->store);
+	for (; st != NULL; st = VTAILQ_NEXT(st, list)) {
+		CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC);
+		sctx->stream_front = l;
+		sctx->stream_frontchunk = st;
+		stlen = st->len;
+		if (l + stlen <= sctx->stream_next) {
+			l += stlen;
 			continue;
 		}
-		l2 = st->len + l - sctx->stream_next;
+		assert(l + stlen > sctx->stream_next);
+		l2 = l + stlen - sctx->stream_next;
+		if (sctx->stream_next + l2 > sctx->stream_max)
+			l2 = sctx->stream_max - sctx->stream_next;
 		ptr = st->ptr + (sctx->stream_next - l);
-		if (w->res_mode & RES_GUNZIP) {
-			(void)VGZ_WrwGunzip(w, sctx->vgz, ptr, l2,
+		if (wrk->res_mode & RES_GUNZIP) {
+			(void)VGZ_WrwGunzip(wrk, sctx->vgz, ptr, l2,
 			    sctx->obuf, sctx->obuf_len, &sctx->obuf_ptr);
 		} else {
-			(void)WRW_Write(w, ptr, l2);
+			(void)WRW_Write(wrk, ptr, l2);
 		}
-		l += st->len;
 		sctx->stream_next += l2;
+		if (sctx->stream_next == sctx->stream_max)
+			break;
+		AN(VTAILQ_NEXT(st, list));
+		l += st->len;
 	}
-	if (!(w->res_mode & RES_GUNZIP))
-		(void)WRW_Flush(w);
+	if (!(wrk->res_mode & RES_GUNZIP))
+		(void)WRW_Flush(wrk);
 
-	if (w->busyobj->fetch_obj->objcore == NULL ||
-	    (w->busyobj->fetch_obj->objcore->flags & OC_F_PASS)) {
-		/*
-		 * This is a pass object, release storage as soon as we
-		 * have delivered it.
-		 */
-		while (1) {
-			st = VTAILQ_FIRST(&w->busyobj->fetch_obj->store);
-			if (st == NULL ||
-			    sctx->stream_front + st->len > sctx->stream_next)
-				break;
-			VTAILQ_REMOVE(&w->busyobj->fetch_obj->store, st, list);
-			sctx->stream_front += st->len;
-			STV_free(st);
-		}
+	if (wrk->busyobj->stream_frontchunk == NULL)
+		return;
+
+	/* It's a pass - remove chunks already delivered */
+	fetch_obj = wrk->busyobj->fetch_obj;
+	CHECK_OBJ_NOTNULL(fetch_obj, OBJECT_MAGIC);
+	assert(fetch_obj->objcore == NULL ||
+	    (fetch_obj->objcore->flags & OC_F_PASS));
+	while (1) {
+		st = VTAILQ_FIRST(&fetch_obj->store);
+		if (st == NULL || st == wrk->busyobj->stream_frontchunk)
+			break;
+		CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC);
+		VTAILQ_REMOVE(&fetch_obj->store, st, list);
+		STV_free(st);
 	}
 }
 
-- 
1.7.4.1