1-- New style request factories and backend request handling. 2-- 3-- First, this API adds a "request func generation" step when a new request 4-- starts: if there is not already a cached function to use, call the 5-- "generator" function, then use the response to run the request. This generated 6-- function is reused until the parent generator is swapped out during reload. 7-- This allows the user to pre-allocate and pre-calculate objects and data, 8-- offering both safety and performance. 9-- Future API revisions (such as stats) will rely on this generation step to 10-- be more user friendly while retaining performance. 11-- 12-- For backend IO's this unifies what was once two API's: 13-- - result = pool(request): the non-async API 14-- - table = mcp.await(etc) 15-- 16-- It is now a single system governeed by a request context object (rctx). 17-- This new system allows queueing a nearly arbitrary set of requests, 18-- "blocking" a client on any individual response, and using callbacks to 19-- make decisions on if a response is "good", to resume processing early, or 20-- post-process once all responses are received. 21-- 22-- The queueing system is now recursive: a fgen can new_handle() another fgen. 23-- Meaning configurations can be assembled as call graphs. IE: If you have a 24-- route function A and want to "shadow" some of its requests onto route 25-- function B, instead of making A more complex you can create a third 26-- function C which splits the traffic. 27-- 28-- API docs: https://github.com/memcached/memcached/wiki/Proxy 29 30verbose = true 31-- global for an error handling test 32failgen_armed = false 33failgenret_armed = false 34 35function say(...) 36 if verbose then 37 print(...) 38 end 39end 40 41function mcp_config_pools() 42 local srv = mcp.backend 43 44 local b1 = srv('b1', '127.0.0.1', 12011) 45 local b2 = srv('b2', '127.0.0.1', 12012) 46 local b3 = srv('b3', '127.0.0.1', 12013) 47 local b4 = srv('b4', '127.0.0.1', 12014) 48 local b1z = mcp.pool({b1}) 49 local b2z = mcp.pool({b2}) 50 local b3z = mcp.pool({b3}) 51 local b4z = mcp.pool({b4}) 52 local p = {p = {b1z, b2z, b3z}, b = b4z} 53 54 --return mcp.pool(b1z, { iothread = false }) 55 return p 56end 57 58-- many of these factories have the same basic init pattern, so we can save 59-- some code. 60function new_basic_factory(arg, func) 61 local fgen = mcp.funcgen_new() 62 local o = { t = {}, c = 0 } 63 64 -- some of them have a wait, some don't. 65 -- here would be a good place to do bounds checking on arguments in 66 -- similar functions. 67 o.wait = arg.wait 68 for _, v in pairs(arg.list) do 69 table.insert(o.t, fgen:new_handle(v)) 70 o.c = o.c + 1 71 end 72 73 fgen:ready({ f = func, a = o, n = arg.name}) 74 return fgen 75end 76 77function new_prefix_factory(arg) 78 local fgen = mcp.funcgen_new() 79 local o = {} 80 o.pattern = arg.pattern 81 o.default = fgen:new_handle(arg.default) 82 83 o.map = {} 84 -- get handler ids for each sub-route value 85 -- convert the map. 86 for k, v in pairs(arg.list) do 87 o.map[k] = fgen:new_handle(v) 88 end 89 90 fgen:ready({ f = prefix_factory_gen, a = o, n = arg.name }) 91 return fgen 92end 93 94function prefix_factory_gen(rctx, arg) 95 local p = arg.pattern 96 local map = arg.map 97 local d = arg.default 98 99 say("generating a prefix factory function") 100 101 return function(r) 102 local key = r:key() 103 104 local handle = map[string.match(key, p)] 105 if handle == nil then 106 return rctx:enqueue_and_wait(r, d) 107 end 108 return rctx:enqueue_and_wait(r, handle) 109 end 110end 111 112function new_direct_factory(arg) 113 local fgen = mcp.funcgen_new() 114 local h = fgen:new_handle(arg.p) 115 fgen:ready({ f = direct_factory_gen, a = h, n = arg.name }) 116 return fgen 117end 118 119function direct_factory_gen(rctx, h) 120 say("generating direct factory function") 121 122 return function(r) 123 say("waiting on a single pool") 124 return rctx:enqueue_and_wait(r, h) 125 end 126end 127 128function new_locality_factory(arg) 129 local fgen = mcp.funcgen_new() 130 local h = fgen:new_handle(arg.p) 131 fgen:ready({ f = locality_factory_gen, a = h, n = arg.name }) 132 return fgen 133end 134 135-- factory for proving slots have unique environmental memory. 136-- we need to wait on a backend to allow the test to pipeline N requests in 137-- parallel, to prove that each parallel slot has a unique lua environment. 138function locality_factory_gen(rctx, h) 139 say("generating locality factory function") 140 local x = 0 141 142 return function(r) 143 x = x + 1 144 say("returning from locality: " .. x) 145 local res = rctx:enqueue_and_wait(r, h) 146 return "HD t" .. x .. "\r\n" 147 end 148end 149 150-- waits for only the _first_ queued handle to return. 151-- ie; position 1 in the table. 152-- we do a numeric for loop in the returned function to avoid allocations done 153-- by a call to pairs() 154function first_factory_gen(rctx, arg) 155 say("generating first factory function") 156 local t = arg.t 157 local count = arg.c 158 159 return function(r) 160 say("waiting on first of " .. count .. " pools") 161 for x=1, count do 162 rctx:enqueue(r, t[x]) 163 end 164 165 return rctx:wait_handle(t[1]) 166 end 167end 168 169-- wait on x out of y 170function partial_factory_gen(rctx, arg) 171 say("generating partial factory function") 172 local t = arg.t 173 local count = arg.c 174 local wait = arg.wait 175 176 return function(r) 177 say("waiting on first " .. wait .. " out of " .. count) 178 for x=1, count do 179 rctx:enqueue(r, t[x]) 180 end 181 182 local done = rctx:wait_cond(wait) 183 for x=1, count do 184 -- :good will only return the result object if the handle's 185 -- response was considered "good" 186 local res = rctx:res_good(t[x]) 187 if res ~= nil then 188 say("found a result") 189 return res 190 end 191 -- TODO: tally up responses and send summary for test. 192 end 193 say("found nothing") 194 -- didn't return anything good, so return one at random. 195 for x=1, count do 196 local res = rctx:res_any(t[x]) 197 if res ~= nil then 198 return res 199 end 200 end 201 end 202end 203 204-- wait on all pool arguments 205function all_factory_gen(rctx, arg) 206 say("generating all factory function") 207 local t = arg.t 208 local count = arg.c 209 -- should be a minor speedup avoiding the table lookup. 210 local mode = mcp.WAIT_ANY 211 212 return function(r) 213 say("waiting on " .. count) 214 215 rctx:enqueue(r, t) 216 local done = rctx:wait_cond(count, mode) 217 -- :any will give us the result object for that handle, regardless 218 -- of return code/status. 219 local res = rctx:res_any(t[1]) 220 221 -- TODO: tally up the responses and return summary for test. 222 return res 223 end 224end 225 226-- wait on the first good or N of total 227function fastgood_factory_gen(rctx, arg) 228 say("generating fastgood factory function") 229 local t = arg.t 230 local count = arg.c 231 local wait = arg.wait 232 233 local cb = function(res) 234 say("running in a callback!") 235 if res:hit() then 236 say("was a hit!") 237 -- return an extra arg telling us to shortcut the wait count 238 return mcp.WAIT_GOOD, mcp.WAIT_RESUME 239 end 240 -- default return code is mcp.WAIT_ANY 241 end 242 243 for _, v in pairs(t) do 244 rctx:handle_set_cb(v, cb) 245 end 246 247 return function(r) 248 say("first good or wait for N") 249 250 rctx:enqueue(r, t) 251 local done = rctx:wait_cond(wait, mcp.WAIT_GOOD) 252 say("fastgood done:", done) 253 254 if done == 1 then 255 -- if we just got one "good", we're probably happy. 256 for x=1, count do 257 -- loop to find the good handle. 258 local res = rctx:res_good(t[x]) 259 if res ~= nil then 260 return res 261 end 262 end 263 else 264 -- else we had to wait and now need to decide if it was a miss or 265 -- network error. 266 -- but for this test we'll just return the first result. 267 for x=1, count do 268 local res = rctx:res_any(t[x]) 269 if res ~= nil then 270 return res 271 end 272 end 273 end 274 end 275end 276 277-- fastgood implemented using internal fastgood state 278function fastgoodint_factory_gen(rctx, arg) 279 local t = arg.t 280 local count = arg.c 281 local wait = arg.wait 282 283 return function(r) 284 rctx:enqueue(r, t) 285 local done = rctx:wait_cond(wait, mcp.WAIT_FASTGOOD) 286 say("fastgoodint done:", done) 287 288 local final = nil 289 for x=1, count do 290 local res, mode = rctx:result(t[x]) 291 if mode == mcp.WAIT_GOOD then 292 return res 293 elseif res ~= nil then 294 final = res 295 end 296 end 297 -- if no good found, return anything. 298 return final 299 end 300end 301 302function new_blocker_factory(arg) 303 local fgen = mcp.funcgen_new() 304 local o = { c = 0, t = {} } 305 o.b = fgen:new_handle(arg.blocker) 306 307 for _, v in pairs(arg.list) do 308 table.insert(o.t, fgen:new_handle(v)) 309 o.c = o.c + 1 310 end 311 312 fgen:ready({ f = blocker_factory_gen, a = o, n = arg.name }) 313 return fgen 314end 315 316-- queue a bunch, but shortcut if a special auxiliary handle fails 317function blocker_factory_gen(rctx, arg) 318 say("generating blocker factory function") 319 local t = arg.t 320 local count = arg.c 321 local blocker = arg.b 322 local was_blocked = false 323 324 local cb = function(res) 325 -- check the response or tokens or anything special to indicate 326 -- success. 327 -- for this test we just check if it was a hit. 328 if res:hit() then 329 was_blocked = false 330 return mcp.WAIT_GOOD 331 else 332 was_blocked = true 333 return mcp.WAIT_ANY 334 end 335 end 336 337 rctx:handle_set_cb(blocker, cb) 338 339 return function(r) 340 say("function blocker test") 341 342 -- queue up the real queries we wanted to run. 343 rctx:enqueue(r, t) 344 345 -- any wait command will execute all queued queries at once, but here 346 -- we only wait for the blocker to complete. 347 local bres = rctx:enqueue_and_wait(r, blocker) 348 349 -- another way of doing this is to ask: 350 -- local res = rctx:res_good(blocker) 351 -- if a result was returned, the callback had returned WAIT_GOOD 352 if was_blocked == false then 353 -- our blocker is happy... 354 -- wait for the rest of the handles to come in and make a decision 355 -- on what to return to the client. 356 local done = rctx:wait_cond(count, mcp.WAIT_ANY) 357 return rctx:res_any(t[1]) 358 else 359 return "SERVER_ERROR blocked\r\n" 360 end 361 end 362end 363 364-- log on all callbacks, even if waiting for 1 365function logall_factory_gen(rctx, arg) 366 say("generating logall factory function") 367 local t = arg.t 368 369 local cb = function(res, req) 370 say("received a response, logging...") 371 mcp.log("received a response: " .. tostring(res:ok())) 372 mcp.log_req(req, res, "even more logs", rctx:cfd()) 373 return mcp.WAIT_ANY 374 end 375 376 for _, v in pairs(t) do 377 rctx:handle_set_cb(v, cb) 378 end 379 380 return function(r) 381 rctx:enqueue(r, t) 382 return rctx:wait_handle(t[1]) 383 end 384end 385 386-- log a summary after all callbacks run 387function summary_factory_gen(rctx, arg) 388 say("generating summary factory function") 389 local t = arg.t 390 local count = arg.c 391 392 local todo = 0 393 local cb = function(res) 394 say("responses TODO: " .. todo) 395 todo = todo - 1 396 if todo == 0 then 397 mcp.log("received all responses") 398 end 399 end 400 401 for _, v in pairs(t) do 402 rctx:handle_set_cb(v, cb) 403 end 404 405 return function(r) 406 -- re-seed the todo value that the callback uses 407 todo = count 408 409 rctx:enqueue(r, t) 410 -- we're just waiting for a single response, but we queue all of the 411 -- handles. the callback uses data from the shared environment and a 412 -- summary is logged. 413 return rctx:wait_handle(t[1]) 414 end 415end 416 417-- testing various waitfor conditions. 418function waitfor_factory_gen(rctx, arg) 419 say("generating background factory function") 420 local t = arg.t 421 local count = arg.c 422 423 return function(r) 424 local key = r:key() 425 if key == "waitfor/a" then 426 rctx:enqueue(r, t) 427 rctx:wait_cond(0) -- issue the requests in the background 428 return "HD t1\r\n" -- return whatever to the client 429 elseif key == "waitfor/b" then 430 rctx:enqueue(r, t) 431 rctx:wait_cond(0) -- issue requests and resume 432 -- now go back into wait mode, but we've already dispatched 433 local done = rctx:wait_cond(2) 434 if done ~= 2 then 435 return "SERVER_ERROR invalid wait" 436 end 437 -- TODO: bonus points, count the goods or check that everyone's t 438 -- flag is right. 439 for x=1, count do 440 local res = rctx:res_good(x) 441 if res ~= nil then 442 return res 443 end 444 return "SERVER_ERROR no good response" 445 end 446 elseif key == "waitfor/c" then 447 rctx:enqueue(r, t[1]) 448 rctx:wait_cond(0) -- issue the first queued request 449 -- queue two more 450 rctx:enqueue(r, t[2]) 451 rctx:enqueue(r, t[3]) 452 -- wait explicitly for the first queued one. 453 return rctx:wait_handle(t[1]) 454 elseif key == "waitfor/d" then 455 -- queue two then wait on each individually 456 rctx:enqueue(r, t[1]) 457 rctx:enqueue(r, t[2]) 458 rctx:wait_handle(t[1]) 459 return rctx:wait_handle(t[2]) 460 end 461 end 462end 463 464-- try "primary zone" and then fail over to secondary zones. 465-- using simplified code that just treats the first pool as the primary zone. 466function failover_factory_gen(rctx, arg) 467 say("generating failover factory function") 468 local t = {} 469 local count = arg.c 470 local first = arg.t[1] 471 472 for x=2, count do 473 table.insert(t, arg.t[x]) 474 end 475 476 return function(r) 477 -- first try local 478 local fres = rctx:enqueue_and_wait(r, first) 479 480 if fres == nil or fres:hit() == false then 481 -- failed to get a local hit, queue all "far" zones. 482 rctx:enqueue(r, t) 483 -- wait for one. 484 local done = rctx:wait_cond(1, mcp.WAIT_GOOD) 485 -- find the good from the second set. 486 for x=1, count-1 do 487 local res = rctx:res_good(t[x]) 488 if res ~= nil then 489 say("found a result") 490 return res 491 end 492 end 493 -- got nothing from second set, just return anything. 494 return rctx:res_any(first) 495 else 496 return fres 497 end 498 end 499end 500 501function new_error_factory(func, name) 502 local fgen = mcp.funcgen_new() 503 fgen:ready({ f = func, n = name }) 504 return fgen 505end 506 507function errors_factory_gen(rctx) 508 say("generating errors factory") 509 510 return function(r) 511 local key = r:key() 512 -- failure scenarios that require a top-level request context 513 if key == "errors/reterror" then 514 error("test error") 515 elseif key == "errors/retnil" then 516 return nil 517 elseif key == "errors/retint" then 518 return 5 519 elseif key == "errors/retnone" then 520 return 521 end 522 end 523end 524 525function suberrors_factory_gen(rctx) 526 say("generating suberrors factory function") 527 528 return function(r) 529 local key = r:key() 530 if key == "suberrors/error" then 531 error("test error") 532 elseif key == "suberrors/nil" then 533 return nil 534 elseif key == "suberrors/int" then 535 return 5 536 elseif key == "suberrors/none" then 537 return 538 end 539 540 end 541end 542 543function new_split_factory(arg) 544 local fgen = mcp.funcgen_new() 545 local o = {} 546 o.a = fgen:new_handle(arg.a) 547 o.b = fgen:new_handle(arg.b) 548 fgen:ready({ f = split_factory_gen, a = o, n = name }) 549 return fgen 550end 551 552-- example of a factory that takes two other factories and copies traffic 553-- across them. 554-- If an additional API's for hashing to numerics are added, keys can be 555-- hashed to allow "1/n" of keys to copy to one of the splits. This allows 556-- shadowing traffic to new/experimental pools, slow-warming traffic, etc. 557function split_factory_gen(rctx, arg) 558 say("generating split factory function") 559 local a = arg.a 560 local b = arg.b 561 562 return function(r) 563 say("splitting traffic") 564 -- b is the split path. 565 rctx:enqueue(r, b) 566 567 -- a is the main path. so we only explicitly wait on and return a. 568 return rctx:enqueue_and_wait(r, a) 569 end 570end 571 572-- test handling of failure to generate a function slot 573function failgen_factory_gen(rctx) 574 if failgen_armed then 575 say("throwing failgen error") 576 error("failgen") 577 end 578 say("arming failgen") 579 failgen_armed = true 580 581 return function(r) 582 return "NF\r\n" 583 end 584end 585 586function failgenret_factory_gen(rctx) 587 if failgenret_armed then 588 return nil 589 end 590 failgenret_armed = true 591 592 return function(r) 593 return "NF\r\n" 594 end 595end 596 597function badreturn_gen(rctx) 598 -- returning a userdata that isn't the correct kind of userdata. 599 -- shouldn't crash the daemon! 600 return function(r) 601 return rctx 602 end 603end 604 605-- TODO: this might be supported only in a later update. 606-- new queue after parent return 607-- - do an immediate return + cb queue, queue from that callback 608-- - should still work but requires worker lua vm 609-- requires removing the need of having an active client socket object to 610-- queue new requests for processing. 611function postreturn_factory(rctx, arg) 612 613end 614 615-- TODO: demonstrate a split call graph 616-- ie; an all split into two single 617 618function mcp_config_routes(p) 619 local b_pool = p.b 620 p = p.p 621 local single = new_direct_factory({ p = p[1], name = "single" }) 622 -- use the typically unused backend. 623 local singletwo = new_direct_factory({ p = b_pool, name = "singletwo" }) 624 625 local first = new_basic_factory({ list = p, name = "first" }, first_factory_gen) 626 local partial = new_basic_factory({ list = p, wait = 2, name = "partial" }, partial_factory_gen) 627 local all = new_basic_factory({ list = p, name = "all" }, all_factory_gen) 628 local fastgood = new_basic_factory({ list = p, wait = 2, name = "fastgood" }, fastgood_factory_gen) 629 local fastgoodint = new_basic_factory({ list = p, wait = 2, name = "fastgoodint" }, fastgoodint_factory_gen) 630 local blocker = new_blocker_factory({ blocker = b_pool, list = p, name = "blocker" }) 631 local logall = new_basic_factory({ list = p, name = "logall" }, logall_factory_gen) 632 local summary = new_basic_factory({ list = p, name = "summary" }, summary_factory_gen) 633 local waitfor = new_basic_factory({ list = p, name = "waitfor" }, waitfor_factory_gen) 634 local failover = new_basic_factory({ list = p, name = "failover" }, failover_factory_gen) 635 local locality = new_locality_factory({ p = p[1], name = "locality" }) 636 637 local errors = new_error_factory(errors_factory_gen, "errors") 638 local suberrors = new_error_factory(suberrors_factory_gen, "suberrors") 639 local suberr_wrap = new_direct_factory({ p = suberrors, name = "suberrwrap" }) 640 local badreturn = new_error_factory(badreturn_gen, "badreturn") 641 642 -- for testing traffic splitting. 643 local split = new_split_factory({ a = single, b = singletwo, name = "split" }) 644 local splitfailover = new_split_factory({ a = failover, b = singletwo, name = "splitfailover" }) 645 646 local map = { 647 ["single"] = single, 648 ["first"] = first, 649 ["partial"] = partial, 650 ["all"] = all, 651 ["fastgood"] = fastgood, 652 ["fastgoodint"] = fastgoodint, 653 ["blocker"] = blocker, 654 ["logall"] = logall, 655 ["summary"] = summary, 656 ["waitfor"] = waitfor, 657 ["failover"] = failover, 658 ["suberrors"] = suberr_wrap, 659 ["errors"] = errors, 660 ["split"] = split, 661 ["splitfailover"] = splitfailover, 662 ["locality"] = locality, 663 ["badreturn"] = badreturn, 664 } 665 666 local parg = { 667 default = single, 668 list = map, 669 pattern = "^/(%a+)/" 670 } 671 672 local failgen = new_error_factory(failgen_factory_gen, "failgen") 673 local failgenret = new_error_factory(failgenret_factory_gen, "failgenret") 674 675 local mapfail = { 676 ["failgen"] = failgen, 677 ["failgenret"] = failgenret, 678 } 679 local farg = { 680 default = single, 681 list = mapfail, 682 pattern = "^(%a+)/", 683 name = "prefixfail" 684 } 685 686 local pfx = mcp.router_new({ map = map }) 687 local pfxfail = new_prefix_factory(farg) 688 689 mcp.attach(mcp.CMD_ANY_STORAGE, pfx) 690 -- TODO: might need to move this fail stuff to another test file. 691 mcp.attach(mcp.CMD_MS, pfxfail) 692 mcp.attach(mcp.CMD_MD, pfxfail) 693end 694