Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull Ceph update from Sage Weil:
"There are a few different groups of commits here. The largest is
Alex's ongoing work to enable the coming RBD features (cloning,
striping). There is some cleanup in libceph that goes along with it.

Cyril and David have fixed some problems with NFS reexport (leaking
dentries and page locks), and there is a batch of patches from Yan
fixing problems with the fs client when running against a clustered
MDS. There are a few bug fixes mixed in for good measure, many of
which will be going to the stable trees once they're upstream.

My apologies for the late pull. There is still a gremlin in the rbd
map/unmap code and I was hoping to include the fix for that as well,
but we haven't been able to confirm the fix is correct yet; I'll send
that in a separate pull once it's nailed down."

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (68 commits)
rbd: get rid of rbd_{get,put}_dev()
libceph: register request before unregister linger
libceph: don't use rb_init_node() in ceph_osdc_alloc_request()
libceph: init event->node in ceph_osdc_create_event()
libceph: init osd->o_node in create_osd()
libceph: report connection fault with warning
libceph: socket can close in any connection state
rbd: don't use ENOTSUPP
rbd: remove linger unconditionally
rbd: get rid of RBD_MAX_SEG_NAME_LEN
libceph: avoid using freed osd in __kick_osd_requests()
ceph: don't reference req after put
rbd: do not allow remove of mounted-on image
libceph: Unlock unprocessed pages in start_read() error path
ceph: call handle_cap_grant() for cap import message
ceph: Fix __ceph_do_pending_vmtruncate
ceph: Don't add dirty inode to dirty list if caps is in migration
ceph: Fix infinite loop in __wake_requests
ceph: Don't update i_max_size when handling non-auth cap
bdi_register: add __printf verification, fix arg mismatch
...

