
martin at varnish-software
Jan 22, 2012, 9:53 AM
Post #1 of 1
(46 views)
Permalink
|
|
[PATCH 12/25] Use background thread fetching when streaming
|
|
--- bin/varnishd/cache/cache.h | 5 ++ bin/varnishd/cache/cache_center.c | 96 +++++++++++++++++++++++++++++----- bin/varnishd/cache/cache_response.c | 84 ++++++++++++++++++++++-------- 3 files changed, 149 insertions(+), 36 deletions(-) diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h index da06965..880f5e3 100644 --- a/bin/varnishd/cache/cache.h +++ b/bin/varnishd/cache/cache.h @@ -537,6 +537,7 @@ struct busyobj { unsigned do_gzip; unsigned do_gunzip; unsigned do_stream; + unsigned do_stream_flipflop; /* Stream stuff */ ssize_t stream_max; @@ -793,6 +794,8 @@ int FetchError(struct worker *w, const char *error); int FetchError2(struct worker *w, const char *error, const char *more); int FetchHdr(struct sess *sp, int need_host_hdr); int FetchBody(struct worker *w, struct busyobj *bo); +void FetchBodyBackground(struct sess *sp, struct busyobj *bo); +void FetchBodyWait(struct busyobj *bo); int FetchReqBody(const struct sess *sp); void Fetch_Init(void); @@ -999,6 +1002,8 @@ void WSL_Flush(struct worker *w, int overflow); void RES_BuildHttp(const struct sess *sp); void RES_WriteObj(struct sess *sp); void RES_StreamStart(struct sess *sp); +void RES_StreamBody(struct sess *sp); +void RES_StreamWrite(struct sess *sp); void RES_StreamEnd(struct sess *sp); void RES_StreamPoll(struct worker *wrk); diff --git a/bin/varnishd/cache/cache_center.c b/bin/varnishd/cache/cache_center.c index 386012c..1f1a239 100644 --- a/bin/varnishd/cache/cache_center.c +++ b/bin/varnishd/cache/cache_center.c @@ -963,13 +963,47 @@ DOT } DOT streambody -> DONE [style=bold,color=cyan] */ +/* Background fetch task. Should be called with ref on busyobj, and + the objcore if present */ + +static void +cnt_streambody_task(struct worker *wrk, void *priv) +{ + struct object *obj; + struct objcore *objcore; + unsigned u; + + AZ(wrk->busyobj); + CAST_OBJ_NOTNULL(wrk->busyobj, priv, BUSYOBJ_MAGIC); + AN(wrk->busyobj->use_locks); + + CHECK_OBJ_NOTNULL(wrk->busyobj->fetch_obj, OBJECT_MAGIC); + AN(wrk->busyobj->vbc); + obj = wrk->busyobj->fetch_obj; + objcore = obj->objcore; + + wrk->busyobj->fetch_failed = FetchBody(wrk, wrk->busyobj); + AZ(wrk->busyobj->fetch_obj); + AZ(wrk->busyobj->vbc); + wrk->busyobj->vfp = NULL; + VBO_StreamStopped(wrk->busyobj); + + u = VBO_DerefBusyObj(wrk, &wrk->busyobj); + if (objcore != NULL || u == 0) { + /* Only deref object if it has it's own refcnt, or we + * were the last to deref the busyobj */ + (void)HSH_Deref(wrk, NULL, &obj); + } +} + static int cnt_streambody(struct sess *sp, struct worker *wrk, struct req *req) { - int i; struct stream_ctx sctx; uint8_t obuf[sp->wrk->res_mode & RES_GUNZIP ? cache_param->gzip_stack_buffer : 1]; + struct worker *wrk_ex; + unsigned u; CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC); @@ -987,28 +1021,56 @@ cnt_streambody(struct sess *sp, struct worker *wrk, struct req *req) sctx.obuf_len = sizeof (obuf); } - RES_StreamStart(sp); - AssertObjCorePassOrBusy(req->obj->objcore); + RES_StreamStart(sp); + /* MBGXXX: Test on OC_F_BUSY to see if we should initiate + * fetch at all. This code now assumes all passes through here + * needs to do the fetch as well. (Multiple streaming clients + * not implemented yet) */ AZ(wrk->busyobj->fetch_obj); wrk->busyobj->fetch_obj = req->obj; - i = FetchBody(wrk, wrk->busyobj); - AZ(wrk->busyobj->fetch_obj); - http_Setup(wrk->busyobj->bereq, NULL); http_Setup(wrk->busyobj->beresp, NULL); - wrk->busyobj->vfp = NULL; - AZ(wrk->busyobj->vbc); - AN(req->director); + wrk_ex = SES_GrabWorker(sp, 100); /* MBGXXX: Configurable + * thread grabbing + * timeout */ + if (wrk_ex != NULL) { + /* Set up separate thread fetch */ + wrk->busyobj->use_locks = 1; + if (req->obj->objcore != NULL) + /* Grab a ref on the objcore for the other thread */ + HSH_Ref(req->obj->objcore); + VBO_RefBusyObj(wrk->busyobj); /* Ref for the other thread */ + WRK_DoTask(wrk_ex, cnt_streambody_task, wrk->busyobj); + } else { + /* We have no worker */ + if (wrk->busyobj->fetch_obj->objcore == NULL || + wrk->busyobj->fetch_obj->objcore->flags & OC_F_PASS) { + /* It's a pass, prefer flipflop + * streaming. (MBGXXX: Flipflop not finished + * yet) */ + wrk->busyobj->do_stream_flipflop = 1; + } + wrk->busyobj->fetch_failed = FetchBody(sp->wrk, wrk->busyobj); + VBO_StreamStopped(wrk->busyobj); + } - if (!i && req->obj->objcore != NULL) { + RES_StreamBody(sp); + + if (wrk->busyobj->htc.ws == wrk->ws) + /* Busyobj's htc has buffer on our workspace, + wait for it to be released */ + VBO_StreamWait(wrk->busyobj); + + if (wrk->busyobj->fetch_failed) { + req->doclose = "Stream error"; + } else if (req->obj->objcore != NULL) { + /* MBGXXX: This should be done on the bg task */ EXP_Insert(req->obj); AN(req->obj->objcore); AN(req->obj->objcore->ban); HSH_Unbusy(wrk); - } else { - req->doclose = "Stream error"; } wrk->acct_tmp.fetch++; req->director = NULL; @@ -1021,8 +1083,14 @@ cnt_streambody(struct sess *sp, struct worker *wrk, struct req *req) wrk->sctx = NULL; assert(WRW_IsReleased(wrk)); assert(wrk->wrw.ciov == wrk->wrw.siov); - (void)HSH_Deref(wrk, NULL, &req->obj); - (void)VBO_DerefBusyObj(wrk, &wrk->busyobj); + u = VBO_DerefBusyObj(wrk, &wrk->busyobj); + if (req->obj->objcore != NULL || u == 0) { + /* Only deref object if it has it's own refcnt, or we + * were the last to deref the busyobj */ + (void)HSH_Deref(wrk, NULL, &req->obj); + } else + /* Object will be deref'ed by fetch thread */ + req->obj = NULL; http_Setup(req->resp, NULL); sp->step = STP_DONE; return (0); diff --git a/bin/varnishd/cache/cache_response.c b/bin/varnishd/cache/cache_response.c index cb6ddd6..a49acbe 100644 --- a/bin/varnishd/cache/cache_response.c +++ b/bin/varnishd/cache/cache_response.c @@ -366,22 +366,39 @@ RES_StreamStart(struct sess *sp) } void -RES_StreamPoll(struct worker *wrk) +RES_StreamBody(struct sess *sp) { struct stream_ctx *sctx; + struct busyobj *bo; + + sctx = sp->wrk->sctx; + CHECK_OBJ_NOTNULL(sctx, STREAM_CTX_MAGIC); + bo = sp->wrk->busyobj; + CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC); + AN(sp->req->wantbody); + + while (!sctx->stream_stopped || sctx->stream_next < sctx->stream_max) { + VBO_StreamSync(sp->wrk); + RES_StreamWrite(sp); + } +} + +void +RES_StreamWrite(struct sess *sp) +{ + struct worker *wrk; + struct stream_ctx *sctx; struct storage *st; - struct object *fetch_obj; ssize_t l, l2, stlen; void *ptr; + CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); + wrk = sp->wrk; CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC); CHECK_OBJ_NOTNULL(wrk->sp->req->obj, OBJECT_MAGIC); sctx = wrk->sctx; CHECK_OBJ_NOTNULL(sctx, STREAM_CTX_MAGIC); - VBO_StreamData(wrk->busyobj); - VBO_StreamSync(wrk); - if (sctx->stream_max == sctx->stream_next) return; assert(sctx->stream_max > sctx->stream_next); @@ -418,23 +435,6 @@ RES_StreamPoll(struct worker *wrk) } if (!(wrk->res_mode & RES_GUNZIP)) (void)WRW_Flush(wrk); - - if (wrk->busyobj->stream_frontchunk == NULL) - return; - - /* It's a pass - remove chunks already delivered */ - fetch_obj = wrk->busyobj->fetch_obj; - CHECK_OBJ_NOTNULL(fetch_obj, OBJECT_MAGIC); - assert(fetch_obj->objcore == NULL || - (fetch_obj->objcore->flags & OC_F_PASS)); - while (1) { - st = VTAILQ_FIRST(&fetch_obj->store); - if (st == NULL || st == wrk->busyobj->stream_frontchunk) - break; - CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC); - VTAILQ_REMOVE(&fetch_obj->store, st, list); - STV_free(st); - } } void @@ -453,3 +453,43 @@ RES_StreamEnd(struct sess *sp) if (WRW_FlushRelease(sp->wrk)) SES_Close(sp, "remote closed"); } + +void +RES_StreamPoll(struct worker *wrk) +{ + struct object *fetch_obj; + struct storage *st; + + CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC); + + VBO_StreamData(wrk->busyobj); + if (wrk->busyobj->do_stream_flipflop == 1) { + AN(wrk->sctx); + /* MBGXXX: Do flip-flop streaming */ + /* MBGXXX: Loop around waiting for the lag behind to + * be less than some configurable size, to keep the + * cache memory usage low (this for streaming + * extremely large objects with pass) */ + VBO_StreamSync(wrk); + RES_StreamWrite(wrk->sp); + } + + if (wrk->busyobj->stream_frontchunk == NULL) + return; + + /* It's a pass - remove chunks already delivered. Should be OK + * to do lock-free, as we are not fiddling pointers of any + * storage chunk passed busyobj->stream_frontchunk */ + fetch_obj = wrk->busyobj->fetch_obj; + CHECK_OBJ_NOTNULL(fetch_obj, OBJECT_MAGIC); + assert(fetch_obj->objcore == NULL || + (fetch_obj->objcore->flags & OC_F_PASS)); + while (1) { + st = VTAILQ_FIRST(&fetch_obj->store); + if (st == NULL || st == wrk->busyobj->stream_frontchunk) + break; + CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC); + VTAILQ_REMOVE(&fetch_obj->store, st, list); + STV_free(st); + } +} -- 1.7.4.1 _______________________________________________ varnish-dev mailing list varnish-dev [at] varnish-cache https://www.varnish-cache.org/lists/mailman/listinfo/varnish-dev
|