
martin at varnish-software
Jan 22, 2012, 9:53 AM
Post #1 of 1
(58 views)
Permalink
|
|
[PATCH 22/25] Implement stream_pass_bufsize throttling of streaming in pass mode
|
|
--- bin/varnishd/cache/cache.h | 3 +- bin/varnishd/cache/cache_busyobj.c | 5 ++- bin/varnishd/cache/cache_response.c | 67 +++++++++++++++++++++------------- 3 files changed, 46 insertions(+), 29 deletions(-) diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h index fe65dbd..a64a0ae 100644 --- a/bin/varnishd/cache/cache.h +++ b/bin/varnishd/cache/cache.h @@ -541,7 +541,8 @@ struct busyobj { /* Stream stuff */ ssize_t stream_max; - struct storage *stream_frontchunk; + volatile ssize_t stream_next; + volatile struct storage *stream_frontchunk; unsigned stream_stopped; ssize_t stream_pass_bufsize; }; diff --git a/bin/varnishd/cache/cache_busyobj.c b/bin/varnishd/cache/cache_busyobj.c index b39c170..8b97d4a 100644 --- a/bin/varnishd/cache/cache_busyobj.c +++ b/bin/varnishd/cache/cache_busyobj.c @@ -295,14 +295,15 @@ VBO_StreamSync(struct worker *wrk) (wrk->sp->req->obj->objcore->flags & OC_F_PASS)) { /* Give notice to backend fetch that we are finished * with all chunks before this one */ + assert(busyobj->stream_next <= sctx->stream_next); + busyobj->stream_next = sctx->stream_next; busyobj->stream_frontchunk = sctx->stream_frontchunk; } sctx->stream_stopped = busyobj->stream_stopped; sctx->stream_max = busyobj->stream_max; - if (busyobj->use_locks && !sctx->stream_stopped && - sctx->stream_next == sctx->stream_max) { + if (busyobj->use_locks && sctx->stream_next == sctx->stream_max) { while (!busyobj->stream_stopped && sctx->stream_max == busyobj->stream_max) { Lck_CondWait(&busyobj->vbo->cond, &busyobj->vbo->mtx, diff --git a/bin/varnishd/cache/cache_response.c b/bin/varnishd/cache/cache_response.c index 6bd4da7..bc7f122 100644 --- a/bin/varnishd/cache/cache_response.c +++ b/bin/varnishd/cache/cache_response.c @@ -459,36 +459,51 @@ RES_StreamPoll(struct worker *wrk) { struct object *fetch_obj; struct storage *st; + struct busyobj *bo; + int pass = 0; - CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC); - - VBO_StreamData(wrk->busyobj); - if (wrk->busyobj->do_stream_flipflop == 1) { - AN(wrk->sctx); - /* MBGXXX: Loop around waiting for the lag behind to - * be less than some configurable size, to keep the - * cache memory usage low (this for streaming - * extremely large objects with pass) */ - VBO_StreamSync(wrk); - RES_StreamWrite(wrk->sp); - } - - if (wrk->busyobj->stream_frontchunk == NULL) - return; - - /* It's a pass - remove chunks already delivered. Should be OK - * to do lock-free, as we are not fiddling pointers of any - * storage chunk passed busyobj->stream_frontchunk */ + bo = wrk->busyobj; + CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC); fetch_obj = wrk->busyobj->fetch_obj; CHECK_OBJ_NOTNULL(fetch_obj, OBJECT_MAGIC); - assert(fetch_obj->objcore == NULL || - (fetch_obj->objcore->flags & OC_F_PASS)); + + if (fetch_obj->objcore == NULL || fetch_obj->objcore->flags & OC_F_PASS) + pass = 1; + + VBO_StreamData(bo); while (1) { - st = VTAILQ_FIRST(&fetch_obj->store); - if (st == NULL || st == wrk->busyobj->stream_frontchunk) + if (bo->do_stream_flipflop == 1) { + CHECK_OBJ_NOTNULL(wrk->sctx, STREAM_CTX_MAGIC); + VBO_StreamSync(wrk); + RES_StreamWrite(wrk->sp); + } + + if (bo->stream_frontchunk != NULL) { + /* It's a pass - remove chunks already + * delivered. Should be OK to do lock-free, as + * we are not fiddling pointers of any storage + * chunk beyond busyobj->stream_frontchunk */ + AN(pass); + while (1) { + st = VTAILQ_FIRST(&fetch_obj->store); + if (st == NULL || + st == bo->stream_frontchunk) + break; + CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC); + VTAILQ_REMOVE(&fetch_obj->store, st, list); + STV_free(st); + } + } + + if (pass == 0 || bo->stream_pass_bufsize == 0 || + bo->stream_max - bo->stream_next < bo->stream_pass_bufsize) break; - CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC); - VTAILQ_REMOVE(&fetch_obj->store, st, list); - STV_free(st); + + /* Loop around waiting for the lag behind to be less + * than some configurable size, to keep the cache + * memory usage low (this for streaming large objects + * with pass) */ + if (bo->do_stream_flipflop == 0) + VTIM_sleep(0.1); } } -- 1.7.4.1 _______________________________________________ varnish-dev mailing list varnish-dev [at] varnish-cache https://www.varnish-cache.org/lists/mailman/listinfo/varnish-dev
|