
martin at varnish-software
Jan 22, 2012, 9:53 AM
Post #1 of 1
(42 views)
Permalink
|
|
[PATCH 09/25] Add stream data synchronization functions to cache_busyobj.c in preparation of threaded streaming.
|
|
VBO_StreamData is called by the fetch to update the busyobj with how much data is available. VBO_StreamSync is called by the dilvery to update it's local stream_ctx struct with the new pointers. VBO_StreamStopped signals object fetch has finished. VBO_StreamWait waits for the streaming thread to finish looking at other thread's workspace. --- bin/varnishd/cache/cache.h | 16 ++++++ bin/varnishd/cache/cache_busyobj.c | 91 ++++++++++++++++++++++++++++++++++++ 2 files changed, 107 insertions(+), 0 deletions(-) diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h index 0bc3f59..0dd8ef4 100644 --- a/bin/varnishd/cache/cache.h +++ b/bin/varnishd/cache/cache.h @@ -271,6 +271,13 @@ struct stream_ctx { /* First byte of storage if we free it as we go (pass) */ ssize_t stream_front; + struct storage *stream_frontchunk; + + /* Max byte we can stream */ + ssize_t stream_max; + + /* Backend fetch has finished */ + unsigned stream_stopped; }; /*--------------------------------------------------------------------*/ @@ -530,6 +537,11 @@ struct busyobj { unsigned do_gzip; unsigned do_gunzip; unsigned do_stream; + + /* Stream stuff */ + ssize_t stream_max; + struct storage *stream_frontchunk; + unsigned stream_stopped; }; /* Object structure --------------------------------------------------*/ @@ -738,6 +750,10 @@ struct busyobj *VBO_GetBusyObj(struct worker *wrk); struct busyobj *VBO_RefBusyObj(struct busyobj *busyobj); unsigned VBO_DerefBusyObj(struct worker *wrk, struct busyobj **busyobj); void VBO_Free(struct vbo **vbo); +void VBO_StreamStopped(struct busyobj *busyobj); +void VBO_StreamWait(struct busyobj *busyobj); +void VBO_StreamData(struct busyobj *busyobj); +void VBO_StreamSync(struct worker *wrk); /* cache_center.c [CNT] */ void CNT_Session(struct sess *sp); diff --git a/bin/varnishd/cache/cache_busyobj.c b/bin/varnishd/cache/cache_busyobj.c index 423df93..7651999 100644 --- a/bin/varnishd/cache/cache_busyobj.c +++ b/bin/varnishd/cache/cache_busyobj.c @@ -208,3 +208,94 @@ VBO_DerefBusyObj(struct worker *wrk, struct busyobj **pbo) return (r); } + +/* Signal that the fetch thread has stopped */ +void +VBO_StreamStopped(struct busyobj *busyobj) +{ + if (!busyobj->use_locks) { + busyobj->stream_stopped = 1; + return; + } + Lck_Lock(&busyobj->vbo->mtx); + busyobj->stream_stopped = 1; + AZ(pthread_cond_broadcast(&busyobj->vbo->cond)); + Lck_Unlock(&busyobj->vbo->mtx); +} + +/* Wait for the fetch thread to finish reading the pipeline buffer */ +void +VBO_StreamWait(struct busyobj *busyobj) +{ + if (!busyobj->use_locks) + return; + Lck_Lock(&busyobj->vbo->mtx); + while (busyobj->htc.pipeline.b != NULL && busyobj->stream_stopped == 0) + Lck_CondWait(&busyobj->vbo->cond, &busyobj->vbo->mtx, NULL); + Lck_Unlock(&busyobj->vbo->mtx); +} + +/* Signal additional data available */ +void +VBO_StreamData(struct busyobj *busyobj) +{ + CHECK_OBJ_NOTNULL(busyobj, BUSYOBJ_MAGIC); + CHECK_OBJ_NOTNULL(busyobj->fetch_obj, OBJECT_MAGIC); + + if (busyobj->use_locks) + Lck_Lock(&busyobj->vbo->mtx); + assert(busyobj->fetch_obj->len >= busyobj->stream_max); + if (busyobj->fetch_obj->len > busyobj->stream_max) { + busyobj->stream_max = busyobj->fetch_obj->len; + if (busyobj->use_locks) + AZ(pthread_cond_broadcast(&busyobj->vbo->cond)); + } + if (busyobj->use_locks) + Lck_Unlock(&busyobj->vbo->mtx); +} + +/* Sync the client's stream_ctx with the busyobj, and block on no more + * data available */ +void +VBO_StreamSync(struct worker *wrk) +{ + struct busyobj *busyobj; + struct stream_ctx *sctx; + + CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC); + CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC); + busyobj = wrk->busyobj; + CHECK_OBJ_NOTNULL(wrk->sp, SESS_MAGIC); + CHECK_OBJ_NOTNULL(wrk->sp->req, REQ_MAGIC); + CHECK_OBJ_NOTNULL(wrk->sp->req->obj, OBJECT_MAGIC); + CHECK_OBJ_NOTNULL(wrk->sctx, STREAM_CTX_MAGIC); + sctx = wrk->sctx; + + if (busyobj->use_locks) + Lck_Lock(&busyobj->vbo->mtx); + assert(sctx->stream_max <= busyobj->stream_max); + + if (wrk->sp->req->obj->objcore == NULL || + (wrk->sp->req->obj->objcore->flags & OC_F_PASS)) { + /* Give notice to backend fetch that we are finished + * with all chunks before this one */ + busyobj->stream_frontchunk = sctx->stream_frontchunk; + } + + sctx->stream_stopped = busyobj->stream_stopped; + sctx->stream_max = busyobj->stream_max; + + if (busyobj->use_locks && !sctx->stream_stopped && + sctx->stream_next == sctx->stream_max) { + while (!busyobj->stream_stopped && + sctx->stream_max == busyobj->stream_max) { + Lck_CondWait(&busyobj->vbo->cond, &busyobj->vbo->mtx, + NULL); + } + sctx->stream_stopped = busyobj->stream_stopped; + sctx->stream_max = busyobj->stream_max; + } + + if (busyobj->use_locks) + Lck_Unlock(&busyobj->vbo->mtx); +} -- 1.7.4.1 _______________________________________________ varnish-dev mailing list varnish-dev [at] varnish-cache https://www.varnish-cache.org/lists/mailman/listinfo/varnish-dev
|