/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "member.h"
#include "lock.h"
#include "dir.h"
#include "config.h"
#include "requestqueue.h"

struct rq_entry {
	struct list_head list;
	int nodeid;
	char request[1];
};

/*
 * Requests received while the lockspace is in recovery get added to the
 * request queue and processed when recovery is complete.  This happens when
 * the lockspace is suspended on some nodes before it is on others, or the
 * lockspace is enabled on some while still suspended on others.
 */

void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd)
{
	struct rq_entry *e;
	int length = hd->h_length;

	if (dlm_is_removed(ls, nodeid))
		return;

	e = kmalloc(sizeof(struct rq_entry) + length, GFP_KERNEL);
	if (!e) {
		log_print("dlm_add_requestqueue: out of memory");
		return;
	}

	e->nodeid = nodeid;
	memcpy(e->request, hd, length);

	mutex_lock(&ls->ls_requestqueue_mutex);
	list_add_tail(&e->list, &ls->ls_requestqueue);
	mutex_unlock(&ls->ls_requestqueue_mutex);
}

int dlm_process_requestqueue(struct dlm_ls *ls)
{
	struct rq_entry *e;
	struct dlm_header *hd;
	int error = 0;

	mutex_lock(&ls->ls_requestqueue_mutex);

	for (;;) {
		if (list_empty(&ls->ls_requestqueue)) {
			mutex_unlock(&ls->ls_requestqueue_mutex);
			error = 0;
			break;
		}
		e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
		mutex_unlock(&ls->ls_requestqueue_mutex);

		hd = (struct dlm_header *) e->request;
		error = dlm_receive_message(hd, e->nodeid, 1);

		if (error == -EINTR) {
			/* entry is left on requestqueue */
			log_debug(ls, "process_requestqueue abort eintr");
			break;
		}

		mutex_lock(&ls->ls_requestqueue_mutex);
		list_del(&e->list);
		kfree(e);

		if (dlm_locking_stopped(ls)) {
			log_debug(ls, "process_requestqueue abort running");
			mutex_unlock(&ls->ls_requestqueue_mutex);
			error = -EINTR;
			break;
		}
		schedule();
	}

	return error;
}

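/*
 * Illustration only (not part of the original file): a plausible sketch of
 * how an incoming message is routed while the lockspace is in recovery,
 * based on the comment above dlm_add_requestqueue().  The wrapper name
 * route_message_sketch is hypothetical; the real dispatch lives elsewhere
 * (lock.c) and may differ in detail.
 *
 *	static void route_message_sketch(struct dlm_ls *ls,
 *					 struct dlm_header *hd, int nodeid)
 *	{
 *		if (dlm_locking_stopped(ls))
 *			dlm_add_requestqueue(ls, nodeid, hd);
 *		else
 *			dlm_receive_message(hd, nodeid, 0);
 *	}
 *
 * Once recovery completes, dlm_recoverd replays the saved entries in
 * arrival order via dlm_process_requestqueue() above.
 */
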
/*
 * After recovery is done, locking is resumed and dlm_recoverd takes all the
 * saved requests and processes them as they would have been by dlm_recvd.  At
 * the same time, dlm_recvd will start receiving new requests from remote
 * nodes.  We want to delay dlm_recvd processing new requests until
 * dlm_recoverd has finished processing the old saved requests.
 */

void dlm_wait_requestqueue(struct dlm_ls *ls)
{
	for (;;) {
		mutex_lock(&ls->ls_requestqueue_mutex);
		if (list_empty(&ls->ls_requestqueue))
			break;
		if (dlm_locking_stopped(ls))
			break;
		mutex_unlock(&ls->ls_requestqueue_mutex);
		schedule();
	}
	mutex_unlock(&ls->ls_requestqueue_mutex);
}

static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
{
	uint32_t type = ms->m_type;

	if (dlm_is_removed(ls, nodeid))
		return 1;

	/* directory operations are always purged because the directory is
	   always rebuilt during recovery and the lookups resent */

	if (type == DLM_MSG_REMOVE ||
	    type == DLM_MSG_LOOKUP ||
	    type == DLM_MSG_LOOKUP_REPLY)
		return 1;

	if (!dlm_no_directory(ls))
		return 0;

	/* with no directory, the master is likely to change as a part of
	   recovery; requests to/from the defunct master need to be purged */

	switch (type) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_CANCEL:
		/* we're no longer the master of this resource, the sender
		   will resend to the new master (see waiter_needs_recovery) */

		if (dlm_hash2nodeid(ls, ms->m_hash) != dlm_our_nodeid())
			return 1;
		break;

	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_UNLOCK_REPLY:
	case DLM_MSG_CANCEL_REPLY:
	case DLM_MSG_GRANT:
		/* this reply is from the former master of the resource,
		   we'll resend to the new master if needed */

		if (dlm_hash2nodeid(ls, ms->m_hash) != nodeid)
			return 1;
		break;
	}

	return 0;
}

void dlm_purge_requestqueue(struct dlm_ls *ls)
{
	struct dlm_message *ms;
	struct rq_entry *e, *safe;

	mutex_lock(&ls->ls_requestqueue_mutex);
	list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) {
		ms = (struct dlm_message *) e->request;

		if (purge_request(ls, ms, e->nodeid)) {
			list_del(&e->list);
			kfree(e);
		}
	}
	mutex_unlock(&ls->ls_requestqueue_mutex);
}
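
/*
 * Illustration only (not part of the original file): a rough sketch of how
 * the entry points above fit together, inferred from the comments in this
 * file.  The real call sites live elsewhere (e.g. recoverd.c, lock.c) and
 * the exact ordering shown here is an assumption.
 *
 * dlm_recoverd, around the end of recovery:
 *	dlm_purge_requestqueue(ls);	drop saved messages that recovery
 *					makes stale (directory ops, messages
 *					tied to a removed or demoted master)
 *	dlm_process_requestqueue(ls);	replay the surviving saved entries
 *
 * dlm_recvd, before handling a newly received request:
 *	dlm_wait_requestqueue(ls);	block until the replay has drained,
 *					so old saved requests are processed
 *					before new ones
 */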