
martin at varnish-software
Jan 22, 2012, 9:53 AM
Post #1 of 1
(45 views)
Permalink
|
|
[PATCH 15/25] Make streaming work with multiple streaming clients
|
|
---
 bin/varnishd/cache/cache_center.c | 117 ++++++++++++++++++++++---------------
 bin/varnishd/cache/cache_expire.c | 1 -
 bin/varnishd/cache/cache_fetch.c | 2 -
 bin/varnishd/cache/cache_hash.c | 33 ++++++++++-
 bin/varnishd/hash/hash_slinger.h | 1 +
 5 files changed, 103 insertions(+), 51 deletions(-)

diff --git a/bin/varnishd/cache/cache_center.c b/bin/varnishd/cache/cache_center.c
index 7a9134e..23b5f71 100644
--- a/bin/varnishd/cache/cache_center.c
+++ b/bin/varnishd/cache/cache_center.c
@@ -224,7 +224,6 @@ cnt_prepresp(struct sess *sp, struct worker *wrk, struct req *req)
 	if (wrk->busyobj != NULL) {
 		CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC);
 		AN(wrk->busyobj->do_stream);
-		AssertObjCorePassOrBusy(req->obj->objcore);
 	}
 	wrk->res_mode = 0;
@@ -301,14 +300,17 @@ cnt_prepresp(struct sess *sp, struct worker *wrk, struct req *req)
 	case VCL_RET_RESTART:
 		if (req->restarts >= cache_param->max_restarts)
 			break;
-		if (wrk->busyobj != NULL) {
+		if (wrk->busyobj != NULL &&
+		    (req->obj->objcore == NULL ||
+		     req->obj->objcore->flags & OC_F_BUSY)) {
 			AN(wrk->busyobj->do_stream);
 			VDI_CloseFd(wrk, &wrk->busyobj->vbc);
 			HSH_Drop(wrk);
-			VBO_DerefBusyObj(wrk, &wrk->busyobj);
 		} else {
 			(void)HSH_Deref(wrk, NULL, &req->obj);
 		}
+		if (wrk->busyobj != NULL)
+			(void)VBO_DerefBusyObj(wrk, &wrk->busyobj);
 		AZ(req->obj);
 		req->restarts++;
 		req->director = NULL;
@@ -318,8 +320,8 @@ cnt_prepresp(struct sess *sp, struct worker *wrk, struct req *req)
 	default:
 		WRONG("Illegal action in vcl_deliver{}");
 	}
