1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3 #include "proxy.h"
4
// Token bucket filter state for a single (worker-local) rate limiter.
// Plain data only — no GC necessary.
struct mcp_ratelim_tbf {
    uint32_t bucket; // tokens currently available; capped at `limit`
    uint32_t limit; // maximum bucket size; also the initial seed value
    uint32_t fill_rate; // tokens to add per tick rate
    uint32_t tick_rate; // in milliseconds
    int64_t last_update; // time in milliseconds
};
13
// Shared TBF: a ratelim_tbf wrapped with the global-object header so it
// can be refcounted across lua VMs; g.lock serializes updates to tbf
// (see mcplib_ratelim_proxy_tbf_call).
struct mcp_ratelim_global_tbf {
    struct mcp_globalobj_s g;
    struct mcp_ratelim_tbf tbf;
};
18
// Convert a struct timeval to milliseconds. The argument is fully
// parenthesized so any lvalue expression (not just a simple name) can
// be passed without mis-binding the member access.
#define TIMEVAL_TO_MILLIS(n) ((n).tv_usec / 1000 + (n).tv_sec * (uint64_t)1000)
20
21 // global config VM object GC
// GC handler for the global (config VM) TBF object.
int mcplib_ratelim_global_tbf_gc(lua_State *L) {
    struct mcp_ratelim_global_tbf *glim = luaL_checkudata(L, 1, "mcp.ratelim_global_tbf");
    // every worker proxy reference must be dropped before collection.
    assert(glim->g.refcount == 0);
    // no other memory to directly free, just kill the mutex.
    mcp_gobj_finalize(&glim->g);
    return 0;
}
30
31 // worker thread proxy object GC
// GC handler for a worker thread's proxy handle: release our reference
// on the shared global object.
int mcplib_ratelim_proxy_tbf_gc(lua_State *L) {
    struct mcp_ratelim_global_tbf **glim_p = luaL_checkudata(L, 1, "mcp.ratelim_proxy_tbf");
    proxy_ctx_t *ctx = PROXY_GET_THR_CTX(L);
    mcp_gobj_unref(ctx, &(*glim_p)->g);
    return 0;
}
40
// Mirror a global TBF from the config VM (`from`) into a worker VM
// (`to`) as a pointer-sized proxy userdata. Takes a reference on the
// global object so the config VM cannot collect it while the worker
// still points at it.
int mcp_ratelim_proxy_tbf(lua_State *from, lua_State *to) {
    // from, -3 should have the userdata.
    struct mcp_ratelim_global_tbf *lim = luaL_checkudata(from, -3, "mcp.ratelim_global_tbf");
    // the worker-side object is only a pointer back to the shared one.
    struct mcp_ratelim_global_tbf **lim_p = lua_newuserdatauv(to, sizeof(struct mcp_ratelim_global_tbf *), 0);
    luaL_setmetatable(to, "mcp.ratelim_proxy_tbf");

    *lim_p = lim;
    // NOTE: the pushvalue must happen before mcp_gobj_ref, which
    // consumes the copied object from the `from` stack.
    lua_pushvalue(from, -3); // copy ratelim obj to ref below
    mcp_gobj_ref(from, &lim->g); // pops obj copy

    return 0;
}
53
_tbf_check(lua_State * L,char * key)54 static lua_Integer _tbf_check(lua_State *L, char *key) {
55 lua_Integer n = 0;
56 if (lua_getfield(L, 1, key) != LUA_TNIL) {
57 n = lua_tointeger(L, -1);
58 if (n < 0 || n > UINT_MAX-1) {
59 proxy_lua_error(L, "mcp.ratelim_tbf: arguments must be unsigned 32 bit integer");
60 }
61 }
62 lua_pop(L, 1); // pops value or nil.
63 return n;
64 }
65
// Initialize a TBF from the lua option table at stack index 1:
// reads limit/fillrate/tickrate, then seeds a full bucket stamped
// with the current time.
static void _setup_tbf(lua_State *L, struct mcp_ratelim_tbf *lim) {
    luaL_checktype(L, 1, LUA_TTABLE);

    lim->limit = _tbf_check(L, "limit");
    lim->fill_rate = _tbf_check(L, "fillrate");
    lim->tick_rate = _tbf_check(L, "tickrate");

    // seed the token bucket filter: start full, clock starts now.
    lim->bucket = lim->limit;
    struct timeval tv;
    gettimeofday(&tv, NULL);
    lim->last_update = TIMEVAL_TO_MILLIS(tv);
}
78
// Lua constructor: mcp.ratelim_tbf(table) -> worker-local TBF userdata.
int mcplib_ratelim_tbf(lua_State *L) {
    struct mcp_ratelim_tbf *tbf = lua_newuserdatauv(L, sizeof(*tbf), 0);
    memset(tbf, 0, sizeof(*tbf));
    luaL_setmetatable(L, "mcp.ratelim_tbf");
    _setup_tbf(L, tbf);

    return 1;
}
87
// Lua constructor: mcp.ratelim_global_tbf(table) -> shared TBF userdata
// (config VM side), with its cross-thread mutex initialized.
int mcplib_ratelim_global_tbf(lua_State *L) {
    struct mcp_ratelim_global_tbf *glim = lua_newuserdatauv(L, sizeof(*glim), 0);
    memset(glim, 0, sizeof(*glim));
    // TODO: during next refactor, add "globalobj init" phase, which probably
    // just does this.
    pthread_mutex_init(&glim->g.lock, NULL);
    luaL_setmetatable(L, "mcp.ratelim_global_tbf");
    _setup_tbf(L, &glim->tbf);

    return 1;
}
99
_update_tbf(struct mcp_ratelim_tbf * lim,int take,uint64_t now)100 static int _update_tbf(struct mcp_ratelim_tbf *lim, int take, uint64_t now) {
101 uint64_t delta = 0;
102 delta = now - lim->last_update;
103
104 if (delta > lim->tick_rate) {
105 // find how many ticks to add to the bucket.
106 uint32_t toadd = delta / lim->tick_rate;
107 // advance time up to the most recent tick.
108 lim->last_update += toadd * lim->tick_rate;
109 // add tokens to the bucket
110 lim->bucket += toadd * lim->fill_rate;
111 if (lim->bucket > lim->limit) {
112 lim->bucket = lim->limit;
113 }
114 }
115
116 if (lim->bucket > take) {
117 lim->bucket -= take;
118 return 1;
119 } else {
120 return 0;
121 }
122 }
123
// Lua __call for a worker-local TBF: tbf(take) -> boolean.
// No locking needed; the object is private to this worker thread.
int mcplib_ratelim_tbf_call(lua_State *L) {
    struct mcp_ratelim_tbf *tbf = luaL_checkudata(L, 1, "mcp.ratelim_tbf");
    luaL_checktype(L, 2, LUA_TNUMBER);
    int take = lua_tointeger(L, 2);

    struct timeval tv;
    gettimeofday(&tv, NULL);
    uint64_t millis = TIMEVAL_TO_MILLIS(tv);

    lua_pushboolean(L, _update_tbf(tbf, take, millis));
    return 1;
}
137
138 // NOTE: it should be possible to run a TBF using atomics, in the future when
139 // we start to support C11 atomics.
140 // Flip the concept of checking the time, updating, then subtracting the take
141 // to:
142 // - how much "time elapsed" is necessary for the take requested
143 // - atomically load the old time
144 // - if not enough time delta between old time and now, return false
145 // - else atomically swap the update time with the new time
146 // - compare and update the oldtime to newtime
147 // - not sure how much perf this buys you. would have to test.
// Lua __call for a worker's proxy handle to a shared TBF: tbf(take) ->
// boolean. The shared bucket is updated under the global object's lock.
int mcplib_ratelim_proxy_tbf_call(lua_State *L) {
    struct mcp_ratelim_global_tbf **glim_p = luaL_checkudata(L, 1, "mcp.ratelim_proxy_tbf");
    struct mcp_ratelim_global_tbf *glim = *glim_p;
    luaL_checktype(L, 2, LUA_TNUMBER);
    int take = lua_tointeger(L, 2);

    struct timeval tv;
    gettimeofday(&tv, NULL);
    uint64_t millis = TIMEVAL_TO_MILLIS(tv);

    pthread_mutex_lock(&glim->g.lock);
    int ok = _update_tbf(&glim->tbf, take, millis);
    pthread_mutex_unlock(&glim->g.lock);

    lua_pushboolean(L, ok);
    return 1;
}
166