-- New style request factories and backend request handling.
--
-- First, this API adds a "request func generation" step when a new request
-- starts: if there is not already a cached function to use, call the
-- "generator" function and use the function it returns to run the request.
-- This generated function is reused until the parent generator is swapped
-- out during a reload. This allows the user to pre-allocate and pre-calculate
-- objects and data, offering both safety and performance.
-- Future API revisions (such as stats) will rely on this generation step to
-- be more user friendly while retaining performance.
--
-- For backend IOs this unifies what were once two APIs:
--  - result = pool(request): the non-async API
--  - table = mcp.await(etc)
--
-- It is now a single system governed by a request context object (rctx).
-- This new system allows queueing a nearly arbitrary set of requests,
-- "blocking" a client on any individual response, and using callbacks to
-- decide whether a response is "good", to resume processing early, or to
-- post-process once all responses are received.
--
-- The queueing system is now recursive: an fgen can new_handle() another
-- fgen, meaning configurations can be assembled as call graphs. For example,
-- if you have a route function A and want to "shadow" some of its requests
-- onto route function B, instead of making A more complex you can create a
-- third function C which splits the traffic.
--
-- API docs: https://github.com/memcached/memcached/wiki/Proxy
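
-- A minimal, illustrative sketch of the lifecycle described above. It mirrors
-- the "direct" factory defined later in this file and is not wired into any
-- route below; the names here are arbitrary.
function example_factory_gen(rctx, handle)
    -- runs once per slot: pre-allocate or pre-compute anything expensive here.
    return function(r)
        -- runs per request: queue against the pre-registered handle and block
        -- until its response arrives.
        return rctx:enqueue_and_wait(r, handle)
    end
end

function new_example_factory(pool)
    local fgen = mcp.funcgen_new()
    -- handles must be reserved before the funcgen is marked ready.
    local h = fgen:new_handle(pool)
    fgen:ready({ f = example_factory_gen, a = h, n = "example" })
    return fgen
end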

verbose = true
-- globals for error handling tests
failgen_armed = false
failgenret_armed = false

function say(...)
    if verbose then
        print(...)
    end
end

function mcp_config_pools()
    local srv = mcp.backend

    local b1 = srv('b1', '127.0.0.1', 12011)
    local b2 = srv('b2', '127.0.0.1', 12012)
    local b3 = srv('b3', '127.0.0.1', 12013)
    local b4 = srv('b4', '127.0.0.1', 12014)
    local b1z = mcp.pool({b1})
    local b2z = mcp.pool({b2})
    local b3z = mcp.pool({b3})
    local b4z = mcp.pool({b4})
    local p = {p = {b1z, b2z, b3z}, b = b4z}

    --return mcp.pool(b1z, { iothread = false })
    return p
end

-- many of these factories have the same basic init pattern, so we can save
-- some code.
function new_basic_factory(arg, func)
    local fgen = mcp.funcgen_new()
    local o = { t = {}, c = 0 }

    -- some of them have a wait, some don't.
    -- here would be a good place to do bounds checking on arguments in
    -- similar functions.
    o.wait = arg.wait
    for _, v in pairs(arg.list) do
        table.insert(o.t, fgen:new_handle(v))
        o.c = o.c + 1
    end

    fgen:ready({ f = func, a = o, n = arg.name})
    return fgen
end

function new_prefix_factory(arg)
    local fgen = mcp.funcgen_new()
    local o = {}
    o.pattern = arg.pattern
    o.default = fgen:new_handle(arg.default)

    o.map = {}
    -- convert the map: get a handle id for each sub-route value.
    for k, v in pairs(arg.list) do
        o.map[k] = fgen:new_handle(v)
    end

    fgen:ready({ f = prefix_factory_gen, a = o, n = arg.name })
    return fgen
end

function prefix_factory_gen(rctx, arg)
    local p = arg.pattern
    local map = arg.map
    local d = arg.default

    say("generating a prefix factory function")

    return function(r)
        local key = r:key()

        local handle = map[string.match(key, p)]
        if handle == nil then
            return rctx:enqueue_and_wait(r, d)
        end
        return rctx:enqueue_and_wait(r, handle)
    end
end

function new_direct_factory(arg)
    local fgen = mcp.funcgen_new()
    local h = fgen:new_handle(arg.p)
    fgen:ready({ f = direct_factory_gen, a = h, n = arg.name })
    return fgen
end

function direct_factory_gen(rctx, h)
    say("generating direct factory function")

    return function(r)
        say("waiting on a single pool")
        return rctx:enqueue_and_wait(r, h)
    end
end

function new_locality_factory(arg)
    local fgen = mcp.funcgen_new()
    local h = fgen:new_handle(arg.p)
    fgen:ready({ f = locality_factory_gen, a = h, n = arg.name })
    return fgen
end

-- factory for proving slots have unique environmental memory.
-- we need to wait on a backend to allow the test to pipeline N requests in
-- parallel, to prove that each parallel slot has a unique lua environment.
function locality_factory_gen(rctx, h)
    say("generating locality factory function")
    local x = 0

    return function(r)
        x = x + 1
        say("returning from locality: " .. x)
        local res = rctx:enqueue_and_wait(r, h)
        return "HD t" .. x .. "\r\n"
    end
end

-- waits for only the _first_ queued handle to return.
-- ie; position 1 in the table.
-- we do a numeric for loop in the returned function to avoid allocations done
-- by a call to pairs()
function first_factory_gen(rctx, arg)
    say("generating first factory function")
    local t = arg.t
    local count = arg.c

    return function(r)
        say("waiting on first of " .. count .. " pools")
        for x=1, count do
            rctx:enqueue(r, t[x])
        end

        return rctx:wait_handle(t[1])
    end
end

-- wait on x out of y
function partial_factory_gen(rctx, arg)
    say("generating partial factory function")
    local t = arg.t
    local count = arg.c
    local wait = arg.wait

    return function(r)
        say("waiting on first " .. wait .. " out of " .. count)
        for x=1, count do
            rctx:enqueue(r, t[x])
        end

        local done = rctx:wait_cond(wait)
        for x=1, count do
            -- :good will only return the result object if the handle's
            -- response was considered "good"
            local res = rctx:res_good(t[x])
            if res ~= nil then
                say("found a result")
                return res
            end
            -- TODO: tally up responses and send summary for test.
        end
        say("found nothing")
        -- didn't get anything good, so return the first available result.
        for x=1, count do
            local res = rctx:res_any(t[x])
            if res ~= nil then
                return res
            end
        end
    end
end

-- wait on all pool arguments
function all_factory_gen(rctx, arg)
    say("generating all factory function")
    local t = arg.t
    local count = arg.c
    -- hoisting the wait mode into a local should be a minor speedup, avoiding
    -- the mcp table lookup on each request.
    local mode = mcp.WAIT_ANY

    return function(r)
        say("waiting on " .. count)

        rctx:enqueue(r, t)
        local done = rctx:wait_cond(count, mode)
        -- :any will give us the result object for that handle, regardless
        -- of return code/status.
        local res = rctx:res_any(t[1])

        -- TODO: tally up the responses and return summary for test.
        return res
    end
end

-- wait on the first good or N of total
function fastgood_factory_gen(rctx, arg)
    say("generating fastgood factory function")
    local t = arg.t
    local count = arg.c
    local wait = arg.wait

    local cb = function(res)
        say("running in a callback!")
        if res:hit() then
            say("was a hit!")
            -- return an extra arg telling us to shortcut the wait count
            return mcp.WAIT_GOOD, mcp.WAIT_RESUME
        end
        -- default return code is mcp.WAIT_ANY
    end

    for _, v in pairs(t) do
        rctx:handle_set_cb(v, cb)
    end

    return function(r)
        say("first good or wait for N")

        rctx:enqueue(r, t)
        local done = rctx:wait_cond(wait, mcp.WAIT_GOOD)
        say("fastgood done:", done)

        if done == 1 then
            -- if we just got one "good", we're probably happy.
            for x=1, count do
                -- loop to find the good handle.
                local res = rctx:res_good(t[x])
                if res ~= nil then
                    return res
                end
            end
        else
            -- else we had to wait and now need to decide if it was a miss or
            -- network error.
            -- but for this test we'll just return the first result.
            for x=1, count do
                local res = rctx:res_any(t[x])
                if res ~= nil then
                    return res
                end
            end
        end
    end
end

-- fastgood implemented using internal fastgood state
function fastgoodint_factory_gen(rctx, arg)
    local t = arg.t
    local count = arg.c
    local wait = arg.wait

    return function(r)
        rctx:enqueue(r, t)
        local done = rctx:wait_cond(wait, mcp.WAIT_FASTGOOD)
        say("fastgoodint done:", done)

        local final = nil
        for x=1, count do
            local res, mode = rctx:result(t[x])
            if mode == mcp.WAIT_GOOD then
                return res
            elseif res ~= nil then
                final = res
            end
        end
        -- if no good found, return anything.
        return final
    end
end

function new_blocker_factory(arg)
    local fgen = mcp.funcgen_new()
    local o = { c = 0, t = {} }
    o.b = fgen:new_handle(arg.blocker)

    for _, v in pairs(arg.list) do
        table.insert(o.t, fgen:new_handle(v))
        o.c = o.c + 1
    end

    fgen:ready({ f = blocker_factory_gen, a = o, n = arg.name })
    return fgen
end

-- queue a bunch, but shortcut if a special auxiliary handle fails
function blocker_factory_gen(rctx, arg)
    say("generating blocker factory function")
    local t = arg.t
    local count = arg.c
    local blocker = arg.b
    local was_blocked = false

    local cb = function(res)
        -- check the response or tokens or anything special to indicate
        -- success.
        -- for this test we just check if it was a hit.
        if res:hit() then
            was_blocked = false
            return mcp.WAIT_GOOD
        else
            was_blocked = true
            return mcp.WAIT_ANY
        end
    end

    rctx:handle_set_cb(blocker, cb)

    return function(r)
        say("function blocker test")

        -- queue up the real queries we wanted to run.
        rctx:enqueue(r, t)

        -- any wait command will execute all queued queries at once, but here
        -- we only wait for the blocker to complete.
        local bres = rctx:enqueue_and_wait(r, blocker)

        -- another way of doing this is to ask:
        -- local res = rctx:res_good(blocker)
        -- if a result was returned, the callback had returned WAIT_GOOD
        if was_blocked == false then
            -- our blocker is happy...
            -- wait for the rest of the handles to come in and make a decision
            -- on what to return to the client.
            local done = rctx:wait_cond(count, mcp.WAIT_ANY)
            return rctx:res_any(t[1])
        else
            return "SERVER_ERROR blocked\r\n"
        end
    end
end

-- log on all callbacks, even if waiting for 1
function logall_factory_gen(rctx, arg)
    say("generating logall factory function")
    local t = arg.t

    local cb = function(res, req)
        say("received a response, logging...")
        mcp.log("received a response: " .. tostring(res:ok()))
        mcp.log_req(req, res, "even more logs", rctx:cfd())
        return mcp.WAIT_ANY
    end

    for _, v in pairs(t) do
        rctx:handle_set_cb(v, cb)
    end

    return function(r)
        rctx:enqueue(r, t)
        return rctx:wait_handle(t[1])
    end
end

-- log a summary after all callbacks run
function summary_factory_gen(rctx, arg)
    say("generating summary factory function")
    local t = arg.t
    local count = arg.c

    local todo = 0
    local cb = function(res)
        say("responses TODO: " .. todo)
        todo = todo - 1
        if todo == 0 then
            mcp.log("received all responses")
        end
    end

    for _, v in pairs(t) do
        rctx:handle_set_cb(v, cb)
    end

    return function(r)
        -- re-seed the todo value that the callback uses
        todo = count

        rctx:enqueue(r, t)
        -- we're just waiting for a single response, but we queue all of the
        -- handles. the callback uses data from the shared environment and a
        -- summary is logged.
        return rctx:wait_handle(t[1])
    end
end

-- testing various waitfor conditions.
function waitfor_factory_gen(rctx, arg)
    say("generating waitfor factory function")
    local t = arg.t
    local count = arg.c

    return function(r)
        local key = r:key()
        if key == "waitfor/a" then
            rctx:enqueue(r, t)
            rctx:wait_cond(0) -- issue the requests in the background
            return "HD t1\r\n" -- return whatever to the client
        elseif key == "waitfor/b" then
            rctx:enqueue(r, t)
            rctx:wait_cond(0) -- issue requests and resume
            -- now go back into wait mode, but we've already dispatched
            local done = rctx:wait_cond(2)
            if done ~= 2 then
                return "SERVER_ERROR invalid wait\r\n"
            end
            -- TODO: bonus points, count the goods or check that everyone's t
            -- flag is right.
            for x=1, count do
                local res = rctx:res_good(t[x])
                if res ~= nil then
                    return res
                end
            end
            return "SERVER_ERROR no good response\r\n"
        elseif key == "waitfor/c" then
            rctx:enqueue(r, t[1])
            rctx:wait_cond(0) -- issue the first queued request
            -- queue two more
            rctx:enqueue(r, t[2])
            rctx:enqueue(r, t[3])
            -- wait explicitly for the first queued one.
            return rctx:wait_handle(t[1])
        elseif key == "waitfor/d" then
            -- queue two then wait on each individually
            rctx:enqueue(r, t[1])
            rctx:enqueue(r, t[2])
            rctx:wait_handle(t[1])
            return rctx:wait_handle(t[2])
        end
    end
end

-- try the "primary zone" and then fail over to secondary zones.
-- using simplified code that just treats the first pool as the primary zone.
function failover_factory_gen(rctx, arg)
    say("generating failover factory function")
    local t = {}
    local count = arg.c
    local first = arg.t[1]

    for x=2, count do
        table.insert(t, arg.t[x])
    end

    return function(r)
        -- first try local
        local fres = rctx:enqueue_and_wait(r, first)

        if fres == nil or fres:hit() == false then
            -- failed to get a local hit, queue all "far" zones.
            rctx:enqueue(r, t)
            -- wait for one.
            local done = rctx:wait_cond(1, mcp.WAIT_GOOD)
            -- find the good one from the second set.
            for x=1, count-1 do
                local res = rctx:res_good(t[x])
                if res ~= nil then
                    say("found a result")
                    return res
                end
            end
            -- got nothing good from the second set, just return anything.
            return rctx:res_any(first)
        else
            return fres
        end
    end
end

function new_error_factory(func, name)
    local fgen = mcp.funcgen_new()
    fgen:ready({ f = func, n = name })
    return fgen
end

function errors_factory_gen(rctx)
    say("generating errors factory")

    return function(r)
        local key = r:key()
        -- failure scenarios that require a top-level request context
        if key == "errors/reterror" then
            error("test error")
        elseif key == "errors/retnil" then
            return nil
        elseif key == "errors/retint" then
            return 5
        elseif key == "errors/retnone" then
            return
        end
    end
end

function suberrors_factory_gen(rctx)
    say("generating suberrors factory function")

    return function(r)
        local key = r:key()
        if key == "suberrors/error" then
            error("test error")
        elseif key == "suberrors/nil" then
            return nil
        elseif key == "suberrors/int" then
            return 5
        elseif key == "suberrors/none" then
            return
        end
    end
end

function new_split_factory(arg)
    local fgen = mcp.funcgen_new()
    local o = {}
    o.a = fgen:new_handle(arg.a)
    o.b = fgen:new_handle(arg.b)
    fgen:ready({ f = split_factory_gen, a = o, n = arg.name })
    return fgen
end

-- example of a factory that takes two other factories and copies traffic
-- across them.
-- If additional APIs for hashing keys to numbers are added, keys can be
-- hashed to allow "1/n" of keys to copy to one of the splits. This allows
-- shadowing traffic to new/experimental pools, slow-warming traffic, etc.
-- (a hypothetical sketch of this follows the function below)
function split_factory_gen(rctx, arg)
    say("generating split factory function")
    local a = arg.a
    local b = arg.b

    return function(r)
        say("splitting traffic")
        -- b is the split path.
        rctx:enqueue(r, b)

        -- a is the main path, so we only explicitly wait on and return a.
        return rctx:enqueue_and_wait(r, a)
    end
end
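
-- A hypothetical sketch of the "1/n" key-hashed split mentioned above. Since
-- no numeric key hashing API exists yet, a plain Lua byte-sum stands in for a
-- real hash. Illustrative only; not wired into the routes below.
function shadow_split_factory_gen(rctx, arg)
    local a = arg.a -- main path handle
    local b = arg.b -- shadow path handle
    local n = arg.n -- copy roughly 1 out of every n keys

    return function(r)
        local key = r:key()
        -- stand-in hash: sum the bytes of the key.
        local sum = 0
        for i = 1, #key do
            sum = sum + string.byte(key, i)
        end
        if sum % n == 0 then
            -- also queue this key onto the shadow path; the queue is flushed
            -- by the wait below.
            rctx:enqueue(r, b)
        end
        -- the main path is always queried and its result returned.
        return rctx:enqueue_and_wait(r, a)
    end
end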

-- test handling of failure to generate a function slot
function failgen_factory_gen(rctx)
    if failgen_armed then
        say("throwing failgen error")
        error("failgen")
    end
    say("arming failgen")
    failgen_armed = true

    return function(r)
        return "NF\r\n"
    end
end

function failgenret_factory_gen(rctx)
    if failgenret_armed then
        return nil
    end
    failgenret_armed = true

    return function(r)
        return "NF\r\n"
    end
end

function badreturn_gen(rctx)
    -- returning a userdata that isn't the correct kind of userdata.
    -- shouldn't crash the daemon!
    return function(r)
        return rctx
    end
end

-- TODO: this might be supported only in a later update.
-- new queue after parent return:
-- - do an immediate return + cb queue, then queue from that callback
-- - should still work, but requires the worker lua vm
-- requires removing the need for an active client socket object when queueing
-- new requests for processing.
function postreturn_factory(rctx, arg)

end

-- TODO: demonstrate a split call graph
-- ie; an "all" factory split into two "single" factories
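
-- A possible shape for the call graph mentioned in the TODO above: an "all"
-- factory whose handles are two "single" factories. Illustrative only; it is
-- not attached to a route below and the pool arguments and names are
-- placeholders.
function new_all_of_singles_factory(pool_a, pool_b)
    local single_a = new_direct_factory({ p = pool_a, name = "single_a" })
    local single_b = new_direct_factory({ p = pool_b, name = "single_b" })
    -- new_basic_factory calls fgen:new_handle() on each list entry, so the
    -- handles here point at other fgens rather than pools.
    return new_basic_factory({ list = { single_a, single_b }, name = "allsingles" }, all_factory_gen)
end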

function mcp_config_routes(p)
    local b_pool = p.b
    p = p.p
    local single = new_direct_factory({ p = p[1], name = "single" })
    -- use the typically unused backend.
    local singletwo = new_direct_factory({ p = b_pool, name = "singletwo" })

    local first = new_basic_factory({ list = p, name = "first" }, first_factory_gen)
    local partial = new_basic_factory({ list = p, wait = 2, name = "partial" }, partial_factory_gen)
    local all = new_basic_factory({ list = p, name = "all" }, all_factory_gen)
    local fastgood = new_basic_factory({ list = p, wait = 2, name = "fastgood" }, fastgood_factory_gen)
    local fastgoodint = new_basic_factory({ list = p, wait = 2, name = "fastgoodint" }, fastgoodint_factory_gen)
    local blocker = new_blocker_factory({ blocker = b_pool, list = p, name = "blocker" })
    local logall = new_basic_factory({ list = p, name = "logall" }, logall_factory_gen)
    local summary = new_basic_factory({ list = p, name = "summary" }, summary_factory_gen)
    local waitfor = new_basic_factory({ list = p, name = "waitfor" }, waitfor_factory_gen)
    local failover = new_basic_factory({ list = p, name = "failover" }, failover_factory_gen)
    local locality = new_locality_factory({ p = p[1], name = "locality" })

    local errors = new_error_factory(errors_factory_gen, "errors")
    local suberrors = new_error_factory(suberrors_factory_gen, "suberrors")
    local suberr_wrap = new_direct_factory({ p = suberrors, name = "suberrwrap" })
    local badreturn = new_error_factory(badreturn_gen, "badreturn")

    -- for testing traffic splitting.
    local split = new_split_factory({ a = single, b = singletwo, name = "split" })
    local splitfailover = new_split_factory({ a = failover, b = singletwo, name = "splitfailover" })

    local map = {
        ["single"] = single,
        ["first"] = first,
        ["partial"] = partial,
        ["all"] = all,
        ["fastgood"] = fastgood,
        ["fastgoodint"] = fastgoodint,
        ["blocker"] = blocker,
        ["logall"] = logall,
        ["summary"] = summary,
        ["waitfor"] = waitfor,
        ["failover"] = failover,
        ["suberrors"] = suberr_wrap,
        ["errors"] = errors,
        ["split"] = split,
        ["splitfailover"] = splitfailover,
        ["locality"] = locality,
        ["badreturn"] = badreturn,
    }

    local parg = {
        default = single,
        list = map,
        pattern = "^/(%a+)/"
    }

    local failgen = new_error_factory(failgen_factory_gen, "failgen")
    local failgenret = new_error_factory(failgenret_factory_gen, "failgenret")

    local mapfail = {
        ["failgen"] = failgen,
        ["failgenret"] = failgenret,
    }
    local farg = {
        default = single,
        list = mapfail,
        pattern = "^(%a+)/",
        name = "prefixfail"
    }

    local pfx = mcp.router_new({ map = map })
    local pfxfail = new_prefix_factory(farg)

    mcp.attach(mcp.CMD_ANY_STORAGE, pfx)
    -- TODO: might need to move this fail stuff to another test file.
    mcp.attach(mcp.CMD_MS, pfxfail)
    mcp.attach(mcp.CMD_MD, pfxfail)
end