-	if (wrk->busyobj != NULL && wrk->busyobj->do_stream) {
-		AssertObjCorePassOrBusy(req->obj->objcore);
+	if (wrk->busyobj != NULL) {
+		AN(wrk->busyobj->do_stream);
 		sp->step = STP_STREAMBODY;
 	} else {
 		sp->step = STP_DELIVER;
@@ -983,13 +985,18 @@ cnt_streambody_task(struct worker *wrk, void *priv)
 	objcore = obj->objcore;
 	wrk->busyobj->fetch_failed = FetchBody(wrk, wrk->busyobj);
+	VBO_StreamStopped(wrk->busyobj);
 	wrk->stats.fetch_threaded++;
+	if (obj->objcore != NULL) {
+		HSH_RemoveBusyObj(wrk, obj->objcore);
+		if (wrk->busyobj->fetch_failed == 0)
+			EXP_Insert(obj);
+	}
 	AZ(wrk->busyobj->fetch_obj);
 	AZ(wrk->busyobj->vbc);
 	wrk->busyobj->vfp = NULL;
-	VBO_StreamStopped(wrk->busyobj);
 	u = VBO_DerefBusyObj(wrk, &wrk->busyobj);
 	if (objcore != NULL || u == 0) {
@@ -1013,6 +1020,7 @@ cnt_streambody(struct sess *sp, struct worker *wrk, struct req *req)
 	CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
 	CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC);
+	AN(wrk->busyobj->do_stream);
 	memset(&sctx, 0, sizeof sctx);
 	sctx.magic = STREAM_CTX_MAGIC;
 	AZ(wrk->sctx);
@@ -1024,58 +1032,72 @@ cnt_streambody(struct sess *sp, struct worker *wrk, struct req *req)
 		sctx.obuf_len = sizeof (obuf);
 	}
-	AssertObjCorePassOrBusy(req->obj->objcore);
 	RES_StreamStart(sp);
-	/* MBGXXX: Test on OC_F_BUSY to see if we should initiate
-	 * fetch at all. This code now assumes all passes through here
-	 * needs to do the fetch as well. (Multiple streaming clients
-	 * not implemented yet) */
-	wrk->acct_tmp.fetch++;
-	AZ(wrk->busyobj->fetch_obj);
-	wrk->busyobj->fetch_obj = req->obj;
-	http_Setup(wrk->busyobj->bereq, NULL);
-	http_Setup(wrk->busyobj->beresp, NULL);
-	wrk_ex = SES_GrabWorker(sp, 100); /* MBGXXX: Configurable
-	                                   * thread grabbing
-	                                   * timeout */
-	if (wrk_ex != NULL) {
-		/* Set up separate thread fetch */
-		wrk->busyobj->use_locks = 1;
-		if (req->obj->objcore != NULL)
-			/* Grab a ref on the objcore for the other thread */
-			HSH_Ref(req->obj->objcore);
-		VBO_RefBusyObj(wrk->busyobj); /* Ref for the other thread */
-		WRK_DoTask(wrk_ex, cnt_streambody_task, wrk->busyobj);
-	} else {
-		/* We have no worker */
-		if (wrk->busyobj->fetch_obj->objcore == NULL ||
-		    wrk->busyobj->fetch_obj->objcore->flags & OC_F_PASS) {
-			/* It's a pass, prefer flipflop
-			 * streaming. (MBGXXX: Flipflop not finished
-			 * yet) */
-			wrk->busyobj->do_stream_flipflop = 1;
-			wrk->stats.fetch_flipflop++;
+	if (req->obj->objcore == NULL || req->obj->objcore->flags & OC_F_BUSY) {
+		/* Initiate a fetch of the body */
+		wrk->acct_tmp.fetch++;
+		AZ(wrk->busyobj->fetch_obj);
+		wrk->busyobj->fetch_obj = req->obj;
+		http_Setup(wrk->busyobj->bereq, NULL);
+		http_Setup(wrk->busyobj->beresp, NULL);
+		wrk_ex = SES_GrabWorker(sp, 100); /* MBGXXX: Configurable
+		                                   * thread grabbing
+		                                   * timeout */
+		if (wrk_ex != NULL)
+			wrk->busyobj->use_locks = 1;
+		if (req->obj->objcore != NULL) {
+			AN(req->obj->objcore->ban);
+			HSH_Unbusy(wrk);
+			AN(req->obj->objcore->busyobj);
+		}
+		if (wrk_ex != NULL) {
+			/* Set up separate thread fetch */
+			if (req->obj->objcore != NULL)
+				/* Grab a ref on the objcore for the
+				 * other thread */
+				HSH_Ref(req->obj->objcore);
+			WRK_DoTask(wrk_ex, cnt_streambody_task,
+			    VBO_RefBusyObj(wrk->busyobj));
+		} else {
+			/* We have no fetch worker */
+			if (req->obj->objcore == NULL ||
+			    req->obj->objcore->flags & OC_F_PASS) {
+				/* It's a pass, prefer flipflop
+				 * streaming. (MBGXXX: Flipflop not
+				 * finished yet) */
+				wrk->busyobj->do_stream_flipflop = 1;
+				wrk->stats.fetch_flipflop++;
+			}
+			wrk->busyobj->fetch_failed =
+			    FetchBody(wrk, wrk->busyobj);
+			VBO_StreamStopped(wrk->busyobj);
+			if (req->obj->objcore != NULL) {
+				HSH_RemoveBusyObj(wrk, req->obj->objcore);
+				if (wrk->busyobj->fetch_failed == 0)
+					EXP_Insert(req->obj);
+			}
+			AZ(wrk->busyobj->fetch_obj);
+			AZ(wrk->busyobj->vbc);
+			wrk->busyobj->vfp = NULL;
+			VBO_StreamSync(wrk);
 		}
-		wrk->busyobj->fetch_failed = FetchBody(sp->wrk, wrk->busyobj);
-		VBO_StreamStopped(wrk->busyobj);
 	}
-	RES_StreamBody(sp);
+	if (wrk->busyobj->do_stream_flipflop == 0)
+		RES_StreamBody(sp);
+	else
+		AN(wrk->sctx->stream_stopped);
-	if (wrk->busyobj->htc.ws == wrk->ws)
+	if (wrk->busyobj->htc.ws == wrk->ws) {
 		/* Busyobj's htc has buffer on our workspace, wait for it to be released */
+		AZ(wrk->busyobj->do_stream_flipflop);
 		VBO_StreamWait(wrk->busyobj);
+	}
 	if (wrk->busyobj->fetch_failed) {
 		req->doclose = "Stream error";
-	} else if (req->obj->objcore != NULL) {
-		/* MBGXXX: This should be done on the bg task */
-		EXP_Insert(req->obj);
-		AN(req->obj->objcore);
-		AN(req->obj->objcore->ban);
-		HSH_Unbusy(wrk);
 	}
 	req->director = NULL;
 	req->restarts = 0;
@@ -1169,7 +1191,6 @@ cnt_hit(struct sess *sp, struct worker *wrk, struct req *req)
 	CHECK_OBJ_NOTNULL(req->obj, OBJECT_MAGIC);
 	CHECK_OBJ_NOTNULL(req->vcl, VCL_CONF_MAGIC);
-	AZ(wrk->busyobj);
 	assert(!(req->obj->objcore->flags & OC_F_PASS));
@@ -1315,6 +1336,8 @@ cnt_lookup(struct sess *sp, struct worker *wrk, struct req *req)
 		wrk->stats.cache_hitpass++;
 		WSP(sp, SLT_HitPass, "%u", req->obj->xid);
 		(void)HSH_Deref(wrk, NULL, &req->obj);
+		if (wrk->busyobj != NULL)
+			(void)VBO_DerefBusyObj(wrk, &wrk->busyobj);
 		req->objcore = NULL;
 		sp->step = STP_PASS;
 		return (0);
diff --git a/bin/varnishd/cache/cache_expire.c b/bin/varnishd/cache/cache_expire.c
index e93a1aa..212adf3 100644
--- a/bin/varnishd/cache/cache_expire.c
+++ b/bin/varnishd/cache/cache_expire.c
@@ -224,7 +224,6 @@ EXP_Insert(struct object *o)
 	CHECK_OBJ_NOTNULL(o, OBJECT_MAGIC);
 	oc = o->objcore;
 	CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
-	AssertObjBusy(o);
 	HSH_Ref(oc);
 	assert(o->exp.entered != 0 && !isnan(o->exp.entered));
diff --git a/bin/varnishd/cache/cache_fetch.c b/bin/varnishd/cache/cache_fetch.c
index d5ae4d5..0643389 100644
--- a/bin/varnishd/cache/cache_fetch.c
+++ b/bin/varnishd/cache/cache_fetch.c
@@ -507,8 +507,6 @@ FetchBody(struct worker *wrk, struct busyobj *bo)
 	if (bo->vfp == NULL)
 		bo->vfp = &vfp_nop;
-	AssertObjCorePassOrBusy(obj->objcore);
-
 	AZ(bo->vgz_rx);
 	AZ(VTAILQ_FIRST(&obj->store));
diff --git a/bin/varnishd/cache/cache_hash.c b/bin/varnishd/cache/cache_hash.c
index e78eb60..a12d43a 100644
--- a/bin/varnishd/cache/cache_hash.c
+++ b/bin/varnishd/cache/cache_hash.c
@@ -63,6 +63,8 @@
 static const struct hash_slinger *hash;
+static void hsh_rush(struct objhead *oh);
+
 /*---------------------------------------------------------------------*/
 /* Precreate an objhead and object for later use */
 void
@@ -414,6 +416,13 @@ HSH_Lookup(struct sess *sp, struct objhead **poh)
 		if (o->hits < INT_MAX)
 			o->hits++;
 		assert(oh->refcnt > 1);
+		if (oc->busyobj != NULL) {
+			/* It's streamable */
+			CHECK_OBJ_NOTNULL(oc->busyobj, BUSYOBJ_MAGIC);
+			wrk->busyobj = VBO_RefBusyObj(oc->busyobj);
+			if (oh->waitinglist != NULL)
+				hsh_rush(oh);
+		}
 		Lck_Unlock(&oh->mtx);
 		assert(hash->deref(oh));
 		*poh = oh;
@@ -623,7 +632,7 @@ HSH_Unbusy(struct worker *wrk)
 	VTAILQ_REMOVE(&oh->objcs, oc, list);
 	VTAILQ_INSERT_HEAD(&oh->objcs, oc, list);
 	oc->flags &= ~OC_F_BUSY;
-	if (oc->busyobj != NULL)
+	if (oc->busyobj->do_stream == 0)
 		(void)VBO_DerefBusyObj(wrk, &oc->busyobj);
 	if (oh->waitinglist != NULL)
 		hsh_rush(oh);
@@ -632,6 +641,28 @@ HSH_Unbusy(struct worker *wrk)
 	assert(oc_getobj(wrk, oc) == o);
 }
+/*---------------------------------------------------------------------
+ * Drop the objcore's ref on the busyobj while holding the objhead mutex
+ */
+
+void
+HSH_RemoveBusyObj(struct worker *wrk, struct objcore *oc)
+{
+	struct objhead *oh;
+
+	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+	CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
+	oh = oc->objhead;
+	CHECK_OBJ_NOTNULL(oh, OBJHEAD_MAGIC);
+
+	AZ(oc->flags & OC_F_BUSY);
+	AN(oc->busyobj);
+	assert(oc->busyobj->do_stream == 0 || oc->busyobj->stream_stopped == 1);
+	Lck_Lock(&oh->mtx);
+	(void)VBO_DerefBusyObj(wrk, &oc->busyobj);
+	Lck_Unlock(&oh->mtx);
+}
+
 void
 HSH_Ref(struct objcore *oc)
 {
diff --git a/bin/varnishd/hash/hash_slinger.h b/bin/varnishd/hash/hash_slinger.h
index b45e604..8affa42 100644
--- a/bin/varnishd/hash/hash_slinger.h
+++ b/bin/varnishd/hash/hash_slinger.h
@@ -55,6 +55,7 @@ void HSH_Prealloc(const struct sess *sp);
 void HSH_Cleanup(struct worker *w);
 struct objcore *HSH_Lookup(struct sess *sp, struct objhead **poh);
 void HSH_Unbusy(struct worker *wrk);
+void HSH_RemoveBusyObj(struct worker *wrk, struct objcore *oc);
 void HSH_Ref(struct objcore *o);
 void HSH_Drop(struct worker *wrk);
 void HSH_Init(const struct hash_slinger *slinger);
-- 
1.7.4.1

_______________________________________________
varnish-dev mailing list
varnish-dev [at] varnish-cache
https://www.varnish-cache.org/lists/mailman/listinfo/varnish-dev
|