+1199 -613
+4
Documentation/ABI/testing/sysfs-bus-rbd
··· 70 70 71 71 A directory per each snapshot 72 72 73 + parent 74 + 75 + Information identifying the pool, image, and snapshot id for 76 + the parent image in a layered rbd image (format 2 only). 73 77 74 78 Entries under /sys/bus/rbd/devices/<dev-id>/snap_<snap-name> 75 79 -------------------------------------------------------------
+969 -432
drivers/block/rbd.c
··· 61 61 62 62 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */ 63 63 64 - #define RBD_MAX_SNAP_NAME_LEN 32 64 + #define RBD_SNAP_DEV_NAME_PREFIX "snap_" 65 + #define RBD_MAX_SNAP_NAME_LEN \ 66 + (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1)) 67 + 65 68 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */ 66 69 #define RBD_MAX_OPT_LEN 1024 67 70 68 71 #define RBD_SNAP_HEAD_NAME "-" 69 72 73 + /* This allows a single page to hold an image name sent by OSD */ 74 + #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1) 70 75 #define RBD_IMAGE_ID_LEN_MAX 64 76 + 71 77 #define RBD_OBJ_PREFIX_LEN_MAX 64 78 + 79 + /* Feature bits */ 80 + 81 + #define RBD_FEATURE_LAYERING 1 82 + 83 + /* Features supported by this (client software) implementation. */ 84 + 85 + #define RBD_FEATURES_ALL (0) 72 86 73 87 /* 74 88 * An RBD device name will be "rbd#", where the "rbd" comes from ··· 113 99 u64 *snap_sizes; 114 100 115 101 u64 obj_version; 102 + }; 103 + 104 + /* 105 + * An rbd image specification. 106 + * 107 + * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely 108 + * identify an image. 109 + */ 110 + struct rbd_spec { 111 + u64 pool_id; 112 + char *pool_name; 113 + 114 + char *image_id; 115 + size_t image_id_len; 116 + char *image_name; 117 + size_t image_name_len; 118 + 119 + u64 snap_id; 120 + char *snap_name; 121 + 122 + struct kref kref; 116 123 }; 117 124 118 125 struct rbd_options { ··· 190 155 }; 191 156 192 157 struct rbd_mapping { 193 - char *snap_name; 194 - u64 snap_id; 195 158 u64 size; 196 159 u64 features; 197 - bool snap_exists; 198 160 bool read_only; 199 161 }; 200 162 ··· 205 173 struct gendisk *disk; /* blkdev's gendisk and rq */ 206 174 207 175 u32 image_format; /* Either 1 or 2 */ 208 - struct rbd_options rbd_opts; 209 176 struct rbd_client *rbd_client; 210 177 211 178 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */ ··· 212 181 spinlock_t lock; /* queue lock */ 213 182 214 183 struct rbd_image_header header; 215 - char *image_id; 216 - size_t image_id_len; 217 - char *image_name; 218 - size_t image_name_len; 184 + bool exists; 185 + struct rbd_spec *spec; 186 + 219 187 char *header_name; 220 - char *pool_name; 221 - int pool_id; 222 188 223 189 struct ceph_osd_event *watch_event; 224 190 struct ceph_osd_request *watch_request; 191 + 192 + struct rbd_spec *parent_spec; 193 + u64 parent_overlap; 225 194 226 195 /* protects updating the header */ 227 196 struct rw_semaphore header_rwsem; ··· 235 204 236 205 /* sysfs related */ 237 206 struct device dev; 207 + unsigned long open_count; 238 208 }; 239 209 240 210 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */ ··· 250 218 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev); 251 219 252 220 static void rbd_dev_release(struct device *dev); 253 - static void __rbd_remove_snap_dev(struct rbd_snap *snap); 221 + static void rbd_remove_snap_dev(struct rbd_snap *snap); 254 222 255 223 static ssize_t rbd_add(struct bus_type *bus, const char *buf, 256 224 size_t count); ··· 290 258 # define rbd_assert(expr) ((void) 0) 291 259 #endif /* !RBD_DEBUG */ 292 260 293 - static struct device *rbd_get_dev(struct rbd_device *rbd_dev) 294 - { 295 - return get_device(&rbd_dev->dev); 296 - } 297 - 298 - static void rbd_put_dev(struct rbd_device *rbd_dev) 299 - { 300 - put_device(&rbd_dev->dev); 301 - } 302 - 303 - static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver); 261 + static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver); 262 + static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver); 304 263 305 264 static int rbd_open(struct block_device *bdev, fmode_t mode) 306 265 { ··· 300 277 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only) 301 278 return -EROFS; 302 279 303 - rbd_get_dev(rbd_dev); 280 + mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 281 + (void) get_device(&rbd_dev->dev); 304 282 set_device_ro(bdev, rbd_dev->mapping.read_only); 283 + rbd_dev->open_count++; 284 + mutex_unlock(&ctl_mutex); 305 285 306 286 return 0; 307 287 } ··· 313 287 { 314 288 struct rbd_device *rbd_dev = disk->private_data; 315 289 316 - rbd_put_dev(rbd_dev); 290 + mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 291 + rbd_assert(rbd_dev->open_count > 0); 292 + rbd_dev->open_count--; 293 + put_device(&rbd_dev->dev); 294 + mutex_unlock(&ctl_mutex); 317 295 318 296 return 0; 319 297 } ··· 418 388 static match_table_t rbd_opts_tokens = { 419 389 /* int args above */ 420 390 /* string args above */ 421 - {Opt_read_only, "mapping.read_only"}, 391 + {Opt_read_only, "read_only"}, 422 392 {Opt_read_only, "ro"}, /* Alternate spelling */ 423 393 {Opt_read_write, "read_write"}, 424 394 {Opt_read_write, "rw"}, /* Alternate spelling */ ··· 471 441 * Get a ceph client with specific addr and configuration, if one does 472 442 * not exist create it. 473 443 */ 474 - static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr, 475 - size_t mon_addr_len, char *options) 444 + static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts) 476 445 { 477 - struct rbd_options *rbd_opts = &rbd_dev->rbd_opts; 478 - struct ceph_options *ceph_opts; 479 446 struct rbd_client *rbdc; 480 447 481 - rbd_opts->read_only = RBD_READ_ONLY_DEFAULT; 482 - 483 - ceph_opts = ceph_parse_options(options, mon_addr, 484 - mon_addr + mon_addr_len, 485 - parse_rbd_opts_token, rbd_opts); 486 - if (IS_ERR(ceph_opts)) 487 - return PTR_ERR(ceph_opts); 488 - 489 448 rbdc = rbd_client_find(ceph_opts); 490 - if (rbdc) { 491 - /* using an existing client */ 449 + if (rbdc) /* using an existing client */ 492 450 ceph_destroy_options(ceph_opts); 493 - } else { 451 + else 494 452 rbdc = rbd_client_create(ceph_opts); 495 - if (IS_ERR(rbdc)) 496 - return PTR_ERR(rbdc); 497 - } 498 - rbd_dev->rbd_client = rbdc; 499 453 500 - return 0; 454 + return rbdc; 501 455 } 502 456 503 457 /* ··· 506 492 * Drop reference to ceph client node. If it's not referenced anymore, release 507 493 * it. 508 494 */ 509 - static void rbd_put_client(struct rbd_device *rbd_dev) 495 + static void rbd_put_client(struct rbd_client *rbdc) 510 496 { 511 - kref_put(&rbd_dev->rbd_client->kref, rbd_client_release); 512 - rbd_dev->rbd_client = NULL; 497 + if (rbdc) 498 + kref_put(&rbdc->kref, rbd_client_release); 513 499 } 514 500 515 501 /* ··· 536 522 537 523 /* The header has to start with the magic rbd header text */ 538 524 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT))) 525 + return false; 526 + 527 + /* The bio layer requires at least sector-sized I/O */ 528 + 529 + if (ondisk->options.order < SECTOR_SHIFT) 530 + return false; 531 + 532 + /* If we use u64 in a few spots we may be able to loosen this */ 533 + 534 + if (ondisk->options.order > 8 * sizeof (int) - 1) 539 535 return false; 540 536 541 537 /* ··· 659 635 return -ENOMEM; 660 636 } 661 637 638 + static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id) 639 + { 640 + struct rbd_snap *snap; 641 + 642 + if (snap_id == CEPH_NOSNAP) 643 + return RBD_SNAP_HEAD_NAME; 644 + 645 + list_for_each_entry(snap, &rbd_dev->snaps, node) 646 + if (snap_id == snap->id) 647 + return snap->name; 648 + 649 + return NULL; 650 + } 651 + 662 652 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name) 663 653 { 664 654 ··· 680 642 681 643 list_for_each_entry(snap, &rbd_dev->snaps, node) { 682 644 if (!strcmp(snap_name, snap->name)) { 683 - rbd_dev->mapping.snap_id = snap->id; 645 + rbd_dev->spec->snap_id = snap->id; 684 646 rbd_dev->mapping.size = snap->size; 685 647 rbd_dev->mapping.features = snap->features; 686 648 ··· 691 653 return -ENOENT; 692 654 } 693 655 694 - static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name) 656 + static int rbd_dev_set_mapping(struct rbd_device *rbd_dev) 695 657 { 696 658 int ret; 697 659 698 - if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME, 660 + if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME, 699 661 sizeof (RBD_SNAP_HEAD_NAME))) { 700 - rbd_dev->mapping.snap_id = CEPH_NOSNAP; 662 + rbd_dev->spec->snap_id = CEPH_NOSNAP; 701 663 rbd_dev->mapping.size = rbd_dev->header.image_size; 702 664 rbd_dev->mapping.features = rbd_dev->header.features; 703 - rbd_dev->mapping.snap_exists = false; 704 - rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only; 705 665 ret = 0; 706 666 } else { 707 - ret = snap_by_name(rbd_dev, snap_name); 667 + ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name); 708 668 if (ret < 0) 709 669 goto done; 710 - rbd_dev->mapping.snap_exists = true; 711 670 rbd_dev->mapping.read_only = true; 712 671 } 713 - rbd_dev->mapping.snap_name = snap_name; 672 + rbd_dev->exists = true; 714 673 done: 715 674 return ret; 716 675 } ··· 730 695 u64 segment; 731 696 int ret; 732 697 733 - name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO); 698 + name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO); 734 699 if (!name) 735 700 return NULL; 736 701 segment = offset >> rbd_dev->header.obj_order; 737 - ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx", 702 + ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx", 738 703 rbd_dev->header.object_prefix, segment); 739 - if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) { 704 + if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) { 740 705 pr_err("error formatting segment name for #%llu (%d)\n", 741 706 segment, ret); 742 707 kfree(name); ··· 835 800 } 836 801 837 802 /* 838 - * bio_chain_clone - clone a chain of bios up to a certain length. 839 - * might return a bio_pair that will need to be released. 803 + * Clone a portion of a bio, starting at the given byte offset 804 + * and continuing for the number of bytes indicated. 840 805 */ 841 - static struct bio *bio_chain_clone(struct bio **old, struct bio **next, 842 - struct bio_pair **bp, 843 - int len, gfp_t gfpmask) 806 + static struct bio *bio_clone_range(struct bio *bio_src, 807 + unsigned int offset, 808 + unsigned int len, 809 + gfp_t gfpmask) 844 810 { 845 - struct bio *old_chain = *old; 846 - struct bio *new_chain = NULL; 847 - struct bio *tail; 848 - int total = 0; 811 + struct bio_vec *bv; 812 + unsigned int resid; 813 + unsigned short idx; 814 + unsigned int voff; 815 + unsigned short end_idx; 816 + unsigned short vcnt; 817 + struct bio *bio; 849 818 850 - if (*bp) { 851 - bio_pair_release(*bp); 852 - *bp = NULL; 819 + /* Handle the easy case for the caller */ 820 + 821 + if (!offset && len == bio_src->bi_size) 822 + return bio_clone(bio_src, gfpmask); 823 + 824 + if (WARN_ON_ONCE(!len)) 825 + return NULL; 826 + if (WARN_ON_ONCE(len > bio_src->bi_size)) 827 + return NULL; 828 + if (WARN_ON_ONCE(offset > bio_src->bi_size - len)) 829 + return NULL; 830 + 831 + /* Find first affected segment... */ 832 + 833 + resid = offset; 834 + __bio_for_each_segment(bv, bio_src, idx, 0) { 835 + if (resid < bv->bv_len) 836 + break; 837 + resid -= bv->bv_len; 838 + } 839 + voff = resid; 840 + 841 + /* ...and the last affected segment */ 842 + 843 + resid += len; 844 + __bio_for_each_segment(bv, bio_src, end_idx, idx) { 845 + if (resid <= bv->bv_len) 846 + break; 847 + resid -= bv->bv_len; 848 + } 849 + vcnt = end_idx - idx + 1; 850 + 851 + /* Build the clone */ 852 + 853 + bio = bio_alloc(gfpmask, (unsigned int) vcnt); 854 + if (!bio) 855 + return NULL; /* ENOMEM */ 856 + 857 + bio->bi_bdev = bio_src->bi_bdev; 858 + bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT); 859 + bio->bi_rw = bio_src->bi_rw; 860 + bio->bi_flags |= 1 << BIO_CLONED; 861 + 862 + /* 863 + * Copy over our part of the bio_vec, then update the first 864 + * and last (or only) entries. 865 + */ 866 + memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx], 867 + vcnt * sizeof (struct bio_vec)); 868 + bio->bi_io_vec[0].bv_offset += voff; 869 + if (vcnt > 1) { 870 + bio->bi_io_vec[0].bv_len -= voff; 871 + bio->bi_io_vec[vcnt - 1].bv_len = resid; 872 + } else { 873 + bio->bi_io_vec[0].bv_len = len; 853 874 } 854 875 855 - while (old_chain && (total < len)) { 856 - struct bio *tmp; 876 + bio->bi_vcnt = vcnt; 877 + bio->bi_size = len; 878 + bio->bi_idx = 0; 857 879 858 - tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs); 859 - if (!tmp) 860 - goto err_out; 861 - gfpmask &= ~__GFP_WAIT; /* can't wait after the first */ 880 + return bio; 881 + } 862 882 863 - if (total + old_chain->bi_size > len) { 864 - struct bio_pair *bp; 883 + /* 884 + * Clone a portion of a bio chain, starting at the given byte offset 885 + * into the first bio in the source chain and continuing for the 886 + * number of bytes indicated. The result is another bio chain of 887 + * exactly the given length, or a null pointer on error. 888 + * 889 + * The bio_src and offset parameters are both in-out. On entry they 890 + * refer to the first source bio and the offset into that bio where 891 + * the start of data to be cloned is located. 892 + * 893 + * On return, bio_src is updated to refer to the bio in the source 894 + * chain that contains first un-cloned byte, and *offset will 895 + * contain the offset of that byte within that bio. 896 + */ 897 + static struct bio *bio_chain_clone_range(struct bio **bio_src, 898 + unsigned int *offset, 899 + unsigned int len, 900 + gfp_t gfpmask) 901 + { 902 + struct bio *bi = *bio_src; 903 + unsigned int off = *offset; 904 + struct bio *chain = NULL; 905 + struct bio **end; 865 906 866 - /* 867 - * this split can only happen with a single paged bio, 868 - * split_bio will BUG_ON if this is not the case 869 - */ 870 - dout("bio_chain_clone split! total=%d remaining=%d" 871 - "bi_size=%u\n", 872 - total, len - total, old_chain->bi_size); 907 + /* Build up a chain of clone bios up to the limit */ 873 908 874 - /* split the bio. We'll release it either in the next 875 - call, or it will have to be released outside */ 876 - bp = bio_split(old_chain, (len - total) / SECTOR_SIZE); 877 - if (!bp) 878 - goto err_out; 909 + if (!bi || off >= bi->bi_size || !len) 910 + return NULL; /* Nothing to clone */ 879 911 880 - __bio_clone(tmp, &bp->bio1); 912 + end = &chain; 913 + while (len) { 914 + unsigned int bi_size; 915 + struct bio *bio; 881 916 882 - *next = &bp->bio2; 883 - } else { 884 - __bio_clone(tmp, old_chain); 885 - *next = old_chain->bi_next; 917 + if (!bi) 918 + goto out_err; /* EINVAL; ran out of bio's */ 919 + bi_size = min_t(unsigned int, bi->bi_size - off, len); 920 + bio = bio_clone_range(bi, off, bi_size, gfpmask); 921 + if (!bio) 922 + goto out_err; /* ENOMEM */ 923 + 924 + *end = bio; 925 + end = &bio->bi_next; 926 + 927 + off += bi_size; 928 + if (off == bi->bi_size) { 929 + bi = bi->bi_next; 930 + off = 0; 886 931 } 887 - 888 - tmp->bi_bdev = NULL; 889 - tmp->bi_next = NULL; 890 - if (new_chain) 891 - tail->bi_next = tmp; 892 - else 893 - new_chain = tmp; 894 - tail = tmp; 895 - old_chain = old_chain->bi_next; 896 - 897 - total += tmp->bi_size; 932 + len -= bi_size; 898 933 } 934 + *bio_src = bi; 935 + *offset = off; 899 936 900 - rbd_assert(total == len); 937 + return chain; 938 + out_err: 939 + bio_chain_put(chain); 901 940 902 - *old = old_chain; 903 - 904 - return new_chain; 905 - 906 - err_out: 907 - dout("bio_chain_clone with err\n"); 908 - bio_chain_put(new_chain); 909 941 return NULL; 910 942 } 911 943 ··· 1090 988 req_data->coll_index = coll_index; 1091 989 } 1092 990 1093 - dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name, 1094 - (unsigned long long) ofs, (unsigned long long) len); 991 + dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n", 992 + object_name, (unsigned long long) ofs, 993 + (unsigned long long) len, coll, coll_index); 1095 994 1096 995 osdc = &rbd_dev->rbd_client->client->osdc; 1097 996 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops, ··· 1122 1019 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 1123 1020 layout->fl_stripe_count = cpu_to_le32(1); 1124 1021 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 1125 - layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id); 1022 + layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id); 1126 1023 ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno, 1127 1024 req, ops); 1128 1025 rbd_assert(ret == 0); ··· 1257 1154 static int rbd_do_op(struct request *rq, 1258 1155 struct rbd_device *rbd_dev, 1259 1156 struct ceph_snap_context *snapc, 1260 - u64 snapid, 1261 - int opcode, int flags, 1262 1157 u64 ofs, u64 len, 1263 1158 struct bio *bio, 1264 1159 struct rbd_req_coll *coll, ··· 1268 1167 int ret; 1269 1168 struct ceph_osd_req_op *ops; 1270 1169 u32 payload_len; 1170 + int opcode; 1171 + int flags; 1172 + u64 snapid; 1271 1173 1272 1174 seg_name = rbd_segment_name(rbd_dev, ofs); 1273 1175 if (!seg_name) ··· 1278 1174 seg_len = rbd_segment_length(rbd_dev, ofs, len); 1279 1175 seg_ofs = rbd_segment_offset(rbd_dev, ofs); 1280 1176 1281 - payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0); 1177 + if (rq_data_dir(rq) == WRITE) { 1178 + opcode = CEPH_OSD_OP_WRITE; 1179 + flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK; 1180 + snapid = CEPH_NOSNAP; 1181 + payload_len = seg_len; 1182 + } else { 1183 + opcode = CEPH_OSD_OP_READ; 1184 + flags = CEPH_OSD_FLAG_READ; 1185 + snapc = NULL; 1186 + snapid = rbd_dev->spec->snap_id; 1187 + payload_len = 0; 1188 + } 1282 1189 1283 1190 ret = -ENOMEM; 1284 1191 ops = rbd_create_rw_ops(1, opcode, payload_len); ··· 1314 1199 done: 1315 1200 kfree(seg_name); 1316 1201 return ret; 1317 - } 1318 - 1319 - /* 1320 - * Request async osd write 1321 - */ 1322 - static int rbd_req_write(struct request *rq, 1323 - struct rbd_device *rbd_dev, 1324 - struct ceph_snap_context *snapc, 1325 - u64 ofs, u64 len, 1326 - struct bio *bio, 1327 - struct rbd_req_coll *coll, 1328 - int coll_index) 1329 - { 1330 - return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP, 1331 - CEPH_OSD_OP_WRITE, 1332 - CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 1333 - ofs, len, bio, coll, coll_index); 1334 - } 1335 - 1336 - /* 1337 - * Request async osd read 1338 - */ 1339 - static int rbd_req_read(struct request *rq, 1340 - struct rbd_device *rbd_dev, 1341 - u64 snapid, 1342 - u64 ofs, u64 len, 1343 - struct bio *bio, 1344 - struct rbd_req_coll *coll, 1345 - int coll_index) 1346 - { 1347 - return rbd_do_op(rq, rbd_dev, NULL, 1348 - snapid, 1349 - CEPH_OSD_OP_READ, 1350 - CEPH_OSD_FLAG_READ, 1351 - ofs, len, bio, coll, coll_index); 1352 1202 } 1353 1203 1354 1204 /* ··· 1384 1304 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n", 1385 1305 rbd_dev->header_name, (unsigned long long) notify_id, 1386 1306 (unsigned int) opcode); 1387 - rc = rbd_refresh_header(rbd_dev, &hver); 1307 + rc = rbd_dev_refresh(rbd_dev, &hver); 1388 1308 if (rc) 1389 1309 pr_warning(RBD_DRV_NAME "%d got notification but failed to " 1390 1310 " update snaps: %d\n", rbd_dev->major, rc); ··· 1540 1460 { 1541 1461 struct rbd_device *rbd_dev = q->queuedata; 1542 1462 struct request *rq; 1543 - struct bio_pair *bp = NULL; 1544 1463 1545 1464 while ((rq = blk_fetch_request(q))) { 1546 1465 struct bio *bio; 1547 - struct bio *rq_bio, *next_bio = NULL; 1548 1466 bool do_write; 1549 1467 unsigned int size; 1550 - u64 op_size = 0; 1551 1468 u64 ofs; 1552 1469 int num_segs, cur_seg = 0; 1553 1470 struct rbd_req_coll *coll; 1554 1471 struct ceph_snap_context *snapc; 1472 + unsigned int bio_offset; 1555 1473 1556 1474 dout("fetched request\n"); 1557 1475 ··· 1561 1483 1562 1484 /* deduce our operation (read, write) */ 1563 1485 do_write = (rq_data_dir(rq) == WRITE); 1564 - 1565 - size = blk_rq_bytes(rq); 1566 - ofs = blk_rq_pos(rq) * SECTOR_SIZE; 1567 - rq_bio = rq->bio; 1568 1486 if (do_write && rbd_dev->mapping.read_only) { 1569 1487 __blk_end_request_all(rq, -EROFS); 1570 1488 continue; ··· 1570 1496 1571 1497 down_read(&rbd_dev->header_rwsem); 1572 1498 1573 - if (rbd_dev->mapping.snap_id != CEPH_NOSNAP && 1574 - !rbd_dev->mapping.snap_exists) { 1499 + if (!rbd_dev->exists) { 1500 + rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP); 1575 1501 up_read(&rbd_dev->header_rwsem); 1576 1502 dout("request for non-existent snapshot"); 1577 1503 spin_lock_irq(q->queue_lock); ··· 1582 1508 snapc = ceph_get_snap_context(rbd_dev->header.snapc); 1583 1509 1584 1510 up_read(&rbd_dev->header_rwsem); 1511 + 1512 + size = blk_rq_bytes(rq); 1513 + ofs = blk_rq_pos(rq) * SECTOR_SIZE; 1514 + bio = rq->bio; 1585 1515 1586 1516 dout("%s 0x%x bytes at 0x%llx\n", 1587 1517 do_write ? "write" : "read", ··· 1606 1528 continue; 1607 1529 } 1608 1530 1531 + bio_offset = 0; 1609 1532 do { 1610 - /* a bio clone to be passed down to OSD req */ 1533 + u64 limit = rbd_segment_length(rbd_dev, ofs, size); 1534 + unsigned int chain_size; 1535 + struct bio *bio_chain; 1536 + 1537 + BUG_ON(limit > (u64) UINT_MAX); 1538 + chain_size = (unsigned int) limit; 1611 1539 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt); 1612 - op_size = rbd_segment_length(rbd_dev, ofs, size); 1540 + 1613 1541 kref_get(&coll->kref); 1614 - bio = bio_chain_clone(&rq_bio, &next_bio, &bp, 1615 - op_size, GFP_ATOMIC); 1616 - if (!bio) { 1617 - rbd_coll_end_req_index(rq, coll, cur_seg, 1618 - -ENOMEM, op_size); 1619 - goto next_seg; 1620 - } 1621 1542 1543 + /* Pass a cloned bio chain via an osd request */ 1622 1544 1623 - /* init OSD command: write or read */ 1624 - if (do_write) 1625 - rbd_req_write(rq, rbd_dev, 1626 - snapc, 1627 - ofs, 1628 - op_size, bio, 1629 - coll, cur_seg); 1545 + bio_chain = bio_chain_clone_range(&bio, 1546 + &bio_offset, chain_size, 1547 + GFP_ATOMIC); 1548 + if (bio_chain) 1549 + (void) rbd_do_op(rq, rbd_dev, snapc, 1550 + ofs, chain_size, 1551 + bio_chain, coll, cur_seg); 1630 1552 else 1631 - rbd_req_read(rq, rbd_dev, 1632 - rbd_dev->mapping.snap_id, 1633 - ofs, 1634 - op_size, bio, 1635 - coll, cur_seg); 1636 - 1637 - next_seg: 1638 - size -= op_size; 1639 - ofs += op_size; 1553 + rbd_coll_end_req_index(rq, coll, cur_seg, 1554 + -ENOMEM, chain_size); 1555 + size -= chain_size; 1556 + ofs += chain_size; 1640 1557 1641 1558 cur_seg++; 1642 - rq_bio = next_bio; 1643 1559 } while (size > 0); 1644 1560 kref_put(&coll->kref, rbd_coll_release); 1645 1561 1646 - if (bp) 1647 - bio_pair_release(bp); 1648 1562 spin_lock_irq(q->queue_lock); 1649 1563 1650 1564 ceph_put_snap_context(snapc); ··· 1646 1576 /* 1647 1577 * a queue callback. Makes sure that we don't create a bio that spans across 1648 1578 * multiple osd objects. One exception would be with a single page bios, 1649 - * which we handle later at bio_chain_clone 1579 + * which we handle later at bio_chain_clone_range() 1650 1580 */ 1651 1581 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd, 1652 1582 struct bio_vec *bvec) 1653 1583 { 1654 1584 struct rbd_device *rbd_dev = q->queuedata; 1655 - unsigned int chunk_sectors; 1656 - sector_t sector; 1657 - unsigned int bio_sectors; 1658 - int max; 1585 + sector_t sector_offset; 1586 + sector_t sectors_per_obj; 1587 + sector_t obj_sector_offset; 1588 + int ret; 1659 1589 1660 - chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT); 1661 - sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev); 1662 - bio_sectors = bmd->bi_size >> SECTOR_SHIFT; 1590 + /* 1591 + * Find how far into its rbd object the partition-relative 1592 + * bio start sector is to offset relative to the enclosing 1593 + * device. 1594 + */ 1595 + sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector; 1596 + sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT); 1597 + obj_sector_offset = sector_offset & (sectors_per_obj - 1); 1663 1598 1664 - max = (chunk_sectors - ((sector & (chunk_sectors - 1)) 1665 - + bio_sectors)) << SECTOR_SHIFT; 1666 - if (max < 0) 1667 - max = 0; /* bio_add cannot handle a negative return */ 1668 - if (max <= bvec->bv_len && bio_sectors == 0) 1669 - return bvec->bv_len; 1670 - return max; 1599 + /* 1600 + * Compute the number of bytes from that offset to the end 1601 + * of the object. Account for what's already used by the bio. 1602 + */ 1603 + ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT; 1604 + if (ret > bmd->bi_size) 1605 + ret -= bmd->bi_size; 1606 + else 1607 + ret = 0; 1608 + 1609 + /* 1610 + * Don't send back more than was asked for. And if the bio 1611 + * was empty, let the whole thing through because: "Note 1612 + * that a block device *must* allow a single page to be 1613 + * added to an empty bio." 1614 + */ 1615 + rbd_assert(bvec->bv_len <= PAGE_SIZE); 1616 + if (ret > (int) bvec->bv_len || !bmd->bi_size) 1617 + ret = (int) bvec->bv_len; 1618 + 1619 + return ret; 1671 1620 } 1672 1621 1673 1622 static void rbd_free_disk(struct rbd_device *rbd_dev) ··· 1752 1663 ret = -ENXIO; 1753 1664 pr_warning("short header read for image %s" 1754 1665 " (want %zd got %d)\n", 1755 - rbd_dev->image_name, size, ret); 1666 + rbd_dev->spec->image_name, size, ret); 1756 1667 goto out_err; 1757 1668 } 1758 1669 if (!rbd_dev_ondisk_valid(ondisk)) { 1759 1670 ret = -ENXIO; 1760 1671 pr_warning("invalid header for image %s\n", 1761 - rbd_dev->image_name); 1672 + rbd_dev->spec->image_name); 1762 1673 goto out_err; 1763 1674 } 1764 1675 ··· 1796 1707 return ret; 1797 1708 } 1798 1709 1799 - static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev) 1710 + static void rbd_remove_all_snaps(struct rbd_device *rbd_dev) 1800 1711 { 1801 1712 struct rbd_snap *snap; 1802 1713 struct rbd_snap *next; 1803 1714 1804 1715 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) 1805 - __rbd_remove_snap_dev(snap); 1716 + rbd_remove_snap_dev(snap); 1717 + } 1718 + 1719 + static void rbd_update_mapping_size(struct rbd_device *rbd_dev) 1720 + { 1721 + sector_t size; 1722 + 1723 + if (rbd_dev->spec->snap_id != CEPH_NOSNAP) 1724 + return; 1725 + 1726 + size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE; 1727 + dout("setting size to %llu sectors", (unsigned long long) size); 1728 + rbd_dev->mapping.size = (u64) size; 1729 + set_capacity(rbd_dev->disk, size); 1806 1730 } 1807 1731 1808 1732 /* 1809 1733 * only read the first part of the ondisk header, without the snaps info 1810 1734 */ 1811 - static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver) 1735 + static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver) 1812 1736 { 1813 1737 int ret; 1814 1738 struct rbd_image_header h; ··· 1832 1730 1833 1731 down_write(&rbd_dev->header_rwsem); 1834 1732 1835 - /* resized? */ 1836 - if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) { 1837 - sector_t size = (sector_t) h.image_size / SECTOR_SIZE; 1838 - 1839 - if (size != (sector_t) rbd_dev->mapping.size) { 1840 - dout("setting size to %llu sectors", 1841 - (unsigned long long) size); 1842 - rbd_dev->mapping.size = (u64) size; 1843 - set_capacity(rbd_dev->disk, size); 1844 - } 1845 - } 1733 + /* Update image size, and check for resize of mapped image */ 1734 + rbd_dev->header.image_size = h.image_size; 1735 + rbd_update_mapping_size(rbd_dev); 1846 1736 1847 1737 /* rbd_dev->header.object_prefix shouldn't change */ 1848 1738 kfree(rbd_dev->header.snap_sizes); ··· 1862 1768 return ret; 1863 1769 } 1864 1770 1865 - static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver) 1771 + static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver) 1866 1772 { 1867 1773 int ret; 1868 1774 1775 + rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 1869 1776 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 1870 - ret = __rbd_refresh_header(rbd_dev, hver); 1777 + if (rbd_dev->image_format == 1) 1778 + ret = rbd_dev_v1_refresh(rbd_dev, hver); 1779 + else 1780 + ret = rbd_dev_v2_refresh(rbd_dev, hver); 1871 1781 mutex_unlock(&ctl_mutex); 1872 1782 1873 1783 return ret; ··· 1983 1885 { 1984 1886 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 1985 1887 1986 - return sprintf(buf, "%s\n", rbd_dev->pool_name); 1888 + return sprintf(buf, "%s\n", rbd_dev->spec->pool_name); 1987 1889 } 1988 1890 1989 1891 static ssize_t rbd_pool_id_show(struct device *dev, ··· 1991 1893 { 1992 1894 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 1993 1895 1994 - return sprintf(buf, "%d\n", rbd_dev->pool_id); 1896 + return sprintf(buf, "%llu\n", 1897 + (unsigned long long) rbd_dev->spec->pool_id); 1995 1898 } 1996 1899 1997 1900 static ssize_t rbd_name_show(struct device *dev, ··· 2000 1901 { 2001 1902 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2002 1903 2003 - return sprintf(buf, "%s\n", rbd_dev->image_name); 1904 + if (rbd_dev->spec->image_name) 1905 + return sprintf(buf, "%s\n", rbd_dev->spec->image_name); 1906 + 1907 + return sprintf(buf, "(unknown)\n"); 2004 1908 } 2005 1909 2006 1910 static ssize_t rbd_image_id_show(struct device *dev, ··· 2011 1909 { 2012 1910 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2013 1911 2014 - return sprintf(buf, "%s\n", rbd_dev->image_id); 1912 + return sprintf(buf, "%s\n", rbd_dev->spec->image_id); 2015 1913 } 2016 1914 2017 1915 /* ··· 2024 1922 { 2025 1923 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2026 1924 2027 - return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name); 1925 + return sprintf(buf, "%s\n", rbd_dev->spec->snap_name); 1926 + } 1927 + 1928 + /* 1929 + * For an rbd v2 image, shows the pool id, image id, and snapshot id 1930 + * for the parent image. If there is no parent, simply shows 1931 + * "(no parent image)". 1932 + */ 1933 + static ssize_t rbd_parent_show(struct device *dev, 1934 + struct device_attribute *attr, 1935 + char *buf) 1936 + { 1937 + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 1938 + struct rbd_spec *spec = rbd_dev->parent_spec; 1939 + int count; 1940 + char *bufp = buf; 1941 + 1942 + if (!spec) 1943 + return sprintf(buf, "(no parent image)\n"); 1944 + 1945 + count = sprintf(bufp, "pool_id %llu\npool_name %s\n", 1946 + (unsigned long long) spec->pool_id, spec->pool_name); 1947 + if (count < 0) 1948 + return count; 1949 + bufp += count; 1950 + 1951 + count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id, 1952 + spec->image_name ? spec->image_name : "(unknown)"); 1953 + if (count < 0) 1954 + return count; 1955 + bufp += count; 1956 + 1957 + count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n", 1958 + (unsigned long long) spec->snap_id, spec->snap_name); 1959 + if (count < 0) 1960 + return count; 1961 + bufp += count; 1962 + 1963 + count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap); 1964 + if (count < 0) 1965 + return count; 1966 + bufp += count; 1967 + 1968 + return (ssize_t) (bufp - buf); 2028 1969 } 2029 1970 2030 1971 static ssize_t rbd_image_refresh(struct device *dev, ··· 2078 1933 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2079 1934 int ret; 2080 1935 2081 - ret = rbd_refresh_header(rbd_dev, NULL); 1936 + ret = rbd_dev_refresh(rbd_dev, NULL); 2082 1937 2083 1938 return ret < 0 ? ret : size; 2084 1939 } ··· 2093 1948 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL); 2094 1949 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh); 2095 1950 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL); 1951 + static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL); 2096 1952 2097 1953 static struct attribute *rbd_attrs[] = { 2098 1954 &dev_attr_size.attr, ··· 2105 1959 &dev_attr_name.attr, 2106 1960 &dev_attr_image_id.attr, 2107 1961 &dev_attr_current_snap.attr, 1962 + &dev_attr_parent.attr, 2108 1963 &dev_attr_refresh.attr, 2109 1964 NULL 2110 1965 }; ··· 2194 2047 .release = rbd_snap_dev_release, 2195 2048 }; 2196 2049 2050 + static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec) 2051 + { 2052 + kref_get(&spec->kref); 2053 + 2054 + return spec; 2055 + } 2056 + 2057 + static void rbd_spec_free(struct kref *kref); 2058 + static void rbd_spec_put(struct rbd_spec *spec) 2059 + { 2060 + if (spec) 2061 + kref_put(&spec->kref, rbd_spec_free); 2062 + } 2063 + 2064 + static struct rbd_spec *rbd_spec_alloc(void) 2065 + { 2066 + struct rbd_spec *spec; 2067 + 2068 + spec = kzalloc(sizeof (*spec), GFP_KERNEL); 2069 + if (!spec) 2070 + return NULL; 2071 + kref_init(&spec->kref); 2072 + 2073 + rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */ 2074 + 2075 + return spec; 2076 + } 2077 + 2078 + static void rbd_spec_free(struct kref *kref) 2079 + { 2080 + struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref); 2081 + 2082 + kfree(spec->pool_name); 2083 + kfree(spec->image_id); 2084 + kfree(spec->image_name); 2085 + kfree(spec->snap_name); 2086 + kfree(spec); 2087 + } 2088 + 2089 + struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, 2090 + struct rbd_spec *spec) 2091 + { 2092 + struct rbd_device *rbd_dev; 2093 + 2094 + rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL); 2095 + if (!rbd_dev) 2096 + return NULL; 2097 + 2098 + spin_lock_init(&rbd_dev->lock); 2099 + INIT_LIST_HEAD(&rbd_dev->node); 2100 + INIT_LIST_HEAD(&rbd_dev->snaps); 2101 + init_rwsem(&rbd_dev->header_rwsem); 2102 + 2103 + rbd_dev->spec = spec; 2104 + rbd_dev->rbd_client = rbdc; 2105 + 2106 + return rbd_dev; 2107 + } 2108 + 2109 + static void rbd_dev_destroy(struct rbd_device *rbd_dev) 2110 + { 2111 + rbd_spec_put(rbd_dev->parent_spec); 2112 + kfree(rbd_dev->header_name); 2113 + rbd_put_client(rbd_dev->rbd_client); 2114 + rbd_spec_put(rbd_dev->spec); 2115 + kfree(rbd_dev); 2116 + } 2117 + 2197 2118 static bool rbd_snap_registered(struct rbd_snap *snap) 2198 2119 { 2199 2120 bool ret = snap->dev.type == &rbd_snap_device_type; ··· 2272 2057 return ret; 2273 2058 } 2274 2059 2275 - static void __rbd_remove_snap_dev(struct rbd_snap *snap) 2060 + static void rbd_remove_snap_dev(struct rbd_snap *snap) 2276 2061 { 2277 2062 list_del(&snap->node); 2278 2063 if (device_is_registered(&snap->dev)) ··· 2288 2073 dev->type = &rbd_snap_device_type; 2289 2074 dev->parent = parent; 2290 2075 dev->release = rbd_snap_dev_release; 2291 - dev_set_name(dev, "snap_%s", snap->name); 2076 + dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name); 2292 2077 dout("%s: registering device for snapshot %s\n", __func__, snap->name); 2293 2078 2294 2079 ret = device_register(dev); ··· 2404 2189 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); 2405 2190 if (ret < 0) 2406 2191 goto out; 2192 + ret = 0; /* rbd_req_sync_exec() can return positive */ 2407 2193 2408 2194 p = reply_buf; 2409 2195 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p, ··· 2432 2216 __le64 features; 2433 2217 __le64 incompat; 2434 2218 } features_buf = { 0 }; 2219 + u64 incompat; 2435 2220 int ret; 2436 2221 2437 2222 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, ··· 2443 2226 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); 2444 2227 if (ret < 0) 2445 2228 return ret; 2229 + 2230 + incompat = le64_to_cpu(features_buf.incompat); 2231 + if (incompat & ~RBD_FEATURES_ALL) 2232 + return -ENXIO; 2233 + 2446 2234 *snap_features = le64_to_cpu(features_buf.features); 2447 2235 2448 2236 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n", ··· 2462 2240 { 2463 2241 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP, 2464 2242 &rbd_dev->header.features); 2243 + } 2244 + 2245 + static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) 2246 + { 2247 + struct rbd_spec *parent_spec; 2248 + size_t size; 2249 + void *reply_buf = NULL; 2250 + __le64 snapid; 2251 + void *p; 2252 + void *end; 2253 + char *image_id; 2254 + u64 overlap; 2255 + size_t len = 0; 2256 + int ret; 2257 + 2258 + parent_spec = rbd_spec_alloc(); 2259 + if (!parent_spec) 2260 + return -ENOMEM; 2261 + 2262 + size = sizeof (__le64) + /* pool_id */ 2263 + sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */ 2264 + sizeof (__le64) + /* snap_id */ 2265 + sizeof (__le64); /* overlap */ 2266 + reply_buf = kmalloc(size, GFP_KERNEL); 2267 + if (!reply_buf) { 2268 + ret = -ENOMEM; 2269 + goto out_err; 2270 + } 2271 + 2272 + snapid = cpu_to_le64(CEPH_NOSNAP); 2273 + ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, 2274 + "rbd", "get_parent", 2275 + (char *) &snapid, sizeof (snapid), 2276 + (char *) reply_buf, size, 2277 + CEPH_OSD_FLAG_READ, NULL); 2278 + dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); 2279 + if (ret < 0) 2280 + goto out_err; 2281 + 2282 + ret = -ERANGE; 2283 + p = reply_buf; 2284 + end = (char *) reply_buf + size; 2285 + ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err); 2286 + if (parent_spec->pool_id == CEPH_NOPOOL) 2287 + goto out; /* No parent? No problem. */ 2288 + 2289 + image_id = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL); 2290 + if (IS_ERR(image_id)) { 2291 + ret = PTR_ERR(image_id); 2292 + goto out_err; 2293 + } 2294 + parent_spec->image_id = image_id; 2295 + parent_spec->image_id_len = len; 2296 + ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err); 2297 + ceph_decode_64_safe(&p, end, overlap, out_err); 2298 + 2299 + rbd_dev->parent_overlap = overlap; 2300 + rbd_dev->parent_spec = parent_spec; 2301 + parent_spec = NULL; /* rbd_dev now owns this */ 2302 + out: 2303 + ret = 0; 2304 + out_err: 2305 + kfree(reply_buf); 2306 + rbd_spec_put(parent_spec); 2307 + 2308 + return ret; 2309 + } 2310 + 2311 + static char *rbd_dev_image_name(struct rbd_device *rbd_dev) 2312 + { 2313 + size_t image_id_size; 2314 + char *image_id; 2315 + void *p; 2316 + void *end; 2317 + size_t size; 2318 + void *reply_buf = NULL; 2319 + size_t len = 0; 2320 + char *image_name = NULL; 2321 + int ret; 2322 + 2323 + rbd_assert(!rbd_dev->spec->image_name); 2324 + 2325 + image_id_size = sizeof (__le32) + rbd_dev->spec->image_id_len; 2326 + image_id = kmalloc(image_id_size, GFP_KERNEL); 2327 + if (!image_id) 2328 + return NULL; 2329 + 2330 + p = image_id; 2331 + end = (char *) image_id + image_id_size; 2332 + ceph_encode_string(&p, end, rbd_dev->spec->image_id, 2333 + (u32) rbd_dev->spec->image_id_len); 2334 + 2335 + size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX; 2336 + reply_buf = kmalloc(size, GFP_KERNEL); 2337 + if (!reply_buf) 2338 + goto out; 2339 + 2340 + ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY, 2341 + "rbd", "dir_get_name", 2342 + image_id, image_id_size, 2343 + (char *) reply_buf, size, 2344 + CEPH_OSD_FLAG_READ, NULL); 2345 + if (ret < 0) 2346 + goto out; 2347 + p = reply_buf; 2348 + end = (char *) reply_buf + size; 2349 + image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL); 2350 + if (IS_ERR(image_name)) 2351 + image_name = NULL; 2352 + else 2353 + dout("%s: name is %s len is %zd\n", __func__, image_name, len); 2354 + out: 2355 + kfree(reply_buf); 2356 + kfree(image_id); 2357 + 2358 + return image_name; 2359 + } 2360 + 2361 + /* 2362 + * When a parent image gets probed, we only have the pool, image, 2363 + * and snapshot ids but not the names of any of them. This call 2364 + * is made later to fill in those names. It has to be done after 2365 + * rbd_dev_snaps_update() has completed because some of the 2366 + * information (in particular, snapshot name) is not available 2367 + * until then. 2368 + */ 2369 + static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev) 2370 + { 2371 + struct ceph_osd_client *osdc; 2372 + const char *name; 2373 + void *reply_buf = NULL; 2374 + int ret; 2375 + 2376 + if (rbd_dev->spec->pool_name) 2377 + return 0; /* Already have the names */ 2378 + 2379 + /* Look up the pool name */ 2380 + 2381 + osdc = &rbd_dev->rbd_client->client->osdc; 2382 + name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id); 2383 + if (!name) 2384 + return -EIO; /* pool id too large (>= 2^31) */ 2385 + 2386 + rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL); 2387 + if (!rbd_dev->spec->pool_name) 2388 + return -ENOMEM; 2389 + 2390 + /* Fetch the image name; tolerate failure here */ 2391 + 2392 + name = rbd_dev_image_name(rbd_dev); 2393 + if (name) { 2394 + rbd_dev->spec->image_name_len = strlen(name); 2395 + rbd_dev->spec->image_name = (char *) name; 2396 + } else { 2397 + pr_warning(RBD_DRV_NAME "%d " 2398 + "unable to get image name for image id %s\n", 2399 + rbd_dev->major, rbd_dev->spec->image_id); 2400 + } 2401 + 2402 + /* Look up the snapshot name. */ 2403 + 2404 + name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id); 2405 + if (!name) { 2406 + ret = -EIO; 2407 + goto out_err; 2408 + } 2409 + rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL); 2410 + if(!rbd_dev->spec->snap_name) 2411 + goto out_err; 2412 + 2413 + return 0; 2414 + out_err: 2415 + kfree(reply_buf); 2416 + kfree(rbd_dev->spec->pool_name); 2417 + rbd_dev->spec->pool_name = NULL; 2418 + 2419 + return ret; 2465 2420 } 2466 2421 2467 2422 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver) ··· 2727 2328 int ret; 2728 2329 void *p; 2729 2330 void *end; 2730 - size_t snap_name_len; 2731 2331 char *snap_name; 2732 2332 2733 2333 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN; ··· 2746 2348 2747 2349 p = reply_buf; 2748 2350 end = (char *) reply_buf + size; 2749 - snap_name_len = 0; 2750 - snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len, 2751 - GFP_KERNEL); 2351 + snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); 2752 2352 if (IS_ERR(snap_name)) { 2753 2353 ret = PTR_ERR(snap_name); 2754 2354 goto out; ··· 2793 2397 return ERR_PTR(-EINVAL); 2794 2398 } 2795 2399 2400 + static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver) 2401 + { 2402 + int ret; 2403 + __u8 obj_order; 2404 + 2405 + down_write(&rbd_dev->header_rwsem); 2406 + 2407 + /* Grab old order first, to see if it changes */ 2408 + 2409 + obj_order = rbd_dev->header.obj_order, 2410 + ret = rbd_dev_v2_image_size(rbd_dev); 2411 + if (ret) 2412 + goto out; 2413 + if (rbd_dev->header.obj_order != obj_order) { 2414 + ret = -EIO; 2415 + goto out; 2416 + } 2417 + rbd_update_mapping_size(rbd_dev); 2418 + 2419 + ret = rbd_dev_v2_snap_context(rbd_dev, hver); 2420 + dout("rbd_dev_v2_snap_context returned %d\n", ret); 2421 + if (ret) 2422 + goto out; 2423 + ret = rbd_dev_snaps_update(rbd_dev); 2424 + dout("rbd_dev_snaps_update returned %d\n", ret); 2425 + if (ret) 2426 + goto out; 2427 + ret = rbd_dev_snaps_register(rbd_dev); 2428 + dout("rbd_dev_snaps_register returned %d\n", ret); 2429 + out: 2430 + up_write(&rbd_dev->header_rwsem); 2431 + 2432 + return ret; 2433 + } 2434 + 2796 2435 /* 2797 2436 * Scan the rbd device's current snapshot list and compare it to the 2798 2437 * newly-received snapshot context. Remove any existing snapshots ··· 2867 2436 2868 2437 /* Existing snapshot not in the new snap context */ 2869 2438 2870 - if (rbd_dev->mapping.snap_id == snap->id) 2871 - rbd_dev->mapping.snap_exists = false; 2872 - __rbd_remove_snap_dev(snap); 2439 + if (rbd_dev->spec->snap_id == snap->id) 2440 + rbd_dev->exists = false; 2441 + rbd_remove_snap_dev(snap); 2873 2442 dout("%ssnap id %llu has been removed\n", 2874 - rbd_dev->mapping.snap_id == snap->id ? 2875 - "mapped " : "", 2443 + rbd_dev->spec->snap_id == snap->id ? 2444 + "mapped " : "", 2876 2445 (unsigned long long) snap->id); 2877 2446 2878 2447 /* Done with this list entry; advance */ ··· 2990 2559 do { 2991 2560 ret = rbd_req_sync_watch(rbd_dev); 2992 2561 if (ret == -ERANGE) { 2993 - rc = rbd_refresh_header(rbd_dev, NULL); 2562 + rc = rbd_dev_refresh(rbd_dev, NULL); 2994 2563 if (rc < 0) 2995 2564 return rc; 2996 2565 } ··· 3052 2621 struct rbd_device *rbd_dev; 3053 2622 3054 2623 rbd_dev = list_entry(tmp, struct rbd_device, node); 3055 - if (rbd_id > max_id) 3056 - max_id = rbd_id; 2624 + if (rbd_dev->dev_id > max_id) 2625 + max_id = rbd_dev->dev_id; 3057 2626 } 3058 2627 spin_unlock(&rbd_dev_list_lock); 3059 2628 ··· 3153 2722 } 3154 2723 3155 2724 /* 3156 - * This fills in the pool_name, image_name, image_name_len, rbd_dev, 3157 - * rbd_md_name, and name fields of the given rbd_dev, based on the 3158 - * list of monitor addresses and other options provided via 3159 - * /sys/bus/rbd/add. Returns a pointer to a dynamically-allocated 3160 - * copy of the snapshot name to map if successful, or a 3161 - * pointer-coded error otherwise. 2725 + * Parse the options provided for an "rbd add" (i.e., rbd image 2726 + * mapping) request. These arrive via a write to /sys/bus/rbd/add, 2727 + * and the data written is passed here via a NUL-terminated buffer. 2728 + * Returns 0 if successful or an error code otherwise. 3162 2729 * 3163 - * Note: rbd_dev is assumed to have been initially zero-filled. 2730 + * The information extracted from these options is recorded in 2731 + * the other parameters which return dynamically-allocated 2732 + * structures: 2733 + * ceph_opts 2734 + * The address of a pointer that will refer to a ceph options 2735 + * structure. Caller must release the returned pointer using 2736 + * ceph_destroy_options() when it is no longer needed. 2737 + * rbd_opts 2738 + * Address of an rbd options pointer. Fully initialized by 2739 + * this function; caller must release with kfree(). 2740 + * spec 2741 + * Address of an rbd image specification pointer. Fully 2742 + * initialized by this function based on parsed options. 2743 + * Caller must release with rbd_spec_put(). 2744 + * 2745 + * The options passed take this form: 2746 + * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>] 2747 + * where: 2748 + * <mon_addrs> 2749 + * A comma-separated list of one or more monitor addresses. 2750 + * A monitor address is an ip address, optionally followed 2751 + * by a port number (separated by a colon). 2752 + * I.e.: ip1[:port1][,ip2[:port2]...] 2753 + * <options> 2754 + * A comma-separated list of ceph and/or rbd options. 2755 + * <pool_name> 2756 + * The name of the rados pool containing the rbd image. 2757 + * <image_name> 2758 + * The name of the image in that pool to map. 2759 + * <snap_id> 2760 + * An optional snapshot id. If provided, the mapping will 2761 + * present data from the image at the time that snapshot was 2762 + * created. The image head is used if no snapshot id is 2763 + * provided. Snapshot mappings are always read-only. 3164 2764 */ 3165 - static char *rbd_add_parse_args(struct rbd_device *rbd_dev, 3166 - const char *buf, 3167 - const char **mon_addrs, 3168 - size_t *mon_addrs_size, 3169 - char *options, 3170 - size_t options_size) 2765 + static int rbd_add_parse_args(const char *buf, 2766 + struct ceph_options **ceph_opts, 2767 + struct rbd_options **opts, 2768 + struct rbd_spec **rbd_spec) 3171 2769 { 3172 2770 size_t len; 3173 - char *err_ptr = ERR_PTR(-EINVAL); 3174 - char *snap_name; 2771 + char *options; 2772 + const char *mon_addrs; 2773 + size_t mon_addrs_size; 2774 + struct rbd_spec *spec = NULL; 2775 + struct rbd_options *rbd_opts = NULL; 2776 + struct ceph_options *copts; 2777 + int ret; 3175 2778 3176 2779 /* The first four tokens are required */ 3177 2780 3178 2781 len = next_token(&buf); 3179 2782 if (!len) 3180 - return err_ptr; 3181 - *mon_addrs_size = len + 1; 3182 - *mon_addrs = buf; 3183 - 2783 + return -EINVAL; /* Missing monitor address(es) */ 2784 + mon_addrs = buf; 2785 + mon_addrs_size = len + 1; 3184 2786 buf += len; 3185 2787 3186 - len = copy_token(&buf, options, options_size); 3187 - if (!len || len >= options_size) 3188 - return err_ptr; 2788 + ret = -EINVAL; 2789 + options = dup_token(&buf, NULL); 2790 + if (!options) 2791 + return -ENOMEM; 2792 + if (!*options) 2793 + goto out_err; /* Missing options */ 3189 2794 3190 - err_ptr = ERR_PTR(-ENOMEM); 3191 - rbd_dev->pool_name = dup_token(&buf, NULL); 3192 - if (!rbd_dev->pool_name) 3193 - goto out_err; 2795 + spec = rbd_spec_alloc(); 2796 + if (!spec) 2797 + goto out_mem; 3194 2798 3195 - rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len); 3196 - if (!rbd_dev->image_name) 3197 - goto out_err; 2799 + spec->pool_name = dup_token(&buf, NULL); 2800 + if (!spec->pool_name) 2801 + goto out_mem; 2802 + if (!*spec->pool_name) 2803 + goto out_err; /* Missing pool name */ 3198 2804 3199 - /* Snapshot name is optional */ 2805 + spec->image_name = dup_token(&buf, &spec->image_name_len); 2806 + if (!spec->image_name) 2807 + goto out_mem; 2808 + if (!*spec->image_name) 2809 + goto out_err; /* Missing image name */ 2810 + 2811 + /* 2812 + * Snapshot name is optional; default is to use "-" 2813 + * (indicating the head/no snapshot). 2814 + */ 3200 2815 len = next_token(&buf); 3201 2816 if (!len) { 3202 2817 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */ 3203 2818 len = sizeof (RBD_SNAP_HEAD_NAME) - 1; 3204 - } 3205 - snap_name = kmalloc(len + 1, GFP_KERNEL); 3206 - if (!snap_name) 2819 + } else if (len > RBD_MAX_SNAP_NAME_LEN) { 2820 + ret = -ENAMETOOLONG; 3207 2821 goto out_err; 3208 - memcpy(snap_name, buf, len); 3209 - *(snap_name + len) = '\0'; 2822 + } 2823 + spec->snap_name = kmalloc(len + 1, GFP_KERNEL); 2824 + if (!spec->snap_name) 2825 + goto out_mem; 2826 + memcpy(spec->snap_name, buf, len); 2827 + *(spec->snap_name + len) = '\0'; 3210 2828 3211 - dout(" SNAP_NAME is <%s>, len is %zd\n", snap_name, len); 2829 + /* Initialize all rbd options to the defaults */ 3212 2830 3213 - return snap_name; 2831 + rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL); 2832 + if (!rbd_opts) 2833 + goto out_mem; 3214 2834 2835 + rbd_opts->read_only = RBD_READ_ONLY_DEFAULT; 2836 + 2837 + copts = ceph_parse_options(options, mon_addrs, 2838 + mon_addrs + mon_addrs_size - 1, 2839 + parse_rbd_opts_token, rbd_opts); 2840 + if (IS_ERR(copts)) { 2841 + ret = PTR_ERR(copts); 2842 + goto out_err; 2843 + } 2844 + kfree(options); 2845 + 2846 + *ceph_opts = copts; 2847 + *opts = rbd_opts; 2848 + *rbd_spec = spec; 2849 + 2850 + return 0; 2851 + out_mem: 2852 + ret = -ENOMEM; 3215 2853 out_err: 3216 - kfree(rbd_dev->image_name); 3217 - rbd_dev->image_name = NULL; 3218 - rbd_dev->image_name_len = 0; 3219 - kfree(rbd_dev->pool_name); 3220 - rbd_dev->pool_name = NULL; 2854 + kfree(rbd_opts); 2855 + rbd_spec_put(spec); 2856 + kfree(options); 3221 2857 3222 - return err_ptr; 2858 + return ret; 3223 2859 } 3224 2860 3225 2861 /* ··· 3312 2814 void *p; 3313 2815 3314 2816 /* 2817 + * When probing a parent image, the image id is already 2818 + * known (and the image name likely is not). There's no 2819 + * need to fetch the image id again in this case. 2820 + */ 2821 + if (rbd_dev->spec->image_id) 2822 + return 0; 2823 + 2824 + /* 3315 2825 * First, see if the format 2 image id file exists, and if 3316 2826 * so, get the image's persistent id from it. 3317 2827 */ 3318 - size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len; 2828 + size = sizeof (RBD_ID_PREFIX) + rbd_dev->spec->image_name_len; 3319 2829 object_name = kmalloc(size, GFP_NOIO); 3320 2830 if (!object_name) 3321 2831 return -ENOMEM; 3322 - sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name); 2832 + sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name); 3323 2833 dout("rbd id object name is %s\n", object_name); 3324 2834 3325 2835 /* Response will be an encoded string, which includes a length */ ··· 3347 2841 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); 3348 2842 if (ret < 0) 3349 2843 goto out; 2844 + ret = 0; /* rbd_req_sync_exec() can return positive */ 3350 2845 3351 2846 p = response; 3352 - rbd_dev->image_id = ceph_extract_encoded_string(&p, 2847 + rbd_dev->spec->image_id = ceph_extract_encoded_string(&p, 3353 2848 p + RBD_IMAGE_ID_LEN_MAX, 3354 - &rbd_dev->image_id_len, 2849 + &rbd_dev->spec->image_id_len, 3355 2850 GFP_NOIO); 3356 - if (IS_ERR(rbd_dev->image_id)) { 3357 - ret = PTR_ERR(rbd_dev->image_id); 3358 - rbd_dev->image_id = NULL; 2851 + if (IS_ERR(rbd_dev->spec->image_id)) { 2852 + ret = PTR_ERR(rbd_dev->spec->image_id); 2853 + rbd_dev->spec->image_id = NULL; 3359 2854 } else { 3360 - dout("image_id is %s\n", rbd_dev->image_id); 2855 + dout("image_id is %s\n", rbd_dev->spec->image_id); 3361 2856 } 3362 2857 out: 3363 2858 kfree(response); ··· 3374 2867 3375 2868 /* Version 1 images have no id; empty string is used */ 3376 2869 3377 - rbd_dev->image_id = kstrdup("", GFP_KERNEL); 3378 - if (!rbd_dev->image_id) 2870 + rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL); 2871 + if (!rbd_dev->spec->image_id) 3379 2872 return -ENOMEM; 3380 - rbd_dev->image_id_len = 0; 2873 + rbd_dev->spec->image_id_len = 0; 3381 2874 3382 2875 /* Record the header object name for this rbd image. */ 3383 2876 3384 - size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX); 2877 + size = rbd_dev->spec->image_name_len + sizeof (RBD_SUFFIX); 3385 2878 rbd_dev->header_name = kmalloc(size, GFP_KERNEL); 3386 2879 if (!rbd_dev->header_name) { 3387 2880 ret = -ENOMEM; 3388 2881 goto out_err; 3389 2882 } 3390 - sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX); 2883 + sprintf(rbd_dev->header_name, "%s%s", 2884 + rbd_dev->spec->image_name, RBD_SUFFIX); 3391 2885 3392 2886 /* Populate rbd image metadata */ 3393 2887 3394 2888 ret = rbd_read_header(rbd_dev, &rbd_dev->header); 3395 2889 if (ret < 0) 3396 2890 goto out_err; 2891 + 2892 + /* Version 1 images have no parent (no layering) */ 2893 + 2894 + rbd_dev->parent_spec = NULL; 2895 + rbd_dev->parent_overlap = 0; 2896 + 3397 2897 rbd_dev->image_format = 1; 3398 2898 3399 2899 dout("discovered version 1 image, header name is %s\n", ··· 3411 2897 out_err: 3412 2898 kfree(rbd_dev->header_name); 3413 2899 rbd_dev->header_name = NULL; 3414 - kfree(rbd_dev->image_id); 3415 - rbd_dev->image_id = NULL; 2900 + kfree(rbd_dev->spec->image_id); 2901 + rbd_dev->spec->image_id = NULL; 3416 2902 3417 2903 return ret; 3418 2904 } ··· 3427 2913 * Image id was filled in by the caller. Record the header 3428 2914 * object name for this rbd image. 3429 2915 */ 3430 - size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len; 2916 + size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->spec->image_id_len; 3431 2917 rbd_dev->header_name = kmalloc(size, GFP_KERNEL); 3432 2918 if (!rbd_dev->header_name) 3433 2919 return -ENOMEM; 3434 2920 sprintf(rbd_dev->header_name, "%s%s", 3435 - RBD_HEADER_PREFIX, rbd_dev->image_id); 2921 + RBD_HEADER_PREFIX, rbd_dev->spec->image_id); 3436 2922 3437 2923 /* Get the size and object order for the image */ 3438 2924 ··· 3446 2932 if (ret < 0) 3447 2933 goto out_err; 3448 2934 3449 - /* Get the features for the image */ 2935 + /* Get the and check features for the image */ 3450 2936 3451 2937 ret = rbd_dev_v2_features(rbd_dev); 3452 2938 if (ret < 0) 3453 2939 goto out_err; 2940 + 2941 + /* If the image supports layering, get the parent info */ 2942 + 2943 + if (rbd_dev->header.features & RBD_FEATURE_LAYERING) { 2944 + ret = rbd_dev_v2_parent_info(rbd_dev); 2945 + if (ret < 0) 2946 + goto out_err; 2947 + } 3454 2948 3455 2949 /* crypto and compression type aren't (yet) supported for v2 images */ 3456 2950 ··· 3477 2955 dout("discovered version 2 image, header name is %s\n", 3478 2956 rbd_dev->header_name); 3479 2957 3480 - return -ENOTSUPP; 2958 + return 0; 3481 2959 out_err: 2960 + rbd_dev->parent_overlap = 0; 2961 + rbd_spec_put(rbd_dev->parent_spec); 2962 + rbd_dev->parent_spec = NULL; 3482 2963 kfree(rbd_dev->header_name); 3483 2964 rbd_dev->header_name = NULL; 3484 2965 kfree(rbd_dev->header.object_prefix); 3485 2966 rbd_dev->header.object_prefix = NULL; 2967 + 2968 + return ret; 2969 + } 2970 + 2971 + static int rbd_dev_probe_finish(struct rbd_device *rbd_dev) 2972 + { 2973 + int ret; 2974 + 2975 + /* no need to lock here, as rbd_dev is not registered yet */ 2976 + ret = rbd_dev_snaps_update(rbd_dev); 2977 + if (ret) 2978 + return ret; 2979 + 2980 + ret = rbd_dev_probe_update_spec(rbd_dev); 2981 + if (ret) 2982 + goto err_out_snaps; 2983 + 2984 + ret = rbd_dev_set_mapping(rbd_dev); 2985 + if (ret) 2986 + goto err_out_snaps; 2987 + 2988 + /* generate unique id: find highest unique id, add one */ 2989 + rbd_dev_id_get(rbd_dev); 2990 + 2991 + /* Fill in the device name, now that we have its id. */ 2992 + BUILD_BUG_ON(DEV_NAME_LEN 2993 + < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH); 2994 + sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id); 2995 + 2996 + /* Get our block major device number. */ 2997 + 2998 + ret = register_blkdev(0, rbd_dev->name); 2999 + if (ret < 0) 3000 + goto err_out_id; 3001 + rbd_dev->major = ret; 3002 + 3003 + /* Set up the blkdev mapping. */ 3004 + 3005 + ret = rbd_init_disk(rbd_dev); 3006 + if (ret) 3007 + goto err_out_blkdev; 3008 + 3009 + ret = rbd_bus_add_dev(rbd_dev); 3010 + if (ret) 3011 + goto err_out_disk; 3012 + 3013 + /* 3014 + * At this point cleanup in the event of an error is the job 3015 + * of the sysfs code (initiated by rbd_bus_del_dev()). 3016 + */ 3017 + down_write(&rbd_dev->header_rwsem); 3018 + ret = rbd_dev_snaps_register(rbd_dev); 3019 + up_write(&rbd_dev->header_rwsem); 3020 + if (ret) 3021 + goto err_out_bus; 3022 + 3023 + ret = rbd_init_watch_dev(rbd_dev); 3024 + if (ret) 3025 + goto err_out_bus; 3026 + 3027 + /* Everything's ready. Announce the disk to the world. */ 3028 + 3029 + add_disk(rbd_dev->disk); 3030 + 3031 + pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name, 3032 + (unsigned long long) rbd_dev->mapping.size); 3033 + 3034 + return ret; 3035 + err_out_bus: 3036 + /* this will also clean up rest of rbd_dev stuff */ 3037 + 3038 + rbd_bus_del_dev(rbd_dev); 3039 + 3040 + return ret; 3041 + err_out_disk: 3042 + rbd_free_disk(rbd_dev); 3043 + err_out_blkdev: 3044 + unregister_blkdev(rbd_dev->major, rbd_dev->name); 3045 + err_out_id: 3046 + rbd_dev_id_put(rbd_dev); 3047 + err_out_snaps: 3048 + rbd_remove_all_snaps(rbd_dev); 3486 3049 3487 3050 return ret; 3488 3051 } ··· 3591 2984 ret = rbd_dev_v1_probe(rbd_dev); 3592 2985 else 3593 2986 ret = rbd_dev_v2_probe(rbd_dev); 3594 - if (ret) 2987 + if (ret) { 3595 2988 dout("probe failed, returning %d\n", ret); 2989 + 2990 + return ret; 2991 + } 2992 + 2993 + ret = rbd_dev_probe_finish(rbd_dev); 2994 + if (ret) 2995 + rbd_header_free(&rbd_dev->header); 3596 2996 3597 2997 return ret; 3598 2998 } ··· 3608 2994 const char *buf, 3609 2995 size_t count) 3610 2996 { 3611 - char *options; 3612 2997 struct rbd_device *rbd_dev = NULL; 3613 - const char *mon_addrs = NULL; 3614 - size_t mon_addrs_size = 0; 2998 + struct ceph_options *ceph_opts = NULL; 2999 + struct rbd_options *rbd_opts = NULL; 3000 + struct rbd_spec *spec = NULL; 3001 + struct rbd_client *rbdc; 3615 3002 struct ceph_osd_client *osdc; 3616 3003 int rc = -ENOMEM; 3617 - char *snap_name; 3618 3004 3619 3005 if (!try_module_get(THIS_MODULE)) 3620 3006 return -ENODEV; 3621 3007 3622 - options = kmalloc(count, GFP_KERNEL); 3623 - if (!options) 3624 - goto err_out_mem; 3625 - rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL); 3626 - if (!rbd_dev) 3627 - goto err_out_mem; 3628 - 3629 - /* static rbd_device initialization */ 3630 - spin_lock_init(&rbd_dev->lock); 3631 - INIT_LIST_HEAD(&rbd_dev->node); 3632 - INIT_LIST_HEAD(&rbd_dev->snaps); 3633 - init_rwsem(&rbd_dev->header_rwsem); 3634 - 3635 3008 /* parse add command */ 3636 - snap_name = rbd_add_parse_args(rbd_dev, buf, 3637 - &mon_addrs, &mon_addrs_size, options, count); 3638 - if (IS_ERR(snap_name)) { 3639 - rc = PTR_ERR(snap_name); 3640 - goto err_out_mem; 3641 - } 3642 - 3643 - rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options); 3009 + rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec); 3644 3010 if (rc < 0) 3011 + goto err_out_module; 3012 + 3013 + rbdc = rbd_get_client(ceph_opts); 3014 + if (IS_ERR(rbdc)) { 3015 + rc = PTR_ERR(rbdc); 3645 3016 goto err_out_args; 3017 + } 3018 + ceph_opts = NULL; /* rbd_dev client now owns this */ 3646 3019 3647 3020 /* pick the pool */ 3648 - osdc = &rbd_dev->rbd_client->client->osdc; 3649 - rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name); 3021 + osdc = &rbdc->client->osdc; 3022 + rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name); 3650 3023 if (rc < 0) 3651 3024 goto err_out_client; 3652 - rbd_dev->pool_id = rc; 3025 + spec->pool_id = (u64) rc; 3026 + 3027 + rbd_dev = rbd_dev_create(rbdc, spec); 3028 + if (!rbd_dev) 3029 + goto err_out_client; 3030 + rbdc = NULL; /* rbd_dev now owns this */ 3031 + spec = NULL; /* rbd_dev now owns this */ 3032 + 3033 + rbd_dev->mapping.read_only = rbd_opts->read_only; 3034 + kfree(rbd_opts); 3035 + rbd_opts = NULL; /* done with this */ 3653 3036 3654 3037 rc = rbd_dev_probe(rbd_dev); 3655 3038 if (rc < 0) 3656 - goto err_out_client; 3657 - rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 3658 - 3659 - /* no need to lock here, as rbd_dev is not registered yet */ 3660 - rc = rbd_dev_snaps_update(rbd_dev); 3661 - if (rc) 3662 - goto err_out_header; 3663 - 3664 - rc = rbd_dev_set_mapping(rbd_dev, snap_name); 3665 - if (rc) 3666 - goto err_out_header; 3667 - 3668 - /* generate unique id: find highest unique id, add one */ 3669 - rbd_dev_id_get(rbd_dev); 3670 - 3671 - /* Fill in the device name, now that we have its id. */ 3672 - BUILD_BUG_ON(DEV_NAME_LEN 3673 - < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH); 3674 - sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id); 3675 - 3676 - /* Get our block major device number. */ 3677 - 3678 - rc = register_blkdev(0, rbd_dev->name); 3679 - if (rc < 0) 3680 - goto err_out_id; 3681 - rbd_dev->major = rc; 3682 - 3683 - /* Set up the blkdev mapping. */ 3684 - 3685 - rc = rbd_init_disk(rbd_dev); 3686 - if (rc) 3687 - goto err_out_blkdev; 3688 - 3689 - rc = rbd_bus_add_dev(rbd_dev); 3690 - if (rc) 3691 - goto err_out_disk; 3692 - 3693 - /* 3694 - * At this point cleanup in the event of an error is the job 3695 - * of the sysfs code (initiated by rbd_bus_del_dev()). 3696 - */ 3697 - 3698 - down_write(&rbd_dev->header_rwsem); 3699 - rc = rbd_dev_snaps_register(rbd_dev); 3700 - up_write(&rbd_dev->header_rwsem); 3701 - if (rc) 3702 - goto err_out_bus; 3703 - 3704 - rc = rbd_init_watch_dev(rbd_dev); 3705 - if (rc) 3706 - goto err_out_bus; 3707 - 3708 - /* Everything's ready. Announce the disk to the world. */ 3709 - 3710 - add_disk(rbd_dev->disk); 3711 - 3712 - pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name, 3713 - (unsigned long long) rbd_dev->mapping.size); 3039 + goto err_out_rbd_dev; 3714 3040 3715 3041 return count; 3716 - 3717 - err_out_bus: 3718 - /* this will also clean up rest of rbd_dev stuff */ 3719 - 3720 - rbd_bus_del_dev(rbd_dev); 3721 - kfree(options); 3722 - return rc; 3723 - 3724 - err_out_disk: 3725 - rbd_free_disk(rbd_dev); 3726 - err_out_blkdev: 3727 - unregister_blkdev(rbd_dev->major, rbd_dev->name); 3728 - err_out_id: 3729 - rbd_dev_id_put(rbd_dev); 3730 - err_out_header: 3731 - rbd_header_free(&rbd_dev->header); 3042 + err_out_rbd_dev: 3043 + rbd_dev_destroy(rbd_dev); 3732 3044 err_out_client: 3733 - kfree(rbd_dev->header_name); 3734 - rbd_put_client(rbd_dev); 3735 - kfree(rbd_dev->image_id); 3045 + rbd_put_client(rbdc); 3736 3046 err_out_args: 3737 - kfree(rbd_dev->mapping.snap_name); 3738 - kfree(rbd_dev->image_name); 3739 - kfree(rbd_dev->pool_name); 3740 - err_out_mem: 3741 - kfree(rbd_dev); 3742 - kfree(options); 3047 + if (ceph_opts) 3048 + ceph_destroy_options(ceph_opts); 3049 + kfree(rbd_opts); 3050 + rbd_spec_put(spec); 3051 + err_out_module: 3052 + module_put(THIS_MODULE); 3743 3053 3744 3054 dout("Error adding device %s\n", buf); 3745 - module_put(THIS_MODULE); 3746 3055 3747 3056 return (ssize_t) rc; 3748 3057 } ··· 3700 3163 if (rbd_dev->watch_event) 3701 3164 rbd_req_sync_unwatch(rbd_dev); 3702 3165 3703 - rbd_put_client(rbd_dev); 3704 3166 3705 3167 /* clean up and free blkdev */ 3706 3168 rbd_free_disk(rbd_dev); ··· 3709 3173 rbd_header_free(&rbd_dev->header); 3710 3174 3711 3175 /* done with the id, and with the rbd_dev */ 3712 - kfree(rbd_dev->mapping.snap_name); 3713 - kfree(rbd_dev->image_id); 3714 - kfree(rbd_dev->header_name); 3715 - kfree(rbd_dev->pool_name); 3716 - kfree(rbd_dev->image_name); 3717 3176 rbd_dev_id_put(rbd_dev); 3718 - kfree(rbd_dev); 3177 + rbd_assert(rbd_dev->rbd_client != NULL); 3178 + rbd_dev_destroy(rbd_dev); 3719 3179 3720 3180 /* release module ref */ 3721 3181 module_put(THIS_MODULE); ··· 3743 3211 goto done; 3744 3212 } 3745 3213 3746 - __rbd_remove_all_snaps(rbd_dev); 3214 + if (rbd_dev->open_count) { 3215 + ret = -EBUSY; 3216 + goto done; 3217 + } 3218 + 3219 + rbd_remove_all_snaps(rbd_dev); 3747 3220 rbd_bus_del_dev(rbd_dev); 3748 3221 3749 3222 done:
-2
drivers/block/rbd_types.h
··· 46 46 #define RBD_MIN_OBJ_ORDER 16 47 47 #define RBD_MAX_OBJ_ORDER 30 48 48 49 - #define RBD_MAX_SEG_NAME_LEN 128 50 - 51 49 #define RBD_COMP_NONE 0 52 50 #define RBD_CRYPT_NONE 0 53 51
+56 -4
fs/ceph/addr.c
··· 267 267 kfree(req->r_pages); 268 268 } 269 269 270 + static void ceph_unlock_page_vector(struct page **pages, int num_pages) 271 + { 272 + int i; 273 + 274 + for (i = 0; i < num_pages; i++) 275 + unlock_page(pages[i]); 276 + } 277 + 270 278 /* 271 279 * start an async read(ahead) operation. return nr_pages we submitted 272 280 * a read for on success, or negative error code. ··· 355 347 return nr_pages; 356 348 357 349 out_pages: 350 + ceph_unlock_page_vector(pages, nr_pages); 358 351 ceph_release_page_vector(pages, nr_pages); 359 352 out: 360 353 ceph_osdc_put_request(req); ··· 1087 1078 struct page **pagep, void **fsdata) 1088 1079 { 1089 1080 struct inode *inode = file->f_dentry->d_inode; 1081 + struct ceph_inode_info *ci = ceph_inode(inode); 1082 + struct ceph_file_info *fi = file->private_data; 1090 1083 struct page *page; 1091 1084 pgoff_t index = pos >> PAGE_CACHE_SHIFT; 1092 - int r; 1085 + int r, want, got = 0; 1086 + 1087 + if (fi->fmode & CEPH_FILE_MODE_LAZY) 1088 + want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; 1089 + else 1090 + want = CEPH_CAP_FILE_BUFFER; 1091 + 1092 + dout("write_begin %p %llx.%llx %llu~%u getting caps. i_size %llu\n", 1093 + inode, ceph_vinop(inode), pos, len, inode->i_size); 1094 + r = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos+len); 1095 + if (r < 0) 1096 + return r; 1097 + dout("write_begin %p %llx.%llx %llu~%u got cap refs on %s\n", 1098 + inode, ceph_vinop(inode), pos, len, ceph_cap_string(got)); 1099 + if (!(got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO))) { 1100 + ceph_put_cap_refs(ci, got); 1101 + return -EAGAIN; 1102 + } 1093 1103 1094 1104 do { 1095 1105 /* get a page */ 1096 1106 page = grab_cache_page_write_begin(mapping, index, 0); 1097 - if (!page) 1098 - return -ENOMEM; 1099 - *pagep = page; 1107 + if (!page) { 1108 + r = -ENOMEM; 1109 + break; 1110 + } 1100 1111 1101 1112 dout("write_begin file %p inode %p page %p %d~%d\n", file, 1102 1113 inode, page, (int)pos, (int)len); 1103 1114 1104 1115 r = ceph_update_writeable_page(file, pos, len, page); 1116 + if (r) 1117 + page_cache_release(page); 1105 1118 } while (r == -EAGAIN); 1106 1119 1120 + if (r) { 1121 + ceph_put_cap_refs(ci, got); 1122 + } else { 1123 + *pagep = page; 1124 + *(int *)fsdata = got; 1125 + } 1107 1126 return r; 1108 1127 } 1109 1128 ··· 1145 1108 struct page *page, void *fsdata) 1146 1109 { 1147 1110 struct inode *inode = file->f_dentry->d_inode; 1111 + struct ceph_inode_info *ci = ceph_inode(inode); 1148 1112 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 1149 1113 struct ceph_mds_client *mdsc = fsc->mdsc; 1150 1114 unsigned from = pos & (PAGE_CACHE_SIZE - 1); 1151 1115 int check_cap = 0; 1116 + int got = (unsigned long)fsdata; 1152 1117 1153 1118 dout("write_end file %p inode %p page %p %d~%d (%d)\n", file, 1154 1119 inode, page, (int)pos, (int)copied, (int)len); ··· 1172 1133 unlock_page(page); 1173 1134 up_read(&mdsc->snap_rwsem); 1174 1135 page_cache_release(page); 1136 + 1137 + if (copied > 0) { 1138 + int dirty; 1139 + spin_lock(&ci->i_ceph_lock); 1140 + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); 1141 + spin_unlock(&ci->i_ceph_lock); 1142 + if (dirty) 1143 + __mark_inode_dirty(inode, dirty); 1144 + } 1145 + 1146 + dout("write_end %p %llx.%llx %llu~%u dropping cap refs on %s\n", 1147 + inode, ceph_vinop(inode), pos, len, ceph_cap_string(got)); 1148 + ceph_put_cap_refs(ci, got); 1175 1149 1176 1150 if (check_cap) 1177 1151 ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL);
+12 -6
fs/ceph/caps.c
··· 236 236 if (!ctx) { 237 237 cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); 238 238 if (cap) { 239 + spin_lock(&mdsc->caps_list_lock); 239 240 mdsc->caps_use_count++; 240 241 mdsc->caps_total_count++; 242 + spin_unlock(&mdsc->caps_list_lock); 241 243 } 242 244 return cap; 243 245 } ··· 1351 1349 if (!ci->i_head_snapc) 1352 1350 ci->i_head_snapc = ceph_get_snap_context( 1353 1351 ci->i_snap_realm->cached_context); 1354 - dout(" inode %p now dirty snapc %p\n", &ci->vfs_inode, 1355 - ci->i_head_snapc); 1352 + dout(" inode %p now dirty snapc %p auth cap %p\n", 1353 + &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap); 1356 1354 BUG_ON(!list_empty(&ci->i_dirty_item)); 1357 1355 spin_lock(&mdsc->cap_dirty_lock); 1358 - list_add(&ci->i_dirty_item, &mdsc->cap_dirty); 1356 + if (ci->i_auth_cap) 1357 + list_add(&ci->i_dirty_item, &mdsc->cap_dirty); 1358 + else 1359 + list_add(&ci->i_dirty_item, 1360 + &mdsc->cap_dirty_migrating); 1359 1361 spin_unlock(&mdsc->cap_dirty_lock); 1360 1362 if (ci->i_flushing_caps == 0) { 1361 1363 ihold(inode); ··· 2394 2388 &atime); 2395 2389 2396 2390 /* max size increase? */ 2397 - if (max_size != ci->i_max_size) { 2391 + if (ci->i_auth_cap == cap && max_size != ci->i_max_size) { 2398 2392 dout("max_size %lld -> %llu\n", ci->i_max_size, max_size); 2399 2393 ci->i_max_size = max_size; 2400 2394 if (max_size >= ci->i_wanted_max_size) { ··· 2751 2745 2752 2746 /* make sure we re-request max_size, if necessary */ 2753 2747 spin_lock(&ci->i_ceph_lock); 2748 + ci->i_wanted_max_size = 0; /* reset */ 2754 2749 ci->i_requested_max_size = 0; 2755 2750 spin_unlock(&ci->i_ceph_lock); 2756 2751 } ··· 2847 2840 case CEPH_CAP_OP_IMPORT: 2848 2841 handle_cap_import(mdsc, inode, h, session, 2849 2842 snaptrace, snaptrace_len); 2850 - ceph_check_caps(ceph_inode(inode), 0, session); 2851 - goto done_unlocked; 2852 2843 } 2853 2844 2854 2845 /* the rest require a cap */ ··· 2863 2858 switch (op) { 2864 2859 case CEPH_CAP_OP_REVOKE: 2865 2860 case CEPH_CAP_OP_GRANT: 2861 + case CEPH_CAP_OP_IMPORT: 2866 2862 handle_cap_grant(inode, h, session, cap, msg->middle); 2867 2863 goto done_unlocked; 2868 2864
+31 -44
fs/ceph/file.c
··· 712 712 struct ceph_osd_client *osdc = 713 713 &ceph_sb_to_client(inode->i_sb)->client->osdc; 714 714 loff_t endoff = pos + iov->iov_len; 715 - int want, got = 0; 716 - int ret, err; 715 + int got = 0; 716 + int ret, err, written; 717 717 718 718 if (ceph_snap(inode) != CEPH_NOSNAP) 719 719 return -EROFS; 720 720 721 721 retry_snap: 722 + written = 0; 722 723 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) 723 724 return -ENOSPC; 724 725 __ceph_do_pending_vmtruncate(inode); 725 - dout("aio_write %p %llx.%llx %llu~%u getting caps. i_size %llu\n", 726 - inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len, 727 - inode->i_size); 728 - if (fi->fmode & CEPH_FILE_MODE_LAZY) 729 - want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; 730 - else 731 - want = CEPH_CAP_FILE_BUFFER; 732 - ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff); 733 - if (ret < 0) 734 - goto out_put; 735 726 736 - dout("aio_write %p %llx.%llx %llu~%u got cap refs on %s\n", 737 - inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len, 738 - ceph_cap_string(got)); 739 - 740 - if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || 741 - (iocb->ki_filp->f_flags & O_DIRECT) || 742 - (inode->i_sb->s_flags & MS_SYNCHRONOUS) || 743 - (fi->flags & CEPH_F_SYNC)) { 744 - ret = ceph_sync_write(file, iov->iov_base, iov->iov_len, 745 - &iocb->ki_pos); 746 - } else { 747 - /* 748 - * buffered write; drop Fw early to avoid slow 749 - * revocation if we get stuck on balance_dirty_pages 750 - */ 751 - int dirty; 752 - 753 - spin_lock(&ci->i_ceph_lock); 754 - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); 755 - spin_unlock(&ci->i_ceph_lock); 756 - ceph_put_cap_refs(ci, got); 757 - 727 + /* 728 + * try to do a buffered write. if we don't have sufficient 729 + * caps, we'll get -EAGAIN from generic_file_aio_write, or a 730 + * short write if we only get caps for some pages. 731 + */ 732 + if (!(iocb->ki_filp->f_flags & O_DIRECT) && 733 + !(inode->i_sb->s_flags & MS_SYNCHRONOUS) && 734 + !(fi->flags & CEPH_F_SYNC)) { 758 735 ret = generic_file_aio_write(iocb, iov, nr_segs, pos); 736 + if (ret >= 0) 737 + written = ret; 738 + 759 739 if ((ret >= 0 || ret == -EIOCBQUEUED) && 760 740 ((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host) 761 741 || ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) { 762 - err = vfs_fsync_range(file, pos, pos + ret - 1, 1); 742 + err = vfs_fsync_range(file, pos, pos + written - 1, 1); 763 743 if (err < 0) 764 744 ret = err; 765 745 } 766 - 767 - if (dirty) 768 - __mark_inode_dirty(inode, dirty); 769 - goto out; 746 + if ((ret < 0 && ret != -EAGAIN) || pos + written >= endoff) 747 + goto out; 770 748 } 771 749 750 + dout("aio_write %p %llx.%llx %llu~%u getting caps. i_size %llu\n", 751 + inode, ceph_vinop(inode), pos + written, 752 + (unsigned)iov->iov_len - written, inode->i_size); 753 + ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, 0, &got, endoff); 754 + if (ret < 0) 755 + goto out; 756 + 757 + dout("aio_write %p %llx.%llx %llu~%u got cap refs on %s\n", 758 + inode, ceph_vinop(inode), pos + written, 759 + (unsigned)iov->iov_len - written, ceph_cap_string(got)); 760 + ret = ceph_sync_write(file, iov->iov_base + written, 761 + iov->iov_len - written, &iocb->ki_pos); 772 762 if (ret >= 0) { 773 763 int dirty; 774 764 spin_lock(&ci->i_ceph_lock); ··· 767 777 if (dirty) 768 778 __mark_inode_dirty(inode, dirty); 769 779 } 770 - 771 - out_put: 772 780 dout("aio_write %p %llx.%llx %llu~%u dropping cap refs on %s\n", 773 - inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len, 774 - ceph_cap_string(got)); 781 + inode, ceph_vinop(inode), pos + written, 782 + (unsigned)iov->iov_len - written, ceph_cap_string(got)); 775 783 ceph_put_cap_refs(ci, got); 776 - 777 784 out: 778 785 if (ret == -EOLDSNAPC) { 779 786 dout("aio_write %p %llx.%llx %llu~%u got EOLDSNAPC, retrying\n",
+9 -6
fs/ceph/inode.c
··· 1466 1466 { 1467 1467 struct ceph_inode_info *ci = ceph_inode(inode); 1468 1468 u64 to; 1469 - int wrbuffer_refs, wake = 0; 1469 + int wrbuffer_refs, finish = 0; 1470 1470 1471 1471 retry: 1472 1472 spin_lock(&ci->i_ceph_lock); ··· 1498 1498 truncate_inode_pages(inode->i_mapping, to); 1499 1499 1500 1500 spin_lock(&ci->i_ceph_lock); 1501 - ci->i_truncate_pending--; 1502 - if (ci->i_truncate_pending == 0) 1503 - wake = 1; 1501 + if (to == ci->i_truncate_size) { 1502 + ci->i_truncate_pending = 0; 1503 + finish = 1; 1504 + } 1504 1505 spin_unlock(&ci->i_ceph_lock); 1506 + if (!finish) 1507 + goto retry; 1505 1508 1506 1509 if (wrbuffer_refs == 0) 1507 1510 ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); 1508 - if (wake) 1509 - wake_up_all(&ci->i_cap_wq); 1511 + 1512 + wake_up_all(&ci->i_cap_wq); 1510 1513 } 1511 1514 1512 1515
+8 -3
fs/ceph/mds_client.c
··· 1590 1590 } else if (rpath || rino) { 1591 1591 *ino = rino; 1592 1592 *ppath = rpath; 1593 - *pathlen = strlen(rpath); 1593 + *pathlen = rpath ? strlen(rpath) : 0; 1594 1594 dout(" path %.*s\n", *pathlen, rpath); 1595 1595 } 1596 1596 ··· 1876 1876 static void __wake_requests(struct ceph_mds_client *mdsc, 1877 1877 struct list_head *head) 1878 1878 { 1879 - struct ceph_mds_request *req, *nreq; 1879 + struct ceph_mds_request *req; 1880 + LIST_HEAD(tmp_list); 1880 1881 1881 - list_for_each_entry_safe(req, nreq, head, r_wait) { 1882 + list_splice_init(head, &tmp_list); 1883 + 1884 + while (!list_empty(&tmp_list)) { 1885 + req = list_entry(tmp_list.next, 1886 + struct ceph_mds_request, r_wait); 1882 1887 list_del_init(&req->r_wait); 1883 1888 __do_request(mdsc, req); 1884 1889 }
+1 -3
fs/ceph/super.c
··· 403 403 seq_printf(m, ",mount_timeout=%d", opt->mount_timeout); 404 404 if (opt->osd_idle_ttl != CEPH_OSD_IDLE_TTL_DEFAULT) 405 405 seq_printf(m, ",osd_idle_ttl=%d", opt->osd_idle_ttl); 406 - if (opt->osd_timeout != CEPH_OSD_TIMEOUT_DEFAULT) 407 - seq_printf(m, ",osdtimeout=%d", opt->osd_timeout); 408 406 if (opt->osd_keepalive_timeout != CEPH_OSD_KEEPALIVE_DEFAULT) 409 407 seq_printf(m, ",osdkeepalivetimeout=%d", 410 408 opt->osd_keepalive_timeout); ··· 847 849 fsc->backing_dev_info.ra_pages = 848 850 default_backing_dev_info.ra_pages; 849 851 850 - err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%d", 852 + err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%ld", 851 853 atomic_long_inc_return(&bdi_seq)); 852 854 if (!err) 853 855 sb->s_bdi = &fsc->backing_dev_info;
+1
include/linux/backing-dev.h
··· 114 114 int bdi_init(struct backing_dev_info *bdi); 115 115 void bdi_destroy(struct backing_dev_info *bdi); 116 116 117 + __printf(3, 4) 117 118 int bdi_register(struct backing_dev_info *bdi, struct device *parent, 118 119 const char *fmt, ...); 119 120 int bdi_register_dev(struct backing_dev_info *bdi, dev_t dev);
-2
include/linux/ceph/libceph.h
··· 43 43 struct ceph_entity_addr my_addr; 44 44 int mount_timeout; 45 45 int osd_idle_ttl; 46 - int osd_timeout; 47 46 int osd_keepalive_timeout; 48 47 49 48 /* ··· 62 63 * defaults 63 64 */ 64 65 #define CEPH_MOUNT_TIMEOUT_DEFAULT 60 65 - #define CEPH_OSD_TIMEOUT_DEFAULT 60 /* seconds */ 66 66 #define CEPH_OSD_KEEPALIVE_DEFAULT 5 67 67 #define CEPH_OSD_IDLE_TTL_DEFAULT 60 68 68
+1
include/linux/ceph/osdmap.h
··· 123 123 extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, 124 124 struct ceph_pg pgid); 125 125 126 + extern const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id); 126 127 extern int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name); 127 128 128 129 #endif
+2
include/linux/ceph/rados.h
··· 87 87 * 88 88 * lpgp_num -- as above. 89 89 */ 90 + #define CEPH_NOPOOL ((__u64) (-1)) /* pool id not defined */ 91 + 90 92 #define CEPH_PG_TYPE_REP 1 91 93 #define CEPH_PG_TYPE_RAID4 2 92 94 #define CEPH_PG_POOL_VERSION 2
+1 -2
net/ceph/ceph_common.c
··· 305 305 306 306 /* start with defaults */ 307 307 opt->flags = CEPH_OPT_DEFAULT; 308 - opt->osd_timeout = CEPH_OSD_TIMEOUT_DEFAULT; 309 308 opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT; 310 309 opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */ 311 310 opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; /* seconds */ ··· 390 391 391 392 /* misc */ 392 393 case Opt_osdtimeout: 393 - opt->osd_timeout = intval; 394 + pr_warning("ignoring deprecated osdtimeout option\n"); 394 395 break; 395 396 case Opt_osdkeepalivetimeout: 396 397 opt->osd_keepalive_timeout = intval;
+56 -51
net/ceph/messenger.c
··· 2244 2244 2245 2245 2246 2246 /* 2247 - * Atomically queue work on a connection. Bump @con reference to 2248 - * avoid races with connection teardown. 2247 + * Atomically queue work on a connection after the specified delay. 2248 + * Bump @con reference to avoid races with connection teardown. 2249 + * Returns 0 if work was queued, or an error code otherwise. 2249 2250 */ 2250 - static void queue_con(struct ceph_connection *con) 2251 + static int queue_con_delay(struct ceph_connection *con, unsigned long delay) 2251 2252 { 2252 2253 if (!con->ops->get(con)) { 2253 - dout("queue_con %p ref count 0\n", con); 2254 - return; 2254 + dout("%s %p ref count 0\n", __func__, con); 2255 + 2256 + return -ENOENT; 2255 2257 } 2256 2258 2257 - if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) { 2258 - dout("queue_con %p - already queued\n", con); 2259 + if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) { 2260 + dout("%s %p - already queued\n", __func__, con); 2259 2261 con->ops->put(con); 2260 - } else { 2261 - dout("queue_con %p\n", con); 2262 + 2263 + return -EBUSY; 2262 2264 } 2265 + 2266 + dout("%s %p %lu\n", __func__, con, delay); 2267 + 2268 + return 0; 2269 + } 2270 + 2271 + static void queue_con(struct ceph_connection *con) 2272 + { 2273 + (void) queue_con_delay(con, 0); 2274 + } 2275 + 2276 + static bool con_sock_closed(struct ceph_connection *con) 2277 + { 2278 + if (!test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags)) 2279 + return false; 2280 + 2281 + #define CASE(x) \ 2282 + case CON_STATE_ ## x: \ 2283 + con->error_msg = "socket closed (con state " #x ")"; \ 2284 + break; 2285 + 2286 + switch (con->state) { 2287 + CASE(CLOSED); 2288 + CASE(PREOPEN); 2289 + CASE(CONNECTING); 2290 + CASE(NEGOTIATING); 2291 + CASE(OPEN); 2292 + CASE(STANDBY); 2293 + default: 2294 + pr_warning("%s con %p unrecognized state %lu\n", 2295 + __func__, con, con->state); 2296 + con->error_msg = "unrecognized con state"; 2297 + BUG(); 2298 + break; 2299 + } 2300 + #undef CASE 2301 + 2302 + return true; 2263 2303 } 2264 2304 2265 2305 /* ··· 2313 2273 2314 2274 mutex_lock(&con->mutex); 2315 2275 restart: 2316 - if (test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags)) { 2317 - switch (con->state) { 2318 - case CON_STATE_CONNECTING: 2319 - con->error_msg = "connection failed"; 2320 - break; 2321 - case CON_STATE_NEGOTIATING: 2322 - con->error_msg = "negotiation failed"; 2323 - break; 2324 - case CON_STATE_OPEN: 2325 - con->error_msg = "socket closed"; 2326 - break; 2327 - default: 2328 - dout("unrecognized con state %d\n", (int)con->state); 2329 - con->error_msg = "unrecognized con state"; 2330 - BUG(); 2331 - } 2276 + if (con_sock_closed(con)) 2332 2277 goto fault; 2333 - } 2334 2278 2335 2279 if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) { 2336 2280 dout("con_work %p backing off\n", con); 2337 - if (queue_delayed_work(ceph_msgr_wq, &con->work, 2338 - round_jiffies_relative(con->delay))) { 2339 - dout("con_work %p backoff %lu\n", con, con->delay); 2340 - mutex_unlock(&con->mutex); 2341 - return; 2342 - } else { 2281 + ret = queue_con_delay(con, round_jiffies_relative(con->delay)); 2282 + if (ret) { 2343 2283 dout("con_work %p FAILED to back off %lu\n", con, 2344 2284 con->delay); 2285 + BUG_ON(ret == -ENOENT); 2345 2286 set_bit(CON_FLAG_BACKOFF, &con->flags); 2346 2287 } 2347 2288 goto done; ··· 2377 2356 static void ceph_fault(struct ceph_connection *con) 2378 2357 __releases(con->mutex) 2379 2358 { 2380 - pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), 2359 + pr_warning("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), 2381 2360 ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); 2382 2361 dout("fault %p state %lu to peer %s\n", 2383 2362 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); ··· 2419 2398 con->delay = BASE_DELAY_INTERVAL; 2420 2399 else if (con->delay < MAX_DELAY_INTERVAL) 2421 2400 con->delay *= 2; 2422 - con->ops->get(con); 2423 - if (queue_delayed_work(ceph_msgr_wq, &con->work, 2424 - round_jiffies_relative(con->delay))) { 2425 - dout("fault queued %p delay %lu\n", con, con->delay); 2426 - } else { 2427 - con->ops->put(con); 2428 - dout("fault failed to queue %p delay %lu, backoff\n", 2429 - con, con->delay); 2430 - /* 2431 - * In many cases we see a socket state change 2432 - * while con_work is running and end up 2433 - * queuing (non-delayed) work, such that we 2434 - * can't backoff with a delay. Set a flag so 2435 - * that when con_work restarts we schedule the 2436 - * delay then. 2437 - */ 2438 - set_bit(CON_FLAG_BACKOFF, &con->flags); 2439 - } 2401 + set_bit(CON_FLAG_BACKOFF, &con->flags); 2402 + queue_con(con); 2440 2403 } 2441 2404 2442 2405 out_unlock:
+12 -47
net/ceph/osd_client.c
··· 221 221 kref_init(&req->r_kref); 222 222 init_completion(&req->r_completion); 223 223 init_completion(&req->r_safe_completion); 224 + RB_CLEAR_NODE(&req->r_node); 224 225 INIT_LIST_HEAD(&req->r_unsafe_item); 225 226 INIT_LIST_HEAD(&req->r_linger_item); 226 227 INIT_LIST_HEAD(&req->r_linger_osd); ··· 581 580 582 581 dout("__kick_osd_requests osd%d\n", osd->o_osd); 583 582 err = __reset_osd(osdc, osd); 584 - if (err == -EAGAIN) 583 + if (err) 585 584 return; 586 585 587 586 list_for_each_entry(req, &osd->o_requests, r_osd_item) { ··· 608 607 } 609 608 } 610 609 611 - static void kick_osd_requests(struct ceph_osd_client *osdc, 612 - struct ceph_osd *kickosd) 613 - { 614 - mutex_lock(&osdc->request_mutex); 615 - __kick_osd_requests(osdc, kickosd); 616 - mutex_unlock(&osdc->request_mutex); 617 - } 618 - 619 610 /* 620 611 * If the osd connection drops, we need to resubmit all requests. 621 612 */ ··· 621 628 dout("osd_reset osd%d\n", osd->o_osd); 622 629 osdc = osd->o_osdc; 623 630 down_read(&osdc->map_sem); 624 - kick_osd_requests(osdc, osd); 631 + mutex_lock(&osdc->request_mutex); 632 + __kick_osd_requests(osdc, osd); 633 + mutex_unlock(&osdc->request_mutex); 625 634 send_queued(osdc); 626 635 up_read(&osdc->map_sem); 627 636 } ··· 642 647 atomic_set(&osd->o_ref, 1); 643 648 osd->o_osdc = osdc; 644 649 osd->o_osd = onum; 650 + RB_CLEAR_NODE(&osd->o_node); 645 651 INIT_LIST_HEAD(&osd->o_requests); 646 652 INIT_LIST_HEAD(&osd->o_linger_requests); 647 653 INIT_LIST_HEAD(&osd->o_osd_lru); ··· 746 750 if (list_empty(&osd->o_requests) && 747 751 list_empty(&osd->o_linger_requests)) { 748 752 __remove_osd(osdc, osd); 753 + ret = -ENODEV; 749 754 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], 750 755 &osd->o_con.peer_addr, 751 756 sizeof(osd->o_con.peer_addr)) == 0 && ··· 873 876 req->r_osd = NULL; 874 877 } 875 878 879 + list_del_init(&req->r_req_lru_item); 876 880 ceph_osdc_put_request(req); 877 881 878 - list_del_init(&req->r_req_lru_item); 879 882 if (osdc->num_requests == 0) { 880 883 dout(" no requests, canceling timeout\n"); 881 884 __cancel_osd_timeout(osdc); ··· 907 910 struct ceph_osd_request *req) 908 911 { 909 912 dout("__unregister_linger_request %p\n", req); 913 + list_del_init(&req->r_linger_item); 910 914 if (req->r_osd) { 911 - list_del_init(&req->r_linger_item); 912 915 list_del_init(&req->r_linger_osd); 913 916 914 917 if (list_empty(&req->r_osd->o_requests) && ··· 1087 1090 { 1088 1091 struct ceph_osd_client *osdc = 1089 1092 container_of(work, struct ceph_osd_client, timeout_work.work); 1090 - struct ceph_osd_request *req, *last_req = NULL; 1093 + struct ceph_osd_request *req; 1091 1094 struct ceph_osd *osd; 1092 - unsigned long timeout = osdc->client->options->osd_timeout * HZ; 1093 1095 unsigned long keepalive = 1094 1096 osdc->client->options->osd_keepalive_timeout * HZ; 1095 - unsigned long last_stamp = 0; 1096 1097 struct list_head slow_osds; 1097 1098 dout("timeout\n"); 1098 1099 down_read(&osdc->map_sem); ··· 1098 1103 ceph_monc_request_next_osdmap(&osdc->client->monc); 1099 1104 1100 1105 mutex_lock(&osdc->request_mutex); 1101 - 1102 - /* 1103 - * reset osds that appear to be _really_ unresponsive. this 1104 - * is a failsafe measure.. we really shouldn't be getting to 1105 - * this point if the system is working properly. the monitors 1106 - * should mark the osd as failed and we should find out about 1107 - * it from an updated osd map. 1108 - */ 1109 - while (timeout && !list_empty(&osdc->req_lru)) { 1110 - req = list_entry(osdc->req_lru.next, struct ceph_osd_request, 1111 - r_req_lru_item); 1112 - 1113 - /* hasn't been long enough since we sent it? */ 1114 - if (time_before(jiffies, req->r_stamp + timeout)) 1115 - break; 1116 - 1117 - /* hasn't been long enough since it was acked? */ 1118 - if (req->r_request->ack_stamp == 0 || 1119 - time_before(jiffies, req->r_request->ack_stamp + timeout)) 1120 - break; 1121 - 1122 - BUG_ON(req == last_req && req->r_stamp == last_stamp); 1123 - last_req = req; 1124 - last_stamp = req->r_stamp; 1125 - 1126 - osd = req->r_osd; 1127 - BUG_ON(!osd); 1128 - pr_warning(" tid %llu timed out on osd%d, will reset osd\n", 1129 - req->r_tid, osd->o_osd); 1130 - __kick_osd_requests(osdc, osd); 1131 - } 1132 1106 1133 1107 /* 1134 1108 * ping osds that are a bit slow. this ensures that if there ··· 1328 1364 1329 1365 dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid, 1330 1366 req->r_osd ? req->r_osd->o_osd : -1); 1331 - __unregister_linger_request(osdc, req); 1332 1367 __register_request(osdc, req); 1368 + __unregister_linger_request(osdc, req); 1333 1369 } 1334 1370 mutex_unlock(&osdc->request_mutex); 1335 1371 ··· 1563 1599 event->data = data; 1564 1600 event->osdc = osdc; 1565 1601 INIT_LIST_HEAD(&event->osd_node); 1602 + RB_CLEAR_NODE(&event->node); 1566 1603 kref_init(&event->kref); /* one ref for us */ 1567 1604 kref_get(&event->kref); /* one ref for the caller */ 1568 1605 init_completion(&event->completion);
+36 -11
net/ceph/osdmap.c
··· 469 469 return NULL; 470 470 } 471 471 472 + const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id) 473 + { 474 + struct ceph_pg_pool_info *pi; 475 + 476 + if (id == CEPH_NOPOOL) 477 + return NULL; 478 + 479 + if (WARN_ON_ONCE(id > (u64) INT_MAX)) 480 + return NULL; 481 + 482 + pi = __lookup_pg_pool(&map->pg_pools, (int) id); 483 + 484 + return pi ? pi->name : NULL; 485 + } 486 + EXPORT_SYMBOL(ceph_pg_pool_name_by_id); 487 + 472 488 int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name) 473 489 { 474 490 struct rb_node *rbp; ··· 661 645 ceph_decode_32_safe(p, end, max, bad); 662 646 while (max--) { 663 647 ceph_decode_need(p, end, 4 + 1 + sizeof(pi->v), bad); 648 + err = -ENOMEM; 664 649 pi = kzalloc(sizeof(*pi), GFP_NOFS); 665 650 if (!pi) 666 651 goto bad; 667 652 pi->id = ceph_decode_32(p); 653 + err = -EINVAL; 668 654 ev = ceph_decode_8(p); /* encoding version */ 669 655 if (ev > CEPH_PG_POOL_VERSION) { 670 656 pr_warning("got unknown v %d > %d of ceph_pg_pool\n", ··· 682 664 __insert_pg_pool(&map->pg_pools, pi); 683 665 } 684 666 685 - if (version >= 5 && __decode_pool_names(p, end, map) < 0) 686 - goto bad; 667 + if (version >= 5) { 668 + err = __decode_pool_names(p, end, map); 669 + if (err < 0) { 670 + dout("fail to decode pool names"); 671 + goto bad; 672 + } 673 + } 687 674 688 675 ceph_decode_32_safe(p, end, map->pool_max, bad); 689 676 ··· 768 745 return map; 769 746 770 747 bad: 771 - dout("osdmap_decode fail\n"); 748 + dout("osdmap_decode fail err %d\n", err); 772 749 ceph_osdmap_destroy(map); 773 750 return ERR_PTR(err); 774 751 } ··· 862 839 if (ev > CEPH_PG_POOL_VERSION) { 863 840 pr_warning("got unknown v %d > %d of ceph_pg_pool\n", 864 841 ev, CEPH_PG_POOL_VERSION); 842 + err = -EINVAL; 865 843 goto bad; 866 844 } 867 845 pi = __lookup_pg_pool(&map->pg_pools, pool); ··· 879 855 if (err < 0) 880 856 goto bad; 881 857 } 882 - if (version >= 5 && __decode_pool_names(p, end, map) < 0) 883 - goto bad; 858 + if (version >= 5) { 859 + err = __decode_pool_names(p, end, map); 860 + if (err < 0) 861 + goto bad; 862 + } 884 863 885 864 /* old_pool */ 886 865 ceph_decode_32_safe(p, end, len, bad); ··· 959 932 (void) __remove_pg_mapping(&map->pg_temp, pgid); 960 933 961 934 /* insert */ 962 - if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) { 963 - err = -EINVAL; 935 + err = -EINVAL; 936 + if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) 964 937 goto bad; 965 - } 938 + err = -ENOMEM; 966 939 pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS); 967 - if (!pg) { 968 - err = -ENOMEM; 940 + if (!pg) 969 941 goto bad; 970 - } 971 942 pg->pgid = pgid; 972 943 pg->len = pglen; 973 944 for (j = 0; j < pglen; j++)