-- WARNING: if you cause errors during configuration reload by putting
-- incompatible data into the table returned by mcp_config_pools, the daemon
-- will exit.
-- TODO: fallback cache for broken/overloaded zones.

-- local zone could/should be fetched from environment or local file.
-- doing so allows all configuration files to be identical, simplifying consistency checks.
local my_zone = 'z1'

local STAT_EXAMPLE <const> = 1
local STAT_ANOTHER <const> = 2

-- Builds the backend pools. Its return value is the table the WARNING above
-- refers to: { foo = { z1 = pool, z2 = pool, z3 = pool }, bar = { ... } },
-- where every pool is the result of mcp.pool().
-- 'oldss' is not used by this example.
function mcp_config_pools(oldss)
    mcp.add_stat(STAT_EXAMPLE, "example")
    mcp.add_stat(STAT_ANOTHER, "another")
    --mcp.tcp_keepalive(true)
    mcp.backend_connect_timeout(5.5) -- 5 and a half second timeout.
    -- alias mcp.backend for convenience.
    -- important to alias global variables in routes where speed is concerned.
    local srv = mcp.backend
    -- local zones = { 'z1', 'z2', 'z3' }

    -- IPs are "127" . "zone" . "pool" . "srv"
    local pfx = 'fooz1'
    local fooz1 = {
        srv(pfx .. 'srv1', '127.1.1.1', 11212),
        srv(pfx .. 'srv2', '127.1.1.2', 11212),
        srv(pfx .. 'srv3', '127.1.1.3', 11212),
    }
    pfx = 'fooz2'
    local fooz2 = {
        srv(pfx .. 'srv1', '127.2.1.1', 11213),
        srv(pfx .. 'srv2', '127.2.1.2', 11213),
        srv(pfx .. 'srv3', '127.2.1.3', 11213),
    }
    pfx = 'fooz3'
    local fooz3 = {
        srv(pfx .. 'srv1', '127.3.1.1', 11214),
        srv(pfx .. 'srv2', '127.3.1.2', 11214),
        srv(pfx .. 'srv3', '127.3.1.3', 11214),
    }

    pfx = 'barz1'
    -- zone "/bar/"-s primary zone should fail; all down.
    local barz1 = {
        srv(pfx .. 'srv1', '127.1.2.1', 11210),
        srv(pfx .. 'srv2', '127.1.2.2', 11210),
        srv(pfx .. 'srv3', '127.1.2.3', 11210),
    }
    pfx = 'barz2'
    local barz2 = {
        srv(pfx .. 'srv1', '127.2.2.1', 11215),
        srv(pfx .. 'srv2', '127.2.2.2', 11215),
        srv(pfx .. 'srv3', '127.2.2.3', 11215),
    }
    pfx = 'barz3'
    local barz3 = {
        srv(pfx .. 'srv1', '127.3.2.1', 11216),
        srv(pfx .. 'srv2', '127.3.2.2', 11216),
        srv(pfx .. 'srv3', '127.3.2.3', 11216),
    }

    -- fallback cache for any zone
    -- NOT USED YET
    pfx = 'fallz1'
    local fallz1 = {
        srv(pfx .. 'srv1', '127.0.2.1', 11212),
    }
    pfx = 'fallz2'
    local fallz2 = {
        srv(pfx .. 'srv1', '127.0.2.2', 11212),
    }
    pfx = 'fallz3'
    local fallz3 = {
        srv(pfx .. 'srv1', '127.0.2.3', 11212),
    }

    local main_zones = {
        foo = { z1 = fooz1, z2 = fooz2, z3 = fooz3 },
        bar = { z1 = barz1, z2 = barz2, z3 = barz3 },
        -- fall = { z1 = fallz1, z2 = fallz2, z3 = fallz3 },
    }

    -- FIXME: should we copy the table to keep the pool tables around?
    -- does the hash selector hold a reference to the pool (but only available in main config?)

    -- convert the pools into hash selectors.
    -- TODO: is this a good place to add prefixing/hash editing?
    -- (overwriting existing keys while iterating with pairs() is permitted.)
    for _, subs in pairs(main_zones) do
        for k, v in pairs(subs) do
            -- next line uses a ring hash in "evcache compat" mode. note the
            -- hash= override to use MD5 key hashing from ketama.
            -- subs[k] = mcp.pool(v, { dist = mcp.dist_ring_hash, omode = "evcache", hash = mcp.dist_ring_hash.hash })
            -- override the number of buckets per server.
            -- subs[k] = mcp.pool(v, { dist = mcp.dist_ring_hash, omode = "evcache", hash = mcp.dist_ring_hash.hash, obuckets = 240 })
            -- this line uses the default (currently xxhash + jump hash)
            subs[k] = mcp.pool(v)

            -- use this next line instead for jump hash.
            -- the order of servers in the pool argument _must_ not change!
            -- adding the seed string will give a different key distribution
            -- for each zone.
            -- NOTE: 'k' may not be the right seed here:
            -- instead stitch main_zone's key + the sub key?
            -- subs[k] = mcp.pool(v, { dist = mcp.dist_jump_hash, seed = k })
            -- subs[k] = mcp.pool(v, { dist = mcp.dist_jump_hash, seed = k, filter = "stop", filter_conf = "|#|" })
            -- subs[k] = mcp.pool(v, { dist = mcp.dist_jump_hash, seed = k, filter = "tags", filter_conf = "{}" })
        end
    end

    return main_zones
