/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "member.h"
#include "lock.h"
#include "dir.h"
#include "config.h"
#include "requestqueue.h"

struct rq_entry {
	struct list_head list;
	int nodeid;
	char request[1];
};

/*
 * Requests received while the lockspace is in recovery get added to the
 * request queue and processed when recovery is complete.  This happens when
 * the lockspace is suspended on some nodes before it is on others, or the
 * lockspace is enabled on some while still suspended on others.
 */

int dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd)
{
	struct rq_entry *e;
	int length = hd->h_length;
	int rv = 0;

	if (dlm_is_removed(ls, nodeid))
		return 0;

	e = kmalloc(sizeof(struct rq_entry) + length, GFP_KERNEL);
	if (!e) {
		log_print("dlm_add_requestqueue: out of memory");
		return 0;
	}

	e->nodeid = nodeid;
	memcpy(e->request, hd, length);

	/* We need to check dlm_locking_stopped() after taking the mutex to
	   avoid a race where dlm_recoverd enables locking and runs
	   process_requestqueue between our earlier dlm_locking_stopped check
	   and this addition to the requestqueue. */

	mutex_lock(&ls->ls_requestqueue_mutex);
	if (dlm_locking_stopped(ls))
		list_add_tail(&e->list, &ls->ls_requestqueue);
	else {
		log_debug(ls, "dlm_add_requestqueue skip from %d", nodeid);
		kfree(e);
		rv = -EAGAIN;
	}
	mutex_unlock(&ls->ls_requestqueue_mutex);
	return rv;
}

/*
 * Run by dlm_recoverd once recovery is complete and locking is enabled
 * again: replay the saved requests in arrival order.  Returns -EINTR if a
 * new recovery begins before the queue is drained; the remaining entries
 * stay queued for the next pass.
 */

int dlm_process_requestqueue(struct dlm_ls *ls)
{
	struct rq_entry *e;
	struct dlm_header *hd;
	int error = 0;

	mutex_lock(&ls->ls_requestqueue_mutex);

	for (;;) {
		if (list_empty(&ls->ls_requestqueue)) {
			mutex_unlock(&ls->ls_requestqueue_mutex);
			error = 0;
			break;
		}
		e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
		mutex_unlock(&ls->ls_requestqueue_mutex);

		hd = (struct dlm_header *) e->request;
		error = dlm_receive_message(hd, e->nodeid, 1);

		if (error == -EINTR) {
			/* entry is left on requestqueue */
			log_debug(ls, "process_requestqueue abort eintr");
			break;
		}

		mutex_lock(&ls->ls_requestqueue_mutex);
		list_del(&e->list);
		kfree(e);

		if (dlm_locking_stopped(ls)) {
			log_debug(ls, "process_requestqueue abort running");
			mutex_unlock(&ls->ls_requestqueue_mutex);
			error = -EINTR;
			break;
		}
		schedule();
	}

	return error;
}
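/*
 * Illustrative sketch only (guarded out of the build): how a caller in the
 * normal receive path, having already seen dlm_locking_stopped() return
 * true, is expected to handle the -EAGAIN return from
 * dlm_add_requestqueue().  The real receive path lives in lock.c; this
 * helper and its exact shape are assumptions for illustration, not the
 * actual code.
 */
#if 0
static void example_queue_or_deliver(struct dlm_ls *ls, int nodeid,
				     struct dlm_header *hd)
{
	if (dlm_locking_stopped(ls)) {
		if (dlm_add_requestqueue(ls, nodeid, hd) != -EAGAIN)
			return;
		/* Recovery finished between the caller's check and the
		   mutex inside dlm_add_requestqueue (the race described
		   above), so deliver the message normally instead. */
	}
	dlm_receive_message(hd, nodeid, 0);
}
#endif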
/*
 * After recovery is done, locking is resumed and dlm_recoverd takes all the
 * saved requests and processes them as dlm_recvd would have.  At the same
 * time, dlm_recvd will start receiving new requests from remote nodes.  We
 * want to delay dlm_recvd processing new requests until dlm_recoverd has
 * finished processing the old saved requests.
 */

void dlm_wait_requestqueue(struct dlm_ls *ls)
{
	for (;;) {
		mutex_lock(&ls->ls_requestqueue_mutex);
		if (list_empty(&ls->ls_requestqueue))
			break;
		if (dlm_locking_stopped(ls))
			break;
		mutex_unlock(&ls->ls_requestqueue_mutex);
		schedule();
	}
	mutex_unlock(&ls->ls_requestqueue_mutex);
}

static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
{
	uint32_t type = ms->m_type;

	if (dlm_is_removed(ls, nodeid))
		return 1;

	/* directory operations are always purged because the directory is
	   always rebuilt during recovery and the lookups resent */

	if (type == DLM_MSG_REMOVE ||
	    type == DLM_MSG_LOOKUP ||
	    type == DLM_MSG_LOOKUP_REPLY)
		return 1;

	if (!dlm_no_directory(ls))
		return 0;

	/* with no directory, the master is likely to change as a part of
	   recovery; requests to/from the defunct master need to be purged */

	switch (type) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_CANCEL:
		/* we're no longer the master of this resource, the sender
		   will resend to the new master (see waiter_needs_recovery) */

		if (dlm_hash2nodeid(ls, ms->m_hash) != dlm_our_nodeid())
			return 1;
		break;

	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_UNLOCK_REPLY:
	case DLM_MSG_CANCEL_REPLY:
	case DLM_MSG_GRANT:
		/* this reply is from the former master of the resource,
		   we'll resend to the new master if needed */

		if (dlm_hash2nodeid(ls, ms->m_hash) != nodeid)
			return 1;
		break;
	}

	return 0;
}

/*
 * Drop saved requests that recovery makes stale: anything from a removed
 * node, all directory operations, and (when running with no directory)
 * requests or replies tied to a resource master that recovery reassigns.
 */

void dlm_purge_requestqueue(struct dlm_ls *ls)
{
	struct dlm_message *ms;
	struct rq_entry *e, *safe;

	mutex_lock(&ls->ls_requestqueue_mutex);
	list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) {
		ms = (struct dlm_message *) e->request;

		if (purge_request(ls, ms, e->nodeid)) {
			list_del(&e->list);
			kfree(e);
		}
	}
	mutex_unlock(&ls->ls_requestqueue_mutex);
}
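/*
 * Illustrative sketch only (guarded out of the build): the ordering the
 * comments above imply for how recovery drives this file.  The real
 * sequencing lives in dlm_recoverd; this helper is an assumption for
 * illustration, not the actual recovery code.
 */
#if 0
static int example_recovery_replay(struct dlm_ls *ls)
{
	/* while locking is still stopped, drop saved requests that
	   recovery makes stale */
	dlm_purge_requestqueue(ls);

	/* ... recovery work happens here, then locking is enabled ... */

	/* replay everything queued while locking was stopped; -EINTR
	   means another recovery started before the queue was drained */
	return dlm_process_requestqueue(ls);
}
#endif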