Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

Merge git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull ceph updates from Sage Weil:
"There are some updates and cleanups to the CRUSH placement code, a bug
fix with incremental maps, several cleanups and fixes from Josh Durgin
in the RBD block device code, a series of cleanups and bug fixes from
Alex Elder in the messenger code, and some miscellaneous bounds
checking and gfp cleanups/fixes."

Fix up trivial conflicts in net/ceph/{messenger.c,osdmap.c} due to the
networking people preferring "unsigned int" over just "unsigned".

* git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (45 commits)
libceph: fix pg_temp updates
libceph: avoid unregistering osd request when not registered
ceph: add auth buf in prepare_write_connect()
ceph: rename prepare_connect_authorizer()
ceph: return pointer from prepare_connect_authorizer()
ceph: use info returned by get_authorizer
ceph: have get_authorizer methods return pointers
ceph: ensure auth ops are defined before use
ceph: messenger: reduce args to create_authorizer
ceph: define ceph_auth_handshake type
ceph: messenger: check return from get_authorizer
ceph: messenger: rework prepare_connect_authorizer()
ceph: messenger: check prepare_write_connect() result
ceph: don't set WRITE_PENDING too early
ceph: drop msgr argument from prepare_write_connect()
ceph: messenger: send banner in process_connect()
ceph: messenger: reset connection kvec caller
libceph: don't reset kvec in prepare_write_banner()
ceph: ignore preferred_osd field
ceph: fully initialize new layout
...

