Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

ceph: include the initial ACL in create/mkdir/mknod MDS requests

Current code set new file/directory's initial ACL in a non-atomic
manner.
Client first sends request to MDS to create new file/directory, then set
the initial ACL after the new file/directory is successfully created.

The fix is include the initial ACL in create/mkdir/mknod MDS requests.
So MDS can handle creating file/directory and setting the initial ACL in
one request.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
Reviewed-by: Sage Weil <sage@redhat.com>

authored by

Yan, Zheng and committed by
Sage Weil
b1ee94aa 25e6bae3

+171 -48
+100 -27
fs/ceph/acl.c
··· 169 169 return ret; 170 170 } 171 171 172 - int ceph_init_acl(struct dentry *dentry, struct inode *inode, struct inode *dir) 172 + int ceph_pre_init_acls(struct inode *dir, umode_t *mode, 173 + struct ceph_acls_info *info) 173 174 { 174 - struct posix_acl *default_acl, *acl; 175 - umode_t new_mode = inode->i_mode; 176 - int error; 175 + struct posix_acl *acl, *default_acl; 176 + size_t val_size1 = 0, val_size2 = 0; 177 + struct ceph_pagelist *pagelist = NULL; 178 + void *tmp_buf = NULL; 179 + int err; 177 180 178 - error = posix_acl_create(dir, &new_mode, &default_acl, &acl); 179 - if (error) 180 - return error; 181 + err = posix_acl_create(dir, mode, &default_acl, &acl); 182 + if (err) 183 + return err; 181 184 182 - if (!default_acl && !acl) { 183 - cache_no_acl(inode); 184 - if (new_mode != inode->i_mode) { 185 - struct iattr newattrs = { 186 - .ia_mode = new_mode, 187 - .ia_valid = ATTR_MODE, 188 - }; 189 - error = ceph_setattr(dentry, &newattrs); 190 - } 191 - return error; 192 - } 193 - 194 - if (default_acl) { 195 - error = ceph_set_acl(inode, default_acl, ACL_TYPE_DEFAULT); 196 - posix_acl_release(default_acl); 197 - } 198 185 if (acl) { 199 - if (!error) 200 - error = ceph_set_acl(inode, acl, ACL_TYPE_ACCESS); 201 - posix_acl_release(acl); 186 + int ret = posix_acl_equiv_mode(acl, mode); 187 + if (ret < 0) 188 + goto out_err; 189 + if (ret == 0) { 190 + posix_acl_release(acl); 191 + acl = NULL; 192 + } 202 193 } 203 - return error; 194 + 195 + if (!default_acl && !acl) 196 + return 0; 197 + 198 + if (acl) 199 + val_size1 = posix_acl_xattr_size(acl->a_count); 200 + if (default_acl) 201 + val_size2 = posix_acl_xattr_size(default_acl->a_count); 202 + 203 + err = -ENOMEM; 204 + tmp_buf = kmalloc(max(val_size1, val_size2), GFP_NOFS); 205 + if (!tmp_buf) 206 + goto out_err; 207 + pagelist = kmalloc(sizeof(struct ceph_pagelist), GFP_NOFS); 208 + if (!pagelist) 209 + goto out_err; 210 + ceph_pagelist_init(pagelist); 211 + 212 + err = ceph_pagelist_reserve(pagelist, PAGE_SIZE); 213 + if (err) 214 + goto out_err; 215 + 216 + ceph_pagelist_encode_32(pagelist, acl && default_acl ? 2 : 1); 217 + 218 + if (acl) { 219 + size_t len = strlen(POSIX_ACL_XATTR_ACCESS); 220 + err = ceph_pagelist_reserve(pagelist, len + val_size1 + 8); 221 + if (err) 222 + goto out_err; 223 + ceph_pagelist_encode_string(pagelist, POSIX_ACL_XATTR_ACCESS, 224 + len); 225 + err = posix_acl_to_xattr(&init_user_ns, acl, 226 + tmp_buf, val_size1); 227 + if (err < 0) 228 + goto out_err; 229 + ceph_pagelist_encode_32(pagelist, val_size1); 230 + ceph_pagelist_append(pagelist, tmp_buf, val_size1); 231 + } 232 + if (default_acl) { 233 + size_t len = strlen(POSIX_ACL_XATTR_DEFAULT); 234 + err = ceph_pagelist_reserve(pagelist, len + val_size2 + 8); 235 + if (err) 236 + goto out_err; 237 + err = ceph_pagelist_encode_string(pagelist, 238 + POSIX_ACL_XATTR_DEFAULT, len); 239 + err = posix_acl_to_xattr(&init_user_ns, default_acl, 240 + tmp_buf, val_size2); 241 + if (err < 0) 242 + goto out_err; 243 + ceph_pagelist_encode_32(pagelist, val_size2); 244 + ceph_pagelist_append(pagelist, tmp_buf, val_size2); 245 + } 246 + 247 + kfree(tmp_buf); 248 + 249 + info->acl = acl; 250 + info->default_acl = default_acl; 251 + info->pagelist = pagelist; 252 + return 0; 253 + 254 + out_err: 255 + posix_acl_release(acl); 256 + posix_acl_release(default_acl); 257 + kfree(tmp_buf); 258 + if (pagelist) 259 + ceph_pagelist_release(pagelist); 260 + return err; 261 + } 262 + 263 + void ceph_init_inode_acls(struct inode* inode, struct ceph_acls_info *info) 264 + { 265 + if (!inode) 266 + return; 267 + ceph_set_cached_acl(inode, ACL_TYPE_ACCESS, info->acl); 268 + ceph_set_cached_acl(inode, ACL_TYPE_DEFAULT, info->default_acl); 269 + } 270 + 271 + void ceph_release_acls_info(struct ceph_acls_info *info) 272 + { 273 + posix_acl_release(info->acl); 274 + posix_acl_release(info->default_acl); 275 + if (info->pagelist) 276 + ceph_pagelist_release(info->pagelist); 204 277 }
+31 -10
fs/ceph/dir.c
··· 682 682 struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); 683 683 struct ceph_mds_client *mdsc = fsc->mdsc; 684 684 struct ceph_mds_request *req; 685 + struct ceph_acls_info acls = {}; 685 686 int err; 686 687 687 688 if (ceph_snap(dir) != CEPH_NOSNAP) 688 689 return -EROFS; 689 690 691 + err = ceph_pre_init_acls(dir, &mode, &acls); 692 + if (err < 0) 693 + return err; 694 + 690 695 dout("mknod in dir %p dentry %p mode 0%ho rdev %d\n", 691 696 dir, dentry, mode, rdev); 692 697 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD, USE_AUTH_MDS); 693 698 if (IS_ERR(req)) { 694 - d_drop(dentry); 695 - return PTR_ERR(req); 699 + err = PTR_ERR(req); 700 + goto out; 696 701 } 697 702 req->r_dentry = dget(dentry); 698 703 req->r_num_caps = 2; ··· 706 701 req->r_args.mknod.rdev = cpu_to_le32(rdev); 707 702 req->r_dentry_drop = CEPH_CAP_FILE_SHARED; 708 703 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 704 + if (acls.pagelist) { 705 + req->r_pagelist = acls.pagelist; 706 + acls.pagelist = NULL; 707 + } 709 708 err = ceph_mdsc_do_request(mdsc, dir, req); 710 709 if (!err && !req->r_reply_info.head->is_dentry) 711 710 err = ceph_handle_notrace_create(dir, dentry); 712 711 ceph_mdsc_put_request(req); 713 - 712 + out: 714 713 if (!err) 715 - ceph_init_acl(dentry, dentry->d_inode, dir); 714 + ceph_init_inode_acls(dentry->d_inode, &acls); 716 715 else 717 716 d_drop(dentry); 717 + ceph_release_acls_info(&acls); 718 718 return err; 719 719 } 720 720 ··· 743 733 dout("symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest); 744 734 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK, USE_AUTH_MDS); 745 735 if (IS_ERR(req)) { 746 - d_drop(dentry); 747 - return PTR_ERR(req); 736 + err = PTR_ERR(req); 737 + goto out; 748 738 } 749 739 req->r_dentry = dget(dentry); 750 740 req->r_num_caps = 2; ··· 756 746 if (!err && !req->r_reply_info.head->is_dentry) 757 747 err = ceph_handle_notrace_create(dir, dentry); 758 748 ceph_mdsc_put_request(req); 759 - if (!err) 760 - ceph_init_acl(dentry, dentry->d_inode, dir); 761 - else 749 + out: 750 + if (err) 762 751 d_drop(dentry); 763 752 return err; 764 753 } ··· 767 758 struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); 768 759 struct ceph_mds_client *mdsc = fsc->mdsc; 769 760 struct ceph_mds_request *req; 761 + struct ceph_acls_info acls = {}; 770 762 int err = -EROFS; 771 763 int op; 772 764 ··· 782 772 } else { 783 773 goto out; 784 774 } 775 + 776 + mode |= S_IFDIR; 777 + err = ceph_pre_init_acls(dir, &mode, &acls); 778 + if (err < 0) 779 + goto out; 780 + 785 781 req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); 786 782 if (IS_ERR(req)) { 787 783 err = PTR_ERR(req); ··· 800 784 req->r_args.mkdir.mode = cpu_to_le32(mode); 801 785 req->r_dentry_drop = CEPH_CAP_FILE_SHARED; 802 786 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 787 + if (acls.pagelist) { 788 + req->r_pagelist = acls.pagelist; 789 + acls.pagelist = NULL; 790 + } 803 791 err = ceph_mdsc_do_request(mdsc, dir, req); 804 792 if (!err && !req->r_reply_info.head->is_dentry) 805 793 err = ceph_handle_notrace_create(dir, dentry); 806 794 ceph_mdsc_put_request(req); 807 795 out: 808 796 if (!err) 809 - ceph_init_acl(dentry, dentry->d_inode, dir); 797 + ceph_init_inode_acls(dentry->d_inode, &acls); 810 798 else 811 799 d_drop(dentry); 800 + ceph_release_acls_info(&acls); 812 801 return err; 813 802 } 814 803
+21 -6
fs/ceph/file.c
··· 235 235 struct ceph_mds_client *mdsc = fsc->mdsc; 236 236 struct ceph_mds_request *req; 237 237 struct dentry *dn; 238 + struct ceph_acls_info acls = {}; 238 239 int err; 239 240 240 241 dout("atomic_open %p dentry %p '%.*s' %s flags %d mode 0%o\n", ··· 249 248 if (err < 0) 250 249 return err; 251 250 251 + if (flags & O_CREAT) { 252 + err = ceph_pre_init_acls(dir, &mode, &acls); 253 + if (err < 0) 254 + return err; 255 + } 256 + 252 257 /* do the open */ 253 258 req = prepare_open_request(dir->i_sb, flags, mode); 254 - if (IS_ERR(req)) 255 - return PTR_ERR(req); 259 + if (IS_ERR(req)) { 260 + err = PTR_ERR(req); 261 + goto out_acl; 262 + } 256 263 req->r_dentry = dget(dentry); 257 264 req->r_num_caps = 2; 258 265 if (flags & O_CREAT) { 259 266 req->r_dentry_drop = CEPH_CAP_FILE_SHARED; 260 267 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 268 + if (acls.pagelist) { 269 + req->r_pagelist = acls.pagelist; 270 + acls.pagelist = NULL; 271 + } 261 272 } 262 273 req->r_locked_dir = dir; /* caller holds dir->i_mutex */ 263 274 err = ceph_mdsc_do_request(mdsc, 264 275 (flags & (O_CREAT|O_TRUNC)) ? dir : NULL, 265 276 req); 266 277 if (err) 267 - goto out_err; 278 + goto out_req; 268 279 269 280 err = ceph_handle_snapdir(req, dentry, err); 270 281 if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry) ··· 291 278 dn = NULL; 292 279 } 293 280 if (err) 294 - goto out_err; 281 + goto out_req; 295 282 if (dn || dentry->d_inode == NULL || S_ISLNK(dentry->d_inode->i_mode)) { 296 283 /* make vfs retry on splice, ENOENT, or symlink */ 297 284 dout("atomic_open finish_no_open on dn %p\n", dn); ··· 299 286 } else { 300 287 dout("atomic_open finish_open on dn %p\n", dn); 301 288 if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) { 302 - ceph_init_acl(dentry, dentry->d_inode, dir); 289 + ceph_init_inode_acls(dentry->d_inode, &acls); 303 290 *opened |= FILE_CREATED; 304 291 } 305 292 err = finish_open(file, dentry, ceph_open, opened); 306 293 } 307 - out_err: 294 + out_req: 308 295 if (!req->r_err && req->r_target_inode) 309 296 ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode); 310 297 ceph_mdsc_put_request(req); 298 + out_acl: 299 + ceph_release_acls_info(&acls); 311 300 dout("atomic_open result=%d\n", err); 312 301 return err; 313 302 }
+19 -5
fs/ceph/super.h
··· 733 733 extern void __ceph_destroy_xattrs(struct ceph_inode_info *ci); 734 734 extern void __init ceph_xattr_init(void); 735 735 extern void ceph_xattr_exit(void); 736 + extern const struct xattr_handler *ceph_xattr_handlers[]; 736 737 737 738 /* acl.c */ 738 - extern const struct xattr_handler *ceph_xattr_handlers[]; 739 + struct ceph_acls_info { 740 + void *default_acl; 741 + void *acl; 742 + struct ceph_pagelist *pagelist; 743 + }; 739 744 740 745 #ifdef CONFIG_CEPH_FS_POSIX_ACL 741 746 742 747 struct posix_acl *ceph_get_acl(struct inode *, int); 743 748 int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type); 744 - int ceph_init_acl(struct dentry *, struct inode *, struct inode *); 749 + int ceph_pre_init_acls(struct inode *dir, umode_t *mode, 750 + struct ceph_acls_info *info); 751 + void ceph_init_inode_acls(struct inode *inode, struct ceph_acls_info *info); 752 + void ceph_release_acls_info(struct ceph_acls_info *info); 745 753 746 754 static inline void ceph_forget_all_cached_acls(struct inode *inode) 747 755 { ··· 761 753 #define ceph_get_acl NULL 762 754 #define ceph_set_acl NULL 763 755 764 - static inline int ceph_init_acl(struct dentry *dentry, struct inode *inode, 765 - struct inode *dir) 756 + static inline int ceph_pre_init_acls(struct inode *dir, umode_t *mode, 757 + struct ceph_acls_info *info) 766 758 { 767 759 return 0; 768 760 } 769 - 761 + static inline void ceph_init_inode_acls(struct inode *inode, 762 + struct ceph_acls_info *info) 763 + { 764 + } 765 + static inline void ceph_release_acls_info(struct ceph_acls_info *info) 766 + { 767 + } 770 768 static inline int ceph_acl_chmod(struct dentry *dentry, struct inode *inode) 771 769 { 772 770 return 0;