1
2 /**
3 * Tencent is pleased to support the open source community by making MSEC available.
4 *
5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6 *
7 * Licensed under the GNU General Public License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License. You may
9 * obtain a copy of the License at
10 *
11 * https://opensource.org/licenses/GPL-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software distributed under the
14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15 * either express or implied. See the License for the specific language governing permissions
16 * and limitations under the License.
17 */
18
19
20 /**
21 * @file mt_notify.cpp
22 * @time 20130924
23 **/
24 #include <fcntl.h>
25 #include <sys/types.h>
26 #include <sys/socket.h>
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
29
30 #include "micro_thread.h"
31 #include "mt_session.h"
32 #include "mt_msg.h"
33 #include "mt_notify.h"
34 #include "mt_connection.h"
35 #include "mt_sys_hook.h"
36 #include "ff_hook.h"
37
38 using namespace std;
39 using namespace NS_MICRO_THREAD;
40
InsertWriteWait(SessionProxy * proxy)41 void ISessionNtfy::InsertWriteWait(SessionProxy* proxy)
42 {
43 if (!proxy->_flag) {
44 TAILQ_INSERT_TAIL(&_write_list, proxy, _write_entry);
45 proxy->_flag = 1;
46 }
47 }
48
RemoveWriteWait(SessionProxy * proxy)49 void ISessionNtfy::RemoveWriteWait(SessionProxy* proxy)
50 {
51 if (proxy->_flag) {
52 TAILQ_REMOVE(&_write_list, proxy, _write_entry);
53 proxy->_flag = 0;
54 }
55 }
56
NotifyWriteWait()57 void UdpSessionNtfy::NotifyWriteWait()
58 {
59 MtFrame* frame = MtFrame::Instance();
60 SessionProxy* proxy = NULL;
61 MicroThread* thread = NULL;
62 TAILQ_FOREACH(proxy, &_write_list, _write_entry)
63 {
64 proxy->SetRcvEvents(KQ_EVENT_WRITE);
65
66 thread = proxy->GetOwnerThread();
67 if (thread && thread->HasFlag(MicroThread::IO_LIST))
68 {
69 frame->RemoveIoWait(thread);
70 frame->InsertRunable(thread);
71 }
72 }
73 }
74
CreateSocket()75 int UdpSessionNtfy::CreateSocket()
76 {
77 int osfd = socket(AF_INET, SOCK_DGRAM, 0);
78 if (osfd < 0)
79 {
80 MTLOG_ERROR("socket create failed, errno %d(%s)", errno, strerror(errno));
81 return -1;
82 }
83
84 int flags = 1;
85 if (ioctl(osfd, FIONBIO, &flags) < 0)
86 {
87 MTLOG_ERROR("socket unblock failed, errno %d(%s)", errno, strerror(errno));
88 close(osfd);
89 osfd = -1;
90 return -2;
91 }
92
93 if (_local_addr.sin_port != 0)
94 {
95 int ret = bind(osfd, (struct sockaddr *)&_local_addr, sizeof(_local_addr));
96 if (ret < 0)
97 {
98 MTLOG_ERROR("socket bind(%s:%d) failed, errno %d(%s)", inet_ntoa(_local_addr.sin_addr),
99 ntohs(_local_addr.sin_port), errno, strerror(errno));
100 close(osfd);
101 osfd = -1;
102 return -3;
103 }
104 }
105
106 this->SetOsfd(osfd);
107 this->EnableInput();
108 MtFrame* frame = MtFrame::Instance();
109 frame->KqueueNtfyReg(osfd, this);
110 frame->KqueueCtrlAdd(osfd, KQ_EVENT_READ);
111
112 return osfd;
113 }
114
CloseSocket()115 void UdpSessionNtfy::CloseSocket()
116 {
117 int osfd = this->GetOsfd();
118 if (osfd > 0)
119 {
120 MtFrame* frame = MtFrame::Instance();
121 frame->KqueueCtrlDel(osfd, KQ_EVENT_READ);
122 frame->KqueueNtfyReg(osfd, NULL);
123 this->DisableInput();
124 this->SetOsfd(-1);
125 close(osfd);
126 }
127 }
128
InputNotify()129 int UdpSessionNtfy::InputNotify()
130 {
131 while (1)
132 {
133 int ret = 0;
134 int have_rcv_len = 0;
135
136 if (!_msg_buff) {
137 _msg_buff = MsgBuffPool::Instance()->GetMsgBuf(this->GetMsgBuffSize());
138 if (NULL == _msg_buff) {
139 MTLOG_ERROR("Get memory failed, size %d, wait next time", this->GetMsgBuffSize());
140 return 0;
141 }
142 _msg_buff->SetBuffType(BUFF_RECV);
143 }
144 char* buff = (char*)_msg_buff->GetMsgBuff();
145
146 int osfd = this->GetOsfd();
147 struct sockaddr_in from;
148 socklen_t fromlen = sizeof(from);
149 mt_hook_syscall(recvfrom);
150 ret = ff_hook_recvfrom(osfd, buff, _msg_buff->GetMaxLen(),
151 0, (struct sockaddr*)&from, &fromlen);
152 if (ret < 0)
153 {
154 if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
155 {
156 return 0;
157 }
158 else
159 {
160 MTLOG_ERROR("recv error, fd %d", osfd);
161 return 0;
162 }
163 }
164 else if (ret == 0)
165 {
166 MTLOG_DEBUG("remote close connection, fd %d", osfd);
167 return 0;
168 }
169 else
170 {
171 have_rcv_len = ret;
172 _msg_buff->SetHaveRcvLen(have_rcv_len);
173 _msg_buff->SetMsgLen(have_rcv_len);
174 }
175
176 int sessionid = 0;
177 ret = this->GetSessionId(buff, have_rcv_len, sessionid);
178 if (ret <= 0)
179 {
180 MTLOG_ERROR("recv get session failed, len %d, fd %d, drop it",
181 have_rcv_len, osfd);
182 MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff);
183 _msg_buff = NULL;
184 return 0;
185 }
186
187 ISession* session = SessionMgr::Instance()->FindSession(sessionid);
188 if (NULL == session)
189 {
190 MT_ATTR_API(350403, 1);
191 MTLOG_DEBUG("session %d, not find, maybe timeout, drop pkg", sessionid);
192 MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff);
193 _msg_buff = NULL;
194 return 0;
195 }
196
197 IMtConnection* conn = session->GetSessionConn();
198 MicroThread* thread = session->GetOwnerThread();
199 if (!thread || !conn || !conn->GetNtfyObj())
200 {
201 MTLOG_ERROR("sesson obj %p, no thread ptr %p, no conn %p wrong",
202 session, thread, conn);
203 MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff);
204 _msg_buff = NULL;
205 return 0;
206 }
207 MtMsgBuf* msg = conn->GetMtMsgBuff();
208 if (msg) {
209 MsgBuffPool::Instance()->FreeMsgBuf(msg);
210 }
211 conn->SetMtMsgBuff(_msg_buff);
212 _msg_buff = NULL;
213
214 conn->GetNtfyObj()->SetRcvEvents(KQ_EVENT_READ);
215 if (thread->HasFlag(MicroThread::IO_LIST))
216 {
217 MtFrame* frame = MtFrame::Instance();
218 frame->RemoveIoWait(thread);
219 frame->InsertRunable(thread);
220 }
221 }
222
223 return 0;
224 }
225
OutputNotify()226 int UdpSessionNtfy::OutputNotify()
227 {
228 NotifyWriteWait();
229 return 0;
230 }
231
HangupNotify()232 int UdpSessionNtfy::HangupNotify()
233 {
234 MtFrame* frame = MtFrame::Instance();
235 frame->KqueueCtrlDel(this->GetOsfd(), this->GetEvents());
236
237 MTLOG_ERROR("sesson obj %p, recv error event. fd %d", this, this->GetOsfd());
238
239 CloseSocket();
240
241 CreateSocket();
242
243 return 0;
244 }
245
KqueueCtlAdd(void * args)246 int UdpSessionNtfy::KqueueCtlAdd(void* args)
247 {
248 MtFrame* frame = MtFrame::Instance();
249 KqFdRef* fd_ref = (KqFdRef*)args;
250 //ASSERT(fd_ref != NULL);
251
252 int osfd = this->GetOsfd();
253
254 KqueuerObj* old_obj = fd_ref->GetNotifyObj();
255 if ((old_obj != NULL) && (old_obj != this))
256 {
257 MTLOG_ERROR("epfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this);
258 return -1;
259 }
260
261 if (!frame->KqueueCtrlAdd(osfd, KQ_EVENT_WRITE))
262 {
263 MTLOG_ERROR("epfd ref add failed, log");
264 return -2;
265 }
266 this->EnableOutput();
267
268 return 0;
269 }
270
KqueueCtlDel(void * args)271 int UdpSessionNtfy::KqueueCtlDel(void* args)
272 {
273 MtFrame* frame = MtFrame::Instance();
274 KqFdRef* fd_ref = (KqFdRef*)args;
275 //ASSERT(fd_ref != NULL);
276
277 int osfd = this->GetOsfd();
278
279 KqueuerObj* old_obj = fd_ref->GetNotifyObj();
280 if (old_obj != this)
281 {
282 MTLOG_ERROR("epfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this);
283 return -1;
284 }
285
286 if (!frame->KqueueCtrlDel(osfd, KQ_EVENT_WRITE))
287 {
288 MTLOG_ERROR("epfd ref del failed, log");
289 return -2;
290 }
291 this->DisableOutput();
292
293 return 0;
294
295 }
296
InputNotify()297 int TcpKeepNtfy::InputNotify()
298 {
299 KeepaliveClose();
300 return -1;
301 }
302
OutputNotify()303 int TcpKeepNtfy::OutputNotify()
304 {
305 KeepaliveClose();
306 return -1;
307 }
308
HangupNotify()309 int TcpKeepNtfy::HangupNotify()
310 {
311 KeepaliveClose();
312 return -1;
313 }
314
KeepaliveClose()315 void TcpKeepNtfy::KeepaliveClose()
316 {
317 if (_keep_conn) {
318 MTLOG_DEBUG("remote close, fd %d, close connection", _fd);
319 ConnectionMgr::Instance()->CloseIdleTcpKeep(_keep_conn);
320 } else {
321 MTLOG_ERROR("_keep_conn ptr null, error");
322 }
323 }
324
325 NtfyObjMgr* NtfyObjMgr::_instance = NULL;
Instance(void)326 NtfyObjMgr* NtfyObjMgr::Instance (void)
327 {
328 if (NULL == _instance)
329 {
330 _instance = new NtfyObjMgr;
331 }
332
333 return _instance;
334 }
335
Destroy()336 void NtfyObjMgr::Destroy()
337 {
338 if( _instance != NULL )
339 {
340 delete _instance;
341 _instance = NULL;
342 }
343 }
344
NtfyObjMgr()345 NtfyObjMgr::NtfyObjMgr()
346 {
347 }
348
~NtfyObjMgr()349 NtfyObjMgr::~NtfyObjMgr()
350 {
351 }
352
RegisterSession(int session_name,ISessionNtfy * session)353 int NtfyObjMgr::RegisterSession(int session_name, ISessionNtfy* session)
354 {
355 if (session_name <= 0 || NULL == session) {
356 MTLOG_ERROR("session %d, register %p failed", session_name, session);
357 return -1;
358 }
359
360 SessionMap::iterator it = _session_map.find(session_name);
361 if (it != _session_map.end())
362 {
363 MTLOG_ERROR("session %d, register %p already", session_name, session);
364 return -2;
365 }
366
367 _session_map.insert(SessionMap::value_type(session_name, session));
368
369 return 0;
370 }
371
GetNameSession(int session_name)372 ISessionNtfy* NtfyObjMgr::GetNameSession(int session_name)
373 {
374 SessionMap::iterator it = _session_map.find(session_name);
375 if (it != _session_map.end())
376 {
377 return it->second;
378 }
379 else
380 {
381 return NULL;
382 }
383 }
384
GetNtfyObj(int type,int session_name)385 KqueuerObj* NtfyObjMgr::GetNtfyObj(int type, int session_name)
386 {
387 KqueuerObj* obj = NULL;
388 SessionProxy* proxy = NULL;
389
390 switch (type)
391 {
392 case NTFY_OBJ_THREAD:
393 obj = _fd_ntfy_pool.AllocPtr();
394 break;
395
396 case NTFY_OBJ_SESSION:
397 proxy = _udp_proxy_pool.AllocPtr();
398 obj = proxy;
399 break;
400
401 case NTFY_OBJ_KEEPALIVE: // no need get this now
402 break;
403
404 default:
405 break;
406 }
407
408 if (proxy) {
409 ISessionNtfy* ntfy = this->GetNameSession(session_name);
410 if (!ntfy) {
411 MTLOG_ERROR("ntfy get session name(%d) failed", session_name);
412 this->FreeNtfyObj(proxy);
413 obj = NULL;
414 } else {
415 proxy->SetRealNtfyObj(ntfy);
416 }
417 }
418
419 return obj;
420
421 }
422
FreeNtfyObj(KqueuerObj * obj)423 void NtfyObjMgr::FreeNtfyObj(KqueuerObj* obj)
424 {
425 SessionProxy* proxy = NULL;
426 if (!obj) {
427 return;
428 }
429
430 int type = obj->GetNtfyType();
431 obj->Reset();
432
433 switch (type)
434 {
435 case NTFY_OBJ_THREAD:
436 return _fd_ntfy_pool.FreePtr(obj);
437 break;
438
439 case NTFY_OBJ_SESSION:
440 proxy = dynamic_cast<SessionProxy*>(obj);
441 return _udp_proxy_pool.FreePtr(proxy);
442 break;
443
444 case NTFY_OBJ_KEEPALIVE:
445 break;
446
447 default:
448 break;
449 }
450
451 delete obj;
452 return;
453 }
454