end

-- WORKER CODE:

-- need to redefine main_zones using fetched selectors?

-- Wraps 'route' so that every request is logged with mcp.log_req().
-- The wrapped route may return (result, detail); only the result is
-- passed back to the caller.
function reqlog_factory(route)
    local nr = route
    return function(r)
        local res, detail = nr(r)
        mcp.log_req(r, res, detail)
        return res
    end
end

-- Reads from the local zone first. If that result is not a hit, the request
-- is sent to every other zone through mcp.await(r, far_zones, 1).
-- Returns (result, detail) where detail is one of "failover_hit",
-- "failover_backup_hit" or "failover_backup_miss".
-- NOTE(review): the exact meaning of await's wait count (1) and the contents
-- of the result table are defined by the proxy API; confirm there.
-- TODO: Fallback zone here?
function failover_factory(zones, local_zone)
    local near_zone = zones[local_zone]
    local far_zones = {}
    -- NOTE: could shuffle/sort to re-order zone retry order
    -- or use 'next(far_zones, idx)' via a stored upvalue here
    for k, v in pairs(zones) do
        if k ~= local_zone then
            far_zones[k] = v
        end
    end
    return function(r)
        local res = near_zone(r)
        if res:hit() == false then
            -- example for mcp.log... Don't do this though :)
            -- mcp.log("failed to find " .. r:key() .. " in zone: " .. local_zone)
            --for _, zone in pairs(far_zones) do
            --    res = zone(r)
            local restable = mcp.await(r, far_zones, 1)
            -- 'backup' is used instead of 'res' to avoid shadowing the near-zone result.
            for _, backup in pairs(restable) do
                if backup:hit() then
                    --break
                    return backup, "failover_backup_hit"
                end
            end
            return restable[1], "failover_backup_miss"
        end
        -- example of making a new set request on the side.
        -- local nr = mcp.request("set /foo/asdf 0 0 " .. res:vlen() .. "\r\n", res)
        -- local nr = mcp.request("set /foo/asdf 0 0 2\r\n", "mo\r\n")
        -- near_zone(nr)
        return res, "failover_hit" -- send result back to client
    end
end

-- Meta-get route that only talks to the local zone. It prints debug output
-- when the request carries the "l" flag or an "O" (opaque) flag token.
function meta_get_factory(zones, local_zone)
    local near_zone = zones[local_zone]
    -- in this test function we only fetch from the local zone.
    return function(r)
        if r:has_flag("l") == true then
            print("client asking for last access time")
        end
        local texists, token = r:flag_token("O")
        -- next example returns the previous token and replaces it.
        -- local texists, token = r:flag_token("O", "Odoot")
        if token ~= nil then
            print("meta opaque flag token: " .. token)
        end
        local res = near_zone(r)

        return res
    end
end

-- Meta-set route that only talks to the local zone. It prints the response
-- line, and an extra message when the response code is MCMC_CODE_NOT_FOUND.
function meta_set_factory(zones, local_zone)
    local near_zone = zones[local_zone]
    -- in this test function we only talk to the local zone.
    return function(r)
        local res = near_zone(r)
        if res:code() == mcp.MCMC_CODE_NOT_FOUND then
            print("got meta NF response")
        end
        print("meta response line: " .. res:line())

        return res
    end
end

-- SET's to main zone, issues deletes to far zones.
-- When the local response is ok, a "delete /testing/<key>" request is sent
-- to every other zone with mcp.AWAIT_BACKGROUND. The client always gets the
-- original local response.
function setinvalidate_factory(zones, local_zone)
    local near_zone = zones[local_zone]
    local far_zones = {}
    -- NOTE: could shuffle/sort to re-order zone retry order
    -- or use 'next(far_zones, idx)' via a stored upvalue here
    for k, v in pairs(zones) do
        if k ~= local_zone then
            far_zones[k] = v
        end
    end
    local new_req = mcp.request
    return function(r)
        local res = near_zone(r)
        if res:ok() == true then
            -- create a new delete request
            local dr = new_req("delete /testing/" .. r:key() .. "\r\n")
            -- example of new request from existing request
            -- note this isn't trimming the key so it'll make a weird one.
            -- local dr = new_req("set /bar/" .. r:key() .. " 0 0 " .. r:token(5) .. "\r\n", r)
            -- AWAIT_BACKGROUND allows us to immediately resume processing, executing the
            -- delete requests in the background.
            mcp.await(dr, far_zones, 0, mcp.AWAIT_BACKGROUND)
            --mcp.await(dr, far_zones, 0)
            mcp.log_req(r, res, "setinvalidate") -- time against the original request, since we have no result.
        end
        -- use original response for client, not DELETE's response.
        -- else client won't understand.
        return res -- send result back to client
    end
