1 #ifndef __HIREDIS_LIBUV_H__
2 #define __HIREDIS_LIBUV_H__
3 #include <stdlib.h>
4 #include <uv.h>
5 #include "../hiredis.h"
6 #include "../async.h"
7 #include <string.h>
8 
9 typedef struct redisLibuvEvents {
10   redisAsyncContext* context;
11   uv_poll_t          handle;
12   int                events;
13 } redisLibuvEvents;
14 
15 
/*
 * libuv poll callback: forwards socket readiness to hiredis.
 *
 * handle - poll handle; its data field points at the redisLibuvEvents state.
 * status - 0 on success, non-zero when libuv reports a polling error.
 * events - bitmask of UV_READABLE / UV_WRITABLE that became ready.
 */
static void redisLibuvPoll(uv_poll_t* handle, int status, int events) {
  redisLibuvEvents* p = (redisLibuvEvents*)handle->data;

  /*
   * When polling failed, do not drop the notification: that would leave the
   * connection hanging with nobody ever told about the error. Instead act as
   * if every event we registered for fired, so hiredis attempts the I/O,
   * sees the socket failure itself and runs its normal error handling.
   */
  int ev = (status != 0) ? p->events : events;

  if (p->context != NULL && (ev & UV_READABLE)) {
    redisAsyncHandleRead(p->context);
  }
  /* context is re-checked because cleanup resets it to NULL, and the read
   * handler above may have led to cleanup being invoked. */
  if (p->context != NULL && (ev & UV_WRITABLE)) {
    redisAsyncHandleWrite(p->context);
  }
}
30 
31 
redisLibuvAddRead(void * privdata)32 static void redisLibuvAddRead(void *privdata) {
33   redisLibuvEvents* p = (redisLibuvEvents*)privdata;
34 
35   p->events |= UV_READABLE;
36 
37   uv_poll_start(&p->handle, p->events, redisLibuvPoll);
38 }
39 
40 
redisLibuvDelRead(void * privdata)41 static void redisLibuvDelRead(void *privdata) {
42   redisLibuvEvents* p = (redisLibuvEvents*)privdata;
43 
44   p->events &= ~UV_READABLE;
45 
46   if (p->events) {
47     uv_poll_start(&p->handle, p->events, redisLibuvPoll);
48   } else {
49     uv_poll_stop(&p->handle);
50   }
51 }
52 
53 
redisLibuvAddWrite(void * privdata)54 static void redisLibuvAddWrite(void *privdata) {
55   redisLibuvEvents* p = (redisLibuvEvents*)privdata;
56 
57   p->events |= UV_WRITABLE;
58 
59   uv_poll_start(&p->handle, p->events, redisLibuvPoll);
60 }
61 
62 
redisLibuvDelWrite(void * privdata)63 static void redisLibuvDelWrite(void *privdata) {
64   redisLibuvEvents* p = (redisLibuvEvents*)privdata;
65 
66   p->events &= ~UV_WRITABLE;
67 
68   if (p->events) {
69     uv_poll_start(&p->handle, p->events, redisLibuvPoll);
70   } else {
71     uv_poll_stop(&p->handle);
72   }
73 }
74 
75 
/* Close callback passed to uv_close(): frees the state stored in handle->data. */
static void on_close(uv_handle_t* handle) {
  redisLibuvEvents *state = (redisLibuvEvents *)handle->data;
  free(state);
}
81 
82 
redisLibuvCleanup(void * privdata)83 static void redisLibuvCleanup(void *privdata) {
84   redisLibuvEvents* p = (redisLibuvEvents*)privdata;
85 
86   p->context = NULL; // indicate that context might no longer exist
87   uv_close((uv_handle_t*)&p->handle, on_close);
88 }
89 
90 
/*
 * Attach a hiredis async context to a libuv event loop.
 *
 * ac   - async context to attach; must not already have event data set.
 * loop - libuv loop that will poll the connection's descriptor.
 *
 * Returns REDIS_OK on success. Returns REDIS_ERR if the context already has
 * event data, if allocation fails, or if the poll handle cannot be
 * initialised; in every error case `ac` is left unmodified.
 */
static int redisLibuvAttach(redisAsyncContext* ac, uv_loop_t* loop) {
  redisContext *c = &(ac->c);
  redisLibuvEvents *p;

  if (ac->ev.data != NULL) {
    return REDIS_ERR;
  }

  /* Zero-initialised so that `events` starts empty. The cast keeps this
   * header usable when it is included from C++ sources. */
  p = (redisLibuvEvents*)calloc(1, sizeof(*p));
  if (p == NULL) {
    return REDIS_ERR;
  }

  if (uv_poll_init(loop, &p->handle, c->fd) != 0) {
    /* Release the state rather than leaking it. NOTE(review): this assumes a
     * failed uv_poll_init() leaves the handle unregistered with the loop. */
    free(p);
    return REDIS_ERR;
  }

  p->handle.data = p;
  p->context     = ac;

  /* Install the hooks only after everything succeeded, so a failed attach
   * never leaves callbacks that would be invoked with NULL privdata. */
  ac->ev.addRead  = redisLibuvAddRead;
  ac->ev.delRead  = redisLibuvDelRead;
  ac->ev.addWrite = redisLibuvAddWrite;
  ac->ev.delWrite = redisLibuvDelWrite;
  ac->ev.cleanup  = redisLibuvCleanup;
  ac->ev.data     = p;

  return REDIS_OK;
}
122 #endif
123