1 #include <string.h> 2 3 #include "memory_mgt.h" 4 #include "debug.h" 5 #include "tcp_send_buffer.h" 6 #include "tcp_sb_queue.h" 7 8 #define MAX(a, b) ((a)>(b)?(a):(b)) 9 #define MIN(a, b) ((a)<(b)?(a):(b)) 10 11 /*----------------------------------------------------------------------------*/ 12 struct sb_manager 13 { 14 size_t chunk_size; 15 uint32_t cur_num; 16 uint32_t cnum; 17 mem_pool_t mp; 18 sb_queue_t freeq; 19 20 } sb_manager; 21 /*----------------------------------------------------------------------------*/ 22 uint32_t 23 SBGetCurnum(sb_manager_t sbm) 24 { 25 return sbm->cur_num; 26 } 27 /*----------------------------------------------------------------------------*/ 28 sb_manager_t 29 SBManagerCreate(size_t chunk_size, uint8_t disable_rings, uint32_t concurrency) 30 { 31 sb_manager_t sbm = (sb_manager_t)calloc(1, sizeof(sb_manager)); 32 if (!sbm) { 33 TRACE_ERROR("SBManagerCreate() failed. %s\n", strerror(errno)); 34 return NULL; 35 } 36 37 sbm->chunk_size = chunk_size; 38 sbm->cnum = concurrency; 39 sbm->mp = (mem_pool_t)MPCreate(chunk_size, 40 (uint64_t)chunk_size * (!disable_rings * concurrency), 41 0); 42 if (!sbm->mp) { 43 TRACE_ERROR("Failed to create mem pool for sb.\n"); 44 free(sbm); 45 return NULL; 46 } 47 48 sbm->freeq = CreateSBQueue(concurrency); 49 if (!sbm->freeq) { 50 TRACE_ERROR("Failed to create free buffer queue.\n"); 51 return NULL; 52 } 53 54 return sbm; 55 } 56 /*----------------------------------------------------------------------------*/ 57 struct tcp_send_buffer * 58 SBInit(sb_manager_t sbm, uint32_t init_seq) 59 { 60 struct tcp_send_buffer *buf; 61 62 /* first try dequeue from free buffer queue */ 63 buf = SBDequeue(sbm->freeq); 64 if (!buf) { 65 buf = (struct tcp_send_buffer *)malloc(sizeof(struct tcp_send_buffer)); 66 if (!buf) { 67 perror("calloc() for buf"); 68 return NULL; 69 } 70 buf->data = MPAllocateChunk(sbm->mp); 71 if (!buf->data) { 72 TRACE_ERROR("Failed to fetch memory chunk for data.\n"); 73 return NULL; 74 } 75 sbm->cur_num++; 76 } 77 78 buf->head = buf->data; 79 80 buf->head_off = buf->tail_off = 0; 81 buf->len = buf->cum_len = 0; 82 buf->size = sbm->chunk_size; 83 84 buf->init_seq = buf->head_seq = init_seq; 85 86 return buf; 87 } 88 /*----------------------------------------------------------------------------*/ 89 void 90 SBFree(sb_manager_t sbm, struct tcp_send_buffer *buf) 91 { 92 if (!buf) 93 return; 94 95 SBEnqueue(sbm->freeq, buf); 96 } 97 /*----------------------------------------------------------------------------*/ 98 size_t 99 SBPut(sb_manager_t sbm, struct tcp_send_buffer *buf, const void *data, size_t len) 100 { 101 size_t to_put; 102 103 if (len <= 0) 104 return 0; 105 106 /* if no space, return -2 */ 107 to_put = MIN(len, buf->size - buf->len); 108 if (to_put <= 0) { 109 return -2; 110 } 111 112 if (buf->tail_off + to_put < buf->size) { 113 /* if the data fit into the buffer, copy it */ 114 memcpy(buf->data + buf->tail_off, data, to_put); 115 buf->tail_off += to_put; 116 } else { 117 /* if buffer overflows, move the existing payload and merge */ 118 memmove(buf->data, buf->head, buf->len); 119 buf->head = buf->data; 120 buf->head_off = 0; 121 memcpy(buf->head + buf->len, data, to_put); 122 buf->tail_off = buf->len + to_put; 123 } 124 buf->len += to_put; 125 buf->cum_len += to_put; 126 127 return to_put; 128 } 129 /*----------------------------------------------------------------------------*/ 130 size_t 131 SBRemove(sb_manager_t sbm, struct tcp_send_buffer *buf, size_t len) 132 { 133 size_t to_remove; 134 135 if (len <= 0) 136 return 0; 137 138 to_remove = MIN(len, buf->len); 139 if (to_remove <= 0) { 140 return -2; 141 } 142 143 buf->head_off += to_remove; 144 buf->head = buf->data + buf->head_off; 145 buf->head_seq += to_remove; 146 buf->len -= to_remove; 147 148 /* if buffer is empty, move the head to 0 */ 149 if (buf->len == 0 && buf->head_off > 0) { 150 buf->head = buf->data; 151 buf->head_off = buf->tail_off = 0; 152 } 153 154 return to_remove; 155 } 156 /*---------------------------------------------------------------------------*/ 157