Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

libceph: add new iov_iter-based ceph_msg_data_type and ceph_osd_data_type

Add an iov_iter to the unions in ceph_msg_data and ceph_msg_data_cursor.
Instead of requiring a list of pages or bvecs, we can just use an
iov_iter directly, and avoid extra allocations.

We assume that the pages represented by the iter are pinned such that
they shouldn't incur page faults, which is the case for the iov_iters
created by netfs.

While working on this, Al Viro informed me that he was going to change
iov_iter_get_pages to auto-advance the iterator as that pattern is more
or less required for ITER_PIPE anyway. We emulate that here for now by
advancing in the _next op and tracking that amount in the "lastlen"
field.

In the event that _next is called twice without an intervening
_advance, we revert the iov_iter by the remaining lastlen before
calling iov_iter_get_pages.

Cc: Al Viro <viro@zeniv.linux.org.uk>
Cc: David Howells <dhowells@redhat.com>
Signed-off-by: Jeff Layton <jlayton@kernel.org>
Reviewed-by: Xiubo Li <xiubli@redhat.com>
Reviewed-and-tested-by: Luís Henriques <lhenriques@suse.de>
Reviewed-by: Milind Changire <mchangir@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>

authored by

Jeff Layton and committed by
Ilya Dryomov
dee0c5f8 4c793d4c

