1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 
3 #include "proxy.h"
4 
// Token bucket filter state. Contains only plain integers and references no
// other objects, so no GC handling is necessary.
struct mcp_ratelim_tbf {
    uint32_t bucket;     // tokens currently available
    uint32_t limit;      // bucket capacity; refills are clamped to this
    uint32_t fill_rate;  // tokens credited per elapsed tick
    uint32_t tick_rate;  // length of one tick, in milliseconds
    int64_t last_update; // time of the last credited tick, in milliseconds
};
13 
14 struct mcp_ratelim_global_tbf {
15     struct mcp_globalobj_s g;
16     struct mcp_ratelim_tbf tbf;
17 };
18 
// Convert a struct timeval (the struct itself, not a pointer) to whole
// milliseconds as a uint64_t. The argument is parenthesized so that any
// expression yielding a timeval, e.g. `*ptr`, expands correctly.
// Note: the argument is evaluated twice; do not pass expressions with side
// effects.
#define TIMEVAL_TO_MILLIS(n) ((n).tv_usec / 1000 + (n).tv_sec * (uint64_t)1000)
20 
21 // global config VM object GC
mcplib_ratelim_global_tbf_gc(lua_State * L)22 int mcplib_ratelim_global_tbf_gc(lua_State *L) {
23     struct mcp_ratelim_global_tbf *lim = luaL_checkudata(L, 1, "mcp.ratelim_global_tbf");
24     assert(lim->g.refcount == 0);
25     mcp_gobj_finalize(&lim->g);
26 
27     // no other memory to directly free, just kill the mutex.
28     return 0;
29 }
30 
31 // worker thread proxy object GC
mcplib_ratelim_proxy_tbf_gc(lua_State * L)32 int mcplib_ratelim_proxy_tbf_gc(lua_State *L) {
33     struct mcp_ratelim_global_tbf **lim_p = luaL_checkudata(L, 1, "mcp.ratelim_proxy_tbf");
34     struct mcp_ratelim_global_tbf *lim = *lim_p;
35     proxy_ctx_t *ctx = PROXY_GET_THR_CTX(L);
36     mcp_gobj_unref(ctx, &lim->g);
37 
38     return 0;
39 }
40 
// Builds a proxy for a global limiter: reads the global limiter userdata at
// index -3 of `from`, pushes onto `to` a new userdata holding a pointer to
// it, and takes a reference on the global object.
// Always returns 0.
int mcp_ratelim_proxy_tbf(lua_State *from, lua_State *to) {
    // the global limiter is expected at -3 on the source stack.
    struct mcp_ratelim_global_tbf *shared =
        luaL_checkudata(from, -3, "mcp.ratelim_global_tbf");

    // the proxy is nothing more than a pointer-sized userdata on the
    // destination state, tagged with the proxy metatable.
    struct mcp_ratelim_global_tbf **slot =
        lua_newuserdatauv(to, sizeof(*slot), 0);
    luaL_setmetatable(to, "mcp.ratelim_proxy_tbf");
    *slot = shared;

    // duplicate the limiter on the source stack for the ref call below,
    // which pops that copy.
    lua_pushvalue(from, -3);
    mcp_gobj_ref(from, &shared->g);

    return 0;
}
53 
_tbf_check(lua_State * L,char * key)54 static lua_Integer _tbf_check(lua_State *L, char *key) {
55     lua_Integer n = 0;
56     if (lua_getfield(L, 1, key) != LUA_TNIL) {
57         n = lua_tointeger(L, -1);
58         if (n < 0 || n > UINT_MAX-1) {
59             proxy_lua_error(L, "mcp.ratelim_tbf: arguments must be unsigned 32 bit integer");
60         }
61     }
62     lua_pop(L, 1); // pops value or nil.
63     return n;
64 }
65 
// Populate a token bucket from the options table at stack index 1.
// Reads the "limit", "fillrate" and "tickrate" fields (each validated by
// _tbf_check), starts the bucket full, and stamps it with the current time
// in milliseconds. Raises a Lua error if argument 1 is not a table.
static void _setup_tbf(lua_State *L, struct mcp_ratelim_tbf *lim) {
    luaL_checktype(L, 1, LUA_TTABLE);

    lim->limit = (uint32_t)_tbf_check(L, "limit");
    lim->fill_rate = (uint32_t)_tbf_check(L, "fillrate");
    lim->tick_rate = (uint32_t)_tbf_check(L, "tickrate");

    // a fresh filter begins with a full bucket.
    lim->bucket = lim->limit;

    struct timeval tv;
    gettimeofday(&tv, NULL);
    lim->last_update = TIMEVAL_TO_MILLIS(tv);
}
78 
// Lua constructor for a plain (unlocked) token bucket limiter.
// Takes the options table as argument 1 and returns one value: a new
// "mcp.ratelim_tbf" userdata configured from that table.
int mcplib_ratelim_tbf(lua_State *L) {
    struct mcp_ratelim_tbf *tbf =
        lua_newuserdatauv(L, sizeof(struct mcp_ratelim_tbf), 0);

    // start from all-zero state before the metatable and options go on.
    memset(tbf, 0, sizeof(struct mcp_ratelim_tbf));
    luaL_setmetatable(L, "mcp.ratelim_tbf");

    _setup_tbf(L, tbf);

    // the userdata is still on top of the stack; hand it back.
    return 1;
}
87 
// Lua constructor for the global (mutex-protected) token bucket limiter.
// Takes the options table as argument 1 and returns one value: a new
// "mcp.ratelim_global_tbf" userdata with its lock initialized and its
// filter configured from that table.
int mcplib_ratelim_global_tbf(lua_State *L) {
    struct mcp_ratelim_global_tbf *glim =
        lua_newuserdatauv(L, sizeof(struct mcp_ratelim_global_tbf), 0);
    memset(glim, 0, sizeof(struct mcp_ratelim_global_tbf));

    // TODO: at the next refactor, introduce a "globalobj init" step; it
    // would most likely consist of exactly this mutex init.
    pthread_mutex_init(&glim->g.lock, NULL);
    luaL_setmetatable(L, "mcp.ratelim_global_tbf");

    _setup_tbf(L, &glim->tbf);

    return 1;
}
99 
_update_tbf(struct mcp_ratelim_tbf * lim,int take,uint64_t now)100 static int _update_tbf(struct mcp_ratelim_tbf *lim, int take, uint64_t now) {
101     uint64_t delta = 0;
102     delta = now - lim->last_update;
103 
104     if (delta > lim->tick_rate) {
105         // find how many ticks to add to the bucket.
106         uint32_t toadd = delta / lim->tick_rate;
107         // advance time up to the most recent tick.
108         lim->last_update += toadd * lim->tick_rate;
109         // add tokens to the bucket
110         lim->bucket += toadd * lim->fill_rate;
111         if (lim->bucket > lim->limit) {
112             lim->bucket = lim->limit;
113         }
114     }
115 
116     if (lim->bucket > take) {
117         lim->bucket -= take;
118         return 1;
119     } else {
120         return 0;
121     }
122 }
123 
// Call handler for the plain limiter: limiter(take).
// Argument 1 is the "mcp.ratelim_tbf" userdata, argument 2 the number of
// tokens to take (must be a Lua number). Returns one boolean: true when
// the tokens were granted, false otherwise.
int mcplib_ratelim_tbf_call(lua_State *L) {
    struct mcp_ratelim_tbf *tbf = luaL_checkudata(L, 1, "mcp.ratelim_tbf");
    luaL_checktype(L, 2, LUA_TNUMBER);
    int take = lua_tointeger(L, 2);

    // sample the wall clock and convert to milliseconds.
    struct timeval tv;
    gettimeofday(&tv, NULL);
    uint64_t now_ms = TIMEVAL_TO_MILLIS(tv);

    int granted = _update_tbf(tbf, take, now_ms);
    lua_pushboolean(L, granted);

    return 1;
}
137 
138 // NOTE: it should be possible to run a TBF using atomics, in the future when
139 // we start to support C11 atomics.
140 // Flip the concept of checking the time, updating, then subtracting the take
141 // to:
142 // - how much "time elapsed" is necessary for the take requested
143 // - atomically load the old time
144 // - if not enough time delta between old time and now, return false
145 // - else atomically swap the update time with the new time
146 //   - compare and update the oldtime to newtime
147 // - not sure how much perf this buys you. would have to test.
mcplib_ratelim_proxy_tbf_call(lua_State * L)148 int mcplib_ratelim_proxy_tbf_call(lua_State *L) {
149     struct mcp_ratelim_global_tbf **lim_p = luaL_checkudata(L, 1, "mcp.ratelim_proxy_tbf");
150     // line was kinda long / hard to read.
151     struct mcp_ratelim_global_tbf *lim = *lim_p;
152     struct timeval now;
153     luaL_checktype(L, 2, LUA_TNUMBER);
154     int take = lua_tointeger(L, 2);
155     gettimeofday(&now, NULL);
156     uint64_t now_millis = 0;
157     now_millis = TIMEVAL_TO_MILLIS(now);
158 
159     pthread_mutex_lock(&lim->g.lock);
160     int res = _update_tbf(&lim->tbf, take, now_millis);
161     pthread_mutex_unlock(&lim->g.lock);
162 
163     lua_pushboolean(L, res);
164     return 1;
165 }
166