+373 -450
+2 -2
Documentation/ABI/testing/sysfs-bus-rbd
··· 65 65 Entries under /sys/bus/rbd/devices/<dev-id>/snap_<snap-name> 66 66 ------------------------------------------------------------- 67 67 68 - id 68 + snap_id 69 69 70 70 The rados internal snapshot id assigned for this snapshot 71 71 72 - size 72 + snap_size 73 73 74 74 The size of the image when this snapshot was taken. 75 75
+29 -43
drivers/block/rbd.c
··· 141 141 struct rbd_snap { 142 142 struct device dev; 143 143 const char *name; 144 - size_t size; 144 + u64 size; 145 145 struct list_head node; 146 146 u64 id; 147 147 }; ··· 175 175 /* protects updating the header */ 176 176 struct rw_semaphore header_rwsem; 177 177 char snap_name[RBD_MAX_SNAP_NAME_LEN]; 178 - u32 cur_snap; /* index+1 of current snapshot within snap context 179 - 0 - for the head */ 178 + u64 snap_id; /* current snapshot id */ 180 179 int read_only; 181 180 182 181 struct list_head node; ··· 240 241 put_device(&rbd_dev->dev); 241 242 } 242 243 243 - static int __rbd_update_snaps(struct rbd_device *rbd_dev); 244 + static int __rbd_refresh_header(struct rbd_device *rbd_dev); 244 245 245 246 static int rbd_open(struct block_device *bdev, fmode_t mode) 246 247 { ··· 449 450 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref); 450 451 451 452 dout("rbd_release_client %p\n", rbdc); 453 + spin_lock(&rbd_client_list_lock); 452 454 list_del(&rbdc->node); 455 + spin_unlock(&rbd_client_list_lock); 453 456 454 457 ceph_destroy_client(rbdc->client); 455 458 kfree(rbdc->rbd_opts); ··· 464 463 */ 465 464 static void rbd_put_client(struct rbd_device *rbd_dev) 466 465 { 467 - spin_lock(&rbd_client_list_lock); 468 466 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release); 469 - spin_unlock(&rbd_client_list_lock); 470 467 rbd_dev->rbd_client = NULL; 471 468 } 472 469 ··· 486 487 */ 487 488 static int rbd_header_from_disk(struct rbd_image_header *header, 488 489 struct rbd_image_header_ondisk *ondisk, 489 - int allocated_snaps, 490 + u32 allocated_snaps, 490 491 gfp_t gfp_flags) 491 492 { 492 - int i; 493 - u32 snap_count; 493 + u32 i, snap_count; 494 494 495 495 if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT))) 496 496 return -ENXIO; 497 497 498 498 snap_count = le32_to_cpu(ondisk->snap_count); 499 + if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context)) 500 + / sizeof (*ondisk)) 501 + return -EINVAL; 499 502 header->snapc = kmalloc(sizeof(struct ceph_snap_context) + 500 503 snap_count * sizeof (*ondisk), 501 504 gfp_flags); ··· 507 506 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len); 508 507 if (snap_count) { 509 508 header->snap_names = kmalloc(header->snap_names_len, 510 - GFP_KERNEL); 509 + gfp_flags); 511 510 if (!header->snap_names) 512 511 goto err_snapc; 513 512 header->snap_sizes = kmalloc(snap_count * sizeof(u64), 514 - GFP_KERNEL); 513 + gfp_flags); 515 514 if (!header->snap_sizes) 516 515 goto err_names; 517 516 } else { ··· 553 552 return -ENOMEM; 554 553 } 555 554 556 - static int snap_index(struct rbd_image_header *header, int snap_num) 557 - { 558 - return header->total_snaps - snap_num; 559 - } 560 - 561 - static u64 cur_snap_id(struct rbd_device *rbd_dev) 562 - { 563 - struct rbd_image_header *header = &rbd_dev->header; 564 - 565 - if (!rbd_dev->cur_snap) 566 - return 0; 567 - 568 - return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)]; 569 - } 570 - 571 555 static int snap_by_name(struct rbd_image_header *header, const char *snap_name, 572 556 u64 *seq, u64 *size) 573 557 { ··· 591 605 snapc->seq = header->snap_seq; 592 606 else 593 607 snapc->seq = 0; 594 - dev->cur_snap = 0; 608 + dev->snap_id = CEPH_NOSNAP; 595 609 dev->read_only = 0; 596 610 if (size) 597 611 *size = header->image_size; ··· 599 613 ret = snap_by_name(header, dev->snap_name, &snapc->seq, size); 600 614 if (ret < 0) 601 615 goto done; 602 - 603 - dev->cur_snap = header->total_snaps - ret; 616 + dev->snap_id = snapc->seq; 604 617 dev->read_only = 1; 605 618 } 606 619 ··· 920 935 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 921 936 layout->fl_stripe_count = cpu_to_le32(1); 922 937 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 923 - layout->fl_pg_preferred = cpu_to_le32(-1); 924 938 layout->fl_pg_pool = cpu_to_le32(dev->poolid); 925 939 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno, 926 940 req, ops); ··· 1152 1168 int coll_index) 1153 1169 { 1154 1170 return rbd_do_op(rq, rbd_dev, NULL, 1155 - (snapid ? snapid : CEPH_NOSNAP), 1171 + snapid, 1156 1172 CEPH_OSD_OP_READ, 1157 1173 CEPH_OSD_FLAG_READ, 1158 1174 2, ··· 1171 1187 u64 *ver) 1172 1188 { 1173 1189 return rbd_req_sync_op(dev, NULL, 1174 - (snapid ? snapid : CEPH_NOSNAP), 1190 + snapid, 1175 1191 CEPH_OSD_OP_READ, 1176 1192 CEPH_OSD_FLAG_READ, 1177 1193 NULL, ··· 1222 1238 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name, 1223 1239 notify_id, (int)opcode); 1224 1240 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 1225 - rc = __rbd_update_snaps(dev); 1241 + rc = __rbd_refresh_header(dev); 1226 1242 mutex_unlock(&ctl_mutex); 1227 1243 if (rc) 1228 1244 pr_warning(RBD_DRV_NAME "%d got notification but failed to " ··· 1505 1521 coll, cur_seg); 1506 1522 else 1507 1523 rbd_req_read(rq, rbd_dev, 1508 - cur_snap_id(rbd_dev), 1524 + rbd_dev->snap_id, 1509 1525 ofs, 1510 1526 op_size, bio, 1511 1527 coll, cur_seg); ··· 1576 1592 { 1577 1593 ssize_t rc; 1578 1594 struct rbd_image_header_ondisk *dh; 1579 - int snap_count = 0; 1595 + u32 snap_count = 0; 1580 1596 u64 ver; 1581 1597 size_t len; 1582 1598 ··· 1640 1656 struct ceph_mon_client *monc; 1641 1657 1642 1658 /* we should create a snapshot only if we're pointing at the head */ 1643 - if (dev->cur_snap) 1659 + if (dev->snap_id != CEPH_NOSNAP) 1644 1660 return -EINVAL; 1645 1661 1646 1662 monc = &dev->rbd_client->client->monc; ··· 1667 1683 if (ret < 0) 1668 1684 return ret; 1669 1685 1670 - dev->header.snapc->seq = new_snapid; 1686 + down_write(&dev->header_rwsem); 1687 + dev->header.snapc->seq = new_snapid; 1688 + up_write(&dev->header_rwsem); 1671 1689 1672 1690 return 0; 1673 1691 bad: ··· 1689 1703 /* 1690 1704 * only read the first part of the ondisk header, without the snaps info 1691 1705 */ 1692 - static int __rbd_update_snaps(struct rbd_device *rbd_dev) 1706 + static int __rbd_refresh_header(struct rbd_device *rbd_dev) 1693 1707 { 1694 1708 int ret; 1695 1709 struct rbd_image_header h; ··· 1876 1890 1877 1891 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 1878 1892 1879 - rc = __rbd_update_snaps(rbd_dev); 1893 + rc = __rbd_refresh_header(rbd_dev); 1880 1894 if (rc < 0) 1881 1895 ret = rc; 1882 1896 ··· 1935 1949 { 1936 1950 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 1937 1951 1938 - return sprintf(buf, "%zd\n", snap->size); 1952 + return sprintf(buf, "%llu\n", (unsigned long long)snap->size); 1939 1953 } 1940 1954 1941 1955 static ssize_t rbd_snap_id_show(struct device *dev, ··· 1944 1958 { 1945 1959 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 1946 1960 1947 - return sprintf(buf, "%llu\n", (unsigned long long) snap->id); 1961 + return sprintf(buf, "%llu\n", (unsigned long long)snap->id); 1948 1962 } 1949 1963 1950 1964 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL); ··· 2159 2173 rbd_dev->header.obj_version); 2160 2174 if (ret == -ERANGE) { 2161 2175 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 2162 - rc = __rbd_update_snaps(rbd_dev); 2176 + rc = __rbd_refresh_header(rbd_dev); 2163 2177 mutex_unlock(&ctl_mutex); 2164 2178 if (rc < 0) 2165 2179 return rc; ··· 2544 2558 if (ret < 0) 2545 2559 goto err_unlock; 2546 2560 2547 - ret = __rbd_update_snaps(rbd_dev); 2561 + ret = __rbd_refresh_header(rbd_dev); 2548 2562 if (ret < 0) 2549 2563 goto err_unlock; 2550 2564
-1
fs/ceph/file.c
··· 54 54 req->r_fmode = ceph_flags_to_mode(flags); 55 55 req->r_args.open.flags = cpu_to_le32(flags); 56 56 req->r_args.open.mode = cpu_to_le32(create_mode); 57 - req->r_args.open.preferred = cpu_to_le32(-1); 58 57 out: 59 58 return req; 60 59 }
+47 -55
fs/ceph/ioctl.c
··· 26 26 l.stripe_count = ceph_file_layout_stripe_count(ci->i_layout); 27 27 l.object_size = ceph_file_layout_object_size(ci->i_layout); 28 28 l.data_pool = le32_to_cpu(ci->i_layout.fl_pg_pool); 29 - l.preferred_osd = 30 - (s32)le32_to_cpu(ci->i_layout.fl_pg_preferred); 29 + l.preferred_osd = (s32)-1; 31 30 if (copy_to_user(arg, &l, sizeof(l))) 32 31 return -EFAULT; 33 32 } 34 33 35 34 return err; 35 + } 36 + 37 + static long __validate_layout(struct ceph_mds_client *mdsc, 38 + struct ceph_ioctl_layout *l) 39 + { 40 + int i, err; 41 + 42 + /* validate striping parameters */ 43 + if ((l->object_size & ~PAGE_MASK) || 44 + (l->stripe_unit & ~PAGE_MASK) || 45 + ((unsigned)l->object_size % (unsigned)l->stripe_unit)) 46 + return -EINVAL; 47 + 48 + /* make sure it's a valid data pool */ 49 + mutex_lock(&mdsc->mutex); 50 + err = -EINVAL; 51 + for (i = 0; i < mdsc->mdsmap->m_num_data_pg_pools; i++) 52 + if (mdsc->mdsmap->m_data_pg_pools[i] == l->data_pool) { 53 + err = 0; 54 + break; 55 + } 56 + mutex_unlock(&mdsc->mutex); 57 + if (err) 58 + return err; 59 + 60 + return 0; 36 61 } 37 62 38 63 static long ceph_ioctl_set_layout(struct file *file, void __user *arg) ··· 69 44 struct ceph_ioctl_layout l; 70 45 struct ceph_inode_info *ci = ceph_inode(file->f_dentry->d_inode); 71 46 struct ceph_ioctl_layout nl; 72 - int err, i; 47 + int err; 73 48 74 49 if (copy_from_user(&l, arg, sizeof(l))) 75 50 return -EFAULT; 76 51 77 52 /* validate changed params against current layout */ 78 53 err = ceph_do_getattr(file->f_dentry->d_inode, CEPH_STAT_CAP_LAYOUT); 79 - if (!err) { 80 - nl.stripe_unit = ceph_file_layout_su(ci->i_layout); 81 - nl.stripe_count = ceph_file_layout_stripe_count(ci->i_layout); 82 - nl.object_size = ceph_file_layout_object_size(ci->i_layout); 83 - nl.data_pool = le32_to_cpu(ci->i_layout.fl_pg_pool); 84 - nl.preferred_osd = 85 - (s32)le32_to_cpu(ci->i_layout.fl_pg_preferred); 86 - } else 54 + if (err) 87 55 return err; 88 56 57 + memset(&nl, 0, sizeof(nl)); 89 58 if (l.stripe_count) 90 59 nl.stripe_count = l.stripe_count; 60 + else 61 + nl.stripe_count = ceph_file_layout_stripe_count(ci->i_layout); 91 62 if (l.stripe_unit) 92 63 nl.stripe_unit = l.stripe_unit; 64 + else 65 + nl.stripe_unit = ceph_file_layout_su(ci->i_layout); 93 66 if (l.object_size) 94 67 nl.object_size = l.object_size; 68 + else 69 + nl.object_size = ceph_file_layout_object_size(ci->i_layout); 95 70 if (l.data_pool) 96 71 nl.data_pool = l.data_pool; 97 - if (l.preferred_osd) 98 - nl.preferred_osd = l.preferred_osd; 72 + else 73 + nl.data_pool = ceph_file_layout_pg_pool(ci->i_layout); 99 74 100 - if ((nl.object_size & ~PAGE_MASK) || 101 - (nl.stripe_unit & ~PAGE_MASK) || 102 - ((unsigned)nl.object_size % (unsigned)nl.stripe_unit)) 103 - return -EINVAL; 75 + /* this is obsolete, and always -1 */ 76 + nl.preferred_osd = le64_to_cpu(-1); 104 77 105 - /* make sure it's a valid data pool */ 106 - if (l.data_pool > 0) { 107 - mutex_lock(&mdsc->mutex); 108 - err = -EINVAL; 109 - for (i = 0; i < mdsc->mdsmap->m_num_data_pg_pools; i++) 110 - if (mdsc->mdsmap->m_data_pg_pools[i] == l.data_pool) { 111 - err = 0; 112 - break; 113 - } 114 - mutex_unlock(&mdsc->mutex); 115 - if (err) 116 - return err; 117 - } 78 + err = __validate_layout(mdsc, &nl); 79 + if (err) 80 + return err; 118 81 119 82 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETLAYOUT, 120 83 USE_AUTH_MDS); ··· 119 106 req->r_args.setlayout.layout.fl_object_size = 120 107 cpu_to_le32(l.object_size); 121 108 req->r_args.setlayout.layout.fl_pg_pool = cpu_to_le32(l.data_pool); 122 - req->r_args.setlayout.layout.fl_pg_preferred = 123 - cpu_to_le32(l.preferred_osd); 124 109 125 110 parent_inode = ceph_get_dentry_parent_inode(file->f_dentry); 126 111 err = ceph_mdsc_do_request(mdsc, parent_inode, req); ··· 138 127 struct inode *inode = file->f_dentry->d_inode; 139 128 struct ceph_mds_request *req; 140 129 struct ceph_ioctl_layout l; 141 - int err, i; 130 + int err; 142 131 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; 143 132 144 133 /* copy and validate */ 145 134 if (copy_from_user(&l, arg, sizeof(l))) 146 135 return -EFAULT; 147 136 148 - if ((l.object_size & ~PAGE_MASK) || 149 - (l.stripe_unit & ~PAGE_MASK) || 150 - !l.stripe_unit || 151 - (l.object_size && 152 - (unsigned)l.object_size % (unsigned)l.stripe_unit)) 153 - return -EINVAL; 154 - 155 - /* make sure it's a valid data pool */ 156 - if (l.data_pool > 0) { 157 - mutex_lock(&mdsc->mutex); 158 - err = -EINVAL; 159 - for (i = 0; i < mdsc->mdsmap->m_num_data_pg_pools; i++) 160 - if (mdsc->mdsmap->m_data_pg_pools[i] == l.data_pool) { 161 - err = 0; 162 - break; 163 - } 164 - mutex_unlock(&mdsc->mutex); 165 - if (err) 166 - return err; 167 - } 137 + err = __validate_layout(mdsc, &l); 138 + if (err) 139 + return err; 168 140 169 141 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETDIRLAYOUT, 170 142 USE_AUTH_MDS); ··· 165 171 cpu_to_le32(l.object_size); 166 172 req->r_args.setlayout.layout.fl_pg_pool = 167 173 cpu_to_le32(l.data_pool); 168 - req->r_args.setlayout.layout.fl_pg_preferred = 169 - cpu_to_le32(l.preferred_osd); 170 174 171 175 err = ceph_mdsc_do_request(mdsc, inode, req); 172 176 ceph_mdsc_put_request(req);
+2
fs/ceph/ioctl.h
··· 34 34 struct ceph_ioctl_layout { 35 35 __u64 stripe_unit, stripe_count, object_size; 36 36 __u64 data_pool; 37 + 38 + /* obsolete. new values ignored, always return -1 */ 37 39 __s64 preferred_osd; 38 40 }; 39 41
+22 -28
fs/ceph/mds_client.c
··· 334 334 dout("mdsc put_session %p %d -> %d\n", s, 335 335 atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1); 336 336 if (atomic_dec_and_test(&s->s_ref)) { 337 - if (s->s_authorizer) 337 + if (s->s_auth.authorizer) 338 338 s->s_mdsc->fsc->client->monc.auth->ops->destroy_authorizer( 339 339 s->s_mdsc->fsc->client->monc.auth, 340 - s->s_authorizer); 340 + s->s_auth.authorizer); 341 341 kfree(s); 342 342 } 343 343 } ··· 3395 3395 /* 3396 3396 * authentication 3397 3397 */ 3398 - static int get_authorizer(struct ceph_connection *con, 3399 - void **buf, int *len, int *proto, 3400 - void **reply_buf, int *reply_len, int force_new) 3398 + 3399 + /* 3400 + * Note: returned pointer is the address of a structure that's 3401 + * managed separately. Caller must *not* attempt to free it. 3402 + */ 3403 + static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 3404 + int *proto, int force_new) 3401 3405 { 3402 3406 struct ceph_mds_session *s = con->private; 3403 3407 struct ceph_mds_client *mdsc = s->s_mdsc; 3404 3408 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 3405 - int ret = 0; 3409 + struct ceph_auth_handshake *auth = &s->s_auth; 3406 3410 3407 - if (force_new && s->s_authorizer) { 3408 - ac->ops->destroy_authorizer(ac, s->s_authorizer); 3409 - s->s_authorizer = NULL; 3411 + if (force_new && auth->authorizer) { 3412 + if (ac->ops && ac->ops->destroy_authorizer) 3413 + ac->ops->destroy_authorizer(ac, auth->authorizer); 3414 + auth->authorizer = NULL; 3410 3415 } 3411 - if (s->s_authorizer == NULL) { 3412 - if (ac->ops->create_authorizer) { 3413 - ret = ac->ops->create_authorizer( 3414 - ac, CEPH_ENTITY_TYPE_MDS, 3415 - &s->s_authorizer, 3416 - &s->s_authorizer_buf, 3417 - &s->s_authorizer_buf_len, 3418 - &s->s_authorizer_reply_buf, 3419 - &s->s_authorizer_reply_buf_len); 3420 - if (ret) 3421 - return ret; 3422 - } 3416 + if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) { 3417 + int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_MDS, 3418 + auth); 3419 + if (ret) 3420 + return ERR_PTR(ret); 3423 3421 } 3424 - 3425 3422 *proto = ac->protocol; 3426 - *buf = s->s_authorizer_buf; 3427 - *len = s->s_authorizer_buf_len; 3428 - *reply_buf = s->s_authorizer_reply_buf; 3429 - *reply_len = s->s_authorizer_reply_buf_len; 3430 - return 0; 3423 + 3424 + return auth; 3431 3425 } 3432 3426 3433 3427 ··· 3431 3437 struct ceph_mds_client *mdsc = s->s_mdsc; 3432 3438 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 3433 3439 3434 - return ac->ops->verify_authorizer_reply(ac, s->s_authorizer, len); 3440 + return ac->ops->verify_authorizer_reply(ac, s->s_auth.authorizer, len); 3435 3441 } 3436 3442 3437 3443 static int invalidate_authorizer(struct ceph_connection *con)
+2 -3
fs/ceph/mds_client.h
··· 11 11 #include <linux/ceph/types.h> 12 12 #include <linux/ceph/messenger.h> 13 13 #include <linux/ceph/mdsmap.h> 14 + #include <linux/ceph/auth.h> 14 15 15 16 /* 16 17 * Some lock dependencies: ··· 114 113 115 114 struct ceph_connection s_con; 116 115 117 - struct ceph_authorizer *s_authorizer; 118 - void *s_authorizer_buf, *s_authorizer_reply_buf; 119 - size_t s_authorizer_buf_len, s_authorizer_reply_buf_len; 116 + struct ceph_auth_handshake s_auth; 120 117 121 118 /* protected by s_gen_ttl_lock */ 122 119 spinlock_t s_gen_ttl_lock;
-9
fs/ceph/xattr.c
··· 118 118 (unsigned long long)ceph_file_layout_su(ci->i_layout), 119 119 (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout), 120 120 (unsigned long long)ceph_file_layout_object_size(ci->i_layout)); 121 - 122 - if (ceph_file_layout_pg_preferred(ci->i_layout) >= 0) { 123 - val += ret; 124 - size -= ret; 125 - ret += snprintf(val, size, "preferred_osd=%lld\n", 126 - (unsigned long long)ceph_file_layout_pg_preferred( 127 - ci->i_layout)); 128 - } 129 - 130 121 return ret; 131 122 } 132 123
+9 -3
include/linux/ceph/auth.h
··· 14 14 struct ceph_auth_client; 15 15 struct ceph_authorizer; 16 16 17 + struct ceph_auth_handshake { 18 + struct ceph_authorizer *authorizer; 19 + void *authorizer_buf; 20 + size_t authorizer_buf_len; 21 + void *authorizer_reply_buf; 22 + size_t authorizer_reply_buf_len; 23 + }; 24 + 17 25 struct ceph_auth_client_ops { 18 26 const char *name; 19 27 ··· 51 43 * the response to authenticate the service. 52 44 */ 53 45 int (*create_authorizer)(struct ceph_auth_client *ac, int peer_type, 54 - struct ceph_authorizer **a, 55 - void **buf, size_t *len, 56 - void **reply_buf, size_t *reply_len); 46 + struct ceph_auth_handshake *auth); 57 47 int (*verify_authorizer_reply)(struct ceph_auth_client *ac, 58 48 struct ceph_authorizer *a, size_t len); 59 49 void (*destroy_authorizer)(struct ceph_auth_client *ac,
+2 -2
include/linux/ceph/ceph_fs.h
··· 65 65 __le32 fl_object_stripe_unit; /* UNUSED. for per-object parity, if any */ 66 66 67 67 /* object -> pg layout */ 68 - __le32 fl_pg_preferred; /* preferred primary for pg (-1 for none) */ 68 + __le32 fl_unused; /* unused; used to be preferred primary (-1) */ 69 69 __le32 fl_pg_pool; /* namespace, crush ruleset, rep level */ 70 70 } __attribute__ ((packed)); 71 71 ··· 384 384 __le32 stripe_count; /* ... */ 385 385 __le32 object_size; 386 386 __le32 file_replication; 387 - __le32 preferred; 387 + __le32 unused; /* used to be preferred osd */ 388 388 } __attribute__ ((packed)) open; 389 389 struct { 390 390 __le32 flags;
+7 -2
include/linux/ceph/decode.h
··· 46 46 /* 47 47 * bounds check input. 48 48 */ 49 + static inline int ceph_has_room(void **p, void *end, size_t n) 50 + { 51 + return end >= *p && n <= end - *p; 52 + } 53 + 49 54 #define ceph_decode_need(p, end, n, bad) \ 50 55 do { \ 51 - if (unlikely(*(p) + (n) > (end))) \ 56 + if (!likely(ceph_has_room(p, end, n))) \ 52 57 goto bad; \ 53 58 } while (0) 54 59 ··· 172 167 173 168 #define ceph_encode_need(p, end, n, bad) \ 174 169 do { \ 175 - if (unlikely(*(p) + (n) > (end))) \ 170 + if (!likely(ceph_has_room(p, end, n))) \ 176 171 goto bad; \ 177 172 } while (0) 178 173
+3 -3
include/linux/ceph/messenger.h
··· 25 25 void (*dispatch) (struct ceph_connection *con, struct ceph_msg *m); 26 26 27 27 /* authorize an outgoing connection */ 28 - int (*get_authorizer) (struct ceph_connection *con, 29 - void **buf, int *len, int *proto, 30 - void **reply_buf, int *reply_len, int force_new); 28 + struct ceph_auth_handshake *(*get_authorizer) ( 29 + struct ceph_connection *con, 30 + int *proto, int force_new); 31 31 int (*verify_authorizer_reply) (struct ceph_connection *con, int len); 32 32 int (*invalidate_authorizer)(struct ceph_connection *con); 33 33
+5 -6
include/linux/ceph/osd_client.h
··· 6 6 #include <linux/mempool.h> 7 7 #include <linux/rbtree.h> 8 8 9 - #include "types.h" 10 - #include "osdmap.h" 11 - #include "messenger.h" 9 + #include <linux/ceph/types.h> 10 + #include <linux/ceph/osdmap.h> 11 + #include <linux/ceph/messenger.h> 12 + #include <linux/ceph/auth.h> 12 13 13 14 /* 14 15 * Maximum object name size ··· 41 40 struct list_head o_requests; 42 41 struct list_head o_linger_requests; 43 42 struct list_head o_osd_lru; 44 - struct ceph_authorizer *o_authorizer; 45 - void *o_authorizer_buf, *o_authorizer_reply_buf; 46 - size_t o_authorizer_buf_len, o_authorizer_reply_buf_len; 43 + struct ceph_auth_handshake o_auth; 47 44 unsigned long lru_ttl; 48 45 int o_marked_for_keepalive; 49 46 struct list_head o_keepalive_item;
-2
include/linux/ceph/osdmap.h
··· 65 65 #define ceph_file_layout_cas_hash(l) ((__s32)le32_to_cpu((l).fl_cas_hash)) 66 66 #define ceph_file_layout_object_su(l) \ 67 67 ((__s32)le32_to_cpu((l).fl_object_stripe_unit)) 68 - #define ceph_file_layout_pg_preferred(l) \ 69 - ((__s32)le32_to_cpu((l).fl_pg_preferred)) 70 68 #define ceph_file_layout_pg_pool(l) \ 71 69 ((__s32)le32_to_cpu((l).fl_pg_pool)) 72 70
+6 -12
include/linux/crush/crush.h
··· 151 151 struct crush_bucket **buckets; 152 152 struct crush_rule **rules; 153 153 154 - /* 155 - * Parent pointers to identify the parent bucket a device or 156 - * bucket in the hierarchy. If an item appears more than 157 - * once, this is the _last_ time it appeared (where buckets 158 - * are processed in bucket id order, from -1 on down to 159 - * -max_buckets. 160 - */ 161 - __u32 *bucket_parents; 162 - __u32 *device_parents; 163 - 164 154 __s32 max_buckets; 165 155 __u32 max_rules; 166 156 __s32 max_devices; ··· 158 168 159 169 160 170 /* crush.c */ 161 - extern int crush_get_bucket_item_weight(struct crush_bucket *b, int pos); 162 - extern void crush_calc_parents(struct crush_map *map); 171 + extern int crush_get_bucket_item_weight(const struct crush_bucket *b, int pos); 163 172 extern void crush_destroy_bucket_uniform(struct crush_bucket_uniform *b); 164 173 extern void crush_destroy_bucket_list(struct crush_bucket_list *b); 165 174 extern void crush_destroy_bucket_tree(struct crush_bucket_tree *b); 166 175 extern void crush_destroy_bucket_straw(struct crush_bucket_straw *b); 167 176 extern void crush_destroy_bucket(struct crush_bucket *b); 168 177 extern void crush_destroy(struct crush_map *map); 178 + 179 + static inline int crush_calc_tree_node(int i) 180 + { 181 + return ((i+1) << 1)-1; 182 + } 169 183 170 184 #endif
+3 -4
include/linux/crush/mapper.h
··· 10 10 11 11 #include "crush.h" 12 12 13 - extern int crush_find_rule(struct crush_map *map, int pool, int type, int size); 14 - extern int crush_do_rule(struct crush_map *map, 13 + extern int crush_find_rule(const struct crush_map *map, int ruleset, int type, int size); 14 + extern int crush_do_rule(const struct crush_map *map, 15 15 int ruleno, 16 16 int x, int *result, int result_max, 17 - int forcefeed, /* -1 for none */ 18 - __u32 *weights); 17 + const __u32 *weights); 19 18 20 19 #endif
+7 -8
net/ceph/auth_none.c
··· 59 59 */ 60 60 static int ceph_auth_none_create_authorizer( 61 61 struct ceph_auth_client *ac, int peer_type, 62 - struct ceph_authorizer **a, 63 - void **buf, size_t *len, 64 - void **reply_buf, size_t *reply_len) 62 + struct ceph_auth_handshake *auth) 65 63 { 66 64 struct ceph_auth_none_info *ai = ac->private; 67 65 struct ceph_none_authorizer *au = &ai->au; ··· 80 82 dout("built authorizer len %d\n", au->buf_len); 81 83 } 82 84 83 - *a = (struct ceph_authorizer *)au; 84 - *buf = au->buf; 85 - *len = au->buf_len; 86 - *reply_buf = au->reply_buf; 87 - *reply_len = sizeof(au->reply_buf); 85 + auth->authorizer = (struct ceph_authorizer *) au; 86 + auth->authorizer_buf = au->buf; 87 + auth->authorizer_buf_len = au->buf_len; 88 + auth->authorizer_reply_buf = au->reply_buf; 89 + auth->authorizer_reply_buf_len = sizeof (au->reply_buf); 90 + 88 91 return 0; 89 92 90 93 bad2:
+7 -8
net/ceph/auth_x.c
··· 526 526 527 527 static int ceph_x_create_authorizer( 528 528 struct ceph_auth_client *ac, int peer_type, 529 - struct ceph_authorizer **a, 530 - void **buf, size_t *len, 531 - void **reply_buf, size_t *reply_len) 529 + struct ceph_auth_handshake *auth) 532 530 { 533 531 struct ceph_x_authorizer *au; 534 532 struct ceph_x_ticket_handler *th; ··· 546 548 return ret; 547 549 } 548 550 549 - *a = (struct ceph_authorizer *)au; 550 - *buf = au->buf->vec.iov_base; 551 - *len = au->buf->vec.iov_len; 552 - *reply_buf = au->reply_buf; 553 - *reply_len = sizeof(au->reply_buf); 551 + auth->authorizer = (struct ceph_authorizer *) au; 552 + auth->authorizer_buf = au->buf->vec.iov_base; 553 + auth->authorizer_buf_len = au->buf->vec.iov_len; 554 + auth->authorizer_reply_buf = au->reply_buf; 555 + auth->authorizer_reply_buf_len = sizeof (au->reply_buf); 556 + 554 557 return 0; 555 558 } 556 559
+7 -32
net/ceph/crush/crush.c
··· 26 26 * @b: bucket pointer 27 27 * @p: item index in bucket 28 28 */ 29 - int crush_get_bucket_item_weight(struct crush_bucket *b, int p) 29 + int crush_get_bucket_item_weight(const struct crush_bucket *b, int p) 30 30 { 31 - if (p >= b->size) 31 + if ((__u32)p >= b->size) 32 32 return 0; 33 33 34 34 switch (b->alg) { ··· 37 37 case CRUSH_BUCKET_LIST: 38 38 return ((struct crush_bucket_list *)b)->item_weights[p]; 39 39 case CRUSH_BUCKET_TREE: 40 - if (p & 1) 41 - return ((struct crush_bucket_tree *)b)->node_weights[p]; 42 - return 0; 40 + return ((struct crush_bucket_tree *)b)->node_weights[crush_calc_tree_node(p)]; 43 41 case CRUSH_BUCKET_STRAW: 44 42 return ((struct crush_bucket_straw *)b)->item_weights[p]; 45 43 } 46 44 return 0; 47 - } 48 - 49 - /** 50 - * crush_calc_parents - Calculate parent vectors for the given crush map. 51 - * @map: crush_map pointer 52 - */ 53 - void crush_calc_parents(struct crush_map *map) 54 - { 55 - int i, b, c; 56 - 57 - for (b = 0; b < map->max_buckets; b++) { 58 - if (map->buckets[b] == NULL) 59 - continue; 60 - for (i = 0; i < map->buckets[b]->size; i++) { 61 - c = map->buckets[b]->items[i]; 62 - BUG_ON(c >= map->max_devices || 63 - c < -map->max_buckets); 64 - if (c >= 0) 65 - map->device_parents[c] = map->buckets[b]->id; 66 - else 67 - map->bucket_parents[-1-c] = map->buckets[b]->id; 68 - } 69 - } 70 45 } 71 46 72 47 void crush_destroy_bucket_uniform(struct crush_bucket_uniform *b) ··· 62 87 63 88 void crush_destroy_bucket_tree(struct crush_bucket_tree *b) 64 89 { 90 + kfree(b->h.perm); 91 + kfree(b->h.items); 65 92 kfree(b->node_weights); 66 93 kfree(b); 67 94 } ··· 101 124 */ 102 125 void crush_destroy(struct crush_map *map) 103 126 { 104 - int b; 105 - 106 127 /* buckets */ 107 128 if (map->buckets) { 129 + __s32 b; 108 130 for (b = 0; b < map->max_buckets; b++) { 109 131 if (map->buckets[b] == NULL) 110 132 continue; ··· 114 138 115 139 /* rules */ 116 140 if (map->rules) { 141 + __u32 b; 117 142 for (b = 0; b < map->max_rules; b++) 118 143 kfree(map->rules[b]); 119 144 kfree(map->rules); 120 145 } 121 146 122 - kfree(map->bucket_parents); 123 - kfree(map->device_parents); 124 147 kfree(map); 125 148 } 126 149
+48 -76
net/ceph/crush/mapper.c
··· 33 33 * @type: storage ruleset type (user defined) 34 34 * @size: output set size 35 35 */ 36 - int crush_find_rule(struct crush_map *map, int ruleset, int type, int size) 36 + int crush_find_rule(const struct crush_map *map, int ruleset, int type, int size) 37 37 { 38 - int i; 38 + __u32 i; 39 39 40 40 for (i = 0; i < map->max_rules; i++) { 41 41 if (map->rules[i] && ··· 73 73 unsigned int i, s; 74 74 75 75 /* start a new permutation if @x has changed */ 76 - if (bucket->perm_x != x || bucket->perm_n == 0) { 76 + if (bucket->perm_x != (__u32)x || bucket->perm_n == 0) { 77 77 dprintk("bucket %d new x=%d\n", bucket->id, x); 78 78 bucket->perm_x = x; 79 79 ··· 153 153 return bucket->h.items[i]; 154 154 } 155 155 156 - BUG_ON(1); 157 - return 0; 156 + dprintk("bad list sums for bucket %d\n", bucket->h.id); 157 + return bucket->h.items[0]; 158 158 } 159 159 160 160 ··· 220 220 static int bucket_straw_choose(struct crush_bucket_straw *bucket, 221 221 int x, int r) 222 222 { 223 - int i; 223 + __u32 i; 224 224 int high = 0; 225 225 __u64 high_draw = 0; 226 226 __u64 draw; ··· 240 240 static int crush_bucket_choose(struct crush_bucket *in, int x, int r) 241 241 { 242 242 dprintk(" crush_bucket_choose %d x=%d r=%d\n", in->id, x, r); 243 + BUG_ON(in->size == 0); 243 244 switch (in->alg) { 244 245 case CRUSH_BUCKET_UNIFORM: 245 246 return bucket_uniform_choose((struct crush_bucket_uniform *)in, ··· 255 254 return bucket_straw_choose((struct crush_bucket_straw *)in, 256 255 x, r); 257 256 default: 258 - BUG_ON(1); 257 + dprintk("unknown bucket %d alg %d\n", in->id, in->alg); 259 258 return in->items[0]; 260 259 } 261 260 } ··· 264 263 * true if device is marked "out" (failed, fully offloaded) 265 264 * of the cluster 266 265 */ 267 - static int is_out(struct crush_map *map, __u32 *weight, int item, int x) 266 + static int is_out(const struct crush_map *map, const __u32 *weight, int item, int x) 268 267 { 269 268 if (weight[item] >= 0x10000) 270 269 return 0; ··· 289 288 * @recurse_to_leaf: true if we want one device under each item of given type 290 289 * @out2: second output vector for leaf items (if @recurse_to_leaf) 291 290 */ 292 - static int crush_choose(struct crush_map *map, 291 + static int crush_choose(const struct crush_map *map, 293 292 struct crush_bucket *bucket, 294 - __u32 *weight, 293 + const __u32 *weight, 295 294 int x, int numrep, int type, 296 295 int *out, int outpos, 297 296 int firstn, int recurse_to_leaf, 298 297 int *out2) 299 298 { 300 299 int rep; 301 - int ftotal, flocal; 300 + unsigned int ftotal, flocal; 302 301 int retry_descent, retry_bucket, skip_rep; 303 302 struct crush_bucket *in = bucket; 304 303 int r; ··· 306 305 int item = 0; 307 306 int itemtype; 308 307 int collide, reject; 309 - const int orig_tries = 5; /* attempts before we fall back to search */ 308 + const unsigned int orig_tries = 5; /* attempts before we fall back to search */ 310 309 311 310 dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "", 312 311 bucket->id, x, outpos, numrep); ··· 327 326 r = rep; 328 327 if (in->alg == CRUSH_BUCKET_UNIFORM) { 329 328 /* be careful */ 330 - if (firstn || numrep >= in->size) 329 + if (firstn || (__u32)numrep >= in->size) 331 330 /* r' = r + f_total */ 332 331 r += ftotal; 333 332 else if (in->size % numrep == 0) ··· 356 355 item = bucket_perm_choose(in, x, r); 357 356 else 358 357 item = crush_bucket_choose(in, x, r); 359 - BUG_ON(item >= map->max_devices); 358 + if (item >= map->max_devices) { 359 + dprintk(" bad item %d\n", item); 360 + skip_rep = 1; 361 + break; 362 + } 360 363 361 364 /* desired type? */ 362 365 if (item < 0) ··· 371 366 372 367 /* keep going? */ 373 368 if (itemtype != type) { 374 - BUG_ON(item >= 0 || 375 - (-1-item) >= map->max_buckets); 369 + if (item >= 0 || 370 + (-1-item) >= map->max_buckets) { 371 + dprintk(" bad item type %d\n", type); 372 + skip_rep = 1; 373 + break; 374 + } 376 375 in = map->buckets[-1-item]; 377 376 retry_bucket = 1; 378 377 continue; ··· 425 416 if (collide && flocal < 3) 426 417 /* retry locally a few times */ 427 418 retry_bucket = 1; 428 - else if (flocal < in->size + orig_tries) 419 + else if (flocal <= in->size + orig_tries) 429 420 /* exhaustive bucket search */ 430 421 retry_bucket = 1; 431 422 else if (ftotal < 20) ··· 435 426 /* else give up */ 436 427 skip_rep = 1; 437 428 dprintk(" reject %d collide %d " 438 - "ftotal %d flocal %d\n", 429 + "ftotal %u flocal %u\n", 439 430 reject, collide, ftotal, 440 431 flocal); 441 432 } ··· 464 455 * @x: hash input 465 456 * @result: pointer to result vector 466 457 * @result_max: maximum result size 467 - * @force: force initial replica choice; -1 for none 468 458 */ 469 - int crush_do_rule(struct crush_map *map, 459 + int crush_do_rule(const struct crush_map *map, 470 460 int ruleno, int x, int *result, int result_max, 471 - int force, __u32 *weight) 461 + const __u32 *weight) 472 462 { 473 463 int result_len; 474 - int force_context[CRUSH_MAX_DEPTH]; 475 - int force_pos = -1; 476 464 int a[CRUSH_MAX_SET]; 477 465 int b[CRUSH_MAX_SET]; 478 466 int c[CRUSH_MAX_SET]; ··· 480 474 int osize; 481 475 int *tmp; 482 476 struct crush_rule *rule; 483 - int step; 477 + __u32 step; 484 478 int i, j; 485 479 int numrep; 486 480 int firstn; 487 481 488 - BUG_ON(ruleno >= map->max_rules); 482 + if ((__u32)ruleno >= map->max_rules) { 483 + dprintk(" bad ruleno %d\n", ruleno); 484 + return 0; 485 + } 489 486 490 487 rule = map->rules[ruleno]; 491 488 result_len = 0; 492 489 w = a; 493 490 o = b; 494 491 495 - /* 496 - * determine hierarchical context of force, if any. note 497 - * that this may or may not correspond to the specific types 498 - * referenced by the crush rule. 499 - */ 500 - if (force >= 0 && 501 - force < map->max_devices && 502 - map->device_parents[force] != 0 && 503 - !is_out(map, weight, force, x)) { 504 - while (1) { 505 - force_context[++force_pos] = force; 506 - if (force >= 0) 507 - force = map->device_parents[force]; 508 - else 509 - force = map->bucket_parents[-1-force]; 510 - if (force == 0) 511 - break; 512 - } 513 - } 514 - 515 492 for (step = 0; step < rule->len; step++) { 493 + struct crush_rule_step *curstep = &rule->steps[step]; 494 + 516 495 firstn = 0; 517 - switch (rule->steps[step].op) { 496 + switch (curstep->op) { 518 497 case CRUSH_RULE_TAKE: 519 - w[0] = rule->steps[step].arg1; 520 - 521 - /* find position in force_context/hierarchy */ 522 - while (force_pos >= 0 && 523 - force_context[force_pos] != w[0]) 524 - force_pos--; 525 - /* and move past it */ 526 - if (force_pos >= 0) 527 - force_pos--; 528 - 498 + w[0] = curstep->arg1; 529 499 wsize = 1; 530 500 break; 531 501 532 502 case CRUSH_RULE_CHOOSE_LEAF_FIRSTN: 533 503 case CRUSH_RULE_CHOOSE_FIRSTN: 534 504 firstn = 1; 505 + /* fall through */ 535 506 case CRUSH_RULE_CHOOSE_LEAF_INDEP: 536 507 case CRUSH_RULE_CHOOSE_INDEP: 537 - BUG_ON(wsize == 0); 508 + if (wsize == 0) 509 + break; 538 510 539 511 recurse_to_leaf = 540 - rule->steps[step].op == 512 + curstep->op == 541 513 CRUSH_RULE_CHOOSE_LEAF_FIRSTN || 542 - rule->steps[step].op == 514 + curstep->op == 543 515 CRUSH_RULE_CHOOSE_LEAF_INDEP; 544 516 545 517 /* reset output */ ··· 529 545 * basically, numrep <= 0 means relative to 530 546 * the provided result_max 531 547 */ 532 - numrep = rule->steps[step].arg1; 548 + numrep = curstep->arg1; 533 549 if (numrep <= 0) { 534 550 numrep += result_max; 535 551 if (numrep <= 0) 536 552 continue; 537 553 } 538 554 j = 0; 539 - if (osize == 0 && force_pos >= 0) { 540 - /* skip any intermediate types */ 541 - while (force_pos && 542 - force_context[force_pos] < 0 && 543 - rule->steps[step].arg2 != 544 - map->buckets[-1 - 545 - force_context[force_pos]]->type) 546 - force_pos--; 547 - o[osize] = force_context[force_pos]; 548 - if (recurse_to_leaf) 549 - c[osize] = force_context[0]; 550 - j++; 551 - force_pos--; 552 - } 553 555 osize += crush_choose(map, 554 556 map->buckets[-1-w[i]], 555 557 weight, 556 558 x, numrep, 557 - rule->steps[step].arg2, 559 + curstep->arg2, 558 560 o+osize, j, 559 561 firstn, 560 562 recurse_to_leaf, c+osize); ··· 567 597 break; 568 598 569 599 default: 570 - BUG_ON(1); 600 + dprintk(" unknown op %d at step %d\n", 601 + curstep->op, step); 602 + break; 571 603 } 572 604 } 573 605 return result_len;
+107 -75
net/ceph/messenger.c
··· 653 653 * Connection negotiation. 654 654 */ 655 655 656 - static int prepare_connect_authorizer(struct ceph_connection *con) 656 + static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection *con, 657 + int *auth_proto) 657 658 { 658 - void *auth_buf; 659 - int auth_len = 0; 660 - int auth_protocol = 0; 659 + struct ceph_auth_handshake *auth; 660 + 661 + if (!con->ops->get_authorizer) { 662 + con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; 663 + con->out_connect.authorizer_len = 0; 664 + 665 + return NULL; 666 + } 667 + 668 + /* Can't hold the mutex while getting authorizer */ 661 669 662 670 mutex_unlock(&con->mutex); 663 - if (con->ops->get_authorizer) 664 - con->ops->get_authorizer(con, &auth_buf, &auth_len, 665 - &auth_protocol, &con->auth_reply_buf, 666 - &con->auth_reply_buf_len, 667 - con->auth_retry); 671 + 672 + auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry); 673 + 668 674 mutex_lock(&con->mutex); 669 675 670 - if (test_bit(CLOSED, &con->state) || 671 - test_bit(OPENING, &con->state)) 672 - return -EAGAIN; 676 + if (IS_ERR(auth)) 677 + return auth; 678 + if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state)) 679 + return ERR_PTR(-EAGAIN); 673 680 674 - con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol); 675 - con->out_connect.authorizer_len = cpu_to_le32(auth_len); 681 + con->auth_reply_buf = auth->authorizer_reply_buf; 682 + con->auth_reply_buf_len = auth->authorizer_reply_buf_len; 676 683 677 - if (auth_len) 678 - ceph_con_out_kvec_add(con, auth_len, auth_buf); 679 684 680 - return 0; 685 + return auth; 681 686 } 682 687 683 688 /* 684 689 * We connected to a peer and are saying hello. 685 690 */ 686 - static void prepare_write_banner(struct ceph_messenger *msgr, 687 - struct ceph_connection *con) 691 + static void prepare_write_banner(struct ceph_connection *con) 688 692 { 689 - ceph_con_out_kvec_reset(con); 690 693 ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); 691 - ceph_con_out_kvec_add(con, sizeof (msgr->my_enc_addr), 692 - &msgr->my_enc_addr); 694 + ceph_con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), 695 + &con->msgr->my_enc_addr); 693 696 694 697 con->out_more = 0; 695 698 set_bit(WRITE_PENDING, &con->state); 696 699 } 697 700 698 - static int prepare_write_connect(struct ceph_messenger *msgr, 699 - struct ceph_connection *con, 700 - int include_banner) 701 + static int prepare_write_connect(struct ceph_connection *con) 701 702 { 702 703 unsigned int global_seq = get_global_seq(con->msgr, 0); 703 704 int proto; 705 + int auth_proto; 706 + struct ceph_auth_handshake *auth; 704 707 705 708 switch (con->peer_name.type) { 706 709 case CEPH_ENTITY_TYPE_MON: ··· 722 719 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, 723 720 con->connect_seq, global_seq, proto); 724 721 725 - con->out_connect.features = cpu_to_le64(msgr->supported_features); 722 + con->out_connect.features = cpu_to_le64(con->msgr->supported_features); 726 723 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); 727 724 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); 728 725 con->out_connect.global_seq = cpu_to_le32(global_seq); 729 726 con->out_connect.protocol_version = cpu_to_le32(proto); 730 727 con->out_connect.flags = 0; 731 728 732 - if (include_banner) 733 - prepare_write_banner(msgr, con); 734 - else 735 - ceph_con_out_kvec_reset(con); 736 - ceph_con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect); 729 + auth_proto = CEPH_AUTH_UNKNOWN; 730 + auth = get_connect_authorizer(con, &auth_proto); 731 + if (IS_ERR(auth)) 732 + return PTR_ERR(auth); 733 + 734 + con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto); 735 + con->out_connect.authorizer_len = auth ? 736 + cpu_to_le32(auth->authorizer_buf_len) : 0; 737 + 738 + ceph_con_out_kvec_add(con, sizeof (con->out_connect), 739 + &con->out_connect); 740 + if (auth && auth->authorizer_buf_len) 741 + ceph_con_out_kvec_add(con, auth->authorizer_buf_len, 742 + auth->authorizer_buf); 737 743 738 744 con->out_more = 0; 739 745 set_bit(WRITE_PENDING, &con->state); 740 746 741 - return prepare_connect_authorizer(con); 747 + return 0; 742 748 } 743 749 744 750 /* ··· 1004 992 1005 993 1006 994 static int read_partial(struct ceph_connection *con, 1007 - int *to, int size, void *object) 995 + int end, int size, void *object) 1008 996 { 1009 - *to += size; 1010 - while (con->in_base_pos < *to) { 1011 - int left = *to - con->in_base_pos; 997 + while (con->in_base_pos < end) { 998 + int left = end - con->in_base_pos; 1012 999 int have = size - left; 1013 1000 int ret = ceph_tcp_recvmsg(con->sock, object + have, left); 1014 1001 if (ret <= 0) ··· 1023 1012 */ 1024 1013 static int read_partial_banner(struct ceph_connection *con) 1025 1014 { 1026 - int ret, to = 0; 1015 + int size; 1016 + int end; 1017 + int ret; 1027 1018 1028 1019 dout("read_partial_banner %p at %d\n", con, con->in_base_pos); 1029 1020 1030 1021 /* peer's banner */ 1031 - ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner); 1022 + size = strlen(CEPH_BANNER); 1023 + end = size; 1024 + ret = read_partial(con, end, size, con->in_banner); 1032 1025 if (ret <= 0) 1033 1026 goto out; 1034 - ret = read_partial(con, &to, sizeof(con->actual_peer_addr), 1035 - &con->actual_peer_addr); 1027 + 1028 + size = sizeof (con->actual_peer_addr); 1029 + end += size; 1030 + ret = read_partial(con, end, size, &con->actual_peer_addr); 1036 1031 if (ret <= 0) 1037 1032 goto out; 1038 - ret = read_partial(con, &to, sizeof(con->peer_addr_for_me), 1039 - &con->peer_addr_for_me); 1033 + 1034 + size = sizeof (con->peer_addr_for_me); 1035 + end += size; 1036 + ret = read_partial(con, end, size, &con->peer_addr_for_me); 1040 1037 if (ret <= 0) 1041 1038 goto out; 1039 + 1042 1040 out: 1043 1041 return ret; 1044 1042 } 1045 1043 1046 1044 static int read_partial_connect(struct ceph_connection *con) 1047 1045 { 1048 - int ret, to = 0; 1046 + int size; 1047 + int end; 1048 + int ret; 1049 1049 1050 1050 dout("read_partial_connect %p at %d\n", con, con->in_base_pos); 1051 1051 1052 - ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply); 1052 + size = sizeof (con->in_reply); 1053 + end = size; 1054 + ret = read_partial(con, end, size, &con->in_reply); 1053 1055 if (ret <= 0) 1054 1056 goto out; 1055 - ret = read_partial(con, &to, le32_to_cpu(con->in_reply.authorizer_len), 1056 - con->auth_reply_buf); 1057 + 1058 + size = le32_to_cpu(con->in_reply.authorizer_len); 1059 + end += size; 1060 + ret = read_partial(con, end, size, con->auth_reply_buf); 1057 1061 if (ret <= 0) 1058 1062 goto out; 1059 1063 ··· 1403 1377 return -1; 1404 1378 } 1405 1379 con->auth_retry = 1; 1406 - ret = prepare_write_connect(con->msgr, con, 0); 1380 + ceph_con_out_kvec_reset(con); 1381 + ret = prepare_write_connect(con); 1407 1382 if (ret < 0) 1408 1383 return ret; 1409 1384 prepare_read_connect(con); ··· 1424 1397 ENTITY_NAME(con->peer_name), 1425 1398 ceph_pr_addr(&con->peer_addr.in_addr)); 1426 1399 reset_connection(con); 1427 - prepare_write_connect(con->msgr, con, 0); 1400 + ceph_con_out_kvec_reset(con); 1401 + ret = prepare_write_connect(con); 1402 + if (ret < 0) 1403 + return ret; 1428 1404 prepare_read_connect(con); 1429 1405 1430 1406 /* Tell ceph about it. */ ··· 1450 1420 le32_to_cpu(con->out_connect.connect_seq), 1451 1421 le32_to_cpu(con->in_connect.connect_seq)); 1452 1422 con->connect_seq = le32_to_cpu(con->in_connect.connect_seq); 1453 - prepare_write_connect(con->msgr, con, 0); 1423 + ceph_con_out_kvec_reset(con); 1424 + ret = prepare_write_connect(con); 1425 + if (ret < 0) 1426 + return ret; 1454 1427 prepare_read_connect(con); 1455 1428 break; 1456 1429 ··· 1467 1434 le32_to_cpu(con->in_connect.global_seq)); 1468 1435 get_global_seq(con->msgr, 1469 1436 le32_to_cpu(con->in_connect.global_seq)); 1470 - prepare_write_connect(con->msgr, con, 0); 1437 + ceph_con_out_kvec_reset(con); 1438 + ret = prepare_write_connect(con); 1439 + if (ret < 0) 1440 + return ret; 1471 1441 prepare_read_connect(con); 1472 1442 break; 1473 1443 ··· 1527 1491 */ 1528 1492 static int read_partial_ack(struct ceph_connection *con) 1529 1493 { 1530 - int to = 0; 1494 + int size = sizeof (con->in_temp_ack); 1495 + int end = size; 1531 1496 1532 - return read_partial(con, &to, sizeof(con->in_temp_ack), 1533 - &con->in_temp_ack); 1497 + return read_partial(con, end, size, &con->in_temp_ack); 1534 1498 } 1535 1499 1536 1500 ··· 1663 1627 static int read_partial_message(struct ceph_connection *con) 1664 1628 { 1665 1629 struct ceph_msg *m = con->in_msg; 1630 + int size; 1631 + int end; 1666 1632 int ret; 1667 - int to, left; 1668 1633 unsigned int front_len, middle_len, data_len; 1669 1634 bool do_datacrc = !con->msgr->nocrc; 1670 1635 int skip; ··· 1675 1638 dout("read_partial_message con %p msg %p\n", con, m); 1676 1639 1677 1640 /* header */ 1678 - while (con->in_base_pos < sizeof(con->in_hdr)) { 1679 - left = sizeof(con->in_hdr) - con->in_base_pos; 1680 - ret = ceph_tcp_recvmsg(con->sock, 1681 - (char *)&con->in_hdr + con->in_base_pos, 1682 - left); 1683 - if (ret <= 0) 1684 - return ret; 1685 - con->in_base_pos += ret; 1686 - } 1641 + size = sizeof (con->in_hdr); 1642 + end = size; 1643 + ret = read_partial(con, end, size, &con->in_hdr); 1644 + if (ret <= 0) 1645 + return ret; 1687 1646 1688 1647 crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc)); 1689 1648 if (cpu_to_le32(crc) != con->in_hdr.crc) { ··· 1792 1759 } 1793 1760 1794 1761 /* footer */ 1795 - to = sizeof(m->hdr) + sizeof(m->footer); 1796 - while (con->in_base_pos < to) { 1797 - left = to - con->in_base_pos; 1798 - ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer + 1799 - (con->in_base_pos - sizeof(m->hdr)), 1800 - left); 1801 - if (ret <= 0) 1802 - return ret; 1803 - con->in_base_pos += ret; 1804 - } 1762 + size = sizeof (m->footer); 1763 + end += size; 1764 + ret = read_partial(con, end, size, &m->footer); 1765 + if (ret <= 0) 1766 + return ret; 1767 + 1805 1768 dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n", 1806 1769 m, front_len, m->footer.front_crc, middle_len, 1807 1770 m->footer.middle_crc, data_len, m->footer.data_crc); ··· 1864 1835 */ 1865 1836 static int try_write(struct ceph_connection *con) 1866 1837 { 1867 - struct ceph_messenger *msgr = con->msgr; 1868 1838 int ret = 1; 1869 1839 1870 1840 dout("try_write start %p state %lu nref %d\n", con, con->state, ··· 1874 1846 1875 1847 /* open the socket first? */ 1876 1848 if (con->sock == NULL) { 1877 - prepare_write_connect(msgr, con, 1); 1849 + ceph_con_out_kvec_reset(con); 1850 + prepare_write_banner(con); 1851 + ret = prepare_write_connect(con); 1852 + if (ret < 0) 1853 + goto out; 1878 1854 prepare_read_banner(con); 1879 1855 set_bit(CONNECTING, &con->state); 1880 1856 clear_bit(NEGOTIATING, &con->state);
+33 -28
net/ceph/osd_client.c
··· 278 278 { 279 279 dst->op = cpu_to_le16(src->op); 280 280 281 - switch (dst->op) { 281 + switch (src->op) { 282 282 case CEPH_OSD_OP_READ: 283 283 case CEPH_OSD_OP_WRITE: 284 284 dst->extent.offset = ··· 664 664 { 665 665 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref), 666 666 atomic_read(&osd->o_ref) - 1); 667 - if (atomic_dec_and_test(&osd->o_ref)) { 667 + if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) { 668 668 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth; 669 669 670 - if (osd->o_authorizer) 671 - ac->ops->destroy_authorizer(ac, osd->o_authorizer); 670 + if (ac->ops && ac->ops->destroy_authorizer) 671 + ac->ops->destroy_authorizer(ac, osd->o_auth.authorizer); 672 672 kfree(osd); 673 673 } 674 674 } ··· 841 841 static void __unregister_request(struct ceph_osd_client *osdc, 842 842 struct ceph_osd_request *req) 843 843 { 844 + if (RB_EMPTY_NODE(&req->r_node)) { 845 + dout("__unregister_request %p tid %lld not registered\n", 846 + req, req->r_tid); 847 + return; 848 + } 849 + 844 850 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 845 851 rb_erase(&req->r_node, &osdc->requests); 846 852 osdc->num_requests--; ··· 2114 2108 /* 2115 2109 * authentication 2116 2110 */ 2117 - static int get_authorizer(struct ceph_connection *con, 2118 - void **buf, int *len, int *proto, 2119 - void **reply_buf, int *reply_len, int force_new) 2111 + /* 2112 + * Note: returned pointer is the address of a structure that's 2113 + * managed separately. Caller must *not* attempt to free it. 2114 + */ 2115 + static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 2116 + int *proto, int force_new) 2120 2117 { 2121 2118 struct ceph_osd *o = con->private; 2122 2119 struct ceph_osd_client *osdc = o->o_osdc; 2123 2120 struct ceph_auth_client *ac = osdc->client->monc.auth; 2124 - int ret = 0; 2121 + struct ceph_auth_handshake *auth = &o->o_auth; 2125 2122 2126 - if (force_new && o->o_authorizer) { 2127 - ac->ops->destroy_authorizer(ac, o->o_authorizer); 2128 - o->o_authorizer = NULL; 2123 + if (force_new && auth->authorizer) { 2124 + if (ac->ops && ac->ops->destroy_authorizer) 2125 + ac->ops->destroy_authorizer(ac, auth->authorizer); 2126 + auth->authorizer = NULL; 2129 2127 } 2130 - if (o->o_authorizer == NULL) { 2131 - ret = ac->ops->create_authorizer( 2132 - ac, CEPH_ENTITY_TYPE_OSD, 2133 - &o->o_authorizer, 2134 - &o->o_authorizer_buf, 2135 - &o->o_authorizer_buf_len, 2136 - &o->o_authorizer_reply_buf, 2137 - &o->o_authorizer_reply_buf_len); 2128 + if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) { 2129 + int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 2130 + auth); 2138 2131 if (ret) 2139 - return ret; 2132 + return ERR_PTR(ret); 2140 2133 } 2141 - 2142 2134 *proto = ac->protocol; 2143 - *buf = o->o_authorizer_buf; 2144 - *len = o->o_authorizer_buf_len; 2145 - *reply_buf = o->o_authorizer_reply_buf; 2146 - *reply_len = o->o_authorizer_reply_buf_len; 2147 - return 0; 2135 + 2136 + return auth; 2148 2137 } 2149 2138 2150 2139 ··· 2149 2148 struct ceph_osd_client *osdc = o->o_osdc; 2150 2149 struct ceph_auth_client *ac = osdc->client->monc.auth; 2151 2150 2152 - return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len); 2151 + /* 2152 + * XXX If ac->ops or ac->ops->verify_authorizer_reply is null, 2153 + * XXX which do we do: succeed or fail? 2154 + */ 2155 + return ac->ops->verify_authorizer_reply(ac, o->o_auth.authorizer, len); 2153 2156 } 2154 2157 2155 2158 static int invalidate_authorizer(struct ceph_connection *con) ··· 2162 2157 struct ceph_osd_client *osdc = o->o_osdc; 2163 2158 struct ceph_auth_client *ac = osdc->client->monc.auth; 2164 2159 2165 - if (ac->ops->invalidate_authorizer) 2160 + if (ac->ops && ac->ops->invalidate_authorizer) 2166 2161 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD); 2167 2162 2168 2163 return ceph_monc_validate_auth(&osdc->client->monc);
+25 -48
net/ceph/osdmap.c
··· 161 161 c->max_rules = ceph_decode_32(p); 162 162 c->max_devices = ceph_decode_32(p); 163 163 164 - c->device_parents = kcalloc(c->max_devices, sizeof(u32), GFP_NOFS); 165 - if (c->device_parents == NULL) 166 - goto badmem; 167 - c->bucket_parents = kcalloc(c->max_buckets, sizeof(u32), GFP_NOFS); 168 - if (c->bucket_parents == NULL) 169 - goto badmem; 170 - 171 164 c->buckets = kcalloc(c->max_buckets, sizeof(*c->buckets), GFP_NOFS); 172 165 if (c->buckets == NULL) 173 166 goto badmem; ··· 883 890 pglen = ceph_decode_32(p); 884 891 885 892 if (pglen) { 886 - /* insert */ 887 893 ceph_decode_need(p, end, pglen*sizeof(u32), bad); 894 + 895 + /* removing existing (if any) */ 896 + (void) __remove_pg_mapping(&map->pg_temp, pgid); 897 + 898 + /* insert */ 888 899 pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS); 889 900 if (!pg) { 890 901 err = -ENOMEM; ··· 997 1000 { 998 1001 unsigned int num, num_mask; 999 1002 struct ceph_pg pgid; 1000 - s32 preferred = (s32)le32_to_cpu(fl->fl_pg_preferred); 1001 1003 int poolid = le32_to_cpu(fl->fl_pg_pool); 1002 1004 struct ceph_pg_pool_info *pool; 1003 1005 unsigned int ps; ··· 1007 1011 if (!pool) 1008 1012 return -EIO; 1009 1013 ps = ceph_str_hash(pool->v.object_hash, oid, strlen(oid)); 1010 - if (preferred >= 0) { 1011 - ps += preferred; 1012 - num = le32_to_cpu(pool->v.lpg_num); 1013 - num_mask = pool->lpg_num_mask; 1014 - } else { 1015 - num = le32_to_cpu(pool->v.pg_num); 1016 - num_mask = pool->pg_num_mask; 1017 - } 1014 + num = le32_to_cpu(pool->v.pg_num); 1015 + num_mask = pool->pg_num_mask; 1018 1016 1019 1017 pgid.ps = cpu_to_le16(ps); 1020 - pgid.preferred = cpu_to_le16(preferred); 1018 + pgid.preferred = cpu_to_le16(-1); 1021 1019 pgid.pool = fl->fl_pg_pool; 1022 - if (preferred >= 0) 1023 - dout("calc_object_layout '%s' pgid %d.%xp%d\n", oid, poolid, ps, 1024 - (int)preferred); 1025 - else 1026 - dout("calc_object_layout '%s' pgid %d.%x\n", oid, poolid, ps); 1020 + dout("calc_object_layout '%s' pgid %d.%x\n", oid, poolid, ps); 1027 1021 1028 1022 ol->ol_pgid = pgid; 1029 1023 ol->ol_stripe_unit = fl->fl_object_stripe_unit; ··· 1031 1045 struct ceph_pg_mapping *pg; 1032 1046 struct ceph_pg_pool_info *pool; 1033 1047 int ruleno; 1034 - unsigned int poolid, ps, pps, t; 1035 - int preferred; 1048 + unsigned int poolid, ps, pps, t, r; 1036 1049 1037 1050 poolid = le32_to_cpu(pgid.pool); 1038 1051 ps = le16_to_cpu(pgid.ps); 1039 - preferred = (s16)le16_to_cpu(pgid.preferred); 1040 1052 1041 1053 pool = __lookup_pg_pool(&osdmap->pg_pools, poolid); 1042 1054 if (!pool) 1043 1055 return NULL; 1044 1056 1045 1057 /* pg_temp? */ 1046 - if (preferred >= 0) 1047 - t = ceph_stable_mod(ps, le32_to_cpu(pool->v.lpg_num), 1048 - pool->lpgp_num_mask); 1049 - else 1050 - t = ceph_stable_mod(ps, le32_to_cpu(pool->v.pg_num), 1051 - pool->pgp_num_mask); 1058 + t = ceph_stable_mod(ps, le32_to_cpu(pool->v.pg_num), 1059 + pool->pgp_num_mask); 1052 1060 pgid.ps = cpu_to_le16(t); 1053 1061 pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid); 1054 1062 if (pg) { ··· 1060 1080 return NULL; 1061 1081 } 1062 1082 1063 - /* don't forcefeed bad device ids to crush */ 1064 - if (preferred >= osdmap->max_osd || 1065 - preferred >= osdmap->crush->max_devices) 1066 - preferred = -1; 1067 - 1068 - if (preferred >= 0) 1069 - pps = ceph_stable_mod(ps, 1070 - le32_to_cpu(pool->v.lpgp_num), 1071 - pool->lpgp_num_mask); 1072 - else 1073 - pps = ceph_stable_mod(ps, 1074 - le32_to_cpu(pool->v.pgp_num), 1075 - pool->pgp_num_mask); 1083 + pps = ceph_stable_mod(ps, 1084 + le32_to_cpu(pool->v.pgp_num), 1085 + pool->pgp_num_mask); 1076 1086 pps += poolid; 1077 - *num = crush_do_rule(osdmap->crush, ruleno, pps, osds, 1078 - min_t(int, pool->v.size, *num), 1079 - preferred, osdmap->osd_weight); 1087 + r = crush_do_rule(osdmap->crush, ruleno, pps, osds, 1088 + min_t(int, pool->v.size, *num), 1089 + osdmap->osd_weight); 1090 + if (r < 0) { 1091 + pr_err("error %d from crush rule: pool %d ruleset %d type %d" 1092 + " size %d\n", r, poolid, pool->v.crush_ruleset, 1093 + pool->v.type, pool->v.size); 1094 + return NULL; 1095 + } 1096 + *num = r; 1080 1097 return osds; 1081 1098 } 1082 1099