+116
+8
include/linux/ceph/messenger.h
··· 123 123 CEPH_MSG_DATA_BIO, /* data source/destination is a bio list */ 124 124 #endif /* CONFIG_BLOCK */ 125 125 CEPH_MSG_DATA_BVECS, /* data source/destination is a bio_vec array */ 126 + CEPH_MSG_DATA_ITER, /* data source/destination is an iov_iter */ 126 127 }; 127 128 128 129 #ifdef CONFIG_BLOCK ··· 225 224 bool own_pages; 226 225 }; 227 226 struct ceph_pagelist *pagelist; 227 + struct iov_iter iter; 228 228 }; 229 229 }; 230 230 ··· 249 247 struct { /* pagelist */ 250 248 struct page *page; /* page from list */ 251 249 size_t offset; /* bytes from list */ 250 + }; 251 + struct { 252 + struct iov_iter iov_iter; 253 + unsigned int lastlen; 252 254 }; 253 255 }; 254 256 }; ··· 611 605 #endif /* CONFIG_BLOCK */ 612 606 void ceph_msg_data_add_bvecs(struct ceph_msg *msg, 613 607 struct ceph_bvec_iter *bvec_pos); 608 + void ceph_msg_data_add_iter(struct ceph_msg *msg, 609 + struct iov_iter *iter); 614 610 615 611 struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items, 616 612 gfp_t flags, bool can_fail);
+4
include/linux/ceph/osd_client.h
··· 108 108 CEPH_OSD_DATA_TYPE_BIO, 109 109 #endif /* CONFIG_BLOCK */ 110 110 CEPH_OSD_DATA_TYPE_BVECS, 111 + CEPH_OSD_DATA_TYPE_ITER, 111 112 }; 112 113 113 114 struct ceph_osd_data { ··· 132 131 struct ceph_bvec_iter bvec_pos; 133 132 u32 num_bvecs; 134 133 }; 134 + struct iov_iter iter; 135 135 }; 136 136 }; 137 137 ··· 503 501 void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req, 504 502 unsigned int which, 505 503 struct ceph_bvec_iter *bvec_pos); 504 + void osd_req_op_extent_osd_iter(struct ceph_osd_request *osd_req, 505 + unsigned int which, struct iov_iter *iter); 506 506 507 507 extern void osd_req_op_cls_request_data_pagelist(struct ceph_osd_request *, 508 508 unsigned int which,
+77
net/ceph/messenger.c
··· 969 969 return true; 970 970 } 971 971 972 + static void ceph_msg_data_iter_cursor_init(struct ceph_msg_data_cursor *cursor, 973 + size_t length) 974 + { 975 + struct ceph_msg_data *data = cursor->data; 976 + 977 + cursor->iov_iter = data->iter; 978 + cursor->lastlen = 0; 979 + iov_iter_truncate(&cursor->iov_iter, length); 980 + cursor->resid = iov_iter_count(&cursor->iov_iter); 981 + } 982 + 983 + static struct page *ceph_msg_data_iter_next(struct ceph_msg_data_cursor *cursor, 984 + size_t *page_offset, size_t *length) 985 + { 986 + struct page *page; 987 + ssize_t len; 988 + 989 + if (cursor->lastlen) 990 + iov_iter_revert(&cursor->iov_iter, cursor->lastlen); 991 + 992 + len = iov_iter_get_pages2(&cursor->iov_iter, &page, PAGE_SIZE, 993 + 1, page_offset); 994 + BUG_ON(len < 0); 995 + 996 + cursor->lastlen = len; 997 + 998 + /* 999 + * FIXME: The assumption is that the pages represented by the iov_iter 1000 + * are pinned, with the references held by the upper-level 1001 + * callers, or by virtue of being under writeback. Eventually, 1002 + * we'll get an iov_iter_get_pages2 variant that doesn't take 1003 + * page refs. Until then, just put the page ref. 1004 + */ 1005 + VM_BUG_ON_PAGE(!PageWriteback(page) && page_count(page) < 2, page); 1006 + put_page(page); 1007 + 1008 + *length = min_t(size_t, len, cursor->resid); 1009 + return page; 1010 + } 1011 + 1012 + static bool ceph_msg_data_iter_advance(struct ceph_msg_data_cursor *cursor, 1013 + size_t bytes) 1014 + { 1015 + BUG_ON(bytes > cursor->resid); 1016 + cursor->resid -= bytes; 1017 + 1018 + if (bytes < cursor->lastlen) { 1019 + cursor->lastlen -= bytes; 1020 + } else { 1021 + iov_iter_advance(&cursor->iov_iter, bytes - cursor->lastlen); 1022 + cursor->lastlen = 0; 1023 + } 1024 + 1025 + return cursor->resid; 1026 + } 1027 + 972 1028 /* 973 1029 * Message data is handled (sent or received) in pieces, where each 974 1030 * piece resides on a single page. 
The network layer might not consider them as a single blob. ··· 1051 995 #endif /* CONFIG_BLOCK */ 1052 996 case CEPH_MSG_DATA_BVECS: 1053 997 ceph_msg_data_bvecs_cursor_init(cursor, length); 998 + break; 999 + case CEPH_MSG_DATA_ITER: 1000 + ceph_msg_data_iter_cursor_init(cursor, length); 1054 1001 break; 1055 1002 case CEPH_MSG_DATA_NONE: 1056 1003 default: ··· 1102 1043 case CEPH_MSG_DATA_BVECS: 1103 1044 page = ceph_msg_data_bvecs_next(cursor, page_offset, length); 1104 1045 break; 1046 + case CEPH_MSG_DATA_ITER: 1047 + page = ceph_msg_data_iter_next(cursor, page_offset, length); 1048 + break; 1105 1049 case CEPH_MSG_DATA_NONE: 1106 1050 default: 1107 1051 page = NULL; ··· 1142 1080 #endif /* CONFIG_BLOCK */ 1143 1081 case CEPH_MSG_DATA_BVECS: 1144 1082 new_piece = ceph_msg_data_bvecs_advance(cursor, bytes); 1083 + break; 1084 + case CEPH_MSG_DATA_ITER: 1085 + new_piece = ceph_msg_data_iter_advance(cursor, bytes); 1145 1086 break; 1146 1087 case CEPH_MSG_DATA_NONE: 1147 1088 default: ··· 1944 1879 msg->data_length += bvec_pos->iter.bi_size; 1945 1880 } 1946 1881 EXPORT_SYMBOL(ceph_msg_data_add_bvecs); 1882 + 1883 + void ceph_msg_data_add_iter(struct ceph_msg *msg, 1884 + struct iov_iter *iter) 1885 + { 1886 + struct ceph_msg_data *data; 1887 + 1888 + data = ceph_msg_data_add(msg); 1889 + data->type = CEPH_MSG_DATA_ITER; 1890 + data->iter = *iter; 1891 + 1892 + msg->data_length += iov_iter_count(&data->iter); 1893 + } 1947 1894 1948 1895 /* 1949 1896 * construct a new message with given type, size
+27
net/ceph/osd_client.c
··· 171 171 osd_data->num_bvecs = num_bvecs; 172 172 } 173 173 174 + static void ceph_osd_iter_init(struct ceph_osd_data *osd_data, 175 + struct iov_iter *iter) 176 + { 177 + osd_data->type = CEPH_OSD_DATA_TYPE_ITER; 178 + osd_data->iter = *iter; 179 + } 180 + 174 181 static struct ceph_osd_data * 175 182 osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which) 176 183 { ··· 271 264 } 272 265 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos); 273 266 267 + /** 268 + * osd_req_op_extent_osd_iter - Set up an operation with an iterator buffer 269 + * @osd_req: The request to set up 270 + * @which: Index of the operation in which to set the iter 271 + * @iter: The buffer iterator 272 + */ 273 + void osd_req_op_extent_osd_iter(struct ceph_osd_request *osd_req, 274 + unsigned int which, struct iov_iter *iter) 275 + { 276 + struct ceph_osd_data *osd_data; 277 + 278 + osd_data = osd_req_op_data(osd_req, which, extent, osd_data); 279 + ceph_osd_iter_init(osd_data, iter); 280 + } 281 + EXPORT_SYMBOL(osd_req_op_extent_osd_iter); 282 + 274 283 static void osd_req_op_cls_request_info_pagelist( 275 284 struct ceph_osd_request *osd_req, 276 285 unsigned int which, struct ceph_pagelist *pagelist) ··· 369 346 #endif /* CONFIG_BLOCK */ 370 347 case CEPH_OSD_DATA_TYPE_BVECS: 371 348 return osd_data->bvec_pos.iter.bi_size; 349 + case CEPH_OSD_DATA_TYPE_ITER: 350 + return iov_iter_count(&osd_data->iter); 372 351 default: 373 352 WARN(true, "unrecognized data type %d\n", (int)osd_data->type); 374 353 return 0; ··· 979 954 #endif 980 955 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) { 981 956 ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos); 957 + } else if (osd_data->type == CEPH_OSD_DATA_TYPE_ITER) { 958 + ceph_msg_data_add_iter(msg, &osd_data->iter); 982 959 } else { 983 960 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE); 984 961 }