
martin at varnish-software
Jan 22, 2012, 9:53 AM
Post #1 of 1
(63 views)
Permalink
|
|
[PATCH 24/25] Thundering horde elimination
|
|
Deal with the thundering horde problem by limiting the number of clients allowed to wait for additional data on the stream to stream_tokens. Only threads that acquire a token will be allowed to wait on the condvar. Others will have to go on a queue. Tokens are passed to queued clients when a token-holding thread has sent some data to the client. The maximum wait time to get a token is controlled through the stream_token_timeout run-time parameter. This is to prevent a token-holding client that suddenly blocks from starving another client waiting for a token. --- bin/varnishd/cache/cache.h | 5 ++ bin/varnishd/cache/cache_busyobj.c | 96 +++++++++++++++++++++++++++++++++++- bin/varnishd/cache/cache_center.c | 4 ++ bin/varnishd/cache/cache_vrt_var.c | 20 ++++++++ bin/varnishd/common/params.h | 2 + bin/varnishd/mgt/mgt_param.c | 12 +++++ include/tbl/vsc_f_main.h | 3 + lib/libvcl/generate.py | 6 ++ 8 files changed, 146 insertions(+), 2 deletions(-) diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h index a64a0ae..b02bea6 100644 --- a/bin/varnishd/cache/cache.h +++ b/bin/varnishd/cache/cache.h @@ -278,6 +278,9 @@ struct stream_ctx { /* Backend fetch has finished */ unsigned stream_stopped; + + /* Are we currently holding a token from the busyobj */ + unsigned has_token; }; /*--------------------------------------------------------------------*/ @@ -545,6 +548,7 @@ struct busyobj { volatile struct storage *stream_frontchunk; unsigned stream_stopped; ssize_t stream_pass_bufsize; + unsigned stream_tokens; }; /* Object structure --------------------------------------------------*/ @@ -759,6 +763,7 @@ void VBO_StreamStopped(struct busyobj *busyobj); void VBO_StreamWait(struct busyobj *busyobj); void VBO_StreamData(struct busyobj *busyobj); void VBO_StreamSync(struct worker *wrk); +void VBO_ReleaseToken(struct worker *wrk, struct busyobj *busyobj); /* cache_center.c [CNT] */ void CNT_Session(struct sess *sp); diff --git a/bin/varnishd/cache/cache_busyobj.c 
b/bin/varnishd/cache/cache_busyobj.c index 8b97d4a..2b6ebc0 100644 --- a/bin/varnishd/cache/cache_busyobj.c +++ b/bin/varnishd/cache/cache_busyobj.c @@ -43,6 +43,7 @@ struct vbo { #define VBO_MAGIC 0xde3d8223 struct lock mtx; pthread_cond_t cond; + VTAILQ_HEAD(, worker) token_wait_queue; unsigned refcount; uint16_t nhttp; struct busyobj bo; @@ -51,6 +52,9 @@ struct vbo { static struct lock vbo_mtx; static struct vbo *nvbo; +static void vbo_acquire_token(struct worker *wrk, struct busyobj *busyobj); +static void vbo_release_token(struct worker *wrk, struct busyobj *busyobj); + void VBO_Init(void) { @@ -82,6 +86,7 @@ vbo_New(void) vbo->nhttp = nhttp; Lck_New(&vbo->mtx, lck_busyobj); AZ(pthread_cond_init(&vbo->cond, NULL)); + VTAILQ_INIT(&vbo->token_wait_queue); return (vbo); } @@ -145,6 +150,7 @@ VBO_GetBusyObj(struct worker *wrk) vbo->bo.beresp = HTTP_create(p, vbo->nhttp); vbo->bo.stream_pass_bufsize = cache_param->stream_pass_bufsize; + vbo->bo.stream_tokens = cache_param->stream_tokens; return (&vbo->bo); } @@ -291,6 +297,11 @@ VBO_StreamSync(struct worker *wrk) Lck_Lock(&busyobj->vbo->mtx); assert(sctx->stream_max <= busyobj->stream_max); + if (sctx->has_token) + /* Release token to give other clients that has + * reached the end of data a chance */ + vbo_release_token(wrk, busyobj); + if (wrk->sp->req->obj->objcore == NULL || (wrk->sp->req->obj->objcore->flags & OC_F_PASS)) { /* Give notice to backend fetch that we are finished @@ -303,8 +314,11 @@ VBO_StreamSync(struct worker *wrk) sctx->stream_stopped = busyobj->stream_stopped; sctx->stream_max = busyobj->stream_max; - if (busyobj->use_locks && sctx->stream_next == sctx->stream_max) { - while (!busyobj->stream_stopped && + if (busyobj->use_locks && !sctx->stream_stopped && + sctx->stream_next == sctx->stream_max) { + /* We've exhausted available data, wait for more */ + vbo_acquire_token(wrk, busyobj); + while (sctx->has_token && !busyobj->stream_stopped && sctx->stream_max == busyobj->stream_max) { 
Lck_CondWait(&busyobj->vbo->cond, &busyobj->vbo->mtx, NULL); @@ -316,3 +330,81 @@ VBO_StreamSync(struct worker *wrk) if (busyobj->use_locks) Lck_Unlock(&busyobj->vbo->mtx); } + +/* Acquire a token for the worker from the busyobj. If none available, + wait for stream_token_timeout ms on the queue. + wrk->sctx->has_token will be true if succesful */ +static void +vbo_acquire_token(struct worker *wrk, struct busyobj *busyobj) +{ + struct timespec ts; + + CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC); + CHECK_OBJ_NOTNULL(wrk->sctx, STREAM_CTX_MAGIC); + AZ(wrk->sctx->has_token); + CHECK_OBJ_NOTNULL(busyobj, BUSYOBJ_MAGIC); + AN(busyobj->use_locks); + + if (busyobj->stream_tokens > 0) { + busyobj->stream_tokens--; + wrk->sctx->has_token = 1; + return; + } + + AZ(clock_gettime(CLOCK_REALTIME, &ts)); + ts.tv_sec += cache_param->stream_token_timeout / 1000; + ts.tv_nsec += (cache_param->stream_token_timeout % 1000) * 1000000; + if (ts.tv_nsec >= 1000000000) { + ts.tv_sec++; + ts.tv_nsec -= 1000000000; + } + VTAILQ_INSERT_TAIL(&busyobj->vbo->token_wait_queue, wrk, list); + Lck_CondWait(&wrk->cond, &busyobj->vbo->mtx, &ts); + if (wrk->sctx->has_token == 0) { + VTAILQ_REMOVE(&busyobj->vbo->token_wait_queue, wrk, list); + wrk->stats.n_tokentimeout++; + } +} + +/* Release our currently held token, passing it on to the first on the + queue if the queue is non-empty. 
Resets wrk->sctx->has_token */ +static void +vbo_release_token(struct worker *wrk, struct busyobj *busyobj) +{ + struct worker *wrk_waiting; + + CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC); + CHECK_OBJ_NOTNULL(wrk->sctx, STREAM_CTX_MAGIC); + AN(wrk->sctx->has_token); + CHECK_OBJ_NOTNULL(busyobj, BUSYOBJ_MAGIC); + AN(busyobj->use_locks); + + wrk_waiting = VTAILQ_FIRST(&busyobj->vbo->token_wait_queue); + if (wrk_waiting != NULL) { + /* Transfer our token to the first on the waiting + list, and wake it */ + CHECK_OBJ_NOTNULL(wrk_waiting, WORKER_MAGIC); + CHECK_OBJ_NOTNULL(wrk_waiting->sctx, STREAM_CTX_MAGIC); + VTAILQ_REMOVE(&busyobj->vbo->token_wait_queue, wrk_waiting, + list); + AZ(wrk_waiting->sctx->has_token); + wrk_waiting->sctx->has_token = 1; + AZ(pthread_cond_signal(&wrk_waiting->cond)); + } else { + busyobj->stream_tokens++; + } + + wrk->sctx->has_token = 0; +} + +void +VBO_ReleaseToken(struct worker *wrk, struct busyobj *busyobj) +{ + CHECK_OBJ_NOTNULL(busyobj, BUSYOBJ_MAGIC); + + if (busyobj->use_locks == 0) + return; + Lck_Lock(&busyobj->vbo->mtx); + vbo_release_token(wrk, busyobj); + Lck_Unlock(&busyobj->vbo->mtx); +} diff --git a/bin/varnishd/cache/cache_center.c b/bin/varnishd/cache/cache_center.c index 96e23fc..5daa949 100644 --- a/bin/varnishd/cache/cache_center.c +++ b/bin/varnishd/cache/cache_center.c @@ -1084,6 +1084,10 @@ cnt_streambody(struct sess *sp, struct worker *wrk, struct req *req) else AN(wrk->sctx->stream_stopped); + if (wrk->sctx->has_token) + VBO_ReleaseToken(wrk, wrk->busyobj); + AZ(wrk->sctx->has_token); + if (wrk->busyobj->htc.ws == wrk->ws) { /* Busyobj's htc has buffer on our workspace, wait for it to be released */ diff --git a/bin/varnishd/cache/cache_vrt_var.c b/bin/varnishd/cache/cache_vrt_var.c index e655abf..82d77c6 100644 --- a/bin/varnishd/cache/cache_vrt_var.c +++ b/bin/varnishd/cache/cache_vrt_var.c @@ -320,6 +320,26 @@ VRT_l_beresp_stream_pass_bufsize(const struct sess *sp, double val) sp->wrk->busyobj->stream_pass_bufsize = 
0; } +int +VRT_r_beresp_stream_tokens(const struct sess *sp) +{ + CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); + CHECK_OBJ_NOTNULL(sp->wrk->busyobj, BUSYOBJ_MAGIC); + return (sp->wrk->busyobj->stream_tokens); +} + +void +VRT_l_beresp_stream_tokens(const struct sess *sp, int val) +{ + CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); + CHECK_OBJ_NOTNULL(sp->wrk->busyobj, BUSYOBJ_MAGIC); + if (val >= 1) + sp->wrk->busyobj->stream_tokens = val; + else + sp->wrk->busyobj->stream_tokens = 1; +} + + /*--------------------------------------------------------------------*/ void diff --git a/bin/varnishd/common/params.h b/bin/varnishd/common/params.h index 20b346a..a68d608 100644 --- a/bin/varnishd/common/params.h +++ b/bin/varnishd/common/params.h @@ -101,6 +101,8 @@ struct params { ssize_t stream_maxchunksize; unsigned stream_grab_timeout; ssize_t stream_pass_bufsize; + unsigned stream_tokens; + unsigned stream_token_timeout; unsigned nuke_limit; diff --git a/bin/varnishd/mgt/mgt_param.c b/bin/varnishd/mgt/mgt_param.c index 3a64b96..e5d6888 100644 --- a/bin/varnishd/mgt/mgt_param.c +++ b/bin/varnishd/mgt/mgt_param.c @@ -858,6 +858,18 @@ static const struct parspec input_parspec[] = { "Zero means unlimited.\n", EXPERIMENTAL, "10mb", "bytes" }, + { "stream_tokens", + tweak_uint, &mgt_param.stream_tokens, 1, UINT_MAX, + "Default number of tokens available for racing streaming " + "clients.\n", + EXPERIMENTAL, + "10", "tokens" }, + { "stream_token_timeout", + tweak_uint, &mgt_param.stream_token_timeout, 1, UINT_MAX, + "Timeout for acquiring token during streaming and waiting " + "for more data.\n", + EXPERIMENTAL, + "100", "ms" }, #ifdef SENDFILE_WORKS { "sendfile_threshold", tweak_bytes, &mgt_param.sendfile_threshold, 0, HUGE_VAL, diff --git a/include/tbl/vsc_f_main.h b/include/tbl/vsc_f_main.h index 5761c51..415b07d 100644 --- a/include/tbl/vsc_f_main.h +++ b/include/tbl/vsc_f_main.h @@ -262,6 +262,9 @@ VSC_F(n_objwrite, uint64_t, 0, 'a', "Objects sent with write", "or if the sendfile call has been 
disabled") VSC_F(n_objoverflow, uint64_t, 1, 'a', "Objects overflowing workspace", "") +VSC_F(n_tokentimeout, uint64_t, 1, 'a', "Token wait timeouts", + "The number of times a fast streaming client has timed out waiting for " + "token to be notified about new incoming data.") VSC_F(s_sess, uint64_t, 1, 'a', "Total Sessions", "") VSC_F(s_req, uint64_t, 1, 'a', "Total Requests", "") diff --git a/lib/libvcl/generate.py b/lib/libvcl/generate.py index 51b2294..1dab49d 100755 --- a/lib/libvcl/generate.py +++ b/lib/libvcl/generate.py @@ -337,6 +337,12 @@ sp_variables = ( ( 'fetch',), 'const struct sess *' ), + ('beresp.stream_tokens', + 'INT', + ( 'fetch',), + ( 'fetch',), + 'const struct sess *' + ), ('beresp.ttl', 'DURATION', ( 'fetch',), -- 1.7.4.1 _______________________________________________ varnish-dev mailing list varnish-dev [at] varnish-cache https://www.varnish-cache.org/lists/mailman/listinfo/varnish-dev
|