Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git

drbd: Remove the open-coded page pool

If the network stack keeps a page reference for too long (for example
during a zero-copy send), DRBD in turn ends up holding references on a
growing number of pages.

Fix all that by no longer relying on page reference counts dropping
back to an expected value. Instead, DRBD gives up its own reference and
lets the system handle everything else. While at it, remove the
open-coded custom page pool mechanism and use the mempool-backed page
pool that the kernel already provides.

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Christoph Böhmwalder <christoph.boehmwalder@linbit.com>
Tested-by: Eric Hagberg <ehagberg@janestreet.com>
Link: https://lore.kernel.org/r/20250605103852.23029-1-christoph.boehmwalder@linbit.com
Signed-off-by: Jens Axboe <axboe@kernel.dk>
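
The core of the fix is visible in drbd_free_pages() in the drbd_receiver.c
diff below: DRBD drops its reference unconditionally, returning a page to the
new drbd_buffer_page_pool only when the reference count shows no other holder,
and otherwise leaving the final put_page() to whoever still uses it. A minimal
userspace sketch of that decision, with toy types standing in for struct page
and the mempool (illustrative code, not the kernel implementation):

#include <stdio.h>

struct page {
	int refcount;          /* stands in for page_count() */
	struct page *next;     /* stands in for the page->private chain */
};

static struct page *pool;      /* stands in for drbd_buffer_page_pool */

static void pool_free(struct page *page)
{
	page->next = pool;     /* recycle: push back onto the pool */
	pool = page;
}

/* Models the new drbd_free_pages(): give up our reference and let
 * whoever still holds the page deal with the rest. */
static void free_page_chain(struct page *page)
{
	while (page) {
		struct page *next = page->next;

		page->next = NULL;
		if (page->refcount == 1)
			pool_free(page);     /* we were the only holder */
		else
			page->refcount--;    /* still in flight: just drop ours */
		page = next;
	}
}

int main(void)
{
	struct page busy = { .refcount = 2, .next = NULL };  /* net stack still sending */
	struct page idle = { .refcount = 1, .next = &busy }; /* only our reference */

	free_page_chain(&idle);
	printf("idle recycled to pool: %d, busy refcount now: %d\n",
	       pool == &idle, busy.refcount);
	return 0;
}

The old code instead parked in-flight requests on the now-removed net_ee list
and polled page_count() via drbd_peer_req_has_active_page() until the network
stack was done; that machinery is exactly what the patch deletes.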

Authored by Philipp Reisner, committed by Jens Axboe (d5dd4098, 8f5ae30d).

+70 -346
+6 -33
drivers/block/drbd/drbd_int.h
···
 	/* this is/was a write request */
 	__EE_WRITE,

+	/* hand back using mempool_free(e, drbd_buffer_page_pool) */
+	__EE_RELEASE_TO_MEMPOOL,
+
 	/* this is/was a write same request */
 	__EE_WRITE_SAME,
···
 #define EE_IN_INTERVAL_TREE	(1<<__EE_IN_INTERVAL_TREE)
 #define EE_SUBMITTED		(1<<__EE_SUBMITTED)
 #define EE_WRITE		(1<<__EE_WRITE)
+#define EE_RELEASE_TO_MEMPOOL	(1<<__EE_RELEASE_TO_MEMPOOL)
 #define EE_WRITE_SAME		(1<<__EE_WRITE_SAME)
 #define EE_APPLICATION		(1<<__EE_APPLICATION)
 #define EE_RS_THIN_REQ		(1<<__EE_RS_THIN_REQ)
···
 	struct list_head sync_ee;   /* IO in progress (P_RS_DATA_REPLY gets written to disk) */
 	struct list_head done_ee;   /* need to send P_WRITE_ACK */
 	struct list_head read_ee;   /* [RS]P_DATA_REQUEST being read */
-	struct list_head net_ee;    /* zero-copy network send in progress */

 	struct list_head resync_reads;
 	atomic_t pp_in_use;         /* allocated from page pool */
···
 extern mempool_t drbd_request_mempool;
 extern mempool_t drbd_ee_mempool;

-/* drbd's page pool, used to buffer data received from the peer,
- * or data requested by the peer.
- *
- * This does not have an emergency reserve.
- *
- * When allocating from this pool, it first takes pages from the pool.
- * Only if the pool is depleted will try to allocate from the system.
- *
- * The assumption is that pages taken from this pool will be processed,
- * and given back, "quickly", and then can be recycled, so we can avoid
- * frequent calls to alloc_page(), and still will be able to make progress even
- * under memory pressure.
- */
-extern struct page *drbd_pp_pool;
-extern spinlock_t drbd_pp_lock;
-extern int drbd_pp_vacant;
-extern wait_queue_head_t drbd_pp_wait;
-
 /* We also need a standard (emergency-reserve backed) page pool
  * for meta data IO (activity log, bitmap).
  * We can keep it global, as long as it is used as "N pages at a time".
  */
 #define DRBD_MIN_POOL_PAGES	128
 extern mempool_t drbd_md_io_page_pool;
+extern mempool_t drbd_buffer_page_pool;

 /* We also need to make sure we get a bio
  * when we need it for housekeeping purposes */
···
 			sector_t, unsigned int,
 			unsigned int,
 			gfp_t) __must_hold(local);
-extern void __drbd_free_peer_req(struct drbd_device *, struct drbd_peer_request *,
-				 int);
-#define drbd_free_peer_req(m,e) __drbd_free_peer_req(m, e, 0)
-#define drbd_free_net_peer_req(m,e) __drbd_free_peer_req(m, e, 1)
+extern void drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *req);
 extern struct page *drbd_alloc_pages(struct drbd_peer_device *, unsigned int, bool);
 extern void _drbd_clear_done_ee(struct drbd_device *device, struct list_head *to_be_freed);
 extern int drbd_connected(struct drbd_peer_device *);
···
 #define page_chain_for_each_safe(page, n) \
 	for (; page && ({ n = page_chain_next(page); 1; }); page = n)

-
-static inline int drbd_peer_req_has_active_page(struct drbd_peer_request *peer_req)
-{
-	struct page *page = peer_req->pages;
-	page_chain_for_each(page) {
-		if (page_count(page) > 1)
-			return 1;
-	}
-	return 0;
-}

 static inline union drbd_state drbd_read_state(struct drbd_device *device)
 {
+15 -44
drivers/block/drbd/drbd_main.c
···
 mempool_t drbd_request_mempool;
 mempool_t drbd_ee_mempool;
 mempool_t drbd_md_io_page_pool;
+mempool_t drbd_buffer_page_pool;
 struct bio_set drbd_md_io_bio_set;
 struct bio_set drbd_io_bio_set;
-
-/* I do not use a standard mempool, because:
-   1) I want to hand out the pre-allocated objects first.
-   2) I want to be able to interrupt sleeping allocation with a signal.
-   Note: This is a single linked list, the next pointer is the private
-	 member of struct page.
- */
-struct page *drbd_pp_pool;
-DEFINE_SPINLOCK(drbd_pp_lock);
-int drbd_pp_vacant;
-wait_queue_head_t drbd_pp_wait;

 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
···
 static int _drbd_send_zc_ee(struct drbd_peer_device *peer_device,
 			    struct drbd_peer_request *peer_req)
 {
+	bool use_sendpage = !(peer_req->flags & EE_RELEASE_TO_MEMPOOL);
 	struct page *page = peer_req->pages;
 	unsigned len = peer_req->i.size;
 	int err;
···
 	page_chain_for_each(page) {
 		unsigned l = min_t(unsigned, len, PAGE_SIZE);

-		err = _drbd_send_page(peer_device, page, 0, l,
-				      page_chain_next(page) ? MSG_MORE : 0);
+		if (likely(use_sendpage))
+			err = _drbd_send_page(peer_device, page, 0, l,
+					      page_chain_next(page) ? MSG_MORE : 0);
+		else
+			err = _drbd_no_send_page(peer_device, page, 0, l,
+						 page_chain_next(page) ? MSG_MORE : 0);
+
 		if (err)
 			return err;
 		len -= l;
···
 	INIT_LIST_HEAD(&device->sync_ee);
 	INIT_LIST_HEAD(&device->done_ee);
 	INIT_LIST_HEAD(&device->read_ee);
-	INIT_LIST_HEAD(&device->net_ee);
 	INIT_LIST_HEAD(&device->resync_reads);
 	INIT_LIST_HEAD(&device->resync_work.list);
 	INIT_LIST_HEAD(&device->unplug_work.list);
···
 	D_ASSERT(device, list_empty(&device->sync_ee));
 	D_ASSERT(device, list_empty(&device->done_ee));
 	D_ASSERT(device, list_empty(&device->read_ee));
-	D_ASSERT(device, list_empty(&device->net_ee));
 	D_ASSERT(device, list_empty(&device->resync_reads));
 	D_ASSERT(device, list_empty(&first_peer_device(device)->connection->sender_work.q));
 	D_ASSERT(device, list_empty(&device->resync_work.list));
···
 static void drbd_destroy_mempools(void)
 {
-	struct page *page;
-
-	while (drbd_pp_pool) {
-		page = drbd_pp_pool;
-		drbd_pp_pool = (struct page *)page_private(page);
-		__free_page(page);
-		drbd_pp_vacant--;
-	}
-
 	/* D_ASSERT(device, atomic_read(&drbd_pp_vacant)==0); */

 	bioset_exit(&drbd_io_bio_set);
 	bioset_exit(&drbd_md_io_bio_set);
+	mempool_exit(&drbd_buffer_page_pool);
 	mempool_exit(&drbd_md_io_page_pool);
 	mempool_exit(&drbd_ee_mempool);
 	mempool_exit(&drbd_request_mempool);
···
 static int drbd_create_mempools(void)
 {
-	struct page *page;
 	const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * drbd_minor_count;
-	int i, ret;
+	int ret;

 	/* caches */
 	drbd_request_cache = kmem_cache_create(
···
 	if (ret)
 		goto Enomem;

+	ret = mempool_init_page_pool(&drbd_buffer_page_pool, number, 0);
+	if (ret)
+		goto Enomem;
+
 	ret = mempool_init_slab_pool(&drbd_request_mempool, number,
 				     drbd_request_cache);
 	if (ret)
···
 	ret = mempool_init_slab_pool(&drbd_ee_mempool, number, drbd_ee_cache);
 	if (ret)
 		goto Enomem;
-
-	for (i = 0; i < number; i++) {
-		page = alloc_page(GFP_HIGHUSER);
-		if (!page)
-			goto Enomem;
-		set_page_private(page, (unsigned long)drbd_pp_pool);
-		drbd_pp_pool = page;
-	}
-	drbd_pp_vacant = number;

 	return 0;
···
 	rr = drbd_free_peer_reqs(device, &device->done_ee);
 	if (rr)
 		drbd_err(device, "%d EEs in done list found!\n", rr);
-
-	rr = drbd_free_peer_reqs(device, &device->net_ee);
-	if (rr)
-		drbd_err(device, "%d EEs in net list found!\n", rr);
 }

 /* caution. no locking. */
···
 			DRBD_MAJOR);
 		return err;
 	}
-
-	/*
-	 * allocate all necessary structs
-	 */
-	init_waitqueue_head(&drbd_pp_wait);

 	drbd_proc = NULL; /* play safe for drbd_cleanup */
 	idr_init(&drbd_devices);
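
A note on the _drbd_send_zc_ee() change above: judging by the name and the
surrounding code, _drbd_no_send_page() is the copying (non-zero-copy) send
variant, so pages destined for the mempool are never handed to the network
stack by reference. A toy userspace model of that dispatch (all names here
are illustrative stand-ins, only the flag test mirrors the diff):

#include <stdbool.h>
#include <stdio.h>

#define EE_RELEASE_TO_MEMPOOL (1 << 0)

static int send_zero_copy(const char *buf)  { printf("zero-copy: %s\n", buf); return 0; }
static int send_by_copying(const char *buf) { printf("copied:    %s\n", buf); return 0; }

static int send_peer_request(unsigned flags, const char *buf)
{
	/* mirrors: bool use_sendpage = !(peer_req->flags & EE_RELEASE_TO_MEMPOOL); */
	bool use_sendpage = !(flags & EE_RELEASE_TO_MEMPOOL);

	return use_sendpage ? send_zero_copy(buf) : send_by_copying(buf);
}

int main(void)
{
	send_peer_request(0, "system page");                     /* may be pinned by the net stack */
	send_peer_request(EE_RELEASE_TO_MEMPOOL, "pool page");   /* copied, returns to pool promptly */
	return 0;
}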
+31 -231
drivers/block/drbd/drbd_receiver.c
···
 #include <linux/string.h>
 #include <linux/scatterlist.h>
 #include <linux/part_stat.h>
+#include <linux/mempool.h>
 #include "drbd_int.h"
 #include "drbd_protocol.h"
 #include "drbd_req.h"
···

 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

-/*
- * some helper functions to deal with single linked page lists,
- * page->private being our "next" pointer.
- */
-
-/* If at least n pages are linked at head, get n pages off.
- * Otherwise, don't modify head, and return NULL.
- * Locking is the responsibility of the caller.
- */
-static struct page *page_chain_del(struct page **head, int n)
-{
-	struct page *page;
-	struct page *tmp;
-
-	BUG_ON(!n);
-	BUG_ON(!head);
-
-	page = *head;
-
-	if (!page)
-		return NULL;
-
-	while (page) {
-		tmp = page_chain_next(page);
-		if (--n == 0)
-			break; /* found sufficient pages */
-		if (tmp == NULL)
-			/* insufficient pages, don't use any of them. */
-			return NULL;
-		page = tmp;
-	}
-
-	/* add end of list marker for the returned list */
-	set_page_private(page, 0);
-	/* actual return value, and adjustment of head */
-	page = *head;
-	*head = tmp;
-	return page;
-}
-
-/* may be used outside of locks to find the tail of a (usually short)
- * "private" page chain, before adding it back to a global chain head
- * with page_chain_add() under a spinlock. */
-static struct page *page_chain_tail(struct page *page, int *len)
-{
-	struct page *tmp;
-	int i = 1;
-	while ((tmp = page_chain_next(page))) {
-		++i;
-		page = tmp;
-	}
-	if (len)
-		*len = i;
-	return page;
-}
-
-static int page_chain_free(struct page *page)
-{
-	struct page *tmp;
-	int i = 0;
-	page_chain_for_each_safe(page, tmp) {
-		put_page(page);
-		++i;
-	}
-	return i;
-}
-
-static void page_chain_add(struct page **head,
-		struct page *chain_first, struct page *chain_last)
-{
-#if 1
-	struct page *tmp;
-	tmp = page_chain_tail(chain_first, NULL);
-	BUG_ON(tmp != chain_last);
-#endif
-
-	/* add chain to head */
-	set_page_private(chain_last, (unsigned long)*head);
-	*head = chain_first;
-}
-
-static struct page *__drbd_alloc_pages(struct drbd_device *device,
-				       unsigned int number)
+static struct page *__drbd_alloc_pages(unsigned int number)
 {
 	struct page *page = NULL;
 	struct page *tmp = NULL;
 	unsigned int i = 0;

-	/* Yes, testing drbd_pp_vacant outside the lock is racy.
-	 * So what. It saves a spin_lock. */
-	if (drbd_pp_vacant >= number) {
-		spin_lock(&drbd_pp_lock);
-		page = page_chain_del(&drbd_pp_pool, number);
-		if (page)
-			drbd_pp_vacant -= number;
-		spin_unlock(&drbd_pp_lock);
-		if (page)
-			return page;
-	}
-
 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
 	 * which in turn might block on the other node at this very place.
 	 */
 	for (i = 0; i < number; i++) {
-		tmp = alloc_page(GFP_TRY);
+		tmp = mempool_alloc(&drbd_buffer_page_pool, GFP_TRY);
 		if (!tmp)
-			break;
+			goto fail;
 		set_page_private(tmp, (unsigned long)page);
 		page = tmp;
 	}
-
-	if (i == number)
-		return page;
-
-	/* Not enough pages immediately available this time.
-	 * No need to jump around here, drbd_alloc_pages will retry this
-	 * function "soon". */
-	if (page) {
-		tmp = page_chain_tail(page, NULL);
-		spin_lock(&drbd_pp_lock);
-		page_chain_add(&drbd_pp_pool, page, tmp);
-		drbd_pp_vacant += i;
-		spin_unlock(&drbd_pp_lock);
-	}
-	return NULL;
-}
-
-static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
-					   struct list_head *to_be_freed)
-{
-	struct drbd_peer_request *peer_req, *tmp;
-
-	/* The EEs are always appended to the end of the list. Since
-	   they are sent in order over the wire, they have to finish
-	   in order. As soon as we see the first not finished we can
-	   stop to examine the list... */
-
-	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
-		if (drbd_peer_req_has_active_page(peer_req))
-			break;
-		list_move(&peer_req->w.list, to_be_freed);
-	}
-}
-
-static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
-{
-	LIST_HEAD(reclaimed);
-	struct drbd_peer_request *peer_req, *t;
-
-	spin_lock_irq(&device->resource->req_lock);
-	reclaim_finished_net_peer_reqs(device, &reclaimed);
-	spin_unlock_irq(&device->resource->req_lock);
-	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
-		drbd_free_net_peer_req(device, peer_req);
-}
-
-static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
-{
-	struct drbd_peer_device *peer_device;
-	int vnr;
-
-	rcu_read_lock();
-	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
-		struct drbd_device *device = peer_device->device;
-		if (!atomic_read(&device->pp_in_use_by_net))
-			continue;
-
-		kref_get(&device->kref);
-		rcu_read_unlock();
-		drbd_reclaim_net_peer_reqs(device);
-		kref_put(&device->kref, drbd_destroy_device);
-		rcu_read_lock();
-	}
-	rcu_read_unlock();
+	return page;
+fail:
+	page_chain_for_each_safe(page, tmp) {
+		set_page_private(page, 0);
+		mempool_free(page, &drbd_buffer_page_pool);
+	}
+	return NULL;
 }

 /**
···
 		       bool retry)
 {
 	struct drbd_device *device = peer_device->device;
-	struct page *page = NULL;
+	struct page *page;
 	struct net_conf *nc;
-	DEFINE_WAIT(wait);
 	unsigned int mxb;

 	rcu_read_lock();
···
 	mxb = nc ? nc->max_buffers : 1000000;
 	rcu_read_unlock();

-	if (atomic_read(&device->pp_in_use) < mxb)
-		page = __drbd_alloc_pages(device, number);
-
-	/* Try to keep the fast path fast, but occasionally we need
-	 * to reclaim the pages we lended to the network stack. */
-	if (page && atomic_read(&device->pp_in_use_by_net) > 512)
-		drbd_reclaim_net_peer_reqs(device);
-
-	while (page == NULL) {
-		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
-
-		drbd_reclaim_net_peer_reqs(device);
-
-		if (atomic_read(&device->pp_in_use) < mxb) {
-			page = __drbd_alloc_pages(device, number);
-			if (page)
-				break;
-		}
-
-		if (!retry)
-			break;
-
-		if (signal_pending(current)) {
-			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
-			break;
-		}
-
-		if (schedule_timeout(HZ/10) == 0)
-			mxb = UINT_MAX;
-	}
-	finish_wait(&drbd_pp_wait, &wait);
+	if (atomic_read(&device->pp_in_use) >= mxb)
+		schedule_timeout_interruptible(HZ / 10);
+	page = __drbd_alloc_pages(number);

 	if (page)
 		atomic_add(number, &device->pp_in_use);
···
  * Is also used from inside an other spin_lock_irq(&resource->req_lock);
  * Either links the page chain back to the global pool,
  * or returns all pages to the system. */
-static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
+static void drbd_free_pages(struct drbd_device *device, struct page *page)
 {
-	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
-	int i;
+	struct page *tmp;
+	int i = 0;

 	if (page == NULL)
 		return;

-	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * drbd_minor_count)
-		i = page_chain_free(page);
-	else {
-		struct page *tmp;
-		tmp = page_chain_tail(page, &i);
-		spin_lock(&drbd_pp_lock);
-		page_chain_add(&drbd_pp_pool, page, tmp);
-		drbd_pp_vacant += i;
-		spin_unlock(&drbd_pp_lock);
+	page_chain_for_each_safe(page, tmp) {
+		set_page_private(page, 0);
+		if (page_count(page) == 1)
+			mempool_free(page, &drbd_buffer_page_pool);
+		else
+			put_page(page);
+		i++;
 	}
-	i = atomic_sub_return(i, a);
+	i = atomic_sub_return(i, &device->pp_in_use);
 	if (i < 0)
-		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
-			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
-	wake_up(&drbd_pp_wait);
+		drbd_warn(device, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
 }

 /*
···
 					       gfpflags_allow_blocking(gfp_mask));
 		if (!page)
 			goto fail;
+		if (!mempool_is_saturated(&drbd_buffer_page_pool))
+			peer_req->flags |= EE_RELEASE_TO_MEMPOOL;
 	}

 	memset(peer_req, 0, sizeof(*peer_req));
···
 	return NULL;
 }

-void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
-		       int is_net)
+void drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req)
 {
 	might_sleep();
 	if (peer_req->flags & EE_HAS_DIGEST)
 		kfree(peer_req->digest);
-	drbd_free_pages(device, peer_req->pages, is_net);
+	drbd_free_pages(device, peer_req->pages);
 	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
 	if (!expect(device, !(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
···
 	LIST_HEAD(work_list);
 	struct drbd_peer_request *peer_req, *t;
 	int count = 0;
-	int is_net = list == &device->net_ee;

 	spin_lock_irq(&device->resource->req_lock);
 	list_splice_init(list, &work_list);
 	spin_unlock_irq(&device->resource->req_lock);

 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
-		__drbd_free_peer_req(device, peer_req, is_net);
+		drbd_free_peer_req(device, peer_req);
 		count++;
 	}
 	return count;
···
 static int drbd_finish_peer_reqs(struct drbd_device *device)
 {
 	LIST_HEAD(work_list);
-	LIST_HEAD(reclaimed);
 	struct drbd_peer_request *peer_req, *t;
 	int err = 0;

 	spin_lock_irq(&device->resource->req_lock);
-	reclaim_finished_net_peer_reqs(device, &reclaimed);
 	list_splice_init(&device->done_ee, &work_list);
 	spin_unlock_irq(&device->resource->req_lock);
-
-	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
-		drbd_free_net_peer_req(device, peer_req);

 	/* possible callbacks here:
 	 * e_end_block, and e_end_resync_block, e_send_superseded.
···
 		data_size -= len;
 	}
 	kunmap(page);
-	drbd_free_pages(peer_device->device, page, 0);
+	drbd_free_pages(peer_device->device, page);
 	return err;
 }
···
 		put_ldev(device);
 	}

-	/* tcp_close and release of sendpage pages can be deferred.  I don't
-	 * want to use SO_LINGER, because apparently it can be deferred for
-	 * more than 20 seconds (longest time I checked).
-	 *
-	 * Actually we don't care for exactly when the network stack does its
-	 * put_page(), but release our reference on these pages right here.
-	 */
-	i = drbd_free_peer_reqs(device, &device->net_ee);
-	if (i)
-		drbd_info(device, "net_ee not empty, killed %u entries\n", i);
 	i = atomic_read(&device->pp_in_use_by_net);
 	if (i)
 		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
···
 	while (get_t_state(thi) == RUNNING) {
 		drbd_thread_current_set_cpu(thi);
-
-		conn_reclaim_net_peer_reqs(connection);

 		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
 			if (drbd_send_ping(connection)) {
+18 -38
drivers/block/drbd/drbd_worker.c
···
 		return 1;
 }

-/* helper */
-static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
-{
-	if (drbd_peer_req_has_active_page(peer_req)) {
-		/* This might happen if sendpage() has not finished */
-		int i = PFN_UP(peer_req->i.size);
-		atomic_add(i, &device->pp_in_use_by_net);
-		atomic_sub(i, &device->pp_in_use);
-		spin_lock_irq(&device->resource->req_lock);
-		list_add_tail(&peer_req->w.list, &device->net_ee);
-		spin_unlock_irq(&device->resource->req_lock);
-		wake_up(&drbd_pp_wait);
-	} else
-		drbd_free_peer_req(device, peer_req);
-}
-
 /**
  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
  * @w:		work object.
···
 	int err;

 	if (unlikely(cancel)) {
-		drbd_free_peer_req(device, peer_req);
-		dec_unacked(device);
-		return 0;
+		err = 0;
+		goto out;
 	}

 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
···
 		err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
 	}

-	dec_unacked(device);
-
-	move_to_net_ee_or_free(device, peer_req);
-
 	if (unlikely(err))
 		drbd_err(device, "drbd_send_block() failed\n");
+out:
+	dec_unacked(device);
+	drbd_free_peer_req(device, peer_req);
+
 	return err;
 }
···
 	int err;

 	if (unlikely(cancel)) {
-		drbd_free_peer_req(device, peer_req);
-		dec_unacked(device);
-		return 0;
+		err = 0;
+		goto out;
 	}

 	if (get_ldev_if_state(device, D_FAILED)) {
···
 		/* update resync data with failure */
 		drbd_rs_failed_io(peer_device, peer_req->i.sector, peer_req->i.size);
 	}
-
-	dec_unacked(device);
-
-	move_to_net_ee_or_free(device, peer_req);
-
 	if (unlikely(err))
 		drbd_err(device, "drbd_send_block() failed\n");
+out:
+	dec_unacked(device);
+	drbd_free_peer_req(device, peer_req);
+
 	return err;
 }
···
 	int err, eq = 0;

 	if (unlikely(cancel)) {
-		drbd_free_peer_req(device, peer_req);
-		dec_unacked(device);
-		return 0;
+		err = 0;
+		goto out;
 	}

 	if (get_ldev(device)) {
···
 		if (drbd_ratelimit())
 			drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
 	}
-
-	dec_unacked(device);
-	move_to_net_ee_or_free(device, peer_req);
-
 	if (unlikely(err))
 		drbd_err(device, "drbd_send_block/ack() failed\n");
+out:
+	dec_unacked(device);
+	drbd_free_peer_req(device, peer_req);
+
 	return err;
 }