From ee4544fc8087f389a3affa98b28ade9a2f4441b4 Mon Sep 17 00:00:00 2001 From: Martin Blix Grydeland Date: Thu, 15 Dec 2011 15:04:56 +0100 Subject: [PATCH 06/10] Add stream data synchronization functions to cache_busyobj.c in preparation of threaded streaming. VBO_StreamData is called by the fetch to update the busyobj with how much data is available. VBO_StreamSync is called by the dilvery to update it's local stream_ctx struct with the new pointers. --- bin/varnishd/cache/cache.h | 10 ++++++++++ bin/varnishd/cache/cache_busyobj.c | 31 +++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 0 deletions(-) diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h index c265ce4..fc43010 100644 --- a/bin/varnishd/cache/cache.h +++ b/bin/varnishd/cache/cache.h @@ -273,6 +273,10 @@ struct stream_ctx { /* First byte of storage if we free it as we go (pass) */ ssize_t stream_front; + struct storage *stream_frontchunk; + + /* Max byte we can stream */ + ssize_t stream_max; }; /*--------------------------------------------------------------------*/ @@ -525,6 +529,10 @@ struct busyobj { unsigned do_gzip; unsigned do_gunzip; unsigned do_stream; + + /* Stream stuff */ + ssize_t stream_max; + struct storage *stream_frontchunk; }; /* Object structure --------------------------------------------------*/ @@ -703,6 +711,8 @@ struct busyobj *VBO_GetBusyObj(struct worker *wrk); void VBO_RefBusyObj(const struct busyobj *busyobj); void VBO_DerefBusyObj(struct worker *wrk, struct busyobj **busyobj); void VBO_Free(struct vbo **vbo); +void VBO_StreamData(struct busyobj *busyobj); +void VBO_StreamSync(struct worker *wrk); /* cache_center.c [CNT] */ void CNT_Session(struct sess *sp); diff --git a/bin/varnishd/cache/cache_busyobj.c b/bin/varnishd/cache/cache_busyobj.c index 9080aa3..988ae07 100644 --- a/bin/varnishd/cache/cache_busyobj.c +++ b/bin/varnishd/cache/cache_busyobj.c @@ -200,3 +200,34 @@ VBO_DerefBusyObj(struct worker *wrk, struct busyobj **pbo) } } } + +/* Signal additional data available */ +void +VBO_StreamData(struct busyobj *busyobj) +{ + CHECK_OBJ_NOTNULL(busyobj, BUSYOBJ_MAGIC); + CHECK_OBJ_NOTNULL(busyobj->fetch_obj, OBJECT_MAGIC); + + assert(busyobj->fetch_obj->len >= busyobj->stream_max); + if (busyobj->fetch_obj->len > busyobj->stream_max) { + busyobj->stream_max = busyobj->fetch_obj->len; + } +} + +/* Sync the client's stream_ctx with the busyobj */ +void +VBO_StreamSync(struct worker *wrk) +{ + CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC); + CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC); + CHECK_OBJ_NOTNULL(wrk->obj, OBJECT_MAGIC); + CHECK_OBJ_NOTNULL(wrk->sctx, STREAM_CTX_MAGIC); + + wrk->sctx->stream_max = wrk->busyobj->stream_max; + if (wrk->obj->objcore == NULL || + (wrk->obj->objcore->flags & OC_F_PASS)) { + /* Give notice to backend fetch that we are finished + * with all chunks before this one */ + wrk->busyobj->stream_frontchunk = wrk->sctx->stream_frontchunk; + } +} -- 1.7.4.1