Linux kernel mirror (for testing)
git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel
os
linux
1// SPDX-License-Identifier: GPL-2.0
2#include <linux/ceph/ceph_debug.h>
3
4#include <linux/fs.h>
5#include <linux/wait.h>
6#include <linux/slab.h>
7#include <linux/gfp.h>
8#include <linux/sched.h>
9#include <linux/debugfs.h>
10#include <linux/seq_file.h>
11#include <linux/ratelimit.h>
12#include <linux/bits.h>
13#include <linux/ktime.h>
14#include <linux/bitmap.h>
15#include <linux/mnt_idmapping.h>
16
17#include "super.h"
18#include "mds_client.h"
19#include "crypto.h"
20
21#include <linux/ceph/ceph_features.h>
22#include <linux/ceph/messenger.h>
23#include <linux/ceph/decode.h>
24#include <linux/ceph/pagelist.h>
25#include <linux/ceph/auth.h>
26#include <linux/ceph/debugfs.h>
27#include <trace/events/ceph.h>
28
29#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
30
31/*
32 * A cluster of MDS (metadata server) daemons is responsible for
33 * managing the file system namespace (the directory hierarchy and
34 * inodes) and for coordinating shared access to storage. Metadata is
35 * partitioning hierarchically across a number of servers, and that
36 * partition varies over time as the cluster adjusts the distribution
37 * in order to balance load.
38 *
39 * The MDS client is primarily responsible to managing synchronous
40 * metadata requests for operations like open, unlink, and so forth.
41 * If there is a MDS failure, we find out about it when we (possibly
42 * request and) receive a new MDS map, and can resubmit affected
43 * requests.
44 *
45 * For the most part, though, we take advantage of a lossless
46 * communications channel to the MDS, and do not need to worry about
47 * timing out or resubmitting requests.
48 *
49 * We maintain a stateful "session" with each MDS we interact with.
50 * Within each session, we sent periodic heartbeat messages to ensure
51 * any capabilities or leases we have been issues remain valid. If
52 * the session times out and goes stale, our leases and capabilities
53 * are no longer valid.
54 */
55
56struct ceph_reconnect_state {
57 struct ceph_mds_session *session;
58 int nr_caps, nr_realms;
59 struct ceph_pagelist *pagelist;
60 unsigned msg_version;
61 bool allow_multi;
62};
63
64static void __wake_requests(struct ceph_mds_client *mdsc,
65 struct list_head *head);
66static void ceph_cap_release_work(struct work_struct *work);
67static void ceph_cap_reclaim_work(struct work_struct *work);
68
69static const struct ceph_connection_operations mds_con_ops;
70
71
72/*
73 * mds reply parsing
74 */
75
76static int parse_reply_info_quota(void **p, void *end,
77 struct ceph_mds_reply_info_in *info)
78{
79 u8 struct_v, struct_compat;
80 u32 struct_len;
81
82 ceph_decode_8_safe(p, end, struct_v, bad);
83 ceph_decode_8_safe(p, end, struct_compat, bad);
84 /* struct_v is expected to be >= 1. we only
85 * understand encoding with struct_compat == 1. */
86 if (!struct_v || struct_compat != 1)
87 goto bad;
88 ceph_decode_32_safe(p, end, struct_len, bad);
89 ceph_decode_need(p, end, struct_len, bad);
90 end = *p + struct_len;
91 ceph_decode_64_safe(p, end, info->max_bytes, bad);
92 ceph_decode_64_safe(p, end, info->max_files, bad);
93 *p = end;
94 return 0;
95bad:
96 return -EIO;
97}
98
99/*
100 * parse individual inode info
101 */
102static int parse_reply_info_in(void **p, void *end,
103 struct ceph_mds_reply_info_in *info,
104 u64 features)
105{
106 int err = 0;
107 u8 struct_v = 0;
108
109 if (features == (u64)-1) {
110 u32 struct_len;
111 u8 struct_compat;
112 ceph_decode_8_safe(p, end, struct_v, bad);
113 ceph_decode_8_safe(p, end, struct_compat, bad);
114 /* struct_v is expected to be >= 1. we only understand
115 * encoding with struct_compat == 1. */
116 if (!struct_v || struct_compat != 1)
117 goto bad;
118 ceph_decode_32_safe(p, end, struct_len, bad);
119 ceph_decode_need(p, end, struct_len, bad);
120 end = *p + struct_len;
121 }
122
123 ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
124 info->in = *p;
125 *p += sizeof(struct ceph_mds_reply_inode) +
126 sizeof(*info->in->fragtree.splits) *
127 le32_to_cpu(info->in->fragtree.nsplits);
128
129 ceph_decode_32_safe(p, end, info->symlink_len, bad);
130 ceph_decode_need(p, end, info->symlink_len, bad);
131 info->symlink = *p;
132 *p += info->symlink_len;
133
134 ceph_decode_copy_safe(p, end, &info->dir_layout,
135 sizeof(info->dir_layout), bad);
136 ceph_decode_32_safe(p, end, info->xattr_len, bad);
137 ceph_decode_need(p, end, info->xattr_len, bad);
138 info->xattr_data = *p;
139 *p += info->xattr_len;
140
141 if (features == (u64)-1) {
142 /* inline data */
143 ceph_decode_64_safe(p, end, info->inline_version, bad);
144 ceph_decode_32_safe(p, end, info->inline_len, bad);
145 ceph_decode_need(p, end, info->inline_len, bad);
146 info->inline_data = *p;
147 *p += info->inline_len;
148 /* quota */
149 err = parse_reply_info_quota(p, end, info);
150 if (err < 0)
151 goto out_bad;
152 /* pool namespace */
153 ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
154 if (info->pool_ns_len > 0) {
155 ceph_decode_need(p, end, info->pool_ns_len, bad);
156 info->pool_ns_data = *p;
157 *p += info->pool_ns_len;
158 }
159
160 /* btime */
161 ceph_decode_need(p, end, sizeof(info->btime), bad);
162 ceph_decode_copy(p, &info->btime, sizeof(info->btime));
163
164 /* change attribute */
165 ceph_decode_64_safe(p, end, info->change_attr, bad);
166
167 /* dir pin */
168 if (struct_v >= 2) {
169 ceph_decode_32_safe(p, end, info->dir_pin, bad);
170 } else {
171 info->dir_pin = -ENODATA;
172 }
173
174 /* snapshot birth time, remains zero for v<=2 */
175 if (struct_v >= 3) {
176 ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
177 ceph_decode_copy(p, &info->snap_btime,
178 sizeof(info->snap_btime));
179 } else {
180 memset(&info->snap_btime, 0, sizeof(info->snap_btime));
181 }
182
183 /* snapshot count, remains zero for v<=3 */
184 if (struct_v >= 4) {
185 ceph_decode_64_safe(p, end, info->rsnaps, bad);
186 } else {
187 info->rsnaps = 0;
188 }
189
190 if (struct_v >= 5) {
191 u32 alen;
192
193 ceph_decode_32_safe(p, end, alen, bad);
194
195 while (alen--) {
196 u32 len;
197
198 /* key */
199 ceph_decode_32_safe(p, end, len, bad);
200 ceph_decode_skip_n(p, end, len, bad);
201 /* value */
202 ceph_decode_32_safe(p, end, len, bad);
203 ceph_decode_skip_n(p, end, len, bad);
204 }
205 }
206
207 /* fscrypt flag -- ignore */
208 if (struct_v >= 6)
209 ceph_decode_skip_8(p, end, bad);
210
211 info->fscrypt_auth = NULL;
212 info->fscrypt_auth_len = 0;
213 info->fscrypt_file = NULL;
214 info->fscrypt_file_len = 0;
215 if (struct_v >= 7) {
216 ceph_decode_32_safe(p, end, info->fscrypt_auth_len, bad);
217 if (info->fscrypt_auth_len) {
218 info->fscrypt_auth = kmalloc(info->fscrypt_auth_len,
219 GFP_KERNEL);
220 if (!info->fscrypt_auth)
221 return -ENOMEM;
222 ceph_decode_copy_safe(p, end, info->fscrypt_auth,
223 info->fscrypt_auth_len, bad);
224 }
225 ceph_decode_32_safe(p, end, info->fscrypt_file_len, bad);
226 if (info->fscrypt_file_len) {
227 info->fscrypt_file = kmalloc(info->fscrypt_file_len,
228 GFP_KERNEL);
229 if (!info->fscrypt_file)
230 return -ENOMEM;
231 ceph_decode_copy_safe(p, end, info->fscrypt_file,
232 info->fscrypt_file_len, bad);
233 }
234 }
235 *p = end;
236 } else {
237 /* legacy (unversioned) struct */
238 if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
239 ceph_decode_64_safe(p, end, info->inline_version, bad);
240 ceph_decode_32_safe(p, end, info->inline_len, bad);
241 ceph_decode_need(p, end, info->inline_len, bad);
242 info->inline_data = *p;
243 *p += info->inline_len;
244 } else
245 info->inline_version = CEPH_INLINE_NONE;
246
247 if (features & CEPH_FEATURE_MDS_QUOTA) {
248 err = parse_reply_info_quota(p, end, info);
249 if (err < 0)
250 goto out_bad;
251 } else {
252 info->max_bytes = 0;
253 info->max_files = 0;
254 }
255
256 info->pool_ns_len = 0;
257 info->pool_ns_data = NULL;
258 if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
259 ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
260 if (info->pool_ns_len > 0) {
261 ceph_decode_need(p, end, info->pool_ns_len, bad);
262 info->pool_ns_data = *p;
263 *p += info->pool_ns_len;
264 }
265 }
266
267 if (features & CEPH_FEATURE_FS_BTIME) {
268 ceph_decode_need(p, end, sizeof(info->btime), bad);
269 ceph_decode_copy(p, &info->btime, sizeof(info->btime));
270 ceph_decode_64_safe(p, end, info->change_attr, bad);
271 }
272
273 info->dir_pin = -ENODATA;
274 /* info->snap_btime and info->rsnaps remain zero */
275 }
276 return 0;
277bad:
278 err = -EIO;
279out_bad:
280 return err;
281}
282
283static int parse_reply_info_dir(void **p, void *end,
284 struct ceph_mds_reply_dirfrag **dirfrag,
285 u64 features)
286{
287 if (features == (u64)-1) {
288 u8 struct_v, struct_compat;
289 u32 struct_len;
290 ceph_decode_8_safe(p, end, struct_v, bad);
291 ceph_decode_8_safe(p, end, struct_compat, bad);
292 /* struct_v is expected to be >= 1. we only understand
293 * encoding whose struct_compat == 1. */
294 if (!struct_v || struct_compat != 1)
295 goto bad;
296 ceph_decode_32_safe(p, end, struct_len, bad);
297 ceph_decode_need(p, end, struct_len, bad);
298 end = *p + struct_len;
299 }
300
301 ceph_decode_need(p, end, sizeof(**dirfrag), bad);
302 *dirfrag = *p;
303 *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
304 if (unlikely(*p > end))
305 goto bad;
306 if (features == (u64)-1)
307 *p = end;
308 return 0;
309bad:
310 return -EIO;
311}
312
313static int parse_reply_info_lease(void **p, void *end,
314 struct ceph_mds_reply_lease **lease,
315 u64 features, u32 *altname_len, u8 **altname)
316{
317 u8 struct_v;
318 u32 struct_len;
319 void *lend;
320
321 if (features == (u64)-1) {
322 u8 struct_compat;
323
324 ceph_decode_8_safe(p, end, struct_v, bad);
325 ceph_decode_8_safe(p, end, struct_compat, bad);
326
327 /* struct_v is expected to be >= 1. we only understand
328 * encoding whose struct_compat == 1. */
329 if (!struct_v || struct_compat != 1)
330 goto bad;
331
332 ceph_decode_32_safe(p, end, struct_len, bad);
333 } else {
334 struct_len = sizeof(**lease);
335 *altname_len = 0;
336 *altname = NULL;
337 }
338
339 lend = *p + struct_len;
340 ceph_decode_need(p, end, struct_len, bad);
341 *lease = *p;
342 *p += sizeof(**lease);
343
344 if (features == (u64)-1) {
345 if (struct_v >= 2) {
346 ceph_decode_32_safe(p, end, *altname_len, bad);
347 ceph_decode_need(p, end, *altname_len, bad);
348 *altname = *p;
349 *p += *altname_len;
350 } else {
351 *altname = NULL;
352 *altname_len = 0;
353 }
354 }
355 *p = lend;
356 return 0;
357bad:
358 return -EIO;
359}
360
361/*
362 * parse a normal reply, which may contain a (dir+)dentry and/or a
363 * target inode.
364 */
365static int parse_reply_info_trace(void **p, void *end,
366 struct ceph_mds_reply_info_parsed *info,
367 u64 features)
368{
369 int err;
370
371 if (info->head->is_dentry) {
372 err = parse_reply_info_in(p, end, &info->diri, features);
373 if (err < 0)
374 goto out_bad;
375
376 err = parse_reply_info_dir(p, end, &info->dirfrag, features);
377 if (err < 0)
378 goto out_bad;
379
380 ceph_decode_32_safe(p, end, info->dname_len, bad);
381 ceph_decode_need(p, end, info->dname_len, bad);
382 info->dname = *p;
383 *p += info->dname_len;
384
385 err = parse_reply_info_lease(p, end, &info->dlease, features,
386 &info->altname_len, &info->altname);
387 if (err < 0)
388 goto out_bad;
389 }
390
391 if (info->head->is_target) {
392 err = parse_reply_info_in(p, end, &info->targeti, features);
393 if (err < 0)
394 goto out_bad;
395 }
396
397 if (unlikely(*p != end))
398 goto bad;
399 return 0;
400
401bad:
402 err = -EIO;
403out_bad:
404 pr_err("problem parsing mds trace %d\n", err);
405 return err;
406}
407
408/*
409 * parse readdir results
410 */
411static int parse_reply_info_readdir(void **p, void *end,
412 struct ceph_mds_request *req,
413 u64 features)
414{
415 struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
416 struct ceph_client *cl = req->r_mdsc->fsc->client;
417 u32 num, i = 0;
418 int err;
419
420 err = parse_reply_info_dir(p, end, &info->dir_dir, features);
421 if (err < 0)
422 goto out_bad;
423
424 ceph_decode_need(p, end, sizeof(num) + 2, bad);
425 num = ceph_decode_32(p);
426 {
427 u16 flags = ceph_decode_16(p);
428 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
429 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
430 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
431 info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
432 }
433 if (num == 0)
434 goto done;
435
436 BUG_ON(!info->dir_entries);
437 if ((unsigned long)(info->dir_entries + num) >
438 (unsigned long)info->dir_entries + info->dir_buf_size) {
439 pr_err_client(cl, "dir contents are larger than expected\n");
440 WARN_ON(1);
441 goto bad;
442 }
443
444 info->dir_nr = num;
445 while (num) {
446 struct inode *inode = d_inode(req->r_dentry);
447 struct ceph_inode_info *ci = ceph_inode(inode);
448 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
449 struct fscrypt_str tname = FSTR_INIT(NULL, 0);
450 struct fscrypt_str oname = FSTR_INIT(NULL, 0);
451 struct ceph_fname fname;
452 u32 altname_len, _name_len;
453 u8 *altname, *_name;
454
455 /* dentry */
456 ceph_decode_32_safe(p, end, _name_len, bad);
457 ceph_decode_need(p, end, _name_len, bad);
458 _name = *p;
459 *p += _name_len;
460 doutc(cl, "parsed dir dname '%.*s'\n", _name_len, _name);
461
462 if (info->hash_order)
463 rde->raw_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
464 _name, _name_len);
465
466 /* dentry lease */
467 err = parse_reply_info_lease(p, end, &rde->lease, features,
468 &altname_len, &altname);
469 if (err)
470 goto out_bad;
471
472 /*
473 * Try to dencrypt the dentry names and update them
474 * in the ceph_mds_reply_dir_entry struct.
475 */
476 fname.dir = inode;
477 fname.name = _name;
478 fname.name_len = _name_len;
479 fname.ctext = altname;
480 fname.ctext_len = altname_len;
481 /*
482 * The _name_len maybe larger than altname_len, such as
483 * when the human readable name length is in range of
484 * (CEPH_NOHASH_NAME_MAX, CEPH_NOHASH_NAME_MAX + SHA256_DIGEST_SIZE),
485 * then the copy in ceph_fname_to_usr will corrupt the
486 * data if there has no encryption key.
487 *
488 * Just set the no_copy flag and then if there has no
489 * encryption key the oname.name will be assigned to
490 * _name always.
491 */
492 fname.no_copy = true;
493 if (altname_len == 0) {
494 /*
495 * Set tname to _name, and this will be used
496 * to do the base64_decode in-place. It's
497 * safe because the decoded string should
498 * always be shorter, which is 3/4 of origin
499 * string.
500 */
501 tname.name = _name;
502
503 /*
504 * Set oname to _name too, and this will be
505 * used to do the dencryption in-place.
506 */
507 oname.name = _name;
508 oname.len = _name_len;
509 } else {
510 /*
511 * This will do the decryption only in-place
512 * from altname cryptext directly.
513 */
514 oname.name = altname;
515 oname.len = altname_len;
516 }
517 rde->is_nokey = false;
518 err = ceph_fname_to_usr(&fname, &tname, &oname, &rde->is_nokey);
519 if (err) {
520 pr_err_client(cl, "unable to decode %.*s, got %d\n",
521 _name_len, _name, err);
522 goto out_bad;
523 }
524 rde->name = oname.name;
525 rde->name_len = oname.len;
526
527 /* inode */
528 err = parse_reply_info_in(p, end, &rde->inode, features);
529 if (err < 0)
530 goto out_bad;
531 /* ceph_readdir_prepopulate() will update it */
532 rde->offset = 0;
533 i++;
534 num--;
535 }
536
537done:
538 /* Skip over any unrecognized fields */
539 *p = end;
540 return 0;
541
542bad:
543 err = -EIO;
544out_bad:
545 pr_err_client(cl, "problem parsing dir contents %d\n", err);
546 return err;
547}
548
549/*
550 * parse fcntl F_GETLK results
551 */
552static int parse_reply_info_filelock(void **p, void *end,
553 struct ceph_mds_reply_info_parsed *info,
554 u64 features)
555{
556 if (*p + sizeof(*info->filelock_reply) > end)
557 goto bad;
558
559 info->filelock_reply = *p;
560
561 /* Skip over any unrecognized fields */
562 *p = end;
563 return 0;
564bad:
565 return -EIO;
566}
567
568
569#if BITS_PER_LONG == 64
570
571#define DELEGATED_INO_AVAILABLE xa_mk_value(1)
572
573static int ceph_parse_deleg_inos(void **p, void *end,
574 struct ceph_mds_session *s)
575{
576 struct ceph_client *cl = s->s_mdsc->fsc->client;
577 u32 sets;
578
579 ceph_decode_32_safe(p, end, sets, bad);
580 doutc(cl, "got %u sets of delegated inodes\n", sets);
581 while (sets--) {
582 u64 start, len;
583
584 ceph_decode_64_safe(p, end, start, bad);
585 ceph_decode_64_safe(p, end, len, bad);
586
587 /* Don't accept a delegation of system inodes */
588 if (start < CEPH_INO_SYSTEM_BASE) {
589 pr_warn_ratelimited_client(cl,
590 "ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
591 start, len);
592 continue;
593 }
594 while (len--) {
595 int err = xa_insert(&s->s_delegated_inos, start++,
596 DELEGATED_INO_AVAILABLE,
597 GFP_KERNEL);
598 if (!err) {
599 doutc(cl, "added delegated inode 0x%llx\n", start - 1);
600 } else if (err == -EBUSY) {
601 pr_warn_client(cl,
602 "MDS delegated inode 0x%llx more than once.\n",
603 start - 1);
604 } else {
605 return err;
606 }
607 }
608 }
609 return 0;
610bad:
611 return -EIO;
612}
613
614u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
615{
616 unsigned long ino;
617 void *val;
618
619 xa_for_each(&s->s_delegated_inos, ino, val) {
620 val = xa_erase(&s->s_delegated_inos, ino);
621 if (val == DELEGATED_INO_AVAILABLE)
622 return ino;
623 }
624 return 0;
625}
626
627int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
628{
629 return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
630 GFP_KERNEL);
631}
632#else /* BITS_PER_LONG == 64 */
633/*
634 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
635 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
636 * and bottom words?
637 */
638static int ceph_parse_deleg_inos(void **p, void *end,
639 struct ceph_mds_session *s)
640{
641 u32 sets;
642
643 ceph_decode_32_safe(p, end, sets, bad);
644 if (sets)
645 ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
646 return 0;
647bad:
648 return -EIO;
649}
650
651u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
652{
653 return 0;
654}
655
656int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
657{
658 return 0;
659}
660#endif /* BITS_PER_LONG == 64 */
661
662/*
663 * parse create results
664 */
665static int parse_reply_info_create(void **p, void *end,
666 struct ceph_mds_reply_info_parsed *info,
667 u64 features, struct ceph_mds_session *s)
668{
669 int ret;
670
671 if (features == (u64)-1 ||
672 (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
673 if (*p == end) {
674 /* Malformed reply? */
675 info->has_create_ino = false;
676 } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
677 info->has_create_ino = true;
678 /* struct_v, struct_compat, and len */
679 ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad);
680 ceph_decode_64_safe(p, end, info->ino, bad);
681 ret = ceph_parse_deleg_inos(p, end, s);
682 if (ret)
683 return ret;
684 } else {
685 /* legacy */
686 ceph_decode_64_safe(p, end, info->ino, bad);
687 info->has_create_ino = true;
688 }
689 } else {
690 if (*p != end)
691 goto bad;
692 }
693
694 /* Skip over any unrecognized fields */
695 *p = end;
696 return 0;
697bad:
698 return -EIO;
699}
700
701static int parse_reply_info_getvxattr(void **p, void *end,
702 struct ceph_mds_reply_info_parsed *info,
703 u64 features)
704{
705 u32 value_len;
706
707 ceph_decode_skip_8(p, end, bad); /* skip current version: 1 */
708 ceph_decode_skip_8(p, end, bad); /* skip first version: 1 */
709 ceph_decode_skip_32(p, end, bad); /* skip payload length */
710
711 ceph_decode_32_safe(p, end, value_len, bad);
712
713 if (value_len == end - *p) {
714 info->xattr_info.xattr_value = *p;
715 info->xattr_info.xattr_value_len = value_len;
716 *p = end;
717 return value_len;
718 }
719bad:
720 return -EIO;
721}
722
723/*
724 * parse extra results
725 */
726static int parse_reply_info_extra(void **p, void *end,
727 struct ceph_mds_request *req,
728 u64 features, struct ceph_mds_session *s)
729{
730 struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
731 u32 op = le32_to_cpu(info->head->op);
732
733 if (op == CEPH_MDS_OP_GETFILELOCK)
734 return parse_reply_info_filelock(p, end, info, features);
735 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
736 return parse_reply_info_readdir(p, end, req, features);
737 else if (op == CEPH_MDS_OP_CREATE)
738 return parse_reply_info_create(p, end, info, features, s);
739 else if (op == CEPH_MDS_OP_GETVXATTR)
740 return parse_reply_info_getvxattr(p, end, info, features);
741 else
742 return -EIO;
743}
744
745/*
746 * parse entire mds reply
747 */
748static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
749 struct ceph_mds_request *req, u64 features)
750{
751 struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
752 struct ceph_client *cl = s->s_mdsc->fsc->client;
753 void *p, *end;
754 u32 len;
755 int err;
756
757 info->head = msg->front.iov_base;
758 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
759 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
760
761 /* trace */
762 ceph_decode_32_safe(&p, end, len, bad);
763 if (len > 0) {
764 ceph_decode_need(&p, end, len, bad);
765 err = parse_reply_info_trace(&p, p+len, info, features);
766 if (err < 0)
767 goto out_bad;
768 }
769
770 /* extra */
771 ceph_decode_32_safe(&p, end, len, bad);
772 if (len > 0) {
773 ceph_decode_need(&p, end, len, bad);
774 err = parse_reply_info_extra(&p, p+len, req, features, s);
775 if (err < 0)
776 goto out_bad;
777 }
778
779 /* snap blob */
780 ceph_decode_32_safe(&p, end, len, bad);
781 info->snapblob_len = len;
782 info->snapblob = p;
783 p += len;
784
785 if (p != end)
786 goto bad;
787 return 0;
788
789bad:
790 err = -EIO;
791out_bad:
792 pr_err_client(cl, "mds parse_reply err %d\n", err);
793 ceph_msg_dump(msg);
794 return err;
795}
796
797static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
798{
799 int i;
800
801 kfree(info->diri.fscrypt_auth);
802 kfree(info->diri.fscrypt_file);
803 kfree(info->targeti.fscrypt_auth);
804 kfree(info->targeti.fscrypt_file);
805 if (!info->dir_entries)
806 return;
807
808 for (i = 0; i < info->dir_nr; i++) {
809 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
810
811 kfree(rde->inode.fscrypt_auth);
812 kfree(rde->inode.fscrypt_file);
813 }
814 free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
815}
816
817/*
818 * In async unlink case the kclient won't wait for the first reply
819 * from MDS and just drop all the links and unhash the dentry and then
820 * succeeds immediately.
821 *
822 * For any new create/link/rename,etc requests followed by using the
823 * same file names we must wait for the first reply of the inflight
824 * unlink request, or the MDS possibly will fail these following
825 * requests with -EEXIST if the inflight async unlink request was
826 * delayed for some reasons.
827 *
828 * And the worst case is that for the none async openc request it will
829 * successfully open the file if the CDentry hasn't been unlinked yet,
830 * but later the previous delayed async unlink request will remove the
831 * CDentry. That means the just created file is possibly deleted later
832 * by accident.
833 *
834 * We need to wait for the inflight async unlink requests to finish
835 * when creating new files/directories by using the same file names.
836 */
837int ceph_wait_on_conflict_unlink(struct dentry *dentry)
838{
839 struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dentry->d_sb);
840 struct ceph_client *cl = fsc->client;
841 struct dentry *pdentry = dentry->d_parent;
842 struct dentry *udentry, *found = NULL;
843 struct ceph_dentry_info *di;
844 struct qstr dname;
845 u32 hash = dentry->d_name.hash;
846 int err;
847
848 dname.name = dentry->d_name.name;
849 dname.len = dentry->d_name.len;
850
851 rcu_read_lock();
852 hash_for_each_possible_rcu(fsc->async_unlink_conflict, di,
853 hnode, hash) {
854 udentry = di->dentry;
855
856 spin_lock(&udentry->d_lock);
857 if (udentry->d_name.hash != hash)
858 goto next;
859 if (unlikely(udentry->d_parent != pdentry))
860 goto next;
861 if (!hash_hashed(&di->hnode))
862 goto next;
863
864 if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags))
865 pr_warn_client(cl, "dentry %p:%pd async unlink bit is not set\n",
866 dentry, dentry);
867
868 if (!d_same_name(udentry, pdentry, &dname))
869 goto next;
870
871 found = dget_dlock(udentry);
872 spin_unlock(&udentry->d_lock);
873 break;
874next:
875 spin_unlock(&udentry->d_lock);
876 }
877 rcu_read_unlock();
878
879 if (likely(!found))
880 return 0;
881
882 doutc(cl, "dentry %p:%pd conflict with old %p:%pd\n", dentry, dentry,
883 found, found);
884
885 err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT,
886 TASK_KILLABLE);
887 dput(found);
888 return err;
889}
890
891
892/*
893 * sessions
894 */
895const char *ceph_session_state_name(int s)
896{
897 switch (s) {
898 case CEPH_MDS_SESSION_NEW: return "new";
899 case CEPH_MDS_SESSION_OPENING: return "opening";
900 case CEPH_MDS_SESSION_OPEN: return "open";
901 case CEPH_MDS_SESSION_HUNG: return "hung";
902 case CEPH_MDS_SESSION_CLOSING: return "closing";
903 case CEPH_MDS_SESSION_CLOSED: return "closed";
904 case CEPH_MDS_SESSION_RESTARTING: return "restarting";
905 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
906 case CEPH_MDS_SESSION_REJECTED: return "rejected";
907 default: return "???";
908 }
909}
910
911struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
912{
913 if (refcount_inc_not_zero(&s->s_ref))
914 return s;
915 return NULL;
916}
917
918void ceph_put_mds_session(struct ceph_mds_session *s)
919{
920 if (IS_ERR_OR_NULL(s))
921 return;
922
923 if (refcount_dec_and_test(&s->s_ref)) {
924 if (s->s_auth.authorizer)
925 ceph_auth_destroy_authorizer(s->s_auth.authorizer);
926 WARN_ON(mutex_is_locked(&s->s_mutex));
927 xa_destroy(&s->s_delegated_inos);
928 kfree(s);
929 }
930}
931
932/*
933 * called under mdsc->mutex
934 */
935struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
936 int mds)
937{
938 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
939 return NULL;
940 return ceph_get_mds_session(mdsc->sessions[mds]);
941}
942
943static bool __have_session(struct ceph_mds_client *mdsc, int mds)
944{
945 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
946 return false;
947 else
948 return true;
949}
950
951static int __verify_registered_session(struct ceph_mds_client *mdsc,
952 struct ceph_mds_session *s)
953{
954 if (s->s_mds >= mdsc->max_sessions ||
955 mdsc->sessions[s->s_mds] != s)
956 return -ENOENT;
957 return 0;
958}
959
960/*
961 * create+register a new session for given mds.
962 * called under mdsc->mutex.
963 */
964static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
965 int mds)
966{
967 struct ceph_client *cl = mdsc->fsc->client;
968 struct ceph_mds_session *s;
969
970 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
971 return ERR_PTR(-EIO);
972
973 if (mds >= mdsc->mdsmap->possible_max_rank)
974 return ERR_PTR(-EINVAL);
975
976 s = kzalloc(sizeof(*s), GFP_NOFS);
977 if (!s)
978 return ERR_PTR(-ENOMEM);
979
980 if (mds >= mdsc->max_sessions) {
981 int newmax = 1 << get_count_order(mds + 1);
982 struct ceph_mds_session **sa;
983 size_t ptr_size = sizeof(struct ceph_mds_session *);
984
985 doutc(cl, "realloc to %d\n", newmax);
986 sa = kcalloc(newmax, ptr_size, GFP_NOFS);
987 if (!sa)
988 goto fail_realloc;
989 if (mdsc->sessions) {
990 memcpy(sa, mdsc->sessions,
991 mdsc->max_sessions * ptr_size);
992 kfree(mdsc->sessions);
993 }
994 mdsc->sessions = sa;
995 mdsc->max_sessions = newmax;
996 }
997
998 doutc(cl, "mds%d\n", mds);
999 s->s_mdsc = mdsc;
1000 s->s_mds = mds;
1001 s->s_state = CEPH_MDS_SESSION_NEW;
1002 mutex_init(&s->s_mutex);
1003
1004 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
1005
1006 atomic_set(&s->s_cap_gen, 1);
1007 s->s_cap_ttl = jiffies - 1;
1008
1009 spin_lock_init(&s->s_cap_lock);
1010 INIT_LIST_HEAD(&s->s_caps);
1011 refcount_set(&s->s_ref, 1);
1012 INIT_LIST_HEAD(&s->s_waiting);
1013 INIT_LIST_HEAD(&s->s_unsafe);
1014 xa_init(&s->s_delegated_inos);
1015 INIT_LIST_HEAD(&s->s_cap_releases);
1016 INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
1017
1018 INIT_LIST_HEAD(&s->s_cap_dirty);
1019 INIT_LIST_HEAD(&s->s_cap_flushing);
1020
1021 mdsc->sessions[mds] = s;
1022 atomic_inc(&mdsc->num_sessions);
1023 refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */
1024
1025 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
1026 ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
1027
1028 return s;
1029
1030fail_realloc:
1031 kfree(s);
1032 return ERR_PTR(-ENOMEM);
1033}
1034
1035/*
1036 * called under mdsc->mutex
1037 */
1038static void __unregister_session(struct ceph_mds_client *mdsc,
1039 struct ceph_mds_session *s)
1040{
1041 doutc(mdsc->fsc->client, "mds%d %p\n", s->s_mds, s);
1042 BUG_ON(mdsc->sessions[s->s_mds] != s);
1043 mdsc->sessions[s->s_mds] = NULL;
1044 ceph_con_close(&s->s_con);
1045 ceph_put_mds_session(s);
1046 atomic_dec(&mdsc->num_sessions);
1047}
1048
1049/*
1050 * drop session refs in request.
1051 *
1052 * should be last request ref, or hold mdsc->mutex
1053 */
1054static void put_request_session(struct ceph_mds_request *req)
1055{
1056 if (req->r_session) {
1057 ceph_put_mds_session(req->r_session);
1058 req->r_session = NULL;
1059 }
1060}
1061
1062void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
1063 void (*cb)(struct ceph_mds_session *),
1064 bool check_state)
1065{
1066 int mds;
1067
1068 mutex_lock(&mdsc->mutex);
1069 for (mds = 0; mds < mdsc->max_sessions; ++mds) {
1070 struct ceph_mds_session *s;
1071
1072 s = __ceph_lookup_mds_session(mdsc, mds);
1073 if (!s)
1074 continue;
1075
1076 if (check_state && !check_session_state(s)) {
1077 ceph_put_mds_session(s);
1078 continue;
1079 }
1080
1081 mutex_unlock(&mdsc->mutex);
1082 cb(s);
1083 ceph_put_mds_session(s);
1084 mutex_lock(&mdsc->mutex);
1085 }
1086 mutex_unlock(&mdsc->mutex);
1087}
1088
1089void ceph_mdsc_release_request(struct kref *kref)
1090{
1091 struct ceph_mds_request *req = container_of(kref,
1092 struct ceph_mds_request,
1093 r_kref);
1094 ceph_mdsc_release_dir_caps_async(req);
1095 destroy_reply_info(&req->r_reply_info);
1096 if (req->r_request)
1097 ceph_msg_put(req->r_request);
1098 if (req->r_reply)
1099 ceph_msg_put(req->r_reply);
1100 if (req->r_inode) {
1101 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
1102 iput(req->r_inode);
1103 }
1104 if (req->r_parent) {
1105 ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
1106 iput(req->r_parent);
1107 }
1108 iput(req->r_target_inode);
1109 iput(req->r_new_inode);
1110 if (req->r_dentry)
1111 dput(req->r_dentry);
1112 if (req->r_old_dentry)
1113 dput(req->r_old_dentry);
1114 if (req->r_old_dentry_dir) {
1115 /*
1116 * track (and drop pins for) r_old_dentry_dir
1117 * separately, since r_old_dentry's d_parent may have
1118 * changed between the dir mutex being dropped and
1119 * this request being freed.
1120 */
1121 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
1122 CEPH_CAP_PIN);
1123 iput(req->r_old_dentry_dir);
1124 }
1125 kfree(req->r_path1);
1126 kfree(req->r_path2);
1127 put_cred(req->r_cred);
1128 if (req->r_mnt_idmap)
1129 mnt_idmap_put(req->r_mnt_idmap);
1130 if (req->r_pagelist)
1131 ceph_pagelist_release(req->r_pagelist);
1132 kfree(req->r_fscrypt_auth);
1133 kfree(req->r_altname);
1134 put_request_session(req);
1135 ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
1136 WARN_ON_ONCE(!list_empty(&req->r_wait));
1137 kmem_cache_free(ceph_mds_request_cachep, req);
1138}
1139
1140DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
1141
1142/*
1143 * lookup session, bump ref if found.
1144 *
1145 * called under mdsc->mutex.
1146 */
1147static struct ceph_mds_request *
1148lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
1149{
1150 struct ceph_mds_request *req;
1151
1152 req = lookup_request(&mdsc->request_tree, tid);
1153 if (req)
1154 ceph_mdsc_get_request(req);
1155
1156 return req;
1157}
1158
1159/*
1160 * Register an in-flight request, and assign a tid. Link to directory
1161 * are modifying (if any).
1162 *
1163 * Called under mdsc->mutex.
1164 */
1165static void __register_request(struct ceph_mds_client *mdsc,
1166 struct ceph_mds_request *req,
1167 struct inode *dir)
1168{
1169 struct ceph_client *cl = mdsc->fsc->client;
1170 int ret = 0;
1171
1172 req->r_tid = ++mdsc->last_tid;
1173 if (req->r_num_caps) {
1174 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
1175 req->r_num_caps);
1176 if (ret < 0) {
1177 pr_err_client(cl, "%p failed to reserve caps: %d\n",
1178 req, ret);
1179 /* set req->r_err to fail early from __do_request */
1180 req->r_err = ret;
1181 return;
1182 }
1183 }
1184 doutc(cl, "%p tid %lld\n", req, req->r_tid);
1185 ceph_mdsc_get_request(req);
1186 insert_request(&mdsc->request_tree, req);
1187
1188 req->r_cred = get_current_cred();
1189 if (!req->r_mnt_idmap)
1190 req->r_mnt_idmap = &nop_mnt_idmap;
1191
1192 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
1193 mdsc->oldest_tid = req->r_tid;
1194
1195 if (dir) {
1196 struct ceph_inode_info *ci = ceph_inode(dir);
1197
1198 ihold(dir);
1199 req->r_unsafe_dir = dir;
1200 spin_lock(&ci->i_unsafe_lock);
1201 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
1202 spin_unlock(&ci->i_unsafe_lock);
1203 }
1204}
1205
1206static void __unregister_request(struct ceph_mds_client *mdsc,
1207 struct ceph_mds_request *req)
1208{
1209 doutc(mdsc->fsc->client, "%p tid %lld\n", req, req->r_tid);
1210
1211 /* Never leave an unregistered request on an unsafe list! */
1212 list_del_init(&req->r_unsafe_item);
1213
1214 if (req->r_tid == mdsc->oldest_tid) {
1215 struct rb_node *p = rb_next(&req->r_node);
1216 mdsc->oldest_tid = 0;
1217 while (p) {
1218 struct ceph_mds_request *next_req =
1219 rb_entry(p, struct ceph_mds_request, r_node);
1220 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
1221 mdsc->oldest_tid = next_req->r_tid;
1222 break;
1223 }
1224 p = rb_next(p);
1225 }
1226 }
1227
1228 erase_request(&mdsc->request_tree, req);
1229
1230 if (req->r_unsafe_dir) {
1231 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
1232 spin_lock(&ci->i_unsafe_lock);
1233 list_del_init(&req->r_unsafe_dir_item);
1234 spin_unlock(&ci->i_unsafe_lock);
1235 }
1236 if (req->r_target_inode &&
1237 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
1238 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
1239 spin_lock(&ci->i_unsafe_lock);
1240 list_del_init(&req->r_unsafe_target_item);
1241 spin_unlock(&ci->i_unsafe_lock);
1242 }
1243
1244 if (req->r_unsafe_dir) {
1245 iput(req->r_unsafe_dir);
1246 req->r_unsafe_dir = NULL;
1247 }
1248
1249 complete_all(&req->r_safe_completion);
1250
1251 ceph_mdsc_put_request(req);
1252}
1253
1254/*
1255 * Walk back up the dentry tree until we hit a dentry representing a
1256 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
1257 * when calling this) to ensure that the objects won't disappear while we're
1258 * working with them. Once we hit a candidate dentry, we attempt to take a
1259 * reference to it, and return that as the result.
1260 */
1261static struct inode *get_nonsnap_parent(struct dentry *dentry)
1262{
1263 struct inode *inode = NULL;
1264
1265 while (dentry && !IS_ROOT(dentry)) {
1266 inode = d_inode_rcu(dentry);
1267 if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
1268 break;
1269 dentry = dentry->d_parent;
1270 }
1271 if (inode)
1272 inode = igrab(inode);
1273 return inode;
1274}
1275
1276/*
1277 * Choose mds to send request to next. If there is a hint set in the
1278 * request (e.g., due to a prior forward hint from the mds), use that.
1279 * Otherwise, consult frag tree and/or caps to identify the
1280 * appropriate mds. If all else fails, choose randomly.
1281 *
1282 * Called under mdsc->mutex.
1283 */
1284static int __choose_mds(struct ceph_mds_client *mdsc,
1285 struct ceph_mds_request *req,
1286 bool *random)
1287{
1288 struct inode *inode;
1289 struct ceph_inode_info *ci;
1290 struct ceph_cap *cap;
1291 int mode = req->r_direct_mode;
1292 int mds = -1;
1293 u32 hash = req->r_direct_hash;
1294 bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
1295 struct ceph_client *cl = mdsc->fsc->client;
1296
1297 if (random)
1298 *random = false;
1299
1300 /*
1301 * is there a specific mds we should try? ignore hint if we have
1302 * no session and the mds is not up (active or recovering).
1303 */
1304 if (req->r_resend_mds >= 0 &&
1305 (__have_session(mdsc, req->r_resend_mds) ||
1306 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
1307 doutc(cl, "using resend_mds mds%d\n", req->r_resend_mds);
1308 return req->r_resend_mds;
1309 }
1310
1311 if (mode == USE_RANDOM_MDS)
1312 goto random;
1313
1314 inode = NULL;
1315 if (req->r_inode) {
1316 if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
1317 inode = req->r_inode;
1318 ihold(inode);
1319 } else {
1320 /* req->r_dentry is non-null for LSSNAP request */
1321 rcu_read_lock();
1322 inode = get_nonsnap_parent(req->r_dentry);
1323 rcu_read_unlock();
1324 doutc(cl, "using snapdir's parent %p %llx.%llx\n",
1325 inode, ceph_vinop(inode));
1326 }
1327 } else if (req->r_dentry) {
1328 /* ignore race with rename; old or new d_parent is okay */
1329 struct dentry *parent;
1330 struct inode *dir;
1331
1332 rcu_read_lock();
1333 parent = READ_ONCE(req->r_dentry->d_parent);
1334 dir = req->r_parent ? : d_inode_rcu(parent);
1335
1336 if (!dir || dir->i_sb != mdsc->fsc->sb) {
1337 /* not this fs or parent went negative */
1338 inode = d_inode(req->r_dentry);
1339 if (inode)
1340 ihold(inode);
1341 } else if (ceph_snap(dir) != CEPH_NOSNAP) {
1342 /* direct snapped/virtual snapdir requests
1343 * based on parent dir inode */
1344 inode = get_nonsnap_parent(parent);
1345 doutc(cl, "using nonsnap parent %p %llx.%llx\n",
1346 inode, ceph_vinop(inode));
1347 } else {
1348 /* dentry target */
1349 inode = d_inode(req->r_dentry);
1350 if (!inode || mode == USE_AUTH_MDS) {
1351 /* dir + name */
1352 inode = igrab(dir);
1353 hash = ceph_dentry_hash(dir, req->r_dentry);
1354 is_hash = true;
1355 } else {
1356 ihold(inode);
1357 }
1358 }
1359 rcu_read_unlock();
1360 }
1361
1362 if (!inode)
1363 goto random;
1364
1365 doutc(cl, "%p %llx.%llx is_hash=%d (0x%x) mode %d\n", inode,
1366 ceph_vinop(inode), (int)is_hash, hash, mode);
1367 ci = ceph_inode(inode);
1368
1369 if (is_hash && S_ISDIR(inode->i_mode)) {
1370 struct ceph_inode_frag frag;
1371 int found;
1372
1373 ceph_choose_frag(ci, hash, &frag, &found);
1374 if (found) {
1375 if (mode == USE_ANY_MDS && frag.ndist > 0) {
1376 u8 r;
1377
1378 /* choose a random replica */
1379 get_random_bytes(&r, 1);
1380 r %= frag.ndist;
1381 mds = frag.dist[r];
1382 doutc(cl, "%p %llx.%llx frag %u mds%d (%d/%d)\n",
1383 inode, ceph_vinop(inode), frag.frag,
1384 mds, (int)r, frag.ndist);
1385 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1386 CEPH_MDS_STATE_ACTIVE &&
1387 !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
1388 goto out;
1389 }
1390
1391 /* since this file/dir wasn't known to be
1392 * replicated, then we want to look for the
1393 * authoritative mds. */
1394 if (frag.mds >= 0) {
1395 /* choose auth mds */
1396 mds = frag.mds;
1397 doutc(cl, "%p %llx.%llx frag %u mds%d (auth)\n",
1398 inode, ceph_vinop(inode), frag.frag, mds);
1399 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1400 CEPH_MDS_STATE_ACTIVE) {
1401 if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
1402 mds))
1403 goto out;
1404 }
1405 }
1406 mode = USE_AUTH_MDS;
1407 }
1408 }
1409
1410 spin_lock(&ci->i_ceph_lock);
1411 cap = NULL;
1412 if (mode == USE_AUTH_MDS)
1413 cap = ci->i_auth_cap;
1414 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
1415 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
1416 if (!cap) {
1417 spin_unlock(&ci->i_ceph_lock);
1418 iput(inode);
1419 goto random;
1420 }
1421 mds = cap->session->s_mds;
1422 doutc(cl, "%p %llx.%llx mds%d (%scap %p)\n", inode,
1423 ceph_vinop(inode), mds,
1424 cap == ci->i_auth_cap ? "auth " : "", cap);
1425 spin_unlock(&ci->i_ceph_lock);
1426out:
1427 iput(inode);
1428 return mds;
1429
1430random:
1431 if (random)
1432 *random = true;
1433
1434 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
1435 doutc(cl, "chose random mds%d\n", mds);
1436 return mds;
1437}
1438
1439
1440/*
1441 * session messages
1442 */
1443struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
1444{
1445 struct ceph_msg *msg;
1446 struct ceph_mds_session_head *h;
1447
1448 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
1449 false);
1450 if (!msg) {
1451 pr_err("ENOMEM creating session %s msg\n",
1452 ceph_session_op_name(op));
1453 return NULL;
1454 }
1455 h = msg->front.iov_base;
1456 h->op = cpu_to_le32(op);
1457 h->seq = cpu_to_le64(seq);
1458
1459 return msg;
1460}
1461
1462static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
1463#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
1464static int encode_supported_features(void **p, void *end)
1465{
1466 static const size_t count = ARRAY_SIZE(feature_bits);
1467
1468 if (count > 0) {
1469 size_t i;
1470 size_t size = FEATURE_BYTES(count);
1471 unsigned long bit;
1472
1473 if (WARN_ON_ONCE(*p + 4 + size > end))
1474 return -ERANGE;
1475
1476 ceph_encode_32(p, size);
1477 memset(*p, 0, size);
1478 for (i = 0; i < count; i++) {
1479 bit = feature_bits[i];
1480 ((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8);
1481 }
1482 *p += size;
1483 } else {
1484 if (WARN_ON_ONCE(*p + 4 > end))
1485 return -ERANGE;
1486
1487 ceph_encode_32(p, 0);
1488 }
1489
1490 return 0;
1491}
1492
1493static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
1494#define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
1495static int encode_metric_spec(void **p, void *end)
1496{
1497 static const size_t count = ARRAY_SIZE(metric_bits);
1498
1499 /* header */
1500 if (WARN_ON_ONCE(*p + 2 > end))
1501 return -ERANGE;
1502
1503 ceph_encode_8(p, 1); /* version */
1504 ceph_encode_8(p, 1); /* compat */
1505
1506 if (count > 0) {
1507 size_t i;
1508 size_t size = METRIC_BYTES(count);
1509
1510 if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
1511 return -ERANGE;
1512
1513 /* metric spec info length */
1514 ceph_encode_32(p, 4 + size);
1515
1516 /* metric spec */
1517 ceph_encode_32(p, size);
1518 memset(*p, 0, size);
1519 for (i = 0; i < count; i++)
1520 ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8);
1521 *p += size;
1522 } else {
1523 if (WARN_ON_ONCE(*p + 4 + 4 > end))
1524 return -ERANGE;
1525
1526 /* metric spec info length */
1527 ceph_encode_32(p, 4);
1528 /* metric spec */
1529 ceph_encode_32(p, 0);
1530 }
1531
1532 return 0;
1533}
1534
1535/*
1536 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
1537 * to include additional client metadata fields.
1538 */
1539static struct ceph_msg *
1540create_session_full_msg(struct ceph_mds_client *mdsc, int op, u64 seq)
1541{
1542 struct ceph_msg *msg;
1543 struct ceph_mds_session_head *h;
1544 int i;
1545 int extra_bytes = 0;
1546 int metadata_key_count = 0;
1547 struct ceph_options *opt = mdsc->fsc->client->options;
1548 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
1549 struct ceph_client *cl = mdsc->fsc->client;
1550 size_t size, count;
1551 void *p, *end;
1552 int ret;
1553
1554 const char* metadata[][2] = {
1555 {"hostname", mdsc->nodename},
1556 {"kernel_version", init_utsname()->release},
1557 {"entity_id", opt->name ? : ""},
1558 {"root", fsopt->server_path ? : "/"},
1559 {NULL, NULL}
1560 };
1561
1562 /* Calculate serialized length of metadata */
1563 extra_bytes = 4; /* map length */
1564 for (i = 0; metadata[i][0]; ++i) {
1565 extra_bytes += 8 + strlen(metadata[i][0]) +
1566 strlen(metadata[i][1]);
1567 metadata_key_count++;
1568 }
1569
1570 /* supported feature */
1571 size = 0;
1572 count = ARRAY_SIZE(feature_bits);
1573 if (count > 0)
1574 size = FEATURE_BYTES(count);
1575 extra_bytes += 4 + size;
1576
1577 /* metric spec */
1578 size = 0;
1579 count = ARRAY_SIZE(metric_bits);
1580 if (count > 0)
1581 size = METRIC_BYTES(count);
1582 extra_bytes += 2 + 4 + 4 + size;
1583
1584 /* flags, mds auth caps and oldest_client_tid */
1585 extra_bytes += 4 + 4 + 8;
1586
1587 /* Allocate the message */
1588 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
1589 GFP_NOFS, false);
1590 if (!msg) {
1591 pr_err_client(cl, "ENOMEM creating session open msg\n");
1592 return ERR_PTR(-ENOMEM);
1593 }
1594 p = msg->front.iov_base;
1595 end = p + msg->front.iov_len;
1596
1597 h = p;
1598 h->op = cpu_to_le32(op);
1599 h->seq = cpu_to_le64(seq);
1600
1601 /*
1602 * Serialize client metadata into waiting buffer space, using
1603 * the format that userspace expects for map<string, string>
1604 *
1605 * ClientSession messages with metadata are v7
1606 */
1607 msg->hdr.version = cpu_to_le16(7);
1608 msg->hdr.compat_version = cpu_to_le16(1);
1609
1610 /* The write pointer, following the session_head structure */
1611 p += sizeof(*h);
1612
1613 /* Number of entries in the map */
1614 ceph_encode_32(&p, metadata_key_count);
1615
1616 /* Two length-prefixed strings for each entry in the map */
1617 for (i = 0; metadata[i][0]; ++i) {
1618 size_t const key_len = strlen(metadata[i][0]);
1619 size_t const val_len = strlen(metadata[i][1]);
1620
1621 ceph_encode_32(&p, key_len);
1622 memcpy(p, metadata[i][0], key_len);
1623 p += key_len;
1624 ceph_encode_32(&p, val_len);
1625 memcpy(p, metadata[i][1], val_len);
1626 p += val_len;
1627 }
1628
1629 ret = encode_supported_features(&p, end);
1630 if (ret) {
1631 pr_err_client(cl, "encode_supported_features failed!\n");
1632 ceph_msg_put(msg);
1633 return ERR_PTR(ret);
1634 }
1635
1636 ret = encode_metric_spec(&p, end);
1637 if (ret) {
1638 pr_err_client(cl, "encode_metric_spec failed!\n");
1639 ceph_msg_put(msg);
1640 return ERR_PTR(ret);
1641 }
1642
1643 /* version == 5, flags */
1644 ceph_encode_32(&p, 0);
1645
1646 /* version == 6, mds auth caps */
1647 ceph_encode_32(&p, 0);
1648
1649 /* version == 7, oldest_client_tid */
1650 ceph_encode_64(&p, mdsc->oldest_tid);
1651
1652 msg->front.iov_len = p - msg->front.iov_base;
1653 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1654
1655 return msg;
1656}
1657
1658/*
1659 * send session open request.
1660 *
1661 * called under mdsc->mutex
1662 */
1663static int __open_session(struct ceph_mds_client *mdsc,
1664 struct ceph_mds_session *session)
1665{
1666 struct ceph_msg *msg;
1667 int mstate;
1668 int mds = session->s_mds;
1669
1670 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
1671 return -EIO;
1672
1673 /* wait for mds to go active? */
1674 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
1675 doutc(mdsc->fsc->client, "open_session to mds%d (%s)\n", mds,
1676 ceph_mds_state_name(mstate));
1677 session->s_state = CEPH_MDS_SESSION_OPENING;
1678 session->s_renew_requested = jiffies;
1679
1680 /* send connect message */
1681 msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_OPEN,
1682 session->s_seq);
1683 if (IS_ERR(msg))
1684 return PTR_ERR(msg);
1685 ceph_con_send(&session->s_con, msg);
1686 return 0;
1687}
1688
1689/*
1690 * open sessions for any export targets for the given mds
1691 *
1692 * called under mdsc->mutex
1693 */
1694static struct ceph_mds_session *
1695__open_export_target_session(struct ceph_mds_client *mdsc, int target)
1696{
1697 struct ceph_mds_session *session;
1698 int ret;
1699
1700 session = __ceph_lookup_mds_session(mdsc, target);
1701 if (!session) {
1702 session = register_session(mdsc, target);
1703 if (IS_ERR(session))
1704 return session;
1705 }
1706 if (session->s_state == CEPH_MDS_SESSION_NEW ||
1707 session->s_state == CEPH_MDS_SESSION_CLOSING) {
1708 ret = __open_session(mdsc, session);
1709 if (ret)
1710 return ERR_PTR(ret);
1711 }
1712
1713 return session;
1714}
1715
1716struct ceph_mds_session *
1717ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
1718{
1719 struct ceph_mds_session *session;
1720 struct ceph_client *cl = mdsc->fsc->client;
1721
1722 doutc(cl, "to mds%d\n", target);
1723
1724 mutex_lock(&mdsc->mutex);
1725 session = __open_export_target_session(mdsc, target);
1726 mutex_unlock(&mdsc->mutex);
1727
1728 return session;
1729}
1730
1731static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
1732 struct ceph_mds_session *session)
1733{
1734 struct ceph_mds_info *mi;
1735 struct ceph_mds_session *ts;
1736 int i, mds = session->s_mds;
1737 struct ceph_client *cl = mdsc->fsc->client;
1738
1739 if (mds >= mdsc->mdsmap->possible_max_rank)
1740 return;
1741
1742 mi = &mdsc->mdsmap->m_info[mds];
1743 doutc(cl, "for mds%d (%d targets)\n", session->s_mds,
1744 mi->num_export_targets);
1745
1746 for (i = 0; i < mi->num_export_targets; i++) {
1747 ts = __open_export_target_session(mdsc, mi->export_targets[i]);
1748 ceph_put_mds_session(ts);
1749 }
1750}
1751
1752/*
1753 * session caps
1754 */
1755
1756static void detach_cap_releases(struct ceph_mds_session *session,
1757 struct list_head *target)
1758{
1759 struct ceph_client *cl = session->s_mdsc->fsc->client;
1760
1761 lockdep_assert_held(&session->s_cap_lock);
1762
1763 list_splice_init(&session->s_cap_releases, target);
1764 session->s_num_cap_releases = 0;
1765 doutc(cl, "mds%d\n", session->s_mds);
1766}
1767
1768static void dispose_cap_releases(struct ceph_mds_client *mdsc,
1769 struct list_head *dispose)
1770{
1771 while (!list_empty(dispose)) {
1772 struct ceph_cap *cap;
1773 /* zero out the in-progress message */
1774 cap = list_first_entry(dispose, struct ceph_cap, session_caps);
1775 list_del(&cap->session_caps);
1776 ceph_put_cap(mdsc, cap);
1777 }
1778}
1779
1780static void cleanup_session_requests(struct ceph_mds_client *mdsc,
1781 struct ceph_mds_session *session)
1782{
1783 struct ceph_client *cl = mdsc->fsc->client;
1784 struct ceph_mds_request *req;
1785 struct rb_node *p;
1786
1787 doutc(cl, "mds%d\n", session->s_mds);
1788 mutex_lock(&mdsc->mutex);
1789 while (!list_empty(&session->s_unsafe)) {
1790 req = list_first_entry(&session->s_unsafe,
1791 struct ceph_mds_request, r_unsafe_item);
1792 pr_warn_ratelimited_client(cl, " dropping unsafe request %llu\n",
1793 req->r_tid);
1794 if (req->r_target_inode)
1795 mapping_set_error(req->r_target_inode->i_mapping, -EIO);
1796 if (req->r_unsafe_dir)
1797 mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO);
1798 __unregister_request(mdsc, req);
1799 }
1800 /* zero r_attempts, so kick_requests() will re-send requests */
1801 p = rb_first(&mdsc->request_tree);
1802 while (p) {
1803 req = rb_entry(p, struct ceph_mds_request, r_node);
1804 p = rb_next(p);
1805 if (req->r_session &&
1806 req->r_session->s_mds == session->s_mds)
1807 req->r_attempts = 0;
1808 }
1809 mutex_unlock(&mdsc->mutex);
1810}
1811
1812/*
1813 * Helper to safely iterate over all caps associated with a session, with
1814 * special care taken to handle a racing __ceph_remove_cap().
1815 *
1816 * Caller must hold session s_mutex.
1817 */
1818int ceph_iterate_session_caps(struct ceph_mds_session *session,
1819 int (*cb)(struct inode *, int mds, void *),
1820 void *arg)
1821{
1822 struct ceph_client *cl = session->s_mdsc->fsc->client;
1823 struct list_head *p;
1824 struct ceph_cap *cap;
1825 struct inode *inode, *last_inode = NULL;
1826 struct ceph_cap *old_cap = NULL;
1827 int ret;
1828
1829 doutc(cl, "%p mds%d\n", session, session->s_mds);
1830 spin_lock(&session->s_cap_lock);
1831 p = session->s_caps.next;
1832 while (p != &session->s_caps) {
1833 int mds;
1834
1835 cap = list_entry(p, struct ceph_cap, session_caps);
1836 inode = igrab(&cap->ci->netfs.inode);
1837 if (!inode) {
1838 p = p->next;
1839 continue;
1840 }
1841 session->s_cap_iterator = cap;
1842 mds = cap->mds;
1843 spin_unlock(&session->s_cap_lock);
1844
1845 if (last_inode) {
1846 iput(last_inode);
1847 last_inode = NULL;
1848 }
1849 if (old_cap) {
1850 ceph_put_cap(session->s_mdsc, old_cap);
1851 old_cap = NULL;
1852 }
1853
1854 ret = cb(inode, mds, arg);
1855 last_inode = inode;
1856
1857 spin_lock(&session->s_cap_lock);
1858 p = p->next;
1859 if (!cap->ci) {
1860 doutc(cl, "finishing cap %p removal\n", cap);
1861 BUG_ON(cap->session != session);
1862 cap->session = NULL;
1863 list_del_init(&cap->session_caps);
1864 session->s_nr_caps--;
1865 atomic64_dec(&session->s_mdsc->metric.total_caps);
1866 if (cap->queue_release)
1867 __ceph_queue_cap_release(session, cap);
1868 else
1869 old_cap = cap; /* put_cap it w/o locks held */
1870 }
1871 if (ret < 0)
1872 goto out;
1873 }
1874 ret = 0;
1875out:
1876 session->s_cap_iterator = NULL;
1877 spin_unlock(&session->s_cap_lock);
1878
1879 iput(last_inode);
1880 if (old_cap)
1881 ceph_put_cap(session->s_mdsc, old_cap);
1882
1883 return ret;
1884}
1885
1886static int remove_session_caps_cb(struct inode *inode, int mds, void *arg)
1887{
1888 struct ceph_inode_info *ci = ceph_inode(inode);
1889 struct ceph_client *cl = ceph_inode_to_client(inode);
1890 bool invalidate = false;
1891 struct ceph_cap *cap;
1892 int iputs = 0;
1893
1894 spin_lock(&ci->i_ceph_lock);
1895 cap = __get_cap_for_mds(ci, mds);
1896 if (cap) {
1897 doutc(cl, " removing cap %p, ci is %p, inode is %p\n",
1898 cap, ci, &ci->netfs.inode);
1899
1900 iputs = ceph_purge_inode_cap(inode, cap, &invalidate);
1901 }
1902 spin_unlock(&ci->i_ceph_lock);
1903
1904 if (cap)
1905 wake_up_all(&ci->i_cap_wq);
1906 if (invalidate)
1907 ceph_queue_invalidate(inode);
1908 while (iputs--)
1909 iput(inode);
1910 return 0;
1911}
1912
1913/*
1914 * caller must hold session s_mutex
1915 */
1916static void remove_session_caps(struct ceph_mds_session *session)
1917{
1918 struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1919 struct super_block *sb = fsc->sb;
1920 LIST_HEAD(dispose);
1921
1922 doutc(fsc->client, "on %p\n", session);
1923 ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
1924
1925 wake_up_all(&fsc->mdsc->cap_flushing_wq);
1926
1927 spin_lock(&session->s_cap_lock);
1928 if (session->s_nr_caps > 0) {
1929 struct inode *inode;
1930 struct ceph_cap *cap, *prev = NULL;
1931 struct ceph_vino vino;
1932 /*
1933 * iterate_session_caps() skips inodes that are being
1934 * deleted, we need to wait until deletions are complete.
1935 * __wait_on_freeing_inode() is designed for the job,
1936 * but it is not exported, so use lookup inode function
1937 * to access it.
1938 */
1939 while (!list_empty(&session->s_caps)) {
1940 cap = list_entry(session->s_caps.next,
1941 struct ceph_cap, session_caps);
1942 if (cap == prev)
1943 break;
1944 prev = cap;
1945 vino = cap->ci->i_vino;
1946 spin_unlock(&session->s_cap_lock);
1947
1948 inode = ceph_find_inode(sb, vino);
1949 iput(inode);
1950
1951 spin_lock(&session->s_cap_lock);
1952 }
1953 }
1954
1955 // drop cap expires and unlock s_cap_lock
1956 detach_cap_releases(session, &dispose);
1957
1958 BUG_ON(session->s_nr_caps > 0);
1959 BUG_ON(!list_empty(&session->s_cap_flushing));
1960 spin_unlock(&session->s_cap_lock);
1961 dispose_cap_releases(session->s_mdsc, &dispose);
1962}
1963
1964enum {
1965 RECONNECT,
1966 RENEWCAPS,
1967 FORCE_RO,
1968};
1969
1970/*
1971 * wake up any threads waiting on this session's caps. if the cap is
1972 * old (didn't get renewed on the client reconnect), remove it now.
1973 *
1974 * caller must hold s_mutex.
1975 */
1976static int wake_up_session_cb(struct inode *inode, int mds, void *arg)
1977{
1978 struct ceph_inode_info *ci = ceph_inode(inode);
1979 unsigned long ev = (unsigned long)arg;
1980
1981 if (ev == RECONNECT) {
1982 spin_lock(&ci->i_ceph_lock);
1983 ci->i_wanted_max_size = 0;
1984 ci->i_requested_max_size = 0;
1985 spin_unlock(&ci->i_ceph_lock);
1986 } else if (ev == RENEWCAPS) {
1987 struct ceph_cap *cap;
1988
1989 spin_lock(&ci->i_ceph_lock);
1990 cap = __get_cap_for_mds(ci, mds);
1991 /* mds did not re-issue stale cap */
1992 if (cap && cap->cap_gen < atomic_read(&cap->session->s_cap_gen))
1993 cap->issued = cap->implemented = CEPH_CAP_PIN;
1994 spin_unlock(&ci->i_ceph_lock);
1995 } else if (ev == FORCE_RO) {
1996 }
1997 wake_up_all(&ci->i_cap_wq);
1998 return 0;
1999}
2000
2001static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
2002{
2003 struct ceph_client *cl = session->s_mdsc->fsc->client;
2004
2005 doutc(cl, "session %p mds%d\n", session, session->s_mds);
2006 ceph_iterate_session_caps(session, wake_up_session_cb,
2007 (void *)(unsigned long)ev);
2008}
2009
2010/*
2011 * Send periodic message to MDS renewing all currently held caps. The
2012 * ack will reset the expiration for all caps from this session.
2013 *
2014 * caller holds s_mutex
2015 */
2016static int send_renew_caps(struct ceph_mds_client *mdsc,
2017 struct ceph_mds_session *session)
2018{
2019 struct ceph_client *cl = mdsc->fsc->client;
2020 struct ceph_msg *msg;
2021 int state;
2022
2023 if (time_after_eq(jiffies, session->s_cap_ttl) &&
2024 time_after_eq(session->s_cap_ttl, session->s_renew_requested))
2025 pr_info_client(cl, "mds%d caps stale\n", session->s_mds);
2026 session->s_renew_requested = jiffies;
2027
2028 /* do not try to renew caps until a recovering mds has reconnected
2029 * with its clients. */
2030 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
2031 if (state < CEPH_MDS_STATE_RECONNECT) {
2032 doutc(cl, "ignoring mds%d (%s)\n", session->s_mds,
2033 ceph_mds_state_name(state));
2034 return 0;
2035 }
2036
2037 doutc(cl, "to mds%d (%s)\n", session->s_mds,
2038 ceph_mds_state_name(state));
2039 msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_RENEWCAPS,
2040 ++session->s_renew_seq);
2041 if (IS_ERR(msg))
2042 return PTR_ERR(msg);
2043 ceph_con_send(&session->s_con, msg);
2044 return 0;
2045}
2046
2047static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
2048 struct ceph_mds_session *session, u64 seq)
2049{
2050 struct ceph_client *cl = mdsc->fsc->client;
2051 struct ceph_msg *msg;
2052
2053 doutc(cl, "to mds%d (%s)s seq %lld\n", session->s_mds,
2054 ceph_session_state_name(session->s_state), seq);
2055 msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
2056 if (!msg)
2057 return -ENOMEM;
2058 ceph_con_send(&session->s_con, msg);
2059 return 0;
2060}
2061
2062
2063/*
2064 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
2065 *
2066 * Called under session->s_mutex
2067 */
2068static void renewed_caps(struct ceph_mds_client *mdsc,
2069 struct ceph_mds_session *session, int is_renew)
2070{
2071 struct ceph_client *cl = mdsc->fsc->client;
2072 int was_stale;
2073 int wake = 0;
2074
2075 spin_lock(&session->s_cap_lock);
2076 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
2077
2078 session->s_cap_ttl = session->s_renew_requested +
2079 mdsc->mdsmap->m_session_timeout*HZ;
2080
2081 if (was_stale) {
2082 if (time_before(jiffies, session->s_cap_ttl)) {
2083 pr_info_client(cl, "mds%d caps renewed\n",
2084 session->s_mds);
2085 wake = 1;
2086 } else {
2087 pr_info_client(cl, "mds%d caps still stale\n",
2088 session->s_mds);
2089 }
2090 }
2091 doutc(cl, "mds%d ttl now %lu, was %s, now %s\n", session->s_mds,
2092 session->s_cap_ttl, was_stale ? "stale" : "fresh",
2093 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
2094 spin_unlock(&session->s_cap_lock);
2095
2096 if (wake)
2097 wake_up_session_caps(session, RENEWCAPS);
2098}
2099
2100/*
2101 * send a session close request
2102 */
2103static int request_close_session(struct ceph_mds_session *session)
2104{
2105 struct ceph_client *cl = session->s_mdsc->fsc->client;
2106 struct ceph_msg *msg;
2107
2108 doutc(cl, "mds%d state %s seq %lld\n", session->s_mds,
2109 ceph_session_state_name(session->s_state), session->s_seq);
2110 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
2111 session->s_seq);
2112 if (!msg)
2113 return -ENOMEM;
2114 ceph_con_send(&session->s_con, msg);
2115 return 1;
2116}
2117
2118/*
2119 * Called with s_mutex held.
2120 */
2121static int __close_session(struct ceph_mds_client *mdsc,
2122 struct ceph_mds_session *session)
2123{
2124 if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
2125 return 0;
2126 session->s_state = CEPH_MDS_SESSION_CLOSING;
2127 return request_close_session(session);
2128}
2129
2130static bool drop_negative_children(struct dentry *dentry)
2131{
2132 struct dentry *child;
2133 bool all_negative = true;
2134
2135 if (!d_is_dir(dentry))
2136 goto out;
2137
2138 spin_lock(&dentry->d_lock);
2139 hlist_for_each_entry(child, &dentry->d_children, d_sib) {
2140 if (d_really_is_positive(child)) {
2141 all_negative = false;
2142 break;
2143 }
2144 }
2145 spin_unlock(&dentry->d_lock);
2146
2147 if (all_negative)
2148 shrink_dcache_parent(dentry);
2149out:
2150 return all_negative;
2151}
2152
2153/*
2154 * Trim old(er) caps.
2155 *
2156 * Because we can't cache an inode without one or more caps, we do
2157 * this indirectly: if a cap is unused, we prune its aliases, at which
2158 * point the inode will hopefully get dropped to.
2159 *
2160 * Yes, this is a bit sloppy. Our only real goal here is to respond to
2161 * memory pressure from the MDS, though, so it needn't be perfect.
2162 */
2163static int trim_caps_cb(struct inode *inode, int mds, void *arg)
2164{
2165 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
2166 struct ceph_client *cl = mdsc->fsc->client;
2167 int *remaining = arg;
2168 struct ceph_inode_info *ci = ceph_inode(inode);
2169 int used, wanted, oissued, mine;
2170 struct ceph_cap *cap;
2171
2172 if (*remaining <= 0)
2173 return -1;
2174
2175 spin_lock(&ci->i_ceph_lock);
2176 cap = __get_cap_for_mds(ci, mds);
2177 if (!cap) {
2178 spin_unlock(&ci->i_ceph_lock);
2179 return 0;
2180 }
2181 mine = cap->issued | cap->implemented;
2182 used = __ceph_caps_used(ci);
2183 wanted = __ceph_caps_file_wanted(ci);
2184 oissued = __ceph_caps_issued_other(ci, cap);
2185
2186 doutc(cl, "%p %llx.%llx cap %p mine %s oissued %s used %s wanted %s\n",
2187 inode, ceph_vinop(inode), cap, ceph_cap_string(mine),
2188 ceph_cap_string(oissued), ceph_cap_string(used),
2189 ceph_cap_string(wanted));
2190 if (cap == ci->i_auth_cap) {
2191 if (ci->i_dirty_caps || ci->i_flushing_caps ||
2192 !list_empty(&ci->i_cap_snaps))
2193 goto out;
2194 if ((used | wanted) & CEPH_CAP_ANY_WR)
2195 goto out;
2196 /* Note: it's possible that i_filelock_ref becomes non-zero
2197 * after dropping auth caps. It doesn't hurt because reply
2198 * of lock mds request will re-add auth caps. */
2199 if (atomic_read(&ci->i_filelock_ref) > 0)
2200 goto out;
2201 }
2202 /* The inode has cached pages, but it's no longer used.
2203 * we can safely drop it */
2204 if (S_ISREG(inode->i_mode) &&
2205 wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
2206 !(oissued & CEPH_CAP_FILE_CACHE)) {
2207 used = 0;
2208 oissued = 0;
2209 }
2210 if ((used | wanted) & ~oissued & mine)
2211 goto out; /* we need these caps */
2212
2213 if (oissued) {
2214 /* we aren't the only cap.. just remove us */
2215 ceph_remove_cap(mdsc, cap, true);
2216 (*remaining)--;
2217 } else {
2218 struct dentry *dentry;
2219 /* try dropping referring dentries */
2220 spin_unlock(&ci->i_ceph_lock);
2221 dentry = d_find_any_alias(inode);
2222 if (dentry && drop_negative_children(dentry)) {
2223 int count;
2224 dput(dentry);
2225 d_prune_aliases(inode);
2226 count = icount_read(inode);
2227 if (count == 1)
2228 (*remaining)--;
2229 doutc(cl, "%p %llx.%llx cap %p pruned, count now %d\n",
2230 inode, ceph_vinop(inode), cap, count);
2231 } else {
2232 dput(dentry);
2233 }
2234 return 0;
2235 }
2236
2237out:
2238 spin_unlock(&ci->i_ceph_lock);
2239 return 0;
2240}
2241
2242/*
2243 * Trim session cap count down to some max number.
2244 */
2245int ceph_trim_caps(struct ceph_mds_client *mdsc,
2246 struct ceph_mds_session *session,
2247 int max_caps)
2248{
2249 struct ceph_client *cl = mdsc->fsc->client;
2250 int trim_caps = session->s_nr_caps - max_caps;
2251
2252 doutc(cl, "mds%d start: %d / %d, trim %d\n", session->s_mds,
2253 session->s_nr_caps, max_caps, trim_caps);
2254 if (trim_caps > 0) {
2255 int remaining = trim_caps;
2256
2257 ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
2258 doutc(cl, "mds%d done: %d / %d, trimmed %d\n",
2259 session->s_mds, session->s_nr_caps, max_caps,
2260 trim_caps - remaining);
2261 }
2262
2263 ceph_flush_session_cap_releases(mdsc, session);
2264 return 0;
2265}
2266
2267static int check_caps_flush(struct ceph_mds_client *mdsc,
2268 u64 want_flush_tid)
2269{
2270 struct ceph_client *cl = mdsc->fsc->client;
2271 int ret = 1;
2272
2273 spin_lock(&mdsc->cap_dirty_lock);
2274 if (!list_empty(&mdsc->cap_flush_list)) {
2275 struct ceph_cap_flush *cf =
2276 list_first_entry(&mdsc->cap_flush_list,
2277 struct ceph_cap_flush, g_list);
2278 if (cf->tid <= want_flush_tid) {
2279 doutc(cl, "still flushing tid %llu <= %llu\n",
2280 cf->tid, want_flush_tid);
2281 ret = 0;
2282 }
2283 }
2284 spin_unlock(&mdsc->cap_dirty_lock);
2285 return ret;
2286}
2287
2288/*
2289 * flush all dirty inode data to disk.
2290 *
2291 * returns true if we've flushed through want_flush_tid
2292 */
2293static void wait_caps_flush(struct ceph_mds_client *mdsc,
2294 u64 want_flush_tid)
2295{
2296 struct ceph_client *cl = mdsc->fsc->client;
2297
2298 doutc(cl, "want %llu\n", want_flush_tid);
2299
2300 wait_event(mdsc->cap_flushing_wq,
2301 check_caps_flush(mdsc, want_flush_tid));
2302
2303 doutc(cl, "ok, flushed thru %llu\n", want_flush_tid);
2304}
2305
2306/*
2307 * called under s_mutex
2308 */
2309static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
2310 struct ceph_mds_session *session)
2311{
2312 struct ceph_client *cl = mdsc->fsc->client;
2313 struct ceph_msg *msg = NULL;
2314 struct ceph_mds_cap_release *head;
2315 struct ceph_mds_cap_item *item;
2316 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
2317 struct ceph_cap *cap;
2318 LIST_HEAD(tmp_list);
2319 int num_cap_releases;
2320 __le32 barrier, *cap_barrier;
2321
2322 down_read(&osdc->lock);
2323 barrier = cpu_to_le32(osdc->epoch_barrier);
2324 up_read(&osdc->lock);
2325
2326 spin_lock(&session->s_cap_lock);
2327again:
2328 list_splice_init(&session->s_cap_releases, &tmp_list);
2329 num_cap_releases = session->s_num_cap_releases;
2330 session->s_num_cap_releases = 0;
2331 spin_unlock(&session->s_cap_lock);
2332
2333 while (!list_empty(&tmp_list)) {
2334 if (!msg) {
2335 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
2336 PAGE_SIZE, GFP_NOFS, false);
2337 if (!msg)
2338 goto out_err;
2339 head = msg->front.iov_base;
2340 head->num = cpu_to_le32(0);
2341 msg->front.iov_len = sizeof(*head);
2342
2343 msg->hdr.version = cpu_to_le16(2);
2344 msg->hdr.compat_version = cpu_to_le16(1);
2345 }
2346
2347 cap = list_first_entry(&tmp_list, struct ceph_cap,
2348 session_caps);
2349 list_del(&cap->session_caps);
2350 num_cap_releases--;
2351
2352 head = msg->front.iov_base;
2353 put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
2354 &head->num);
2355 item = msg->front.iov_base + msg->front.iov_len;
2356 item->ino = cpu_to_le64(cap->cap_ino);
2357 item->cap_id = cpu_to_le64(cap->cap_id);
2358 item->migrate_seq = cpu_to_le32(cap->mseq);
2359 item->issue_seq = cpu_to_le32(cap->issue_seq);
2360 msg->front.iov_len += sizeof(*item);
2361
2362 ceph_put_cap(mdsc, cap);
2363
2364 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
2365 // Append cap_barrier field
2366 cap_barrier = msg->front.iov_base + msg->front.iov_len;
2367 *cap_barrier = barrier;
2368 msg->front.iov_len += sizeof(*cap_barrier);
2369
2370 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2371 doutc(cl, "mds%d %p\n", session->s_mds, msg);
2372 ceph_con_send(&session->s_con, msg);
2373 msg = NULL;
2374 }
2375 }
2376
2377 BUG_ON(num_cap_releases != 0);
2378
2379 spin_lock(&session->s_cap_lock);
2380 if (!list_empty(&session->s_cap_releases))
2381 goto again;
2382 spin_unlock(&session->s_cap_lock);
2383
2384 if (msg) {
2385 // Append cap_barrier field
2386 cap_barrier = msg->front.iov_base + msg->front.iov_len;
2387 *cap_barrier = barrier;
2388 msg->front.iov_len += sizeof(*cap_barrier);
2389
2390 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2391 doutc(cl, "mds%d %p\n", session->s_mds, msg);
2392 ceph_con_send(&session->s_con, msg);
2393 }
2394 return;
2395out_err:
2396 pr_err_client(cl, "mds%d, failed to allocate message\n",
2397 session->s_mds);
2398 spin_lock(&session->s_cap_lock);
2399 list_splice(&tmp_list, &session->s_cap_releases);
2400 session->s_num_cap_releases += num_cap_releases;
2401 spin_unlock(&session->s_cap_lock);
2402}
2403
2404static void ceph_cap_release_work(struct work_struct *work)
2405{
2406 struct ceph_mds_session *session =
2407 container_of(work, struct ceph_mds_session, s_cap_release_work);
2408
2409 mutex_lock(&session->s_mutex);
2410 if (session->s_state == CEPH_MDS_SESSION_OPEN ||
2411 session->s_state == CEPH_MDS_SESSION_HUNG)
2412 ceph_send_cap_releases(session->s_mdsc, session);
2413 mutex_unlock(&session->s_mutex);
2414 ceph_put_mds_session(session);
2415}
2416
2417void ceph_flush_session_cap_releases(struct ceph_mds_client *mdsc,
2418 struct ceph_mds_session *session)
2419{
2420 struct ceph_client *cl = mdsc->fsc->client;
2421 if (mdsc->stopping)
2422 return;
2423
2424 ceph_get_mds_session(session);
2425 if (queue_work(mdsc->fsc->cap_wq,
2426 &session->s_cap_release_work)) {
2427 doutc(cl, "cap release work queued\n");
2428 } else {
2429 ceph_put_mds_session(session);
2430 doutc(cl, "failed to queue cap release work\n");
2431 }
2432}
2433
2434/*
2435 * caller holds session->s_cap_lock
2436 */
2437void __ceph_queue_cap_release(struct ceph_mds_session *session,
2438 struct ceph_cap *cap)
2439{
2440 list_add_tail(&cap->session_caps, &session->s_cap_releases);
2441 session->s_num_cap_releases++;
2442
2443 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
2444 ceph_flush_session_cap_releases(session->s_mdsc, session);
2445}
2446
2447static void ceph_cap_reclaim_work(struct work_struct *work)
2448{
2449 struct ceph_mds_client *mdsc =
2450 container_of(work, struct ceph_mds_client, cap_reclaim_work);
2451 int ret = ceph_trim_dentries(mdsc);
2452 if (ret == -EAGAIN)
2453 ceph_queue_cap_reclaim_work(mdsc);
2454}
2455
2456void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
2457{
2458 struct ceph_client *cl = mdsc->fsc->client;
2459 if (mdsc->stopping)
2460 return;
2461
2462 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
2463 doutc(cl, "caps reclaim work queued\n");
2464 } else {
2465 doutc(cl, "failed to queue caps release work\n");
2466 }
2467}
2468
2469void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
2470{
2471 int val;
2472 if (!nr)
2473 return;
2474 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
2475 if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
2476 atomic_set(&mdsc->cap_reclaim_pending, 0);
2477 ceph_queue_cap_reclaim_work(mdsc);
2478 }
2479}
2480
2481void ceph_queue_cap_unlink_work(struct ceph_mds_client *mdsc)
2482{
2483 struct ceph_client *cl = mdsc->fsc->client;
2484 if (mdsc->stopping)
2485 return;
2486
2487 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_unlink_work)) {
2488 doutc(cl, "caps unlink work queued\n");
2489 } else {
2490 doutc(cl, "failed to queue caps unlink work\n");
2491 }
2492}
2493
2494static void ceph_cap_unlink_work(struct work_struct *work)
2495{
2496 struct ceph_mds_client *mdsc =
2497 container_of(work, struct ceph_mds_client, cap_unlink_work);
2498 struct ceph_client *cl = mdsc->fsc->client;
2499
2500 doutc(cl, "begin\n");
2501 spin_lock(&mdsc->cap_delay_lock);
2502 while (!list_empty(&mdsc->cap_unlink_delay_list)) {
2503 struct ceph_inode_info *ci;
2504 struct inode *inode;
2505
2506 ci = list_first_entry(&mdsc->cap_unlink_delay_list,
2507 struct ceph_inode_info,
2508 i_cap_delay_list);
2509 list_del_init(&ci->i_cap_delay_list);
2510
2511 inode = igrab(&ci->netfs.inode);
2512 if (inode) {
2513 spin_unlock(&mdsc->cap_delay_lock);
2514 doutc(cl, "on %p %llx.%llx\n", inode,
2515 ceph_vinop(inode));
2516 ceph_check_caps(ci, CHECK_CAPS_FLUSH);
2517 iput(inode);
2518 spin_lock(&mdsc->cap_delay_lock);
2519 }
2520 }
2521 spin_unlock(&mdsc->cap_delay_lock);
2522 doutc(cl, "done\n");
2523}
2524
2525/*
2526 * requests
2527 */
2528
2529int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
2530 struct inode *dir)
2531{
2532 struct ceph_inode_info *ci = ceph_inode(dir);
2533 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
2534 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
2535 size_t size = sizeof(struct ceph_mds_reply_dir_entry);
2536 unsigned int num_entries;
2537 u64 bytes_count;
2538 int order;
2539
2540 spin_lock(&ci->i_ceph_lock);
2541 num_entries = ci->i_files + ci->i_subdirs;
2542 spin_unlock(&ci->i_ceph_lock);
2543 num_entries = max(num_entries, 1U);
2544 num_entries = min(num_entries, opt->max_readdir);
2545
2546 bytes_count = (u64)size * num_entries;
2547 if (unlikely(bytes_count > ULONG_MAX))
2548 bytes_count = ULONG_MAX;
2549
2550 order = get_order((unsigned long)bytes_count);
2551 while (order >= 0) {
2552 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
2553 __GFP_NOWARN |
2554 __GFP_ZERO,
2555 order);
2556 if (rinfo->dir_entries)
2557 break;
2558 order--;
2559 }
2560 if (!rinfo->dir_entries || unlikely(order < 0))
2561 return -ENOMEM;
2562
2563 num_entries = (PAGE_SIZE << order) / size;
2564 num_entries = min(num_entries, opt->max_readdir);
2565
2566 rinfo->dir_buf_size = PAGE_SIZE << order;
2567 req->r_num_caps = num_entries + 1;
2568 req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
2569 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
2570 return 0;
2571}
2572
2573/*
2574 * Create an mds request.
2575 */
2576struct ceph_mds_request *
2577ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
2578{
2579 struct ceph_mds_request *req;
2580
2581 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
2582 if (!req)
2583 return ERR_PTR(-ENOMEM);
2584
2585 mutex_init(&req->r_fill_mutex);
2586 req->r_mdsc = mdsc;
2587 req->r_started = jiffies;
2588 req->r_start_latency = ktime_get();
2589 req->r_resend_mds = -1;
2590 INIT_LIST_HEAD(&req->r_unsafe_dir_item);
2591 INIT_LIST_HEAD(&req->r_unsafe_target_item);
2592 req->r_fmode = -1;
2593 req->r_feature_needed = -1;
2594 kref_init(&req->r_kref);
2595 RB_CLEAR_NODE(&req->r_node);
2596 INIT_LIST_HEAD(&req->r_wait);
2597 init_completion(&req->r_completion);
2598 init_completion(&req->r_safe_completion);
2599 INIT_LIST_HEAD(&req->r_unsafe_item);
2600
2601 ktime_get_coarse_real_ts64(&req->r_stamp);
2602
2603 req->r_op = op;
2604 req->r_direct_mode = mode;
2605 return req;
2606}
2607
2608/*
2609 * return oldest (lowest) request, tid in request tree, 0 if none.
2610 *
2611 * called under mdsc->mutex.
2612 */
2613static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
2614{
2615 if (RB_EMPTY_ROOT(&mdsc->request_tree))
2616 return NULL;
2617 return rb_entry(rb_first(&mdsc->request_tree),
2618 struct ceph_mds_request, r_node);
2619}
2620
2621static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
2622{
2623 return mdsc->oldest_tid;
2624}
2625
2626#if IS_ENABLED(CONFIG_FS_ENCRYPTION)
2627static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen)
2628{
2629 struct inode *dir = req->r_parent;
2630 struct dentry *dentry = req->r_dentry;
2631 const struct qstr *name = req->r_dname;
2632 u8 *cryptbuf = NULL;
2633 u32 len = 0;
2634 int ret = 0;
2635
2636 /* only encode if we have parent and dentry */
2637 if (!dir || !dentry)
2638 goto success;
2639
2640 /* No-op unless this is encrypted */
2641 if (!IS_ENCRYPTED(dir))
2642 goto success;
2643
2644 ret = ceph_fscrypt_prepare_readdir(dir);
2645 if (ret < 0)
2646 return ERR_PTR(ret);
2647
2648 /* No key? Just ignore it. */
2649 if (!fscrypt_has_encryption_key(dir))
2650 goto success;
2651
2652 if (!name)
2653 name = &dentry->d_name;
2654
2655 if (!fscrypt_fname_encrypted_size(dir, name->len, NAME_MAX, &len)) {
2656 WARN_ON_ONCE(1);
2657 return ERR_PTR(-ENAMETOOLONG);
2658 }
2659
2660 /* No need to append altname if name is short enough */
2661 if (len <= CEPH_NOHASH_NAME_MAX) {
2662 len = 0;
2663 goto success;
2664 }
2665
2666 cryptbuf = kmalloc(len, GFP_KERNEL);
2667 if (!cryptbuf)
2668 return ERR_PTR(-ENOMEM);
2669
2670 ret = fscrypt_fname_encrypt(dir, name, cryptbuf, len);
2671 if (ret) {
2672 kfree(cryptbuf);
2673 return ERR_PTR(ret);
2674 }
2675success:
2676 *plen = len;
2677 return cryptbuf;
2678}
2679#else
2680static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen)
2681{
2682 *plen = 0;
2683 return NULL;
2684}
2685#endif
2686
2687/**
2688 * ceph_mdsc_build_path - build a path string to a given dentry
2689 * @mdsc: mds client
2690 * @dentry: dentry to which path should be built
2691 * @path_info: output path, length, base ino+snap, and freepath ownership flag
2692 * @for_wire: is this path going to be sent to the MDS?
2693 *
2694 * Build a string that represents the path to the dentry. This is mostly called
2695 * for two different purposes:
2696 *
2697 * 1) we need to build a path string to send to the MDS (for_wire == true)
2698 * 2) we need a path string for local presentation (e.g. debugfs)
2699 * (for_wire == false)
2700 *
2701 * The path is built in reverse, starting with the dentry. Walk back up toward
2702 * the root, building the path until the first non-snapped inode is reached
2703 * (for_wire) or the root inode is reached (!for_wire).
2704 *
2705 * Encode hidden .snap dirs as a double /, i.e.
2706 * foo/.snap/bar -> foo//bar
2707 */
2708char *ceph_mdsc_build_path(struct ceph_mds_client *mdsc, struct dentry *dentry,
2709 struct ceph_path_info *path_info, int for_wire)
2710{
2711 struct ceph_client *cl = mdsc->fsc->client;
2712 struct dentry *cur;
2713 struct inode *inode;
2714 char *path;
2715 int pos;
2716 unsigned seq;
2717 u64 base;
2718
2719 if (!dentry)
2720 return ERR_PTR(-EINVAL);
2721
2722 path = __getname();
2723 if (!path)
2724 return ERR_PTR(-ENOMEM);
2725retry:
2726 pos = PATH_MAX - 1;
2727 path[pos] = '\0';
2728
2729 seq = read_seqbegin(&rename_lock);
2730 cur = dget(dentry);
2731 for (;;) {
2732 struct dentry *parent;
2733
2734 spin_lock(&cur->d_lock);
2735 inode = d_inode(cur);
2736 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
2737 doutc(cl, "path+%d: %p SNAPDIR\n", pos, cur);
2738 spin_unlock(&cur->d_lock);
2739 parent = dget_parent(cur);
2740 } else if (for_wire && inode && dentry != cur &&
2741 ceph_snap(inode) == CEPH_NOSNAP) {
2742 spin_unlock(&cur->d_lock);
2743 pos++; /* get rid of any prepended '/' */
2744 break;
2745 } else if (!for_wire || !IS_ENCRYPTED(d_inode(cur->d_parent))) {
2746 pos -= cur->d_name.len;
2747 if (pos < 0) {
2748 spin_unlock(&cur->d_lock);
2749 break;
2750 }
2751 memcpy(path + pos, cur->d_name.name, cur->d_name.len);
2752 spin_unlock(&cur->d_lock);
2753 parent = dget_parent(cur);
2754 } else {
2755 int len, ret;
2756 char buf[NAME_MAX];
2757
2758 /*
2759 * Proactively copy name into buf, in case we need to
2760 * present it as-is.
2761 */
2762 memcpy(buf, cur->d_name.name, cur->d_name.len);
2763 len = cur->d_name.len;
2764 spin_unlock(&cur->d_lock);
2765 parent = dget_parent(cur);
2766
2767 ret = ceph_fscrypt_prepare_readdir(d_inode(parent));
2768 if (ret < 0) {
2769 dput(parent);
2770 dput(cur);
2771 return ERR_PTR(ret);
2772 }
2773
2774 if (fscrypt_has_encryption_key(d_inode(parent))) {
2775 len = ceph_encode_encrypted_dname(d_inode(parent),
2776 buf, len);
2777 if (len < 0) {
2778 dput(parent);
2779 dput(cur);
2780 return ERR_PTR(len);
2781 }
2782 }
2783 pos -= len;
2784 if (pos < 0) {
2785 dput(parent);
2786 break;
2787 }
2788 memcpy(path + pos, buf, len);
2789 }
2790 dput(cur);
2791 cur = parent;
2792
2793 /* Are we at the root? */
2794 if (IS_ROOT(cur))
2795 break;
2796
2797 /* Are we out of buffer? */
2798 if (--pos < 0)
2799 break;
2800
2801 path[pos] = '/';
2802 }
2803 inode = d_inode(cur);
2804 base = inode ? ceph_ino(inode) : 0;
2805 dput(cur);
2806
2807 if (read_seqretry(&rename_lock, seq))
2808 goto retry;
2809
2810 if (pos < 0) {
2811 /*
2812 * The path is longer than PATH_MAX and this function
2813 * cannot ever succeed. Creating paths that long is
2814 * possible with Ceph, but Linux cannot use them.
2815 */
2816 return ERR_PTR(-ENAMETOOLONG);
2817 }
2818
2819 /* Initialize the output structure */
2820 memset(path_info, 0, sizeof(*path_info));
2821
2822 path_info->vino.ino = base;
2823 path_info->pathlen = PATH_MAX - 1 - pos;
2824 path_info->path = path + pos;
2825 path_info->freepath = true;
2826
2827 /* Set snap from dentry if available */
2828 if (d_inode(dentry))
2829 path_info->vino.snap = ceph_snap(d_inode(dentry));
2830 else
2831 path_info->vino.snap = CEPH_NOSNAP;
2832
2833 doutc(cl, "on %p %d built %llx '%.*s'\n", dentry, d_count(dentry),
2834 base, PATH_MAX - 1 - pos, path + pos);
2835 return path + pos;
2836}
2837
2838static int build_dentry_path(struct ceph_mds_client *mdsc, struct dentry *dentry,
2839 struct inode *dir, struct ceph_path_info *path_info,
2840 bool parent_locked)
2841{
2842 char *path;
2843
2844 rcu_read_lock();
2845 if (!dir)
2846 dir = d_inode_rcu(dentry->d_parent);
2847 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP &&
2848 !IS_ENCRYPTED(dir)) {
2849 path_info->vino.ino = ceph_ino(dir);
2850 path_info->vino.snap = ceph_snap(dir);
2851 rcu_read_unlock();
2852 path_info->path = dentry->d_name.name;
2853 path_info->pathlen = dentry->d_name.len;
2854 path_info->freepath = false;
2855 return 0;
2856 }
2857 rcu_read_unlock();
2858 path = ceph_mdsc_build_path(mdsc, dentry, path_info, 1);
2859 if (IS_ERR(path))
2860 return PTR_ERR(path);
2861 /*
2862 * ceph_mdsc_build_path already fills path_info, including snap handling.
2863 */
2864 return 0;
2865}
2866
2867static int build_inode_path(struct inode *inode, struct ceph_path_info *path_info)
2868{
2869 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
2870 struct dentry *dentry;
2871 char *path;
2872
2873 if (ceph_snap(inode) == CEPH_NOSNAP) {
2874 path_info->vino.ino = ceph_ino(inode);
2875 path_info->vino.snap = ceph_snap(inode);
2876 path_info->pathlen = 0;
2877 path_info->freepath = false;
2878 return 0;
2879 }
2880 dentry = d_find_alias(inode);
2881 path = ceph_mdsc_build_path(mdsc, dentry, path_info, 1);
2882 dput(dentry);
2883 if (IS_ERR(path))
2884 return PTR_ERR(path);
2885 /*
2886 * ceph_mdsc_build_path already fills path_info, including snap from dentry.
2887 * Override with inode's snap since that's what this function is for.
2888 */
2889 path_info->vino.snap = ceph_snap(inode);
2890 return 0;
2891}
2892
2893/*
2894 * request arguments may be specified via an inode *, a dentry *, or
2895 * an explicit ino+path.
2896 */
2897static int set_request_path_attr(struct ceph_mds_client *mdsc, struct inode *rinode,
2898 struct dentry *rdentry, struct inode *rdiri,
2899 const char *rpath, u64 rino,
2900 struct ceph_path_info *path_info,
2901 bool parent_locked)
2902{
2903 struct ceph_client *cl = mdsc->fsc->client;
2904 int r = 0;
2905
2906 /* Initialize the output structure */
2907 memset(path_info, 0, sizeof(*path_info));
2908
2909 if (rinode) {
2910 r = build_inode_path(rinode, path_info);
2911 doutc(cl, " inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
2912 ceph_snap(rinode));
2913 } else if (rdentry) {
2914 r = build_dentry_path(mdsc, rdentry, rdiri, path_info, parent_locked);
2915 doutc(cl, " dentry %p %llx/%.*s\n", rdentry, path_info->vino.ino,
2916 path_info->pathlen, path_info->path);
2917 } else if (rpath || rino) {
2918 path_info->vino.ino = rino;
2919 path_info->vino.snap = CEPH_NOSNAP;
2920 path_info->path = rpath;
2921 path_info->pathlen = rpath ? strlen(rpath) : 0;
2922 path_info->freepath = false;
2923
2924 doutc(cl, " path %.*s\n", path_info->pathlen, rpath);
2925 }
2926
2927 return r;
2928}
2929
2930static void encode_mclientrequest_tail(void **p,
2931 const struct ceph_mds_request *req)
2932{
2933 struct ceph_timespec ts;
2934 int i;
2935
2936 ceph_encode_timespec64(&ts, &req->r_stamp);
2937 ceph_encode_copy(p, &ts, sizeof(ts));
2938
2939 /* v4: gid_list */
2940 ceph_encode_32(p, req->r_cred->group_info->ngroups);
2941 for (i = 0; i < req->r_cred->group_info->ngroups; i++)
2942 ceph_encode_64(p, from_kgid(&init_user_ns,
2943 req->r_cred->group_info->gid[i]));
2944
2945 /* v5: altname */
2946 ceph_encode_32(p, req->r_altname_len);
2947 ceph_encode_copy(p, req->r_altname, req->r_altname_len);
2948
2949 /* v6: fscrypt_auth and fscrypt_file */
2950 if (req->r_fscrypt_auth) {
2951 u32 authlen = ceph_fscrypt_auth_len(req->r_fscrypt_auth);
2952
2953 ceph_encode_32(p, authlen);
2954 ceph_encode_copy(p, req->r_fscrypt_auth, authlen);
2955 } else {
2956 ceph_encode_32(p, 0);
2957 }
2958 if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags)) {
2959 ceph_encode_32(p, sizeof(__le64));
2960 ceph_encode_64(p, req->r_fscrypt_file);
2961 } else {
2962 ceph_encode_32(p, 0);
2963 }
2964}
2965
2966static inline u16 mds_supported_head_version(struct ceph_mds_session *session)
2967{
2968 if (!test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD, &session->s_features))
2969 return 1;
2970
2971 if (!test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features))
2972 return 2;
2973
2974 return CEPH_MDS_REQUEST_HEAD_VERSION;
2975}
2976
2977static struct ceph_mds_request_head_legacy *
2978find_legacy_request_head(void *p, u64 features)
2979{
2980 bool legacy = !(features & CEPH_FEATURE_FS_BTIME);
2981 struct ceph_mds_request_head *head;
2982
2983 if (legacy)
2984 return (struct ceph_mds_request_head_legacy *)p;
2985 head = (struct ceph_mds_request_head *)p;
2986 return (struct ceph_mds_request_head_legacy *)&head->oldest_client_tid;
2987}
2988
2989/*
2990 * called under mdsc->mutex
2991 */
2992static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
2993 struct ceph_mds_request *req,
2994 bool drop_cap_releases)
2995{
2996 int mds = session->s_mds;
2997 struct ceph_mds_client *mdsc = session->s_mdsc;
2998 struct ceph_client *cl = mdsc->fsc->client;
2999 struct ceph_msg *msg;
3000 struct ceph_mds_request_head_legacy *lhead;
3001 struct ceph_path_info path_info1 = {0};
3002 struct ceph_path_info path_info2 = {0};
3003 struct dentry *old_dentry = NULL;
3004 int len;
3005 u16 releases;
3006 void *p, *end;
3007 int ret;
3008 bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME);
3009 u16 request_head_version = mds_supported_head_version(session);
3010 kuid_t caller_fsuid = req->r_cred->fsuid;
3011 kgid_t caller_fsgid = req->r_cred->fsgid;
3012 bool parent_locked = test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
3013
3014 ret = set_request_path_attr(mdsc, req->r_inode, req->r_dentry,
3015 req->r_parent, req->r_path1, req->r_ino1.ino,
3016 &path_info1, parent_locked);
3017 if (ret < 0) {
3018 msg = ERR_PTR(ret);
3019 goto out;
3020 }
3021
3022 /*
3023 * When the parent directory's i_rwsem is *not* locked, req->r_parent may
3024 * have become stale (e.g. after a concurrent rename) between the time the
3025 * dentry was looked up and now. If we detect that the stored r_parent
3026 * does not match the inode number we just encoded for the request, switch
3027 * to the correct inode so that the MDS receives a valid parent reference.
3028 */
3029 if (!parent_locked && req->r_parent && path_info1.vino.ino &&
3030 ceph_ino(req->r_parent) != path_info1.vino.ino) {
3031 struct inode *old_parent = req->r_parent;
3032 struct inode *correct_dir = ceph_get_inode(mdsc->fsc->sb, path_info1.vino, NULL);
3033 if (!IS_ERR(correct_dir)) {
3034 WARN_ONCE(1, "ceph: r_parent mismatch (had %llx wanted %llx) - updating\n",
3035 ceph_ino(old_parent), path_info1.vino.ino);
3036 /*
3037 * Transfer CEPH_CAP_PIN from the old parent to the new one.
3038 * The pin was taken earlier in ceph_mdsc_submit_request().
3039 */
3040 ceph_put_cap_refs(ceph_inode(old_parent), CEPH_CAP_PIN);
3041 iput(old_parent);
3042 req->r_parent = correct_dir;
3043 ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
3044 }
3045 }
3046
3047 /* If r_old_dentry is set, then assume that its parent is locked */
3048 if (req->r_old_dentry &&
3049 !(req->r_old_dentry->d_flags & DCACHE_DISCONNECTED))
3050 old_dentry = req->r_old_dentry;
3051 ret = set_request_path_attr(mdsc, NULL, old_dentry,
3052 req->r_old_dentry_dir,
3053 req->r_path2, req->r_ino2.ino,
3054 &path_info2, true);
3055 if (ret < 0) {
3056 msg = ERR_PTR(ret);
3057 goto out_free1;
3058 }
3059
3060 req->r_altname = get_fscrypt_altname(req, &req->r_altname_len);
3061 if (IS_ERR(req->r_altname)) {
3062 msg = ERR_CAST(req->r_altname);
3063 req->r_altname = NULL;
3064 goto out_free2;
3065 }
3066
3067 /*
3068 * For old cephs without supporting the 32bit retry/fwd feature
3069 * it will copy the raw memories directly when decoding the
3070 * requests. While new cephs will decode the head depending the
3071 * version member, so we need to make sure it will be compatible
3072 * with them both.
3073 */
3074 if (legacy)
3075 len = sizeof(struct ceph_mds_request_head_legacy);
3076 else if (request_head_version == 1)
3077 len = offsetofend(struct ceph_mds_request_head, args);
3078 else if (request_head_version == 2)
3079 len = offsetofend(struct ceph_mds_request_head, ext_num_fwd);
3080 else
3081 len = sizeof(struct ceph_mds_request_head);
3082
3083 /* filepaths */
3084 len += 2 * (1 + sizeof(u32) + sizeof(u64));
3085 len += path_info1.pathlen + path_info2.pathlen;
3086
3087 /* cap releases */
3088 len += sizeof(struct ceph_mds_request_release) *
3089 (!!req->r_inode_drop + !!req->r_dentry_drop +
3090 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
3091
3092 if (req->r_dentry_drop)
3093 len += path_info1.pathlen;
3094 if (req->r_old_dentry_drop)
3095 len += path_info2.pathlen;
3096
3097 /* MClientRequest tail */
3098
3099 /* req->r_stamp */
3100 len += sizeof(struct ceph_timespec);
3101
3102 /* gid list */
3103 len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups);
3104
3105 /* alternate name */
3106 len += sizeof(u32) + req->r_altname_len;
3107
3108 /* fscrypt_auth */
3109 len += sizeof(u32); // fscrypt_auth
3110 if (req->r_fscrypt_auth)
3111 len += ceph_fscrypt_auth_len(req->r_fscrypt_auth);
3112
3113 /* fscrypt_file */
3114 len += sizeof(u32);
3115 if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags))
3116 len += sizeof(__le64);
3117
3118 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
3119 if (!msg) {
3120 msg = ERR_PTR(-ENOMEM);
3121 goto out_free2;
3122 }
3123
3124 msg->hdr.tid = cpu_to_le64(req->r_tid);
3125
3126 lhead = find_legacy_request_head(msg->front.iov_base,
3127 session->s_con.peer_features);
3128
3129 if ((req->r_mnt_idmap != &nop_mnt_idmap) &&
3130 !test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features)) {
3131 WARN_ON_ONCE(!IS_CEPH_MDS_OP_NEWINODE(req->r_op));
3132
3133 if (enable_unsafe_idmap) {
3134 pr_warn_once_client(cl,
3135 "idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID"
3136 " is not supported by MDS. UID/GID-based restrictions may"
3137 " not work properly.\n");
3138
3139 caller_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns,
3140 VFSUIDT_INIT(req->r_cred->fsuid));
3141 caller_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns,
3142 VFSGIDT_INIT(req->r_cred->fsgid));
3143 } else {
3144 pr_err_ratelimited_client(cl,
3145 "idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID"
3146 " is not supported by MDS. Fail request with -EIO.\n");
3147
3148 ret = -EIO;
3149 goto out_err;
3150 }
3151 }
3152
3153 /*
3154 * The ceph_mds_request_head_legacy didn't contain a version field, and
3155 * one was added when we moved the message version from 3->4.
3156 */
3157 if (legacy) {
3158 msg->hdr.version = cpu_to_le16(3);
3159 p = msg->front.iov_base + sizeof(*lhead);
3160 } else if (request_head_version == 1) {
3161 struct ceph_mds_request_head *nhead = msg->front.iov_base;
3162
3163 msg->hdr.version = cpu_to_le16(4);
3164 nhead->version = cpu_to_le16(1);
3165 p = msg->front.iov_base + offsetofend(struct ceph_mds_request_head, args);
3166 } else if (request_head_version == 2) {
3167 struct ceph_mds_request_head *nhead = msg->front.iov_base;
3168
3169 msg->hdr.version = cpu_to_le16(6);
3170 nhead->version = cpu_to_le16(2);
3171
3172 p = msg->front.iov_base + offsetofend(struct ceph_mds_request_head, ext_num_fwd);
3173 } else {
3174 struct ceph_mds_request_head *nhead = msg->front.iov_base;
3175 kuid_t owner_fsuid;
3176 kgid_t owner_fsgid;
3177
3178 msg->hdr.version = cpu_to_le16(6);
3179 nhead->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION);
3180 nhead->struct_len = cpu_to_le32(sizeof(struct ceph_mds_request_head));
3181
3182 if (IS_CEPH_MDS_OP_NEWINODE(req->r_op)) {
3183 owner_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns,
3184 VFSUIDT_INIT(req->r_cred->fsuid));
3185 owner_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns,
3186 VFSGIDT_INIT(req->r_cred->fsgid));
3187 nhead->owner_uid = cpu_to_le32(from_kuid(&init_user_ns, owner_fsuid));
3188 nhead->owner_gid = cpu_to_le32(from_kgid(&init_user_ns, owner_fsgid));
3189 } else {
3190 nhead->owner_uid = cpu_to_le32(-1);
3191 nhead->owner_gid = cpu_to_le32(-1);
3192 }
3193
3194 p = msg->front.iov_base + sizeof(*nhead);
3195 }
3196
3197 end = msg->front.iov_base + msg->front.iov_len;
3198
3199 lhead->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
3200 lhead->op = cpu_to_le32(req->r_op);
3201 lhead->caller_uid = cpu_to_le32(from_kuid(&init_user_ns,
3202 caller_fsuid));
3203 lhead->caller_gid = cpu_to_le32(from_kgid(&init_user_ns,
3204 caller_fsgid));
3205 lhead->ino = cpu_to_le64(req->r_deleg_ino);
3206 lhead->args = req->r_args;
3207
3208 ceph_encode_filepath(&p, end, path_info1.vino.ino, path_info1.path);
3209 ceph_encode_filepath(&p, end, path_info2.vino.ino, path_info2.path);
3210
3211 /* make note of release offset, in case we need to replay */
3212 req->r_request_release_offset = p - msg->front.iov_base;
3213
3214 /* cap releases */
3215 releases = 0;
3216 if (req->r_inode_drop)
3217 releases += ceph_encode_inode_release(&p,
3218 req->r_inode ? req->r_inode : d_inode(req->r_dentry),
3219 mds, req->r_inode_drop, req->r_inode_unless,
3220 req->r_op == CEPH_MDS_OP_READDIR);
3221 if (req->r_dentry_drop) {
3222 ret = ceph_encode_dentry_release(&p, req->r_dentry,
3223 req->r_parent, mds, req->r_dentry_drop,
3224 req->r_dentry_unless);
3225 if (ret < 0)
3226 goto out_err;
3227 releases += ret;
3228 }
3229 if (req->r_old_dentry_drop) {
3230 ret = ceph_encode_dentry_release(&p, req->r_old_dentry,
3231 req->r_old_dentry_dir, mds,
3232 req->r_old_dentry_drop,
3233 req->r_old_dentry_unless);
3234 if (ret < 0)
3235 goto out_err;
3236 releases += ret;
3237 }
3238 if (req->r_old_inode_drop)
3239 releases += ceph_encode_inode_release(&p,
3240 d_inode(req->r_old_dentry),
3241 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
3242
3243 if (drop_cap_releases) {
3244 releases = 0;
3245 p = msg->front.iov_base + req->r_request_release_offset;
3246 }
3247
3248 lhead->num_releases = cpu_to_le16(releases);
3249
3250 encode_mclientrequest_tail(&p, req);
3251
3252 if (WARN_ON_ONCE(p > end)) {
3253 ceph_msg_put(msg);
3254 msg = ERR_PTR(-ERANGE);
3255 goto out_free2;
3256 }
3257
3258 msg->front.iov_len = p - msg->front.iov_base;
3259 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
3260
3261 if (req->r_pagelist) {
3262 struct ceph_pagelist *pagelist = req->r_pagelist;
3263 ceph_msg_data_add_pagelist(msg, pagelist);
3264 msg->hdr.data_len = cpu_to_le32(pagelist->length);
3265 } else {
3266 msg->hdr.data_len = 0;
3267 }
3268
3269 msg->hdr.data_off = cpu_to_le16(0);
3270
3271out_free2:
3272 ceph_mdsc_free_path_info(&path_info2);
3273out_free1:
3274 ceph_mdsc_free_path_info(&path_info1);
3275out:
3276 return msg;
3277out_err:
3278 ceph_msg_put(msg);
3279 msg = ERR_PTR(ret);
3280 goto out_free2;
3281}
3282
3283/*
3284 * called under mdsc->mutex if error, under no mutex if
3285 * success.
3286 */
3287static void complete_request(struct ceph_mds_client *mdsc,
3288 struct ceph_mds_request *req)
3289{
3290 req->r_end_latency = ktime_get();
3291
3292 trace_ceph_mdsc_complete_request(mdsc, req);
3293
3294 if (req->r_callback)
3295 req->r_callback(mdsc, req);
3296 complete_all(&req->r_completion);
3297}
3298
3299/*
3300 * called under mdsc->mutex
3301 */
3302static int __prepare_send_request(struct ceph_mds_session *session,
3303 struct ceph_mds_request *req,
3304 bool drop_cap_releases)
3305{
3306 int mds = session->s_mds;
3307 struct ceph_mds_client *mdsc = session->s_mdsc;
3308 struct ceph_client *cl = mdsc->fsc->client;
3309 struct ceph_mds_request_head_legacy *lhead;
3310 struct ceph_mds_request_head *nhead;
3311 struct ceph_msg *msg;
3312 int flags = 0, old_max_retry;
3313 bool old_version = !test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD,
3314 &session->s_features);
3315
3316 /*
3317 * Avoid infinite retrying after overflow. The client will
3318 * increase the retry count and if the MDS is old version,
3319 * so we limit to retry at most 256 times.
3320 */
3321 if (req->r_attempts) {
3322 old_max_retry = sizeof_field(struct ceph_mds_request_head,
3323 num_retry);
3324 old_max_retry = 1 << (old_max_retry * BITS_PER_BYTE);
3325 if ((old_version && req->r_attempts >= old_max_retry) ||
3326 ((uint32_t)req->r_attempts >= U32_MAX)) {
3327 pr_warn_ratelimited_client(cl, "request tid %llu seq overflow\n",
3328 req->r_tid);
3329 return -EMULTIHOP;
3330 }
3331 }
3332
3333 req->r_attempts++;
3334 if (req->r_inode) {
3335 struct ceph_cap *cap =
3336 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
3337
3338 if (cap)
3339 req->r_sent_on_mseq = cap->mseq;
3340 else
3341 req->r_sent_on_mseq = -1;
3342 }
3343 doutc(cl, "%p tid %lld %s (attempt %d)\n", req, req->r_tid,
3344 ceph_mds_op_name(req->r_op), req->r_attempts);
3345
3346 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3347 void *p;
3348
3349 /*
3350 * Replay. Do not regenerate message (and rebuild
3351 * paths, etc.); just use the original message.
3352 * Rebuilding paths will break for renames because
3353 * d_move mangles the src name.
3354 */
3355 msg = req->r_request;
3356 lhead = find_legacy_request_head(msg->front.iov_base,
3357 session->s_con.peer_features);
3358
3359 flags = le32_to_cpu(lhead->flags);
3360 flags |= CEPH_MDS_FLAG_REPLAY;
3361 lhead->flags = cpu_to_le32(flags);
3362
3363 if (req->r_target_inode)
3364 lhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
3365
3366 lhead->num_retry = req->r_attempts - 1;
3367 if (!old_version) {
3368 nhead = (struct ceph_mds_request_head*)msg->front.iov_base;
3369 nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1);
3370 }
3371
3372 /* remove cap/dentry releases from message */
3373 lhead->num_releases = 0;
3374
3375 p = msg->front.iov_base + req->r_request_release_offset;
3376 encode_mclientrequest_tail(&p, req);
3377
3378 msg->front.iov_len = p - msg->front.iov_base;
3379 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
3380 return 0;
3381 }
3382
3383 if (req->r_request) {
3384 ceph_msg_put(req->r_request);
3385 req->r_request = NULL;
3386 }
3387 msg = create_request_message(session, req, drop_cap_releases);
3388 if (IS_ERR(msg)) {
3389 req->r_err = PTR_ERR(msg);
3390 return PTR_ERR(msg);
3391 }
3392 req->r_request = msg;
3393
3394 lhead = find_legacy_request_head(msg->front.iov_base,
3395 session->s_con.peer_features);
3396 lhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
3397 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3398 flags |= CEPH_MDS_FLAG_REPLAY;
3399 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
3400 flags |= CEPH_MDS_FLAG_ASYNC;
3401 if (req->r_parent)
3402 flags |= CEPH_MDS_FLAG_WANT_DENTRY;
3403 lhead->flags = cpu_to_le32(flags);
3404 lhead->num_fwd = req->r_num_fwd;
3405 lhead->num_retry = req->r_attempts - 1;
3406 if (!old_version) {
3407 nhead = (struct ceph_mds_request_head*)msg->front.iov_base;
3408 nhead->ext_num_fwd = cpu_to_le32(req->r_num_fwd);
3409 nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1);
3410 }
3411
3412 doutc(cl, " r_parent = %p\n", req->r_parent);
3413 return 0;
3414}
3415
3416/*
3417 * called under mdsc->mutex
3418 */
3419static int __send_request(struct ceph_mds_session *session,
3420 struct ceph_mds_request *req,
3421 bool drop_cap_releases)
3422{
3423 int err;
3424
3425 trace_ceph_mdsc_send_request(session, req);
3426
3427 err = __prepare_send_request(session, req, drop_cap_releases);
3428 if (!err) {
3429 ceph_msg_get(req->r_request);
3430 ceph_con_send(&session->s_con, req->r_request);
3431 }
3432
3433 return err;
3434}
3435
3436/*
3437 * send request, or put it on the appropriate wait list.
3438 */
3439static void __do_request(struct ceph_mds_client *mdsc,
3440 struct ceph_mds_request *req)
3441{
3442 struct ceph_client *cl = mdsc->fsc->client;
3443 struct ceph_mds_session *session = NULL;
3444 int mds = -1;
3445 int err = 0;
3446 bool random;
3447
3448 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
3449 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
3450 __unregister_request(mdsc, req);
3451 return;
3452 }
3453
3454 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) {
3455 doutc(cl, "metadata corrupted\n");
3456 err = -EIO;
3457 goto finish;
3458 }
3459 if (req->r_timeout &&
3460 time_after_eq(jiffies, req->r_started + req->r_timeout)) {
3461 doutc(cl, "timed out\n");
3462 err = -ETIMEDOUT;
3463 goto finish;
3464 }
3465 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
3466 doutc(cl, "forced umount\n");
3467 err = -EIO;
3468 goto finish;
3469 }
3470 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
3471 if (mdsc->mdsmap_err) {
3472 err = mdsc->mdsmap_err;
3473 doutc(cl, "mdsmap err %d\n", err);
3474 goto finish;
3475 }
3476 if (mdsc->mdsmap->m_epoch == 0) {
3477 doutc(cl, "no mdsmap, waiting for map\n");
3478 trace_ceph_mdsc_suspend_request(mdsc, session, req,
3479 ceph_mdsc_suspend_reason_no_mdsmap);
3480 list_add(&req->r_wait, &mdsc->waiting_for_map);
3481 return;
3482 }
3483 if (!(mdsc->fsc->mount_options->flags &
3484 CEPH_MOUNT_OPT_MOUNTWAIT) &&
3485 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
3486 err = -EHOSTUNREACH;
3487 goto finish;
3488 }
3489 }
3490
3491 put_request_session(req);
3492
3493 mds = __choose_mds(mdsc, req, &random);
3494 if (mds < 0 ||
3495 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
3496 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
3497 err = -EJUKEBOX;
3498 goto finish;
3499 }
3500 doutc(cl, "no mds or not active, waiting for map\n");
3501 trace_ceph_mdsc_suspend_request(mdsc, session, req,
3502 ceph_mdsc_suspend_reason_no_active_mds);
3503 list_add(&req->r_wait, &mdsc->waiting_for_map);
3504 return;
3505 }
3506
3507 /* get, open session */
3508 session = __ceph_lookup_mds_session(mdsc, mds);
3509 if (!session) {
3510 session = register_session(mdsc, mds);
3511 if (IS_ERR(session)) {
3512 err = PTR_ERR(session);
3513 goto finish;
3514 }
3515 }
3516 req->r_session = ceph_get_mds_session(session);
3517
3518 doutc(cl, "mds%d session %p state %s\n", mds, session,
3519 ceph_session_state_name(session->s_state));
3520
3521 /*
3522 * The old ceph will crash the MDSs when see unknown OPs
3523 */
3524 if (req->r_feature_needed > 0 &&
3525 !test_bit(req->r_feature_needed, &session->s_features)) {
3526 err = -EOPNOTSUPP;
3527 goto out_session;
3528 }
3529
3530 if (session->s_state != CEPH_MDS_SESSION_OPEN &&
3531 session->s_state != CEPH_MDS_SESSION_HUNG) {
3532 /*
3533 * We cannot queue async requests since the caps and delegated
3534 * inodes are bound to the session. Just return -EJUKEBOX and
3535 * let the caller retry a sync request in that case.
3536 */
3537 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
3538 err = -EJUKEBOX;
3539 goto out_session;
3540 }
3541
3542 /*
3543 * If the session has been REJECTED, then return a hard error,
3544 * unless it's a CLEANRECOVER mount, in which case we'll queue
3545 * it to the mdsc queue.
3546 */
3547 if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
3548 if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER)) {
3549 trace_ceph_mdsc_suspend_request(mdsc, session, req,
3550 ceph_mdsc_suspend_reason_rejected);
3551 list_add(&req->r_wait, &mdsc->waiting_for_map);
3552 } else
3553 err = -EACCES;
3554 goto out_session;
3555 }
3556
3557 if (session->s_state == CEPH_MDS_SESSION_NEW ||
3558 session->s_state == CEPH_MDS_SESSION_CLOSING) {
3559 err = __open_session(mdsc, session);
3560 if (err)
3561 goto out_session;
3562 /* retry the same mds later */
3563 if (random)
3564 req->r_resend_mds = mds;
3565 }
3566 trace_ceph_mdsc_suspend_request(mdsc, session, req,
3567 ceph_mdsc_suspend_reason_session);
3568 list_add(&req->r_wait, &session->s_waiting);
3569 goto out_session;
3570 }
3571
3572 /* send request */
3573 req->r_resend_mds = -1; /* forget any previous mds hint */
3574
3575 if (req->r_request_started == 0) /* note request start time */
3576 req->r_request_started = jiffies;
3577
3578 /*
3579 * For async create we will choose the auth MDS of frag in parent
3580 * directory to send the request and usually this works fine, but
3581 * if the migrated the dirtory to another MDS before it could handle
3582 * it the request will be forwarded.
3583 *
3584 * And then the auth cap will be changed.
3585 */
3586 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags) && req->r_num_fwd) {
3587 struct ceph_dentry_info *di = ceph_dentry(req->r_dentry);
3588 struct ceph_inode_info *ci;
3589 struct ceph_cap *cap;
3590
3591 /*
3592 * The request maybe handled very fast and the new inode
3593 * hasn't been linked to the dentry yet. We need to wait
3594 * for the ceph_finish_async_create(), which shouldn't be
3595 * stuck too long or fail in thoery, to finish when forwarding
3596 * the request.
3597 */
3598 if (!d_inode(req->r_dentry)) {
3599 err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_CREATE_BIT,
3600 TASK_KILLABLE);
3601 if (err) {
3602 mutex_lock(&req->r_fill_mutex);
3603 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3604 mutex_unlock(&req->r_fill_mutex);
3605 goto out_session;
3606 }
3607 }
3608
3609 ci = ceph_inode(d_inode(req->r_dentry));
3610
3611 spin_lock(&ci->i_ceph_lock);
3612 cap = ci->i_auth_cap;
3613 if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE && mds != cap->mds) {
3614 doutc(cl, "session changed for auth cap %d -> %d\n",
3615 cap->session->s_mds, session->s_mds);
3616
3617 /* Remove the auth cap from old session */
3618 spin_lock(&cap->session->s_cap_lock);
3619 cap->session->s_nr_caps--;
3620 list_del_init(&cap->session_caps);
3621 spin_unlock(&cap->session->s_cap_lock);
3622
3623 /* Add the auth cap to the new session */
3624 cap->mds = mds;
3625 cap->session = session;
3626 spin_lock(&session->s_cap_lock);
3627 session->s_nr_caps++;
3628 list_add_tail(&cap->session_caps, &session->s_caps);
3629 spin_unlock(&session->s_cap_lock);
3630
3631 change_auth_cap_ses(ci, session);
3632 }
3633 spin_unlock(&ci->i_ceph_lock);
3634 }
3635
3636 err = __send_request(session, req, false);
3637
3638out_session:
3639 ceph_put_mds_session(session);
3640finish:
3641 if (err) {
3642 doutc(cl, "early error %d\n", err);
3643 req->r_err = err;
3644 complete_request(mdsc, req);
3645 __unregister_request(mdsc, req);
3646 }
3647 return;
3648}
3649
3650/*
3651 * called under mdsc->mutex
3652 */
3653static void __wake_requests(struct ceph_mds_client *mdsc,
3654 struct list_head *head)
3655{
3656 struct ceph_client *cl = mdsc->fsc->client;
3657 struct ceph_mds_request *req;
3658 LIST_HEAD(tmp_list);
3659
3660 list_splice_init(head, &tmp_list);
3661
3662 while (!list_empty(&tmp_list)) {
3663 req = list_entry(tmp_list.next,
3664 struct ceph_mds_request, r_wait);
3665 list_del_init(&req->r_wait);
3666 doutc(cl, " wake request %p tid %llu\n", req,
3667 req->r_tid);
3668 trace_ceph_mdsc_resume_request(mdsc, req);
3669 __do_request(mdsc, req);
3670 }
3671}
3672
3673/*
3674 * Wake up threads with requests pending for @mds, so that they can
3675 * resubmit their requests to a possibly different mds.
3676 */
3677static void kick_requests(struct ceph_mds_client *mdsc, int mds)
3678{
3679 struct ceph_client *cl = mdsc->fsc->client;
3680 struct ceph_mds_request *req;
3681 struct rb_node *p = rb_first(&mdsc->request_tree);
3682
3683 doutc(cl, "kick_requests mds%d\n", mds);
3684 while (p) {
3685 req = rb_entry(p, struct ceph_mds_request, r_node);
3686 p = rb_next(p);
3687 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3688 continue;
3689 if (req->r_attempts > 0)
3690 continue; /* only new requests */
3691 if (req->r_session &&
3692 req->r_session->s_mds == mds) {
3693 doutc(cl, " kicking tid %llu\n", req->r_tid);
3694 list_del_init(&req->r_wait);
3695 trace_ceph_mdsc_resume_request(mdsc, req);
3696 __do_request(mdsc, req);
3697 }
3698 }
3699}
3700
3701int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
3702 struct ceph_mds_request *req)
3703{
3704 struct ceph_client *cl = mdsc->fsc->client;
3705 int err = 0;
3706
3707 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
3708 if (req->r_inode)
3709 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
3710 if (req->r_parent) {
3711 struct ceph_inode_info *ci = ceph_inode(req->r_parent);
3712 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
3713 CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
3714 spin_lock(&ci->i_ceph_lock);
3715 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
3716 __ceph_touch_fmode(ci, mdsc, fmode);
3717 spin_unlock(&ci->i_ceph_lock);
3718 }
3719 if (req->r_old_dentry_dir)
3720 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
3721 CEPH_CAP_PIN);
3722
3723 if (req->r_inode) {
3724 err = ceph_wait_on_async_create(req->r_inode);
3725 if (err) {
3726 doutc(cl, "wait for async create returned: %d\n", err);
3727 return err;
3728 }
3729 }
3730
3731 if (!err && req->r_old_inode) {
3732 err = ceph_wait_on_async_create(req->r_old_inode);
3733 if (err) {
3734 doutc(cl, "wait for async create returned: %d\n", err);
3735 return err;
3736 }
3737 }
3738
3739 doutc(cl, "submit_request on %p for inode %p\n", req, dir);
3740 mutex_lock(&mdsc->mutex);
3741 __register_request(mdsc, req, dir);
3742 trace_ceph_mdsc_submit_request(mdsc, req);
3743 __do_request(mdsc, req);
3744 err = req->r_err;
3745 mutex_unlock(&mdsc->mutex);
3746 return err;
3747}
3748
3749int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
3750 struct ceph_mds_request *req,
3751 ceph_mds_request_wait_callback_t wait_func)
3752{
3753 struct ceph_client *cl = mdsc->fsc->client;
3754 int err;
3755
3756 /* wait */
3757 doutc(cl, "do_request waiting\n");
3758 if (wait_func) {
3759 err = wait_func(mdsc, req);
3760 } else {
3761 long timeleft = wait_for_completion_killable_timeout(
3762 &req->r_completion,
3763 ceph_timeout_jiffies(req->r_timeout));
3764 if (timeleft > 0)
3765 err = 0;
3766 else if (!timeleft)
3767 err = -ETIMEDOUT; /* timed out */
3768 else
3769 err = timeleft; /* killed */
3770 }
3771 doutc(cl, "do_request waited, got %d\n", err);
3772 mutex_lock(&mdsc->mutex);
3773
3774 /* only abort if we didn't race with a real reply */
3775 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
3776 err = le32_to_cpu(req->r_reply_info.head->result);
3777 } else if (err < 0) {
3778 doutc(cl, "aborted request %lld with %d\n", req->r_tid, err);
3779
3780 /*
3781 * ensure we aren't running concurrently with
3782 * ceph_fill_trace or ceph_readdir_prepopulate, which
3783 * rely on locks (dir mutex) held by our caller.
3784 */
3785 mutex_lock(&req->r_fill_mutex);
3786 req->r_err = err;
3787 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3788 mutex_unlock(&req->r_fill_mutex);
3789
3790 if (req->r_parent &&
3791 (req->r_op & CEPH_MDS_OP_WRITE))
3792 ceph_invalidate_dir_request(req);
3793 } else {
3794 err = req->r_err;
3795 }
3796
3797 mutex_unlock(&mdsc->mutex);
3798 return err;
3799}
3800
3801/*
3802 * Synchrously perform an mds request. Take care of all of the
3803 * session setup, forwarding, retry details.
3804 */
3805int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
3806 struct inode *dir,
3807 struct ceph_mds_request *req)
3808{
3809 struct ceph_client *cl = mdsc->fsc->client;
3810 int err;
3811
3812 doutc(cl, "do_request on %p\n", req);
3813
3814 /* issue */
3815 err = ceph_mdsc_submit_request(mdsc, dir, req);
3816 if (!err)
3817 err = ceph_mdsc_wait_request(mdsc, req, NULL);
3818 doutc(cl, "do_request %p done, result %d\n", req, err);
3819 return err;
3820}
3821
3822/*
3823 * Invalidate dir's completeness, dentry lease state on an aborted MDS
3824 * namespace request.
3825 */
3826void ceph_invalidate_dir_request(struct ceph_mds_request *req)
3827{
3828 struct inode *dir = req->r_parent;
3829 struct inode *old_dir = req->r_old_dentry_dir;
3830 struct ceph_client *cl = req->r_mdsc->fsc->client;
3831
3832 doutc(cl, "invalidate_dir_request %p %p (complete, lease(s))\n",
3833 dir, old_dir);
3834
3835 ceph_dir_clear_complete(dir);
3836 if (old_dir)
3837 ceph_dir_clear_complete(old_dir);
3838 if (req->r_dentry)
3839 ceph_invalidate_dentry_lease(req->r_dentry);
3840 if (req->r_old_dentry)
3841 ceph_invalidate_dentry_lease(req->r_old_dentry);
3842}
3843
3844/*
3845 * Handle mds reply.
3846 *
3847 * We take the session mutex and parse and process the reply immediately.
3848 * This preserves the logical ordering of replies, capabilities, etc., sent
3849 * by the MDS as they are applied to our local cache.
3850 */
3851static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
3852{
3853 struct ceph_mds_client *mdsc = session->s_mdsc;
3854 struct ceph_client *cl = mdsc->fsc->client;
3855 struct ceph_mds_request *req;
3856 struct ceph_mds_reply_head *head = msg->front.iov_base;
3857 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */
3858 struct ceph_snap_realm *realm;
3859 u64 tid;
3860 int err, result;
3861 int mds = session->s_mds;
3862 bool close_sessions = false;
3863
3864 if (msg->front.iov_len < sizeof(*head)) {
3865 pr_err_client(cl, "got corrupt (short) reply\n");
3866 ceph_msg_dump(msg);
3867 return;
3868 }
3869
3870 /* get request, session */
3871 tid = le64_to_cpu(msg->hdr.tid);
3872 mutex_lock(&mdsc->mutex);
3873 req = lookup_get_request(mdsc, tid);
3874 if (!req) {
3875 doutc(cl, "on unknown tid %llu\n", tid);
3876 mutex_unlock(&mdsc->mutex);
3877 return;
3878 }
3879 doutc(cl, "handle_reply %p\n", req);
3880
3881 /* correct session? */
3882 if (req->r_session != session) {
3883 pr_err_client(cl, "got %llu on session mds%d not mds%d\n",
3884 tid, session->s_mds,
3885 req->r_session ? req->r_session->s_mds : -1);
3886 mutex_unlock(&mdsc->mutex);
3887 goto out;
3888 }
3889
3890 /* dup? */
3891 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
3892 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
3893 pr_warn_client(cl, "got a dup %s reply on %llu from mds%d\n",
3894 head->safe ? "safe" : "unsafe", tid, mds);
3895 mutex_unlock(&mdsc->mutex);
3896 goto out;
3897 }
3898 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
3899 pr_warn_client(cl, "got unsafe after safe on %llu from mds%d\n",
3900 tid, mds);
3901 mutex_unlock(&mdsc->mutex);
3902 goto out;
3903 }
3904
3905 result = le32_to_cpu(head->result);
3906
3907 if (head->safe) {
3908 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
3909 __unregister_request(mdsc, req);
3910
3911 /* last request during umount? */
3912 if (mdsc->stopping && !__get_oldest_req(mdsc))
3913 complete_all(&mdsc->safe_umount_waiters);
3914
3915 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3916 /*
3917 * We already handled the unsafe response, now do the
3918 * cleanup. No need to examine the response; the MDS
3919 * doesn't include any result info in the safe
3920 * response. And even if it did, there is nothing
3921 * useful we could do with a revised return value.
3922 */
3923 doutc(cl, "got safe reply %llu, mds%d\n", tid, mds);
3924
3925 mutex_unlock(&mdsc->mutex);
3926 goto out;
3927 }
3928 } else {
3929 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
3930 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
3931 }
3932
3933 doutc(cl, "tid %lld result %d\n", tid, result);
3934 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
3935 err = parse_reply_info(session, msg, req, (u64)-1);
3936 else
3937 err = parse_reply_info(session, msg, req,
3938 session->s_con.peer_features);
3939 mutex_unlock(&mdsc->mutex);
3940
3941 /* Must find target inode outside of mutexes to avoid deadlocks */
3942 rinfo = &req->r_reply_info;
3943 if ((err >= 0) && rinfo->head->is_target) {
3944 struct inode *in = xchg(&req->r_new_inode, NULL);
3945 struct ceph_vino tvino = {
3946 .ino = le64_to_cpu(rinfo->targeti.in->ino),
3947 .snap = le64_to_cpu(rinfo->targeti.in->snapid)
3948 };
3949
3950 /*
3951 * If we ended up opening an existing inode, discard
3952 * r_new_inode
3953 */
3954 if (req->r_op == CEPH_MDS_OP_CREATE &&
3955 !req->r_reply_info.has_create_ino) {
3956 /* This should never happen on an async create */
3957 WARN_ON_ONCE(req->r_deleg_ino);
3958 iput(in);
3959 in = NULL;
3960 }
3961
3962 in = ceph_get_inode(mdsc->fsc->sb, tvino, in);
3963 if (IS_ERR(in)) {
3964 err = PTR_ERR(in);
3965 mutex_lock(&session->s_mutex);
3966 goto out_err;
3967 }
3968 req->r_target_inode = in;
3969 }
3970
3971 mutex_lock(&session->s_mutex);
3972 if (err < 0) {
3973 pr_err_client(cl, "got corrupt reply mds%d(tid:%lld)\n",
3974 mds, tid);
3975 ceph_msg_dump(msg);
3976 goto out_err;
3977 }
3978
3979 /* snap trace */
3980 realm = NULL;
3981 if (rinfo->snapblob_len) {
3982 down_write(&mdsc->snap_rwsem);
3983 err = ceph_update_snap_trace(mdsc, rinfo->snapblob,
3984 rinfo->snapblob + rinfo->snapblob_len,
3985 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
3986 &realm);
3987 if (err) {
3988 up_write(&mdsc->snap_rwsem);
3989 close_sessions = true;
3990 if (err == -EIO)
3991 ceph_msg_dump(msg);
3992 goto out_err;
3993 }
3994 downgrade_write(&mdsc->snap_rwsem);
3995 } else {
3996 down_read(&mdsc->snap_rwsem);
3997 }
3998
3999 /* insert trace into our cache */
4000 mutex_lock(&req->r_fill_mutex);
4001 current->journal_info = req;
4002 err = ceph_fill_trace(mdsc->fsc->sb, req);
4003 if (err == 0) {
4004 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
4005 req->r_op == CEPH_MDS_OP_LSSNAP))
4006 err = ceph_readdir_prepopulate(req, req->r_session);
4007 }
4008 current->journal_info = NULL;
4009 mutex_unlock(&req->r_fill_mutex);
4010
4011 up_read(&mdsc->snap_rwsem);
4012 if (realm)
4013 ceph_put_snap_realm(mdsc, realm);
4014
4015 if (err == 0) {
4016 if (req->r_target_inode &&
4017 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
4018 struct ceph_inode_info *ci =
4019 ceph_inode(req->r_target_inode);
4020 spin_lock(&ci->i_unsafe_lock);
4021 list_add_tail(&req->r_unsafe_target_item,
4022 &ci->i_unsafe_iops);
4023 spin_unlock(&ci->i_unsafe_lock);
4024 }
4025
4026 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
4027 }
4028out_err:
4029 mutex_lock(&mdsc->mutex);
4030 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
4031 if (err) {
4032 req->r_err = err;
4033 } else {
4034 req->r_reply = ceph_msg_get(msg);
4035 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
4036 }
4037 } else {
4038 doutc(cl, "reply arrived after request %lld was aborted\n", tid);
4039 }
4040 mutex_unlock(&mdsc->mutex);
4041
4042 mutex_unlock(&session->s_mutex);
4043
4044 /* kick calling process */
4045 complete_request(mdsc, req);
4046
4047 ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency,
4048 req->r_end_latency, err);
4049out:
4050 ceph_mdsc_put_request(req);
4051
4052 /* Defer closing the sessions after s_mutex lock being released */
4053 if (close_sessions)
4054 ceph_mdsc_close_sessions(mdsc);
4055 return;
4056}
4057
4058
4059
4060/*
4061 * handle mds notification that our request has been forwarded.
4062 */
4063static void handle_forward(struct ceph_mds_client *mdsc,
4064 struct ceph_mds_session *session,
4065 struct ceph_msg *msg)
4066{
4067 struct ceph_client *cl = mdsc->fsc->client;
4068 struct ceph_mds_request *req;
4069 u64 tid = le64_to_cpu(msg->hdr.tid);
4070 u32 next_mds;
4071 u32 fwd_seq;
4072 int err = -EINVAL;
4073 void *p = msg->front.iov_base;
4074 void *end = p + msg->front.iov_len;
4075 bool aborted = false;
4076
4077 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
4078 next_mds = ceph_decode_32(&p);
4079 fwd_seq = ceph_decode_32(&p);
4080
4081 mutex_lock(&mdsc->mutex);
4082 req = lookup_get_request(mdsc, tid);
4083 if (!req) {
4084 mutex_unlock(&mdsc->mutex);
4085 doutc(cl, "forward tid %llu to mds%d - req dne\n", tid, next_mds);
4086 return; /* dup reply? */
4087 }
4088
4089 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
4090 doutc(cl, "forward tid %llu aborted, unregistering\n", tid);
4091 __unregister_request(mdsc, req);
4092 } else if (fwd_seq <= req->r_num_fwd || (uint32_t)fwd_seq >= U32_MAX) {
4093 /*
4094 * Avoid infinite retrying after overflow.
4095 *
4096 * The MDS will increase the fwd count and in client side
4097 * if the num_fwd is less than the one saved in request
4098 * that means the MDS is an old version and overflowed of
4099 * 8 bits.
4100 */
4101 mutex_lock(&req->r_fill_mutex);
4102 req->r_err = -EMULTIHOP;
4103 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
4104 mutex_unlock(&req->r_fill_mutex);
4105 aborted = true;
4106 pr_warn_ratelimited_client(cl, "forward tid %llu seq overflow\n",
4107 tid);
4108 } else {
4109 /* resend. forward race not possible; mds would drop */
4110 doutc(cl, "forward tid %llu to mds%d (we resend)\n", tid, next_mds);
4111 BUG_ON(req->r_err);
4112 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
4113 req->r_attempts = 0;
4114 req->r_num_fwd = fwd_seq;
4115 req->r_resend_mds = next_mds;
4116 put_request_session(req);
4117 __do_request(mdsc, req);
4118 }
4119 mutex_unlock(&mdsc->mutex);
4120
4121 /* kick calling process */
4122 if (aborted)
4123 complete_request(mdsc, req);
4124 ceph_mdsc_put_request(req);
4125 return;
4126
4127bad:
4128 pr_err_client(cl, "decode error err=%d\n", err);
4129 ceph_msg_dump(msg);
4130}
4131
4132static int __decode_session_metadata(void **p, void *end,
4133 bool *blocklisted)
4134{
4135 /* map<string,string> */
4136 u32 n;
4137 bool err_str;
4138 ceph_decode_32_safe(p, end, n, bad);
4139 while (n-- > 0) {
4140 u32 len;
4141 ceph_decode_32_safe(p, end, len, bad);
4142 ceph_decode_need(p, end, len, bad);
4143 err_str = !strncmp(*p, "error_string", len);
4144 *p += len;
4145 ceph_decode_32_safe(p, end, len, bad);
4146 ceph_decode_need(p, end, len, bad);
4147 /*
4148 * Match "blocklisted (blacklisted)" from newer MDSes,
4149 * or "blacklisted" from older MDSes.
4150 */
4151 if (err_str && strnstr(*p, "blacklisted", len))
4152 *blocklisted = true;
4153 *p += len;
4154 }
4155 return 0;
4156bad:
4157 return -1;
4158}
4159
4160/*
4161 * handle a mds session control message
4162 */
4163static void handle_session(struct ceph_mds_session *session,
4164 struct ceph_msg *msg)
4165{
4166 struct ceph_mds_client *mdsc = session->s_mdsc;
4167 struct ceph_client *cl = mdsc->fsc->client;
4168 int mds = session->s_mds;
4169 int msg_version = le16_to_cpu(msg->hdr.version);
4170 void *p = msg->front.iov_base;
4171 void *end = p + msg->front.iov_len;
4172 struct ceph_mds_session_head *h;
4173 struct ceph_mds_cap_auth *cap_auths = NULL;
4174 u32 op, cap_auths_num = 0;
4175 u64 seq, features = 0;
4176 int wake = 0;
4177 bool blocklisted = false;
4178 u32 i;
4179
4180
4181 /* decode */
4182 ceph_decode_need(&p, end, sizeof(*h), bad);
4183 h = p;
4184 p += sizeof(*h);
4185
4186 op = le32_to_cpu(h->op);
4187 seq = le64_to_cpu(h->seq);
4188
4189 if (msg_version >= 3) {
4190 u32 len;
4191 /* version >= 2 and < 5, decode metadata, skip otherwise
4192 * as it's handled via flags.
4193 */
4194 if (msg_version >= 5)
4195 ceph_decode_skip_map(&p, end, string, string, bad);
4196 else if (__decode_session_metadata(&p, end, &blocklisted) < 0)
4197 goto bad;
4198
4199 /* version >= 3, feature bits */
4200 ceph_decode_32_safe(&p, end, len, bad);
4201 if (len) {
4202 ceph_decode_64_safe(&p, end, features, bad);
4203 p += len - sizeof(features);
4204 }
4205 }
4206
4207 if (msg_version >= 5) {
4208 u32 flags, len;
4209
4210 /* version >= 4 */
4211 ceph_decode_skip_16(&p, end, bad); /* struct_v, struct_cv */
4212 ceph_decode_32_safe(&p, end, len, bad); /* len */
4213 ceph_decode_skip_n(&p, end, len, bad); /* metric_spec */
4214
4215 /* version >= 5, flags */
4216 ceph_decode_32_safe(&p, end, flags, bad);
4217 if (flags & CEPH_SESSION_BLOCKLISTED) {
4218 pr_warn_client(cl, "mds%d session blocklisted\n",
4219 session->s_mds);
4220 blocklisted = true;
4221 }
4222 }
4223
4224 if (msg_version >= 6) {
4225 ceph_decode_32_safe(&p, end, cap_auths_num, bad);
4226 doutc(cl, "cap_auths_num %d\n", cap_auths_num);
4227
4228 if (cap_auths_num && op != CEPH_SESSION_OPEN) {
4229 WARN_ON_ONCE(op != CEPH_SESSION_OPEN);
4230 goto skip_cap_auths;
4231 }
4232
4233 cap_auths = kcalloc(cap_auths_num,
4234 sizeof(struct ceph_mds_cap_auth),
4235 GFP_KERNEL);
4236 if (!cap_auths) {
4237 pr_err_client(cl, "No memory for cap_auths\n");
4238 return;
4239 }
4240
4241 for (i = 0; i < cap_auths_num; i++) {
4242 u32 _len, j;
4243
4244 /* struct_v, struct_compat, and struct_len in MDSCapAuth */
4245 ceph_decode_skip_n(&p, end, 2 + sizeof(u32), bad);
4246
4247 /* struct_v, struct_compat, and struct_len in MDSCapMatch */
4248 ceph_decode_skip_n(&p, end, 2 + sizeof(u32), bad);
4249 ceph_decode_64_safe(&p, end, cap_auths[i].match.uid, bad);
4250 ceph_decode_32_safe(&p, end, _len, bad);
4251 if (_len) {
4252 cap_auths[i].match.gids = kcalloc(_len, sizeof(u32),
4253 GFP_KERNEL);
4254 if (!cap_auths[i].match.gids) {
4255 pr_err_client(cl, "No memory for gids\n");
4256 goto fail;
4257 }
4258
4259 cap_auths[i].match.num_gids = _len;
4260 for (j = 0; j < _len; j++)
4261 ceph_decode_32_safe(&p, end,
4262 cap_auths[i].match.gids[j],
4263 bad);
4264 }
4265
4266 ceph_decode_32_safe(&p, end, _len, bad);
4267 if (_len) {
4268 cap_auths[i].match.path = kcalloc(_len + 1, sizeof(char),
4269 GFP_KERNEL);
4270 if (!cap_auths[i].match.path) {
4271 pr_err_client(cl, "No memory for path\n");
4272 goto fail;
4273 }
4274 ceph_decode_copy(&p, cap_auths[i].match.path, _len);
4275
4276 /* Remove the tailing '/' */
4277 while (_len && cap_auths[i].match.path[_len - 1] == '/') {
4278 cap_auths[i].match.path[_len - 1] = '\0';
4279 _len -= 1;
4280 }
4281 }
4282
4283 ceph_decode_32_safe(&p, end, _len, bad);
4284 if (_len) {
4285 cap_auths[i].match.fs_name = kcalloc(_len + 1, sizeof(char),
4286 GFP_KERNEL);
4287 if (!cap_auths[i].match.fs_name) {
4288 pr_err_client(cl, "No memory for fs_name\n");
4289 goto fail;
4290 }
4291 ceph_decode_copy(&p, cap_auths[i].match.fs_name, _len);
4292 }
4293
4294 ceph_decode_8_safe(&p, end, cap_auths[i].match.root_squash, bad);
4295 ceph_decode_8_safe(&p, end, cap_auths[i].readable, bad);
4296 ceph_decode_8_safe(&p, end, cap_auths[i].writeable, bad);
4297 doutc(cl, "uid %lld, num_gids %u, path %s, fs_name %s, root_squash %d, readable %d, writeable %d\n",
4298 cap_auths[i].match.uid, cap_auths[i].match.num_gids,
4299 cap_auths[i].match.path, cap_auths[i].match.fs_name,
4300 cap_auths[i].match.root_squash,
4301 cap_auths[i].readable, cap_auths[i].writeable);
4302 }
4303 }
4304
4305skip_cap_auths:
4306 mutex_lock(&mdsc->mutex);
4307 if (op == CEPH_SESSION_OPEN) {
4308 if (mdsc->s_cap_auths) {
4309 for (i = 0; i < mdsc->s_cap_auths_num; i++) {
4310 kfree(mdsc->s_cap_auths[i].match.gids);
4311 kfree(mdsc->s_cap_auths[i].match.path);
4312 kfree(mdsc->s_cap_auths[i].match.fs_name);
4313 }
4314 kfree(mdsc->s_cap_auths);
4315 }
4316 mdsc->s_cap_auths_num = cap_auths_num;
4317 mdsc->s_cap_auths = cap_auths;
4318 }
4319 if (op == CEPH_SESSION_CLOSE) {
4320 ceph_get_mds_session(session);
4321 __unregister_session(mdsc, session);
4322 }
4323 /* FIXME: this ttl calculation is generous */
4324 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
4325 mutex_unlock(&mdsc->mutex);
4326
4327 mutex_lock(&session->s_mutex);
4328
4329 doutc(cl, "mds%d %s %p state %s seq %llu\n", mds,
4330 ceph_session_op_name(op), session,
4331 ceph_session_state_name(session->s_state), seq);
4332
4333 if (session->s_state == CEPH_MDS_SESSION_HUNG) {
4334 session->s_state = CEPH_MDS_SESSION_OPEN;
4335 pr_info_client(cl, "mds%d came back\n", session->s_mds);
4336 }
4337
4338 switch (op) {
4339 case CEPH_SESSION_OPEN:
4340 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
4341 pr_info_client(cl, "mds%d reconnect success\n",
4342 session->s_mds);
4343
4344 session->s_features = features;
4345 if (session->s_state == CEPH_MDS_SESSION_OPEN) {
4346 pr_notice_client(cl, "mds%d is already opened\n",
4347 session->s_mds);
4348 } else {
4349 session->s_state = CEPH_MDS_SESSION_OPEN;
4350 renewed_caps(mdsc, session, 0);
4351 if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT,
4352 &session->s_features))
4353 metric_schedule_delayed(&mdsc->metric);
4354 }
4355
4356 /*
4357 * The connection maybe broken and the session in client
4358 * side has been reinitialized, need to update the seq
4359 * anyway.
4360 */
4361 if (!session->s_seq && seq)
4362 session->s_seq = seq;
4363
4364 wake = 1;
4365 if (mdsc->stopping)
4366 __close_session(mdsc, session);
4367 break;
4368
4369 case CEPH_SESSION_RENEWCAPS:
4370 if (session->s_renew_seq == seq)
4371 renewed_caps(mdsc, session, 1);
4372 break;
4373
4374 case CEPH_SESSION_CLOSE:
4375 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
4376 pr_info_client(cl, "mds%d reconnect denied\n",
4377 session->s_mds);
4378 session->s_state = CEPH_MDS_SESSION_CLOSED;
4379 cleanup_session_requests(mdsc, session);
4380 remove_session_caps(session);
4381 wake = 2; /* for good measure */
4382 wake_up_all(&mdsc->session_close_wq);
4383 break;
4384
4385 case CEPH_SESSION_STALE:
4386 pr_info_client(cl, "mds%d caps went stale, renewing\n",
4387 session->s_mds);
4388 atomic_inc(&session->s_cap_gen);
4389 session->s_cap_ttl = jiffies - 1;
4390 send_renew_caps(mdsc, session);
4391 break;
4392
4393 case CEPH_SESSION_RECALL_STATE:
4394 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
4395 break;
4396
4397 case CEPH_SESSION_FLUSHMSG:
4398 /* flush cap releases */
4399 spin_lock(&session->s_cap_lock);
4400 if (session->s_num_cap_releases)
4401 ceph_flush_session_cap_releases(mdsc, session);
4402 spin_unlock(&session->s_cap_lock);
4403
4404 send_flushmsg_ack(mdsc, session, seq);
4405 break;
4406
4407 case CEPH_SESSION_FORCE_RO:
4408 doutc(cl, "force_session_readonly %p\n", session);
4409 spin_lock(&session->s_cap_lock);
4410 session->s_readonly = true;
4411 spin_unlock(&session->s_cap_lock);
4412 wake_up_session_caps(session, FORCE_RO);
4413 break;
4414
4415 case CEPH_SESSION_REJECT:
4416 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
4417 pr_info_client(cl, "mds%d rejected session\n",
4418 session->s_mds);
4419 session->s_state = CEPH_MDS_SESSION_REJECTED;
4420 cleanup_session_requests(mdsc, session);
4421 remove_session_caps(session);
4422 if (blocklisted)
4423 mdsc->fsc->blocklisted = true;
4424 wake = 2; /* for good measure */
4425 break;
4426
4427 default:
4428 pr_err_client(cl, "bad op %d mds%d\n", op, mds);
4429 WARN_ON(1);
4430 }
4431
4432 mutex_unlock(&session->s_mutex);
4433 if (wake) {
4434 mutex_lock(&mdsc->mutex);
4435 __wake_requests(mdsc, &session->s_waiting);
4436 if (wake == 2)
4437 kick_requests(mdsc, mds);
4438 mutex_unlock(&mdsc->mutex);
4439 }
4440 if (op == CEPH_SESSION_CLOSE)
4441 ceph_put_mds_session(session);
4442 return;
4443
4444bad:
4445 pr_err_client(cl, "corrupt message mds%d len %d\n", mds,
4446 (int)msg->front.iov_len);
4447 ceph_msg_dump(msg);
4448fail:
4449 for (i = 0; i < cap_auths_num; i++) {
4450 kfree(cap_auths[i].match.gids);
4451 kfree(cap_auths[i].match.path);
4452 kfree(cap_auths[i].match.fs_name);
4453 }
4454 kfree(cap_auths);
4455 return;
4456}
4457
4458void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
4459{
4460 struct ceph_client *cl = req->r_mdsc->fsc->client;
4461 int dcaps;
4462
4463 dcaps = xchg(&req->r_dir_caps, 0);
4464 if (dcaps) {
4465 doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
4466 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
4467 }
4468}
4469
4470void ceph_mdsc_release_dir_caps_async(struct ceph_mds_request *req)
4471{
4472 struct ceph_client *cl = req->r_mdsc->fsc->client;
4473 int dcaps;
4474
4475 dcaps = xchg(&req->r_dir_caps, 0);
4476 if (dcaps) {
4477 doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
4478 ceph_put_cap_refs_async(ceph_inode(req->r_parent), dcaps);
4479 }
4480}
4481
4482/*
4483 * called under session->mutex.
4484 */
4485static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
4486 struct ceph_mds_session *session)
4487{
4488 struct ceph_mds_request *req, *nreq;
4489 struct rb_node *p;
4490
4491 doutc(mdsc->fsc->client, "mds%d\n", session->s_mds);
4492
4493 mutex_lock(&mdsc->mutex);
4494 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
4495 __send_request(session, req, true);
4496
4497 /*
4498 * also re-send old requests when MDS enters reconnect stage. So that MDS
4499 * can process completed request in clientreplay stage.
4500 */
4501 p = rb_first(&mdsc->request_tree);
4502 while (p) {
4503 req = rb_entry(p, struct ceph_mds_request, r_node);
4504 p = rb_next(p);
4505 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
4506 continue;
4507 if (req->r_attempts == 0)
4508 continue; /* only old requests */
4509 if (!req->r_session)
4510 continue;
4511 if (req->r_session->s_mds != session->s_mds)
4512 continue;
4513
4514 ceph_mdsc_release_dir_caps_async(req);
4515
4516 __send_request(session, req, true);
4517 }
4518 mutex_unlock(&mdsc->mutex);
4519}
4520
4521static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
4522{
4523 struct ceph_msg *reply;
4524 struct ceph_pagelist *_pagelist;
4525 struct page *page;
4526 __le32 *addr;
4527 int err = -ENOMEM;
4528
4529 if (!recon_state->allow_multi)
4530 return -ENOSPC;
4531
4532 /* can't handle message that contains both caps and realm */
4533 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
4534
4535 /* pre-allocate new pagelist */
4536 _pagelist = ceph_pagelist_alloc(GFP_NOFS);
4537 if (!_pagelist)
4538 return -ENOMEM;
4539
4540 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
4541 if (!reply)
4542 goto fail_msg;
4543
4544 /* placeholder for nr_caps */
4545 err = ceph_pagelist_encode_32(_pagelist, 0);
4546 if (err < 0)
4547 goto fail;
4548
4549 if (recon_state->nr_caps) {
4550 /* currently encoding caps */
4551 err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
4552 if (err)
4553 goto fail;
4554 } else {
4555 /* placeholder for nr_realms (currently encoding relams) */
4556 err = ceph_pagelist_encode_32(_pagelist, 0);
4557 if (err < 0)
4558 goto fail;
4559 }
4560
4561 err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
4562 if (err)
4563 goto fail;
4564
4565 page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
4566 addr = kmap_atomic(page);
4567 if (recon_state->nr_caps) {
4568 /* currently encoding caps */
4569 *addr = cpu_to_le32(recon_state->nr_caps);
4570 } else {
4571 /* currently encoding relams */
4572 *(addr + 1) = cpu_to_le32(recon_state->nr_realms);
4573 }
4574 kunmap_atomic(addr);
4575
4576 reply->hdr.version = cpu_to_le16(5);
4577 reply->hdr.compat_version = cpu_to_le16(4);
4578
4579 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
4580 ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
4581
4582 ceph_con_send(&recon_state->session->s_con, reply);
4583 ceph_pagelist_release(recon_state->pagelist);
4584
4585 recon_state->pagelist = _pagelist;
4586 recon_state->nr_caps = 0;
4587 recon_state->nr_realms = 0;
4588 recon_state->msg_version = 5;
4589 return 0;
4590fail:
4591 ceph_msg_put(reply);
4592fail_msg:
4593 ceph_pagelist_release(_pagelist);
4594 return err;
4595}
4596
4597static struct dentry* d_find_primary(struct inode *inode)
4598{
4599 struct dentry *alias, *dn = NULL;
4600
4601 if (hlist_empty(&inode->i_dentry))
4602 return NULL;
4603
4604 spin_lock(&inode->i_lock);
4605 if (hlist_empty(&inode->i_dentry))
4606 goto out_unlock;
4607
4608 if (S_ISDIR(inode->i_mode)) {
4609 alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias);
4610 if (!IS_ROOT(alias))
4611 dn = dget(alias);
4612 goto out_unlock;
4613 }
4614
4615 hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) {
4616 spin_lock(&alias->d_lock);
4617 if (!d_unhashed(alias) &&
4618 (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) {
4619 dn = dget_dlock(alias);
4620 }
4621 spin_unlock(&alias->d_lock);
4622 if (dn)
4623 break;
4624 }
4625out_unlock:
4626 spin_unlock(&inode->i_lock);
4627 return dn;
4628}
4629
4630/*
4631 * Encode information about a cap for a reconnect with the MDS.
4632 */
4633static int reconnect_caps_cb(struct inode *inode, int mds, void *arg)
4634{
4635 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
4636 struct ceph_client *cl = ceph_inode_to_client(inode);
4637 union {
4638 struct ceph_mds_cap_reconnect v2;
4639 struct ceph_mds_cap_reconnect_v1 v1;
4640 } rec;
4641 struct ceph_inode_info *ci = ceph_inode(inode);
4642 struct ceph_reconnect_state *recon_state = arg;
4643 struct ceph_pagelist *pagelist = recon_state->pagelist;
4644 struct dentry *dentry;
4645 struct ceph_cap *cap;
4646 struct ceph_path_info path_info = {0};
4647 int err;
4648 u64 snap_follows;
4649
4650 dentry = d_find_primary(inode);
4651 if (dentry) {
4652 /* set pathbase to parent dir when msg_version >= 2 */
4653 char *path = ceph_mdsc_build_path(mdsc, dentry, &path_info,
4654 recon_state->msg_version >= 2);
4655 dput(dentry);
4656 if (IS_ERR(path)) {
4657 err = PTR_ERR(path);
4658 goto out_err;
4659 }
4660 }
4661
4662 spin_lock(&ci->i_ceph_lock);
4663 cap = __get_cap_for_mds(ci, mds);
4664 if (!cap) {
4665 spin_unlock(&ci->i_ceph_lock);
4666 err = 0;
4667 goto out_err;
4668 }
4669 doutc(cl, " adding %p ino %llx.%llx cap %p %lld %s\n", inode,
4670 ceph_vinop(inode), cap, cap->cap_id,
4671 ceph_cap_string(cap->issued));
4672
4673 cap->seq = 0; /* reset cap seq */
4674 cap->issue_seq = 0; /* and issue_seq */
4675 cap->mseq = 0; /* and migrate_seq */
4676 cap->cap_gen = atomic_read(&cap->session->s_cap_gen);
4677
4678 /* These are lost when the session goes away */
4679 if (S_ISDIR(inode->i_mode)) {
4680 if (cap->issued & CEPH_CAP_DIR_CREATE) {
4681 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
4682 memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
4683 }
4684 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
4685 }
4686
4687 if (recon_state->msg_version >= 2) {
4688 rec.v2.cap_id = cpu_to_le64(cap->cap_id);
4689 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
4690 rec.v2.issued = cpu_to_le32(cap->issued);
4691 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
4692 rec.v2.pathbase = cpu_to_le64(path_info.vino.ino);
4693 rec.v2.flock_len = (__force __le32)
4694 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
4695 } else {
4696 struct timespec64 ts;
4697
4698 rec.v1.cap_id = cpu_to_le64(cap->cap_id);
4699 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
4700 rec.v1.issued = cpu_to_le32(cap->issued);
4701 rec.v1.size = cpu_to_le64(i_size_read(inode));
4702 ts = inode_get_mtime(inode);
4703 ceph_encode_timespec64(&rec.v1.mtime, &ts);
4704 ts = inode_get_atime(inode);
4705 ceph_encode_timespec64(&rec.v1.atime, &ts);
4706 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
4707 rec.v1.pathbase = cpu_to_le64(path_info.vino.ino);
4708 }
4709
4710 if (list_empty(&ci->i_cap_snaps)) {
4711 snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
4712 } else {
4713 struct ceph_cap_snap *capsnap =
4714 list_first_entry(&ci->i_cap_snaps,
4715 struct ceph_cap_snap, ci_item);
4716 snap_follows = capsnap->follows;
4717 }
4718 spin_unlock(&ci->i_ceph_lock);
4719
4720 if (recon_state->msg_version >= 2) {
4721 int num_fcntl_locks, num_flock_locks;
4722 struct ceph_filelock *flocks = NULL;
4723 size_t struct_len, total_len = sizeof(u64);
4724 u8 struct_v = 0;
4725
4726encode_again:
4727 if (rec.v2.flock_len) {
4728 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
4729 } else {
4730 num_fcntl_locks = 0;
4731 num_flock_locks = 0;
4732 }
4733 if (num_fcntl_locks + num_flock_locks > 0) {
4734 flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
4735 sizeof(struct ceph_filelock),
4736 GFP_NOFS);
4737 if (!flocks) {
4738 err = -ENOMEM;
4739 goto out_err;
4740 }
4741 err = ceph_encode_locks_to_buffer(inode, flocks,
4742 num_fcntl_locks,
4743 num_flock_locks);
4744 if (err) {
4745 kfree(flocks);
4746 flocks = NULL;
4747 if (err == -ENOSPC)
4748 goto encode_again;
4749 goto out_err;
4750 }
4751 } else {
4752 kfree(flocks);
4753 flocks = NULL;
4754 }
4755
4756 if (recon_state->msg_version >= 3) {
4757 /* version, compat_version and struct_len */
4758 total_len += 2 * sizeof(u8) + sizeof(u32);
4759 struct_v = 2;
4760 }
4761 /*
4762 * number of encoded locks is stable, so copy to pagelist
4763 */
4764 struct_len = 2 * sizeof(u32) +
4765 (num_fcntl_locks + num_flock_locks) *
4766 sizeof(struct ceph_filelock);
4767 rec.v2.flock_len = cpu_to_le32(struct_len);
4768
4769 struct_len += sizeof(u32) + path_info.pathlen + sizeof(rec.v2);
4770
4771 if (struct_v >= 2)
4772 struct_len += sizeof(u64); /* snap_follows */
4773
4774 total_len += struct_len;
4775
4776 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
4777 err = send_reconnect_partial(recon_state);
4778 if (err)
4779 goto out_freeflocks;
4780 pagelist = recon_state->pagelist;
4781 }
4782
4783 err = ceph_pagelist_reserve(pagelist, total_len);
4784 if (err)
4785 goto out_freeflocks;
4786
4787 ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
4788 if (recon_state->msg_version >= 3) {
4789 ceph_pagelist_encode_8(pagelist, struct_v);
4790 ceph_pagelist_encode_8(pagelist, 1);
4791 ceph_pagelist_encode_32(pagelist, struct_len);
4792 }
4793 ceph_pagelist_encode_string(pagelist, (char *)path_info.path, path_info.pathlen);
4794 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
4795 ceph_locks_to_pagelist(flocks, pagelist,
4796 num_fcntl_locks, num_flock_locks);
4797 if (struct_v >= 2)
4798 ceph_pagelist_encode_64(pagelist, snap_follows);
4799out_freeflocks:
4800 kfree(flocks);
4801 } else {
4802 err = ceph_pagelist_reserve(pagelist,
4803 sizeof(u64) + sizeof(u32) +
4804 path_info.pathlen + sizeof(rec.v1));
4805 if (err)
4806 goto out_err;
4807
4808 ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
4809 ceph_pagelist_encode_string(pagelist, (char *)path_info.path, path_info.pathlen);
4810 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
4811 }
4812
4813out_err:
4814 ceph_mdsc_free_path_info(&path_info);
4815 if (!err)
4816 recon_state->nr_caps++;
4817 return err;
4818}
4819
4820static int encode_snap_realms(struct ceph_mds_client *mdsc,
4821 struct ceph_reconnect_state *recon_state)
4822{
4823 struct rb_node *p;
4824 struct ceph_pagelist *pagelist = recon_state->pagelist;
4825 struct ceph_client *cl = mdsc->fsc->client;
4826 int err = 0;
4827
4828 if (recon_state->msg_version >= 4) {
4829 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
4830 if (err < 0)
4831 goto fail;
4832 }
4833
4834 /*
4835 * snaprealms. we provide mds with the ino, seq (version), and
4836 * parent for all of our realms. If the mds has any newer info,
4837 * it will tell us.
4838 */
4839 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
4840 struct ceph_snap_realm *realm =
4841 rb_entry(p, struct ceph_snap_realm, node);
4842 struct ceph_mds_snaprealm_reconnect sr_rec;
4843
4844 if (recon_state->msg_version >= 4) {
4845 size_t need = sizeof(u8) * 2 + sizeof(u32) +
4846 sizeof(sr_rec);
4847
4848 if (pagelist->length + need > RECONNECT_MAX_SIZE) {
4849 err = send_reconnect_partial(recon_state);
4850 if (err)
4851 goto fail;
4852 pagelist = recon_state->pagelist;
4853 }
4854
4855 err = ceph_pagelist_reserve(pagelist, need);
4856 if (err)
4857 goto fail;
4858
4859 ceph_pagelist_encode_8(pagelist, 1);
4860 ceph_pagelist_encode_8(pagelist, 1);
4861 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
4862 }
4863
4864 doutc(cl, " adding snap realm %llx seq %lld parent %llx\n",
4865 realm->ino, realm->seq, realm->parent_ino);
4866 sr_rec.ino = cpu_to_le64(realm->ino);
4867 sr_rec.seq = cpu_to_le64(realm->seq);
4868 sr_rec.parent = cpu_to_le64(realm->parent_ino);
4869
4870 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
4871 if (err)
4872 goto fail;
4873
4874 recon_state->nr_realms++;
4875 }
4876fail:
4877 return err;
4878}
4879
4880
4881/*
4882 * If an MDS fails and recovers, clients need to reconnect in order to
4883 * reestablish shared state. This includes all caps issued through
4884 * this session _and_ the snap_realm hierarchy. Because it's not
4885 * clear which snap realms the mds cares about, we send everything we
4886 * know about.. that ensures we'll then get any new info the
4887 * recovering MDS might have.
4888 *
4889 * This is a relatively heavyweight operation, but it's rare.
4890 */
4891static void send_mds_reconnect(struct ceph_mds_client *mdsc,
4892 struct ceph_mds_session *session)
4893{
4894 struct ceph_client *cl = mdsc->fsc->client;
4895 struct ceph_msg *reply;
4896 int mds = session->s_mds;
4897 int err = -ENOMEM;
4898 struct ceph_reconnect_state recon_state = {
4899 .session = session,
4900 };
4901 LIST_HEAD(dispose);
4902
4903 pr_info_client(cl, "mds%d reconnect start\n", mds);
4904
4905 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
4906 if (!recon_state.pagelist)
4907 goto fail_nopagelist;
4908
4909 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
4910 if (!reply)
4911 goto fail_nomsg;
4912
4913 xa_destroy(&session->s_delegated_inos);
4914
4915 mutex_lock(&session->s_mutex);
4916 session->s_state = CEPH_MDS_SESSION_RECONNECTING;
4917 session->s_seq = 0;
4918
4919 doutc(cl, "session %p state %s\n", session,
4920 ceph_session_state_name(session->s_state));
4921
4922 atomic_inc(&session->s_cap_gen);
4923
4924 spin_lock(&session->s_cap_lock);
4925 /* don't know if session is readonly */
4926 session->s_readonly = 0;
4927 /*
4928 * notify __ceph_remove_cap() that we are composing cap reconnect.
4929 * If a cap get released before being added to the cap reconnect,
4930 * __ceph_remove_cap() should skip queuing cap release.
4931 */
4932 session->s_cap_reconnect = 1;
4933 /* drop old cap expires; we're about to reestablish that state */
4934 detach_cap_releases(session, &dispose);
4935 spin_unlock(&session->s_cap_lock);
4936 dispose_cap_releases(mdsc, &dispose);
4937
4938 /* trim unused caps to reduce MDS's cache rejoin time */
4939 if (mdsc->fsc->sb->s_root)
4940 shrink_dcache_parent(mdsc->fsc->sb->s_root);
4941
4942 ceph_con_close(&session->s_con);
4943 ceph_con_open(&session->s_con,
4944 CEPH_ENTITY_TYPE_MDS, mds,
4945 ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
4946
4947 /* replay unsafe requests */
4948 replay_unsafe_requests(mdsc, session);
4949
4950 ceph_early_kick_flushing_caps(mdsc, session);
4951
4952 down_read(&mdsc->snap_rwsem);
4953
4954 /* placeholder for nr_caps */
4955 err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
4956 if (err)
4957 goto fail;
4958
4959 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
4960 recon_state.msg_version = 3;
4961 recon_state.allow_multi = true;
4962 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
4963 recon_state.msg_version = 3;
4964 } else {
4965 recon_state.msg_version = 2;
4966 }
4967 /* traverse this session's caps */
4968 err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
4969
4970 spin_lock(&session->s_cap_lock);
4971 session->s_cap_reconnect = 0;
4972 spin_unlock(&session->s_cap_lock);
4973
4974 if (err < 0)
4975 goto fail;
4976
4977 /* check if all realms can be encoded into current message */
4978 if (mdsc->num_snap_realms) {
4979 size_t total_len =
4980 recon_state.pagelist->length +
4981 mdsc->num_snap_realms *
4982 sizeof(struct ceph_mds_snaprealm_reconnect);
4983 if (recon_state.msg_version >= 4) {
4984 /* number of realms */
4985 total_len += sizeof(u32);
4986 /* version, compat_version and struct_len */
4987 total_len += mdsc->num_snap_realms *
4988 (2 * sizeof(u8) + sizeof(u32));
4989 }
4990 if (total_len > RECONNECT_MAX_SIZE) {
4991 if (!recon_state.allow_multi) {
4992 err = -ENOSPC;
4993 goto fail;
4994 }
4995 if (recon_state.nr_caps) {
4996 err = send_reconnect_partial(&recon_state);
4997 if (err)
4998 goto fail;
4999 }
5000 recon_state.msg_version = 5;
5001 }
5002 }
5003
5004 err = encode_snap_realms(mdsc, &recon_state);
5005 if (err < 0)
5006 goto fail;
5007
5008 if (recon_state.msg_version >= 5) {
5009 err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
5010 if (err < 0)
5011 goto fail;
5012 }
5013
5014 if (recon_state.nr_caps || recon_state.nr_realms) {
5015 struct page *page =
5016 list_first_entry(&recon_state.pagelist->head,
5017 struct page, lru);
5018 __le32 *addr = kmap_atomic(page);
5019 if (recon_state.nr_caps) {
5020 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
5021 *addr = cpu_to_le32(recon_state.nr_caps);
5022 } else if (recon_state.msg_version >= 4) {
5023 *(addr + 1) = cpu_to_le32(recon_state.nr_realms);
5024 }
5025 kunmap_atomic(addr);
5026 }
5027
5028 reply->hdr.version = cpu_to_le16(recon_state.msg_version);
5029 if (recon_state.msg_version >= 4)
5030 reply->hdr.compat_version = cpu_to_le16(4);
5031
5032 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
5033 ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
5034
5035 ceph_con_send(&session->s_con, reply);
5036
5037 mutex_unlock(&session->s_mutex);
5038
5039 mutex_lock(&mdsc->mutex);
5040 __wake_requests(mdsc, &session->s_waiting);
5041 mutex_unlock(&mdsc->mutex);
5042
5043 up_read(&mdsc->snap_rwsem);
5044 ceph_pagelist_release(recon_state.pagelist);
5045 return;
5046
5047fail:
5048 ceph_msg_put(reply);
5049 up_read(&mdsc->snap_rwsem);
5050 mutex_unlock(&session->s_mutex);
5051fail_nomsg:
5052 ceph_pagelist_release(recon_state.pagelist);
5053fail_nopagelist:
5054 pr_err_client(cl, "error %d preparing reconnect for mds%d\n",
5055 err, mds);
5056 return;
5057}
5058
5059
5060/*
5061 * compare old and new mdsmaps, kicking requests
5062 * and closing out old connections as necessary
5063 *
5064 * called under mdsc->mutex.
5065 */
5066static void check_new_map(struct ceph_mds_client *mdsc,
5067 struct ceph_mdsmap *newmap,
5068 struct ceph_mdsmap *oldmap)
5069{
5070 int i, j, err;
5071 int oldstate, newstate;
5072 struct ceph_mds_session *s;
5073 unsigned long targets[DIV_ROUND_UP(CEPH_MAX_MDS, sizeof(unsigned long))] = {0};
5074 struct ceph_client *cl = mdsc->fsc->client;
5075
5076 doutc(cl, "new %u old %u\n", newmap->m_epoch, oldmap->m_epoch);
5077
5078 if (newmap->m_info) {
5079 for (i = 0; i < newmap->possible_max_rank; i++) {
5080 for (j = 0; j < newmap->m_info[i].num_export_targets; j++)
5081 set_bit(newmap->m_info[i].export_targets[j], targets);
5082 }
5083 }
5084
5085 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
5086 if (!mdsc->sessions[i])
5087 continue;
5088 s = mdsc->sessions[i];
5089 oldstate = ceph_mdsmap_get_state(oldmap, i);
5090 newstate = ceph_mdsmap_get_state(newmap, i);
5091
5092 doutc(cl, "mds%d state %s%s -> %s%s (session %s)\n",
5093 i, ceph_mds_state_name(oldstate),
5094 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
5095 ceph_mds_state_name(newstate),
5096 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
5097 ceph_session_state_name(s->s_state));
5098
5099 if (i >= newmap->possible_max_rank) {
5100 /* force close session for stopped mds */
5101 ceph_get_mds_session(s);
5102 __unregister_session(mdsc, s);
5103 __wake_requests(mdsc, &s->s_waiting);
5104 mutex_unlock(&mdsc->mutex);
5105
5106 mutex_lock(&s->s_mutex);
5107 cleanup_session_requests(mdsc, s);
5108 remove_session_caps(s);
5109 mutex_unlock(&s->s_mutex);
5110
5111 ceph_put_mds_session(s);
5112
5113 mutex_lock(&mdsc->mutex);
5114 kick_requests(mdsc, i);
5115 continue;
5116 }
5117
5118 if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
5119 ceph_mdsmap_get_addr(newmap, i),
5120 sizeof(struct ceph_entity_addr))) {
5121 /* just close it */
5122 mutex_unlock(&mdsc->mutex);
5123 mutex_lock(&s->s_mutex);
5124 mutex_lock(&mdsc->mutex);
5125 ceph_con_close(&s->s_con);
5126 mutex_unlock(&s->s_mutex);
5127 s->s_state = CEPH_MDS_SESSION_RESTARTING;
5128 } else if (oldstate == newstate) {
5129 continue; /* nothing new with this mds */
5130 }
5131
5132 /*
5133 * send reconnect?
5134 */
5135 if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
5136 newstate >= CEPH_MDS_STATE_RECONNECT) {
5137 mutex_unlock(&mdsc->mutex);
5138 clear_bit(i, targets);
5139 send_mds_reconnect(mdsc, s);
5140 mutex_lock(&mdsc->mutex);
5141 }
5142
5143 /*
5144 * kick request on any mds that has gone active.
5145 */
5146 if (oldstate < CEPH_MDS_STATE_ACTIVE &&
5147 newstate >= CEPH_MDS_STATE_ACTIVE) {
5148 if (oldstate != CEPH_MDS_STATE_CREATING &&
5149 oldstate != CEPH_MDS_STATE_STARTING)
5150 pr_info_client(cl, "mds%d recovery completed\n",
5151 s->s_mds);
5152 kick_requests(mdsc, i);
5153 mutex_unlock(&mdsc->mutex);
5154 mutex_lock(&s->s_mutex);
5155 mutex_lock(&mdsc->mutex);
5156 ceph_kick_flushing_caps(mdsc, s);
5157 mutex_unlock(&s->s_mutex);
5158 wake_up_session_caps(s, RECONNECT);
5159 }
5160 }
5161
5162 /*
5163 * Only open and reconnect sessions that don't exist yet.
5164 */
5165 for (i = 0; i < newmap->possible_max_rank; i++) {
5166 /*
5167 * In case the import MDS is crashed just after
5168 * the EImportStart journal is flushed, so when
5169 * a standby MDS takes over it and is replaying
5170 * the EImportStart journal the new MDS daemon
5171 * will wait the client to reconnect it, but the
5172 * client may never register/open the session yet.
5173 *
5174 * Will try to reconnect that MDS daemon if the
5175 * rank number is in the export targets array and
5176 * is the up:reconnect state.
5177 */
5178 newstate = ceph_mdsmap_get_state(newmap, i);
5179 if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT)
5180 continue;
5181
5182 /*
5183 * The session maybe registered and opened by some
5184 * requests which were choosing random MDSes during
5185 * the mdsc->mutex's unlock/lock gap below in rare
5186 * case. But the related MDS daemon will just queue
5187 * that requests and be still waiting for the client's
5188 * reconnection request in up:reconnect state.
5189 */
5190 s = __ceph_lookup_mds_session(mdsc, i);
5191 if (likely(!s)) {
5192 s = __open_export_target_session(mdsc, i);
5193 if (IS_ERR(s)) {
5194 err = PTR_ERR(s);
5195 pr_err_client(cl,
5196 "failed to open export target session, err %d\n",
5197 err);
5198 continue;
5199 }
5200 }
5201 doutc(cl, "send reconnect to export target mds.%d\n", i);
5202 mutex_unlock(&mdsc->mutex);
5203 send_mds_reconnect(mdsc, s);
5204 ceph_put_mds_session(s);
5205 mutex_lock(&mdsc->mutex);
5206 }
5207
5208 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
5209 s = mdsc->sessions[i];
5210 if (!s)
5211 continue;
5212 if (!ceph_mdsmap_is_laggy(newmap, i))
5213 continue;
5214 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
5215 s->s_state == CEPH_MDS_SESSION_HUNG ||
5216 s->s_state == CEPH_MDS_SESSION_CLOSING) {
5217 doutc(cl, " connecting to export targets of laggy mds%d\n", i);
5218 __open_export_target_sessions(mdsc, s);
5219 }
5220 }
5221}
5222
5223
5224
5225/*
5226 * leases
5227 */
5228
5229/*
5230 * caller must hold session s_mutex, dentry->d_lock
5231 */
5232void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
5233{
5234 struct ceph_dentry_info *di = ceph_dentry(dentry);
5235
5236 ceph_put_mds_session(di->lease_session);
5237 di->lease_session = NULL;
5238}
5239
5240static void handle_lease(struct ceph_mds_client *mdsc,
5241 struct ceph_mds_session *session,
5242 struct ceph_msg *msg)
5243{
5244 struct ceph_client *cl = mdsc->fsc->client;
5245 struct super_block *sb = mdsc->fsc->sb;
5246 struct inode *inode;
5247 struct dentry *parent, *dentry;
5248 struct ceph_dentry_info *di;
5249 int mds = session->s_mds;
5250 struct ceph_mds_lease *h = msg->front.iov_base;
5251 u32 seq;
5252 struct ceph_vino vino;
5253 struct qstr dname;
5254 int release = 0;
5255
5256 doutc(cl, "from mds%d\n", mds);
5257
5258 if (!ceph_inc_mds_stopping_blocker(mdsc, session))
5259 return;
5260
5261 /* decode */
5262 if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
5263 goto bad;
5264 vino.ino = le64_to_cpu(h->ino);
5265 vino.snap = CEPH_NOSNAP;
5266 seq = le32_to_cpu(h->seq);
5267 dname.len = get_unaligned_le32(h + 1);
5268 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
5269 goto bad;
5270 dname.name = (void *)(h + 1) + sizeof(u32);
5271
5272 /* lookup inode */
5273 inode = ceph_find_inode(sb, vino);
5274 doutc(cl, "%s, ino %llx %p %.*s\n", ceph_lease_op_name(h->action),
5275 vino.ino, inode, dname.len, dname.name);
5276
5277 mutex_lock(&session->s_mutex);
5278 if (!inode) {
5279 doutc(cl, "no inode %llx\n", vino.ino);
5280 goto release;
5281 }
5282
5283 /* dentry */
5284 parent = d_find_alias(inode);
5285 if (!parent) {
5286 doutc(cl, "no parent dentry on inode %p\n", inode);
5287 WARN_ON(1);
5288 goto release; /* hrm... */
5289 }
5290 dname.hash = full_name_hash(parent, dname.name, dname.len);
5291 dentry = d_lookup(parent, &dname);
5292 dput(parent);
5293 if (!dentry)
5294 goto release;
5295
5296 spin_lock(&dentry->d_lock);
5297 di = ceph_dentry(dentry);
5298 switch (h->action) {
5299 case CEPH_MDS_LEASE_REVOKE:
5300 if (di->lease_session == session) {
5301 if (ceph_seq_cmp(di->lease_seq, seq) > 0)
5302 h->seq = cpu_to_le32(di->lease_seq);
5303 __ceph_mdsc_drop_dentry_lease(dentry);
5304 }
5305 release = 1;
5306 break;
5307
5308 case CEPH_MDS_LEASE_RENEW:
5309 if (di->lease_session == session &&
5310 di->lease_gen == atomic_read(&session->s_cap_gen) &&
5311 di->lease_renew_from &&
5312 di->lease_renew_after == 0) {
5313 unsigned long duration =
5314 msecs_to_jiffies(le32_to_cpu(h->duration_ms));
5315
5316 di->lease_seq = seq;
5317 di->time = di->lease_renew_from + duration;
5318 di->lease_renew_after = di->lease_renew_from +
5319 (duration >> 1);
5320 di->lease_renew_from = 0;
5321 }
5322 break;
5323 }
5324 spin_unlock(&dentry->d_lock);
5325 dput(dentry);
5326
5327 if (!release)
5328 goto out;
5329
5330release:
5331 /* let's just reuse the same message */
5332 h->action = CEPH_MDS_LEASE_REVOKE_ACK;
5333 ceph_msg_get(msg);
5334 ceph_con_send(&session->s_con, msg);
5335
5336out:
5337 mutex_unlock(&session->s_mutex);
5338 iput(inode);
5339
5340 ceph_dec_mds_stopping_blocker(mdsc);
5341 return;
5342
5343bad:
5344 ceph_dec_mds_stopping_blocker(mdsc);
5345
5346 pr_err_client(cl, "corrupt lease message\n");
5347 ceph_msg_dump(msg);
5348}
5349
5350void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
5351 struct dentry *dentry, char action,
5352 u32 seq)
5353{
5354 struct ceph_client *cl = session->s_mdsc->fsc->client;
5355 struct ceph_msg *msg;
5356 struct ceph_mds_lease *lease;
5357 struct inode *dir;
5358 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
5359
5360 doutc(cl, "identry %p %s to mds%d\n", dentry, ceph_lease_op_name(action),
5361 session->s_mds);
5362
5363 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
5364 if (!msg)
5365 return;
5366 lease = msg->front.iov_base;
5367 lease->action = action;
5368 lease->seq = cpu_to_le32(seq);
5369
5370 spin_lock(&dentry->d_lock);
5371 dir = d_inode(dentry->d_parent);
5372 lease->ino = cpu_to_le64(ceph_ino(dir));
5373 lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
5374
5375 put_unaligned_le32(dentry->d_name.len, lease + 1);
5376 memcpy((void *)(lease + 1) + 4,
5377 dentry->d_name.name, dentry->d_name.len);
5378 spin_unlock(&dentry->d_lock);
5379
5380 ceph_con_send(&session->s_con, msg);
5381}
5382
5383/*
5384 * lock unlock the session, to wait ongoing session activities
5385 */
5386static void lock_unlock_session(struct ceph_mds_session *s)
5387{
5388 mutex_lock(&s->s_mutex);
5389 mutex_unlock(&s->s_mutex);
5390}
5391
5392static void maybe_recover_session(struct ceph_mds_client *mdsc)
5393{
5394 struct ceph_client *cl = mdsc->fsc->client;
5395 struct ceph_fs_client *fsc = mdsc->fsc;
5396
5397 if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
5398 return;
5399
5400 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
5401 return;
5402
5403 if (!READ_ONCE(fsc->blocklisted))
5404 return;
5405
5406 pr_info_client(cl, "auto reconnect after blocklisted\n");
5407 ceph_force_reconnect(fsc->sb);
5408}
5409
5410bool check_session_state(struct ceph_mds_session *s)
5411{
5412 struct ceph_client *cl = s->s_mdsc->fsc->client;
5413
5414 switch (s->s_state) {
5415 case CEPH_MDS_SESSION_OPEN:
5416 if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
5417 s->s_state = CEPH_MDS_SESSION_HUNG;
5418 pr_info_client(cl, "mds%d hung\n", s->s_mds);
5419 }
5420 break;
5421 case CEPH_MDS_SESSION_CLOSING:
5422 case CEPH_MDS_SESSION_NEW:
5423 case CEPH_MDS_SESSION_RESTARTING:
5424 case CEPH_MDS_SESSION_CLOSED:
5425 case CEPH_MDS_SESSION_REJECTED:
5426 return false;
5427 }
5428
5429 return true;
5430}
5431
5432/*
5433 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply,
5434 * then we need to retransmit that request.
5435 */
5436void inc_session_sequence(struct ceph_mds_session *s)
5437{
5438 struct ceph_client *cl = s->s_mdsc->fsc->client;
5439
5440 lockdep_assert_held(&s->s_mutex);
5441
5442 s->s_seq++;
5443
5444 if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
5445 int ret;
5446
5447 doutc(cl, "resending session close request for mds%d\n", s->s_mds);
5448 ret = request_close_session(s);
5449 if (ret < 0)
5450 pr_err_client(cl, "unable to close session to mds%d: %d\n",
5451 s->s_mds, ret);
5452 }
5453}
5454
5455/*
5456 * delayed work -- periodically trim expired leases, renew caps with mds. If
5457 * the @delay parameter is set to 0 or if it's more than 5 secs, the default
5458 * workqueue delay value of 5 secs will be used.
5459 */
5460static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay)
5461{
5462 unsigned long max_delay = HZ * 5;
5463
5464 /* 5 secs default delay */
5465 if (!delay || (delay > max_delay))
5466 delay = max_delay;
5467 schedule_delayed_work(&mdsc->delayed_work,
5468 round_jiffies_relative(delay));
5469}
5470
5471static void delayed_work(struct work_struct *work)
5472{
5473 struct ceph_mds_client *mdsc =
5474 container_of(work, struct ceph_mds_client, delayed_work.work);
5475 unsigned long delay;
5476 int renew_interval;
5477 int renew_caps;
5478 int i;
5479
5480 doutc(mdsc->fsc->client, "mdsc delayed_work\n");
5481
5482 if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED)
5483 return;
5484
5485 mutex_lock(&mdsc->mutex);
5486 renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
5487 renew_caps = time_after_eq(jiffies, HZ*renew_interval +
5488 mdsc->last_renew_caps);
5489 if (renew_caps)
5490 mdsc->last_renew_caps = jiffies;
5491
5492 for (i = 0; i < mdsc->max_sessions; i++) {
5493 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
5494 if (!s)
5495 continue;
5496
5497 if (!check_session_state(s)) {
5498 ceph_put_mds_session(s);
5499 continue;
5500 }
5501 mutex_unlock(&mdsc->mutex);
5502
5503 ceph_flush_session_cap_releases(mdsc, s);
5504
5505 mutex_lock(&s->s_mutex);
5506 if (renew_caps)
5507 send_renew_caps(mdsc, s);
5508 else
5509 ceph_con_keepalive(&s->s_con);
5510 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
5511 s->s_state == CEPH_MDS_SESSION_HUNG)
5512 ceph_send_cap_releases(mdsc, s);
5513 mutex_unlock(&s->s_mutex);
5514 ceph_put_mds_session(s);
5515
5516 mutex_lock(&mdsc->mutex);
5517 }
5518 mutex_unlock(&mdsc->mutex);
5519
5520 delay = ceph_check_delayed_caps(mdsc);
5521
5522 ceph_queue_cap_reclaim_work(mdsc);
5523
5524 ceph_trim_snapid_map(mdsc);
5525
5526 maybe_recover_session(mdsc);
5527
5528 schedule_delayed(mdsc, delay);
5529}
5530
5531int ceph_mdsc_init(struct ceph_fs_client *fsc)
5532
5533{
5534 struct ceph_mds_client *mdsc;
5535 int err;
5536
5537 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
5538 if (!mdsc)
5539 return -ENOMEM;
5540 mdsc->fsc = fsc;
5541 mutex_init(&mdsc->mutex);
5542 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
5543 if (!mdsc->mdsmap) {
5544 err = -ENOMEM;
5545 goto err_mdsc;
5546 }
5547
5548 init_completion(&mdsc->safe_umount_waiters);
5549 spin_lock_init(&mdsc->stopping_lock);
5550 atomic_set(&mdsc->stopping_blockers, 0);
5551 init_completion(&mdsc->stopping_waiter);
5552 atomic64_set(&mdsc->dirty_folios, 0);
5553 init_waitqueue_head(&mdsc->flush_end_wq);
5554 init_waitqueue_head(&mdsc->session_close_wq);
5555 INIT_LIST_HEAD(&mdsc->waiting_for_map);
5556 mdsc->quotarealms_inodes = RB_ROOT;
5557 mutex_init(&mdsc->quotarealms_inodes_mutex);
5558 init_rwsem(&mdsc->snap_rwsem);
5559 mdsc->snap_realms = RB_ROOT;
5560 INIT_LIST_HEAD(&mdsc->snap_empty);
5561 spin_lock_init(&mdsc->snap_empty_lock);
5562 mdsc->request_tree = RB_ROOT;
5563 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
5564 mdsc->last_renew_caps = jiffies;
5565 INIT_LIST_HEAD(&mdsc->cap_delay_list);
5566#ifdef CONFIG_DEBUG_FS
5567 INIT_LIST_HEAD(&mdsc->cap_wait_list);
5568#endif
5569 spin_lock_init(&mdsc->cap_delay_lock);
5570 INIT_LIST_HEAD(&mdsc->cap_unlink_delay_list);
5571 INIT_LIST_HEAD(&mdsc->snap_flush_list);
5572 spin_lock_init(&mdsc->snap_flush_lock);
5573 mdsc->last_cap_flush_tid = 1;
5574 INIT_LIST_HEAD(&mdsc->cap_flush_list);
5575 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
5576 spin_lock_init(&mdsc->cap_dirty_lock);
5577 init_waitqueue_head(&mdsc->cap_flushing_wq);
5578 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
5579 INIT_WORK(&mdsc->cap_unlink_work, ceph_cap_unlink_work);
5580 err = ceph_metric_init(&mdsc->metric);
5581 if (err)
5582 goto err_mdsmap;
5583
5584 spin_lock_init(&mdsc->dentry_list_lock);
5585 INIT_LIST_HEAD(&mdsc->dentry_leases);
5586 INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
5587
5588 ceph_caps_init(mdsc);
5589 ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
5590
5591 spin_lock_init(&mdsc->snapid_map_lock);
5592 mdsc->snapid_map_tree = RB_ROOT;
5593 INIT_LIST_HEAD(&mdsc->snapid_map_lru);
5594
5595 init_rwsem(&mdsc->pool_perm_rwsem);
5596 mdsc->pool_perm_tree = RB_ROOT;
5597
5598 strscpy(mdsc->nodename, utsname()->nodename,
5599 sizeof(mdsc->nodename));
5600
5601 fsc->mdsc = mdsc;
5602 return 0;
5603
5604err_mdsmap:
5605 kfree(mdsc->mdsmap);
5606err_mdsc:
5607 kfree(mdsc);
5608 return err;
5609}
5610
5611/*
5612 * Wait for safe replies on open mds requests. If we time out, drop
5613 * all requests from the tree to avoid dangling dentry refs.
5614 */
5615static void wait_requests(struct ceph_mds_client *mdsc)
5616{
5617 struct ceph_client *cl = mdsc->fsc->client;
5618 struct ceph_options *opts = mdsc->fsc->client->options;
5619 struct ceph_mds_request *req;
5620
5621 mutex_lock(&mdsc->mutex);
5622 if (__get_oldest_req(mdsc)) {
5623 mutex_unlock(&mdsc->mutex);
5624
5625 doutc(cl, "waiting for requests\n");
5626 wait_for_completion_timeout(&mdsc->safe_umount_waiters,
5627 ceph_timeout_jiffies(opts->mount_timeout));
5628
5629 /* tear down remaining requests */
5630 mutex_lock(&mdsc->mutex);
5631 while ((req = __get_oldest_req(mdsc))) {
5632 doutc(cl, "timed out on tid %llu\n", req->r_tid);
5633 list_del_init(&req->r_wait);
5634 __unregister_request(mdsc, req);
5635 }
5636 }
5637 mutex_unlock(&mdsc->mutex);
5638 doutc(cl, "done\n");
5639}
5640
5641void send_flush_mdlog(struct ceph_mds_session *s)
5642{
5643 struct ceph_client *cl = s->s_mdsc->fsc->client;
5644 struct ceph_msg *msg;
5645
5646 /*
5647 * Pre-luminous MDS crashes when it sees an unknown session request
5648 */
5649 if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS))
5650 return;
5651
5652 mutex_lock(&s->s_mutex);
5653 doutc(cl, "request mdlog flush to mds%d (%s)s seq %lld\n",
5654 s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
5655 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG,
5656 s->s_seq);
5657 if (!msg) {
5658 pr_err_client(cl, "failed to request mdlog flush to mds%d (%s) seq %lld\n",
5659 s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
5660 } else {
5661 ceph_con_send(&s->s_con, msg);
5662 }
5663 mutex_unlock(&s->s_mutex);
5664}
5665
5666static int ceph_mds_auth_match(struct ceph_mds_client *mdsc,
5667 struct ceph_mds_cap_auth *auth,
5668 const struct cred *cred,
5669 char *tpath)
5670{
5671 u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid);
5672 u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid);
5673 struct ceph_client *cl = mdsc->fsc->client;
5674 const char *fs_name = mdsc->fsc->mount_options->mds_namespace;
5675 const char *spath = mdsc->fsc->mount_options->server_path;
5676 bool gid_matched = false;
5677 u32 gid, tlen, len;
5678 int i, j;
5679
5680 doutc(cl, "fsname check fs_name=%s match.fs_name=%s\n",
5681 fs_name, auth->match.fs_name ? auth->match.fs_name : "");
5682 if (auth->match.fs_name && strcmp(auth->match.fs_name, fs_name)) {
5683 /* fsname mismatch, try next one */
5684 return 0;
5685 }
5686
5687 doutc(cl, "match.uid %lld\n", auth->match.uid);
5688 if (auth->match.uid != MDS_AUTH_UID_ANY) {
5689 if (auth->match.uid != caller_uid)
5690 return 0;
5691 if (auth->match.num_gids) {
5692 for (i = 0; i < auth->match.num_gids; i++) {
5693 if (caller_gid == auth->match.gids[i])
5694 gid_matched = true;
5695 }
5696 if (!gid_matched && cred->group_info->ngroups) {
5697 for (i = 0; i < cred->group_info->ngroups; i++) {
5698 gid = from_kgid(&init_user_ns,
5699 cred->group_info->gid[i]);
5700 for (j = 0; j < auth->match.num_gids; j++) {
5701 if (gid == auth->match.gids[j]) {
5702 gid_matched = true;
5703 break;
5704 }
5705 }
5706 if (gid_matched)
5707 break;
5708 }
5709 }
5710 if (!gid_matched)
5711 return 0;
5712 }
5713 }
5714
5715 /* path match */
5716 if (auth->match.path) {
5717 if (!tpath)
5718 return 0;
5719
5720 tlen = strlen(tpath);
5721 len = strlen(auth->match.path);
5722 if (len) {
5723 char *_tpath = tpath;
5724 bool free_tpath = false;
5725 int m, n;
5726
5727 doutc(cl, "server path %s, tpath %s, match.path %s\n",
5728 spath, tpath, auth->match.path);
5729 if (spath && (m = strlen(spath)) != 1) {
5730 /* mount path + '/' + tpath + an extra space */
5731 n = m + 1 + tlen + 1;
5732 _tpath = kmalloc(n, GFP_NOFS);
5733 if (!_tpath)
5734 return -ENOMEM;
5735 /* remove the leading '/' */
5736 snprintf(_tpath, n, "%s/%s", spath + 1, tpath);
5737 free_tpath = true;
5738 tlen = strlen(_tpath);
5739 }
5740
5741 /*
5742 * Please note the tailing '/' for match.path has already
5743 * been removed when parsing.
5744 *
5745 * Remove the tailing '/' for the target path.
5746 */
5747 while (tlen && _tpath[tlen - 1] == '/') {
5748 _tpath[tlen - 1] = '\0';
5749 tlen -= 1;
5750 }
5751 doutc(cl, "_tpath %s\n", _tpath);
5752
5753 /*
5754 * In case first == _tpath && tlen == len:
5755 * match.path=/foo --> /foo _path=/foo --> match
5756 * match.path=/foo/ --> /foo _path=/foo --> match
5757 *
5758 * In case first == _tmatch.path && tlen > len:
5759 * match.path=/foo/ --> /foo _path=/foo/ --> match
5760 * match.path=/foo --> /foo _path=/foo/ --> match
5761 * match.path=/foo/ --> /foo _path=/foo/d --> match
5762 * match.path=/foo --> /foo _path=/food --> mismatch
5763 *
5764 * All the other cases --> mismatch
5765 */
5766 bool path_matched = true;
5767 char *first = strstr(_tpath, auth->match.path);
5768 if (first != _tpath ||
5769 (tlen > len && _tpath[len] != '/')) {
5770 path_matched = false;
5771 }
5772
5773 if (free_tpath)
5774 kfree(_tpath);
5775
5776 if (!path_matched)
5777 return 0;
5778 }
5779 }
5780
5781 doutc(cl, "matched\n");
5782 return 1;
5783}
5784
5785int ceph_mds_check_access(struct ceph_mds_client *mdsc, char *tpath, int mask)
5786{
5787 const struct cred *cred = get_current_cred();
5788 u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid);
5789 u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid);
5790 struct ceph_mds_cap_auth *rw_perms_s = NULL;
5791 struct ceph_client *cl = mdsc->fsc->client;
5792 bool root_squash_perms = true;
5793 int i, err;
5794
5795 doutc(cl, "tpath '%s', mask %d, caller_uid %d, caller_gid %d\n",
5796 tpath, mask, caller_uid, caller_gid);
5797
5798 for (i = 0; i < mdsc->s_cap_auths_num; i++) {
5799 struct ceph_mds_cap_auth *s = &mdsc->s_cap_auths[i];
5800
5801 err = ceph_mds_auth_match(mdsc, s, cred, tpath);
5802 if (err < 0) {
5803 put_cred(cred);
5804 return err;
5805 } else if (err > 0) {
5806 /* always follow the last auth caps' permission */
5807 root_squash_perms = true;
5808 rw_perms_s = NULL;
5809 if ((mask & MAY_WRITE) && s->writeable &&
5810 s->match.root_squash && (!caller_uid || !caller_gid))
5811 root_squash_perms = false;
5812
5813 if (((mask & MAY_WRITE) && !s->writeable) ||
5814 ((mask & MAY_READ) && !s->readable))
5815 rw_perms_s = s;
5816 }
5817 }
5818
5819 put_cred(cred);
5820
5821 doutc(cl, "root_squash_perms %d, rw_perms_s %p\n", root_squash_perms,
5822 rw_perms_s);
5823 if (root_squash_perms && rw_perms_s == NULL) {
5824 doutc(cl, "access allowed\n");
5825 return 0;
5826 }
5827
5828 if (!root_squash_perms) {
5829 doutc(cl, "root_squash is enabled and user(%d %d) isn't allowed to write",
5830 caller_uid, caller_gid);
5831 }
5832 if (rw_perms_s) {
5833 doutc(cl, "mds auth caps readable/writeable %d/%d while request r/w %d/%d",
5834 rw_perms_s->readable, rw_perms_s->writeable,
5835 !!(mask & MAY_READ), !!(mask & MAY_WRITE));
5836 }
5837 doutc(cl, "access denied\n");
5838 return -EACCES;
5839}
5840
5841/*
5842 * called before mount is ro, and before dentries are torn down.
5843 * (hmm, does this still race with new lookups?)
5844 */
5845void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
5846{
5847 doutc(mdsc->fsc->client, "begin\n");
5848 mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN;
5849
5850 ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
5851 ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
5852 ceph_flush_dirty_caps(mdsc);
5853 wait_requests(mdsc);
5854
5855 /*
5856 * wait for reply handlers to drop their request refs and
5857 * their inode/dcache refs
5858 */
5859 ceph_msgr_flush();
5860
5861 ceph_cleanup_quotarealms_inodes(mdsc);
5862 doutc(mdsc->fsc->client, "done\n");
5863}
5864
5865/*
5866 * flush the mdlog and wait for all write mds requests to flush.
5867 */
5868static void flush_mdlog_and_wait_mdsc_unsafe_requests(struct ceph_mds_client *mdsc,
5869 u64 want_tid)
5870{
5871 struct ceph_client *cl = mdsc->fsc->client;
5872 struct ceph_mds_request *req = NULL, *nextreq;
5873 struct ceph_mds_session *last_session = NULL;
5874 struct rb_node *n;
5875
5876 mutex_lock(&mdsc->mutex);
5877 doutc(cl, "want %lld\n", want_tid);
5878restart:
5879 req = __get_oldest_req(mdsc);
5880 while (req && req->r_tid <= want_tid) {
5881 /* find next request */
5882 n = rb_next(&req->r_node);
5883 if (n)
5884 nextreq = rb_entry(n, struct ceph_mds_request, r_node);
5885 else
5886 nextreq = NULL;
5887 if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
5888 (req->r_op & CEPH_MDS_OP_WRITE)) {
5889 struct ceph_mds_session *s = req->r_session;
5890
5891 if (!s) {
5892 req = nextreq;
5893 continue;
5894 }
5895
5896 /* write op */
5897 ceph_mdsc_get_request(req);
5898 if (nextreq)
5899 ceph_mdsc_get_request(nextreq);
5900 s = ceph_get_mds_session(s);
5901 mutex_unlock(&mdsc->mutex);
5902
5903 /* send flush mdlog request to MDS */
5904 if (last_session != s) {
5905 send_flush_mdlog(s);
5906 ceph_put_mds_session(last_session);
5907 last_session = s;
5908 } else {
5909 ceph_put_mds_session(s);
5910 }
5911 doutc(cl, "wait on %llu (want %llu)\n",
5912 req->r_tid, want_tid);
5913 wait_for_completion(&req->r_safe_completion);
5914
5915 mutex_lock(&mdsc->mutex);
5916 ceph_mdsc_put_request(req);
5917 if (!nextreq)
5918 break; /* next dne before, so we're done! */
5919 if (RB_EMPTY_NODE(&nextreq->r_node)) {
5920 /* next request was removed from tree */
5921 ceph_mdsc_put_request(nextreq);
5922 goto restart;
5923 }
5924 ceph_mdsc_put_request(nextreq); /* won't go away */
5925 }
5926 req = nextreq;
5927 }
5928 mutex_unlock(&mdsc->mutex);
5929 ceph_put_mds_session(last_session);
5930 doutc(cl, "done\n");
5931}
5932
5933void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
5934{
5935 struct ceph_client *cl = mdsc->fsc->client;
5936 u64 want_tid, want_flush;
5937
5938 if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN)
5939 return;
5940
5941 doutc(cl, "sync\n");
5942 mutex_lock(&mdsc->mutex);
5943 want_tid = mdsc->last_tid;
5944 mutex_unlock(&mdsc->mutex);
5945
5946 ceph_flush_dirty_caps(mdsc);
5947 ceph_flush_cap_releases(mdsc);
5948 spin_lock(&mdsc->cap_dirty_lock);
5949 want_flush = mdsc->last_cap_flush_tid;
5950 if (!list_empty(&mdsc->cap_flush_list)) {
5951 struct ceph_cap_flush *cf =
5952 list_last_entry(&mdsc->cap_flush_list,
5953 struct ceph_cap_flush, g_list);
5954 cf->wake = true;
5955 }
5956 spin_unlock(&mdsc->cap_dirty_lock);
5957
5958 doutc(cl, "sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
5959
5960 flush_mdlog_and_wait_mdsc_unsafe_requests(mdsc, want_tid);
5961 wait_caps_flush(mdsc, want_flush);
5962}
5963
5964/*
5965 * true if all sessions are closed, or we force unmount
5966 */
5967static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
5968{
5969 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
5970 return true;
5971 return atomic_read(&mdsc->num_sessions) <= skipped;
5972}
5973
5974/*
5975 * called after sb is ro or when metadata corrupted.
5976 */
5977void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
5978{
5979 struct ceph_options *opts = mdsc->fsc->client->options;
5980 struct ceph_client *cl = mdsc->fsc->client;
5981 struct ceph_mds_session *session;
5982 int i;
5983 int skipped = 0;
5984
5985 doutc(cl, "begin\n");
5986
5987 /* close sessions */
5988 mutex_lock(&mdsc->mutex);
5989 for (i = 0; i < mdsc->max_sessions; i++) {
5990 session = __ceph_lookup_mds_session(mdsc, i);
5991 if (!session)
5992 continue;
5993 mutex_unlock(&mdsc->mutex);
5994 mutex_lock(&session->s_mutex);
5995 if (__close_session(mdsc, session) <= 0)
5996 skipped++;
5997 mutex_unlock(&session->s_mutex);
5998 ceph_put_mds_session(session);
5999 mutex_lock(&mdsc->mutex);
6000 }
6001 mutex_unlock(&mdsc->mutex);
6002
6003 doutc(cl, "waiting for sessions to close\n");
6004 wait_event_timeout(mdsc->session_close_wq,
6005 done_closing_sessions(mdsc, skipped),
6006 ceph_timeout_jiffies(opts->mount_timeout));
6007
6008 /* tear down remaining sessions */
6009 mutex_lock(&mdsc->mutex);
6010 for (i = 0; i < mdsc->max_sessions; i++) {
6011 if (mdsc->sessions[i]) {
6012 session = ceph_get_mds_session(mdsc->sessions[i]);
6013 __unregister_session(mdsc, session);
6014 mutex_unlock(&mdsc->mutex);
6015 mutex_lock(&session->s_mutex);
6016 remove_session_caps(session);
6017 mutex_unlock(&session->s_mutex);
6018 ceph_put_mds_session(session);
6019 mutex_lock(&mdsc->mutex);
6020 }
6021 }
6022 WARN_ON(!list_empty(&mdsc->cap_delay_list));
6023 mutex_unlock(&mdsc->mutex);
6024
6025 ceph_cleanup_snapid_map(mdsc);
6026 ceph_cleanup_global_and_empty_realms(mdsc);
6027
6028 cancel_work_sync(&mdsc->cap_reclaim_work);
6029 cancel_work_sync(&mdsc->cap_unlink_work);
6030 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
6031
6032 doutc(cl, "done\n");
6033}
6034
6035void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
6036{
6037 struct ceph_mds_session *session;
6038 int mds;
6039
6040 doutc(mdsc->fsc->client, "force umount\n");
6041
6042 mutex_lock(&mdsc->mutex);
6043 for (mds = 0; mds < mdsc->max_sessions; mds++) {
6044 session = __ceph_lookup_mds_session(mdsc, mds);
6045 if (!session)
6046 continue;
6047
6048 if (session->s_state == CEPH_MDS_SESSION_REJECTED)
6049 __unregister_session(mdsc, session);
6050 __wake_requests(mdsc, &session->s_waiting);
6051 mutex_unlock(&mdsc->mutex);
6052
6053 mutex_lock(&session->s_mutex);
6054 __close_session(mdsc, session);
6055 if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
6056 cleanup_session_requests(mdsc, session);
6057 remove_session_caps(session);
6058 }
6059 mutex_unlock(&session->s_mutex);
6060 ceph_put_mds_session(session);
6061
6062 mutex_lock(&mdsc->mutex);
6063 kick_requests(mdsc, mds);
6064 }
6065 __wake_requests(mdsc, &mdsc->waiting_for_map);
6066 mutex_unlock(&mdsc->mutex);
6067}
6068
6069static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
6070{
6071 doutc(mdsc->fsc->client, "stop\n");
6072 /*
6073 * Make sure the delayed work stopped before releasing
6074 * the resources.
6075 *
6076 * Because the cancel_delayed_work_sync() will only
6077 * guarantee that the work finishes executing. But the
6078 * delayed work will re-arm itself again after that.
6079 */
6080 flush_delayed_work(&mdsc->delayed_work);
6081
6082 if (mdsc->mdsmap)
6083 ceph_mdsmap_destroy(mdsc->mdsmap);
6084 kfree(mdsc->sessions);
6085 ceph_caps_finalize(mdsc);
6086
6087 if (mdsc->s_cap_auths) {
6088 int i;
6089
6090 for (i = 0; i < mdsc->s_cap_auths_num; i++) {
6091 kfree(mdsc->s_cap_auths[i].match.gids);
6092 kfree(mdsc->s_cap_auths[i].match.path);
6093 kfree(mdsc->s_cap_auths[i].match.fs_name);
6094 }
6095 kfree(mdsc->s_cap_auths);
6096 }
6097
6098 ceph_pool_perm_destroy(mdsc);
6099}
6100
6101void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
6102{
6103 struct ceph_mds_client *mdsc = fsc->mdsc;
6104 doutc(fsc->client, "%p\n", mdsc);
6105
6106 if (!mdsc)
6107 return;
6108
6109 /* flush out any connection work with references to us */
6110 ceph_msgr_flush();
6111
6112 ceph_mdsc_stop(mdsc);
6113
6114 ceph_metric_destroy(&mdsc->metric);
6115
6116 fsc->mdsc = NULL;
6117 kfree(mdsc);
6118 doutc(fsc->client, "%p done\n", mdsc);
6119}
6120
6121void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
6122{
6123 struct ceph_fs_client *fsc = mdsc->fsc;
6124 struct ceph_client *cl = fsc->client;
6125 const char *mds_namespace = fsc->mount_options->mds_namespace;
6126 void *p = msg->front.iov_base;
6127 void *end = p + msg->front.iov_len;
6128 u32 epoch;
6129 u32 num_fs;
6130 u32 mount_fscid = (u32)-1;
6131 int err = -EINVAL;
6132
6133 ceph_decode_need(&p, end, sizeof(u32), bad);
6134 epoch = ceph_decode_32(&p);
6135
6136 doutc(cl, "epoch %u\n", epoch);
6137
6138 /* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */
6139 ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad);
6140
6141 ceph_decode_32_safe(&p, end, num_fs, bad);
6142 while (num_fs-- > 0) {
6143 void *info_p, *info_end;
6144 u32 info_len;
6145 u32 fscid, namelen;
6146
6147 ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
6148 p += 2; // info_v, info_cv
6149 info_len = ceph_decode_32(&p);
6150 ceph_decode_need(&p, end, info_len, bad);
6151 info_p = p;
6152 info_end = p + info_len;
6153 p = info_end;
6154
6155 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
6156 fscid = ceph_decode_32(&info_p);
6157 namelen = ceph_decode_32(&info_p);
6158 ceph_decode_need(&info_p, info_end, namelen, bad);
6159
6160 if (mds_namespace &&
6161 strlen(mds_namespace) == namelen &&
6162 !strncmp(mds_namespace, (char *)info_p, namelen)) {
6163 mount_fscid = fscid;
6164 break;
6165 }
6166 }
6167
6168 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
6169 if (mount_fscid != (u32)-1) {
6170 fsc->client->monc.fs_cluster_id = mount_fscid;
6171 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
6172 0, true);
6173 ceph_monc_renew_subs(&fsc->client->monc);
6174 } else {
6175 err = -ENOENT;
6176 goto err_out;
6177 }
6178 return;
6179
6180bad:
6181 pr_err_client(cl, "error decoding fsmap %d. Shutting down mount.\n",
6182 err);
6183 ceph_umount_begin(mdsc->fsc->sb);
6184 ceph_msg_dump(msg);
6185err_out:
6186 mutex_lock(&mdsc->mutex);
6187 mdsc->mdsmap_err = err;
6188 __wake_requests(mdsc, &mdsc->waiting_for_map);
6189 mutex_unlock(&mdsc->mutex);
6190}
6191
6192/*
6193 * handle mds map update.
6194 */
6195void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
6196{
6197 struct ceph_client *cl = mdsc->fsc->client;
6198 u32 epoch;
6199 u32 maplen;
6200 void *p = msg->front.iov_base;
6201 void *end = p + msg->front.iov_len;
6202 struct ceph_mdsmap *newmap, *oldmap;
6203 struct ceph_fsid fsid;
6204 int err = -EINVAL;
6205
6206 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
6207 ceph_decode_copy(&p, &fsid, sizeof(fsid));
6208 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
6209 return;
6210 epoch = ceph_decode_32(&p);
6211 maplen = ceph_decode_32(&p);
6212 doutc(cl, "epoch %u len %d\n", epoch, (int)maplen);
6213
6214 /* do we need it? */
6215 mutex_lock(&mdsc->mutex);
6216 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
6217 doutc(cl, "epoch %u <= our %u\n", epoch, mdsc->mdsmap->m_epoch);
6218 mutex_unlock(&mdsc->mutex);
6219 return;
6220 }
6221
6222 newmap = ceph_mdsmap_decode(mdsc, &p, end, ceph_msgr2(mdsc->fsc->client));
6223 if (IS_ERR(newmap)) {
6224 err = PTR_ERR(newmap);
6225 goto bad_unlock;
6226 }
6227
6228 /* swap into place */
6229 if (mdsc->mdsmap) {
6230 oldmap = mdsc->mdsmap;
6231 mdsc->mdsmap = newmap;
6232 check_new_map(mdsc, newmap, oldmap);
6233 ceph_mdsmap_destroy(oldmap);
6234 } else {
6235 mdsc->mdsmap = newmap; /* first mds map */
6236 }
6237 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
6238 MAX_LFS_FILESIZE);
6239
6240 __wake_requests(mdsc, &mdsc->waiting_for_map);
6241 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
6242 mdsc->mdsmap->m_epoch);
6243
6244 mutex_unlock(&mdsc->mutex);
6245 schedule_delayed(mdsc, 0);
6246 return;
6247
6248bad_unlock:
6249 mutex_unlock(&mdsc->mutex);
6250bad:
6251 pr_err_client(cl, "error decoding mdsmap %d. Shutting down mount.\n",
6252 err);
6253 ceph_umount_begin(mdsc->fsc->sb);
6254 ceph_msg_dump(msg);
6255 return;
6256}
6257
6258static struct ceph_connection *mds_get_con(struct ceph_connection *con)
6259{
6260 struct ceph_mds_session *s = con->private;
6261
6262 if (ceph_get_mds_session(s))
6263 return con;
6264 return NULL;
6265}
6266
6267static void mds_put_con(struct ceph_connection *con)
6268{
6269 struct ceph_mds_session *s = con->private;
6270
6271 ceph_put_mds_session(s);
6272}
6273
6274/*
6275 * if the client is unresponsive for long enough, the mds will kill
6276 * the session entirely.
6277 */
6278static void mds_peer_reset(struct ceph_connection *con)
6279{
6280 struct ceph_mds_session *s = con->private;
6281 struct ceph_mds_client *mdsc = s->s_mdsc;
6282
6283 pr_warn_client(mdsc->fsc->client, "mds%d closed our session\n",
6284 s->s_mds);
6285 if (READ_ONCE(mdsc->fsc->mount_state) != CEPH_MOUNT_FENCE_IO &&
6286 ceph_mdsmap_get_state(mdsc->mdsmap, s->s_mds) >= CEPH_MDS_STATE_RECONNECT)
6287 send_mds_reconnect(mdsc, s);
6288}
6289
6290static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
6291{
6292 struct ceph_mds_session *s = con->private;
6293 struct ceph_mds_client *mdsc = s->s_mdsc;
6294 struct ceph_client *cl = mdsc->fsc->client;
6295 int type = le16_to_cpu(msg->hdr.type);
6296
6297 mutex_lock(&mdsc->mutex);
6298 if (__verify_registered_session(mdsc, s) < 0) {
6299 mutex_unlock(&mdsc->mutex);
6300 goto out;
6301 }
6302 mutex_unlock(&mdsc->mutex);
6303
6304 switch (type) {
6305 case CEPH_MSG_MDS_MAP:
6306 ceph_mdsc_handle_mdsmap(mdsc, msg);
6307 break;
6308 case CEPH_MSG_FS_MAP_USER:
6309 ceph_mdsc_handle_fsmap(mdsc, msg);
6310 break;
6311 case CEPH_MSG_CLIENT_SESSION:
6312 handle_session(s, msg);
6313 break;
6314 case CEPH_MSG_CLIENT_REPLY:
6315 handle_reply(s, msg);
6316 break;
6317 case CEPH_MSG_CLIENT_REQUEST_FORWARD:
6318 handle_forward(mdsc, s, msg);
6319 break;
6320 case CEPH_MSG_CLIENT_CAPS:
6321 ceph_handle_caps(s, msg);
6322 break;
6323 case CEPH_MSG_CLIENT_SNAP:
6324 ceph_handle_snap(mdsc, s, msg);
6325 break;
6326 case CEPH_MSG_CLIENT_LEASE:
6327 handle_lease(mdsc, s, msg);
6328 break;
6329 case CEPH_MSG_CLIENT_QUOTA:
6330 ceph_handle_quota(mdsc, s, msg);
6331 break;
6332
6333 default:
6334 pr_err_client(cl, "received unknown message type %d %s\n",
6335 type, ceph_msg_type_name(type));
6336 }
6337out:
6338 ceph_msg_put(msg);
6339}
6340
6341/*
6342 * authentication
6343 */
6344
6345/*
6346 * Note: returned pointer is the address of a structure that's
6347 * managed separately. Caller must *not* attempt to free it.
6348 */
6349static struct ceph_auth_handshake *
6350mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new)
6351{
6352 struct ceph_mds_session *s = con->private;
6353 struct ceph_mds_client *mdsc = s->s_mdsc;
6354 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
6355 struct ceph_auth_handshake *auth = &s->s_auth;
6356 int ret;
6357
6358 ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
6359 force_new, proto, NULL, NULL);
6360 if (ret)
6361 return ERR_PTR(ret);
6362
6363 return auth;
6364}
6365
6366static int mds_add_authorizer_challenge(struct ceph_connection *con,
6367 void *challenge_buf, int challenge_buf_len)
6368{
6369 struct ceph_mds_session *s = con->private;
6370 struct ceph_mds_client *mdsc = s->s_mdsc;
6371 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
6372
6373 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
6374 challenge_buf, challenge_buf_len);
6375}
6376
6377static int mds_verify_authorizer_reply(struct ceph_connection *con)
6378{
6379 struct ceph_mds_session *s = con->private;
6380 struct ceph_mds_client *mdsc = s->s_mdsc;
6381 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
6382 struct ceph_auth_handshake *auth = &s->s_auth;
6383
6384 return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
6385 auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
6386 NULL, NULL, NULL, NULL);
6387}
6388
6389static int mds_invalidate_authorizer(struct ceph_connection *con)
6390{
6391 struct ceph_mds_session *s = con->private;
6392 struct ceph_mds_client *mdsc = s->s_mdsc;
6393 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
6394
6395 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
6396
6397 return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
6398}
6399
6400static int mds_get_auth_request(struct ceph_connection *con,
6401 void *buf, int *buf_len,
6402 void **authorizer, int *authorizer_len)
6403{
6404 struct ceph_mds_session *s = con->private;
6405 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
6406 struct ceph_auth_handshake *auth = &s->s_auth;
6407 int ret;
6408
6409 ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
6410 buf, buf_len);
6411 if (ret)
6412 return ret;
6413
6414 *authorizer = auth->authorizer_buf;
6415 *authorizer_len = auth->authorizer_buf_len;
6416 return 0;
6417}
6418
6419static int mds_handle_auth_reply_more(struct ceph_connection *con,
6420 void *reply, int reply_len,
6421 void *buf, int *buf_len,
6422 void **authorizer, int *authorizer_len)
6423{
6424 struct ceph_mds_session *s = con->private;
6425 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
6426 struct ceph_auth_handshake *auth = &s->s_auth;
6427 int ret;
6428
6429 ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
6430 buf, buf_len);
6431 if (ret)
6432 return ret;
6433
6434 *authorizer = auth->authorizer_buf;
6435 *authorizer_len = auth->authorizer_buf_len;
6436 return 0;
6437}
6438
6439static int mds_handle_auth_done(struct ceph_connection *con,
6440 u64 global_id, void *reply, int reply_len,
6441 u8 *session_key, int *session_key_len,
6442 u8 *con_secret, int *con_secret_len)
6443{
6444 struct ceph_mds_session *s = con->private;
6445 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
6446 struct ceph_auth_handshake *auth = &s->s_auth;
6447
6448 return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
6449 session_key, session_key_len,
6450 con_secret, con_secret_len);
6451}
6452
6453static int mds_handle_auth_bad_method(struct ceph_connection *con,
6454 int used_proto, int result,
6455 const int *allowed_protos, int proto_cnt,
6456 const int *allowed_modes, int mode_cnt)
6457{
6458 struct ceph_mds_session *s = con->private;
6459 struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc;
6460 int ret;
6461
6462 if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS,
6463 used_proto, result,
6464 allowed_protos, proto_cnt,
6465 allowed_modes, mode_cnt)) {
6466 ret = ceph_monc_validate_auth(monc);
6467 if (ret)
6468 return ret;
6469 }
6470
6471 return -EACCES;
6472}
6473
6474static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
6475 struct ceph_msg_header *hdr, int *skip)
6476{
6477 struct ceph_msg *msg;
6478 int type = (int) le16_to_cpu(hdr->type);
6479 int front_len = (int) le32_to_cpu(hdr->front_len);
6480
6481 if (con->in_msg)
6482 return con->in_msg;
6483
6484 *skip = 0;
6485 msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
6486 if (!msg) {
6487 pr_err("unable to allocate msg type %d len %d\n",
6488 type, front_len);
6489 return NULL;
6490 }
6491
6492 return msg;
6493}
6494
6495static int mds_sign_message(struct ceph_msg *msg)
6496{
6497 struct ceph_mds_session *s = msg->con->private;
6498 struct ceph_auth_handshake *auth = &s->s_auth;
6499
6500 return ceph_auth_sign_message(auth, msg);
6501}
6502
6503static int mds_check_message_signature(struct ceph_msg *msg)
6504{
6505 struct ceph_mds_session *s = msg->con->private;
6506 struct ceph_auth_handshake *auth = &s->s_auth;
6507
6508 return ceph_auth_check_message_signature(auth, msg);
6509}
6510
6511static const struct ceph_connection_operations mds_con_ops = {
6512 .get = mds_get_con,
6513 .put = mds_put_con,
6514 .alloc_msg = mds_alloc_msg,
6515 .dispatch = mds_dispatch,
6516 .peer_reset = mds_peer_reset,
6517 .get_authorizer = mds_get_authorizer,
6518 .add_authorizer_challenge = mds_add_authorizer_challenge,
6519 .verify_authorizer_reply = mds_verify_authorizer_reply,
6520 .invalidate_authorizer = mds_invalidate_authorizer,
6521 .sign_message = mds_sign_message,
6522 .check_message_signature = mds_check_message_signature,
6523 .get_auth_request = mds_get_auth_request,
6524 .handle_auth_reply_more = mds_handle_auth_reply_more,
6525 .handle_auth_done = mds_handle_auth_done,
6526 .handle_auth_bad_method = mds_handle_auth_bad_method,
6527};
6528
6529/* eof */