Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

ceph: wait for the first reply of inflight async unlink

In the async unlink case the kclient doesn't wait for the first reply
from the MDS; it just drops all the links, unhashes the dentry and then
succeeds immediately.

For any new create/link/rename, etc. requests that follow and use the
same file names, we must wait for the first reply of the inflight
unlink request, or the MDS may fail these subsequent requests with
-EEXIST if the inflight async unlink request was delayed for some
reason.

And the worst case is that a non-async openc request will
successfully open the file if the CDentry hasn't been unlinked yet,
but later the previously delayed async unlink request will remove the
CDentry. That means the just-created file may be deleted later
by accident.

We need to wait for the inflight async unlink requests to finish
when creating new files/directories by using the same file names.

Link: https://tracker.ceph.com/issues/55332
Signed-off-by: Xiubo Li <xiubli@redhat.com>
Reviewed-by: Jeff Layton <jlayton@kernel.org>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>

authored by

Xiubo Li and committed by
Ilya Dryomov
4868e537 4f48d5da

+167 -16
+70 -9
fs/ceph/dir.c
··· 856 856 if (ceph_snap(dir) != CEPH_NOSNAP) 857 857 return -EROFS; 858 858 859 + err = ceph_wait_on_conflict_unlink(dentry); 860 + if (err) 861 + return err; 862 + 859 863 if (ceph_quota_is_max_files_exceeded(dir)) { 860 864 err = -EDQUOT; 861 865 goto out; ··· 922 918 if (ceph_snap(dir) != CEPH_NOSNAP) 923 919 return -EROFS; 924 920 921 + err = ceph_wait_on_conflict_unlink(dentry); 922 + if (err) 923 + return err; 924 + 925 925 if (ceph_quota_is_max_files_exceeded(dir)) { 926 926 err = -EDQUOT; 927 927 goto out; ··· 976 968 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb); 977 969 struct ceph_mds_request *req; 978 970 struct ceph_acl_sec_ctx as_ctx = {}; 979 - int err = -EROFS; 971 + int err; 980 972 int op; 973 + 974 + err = ceph_wait_on_conflict_unlink(dentry); 975 + if (err) 976 + return err; 981 977 982 978 if (ceph_snap(dir) == CEPH_SNAPDIR) { 983 979 /* mkdir .snap/foo is a MKSNAP */ ··· 992 980 dout("mkdir dir %p dn %p mode 0%ho\n", dir, dentry, mode); 993 981 op = CEPH_MDS_OP_MKDIR; 994 982 } else { 983 + err = -EROFS; 995 984 goto out; 996 985 } 997 986 ··· 1050 1037 struct ceph_mds_request *req; 1051 1038 int err; 1052 1039 1040 + err = ceph_wait_on_conflict_unlink(dentry); 1041 + if (err) 1042 + return err; 1043 + 1053 1044 if (ceph_snap(dir) != CEPH_NOSNAP) 1054 1045 return -EROFS; 1055 1046 ··· 1088 1071 static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc, 1089 1072 struct ceph_mds_request *req) 1090 1073 { 1074 + struct dentry *dentry = req->r_dentry; 1075 + struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); 1076 + struct ceph_dentry_info *di = ceph_dentry(dentry); 1091 1077 int result = req->r_err ? 
req->r_err : 1092 1078 le32_to_cpu(req->r_reply_info.head->result); 1079 + 1080 + if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags)) 1081 + pr_warn("%s dentry %p:%pd async unlink bit is not set\n", 1082 + __func__, dentry, dentry); 1083 + 1084 + spin_lock(&fsc->async_unlink_conflict_lock); 1085 + hash_del_rcu(&di->hnode); 1086 + spin_unlock(&fsc->async_unlink_conflict_lock); 1087 + 1088 + spin_lock(&dentry->d_lock); 1089 + di->flags &= ~CEPH_DENTRY_ASYNC_UNLINK; 1090 + wake_up_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT); 1091 + spin_unlock(&dentry->d_lock); 1092 + 1093 + synchronize_rcu(); 1093 1094 1094 1095 if (result == -EJUKEBOX) 1095 1096 goto out; ··· 1116 1081 if (result) { 1117 1082 int pathlen = 0; 1118 1083 u64 base = 0; 1119 - char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen, 1084 + char *path = ceph_mdsc_build_path(dentry, &pathlen, 1120 1085 &base, 0); 1121 1086 1122 1087 /* mark error on parent + clear complete */ ··· 1124 1089 ceph_dir_clear_complete(req->r_parent); 1125 1090 1126 1091 /* drop the dentry -- we don't know its status */ 1127 - if (!d_unhashed(req->r_dentry)) 1128 - d_drop(req->r_dentry); 1092 + if (!d_unhashed(dentry)) 1093 + d_drop(dentry); 1129 1094 1130 1095 /* mark inode itself for an error (since metadata is bogus) */ 1131 1096 mapping_set_error(req->r_old_inode->i_mapping, result); 1132 1097 1133 - pr_warn("ceph: async unlink failure path=(%llx)%s result=%d!\n", 1098 + pr_warn("async unlink failure path=(%llx)%s result=%d!\n", 1134 1099 base, IS_ERR(path) ? 
"<<bad>>" : path, result); 1135 1100 ceph_mdsc_free_path(path, pathlen); 1136 1101 } ··· 1215 1180 1216 1181 if (try_async && op == CEPH_MDS_OP_UNLINK && 1217 1182 (req->r_dir_caps = get_caps_for_async_unlink(dir, dentry))) { 1183 + struct ceph_dentry_info *di = ceph_dentry(dentry); 1184 + 1218 1185 dout("async unlink on %llu/%.*s caps=%s", ceph_ino(dir), 1219 1186 dentry->d_name.len, dentry->d_name.name, 1220 1187 ceph_cap_string(req->r_dir_caps)); ··· 1224 1187 req->r_callback = ceph_async_unlink_cb; 1225 1188 req->r_old_inode = d_inode(dentry); 1226 1189 ihold(req->r_old_inode); 1190 + 1191 + spin_lock(&dentry->d_lock); 1192 + di->flags |= CEPH_DENTRY_ASYNC_UNLINK; 1193 + spin_unlock(&dentry->d_lock); 1194 + 1195 + spin_lock(&fsc->async_unlink_conflict_lock); 1196 + hash_add_rcu(fsc->async_unlink_conflict, &di->hnode, 1197 + dentry->d_name.hash); 1198 + spin_unlock(&fsc->async_unlink_conflict_lock); 1199 + 1227 1200 err = ceph_mdsc_submit_request(mdsc, dir, req); 1228 1201 if (!err) { 1229 1202 /* ··· 1242 1195 */ 1243 1196 drop_nlink(inode); 1244 1197 d_delete(dentry); 1245 - } else if (err == -EJUKEBOX) { 1246 - try_async = false; 1247 - ceph_mdsc_put_request(req); 1248 - goto retry; 1198 + } else { 1199 + spin_lock(&fsc->async_unlink_conflict_lock); 1200 + hash_del_rcu(&di->hnode); 1201 + spin_unlock(&fsc->async_unlink_conflict_lock); 1202 + 1203 + spin_lock(&dentry->d_lock); 1204 + di->flags &= ~CEPH_DENTRY_ASYNC_UNLINK; 1205 + spin_unlock(&dentry->d_lock); 1206 + 1207 + if (err == -EJUKEBOX) { 1208 + try_async = false; 1209 + ceph_mdsc_put_request(req); 1210 + goto retry; 1211 + } 1249 1212 } 1250 1213 } else { 1251 1214 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); ··· 1293 1236 if ((old_dir != new_dir) && 1294 1237 (!ceph_quota_is_same_realm(old_dir, new_dir))) 1295 1238 return -EXDEV; 1239 + 1240 + err = ceph_wait_on_conflict_unlink(new_dentry); 1241 + if (err) 1242 + return err; 1296 1243 1297 1244 dout("rename dir %p dentry %p to dir %p dentry 
%p\n", 1298 1245 old_dir, old_dentry, new_dir, new_dentry);
+5 -1
fs/ceph/file.c
··· 569 569 char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen, 570 570 &base, 0); 571 571 572 - pr_warn("ceph: async create failure path=(%llx)%s result=%d!\n", 572 + pr_warn("async create failure path=(%llx)%s result=%d!\n", 573 573 base, IS_ERR(path) ? "<<bad>>" : path, result); 574 574 ceph_mdsc_free_path(path, pathlen); 575 575 ··· 739 739 740 740 if (dentry->d_name.len > NAME_MAX) 741 741 return -ENAMETOOLONG; 742 + 743 + err = ceph_wait_on_conflict_unlink(dentry); 744 + if (err) 745 + return err; 742 746 743 747 if (flags & O_CREAT) { 744 748 if (ceph_quota_is_max_files_exceeded(dir))
+74 -1
fs/ceph/mds_client.c
··· 456 456 dout("added delegated inode 0x%llx\n", 457 457 start - 1); 458 458 } else if (err == -EBUSY) { 459 - pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n", 459 + pr_warn("MDS delegated inode 0x%llx more than once.\n", 460 460 start - 1); 461 461 } else { 462 462 return err; ··· 653 653 if (!info->dir_entries) 654 654 return; 655 655 free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size)); 656 + } 657 + 658 + /* 659 + * In async unlink case the kclient won't wait for the first reply 660 + * from MDS and just drop all the links and unhash the dentry and then 661 + * succeeds immediately. 662 + * 663 + * For any new create/link/rename,etc requests followed by using the 664 + * same file names we must wait for the first reply of the inflight 665 + * unlink request, or the MDS possibly will fail these following 666 + * requests with -EEXIST if the inflight async unlink request was 667 + * delayed for some reasons. 668 + * 669 + * And the worst case is that for the none async openc request it will 670 + * successfully open the file if the CDentry hasn't been unlinked yet, 671 + * but later the previous delayed async unlink request will remove the 672 + * CDenty. That means the just created file is possiblly deleted later 673 + * by accident. 674 + * 675 + * We need to wait for the inflight async unlink requests to finish 676 + * when creating new files/directories by using the same file names. 
677 + */ 678 + int ceph_wait_on_conflict_unlink(struct dentry *dentry) 679 + { 680 + struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); 681 + struct dentry *pdentry = dentry->d_parent; 682 + struct dentry *udentry, *found = NULL; 683 + struct ceph_dentry_info *di; 684 + struct qstr dname; 685 + u32 hash = dentry->d_name.hash; 686 + int err; 687 + 688 + dname.name = dentry->d_name.name; 689 + dname.len = dentry->d_name.len; 690 + 691 + rcu_read_lock(); 692 + hash_for_each_possible_rcu(fsc->async_unlink_conflict, di, 693 + hnode, hash) { 694 + udentry = di->dentry; 695 + 696 + spin_lock(&udentry->d_lock); 697 + if (udentry->d_name.hash != hash) 698 + goto next; 699 + if (unlikely(udentry->d_parent != pdentry)) 700 + goto next; 701 + if (!hash_hashed(&di->hnode)) 702 + goto next; 703 + 704 + if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags)) 705 + pr_warn("%s dentry %p:%pd async unlink bit is not set\n", 706 + __func__, dentry, dentry); 707 + 708 + if (!d_same_name(udentry, pdentry, &dname)) 709 + goto next; 710 + 711 + spin_unlock(&udentry->d_lock); 712 + found = dget(udentry); 713 + break; 714 + next: 715 + spin_unlock(&udentry->d_lock); 716 + } 717 + rcu_read_unlock(); 718 + 719 + if (likely(!found)) 720 + return 0; 721 + 722 + dout("%s dentry %p:%pd conflict with old %p:%pd\n", __func__, 723 + dentry, dentry, found, found); 724 + 725 + err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT, 726 + TASK_KILLABLE); 727 + dput(found); 728 + return err; 656 729 } 657 730 658 731
+1
fs/ceph/mds_client.h
··· 575 575 TASK_KILLABLE); 576 576 } 577 577 578 + extern int ceph_wait_on_conflict_unlink(struct dentry *dentry); 578 579 extern u64 ceph_get_deleg_ino(struct ceph_mds_session *session); 579 580 extern int ceph_restore_deleg_ino(struct ceph_mds_session *session, u64 ino); 580 581 #endif
+3
fs/ceph/super.c
··· 816 816 if (!fsc->cap_wq) 817 817 goto fail_inode_wq; 818 818 819 + hash_init(fsc->async_unlink_conflict); 820 + spin_lock_init(&fsc->async_unlink_conflict_lock); 821 + 819 822 spin_lock(&ceph_fsc_lock); 820 823 list_add_tail(&fsc->metric_wakeup, &ceph_fsc_list); 821 824 spin_unlock(&ceph_fsc_lock);
+14 -5
fs/ceph/super.h
··· 19 19 #include <linux/security.h> 20 20 #include <linux/netfs.h> 21 21 #include <linux/fscache.h> 22 + #include <linux/hashtable.h> 22 23 23 24 #include <linux/ceph/libceph.h> 24 25 ··· 100 99 char *mon_addr; 101 100 }; 102 101 102 + #define CEPH_ASYNC_CREATE_CONFLICT_BITS 8 103 + 103 104 struct ceph_fs_client { 104 105 struct super_block *sb; 105 106 ··· 126 123 127 124 struct workqueue_struct *inode_wq; 128 125 struct workqueue_struct *cap_wq; 126 + 127 + DECLARE_HASHTABLE(async_unlink_conflict, CEPH_ASYNC_CREATE_CONFLICT_BITS); 128 + spinlock_t async_unlink_conflict_lock; 129 129 130 130 #ifdef CONFIG_DEBUG_FS 131 131 struct dentry *debugfs_dentry_lru, *debugfs_caps; ··· 286 280 struct dentry *dentry; 287 281 struct ceph_mds_session *lease_session; 288 282 struct list_head lease_list; 289 - unsigned flags; 283 + struct hlist_node hnode; 284 + unsigned long flags; 290 285 int lease_shared_gen; 291 286 u32 lease_gen; 292 287 u32 lease_seq; ··· 296 289 u64 offset; 297 290 }; 298 291 299 - #define CEPH_DENTRY_REFERENCED 1 300 - #define CEPH_DENTRY_LEASE_LIST 2 301 - #define CEPH_DENTRY_SHRINK_LIST 4 302 - #define CEPH_DENTRY_PRIMARY_LINK 8 292 + #define CEPH_DENTRY_REFERENCED (1 << 0) 293 + #define CEPH_DENTRY_LEASE_LIST (1 << 1) 294 + #define CEPH_DENTRY_SHRINK_LIST (1 << 2) 295 + #define CEPH_DENTRY_PRIMARY_LINK (1 << 3) 296 + #define CEPH_DENTRY_ASYNC_UNLINK_BIT (4) 297 + #define CEPH_DENTRY_ASYNC_UNLINK (1 << CEPH_DENTRY_ASYNC_UNLINK_BIT) 303 298 304 299 struct ceph_inode_xattrs_info { 305 300 /*