1-- WARNING: if you cause errors during configuration reload by putting
2-- incompatible data into the table returned by mcp_config_pools, the daeomon
3-- will exit.
4-- TODO: fallback cache for broken/overloaded zones.
5
6-- local zone could/should be fetched from environment or local file.
7-- doing so allows all configuration files to be identical, simplifying consistency checks.
8local my_zone = 'z1'
9
10local STAT_EXAMPLE <const> = 1
11local STAT_ANOTHER <const> = 2
12
13function mcp_config_pools(oldss)
14    mcp.add_stat(STAT_EXAMPLE, "example")
15    mcp.add_stat(STAT_ANOTHER, "another")
16    --mcp.tcp_keepalive(true)
17    mcp.backend_connect_timeout(5.5) -- 5 and a half second timeout.
18    -- alias mcp.backend for convenience.
19    -- important to alias global variables in routes where speed is concerned.
20    local srv = mcp.backend
21    -- local zones = { 'z1', 'z2', 'z3' }
22
23    -- IPs are "127" . "zone" . "pool" . "srv"
24    local pfx = 'fooz1'
25    local fooz1 = {
26        srv(pfx .. 'srv1', '127.1.1.1', 11212),
27        srv(pfx .. 'srv2', '127.1.1.2', 11212),
28        srv(pfx .. 'srv3', '127.1.1.3', 11212),
29    }
30    pfx = 'fooz2'
31    local fooz2 = {
32        srv(pfx .. 'srv1', '127.2.1.1', 11213),
33        srv(pfx .. 'srv2', '127.2.1.2', 11213),
34        srv(pfx .. 'srv3', '127.2.1.3', 11213),
35    }
36    pfx = 'fooz3'
37    local fooz3 = {
38        srv(pfx .. 'srv1', '127.3.1.1', 11214),
39        srv(pfx .. 'srv2', '127.3.1.2', 11214),
40        srv(pfx .. 'srv3', '127.3.1.3', 11214),
41    }
42
43    pfx = 'barz1'
44    -- zone "/bar/"-s primary zone should fail; all down.
45    local barz1 = {
46        srv(pfx .. 'srv1', '127.1.2.1', 11210),
47        srv(pfx .. 'srv2', '127.1.2.2', 11210),
48        srv(pfx .. 'srv3', '127.1.2.3', 11210),
49    }
50    pfx = 'barz2'
51    local barz2 = {
52        srv(pfx .. 'srv1', '127.2.2.1', 11215),
53        srv(pfx .. 'srv2', '127.2.2.2', 11215),
54        srv(pfx .. 'srv3', '127.2.2.3', 11215),
55    }
56    pfx = 'barz3'
57    local barz3 = {
58        srv(pfx .. 'srv1', '127.3.2.1', 11216),
59        srv(pfx .. 'srv2', '127.3.2.2', 11216),
60        srv(pfx .. 'srv3', '127.3.2.3', 11216),
61    }
62
63    -- fallback cache for any zone
64    -- NOT USED YET
65    pfx = 'fallz1'
66    local fallz1 = {
67        srv(pfx .. 'srv1', '127.0.2.1', 11212),
68    }
69    pfx = 'fallz2'
70    local fallz2 = {
71        srv(pfx .. 'srv1', '127.0.2.2', 11212),
72    }
73    pfx = 'fallz3'
74    local fallz3 = {
75        srv(pfx .. 'srv1', '127.0.2.3', 11212),
76    }
77
78    local main_zones = {
79        foo = { z1 = fooz1, z2 = fooz2, z3 = fooz3 },
80        bar = { z1 = barz1, z2 = barz2, z3 = barz3 },
81        -- fall = { z1 = fallz1, z2 = fallz2, z3 = fallz3 },
82    }
83
84    -- FIXME: should we copy the table to keep the pool tables around?
85    -- does the hash selector hold a reference to the pool (but only available in main config?)
86
87    -- convert the pools into hash selectors.
88    -- TODO: is this a good place to add prefixing/hash editing?
89    for _, subs in pairs(main_zones) do
90        for k, v in pairs(subs) do
91            -- next line uses a ring hash in "evcache compat" mode. note the
92            -- hash= override to use MD5 key hashing from ketama.
93            -- subs[k] = mcp.pool(v, { dist = mcp.dist_ring_hash, omode = "evcache", hash = mcp.dist_ring_hash.hash })
94            -- override the number of buckets per server.
95            -- subs[k] = mcp.pool(v, { dist = mcp.dist_ring_hash, omode = "evcache", hash = mcp.dist_ring_hash.hash, obuckets = 240 })
96            -- this line uses the default (currently xxhash + jump hash)
97            subs[k] = mcp.pool(v)
98
99            -- use this next line instead for jump hash.
100            -- the order of servers in the pool argument _must_ not change!
101            -- adding the seed string will give a different key distribution
102            -- for each zone.
103            -- NOTE: 'k' may not be the right seed here:
104            -- instead stitch main_zone's key + the sub key?
105            -- subs[k] = mcp.pool(v, { dist = mcp.dist_jump_hash, seed = k })
106            -- subs[k] = mcp.pool(v, { dist = mcp.dist_jump_hash, seed = k, filter = "stop", filter_conf = "|#|" })
107            -- subs[k] = mcp.pool(v, { dist = mcp.dist_jump_hash, seed = k, filter = "tags", filter_conf = "{}" })
108        end
109    end
110
111    return main_zones
112end
113
114-- WORKER CODE:
115
116-- need to redefine main_zones using fetched selectors?
117
118function reqlog_factory(route)
119    local nr = route
120    return function(r)
121        local res, detail = nr(r)
122        mcp.log_req(r, res, detail)
123        return res
124    end
125end
126
127-- TODO: Fallback zone here?
128function failover_factory(zones, local_zone)
129    local near_zone = zones[local_zone]
130    local far_zones = {}
131    -- NOTE: could shuffle/sort to re-order zone retry order
132    -- or use 'next(far_zones, idx)' via a stored upvalue here
133    for k, v in pairs(zones) do
134        if k ~= local_zone then
135            far_zones[k] = v
136        end
137    end
138    return function(r)
139        local res = near_zone(r)
140        if res:hit() == false then
141            -- example for mcp.log... Don't do this though :)
142            -- mcp.log("failed to find " .. r:key() .. " in zone: " .. local_zone)
143            --for _, zone in pairs(far_zones) do
144            --    res = zone(r)
145            local restable = mcp.await(r, far_zones, 1)
146            for _, res in pairs(restable) do
147                if res:hit() then
148                    --break
149                    return res, "failover_backup_hit"
150                end
151            end
152            return restable[1], "failover_backup_miss"
153        end
154        -- example of making a new set request on the side.
155        -- local nr = mcp.request("set /foo/asdf 0 0 " .. res:vlen() .. "\r\n", res)
156        -- local nr = mcp.request("set /foo/asdf 0 0 2\r\n", "mo\r\n")
157        -- near_zone(nr)
158        return res, "failover_hit" -- send result back to client
159    end
160end
161
162function meta_get_factory(zones, local_zone)
163    local near_zone = zones[local_zone]
164    -- in this test function we only fetch from the local zone.
165    return function(r)
166        if r:has_flag("l") == true then
167            print("client asking for last access time")
168        end
169        local texists, token = r:flag_token("O")
170        -- next example returns the previous token and replaces it.
171        -- local texists, token = r:flag_token("O", "Odoot")
172        if token ~= nil then
173            print("meta opaque flag token: " .. token)
174        end
175        local res = near_zone(r)
176
177        return res
178    end
179end
180
181function meta_set_factory(zones, local_zone)
182    local near_zone = zones[local_zone]
183    -- in this test function we only talk to the local zone.
184    return function(r)
185        local res = near_zone(r)
186        if res:code() == mcp.MCMC_CODE_NOT_FOUND then
187            print("got meta NF response")
188        end
189        print("meta response line: " .. res:line())
190
191        return res
192    end
193end
194
195-- SET's to main zone, issues deletes to far zones.
196function setinvalidate_factory(zones, local_zone)
197    local near_zone = zones[local_zone]
198    local far_zones = {}
199    -- NOTE: could shuffle/sort to re-order zone retry order
200    -- or use 'next(far_zones, idx)' via a stored upvalue here
201    for k, v in pairs(zones) do
202        if k ~= local_zone then
203            far_zones[k] = v
204        end
205    end
206    local new_req = mcp.request
207    return function(r)
208        local res = near_zone(r)
209        if res:ok() == true then
210            -- create a new delete request
211            local dr = new_req("delete /testing/" .. r:key() .. "\r\n")
212            -- example of new request from existing request
213            -- note this isn't trimming the key so it'll make a weird one.
214            -- local dr = new_req("set /bar/" .. r:key() .. " 0 0 " .. r:token(5) .. "\r\n", r)
215            -- AWAIT_BACKGROUND allows us to immediately resume processing, executing the
216            -- delete requests in the background.
217            mcp.await(dr, far_zones, 0, mcp.AWAIT_BACKGROUND)
218            --mcp.await(dr, far_zones, 0)
219            mcp.log_req(r, res, "setinvalidate") -- time against the original request, since we have no result.
220        end
221        -- use original response for client, not DELETE's response.
222        -- else client won't understand.
223        return res -- send result back to client
224    end
225end
226
227-- NOTE: this function is culling key prefixes. it is an error to use it
228-- without a left anchored (^) pattern.
229function prefixtrim_factory(pattern, list, default)
230    local p = pattern
231    local l = list
232    local d = default
233    local s = mcp.stat
234    return function(r)
235        local i, j, match = string.find(r:key(), p)
236        local route
237        if match ~= nil then
238            -- remove the key prefix so we don't waste storage.
239            r:ltrimkey(j)
240            route = l[match]
241            if route == nil then
242                -- example counter: tick when default route hit.
243                s(STAT_EXAMPLE, 1)
244                return d(r)
245            end
246        end
247        return route(r)
248    end
249end
250
251function prefix_factory(pattern, list, default)
252    local p = pattern
253    local l = list
254    local d = default
255    local s = mcp.stat
256    return function(r)
257        local route = l[string.match(r:key(), p)]
258        if route == nil then
259            -- example counter: tick when default route hit.
260            s(STAT_EXAMPLE, 1)
261            return d(r)
262        end
263        return route(r)
264    end
265end
266
267-- TODO: Check tail call requirements?
268function command_factory(map, default)
269    local m = map
270    local d = default
271    return function(r)
272        local f = map[r:command()]
273        if f == nil then
274            -- print("default command")
275            return d(r)
276        end
277        -- testing options replacement...
278        -- if r:command() == mcp.CMD_SET then
279        --    r:token(4, "100") -- set exptime.
280        -- end
281        -- print("override command")
282        return f(r)
283    end
284end
285
286-- TODO: is the return value the average? anything special?
287-- walks a list of selectors and repeats the request.
288function walkall_factory(pool)
289    local p = {}
290    -- TODO: a shuffle could be useful here.
291    for _, v in pairs(pool) do
292        table.insert(p, v)
293    end
294    local x = #p
295    return function(r)
296        local restable = mcp.await(r, p)
297        -- walk results and return "best" result
298        -- print("length of await result table", #restable)
299        for _, res in pairs(restable) do
300            if res:ok() then
301                return res
302            end
303        end
304        -- else we return the first result.
305        return restable[1]
306    end
307end
308
309function mcp_config_routes(main_zones)
310    -- generate the prefix routes from zones.
311    local prefixes = {}
312    for pfx, z in pairs(main_zones) do
313        local failover = reqlog_factory(failover_factory(z, my_zone))
314        local all = walkall_factory(main_zones[pfx])
315        local setdel = setinvalidate_factory(z, my_zone)
316        local map = {}
317        map[mcp.CMD_SET] = all
318        -- NOTE: in t/proxy.t all the backends point to the same place
319        -- which makes replicating delete return NOT_FOUND
320        map[mcp.CMD_DELETE] = all
321        -- similar with ADD. will get an NOT_STORED back.
322        -- need better routes designed for the test suite (edit the key
323        -- prefix or something)
324        map[mcp.CMD_ADD] = failover_factory(z, my_zone)
325        map[mcp.CMD_MG] = meta_get_factory(z, my_zone)
326        map[mcp.CMD_MS] = meta_set_factory(z, my_zone)
327        prefixes[pfx] = command_factory(map, failover)
328    end
329
330    local routetop = prefix_factory("^/(%a+)/", prefixes, function(r) return "SERVER_ERROR no route\r\n" end)
331
332    -- internally run parser at top of tree
333    -- also wrap the request string with a convenience object until the C bits
334    -- are attached to the internal parser.
335    --mcp.attach(mcp.CMD_ANY, function (r) return routetop(r) end)
336    mcp.attach(mcp.CMD_ANY_STORAGE, routetop)
337    -- tagged top level attachments. ex: memcached -l tag[tagtest]:127.0.0.1:11212
338    -- mcp.attach(mcp.CMD_ANY_STORAGE, function (r) return "SERVER_ERROR no route\r\n" end, "tagtest")
339    -- mcp.attach(mcp.CMD_ANY_STORAGE, function (r) return "SERVER_ERROR my route\r\n" end, "newtag")
340end
341