end

-- NOTE: this function is culling key prefixes. it is an error to use it
-- without a left anchored (^) pattern. The pattern must also contain exactly
-- one capture, which is used as the lookup key into 'list'.
-- On a match the matched prefix is trimmed from the key before routing.
-- Keys that do not match, and captures with no entry in 'list', go to
-- 'default' and tick STAT_EXAMPLE. Previously a non-matching key attempted
-- to call a nil route.
function prefixtrim_factory(pattern, list, default)
    local p = pattern
    local l = list
    local d = default
    local s = mcp.stat
    return function(r)
        local _, j, match = string.find(r:key(), p)
        local route
        if match ~= nil then
            -- remove the key prefix so we don't waste storage.
            r:ltrimkey(j)
            route = l[match]
        end
        if route == nil then
            -- example counter: tick when default route hit.
            s(STAT_EXAMPLE, 1)
            return d(r)
        end
        return route(r)
    end
end

-- Routes on the first capture of 'pattern' applied to the key, without
-- modifying the key. Keys that do not match (string.match returns nil, and
-- reading l[nil] yields nil), and captures with no entry in 'list', go to
-- 'default' and tick STAT_EXAMPLE.
function prefix_factory(pattern, list, default)
    local p = pattern
    local l = list
    local d = default
    local s = mcp.stat
    return function(r)
        local route = l[string.match(r:key(), p)]
        if route == nil then
            -- example counter: tick when default route hit.
            s(STAT_EXAMPLE, 1)
            return d(r)
        end
        return route(r)
    end
end

-- Dispatches on r:command(): uses map[command] if present, otherwise 'default'.
-- Both branches are tail calls, so every value the chosen route returns is
-- passed through.
-- TODO: Check tail call requirements?
function command_factory(map, default)
    local m = map
    local d = default
    return function(r)
        local f = m[r:command()]
        if f == nil then
            -- print("default command") 
            return d(r)
        end
        -- testing options replacement...
        -- if r:command() == mcp.CMD_SET then
        --     r:token(4, "100") -- set exptime.
        -- end
        -- print("override command")
        return f(r)
    end
end

-- TODO: is the return value the average? anything special?
-- walks a list of selectors and repeats the request.
-- The values of 'pool' are copied into an array and the request is sent to
-- all of them through mcp.await(). The first result whose :ok() is true is
-- returned; otherwise restable[1] is returned.
function walkall_factory(pool)
    local p = {}
    -- TODO: a shuffle could be useful here.
    for _, v in pairs(pool) do
        table.insert(p, v)
    end
    return function(r)
        local restable = mcp.await(r, p)
        -- walk results and return "best" result
        -- print("length of await result table", #restable)
        for _, res in pairs(restable) do
            if res:ok() then
                return res
            end
        end
        -- else we return the first result.
        return restable[1]
    end
end

-- Builds the route tree and attaches it for all storage commands.
-- 'main_zones' is expected to be shaped like the value returned by
-- mcp_config_pools ({ prefix = { zone = pool } }). NOTE(review): confirm how
-- the proxy hands that value to this function.
-- For each prefix, a command router is built: SET/DELETE fan out to all
-- zones, ADD/MG/MS have dedicated routes, and everything else uses the
-- logged failover route.
function mcp_config_routes(main_zones)
    -- generate the prefix routes from zones.
    local prefixes = {}
    for pfx, z in pairs(main_zones) do
        local failover = reqlog_factory(failover_factory(z, my_zone))
        local all = walkall_factory(z)
        local setdel = setinvalidate_factory(z, my_zone)
        local map = {}
        map[mcp.CMD_SET] = all
        -- NOTE: in t/proxy.t all the backends point to the same place
        -- which makes replicating delete return NOT_FOUND
        map[mcp.CMD_DELETE] = all
        -- similar with ADD. will get an NOT_STORED back.
        -- need better routes designed for the test suite (edit the key
        -- prefix or something)
        map[mcp.CMD_ADD] = failover_factory(z, my_zone)
        map[mcp.CMD_MG] = meta_get_factory(z, my_zone)
        map[mcp.CMD_MS] = meta_set_factory(z, my_zone)
        prefixes[pfx] = command_factory(map, failover)
    end

    local routetop = prefix_factory("^/(%a+)/", prefixes, function(r) return "SERVER_ERROR no route\r\n" end)

    -- internally run parser at top of tree
    -- also wrap the request string with a convenience object until the C bits
    -- are attached to the internal parser.
    --mcp.attach(mcp.CMD_ANY, function (r) return routetop(r) end)
    mcp.attach(mcp.CMD_ANY_STORAGE, routetop)
    -- tagged top level attachments. ex: memcached -l tag[tagtest]:127.0.0.1:11212
    -- mcp.attach(mcp.CMD_ANY_STORAGE, function (r) return "SERVER_ERROR no route\r\n" end, "tagtest")
    -- mcp.attach(mcp.CMD_ANY_STORAGE, function (r) return "SERVER_ERROR my route\r\n" end, "newtag")
end