From 2741d2de622a1ba702b5abb7fb1e028c8867fa0b Mon Sep 17 00:00:00 2001 From: Martin Blix Grydeland Date: Thu, 15 Dec 2011 15:09:47 +0100 Subject: [PATCH 08/14] Rework RES_StreamPoll to use the VBO_StreamData and VBO_StreamSync functions, and make the object data access thread safe (given proper locking in VBO_StreamData and VBO_StreamSync that is coming in a later patch). --- bin/varnishd/cache/cache.h | 4 +- bin/varnishd/cache/cache_busyobj.c | 8 +++-- bin/varnishd/cache/cache_response.c | 67 ++++++++++++++++++++++------------ 3 files changed, 50 insertions(+), 29 deletions(-) diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h index b7741a4..56133fb 100644 --- a/bin/varnishd/cache/cache.h +++ b/bin/varnishd/cache/cache.h @@ -269,7 +269,7 @@ struct stream_ctx { /* Next byte we will take from storage */ ssize_t stream_next; - /* First byte of storage if we free it as we go (pass) */ + /* Point in storage chunk chain we have reached */ ssize_t stream_front; struct storage *stream_frontchunk; @@ -986,7 +986,7 @@ void RES_BuildHttp(const struct sess *sp); void RES_WriteObj(struct sess *sp); void RES_StreamStart(struct sess *sp); void RES_StreamEnd(struct sess *sp); -void RES_StreamPoll(struct worker *); +void RES_StreamPoll(struct worker *wrk); /* cache_vary.c */ struct vsb *VRY_Create(const struct sess *sp, const struct http *hp); diff --git a/bin/varnishd/cache/cache_busyobj.c b/bin/varnishd/cache/cache_busyobj.c index b90a32a..e6064a2 100644 --- a/bin/varnishd/cache/cache_busyobj.c +++ b/bin/varnishd/cache/cache_busyobj.c @@ -222,12 +222,14 @@ VBO_StreamSync(struct worker *wrk) { CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC); CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC); - CHECK_OBJ_NOTNULL(wrk->obj, OBJECT_MAGIC); + CHECK_OBJ_NOTNULL(wrk->sp, SESS_MAGIC); + CHECK_OBJ_NOTNULL(wrk->sp->req, REQ_MAGIC); + CHECK_OBJ_NOTNULL(wrk->sp->req->obj, OBJECT_MAGIC); CHECK_OBJ_NOTNULL(wrk->sctx, STREAM_CTX_MAGIC); wrk->sctx->stream_max = wrk->busyobj->stream_max; - if (wrk->obj->objcore == NULL || - (wrk->obj->objcore->flags & OC_F_PASS)) { + if (wrk->sp->req->obj->objcore == NULL || + (wrk->sp->req->obj->objcore->flags & OC_F_PASS)) { /* Give notice to backend fetch that we are finished * with all chunks before this one */ wrk->busyobj->stream_frontchunk = wrk->sctx->stream_frontchunk; diff --git a/bin/varnishd/cache/cache_response.c b/bin/varnishd/cache/cache_response.c index fc94c22..97497ac 100644 --- a/bin/varnishd/cache/cache_response.c +++ b/bin/varnishd/cache/cache_response.c @@ -363,23 +363,39 @@ RES_StreamPoll(struct worker *wrk) { struct stream_ctx *sctx; struct storage *st; - ssize_t l, l2; + struct object *fetch_obj; + ssize_t l, l2, stlen; void *ptr; CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC); - CHECK_OBJ_NOTNULL(wrk->busyobj->fetch_obj, OBJECT_MAGIC); + CHECK_OBJ_NOTNULL(wrk->sp->req->obj, OBJECT_MAGIC); sctx = wrk->sctx; CHECK_OBJ_NOTNULL(sctx, STREAM_CTX_MAGIC); - if (wrk->busyobj->fetch_obj->len == sctx->stream_next) + + VBO_StreamData(wrk->busyobj); + VBO_StreamSync(wrk); + + if (sctx->stream_max == sctx->stream_next) return; - assert(wrk->busyobj->fetch_obj->len > sctx->stream_next); + assert(sctx->stream_max > sctx->stream_next); + l = sctx->stream_front; - VTAILQ_FOREACH(st, &wrk->busyobj->fetch_obj->store, list) { - if (st->len + l <= sctx->stream_next) { - l += st->len; + st = sctx->stream_frontchunk; + if (st == NULL) + st = VTAILQ_FIRST(&wrk->sp->req->obj->store); + for (; st != NULL; st = VTAILQ_NEXT(st, list)) { + CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC); + sctx->stream_front = l; + sctx->stream_frontchunk = st; + stlen = st->len; + if (l + stlen <= sctx->stream_next) { + l += stlen; continue; } - l2 = st->len + l - sctx->stream_next; + assert(l + stlen > sctx->stream_next); + l2 = l + stlen - sctx->stream_next; + if (sctx->stream_next + l2 > sctx->stream_max) + l2 = sctx->stream_max - sctx->stream_next; ptr = st->ptr + (sctx->stream_next - l); if (wrk->res_mode & RES_GUNZIP) { (void)VGZ_WrwGunzip(wrk, sctx->vgz, ptr, l2, @@ -387,27 +403,30 @@ RES_StreamPoll(struct worker *wrk) } else { (void)WRW_Write(wrk, ptr, l2); } - l += st->len; sctx->stream_next += l2; + if (sctx->stream_next == sctx->stream_max) + break; + AN(VTAILQ_NEXT(st, list)); + l += st->len; } if (!(wrk->res_mode & RES_GUNZIP)) (void)WRW_Flush(wrk); - if (wrk->busyobj->fetch_obj->objcore == NULL || - (wrk->busyobj->fetch_obj->objcore->flags & OC_F_PASS)) { - /* - * This is a pass object, release storage as soon as we - * have delivered it. - */ - while (1) { - st = VTAILQ_FIRST(&wrk->busyobj->fetch_obj->store); - if (st == NULL || - sctx->stream_front + st->len > sctx->stream_next) - break; - VTAILQ_REMOVE(&wrk->busyobj->fetch_obj->store, st, list); - sctx->stream_front += st->len; - STV_free(st); - } + if (wrk->busyobj->stream_frontchunk == NULL) + return; + + /* It's a pass - remove chunks already delivered */ + fetch_obj = wrk->busyobj->fetch_obj; + CHECK_OBJ_NOTNULL(fetch_obj, OBJECT_MAGIC); + assert(fetch_obj->objcore == NULL || + (fetch_obj->objcore->flags & OC_F_PASS)); + while (1) { + st = VTAILQ_FIRST(&fetch_obj->store); + if (st == NULL || st == wrk->busyobj->stream_frontchunk) + break; + CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC); + VTAILQ_REMOVE(&fetch_obj->store, st, list); + STV_free(st); } } -- 1.7.4.1