Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (23 commits)
ceph: document unlocked d_parent accesses
ceph: explicitly reference rename old_dentry parent dir in request
ceph: document locking for ceph_set_dentry_offset
ceph: avoid d_parent in ceph_dentry_hash; fix ceph_encode_fh() hashing bug
ceph: protect d_parent access in ceph_d_revalidate
ceph: protect access to d_parent
ceph: handle racing calls to ceph_init_dentry
ceph: set dir complete frag after adding capability
rbd: set blk_queue request sizes to object size
ceph: set up readahead size when rsize is not passed
rbd: cancel watch request when releasing the device
ceph: ignore lease mask
ceph: fix ceph_lookup_open intent usage
ceph: only link open operations to directory unsafe list if O_CREAT|O_TRUNC
ceph: fix bad parent_inode calc in ceph_lookup_open
ceph: avoid carrying Fw cap during write into page cache
libceph: don't time out osd requests that haven't been received
ceph: report f_bfree based on kb_avail rather than diffing.
ceph: only queue capsnap if caps are dirty
ceph: fix snap writeback when racing with writes
...

+306 -145
+45 -1
drivers/block/rbd.c
··· 630 630 } 631 631 632 632 /* 633 + * returns the size of an object in the image 634 + */ 635 + static u64 rbd_obj_bytes(struct rbd_image_header *header) 636 + { 637 + return 1 << header->obj_order; 638 + } 639 + 640 + /* 633 641 * bio helpers 634 642 */ 635 643 ··· 1261 1253 return ret; 1262 1254 } 1263 1255 1256 + /* 1257 + * Request sync osd unwatch 1258 + */ 1259 + static int rbd_req_sync_unwatch(struct rbd_device *dev, 1260 + const char *obj) 1261 + { 1262 + struct ceph_osd_req_op *ops; 1263 + 1264 + int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0); 1265 + if (ret < 0) 1266 + return ret; 1267 + 1268 + ops[0].watch.ver = 0; 1269 + ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie); 1270 + ops[0].watch.flag = 0; 1271 + 1272 + ret = rbd_req_sync_op(dev, NULL, 1273 + CEPH_NOSNAP, 1274 + 0, 1275 + CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 1276 + ops, 1277 + 1, obj, 0, 0, NULL, NULL, NULL); 1278 + 1279 + rbd_destroy_ops(ops); 1280 + ceph_osdc_cancel_event(dev->watch_event); 1281 + dev->watch_event = NULL; 1282 + return ret; 1283 + } 1284 + 1264 1285 struct rbd_notify_info { 1265 1286 struct rbd_device *dev; 1266 1287 }; ··· 1773 1736 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock); 1774 1737 if (!q) 1775 1738 goto out_disk; 1739 + 1740 + /* set io sizes to object size */ 1741 + blk_queue_max_hw_sectors(q, rbd_obj_bytes(&rbd_dev->header) / 512ULL); 1742 + blk_queue_max_segment_size(q, rbd_obj_bytes(&rbd_dev->header)); 1743 + blk_queue_io_min(q, rbd_obj_bytes(&rbd_dev->header)); 1744 + blk_queue_io_opt(q, rbd_obj_bytes(&rbd_dev->header)); 1745 + 1776 1746 blk_queue_merge_bvec(q, rbd_merge_bvec); 1777 1747 disk->queue = q; 1778 1748 ··· 2334 2290 ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc, 2335 2291 rbd_dev->watch_request); 2336 2292 if (rbd_dev->watch_event) 2337 - ceph_osdc_cancel_event(rbd_dev->watch_event); 2293 + rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name); 2338 2294 2339 2295 rbd_put_client(rbd_dev); 2340 2296
+1 -1
fs/ceph/debugfs.c
··· 102 102 path = NULL; 103 103 spin_lock(&req->r_old_dentry->d_lock); 104 104 seq_printf(s, " #%llx/%.*s (%s)", 105 - ceph_ino(req->r_old_dentry->d_parent->d_inode), 105 + ceph_ino(req->r_old_dentry_dir), 106 106 req->r_old_dentry->d_name.len, 107 107 req->r_old_dentry->d_name.name, 108 108 path ? path : "");
+73 -43
fs/ceph/dir.c
··· 40 40 if (dentry->d_fsdata) 41 41 return 0; 42 42 43 - if (dentry->d_parent == NULL || /* nfs fh_to_dentry */ 44 - ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) 45 - d_set_d_op(dentry, &ceph_dentry_ops); 46 - else if (ceph_snap(dentry->d_parent->d_inode) == CEPH_SNAPDIR) 47 - d_set_d_op(dentry, &ceph_snapdir_dentry_ops); 48 - else 49 - d_set_d_op(dentry, &ceph_snap_dentry_ops); 50 - 51 43 di = kmem_cache_alloc(ceph_dentry_cachep, GFP_NOFS | __GFP_ZERO); 52 44 if (!di) 53 45 return -ENOMEM; /* oh well */ ··· 50 58 kmem_cache_free(ceph_dentry_cachep, di); 51 59 goto out_unlock; 52 60 } 61 + 62 + if (dentry->d_parent == NULL || /* nfs fh_to_dentry */ 63 + ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) 64 + d_set_d_op(dentry, &ceph_dentry_ops); 65 + else if (ceph_snap(dentry->d_parent->d_inode) == CEPH_SNAPDIR) 66 + d_set_d_op(dentry, &ceph_snapdir_dentry_ops); 67 + else 68 + d_set_d_op(dentry, &ceph_snap_dentry_ops); 69 + 53 70 di->dentry = dentry; 54 71 di->lease_session = NULL; 55 - dentry->d_fsdata = di; 56 72 dentry->d_time = jiffies; 73 + /* avoid reordering d_fsdata setup so that the check above is safe */ 74 + smp_mb(); 75 + dentry->d_fsdata = di; 57 76 ceph_dentry_lru_add(dentry); 58 77 out_unlock: 59 78 spin_unlock(&dentry->d_lock); 60 79 return 0; 61 80 } 62 81 82 + struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry) 83 + { 84 + struct inode *inode = NULL; 85 + 86 + if (!dentry) 87 + return NULL; 88 + 89 + spin_lock(&dentry->d_lock); 90 + if (dentry->d_parent) { 91 + inode = dentry->d_parent->d_inode; 92 + ihold(inode); 93 + } 94 + spin_unlock(&dentry->d_lock); 95 + return inode; 96 + } 63 97 64 98 65 99 /* ··· 151 133 d_unhashed(dentry) ? 
"!hashed" : "hashed", 152 134 parent->d_subdirs.prev, parent->d_subdirs.next); 153 135 if (p == &parent->d_subdirs) { 154 - fi->at_end = 1; 136 + fi->flags |= CEPH_F_ATEND; 155 137 goto out_unlock; 156 138 } 157 139 spin_lock_nested(&dentry->d_lock, DENTRY_D_LOCK_NESTED); ··· 252 234 const int max_bytes = fsc->mount_options->max_readdir_bytes; 253 235 254 236 dout("readdir %p filp %p frag %u off %u\n", inode, filp, frag, off); 255 - if (fi->at_end) 237 + if (fi->flags & CEPH_F_ATEND) 256 238 return 0; 257 239 258 240 /* always start with . and .. */ ··· 421 403 dout("readdir next frag is %x\n", frag); 422 404 goto more; 423 405 } 424 - fi->at_end = 1; 406 + fi->flags |= CEPH_F_ATEND; 425 407 426 408 /* 427 409 * if dir_release_count still matches the dir, no dentries ··· 453 435 dput(fi->dentry); 454 436 fi->dentry = NULL; 455 437 } 456 - fi->at_end = 0; 438 + fi->flags &= ~CEPH_F_ATEND; 457 439 } 458 440 459 441 static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int origin) ··· 481 463 if (offset != file->f_pos) { 482 464 file->f_pos = offset; 483 465 file->f_version = 0; 484 - fi->at_end = 0; 466 + fi->flags &= ~CEPH_F_ATEND; 485 467 } 486 468 retval = offset; 487 469 ··· 506 488 } 507 489 508 490 /* 509 - * Process result of a lookup/open request. 510 - * 511 - * Mainly, make sure we return the final req->r_dentry (if it already 512 - * existed) in place of the original VFS-provided dentry when they 513 - * differ. 514 - * 515 - * Gracefully handle the case where the MDS replies with -ENOENT and 516 - * no trace (which it may do, at its discretion, e.g., if it doesn't 517 - * care to issue a lease on the negative dentry). 491 + * Handle lookups for the hidden .snap directory. 
518 492 */ 519 - struct dentry *ceph_finish_lookup(struct ceph_mds_request *req, 520 - struct dentry *dentry, int err) 493 + int ceph_handle_snapdir(struct ceph_mds_request *req, 494 + struct dentry *dentry, int err) 521 495 { 522 496 struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); 523 - struct inode *parent = dentry->d_parent->d_inode; 497 + struct inode *parent = dentry->d_parent->d_inode; /* we hold i_mutex */ 524 498 525 499 /* .snap dir? */ 526 500 if (err == -ENOENT && ··· 526 516 d_add(dentry, inode); 527 517 err = 0; 528 518 } 519 + return err; 520 + } 529 521 522 + /* 523 + * Figure out final result of a lookup/open request. 524 + * 525 + * Mainly, make sure we return the final req->r_dentry (if it already 526 + * existed) in place of the original VFS-provided dentry when they 527 + * differ. 528 + * 529 + * Gracefully handle the case where the MDS replies with -ENOENT and 530 + * no trace (which it may do, at its discretion, e.g., if it doesn't 531 + * care to issue a lease on the negative dentry). 532 + */ 533 + struct dentry *ceph_finish_lookup(struct ceph_mds_request *req, 534 + struct dentry *dentry, int err) 535 + { 530 536 if (err == -ENOENT) { 531 537 /* no trace? */ 532 538 err = 0; ··· 636 610 req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INODE); 637 611 req->r_locked_dir = dir; 638 612 err = ceph_mdsc_do_request(mdsc, NULL, req); 613 + err = ceph_handle_snapdir(req, dentry, err); 639 614 dentry = ceph_finish_lookup(req, dentry, err); 640 615 ceph_mdsc_put_request(req); /* will dput(dentry) */ 641 616 dout("lookup result=%p\n", dentry); ··· 816 789 req->r_dentry = dget(dentry); 817 790 req->r_num_caps = 2; 818 791 req->r_old_dentry = dget(old_dentry); /* or inode? hrm. 
*/ 792 + req->r_old_dentry_dir = ceph_get_dentry_parent_inode(old_dentry); 819 793 req->r_locked_dir = dir; 820 794 req->r_dentry_drop = CEPH_CAP_FILE_SHARED; 821 795 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; ··· 915 887 req->r_dentry = dget(new_dentry); 916 888 req->r_num_caps = 2; 917 889 req->r_old_dentry = dget(old_dentry); 890 + req->r_old_dentry_dir = ceph_get_dentry_parent_inode(old_dentry); 918 891 req->r_locked_dir = new_dir; 919 892 req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED; 920 893 req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL; ··· 1031 1002 */ 1032 1003 static int ceph_d_revalidate(struct dentry *dentry, struct nameidata *nd) 1033 1004 { 1005 + int valid = 0; 1034 1006 struct inode *dir; 1035 1007 1036 1008 if (nd && nd->flags & LOOKUP_RCU) 1037 1009 return -ECHILD; 1038 1010 1039 - dir = dentry->d_parent->d_inode; 1040 - 1041 1011 dout("d_revalidate %p '%.*s' inode %p offset %lld\n", dentry, 1042 1012 dentry->d_name.len, dentry->d_name.name, dentry->d_inode, 1043 1013 ceph_dentry(dentry)->offset); 1014 + 1015 + dir = ceph_get_dentry_parent_inode(dentry); 1044 1016 1045 1017 /* always trust cached snapped dentries, snapdir dentry */ 1046 1018 if (ceph_snap(dir) != CEPH_NOSNAP) { 1047 1019 dout("d_revalidate %p '%.*s' inode %p is SNAPPED\n", dentry, 1048 1020 dentry->d_name.len, dentry->d_name.name, dentry->d_inode); 1049 - goto out_touch; 1021 + valid = 1; 1022 + } else if (dentry->d_inode && 1023 + ceph_snap(dentry->d_inode) == CEPH_SNAPDIR) { 1024 + valid = 1; 1025 + } else if (dentry_lease_is_valid(dentry) || 1026 + dir_lease_is_valid(dir, dentry)) { 1027 + valid = 1; 1050 1028 } 1051 - if (dentry->d_inode && ceph_snap(dentry->d_inode) == CEPH_SNAPDIR) 1052 - goto out_touch; 1053 1029 1054 - if (dentry_lease_is_valid(dentry) || 1055 - dir_lease_is_valid(dir, dentry)) 1056 - goto out_touch; 1057 - 1058 - dout("d_revalidate %p invalid\n", dentry); 1059 - d_drop(dentry); 1060 - return 0; 1061 - out_touch: 1062 - ceph_dentry_lru_touch(dentry); 1063 
- return 1; 1030 + dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid"); 1031 + if (valid) 1032 + ceph_dentry_lru_touch(dentry); 1033 + else 1034 + d_drop(dentry); 1035 + iput(dir); 1036 + return valid; 1064 1037 } 1065 1038 1066 1039 /* ··· 1259 1228 * Return name hash for a given dentry. This is dependent on 1260 1229 * the parent directory's hash function. 1261 1230 */ 1262 - unsigned ceph_dentry_hash(struct dentry *dn) 1231 + unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn) 1263 1232 { 1264 - struct inode *dir = dn->d_parent->d_inode; 1265 1233 struct ceph_inode_info *dci = ceph_inode(dir); 1266 1234 1267 1235 switch (dci->i_dir_layout.dl_dir_hash) {
+15 -9
fs/ceph/export.c
··· 46 46 int type; 47 47 struct ceph_nfs_fh *fh = (void *)rawfh; 48 48 struct ceph_nfs_confh *cfh = (void *)rawfh; 49 - struct dentry *parent = dentry->d_parent; 49 + struct dentry *parent; 50 50 struct inode *inode = dentry->d_inode; 51 51 int connected_handle_length = sizeof(*cfh)/4; 52 52 int handle_length = sizeof(*fh)/4; ··· 55 55 if (ceph_snap(inode) != CEPH_NOSNAP) 56 56 return -EINVAL; 57 57 58 + spin_lock(&dentry->d_lock); 59 + parent = dget(dentry->d_parent); 60 + spin_unlock(&dentry->d_lock); 61 + 58 62 if (*max_len >= connected_handle_length) { 59 63 dout("encode_fh %p connectable\n", dentry); 60 64 cfh->ino = ceph_ino(dentry->d_inode); 61 65 cfh->parent_ino = ceph_ino(parent->d_inode); 62 - cfh->parent_name_hash = ceph_dentry_hash(parent); 66 + cfh->parent_name_hash = ceph_dentry_hash(parent->d_inode, 67 + dentry); 63 68 *max_len = connected_handle_length; 64 69 type = 2; 65 70 } else if (*max_len >= handle_length) { 66 71 if (connectable) { 67 72 *max_len = connected_handle_length; 68 - return 255; 73 + type = 255; 74 + } else { 75 + dout("encode_fh %p\n", dentry); 76 + fh->ino = ceph_ino(dentry->d_inode); 77 + *max_len = handle_length; 78 + type = 1; 69 79 } 70 - dout("encode_fh %p\n", dentry); 71 - fh->ino = ceph_ino(dentry->d_inode); 72 - *max_len = handle_length; 73 - type = 1; 74 80 } else { 75 81 *max_len = handle_length; 76 - return 255; 82 + type = 255; 77 83 } 84 + dput(parent); 78 85 return type; 79 86 } 80 87 ··· 130 123 return dentry; 131 124 } 132 125 err = ceph_init_dentry(dentry); 133 - 134 126 if (err < 0) { 135 127 iput(inode); 136 128 return ERR_PTR(err);
+45 -16
fs/ceph/file.c
··· 122 122 struct ceph_mds_client *mdsc = fsc->mdsc; 123 123 struct ceph_mds_request *req; 124 124 struct ceph_file_info *cf = file->private_data; 125 - struct inode *parent_inode = file->f_dentry->d_parent->d_inode; 125 + struct inode *parent_inode = NULL; 126 126 int err; 127 127 int flags, fmode, wanted; 128 128 ··· 194 194 req->r_inode = inode; 195 195 ihold(inode); 196 196 req->r_num_caps = 1; 197 + if (flags & (O_CREAT|O_TRUNC)) 198 + parent_inode = ceph_get_dentry_parent_inode(file->f_dentry); 197 199 err = ceph_mdsc_do_request(mdsc, parent_inode, req); 200 + iput(parent_inode); 198 201 if (!err) 199 202 err = ceph_init_file(inode, file, req->r_fmode); 200 203 ceph_mdsc_put_request(req); ··· 225 222 { 226 223 struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); 227 224 struct ceph_mds_client *mdsc = fsc->mdsc; 228 - struct file *file = nd->intent.open.file; 229 - struct inode *parent_inode = get_dentry_parent_inode(file->f_dentry); 225 + struct file *file; 230 226 struct ceph_mds_request *req; 227 + struct dentry *ret; 231 228 int err; 232 229 int flags = nd->intent.open.flags; 233 230 ··· 245 242 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 246 243 } 247 244 req->r_locked_dir = dir; /* caller holds dir->i_mutex */ 248 - err = ceph_mdsc_do_request(mdsc, parent_inode, req); 249 - dentry = ceph_finish_lookup(req, dentry, err); 250 - if (!err && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry) 245 + err = ceph_mdsc_do_request(mdsc, 246 + (flags & (O_CREAT|O_TRUNC)) ? 
dir : NULL, 247 + req); 248 + err = ceph_handle_snapdir(req, dentry, err); 249 + if (err) 250 + goto out; 251 + if ((flags & O_CREAT) && !req->r_reply_info.head->is_dentry) 251 252 err = ceph_handle_notrace_create(dir, dentry); 252 - if (!err) 253 - err = ceph_init_file(req->r_dentry->d_inode, file, 254 - req->r_fmode); 253 + if (err) 254 + goto out; 255 + file = lookup_instantiate_filp(nd, req->r_dentry, ceph_open); 256 + if (IS_ERR(file)) 257 + err = PTR_ERR(file); 258 + out: 259 + ret = ceph_finish_lookup(req, dentry, err); 255 260 ceph_mdsc_put_request(req); 256 - dout("ceph_lookup_open result=%p\n", dentry); 257 - return dentry; 261 + dout("ceph_lookup_open result=%p\n", ret); 262 + return ret; 258 263 } 259 264 260 265 int ceph_release(struct inode *inode, struct file *file) ··· 654 643 655 644 if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 || 656 645 (iocb->ki_filp->f_flags & O_DIRECT) || 657 - (inode->i_sb->s_flags & MS_SYNCHRONOUS)) 646 + (inode->i_sb->s_flags & MS_SYNCHRONOUS) || 647 + (fi->flags & CEPH_F_SYNC)) 658 648 /* hmm, this isn't really async... 
*/ 659 649 ret = ceph_sync_read(filp, base, len, ppos, &checkeof); 660 650 else ··· 724 712 want = CEPH_CAP_FILE_BUFFER; 725 713 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff); 726 714 if (ret < 0) 727 - goto out; 715 + goto out_put; 728 716 729 717 dout("aio_write %p %llx.%llx %llu~%u got cap refs on %s\n", 730 718 inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len, ··· 732 720 733 721 if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || 734 722 (iocb->ki_filp->f_flags & O_DIRECT) || 735 - (inode->i_sb->s_flags & MS_SYNCHRONOUS)) { 723 + (inode->i_sb->s_flags & MS_SYNCHRONOUS) || 724 + (fi->flags & CEPH_F_SYNC)) { 736 725 ret = ceph_sync_write(file, iov->iov_base, iov->iov_len, 737 726 &iocb->ki_pos); 738 727 } else { 739 - ret = generic_file_aio_write(iocb, iov, nr_segs, pos); 728 + /* 729 + * buffered write; drop Fw early to avoid slow 730 + * revocation if we get stuck on balance_dirty_pages 731 + */ 732 + int dirty; 740 733 734 + spin_lock(&inode->i_lock); 735 + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); 736 + spin_unlock(&inode->i_lock); 737 + ceph_put_cap_refs(ci, got); 738 + 739 + ret = generic_file_aio_write(iocb, iov, nr_segs, pos); 741 740 if ((ret >= 0 || ret == -EIOCBQUEUED) && 742 741 ((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host) 743 742 || ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) { ··· 756 733 if (err < 0) 757 734 ret = err; 758 735 } 736 + 737 + if (dirty) 738 + __mark_inode_dirty(inode, dirty); 739 + goto out; 759 740 } 741 + 760 742 if (ret >= 0) { 761 743 int dirty; 762 744 spin_lock(&inode->i_lock); ··· 771 743 __mark_inode_dirty(inode, dirty); 772 744 } 773 745 774 - out: 746 + out_put: 775 747 dout("aio_write %p %llx.%llx %llu~%u dropping cap refs on %s\n", 776 748 inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len, 777 749 ceph_cap_string(got)); 778 750 ceph_put_cap_refs(ci, got); 779 751 752 + out: 780 753 if (ret == -EOLDSNAPC) { 781 754 dout("aio_write %p %llx.%llx 
%llu~%u got EOLDSNAPC, retrying\n", 782 755 inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len);
+27 -21
fs/ceph/inode.c
··· 560 560 struct ceph_mds_reply_inode *info = iinfo->in; 561 561 struct ceph_inode_info *ci = ceph_inode(inode); 562 562 int i; 563 - int issued, implemented; 563 + int issued = 0, implemented; 564 + int updating_inode = 0; 564 565 struct timespec mtime, atime, ctime; 565 566 u32 nsplits; 566 567 struct ceph_buffer *xattr_blob = NULL; ··· 600 599 if (le64_to_cpu(info->version) > 0 && 601 600 (ci->i_version & ~1) >= le64_to_cpu(info->version)) 602 601 goto no_change; 603 - 602 + 603 + updating_inode = 1; 604 604 issued = __ceph_caps_issued(ci, &implemented); 605 605 issued |= implemented | __ceph_caps_dirty(ci); 606 606 ··· 709 707 ci->i_rfiles = le64_to_cpu(info->rfiles); 710 708 ci->i_rsubdirs = le64_to_cpu(info->rsubdirs); 711 709 ceph_decode_timespec(&ci->i_rctime, &info->rctime); 712 - 713 - /* set dir completion flag? */ 714 - if (ci->i_files == 0 && ci->i_subdirs == 0 && 715 - ceph_snap(inode) == CEPH_NOSNAP && 716 - (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) && 717 - (issued & CEPH_CAP_FILE_EXCL) == 0 && 718 - (ci->i_ceph_flags & CEPH_I_COMPLETE) == 0) { 719 - dout(" marking %p complete (empty)\n", inode); 720 - /* ci->i_ceph_flags |= CEPH_I_COMPLETE; */ 721 - ci->i_max_offset = 2; 722 - } 723 710 break; 724 711 default: 725 712 pr_err("fill_inode %llx.%llx BAD mode 0%o\n", ··· 765 774 __ceph_get_fmode(ci, cap_fmode); 766 775 } 767 776 777 + /* set dir completion flag? */ 778 + if (S_ISDIR(inode->i_mode) && 779 + updating_inode && /* didn't jump to no_change */ 780 + ci->i_files == 0 && ci->i_subdirs == 0 && 781 + ceph_snap(inode) == CEPH_NOSNAP && 782 + (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) && 783 + (issued & CEPH_CAP_FILE_EXCL) == 0 && 784 + (ci->i_ceph_flags & CEPH_I_COMPLETE) == 0) { 785 + dout(" marking %p complete (empty)\n", inode); 786 + /* ci->i_ceph_flags |= CEPH_I_COMPLETE; */ 787 + ci->i_max_offset = 2; 788 + } 789 + 768 790 /* update delegation info? 
*/ 769 791 if (dirinfo) 770 792 ceph_fill_dirfrag(inode, dirinfo); ··· 809 805 return; 810 806 811 807 spin_lock(&dentry->d_lock); 812 - dout("update_dentry_lease %p mask %d duration %lu ms ttl %lu\n", 813 - dentry, le16_to_cpu(lease->mask), duration, ttl); 808 + dout("update_dentry_lease %p duration %lu ms ttl %lu\n", 809 + dentry, duration, ttl); 814 810 815 811 /* make lease_rdcache_gen match directory */ 816 812 dir = dentry->d_parent->d_inode; 817 813 di->lease_shared_gen = ceph_inode(dir)->i_shared_gen; 818 814 819 - if (lease->mask == 0) 815 + if (duration == 0) 820 816 goto out_unlock; 821 817 822 818 if (di->lease_gen == session->s_cap_gen && ··· 843 839 /* 844 840 * Set dentry's directory position based on the current dir's max, and 845 841 * order it in d_subdirs, so that dcache_readdir behaves. 842 + * 843 + * Always called under directory's i_mutex. 846 844 */ 847 845 static void ceph_set_dentry_offset(struct dentry *dn) 848 846 { 849 847 struct dentry *dir = dn->d_parent; 850 - struct inode *inode = dn->d_parent->d_inode; 848 + struct inode *inode = dir->d_inode; 851 849 struct ceph_dentry_info *di; 852 850 853 851 BUG_ON(!inode); ··· 1028 1022 1029 1023 /* do we have a dn lease? 
*/ 1030 1024 have_lease = have_dir_cap || 1031 - (le16_to_cpu(rinfo->dlease->mask) & 1032 - CEPH_LOCK_DN); 1033 - 1025 + le32_to_cpu(rinfo->dlease->duration_ms); 1034 1026 if (!have_lease) 1035 1027 dout("fill_trace no dentry lease or dir cap\n"); 1036 1028 ··· 1564 1560 { 1565 1561 struct inode *inode = dentry->d_inode; 1566 1562 struct ceph_inode_info *ci = ceph_inode(inode); 1567 - struct inode *parent_inode = dentry->d_parent->d_inode; 1563 + struct inode *parent_inode; 1568 1564 const unsigned int ia_valid = attr->ia_valid; 1569 1565 struct ceph_mds_request *req; 1570 1566 struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc; ··· 1747 1743 req->r_inode_drop = release; 1748 1744 req->r_args.setattr.mask = cpu_to_le32(mask); 1749 1745 req->r_num_caps = 1; 1746 + parent_inode = ceph_get_dentry_parent_inode(dentry); 1750 1747 err = ceph_mdsc_do_request(mdsc, parent_inode, req); 1748 + iput(parent_inode); 1751 1749 } 1752 1750 dout("setattr %p result=%d (%s locally, %d remote)\n", inode, err, 1753 1751 ceph_cap_string(dirtied), mask);
+14 -1
fs/ceph/ioctl.c
··· 38 38 static long ceph_ioctl_set_layout(struct file *file, void __user *arg) 39 39 { 40 40 struct inode *inode = file->f_dentry->d_inode; 41 - struct inode *parent_inode = file->f_dentry->d_parent->d_inode; 41 + struct inode *parent_inode; 42 42 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; 43 43 struct ceph_mds_request *req; 44 44 struct ceph_ioctl_layout l; ··· 87 87 req->r_args.setlayout.layout.fl_pg_preferred = 88 88 cpu_to_le32(l.preferred_osd); 89 89 90 + parent_inode = ceph_get_dentry_parent_inode(file->f_dentry); 90 91 err = ceph_mdsc_do_request(mdsc, parent_inode, req); 92 + iput(parent_inode); 91 93 ceph_mdsc_put_request(req); 92 94 return err; 93 95 } ··· 233 231 return 0; 234 232 } 235 233 234 + static long ceph_ioctl_syncio(struct file *file) 235 + { 236 + struct ceph_file_info *fi = file->private_data; 237 + 238 + fi->flags |= CEPH_F_SYNC; 239 + return 0; 240 + } 241 + 236 242 long ceph_ioctl(struct file *file, unsigned int cmd, unsigned long arg) 237 243 { 238 244 dout("ioctl file %p cmd %u arg %lu\n", file, cmd, arg); ··· 259 249 260 250 case CEPH_IOC_LAZYIO: 261 251 return ceph_ioctl_lazyio(file); 252 + 253 + case CEPH_IOC_SYNCIO: 254 + return ceph_ioctl_syncio(file); 262 255 } 263 256 264 257 return -ENOTTY;
+1
fs/ceph/ioctl.h
··· 40 40 struct ceph_ioctl_dataloc) 41 41 42 42 #define CEPH_IOC_LAZYIO _IO(CEPH_IOCTL_MAGIC, 4) 43 + #define CEPH_IOC_SYNCIO _IO(CEPH_IOCTL_MAGIC, 5) 43 44 44 45 #endif
+31 -25
fs/ceph/mds_client.c
··· 483 483 destroy_reply_info(&req->r_reply_info); 484 484 } 485 485 if (req->r_inode) { 486 - ceph_put_cap_refs(ceph_inode(req->r_inode), 487 - CEPH_CAP_PIN); 486 + ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 488 487 iput(req->r_inode); 489 488 } 490 489 if (req->r_locked_dir) 491 - ceph_put_cap_refs(ceph_inode(req->r_locked_dir), 492 - CEPH_CAP_PIN); 490 + ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); 493 491 if (req->r_target_inode) 494 492 iput(req->r_target_inode); 495 493 if (req->r_dentry) 496 494 dput(req->r_dentry); 497 495 if (req->r_old_dentry) { 498 - ceph_put_cap_refs( 499 - ceph_inode(req->r_old_dentry->d_parent->d_inode), 500 - CEPH_CAP_PIN); 496 + /* 497 + * track (and drop pins for) r_old_dentry_dir 498 + * separately, since r_old_dentry's d_parent may have 499 + * changed between the dir mutex being dropped and 500 + * this request being freed. 501 + */ 502 + ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir), 503 + CEPH_CAP_PIN); 501 504 dput(req->r_old_dentry); 505 + iput(req->r_old_dentry_dir); 502 506 } 503 507 kfree(req->r_path1); 504 508 kfree(req->r_path2); ··· 621 617 */ 622 618 struct dentry *get_nonsnap_parent(struct dentry *dentry) 623 619 { 620 + /* 621 + * we don't need to worry about protecting the d_parent access 622 + * here because we never rename inside the snapped namespace 623 + * except to resplice to another snapdir, and either the old or new 624 + * result is a valid result.
625 + */ 624 626 while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP) 625 627 dentry = dentry->d_parent; 626 628 return dentry; ··· 662 652 if (req->r_inode) { 663 653 inode = req->r_inode; 664 654 } else if (req->r_dentry) { 665 - struct inode *dir = req->r_dentry->d_parent->d_inode; 655 + /* ignore race with rename; old or new d_parent is okay */ 656 + struct dentry *parent = req->r_dentry->d_parent; 657 + struct inode *dir = parent->d_inode; 666 658 667 659 if (dir->i_sb != mdsc->fsc->sb) { 668 660 /* not this fs! */ ··· 672 660 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 673 661 /* direct snapped/virtual snapdir requests 674 662 * based on parent dir inode */ 675 - struct dentry *dn = 676 - get_nonsnap_parent(req->r_dentry->d_parent); 663 + struct dentry *dn = get_nonsnap_parent(parent); 677 664 inode = dn->d_inode; 678 665 dout("__choose_mds using nonsnap parent %p\n", inode); 679 666 } else if (req->r_dentry->d_inode) { ··· 681 670 } else { 682 671 /* dir + name */ 683 672 inode = dir; 684 - hash = ceph_dentry_hash(req->r_dentry); 673 + hash = ceph_dentry_hash(dir, req->r_dentry); 685 674 is_hash = true; 686 675 } 687 676 } ··· 1942 1931 if (req->r_locked_dir) 1943 1932 ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); 1944 1933 if (req->r_old_dentry) 1945 - ceph_get_cap_refs( 1946 - ceph_inode(req->r_old_dentry->d_parent->d_inode), 1947 - CEPH_CAP_PIN); 1934 + ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), 1935 + CEPH_CAP_PIN); 1948 1936 1949 1937 /* issue */ 1950 1938 mutex_lock(&mdsc->mutex); ··· 2724 2714 struct ceph_mds_lease *h = msg->front.iov_base; 2725 2715 u32 seq; 2726 2716 struct ceph_vino vino; 2727 - int mask; 2728 2717 struct qstr dname; 2729 2718 int release = 0; 2730 2719 ··· 2734 2725 goto bad; 2735 2726 vino.ino = le64_to_cpu(h->ino); 2736 2727 vino.snap = CEPH_NOSNAP; 2737 - mask = le16_to_cpu(h->mask); 2738 2728 seq = le32_to_cpu(h->seq); 2739 2729 dname.name = (void *)h + sizeof(*h) + sizeof(u32); 
2740 2730 dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32); ··· 2745 2737 2746 2738 /* lookup inode */ 2747 2739 inode = ceph_find_inode(sb, vino); 2748 - dout("handle_lease %s, mask %d, ino %llx %p %.*s\n", 2749 - ceph_lease_op_name(h->action), mask, vino.ino, inode, 2740 + dout("handle_lease %s, ino %llx %p %.*s\n", 2741 + ceph_lease_op_name(h->action), vino.ino, inode, 2750 2742 dname.len, dname.name); 2751 2743 if (inode == NULL) { 2752 2744 dout("handle_lease no inode %llx\n", vino.ino); ··· 2836 2828 return; 2837 2829 lease = msg->front.iov_base; 2838 2830 lease->action = action; 2839 - lease->mask = cpu_to_le16(1); 2840 2831 lease->ino = cpu_to_le64(ceph_vino(inode).ino); 2841 2832 lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap); 2842 2833 lease->seq = cpu_to_le32(seq); ··· 2857 2850 * Pass @inode always, @dentry is optional. 2858 2851 */ 2859 2852 void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode, 2860 - struct dentry *dentry, int mask) 2853 + struct dentry *dentry) 2861 2854 { 2862 2855 struct ceph_dentry_info *di; 2863 2856 struct ceph_mds_session *session; ··· 2865 2858 2866 2859 BUG_ON(inode == NULL); 2867 2860 BUG_ON(dentry == NULL); 2868 - BUG_ON(mask == 0); 2869 2861 2870 2862 /* is dentry lease valid? 
*/ 2871 2863 spin_lock(&dentry->d_lock); ··· 2874 2868 di->lease_gen != di->lease_session->s_cap_gen || 2875 2869 !time_before(jiffies, dentry->d_time)) { 2876 2870 dout("lease_release inode %p dentry %p -- " 2877 - "no lease on %d\n", 2878 - inode, dentry, mask); 2871 + "no lease\n", 2872 + inode, dentry); 2879 2873 spin_unlock(&dentry->d_lock); 2880 2874 return; 2881 2875 } ··· 2886 2880 __ceph_mdsc_drop_dentry_lease(dentry); 2887 2881 spin_unlock(&dentry->d_lock); 2888 2882 2889 - dout("lease_release inode %p dentry %p mask %d to mds%d\n", 2890 - inode, dentry, mask, session->s_mds); 2883 + dout("lease_release inode %p dentry %p to mds%d\n", 2884 + inode, dentry, session->s_mds); 2891 2885 ceph_mdsc_lease_send_msg(session, inode, dentry, 2892 2886 CEPH_MDS_LEASE_RELEASE, seq); 2893 2887 ceph_put_mds_session(session);
+2 -1
fs/ceph/mds_client.h
··· 171 171 struct inode *r_inode; /* arg1 */ 172 172 struct dentry *r_dentry; /* arg1 */ 173 173 struct dentry *r_old_dentry; /* arg2: rename from or link from */ 174 + struct inode *r_old_dentry_dir; /* arg2: old dentry's parent dir */ 174 175 char *r_path1, *r_path2; 175 176 struct ceph_vino r_ino1, r_ino2; 176 177 ··· 334 333 335 334 extern void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, 336 335 struct inode *inode, 337 - struct dentry *dn, int mask); 336 + struct dentry *dn); 338 337 339 338 extern void ceph_invalidate_dir_request(struct ceph_mds_request *req); 340 339
+20 -5
fs/ceph/snap.c
··· 449 449 spin_lock(&inode->i_lock); 450 450 used = __ceph_caps_used(ci); 451 451 dirty = __ceph_caps_dirty(ci); 452 + 453 + /* 454 + * If there is a write in progress, treat that as a dirty Fw, 455 + * even though it hasn't completed yet; by the time we finish 456 + * up this capsnap it will be. 457 + */ 458 + if (used & CEPH_CAP_FILE_WR) 459 + dirty |= CEPH_CAP_FILE_WR; 460 + 452 461 if (__ceph_have_pending_cap_snap(ci)) { 453 462 /* there is no point in queuing multiple "pending" cap_snaps, 454 463 as no new writes are allowed to start when pending, so any ··· 465 456 cap_snap. lucky us. */ 466 457 dout("queue_cap_snap %p already pending\n", inode); 467 458 kfree(capsnap); 468 - } else if (ci->i_wrbuffer_ref_head || (used & CEPH_CAP_FILE_WR) || 469 - (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL| 470 - CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR))) { 459 + } else if (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL| 460 + CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) { 471 461 struct ceph_snap_context *snapc = ci->i_head_snapc; 472 462 473 - dout("queue_cap_snap %p cap_snap %p queuing under %p\n", inode, 474 - capsnap, snapc); 463 + /* 464 + * if we are a sync write, we may need to go to the snaprealm 465 + * to get the current snapc. 466 + */ 467 + if (!snapc) 468 + snapc = ci->i_snap_realm->cached_context; 469 + 470 + dout("queue_cap_snap %p cap_snap %p queuing under %p %s\n", 471 + inode, capsnap, snapc, ceph_cap_string(dirty)); 475 472 ihold(inode); 476 473 477 474 atomic_set(&capsnap->nref, 1);
+5 -2
fs/ceph/super.c
··· 73 73 */ 74 74 buf->f_bsize = 1 << CEPH_BLOCK_SHIFT; 75 75 buf->f_blocks = le64_to_cpu(st.kb) >> (CEPH_BLOCK_SHIFT-10); 76 - buf->f_bfree = (le64_to_cpu(st.kb) - le64_to_cpu(st.kb_used)) >> 77 - (CEPH_BLOCK_SHIFT-10); 76 + buf->f_bfree = le64_to_cpu(st.kb_avail) >> (CEPH_BLOCK_SHIFT-10); 78 77 buf->f_bavail = le64_to_cpu(st.kb_avail) >> (CEPH_BLOCK_SHIFT-10); 79 78 80 79 buf->f_files = le64_to_cpu(st.num_objects); ··· 779 780 fsc->backing_dev_info.ra_pages = 780 781 (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1) 781 782 >> PAGE_SHIFT; 783 + else 784 + fsc->backing_dev_info.ra_pages = 785 + default_backing_dev_info.ra_pages; 786 + 782 787 err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%d", 783 788 atomic_long_inc_return(&bdi_seq)); 784 789 if (!err)
+9 -11
fs/ceph/super.h
··· 543 543 /* 544 544 * we keep buffered readdir results attached to file->private_data 545 545 */ 546 + #define CEPH_F_SYNC 1 547 + #define CEPH_F_ATEND 2 548 + 546 549 struct ceph_file_info { 547 - int fmode; /* initialized on open */ 550 + short fmode; /* initialized on open */ 551 + short flags; /* CEPH_F_* */ 548 552 549 553 /* readdir: position within the dir */ 550 554 u32 frag; 551 555 struct ceph_mds_request *last_readdir; 552 - int at_end; 553 556 554 557 /* readdir: position within a frag */ 555 558 unsigned offset; /* offset of last chunk, adjusted for . and .. */ ··· 792 789 ceph_snapdir_dentry_ops; 793 790 794 791 extern int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry); 792 + extern int ceph_handle_snapdir(struct ceph_mds_request *req, 793 + struct dentry *dentry, int err); 795 794 extern struct dentry *ceph_finish_lookup(struct ceph_mds_request *req, 796 795 struct dentry *dentry, int err); 797 796 ··· 801 796 extern void ceph_dentry_lru_touch(struct dentry *dn); 802 797 extern void ceph_dentry_lru_del(struct dentry *dn); 803 798 extern void ceph_invalidate_dentry_lease(struct dentry *dentry); 804 - extern unsigned ceph_dentry_hash(struct dentry *dn); 799 + extern unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn); 800 + extern struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry); 805 801 806 802 /* 807 803 * our d_ops vary depending on whether the inode is live, ··· 824 818 extern int ceph_encode_locks(struct inode *i, struct ceph_pagelist *p, 825 819 int p_locks, int f_locks); 826 820 extern int lock_to_ceph_filelock(struct file_lock *fl, struct ceph_filelock *c); 827 - 828 - static inline struct inode *get_dentry_parent_inode(struct dentry *dentry) 829 - { 830 - if (dentry && dentry->d_parent) 831 - return dentry->d_parent->d_inode; 832 - 833 - return NULL; 834 - } 835 821 836 822 /* debugfs.c */ 837 823 extern int ceph_fs_debugfs_init(struct ceph_fs_client *client);
+6 -2
fs/ceph/xattr.c
··· 629 629 struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); 630 630 struct inode *inode = dentry->d_inode; 631 631 struct ceph_inode_info *ci = ceph_inode(inode); 632 - struct inode *parent_inode = dentry->d_parent->d_inode; 632 + struct inode *parent_inode; 633 633 struct ceph_mds_request *req; 634 634 struct ceph_mds_client *mdsc = fsc->mdsc; 635 635 int err; ··· 677 677 req->r_data_len = size; 678 678 679 679 dout("xattr.ver (before): %lld\n", ci->i_xattrs.version); 680 + parent_inode = ceph_get_dentry_parent_inode(dentry); 680 681 err = ceph_mdsc_do_request(mdsc, parent_inode, req); 682 + iput(parent_inode); 681 683 ceph_mdsc_put_request(req); 682 684 dout("xattr.ver (after): %lld\n", ci->i_xattrs.version); 683 685 ··· 790 788 struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); 791 789 struct ceph_mds_client *mdsc = fsc->mdsc; 792 790 struct inode *inode = dentry->d_inode; 793 - struct inode *parent_inode = dentry->d_parent->d_inode; 791 + struct inode *parent_inode; 794 792 struct ceph_mds_request *req; 795 793 int err; 796 794 ··· 804 802 req->r_num_caps = 1; 805 803 req->r_path2 = kstrdup(name, GFP_NOFS); 806 804 805 + parent_inode = ceph_get_dentry_parent_inode(dentry); 807 806 err = ceph_mdsc_do_request(mdsc, parent_inode, req); 807 + iput(parent_inode); 808 808 ceph_mdsc_put_request(req); 809 809 return err; 810 810 }
+1
include/linux/ceph/messenger.h
··· 94 94 bool more_to_follow; 95 95 bool needs_out_seq; 96 96 int front_max; 97 + unsigned long ack_stamp; /* tx: when we were acked */ 97 98 98 99 struct ceph_msgpool *pool; 99 100 };
+5 -7
net/ceph/messenger.c
··· 486 486 m = list_first_entry(&con->out_queue, 487 487 struct ceph_msg, list_head); 488 488 con->out_msg = m; 489 - if (test_bit(LOSSYTX, &con->state)) { 490 - list_del_init(&m->list_head); 491 - } else { 492 - /* put message on sent list */ 493 - ceph_msg_get(m); 494 - list_move_tail(&m->list_head, &con->out_sent); 495 - } 489 + 490 + /* put message on sent list */ 491 + ceph_msg_get(m); 492 + list_move_tail(&m->list_head, &con->out_sent); 496 493 497 494 /* 498 495 * only assign outgoing seq # if we haven't sent this message ··· 1396 1399 break; 1397 1400 dout("got ack for seq %llu type %d at %p\n", seq, 1398 1401 le16_to_cpu(m->hdr.type), m); 1402 + m->ack_stamp = jiffies; 1399 1403 ceph_msg_remove(m); 1400 1404 } 1401 1405 prepare_read_tag(con);
+6
net/ceph/osd_client.c
··· 1085 1085 req = list_entry(osdc->req_lru.next, struct ceph_osd_request, 1086 1086 r_req_lru_item); 1087 1087 1088 + /* hasn't been long enough since we sent it? */ 1088 1089 if (time_before(jiffies, req->r_stamp + timeout)) 1090 + break; 1091 + 1092 + /* hasn't been long enough since it was acked? */ 1093 + if (req->r_request->ack_stamp == 0 || 1094 + time_before(jiffies, req->r_request->ack_stamp + timeout)) 1089 1095 break; 1090 1096 1091 1097 BUG_ON(req == last_req && req->r_stamp == last_stamp);