/* fs/ceph/msgpool.c as of Linux v2.6.34 (186 lines, 4.9 kB) */
1#include "ceph_debug.h" 2 3#include <linux/err.h> 4#include <linux/sched.h> 5#include <linux/types.h> 6#include <linux/vmalloc.h> 7 8#include "msgpool.h" 9 10/* 11 * We use msg pools to preallocate memory for messages we expect to 12 * receive over the wire, to avoid getting ourselves into OOM 13 * conditions at unexpected times. We take use a few different 14 * strategies: 15 * 16 * - for request/response type interactions, we preallocate the 17 * memory needed for the response when we generate the request. 18 * 19 * - for messages we can receive at any time from the MDS, we preallocate 20 * a pool of messages we can re-use. 21 * 22 * - for writeback, we preallocate some number of messages to use for 23 * requests and their replies, so that we always make forward 24 * progress. 25 * 26 * The msgpool behaves like a mempool_t, but keeps preallocated 27 * ceph_msgs strung together on a list_head instead of using a pointer 28 * vector. This avoids vector reallocation when we adjust the number 29 * of preallocated items (which happens frequently). 30 */ 31 32 33/* 34 * Allocate or release as necessary to meet our target pool size. 
35 */ 36static int __fill_msgpool(struct ceph_msgpool *pool) 37{ 38 struct ceph_msg *msg; 39 40 while (pool->num < pool->min) { 41 dout("fill_msgpool %p %d/%d allocating\n", pool, pool->num, 42 pool->min); 43 spin_unlock(&pool->lock); 44 msg = ceph_msg_new(0, pool->front_len, 0, 0, NULL); 45 spin_lock(&pool->lock); 46 if (IS_ERR(msg)) 47 return PTR_ERR(msg); 48 msg->pool = pool; 49 list_add(&msg->list_head, &pool->msgs); 50 pool->num++; 51 } 52 while (pool->num > pool->min) { 53 msg = list_first_entry(&pool->msgs, struct ceph_msg, list_head); 54 dout("fill_msgpool %p %d/%d releasing %p\n", pool, pool->num, 55 pool->min, msg); 56 list_del_init(&msg->list_head); 57 pool->num--; 58 ceph_msg_kfree(msg); 59 } 60 return 0; 61} 62 63int ceph_msgpool_init(struct ceph_msgpool *pool, 64 int front_len, int min, bool blocking) 65{ 66 int ret; 67 68 dout("msgpool_init %p front_len %d min %d\n", pool, front_len, min); 69 spin_lock_init(&pool->lock); 70 pool->front_len = front_len; 71 INIT_LIST_HEAD(&pool->msgs); 72 pool->num = 0; 73 pool->min = min; 74 pool->blocking = blocking; 75 init_waitqueue_head(&pool->wait); 76 77 spin_lock(&pool->lock); 78 ret = __fill_msgpool(pool); 79 spin_unlock(&pool->lock); 80 return ret; 81} 82 83void ceph_msgpool_destroy(struct ceph_msgpool *pool) 84{ 85 dout("msgpool_destroy %p\n", pool); 86 spin_lock(&pool->lock); 87 pool->min = 0; 88 __fill_msgpool(pool); 89 spin_unlock(&pool->lock); 90} 91 92int ceph_msgpool_resv(struct ceph_msgpool *pool, int delta) 93{ 94 int ret; 95 96 spin_lock(&pool->lock); 97 dout("msgpool_resv %p delta %d\n", pool, delta); 98 pool->min += delta; 99 ret = __fill_msgpool(pool); 100 spin_unlock(&pool->lock); 101 return ret; 102} 103 104struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, int front_len) 105{ 106 wait_queue_t wait; 107 struct ceph_msg *msg; 108 109 if (front_len && front_len > pool->front_len) { 110 pr_err("msgpool_get pool %p need front %d, pool size is %d\n", 111 pool, front_len, pool->front_len); 
112 WARN_ON(1); 113 114 /* try to alloc a fresh message */ 115 msg = ceph_msg_new(0, front_len, 0, 0, NULL); 116 if (!IS_ERR(msg)) 117 return msg; 118 } 119 120 if (!front_len) 121 front_len = pool->front_len; 122 123 if (pool->blocking) { 124 /* mempool_t behavior; first try to alloc */ 125 msg = ceph_msg_new(0, front_len, 0, 0, NULL); 126 if (!IS_ERR(msg)) 127 return msg; 128 } 129 130 while (1) { 131 spin_lock(&pool->lock); 132 if (likely(pool->num)) { 133 msg = list_entry(pool->msgs.next, struct ceph_msg, 134 list_head); 135 list_del_init(&msg->list_head); 136 pool->num--; 137 dout("msgpool_get %p got %p, now %d/%d\n", pool, msg, 138 pool->num, pool->min); 139 spin_unlock(&pool->lock); 140 return msg; 141 } 142 pr_err("msgpool_get %p now %d/%d, %s\n", pool, pool->num, 143 pool->min, pool->blocking ? "waiting" : "may fail"); 144 spin_unlock(&pool->lock); 145 146 if (!pool->blocking) { 147 WARN_ON(1); 148 149 /* maybe we can allocate it now? */ 150 msg = ceph_msg_new(0, front_len, 0, 0, NULL); 151 if (!IS_ERR(msg)) 152 return msg; 153 154 pr_err("msgpool_get %p empty + alloc failed\n", pool); 155 return ERR_PTR(-ENOMEM); 156 } 157 158 init_wait(&wait); 159 prepare_to_wait(&pool->wait, &wait, TASK_UNINTERRUPTIBLE); 160 schedule(); 161 finish_wait(&pool->wait, &wait); 162 } 163} 164 165void ceph_msgpool_put(struct ceph_msgpool *pool, struct ceph_msg *msg) 166{ 167 spin_lock(&pool->lock); 168 if (pool->num < pool->min) { 169 /* reset msg front_len; user may have changed it */ 170 msg->front.iov_len = pool->front_len; 171 msg->hdr.front_len = cpu_to_le32(pool->front_len); 172 173 kref_set(&msg->kref, 1); /* retake a single ref */ 174 list_add(&msg->list_head, &pool->msgs); 175 pool->num++; 176 dout("msgpool_put %p reclaim %p, now %d/%d\n", pool, msg, 177 pool->num, pool->min); 178 spin_unlock(&pool->lock); 179 wake_up(&pool->wait); 180 } else { 181 dout("msgpool_put %p drop %p, at %d/%d\n", pool, msg, 182 pool->num, pool->min); 183 spin_unlock(&pool->lock); 184 
ceph_msg_kfree(msg); 185 } 186}