/* Source snapshot: v3.3, 653 lines, 17 kB (raw view). */
/*
 *  pNFS Objects layout driver high level definitions
 *
 *  Copyright (C) 2007 Panasas Inc. [year of first publication]
 *  All rights reserved.
 *
 *  Benny Halevy <bhalevy@panasas.com>
 *  Boaz Harrosh <bharrosh@panasas.com>
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License version 2
 *  See the file COPYING included with this distribution for more details.
 *
 *  Redistribution and use in source and binary forms, with or without
 *  modification, are permitted provided that the following conditions
 *  are met:
 *
 *  1. Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *  2. Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *  3. Neither the name of the Panasas company nor the names of its
 *     contributors may be used to endorse or promote products derived
 *     from this software without specific prior written permission.
 *
 *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED
 *  WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 *  MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 *  DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
 *  FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 *  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 *  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
 *  BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 *  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38 */ 39 40#include <scsi/osd_initiator.h> 41#include "objlayout.h" 42 43#define NFSDBG_FACILITY NFSDBG_PNFS_LD 44/* 45 * Create a objlayout layout structure for the given inode and return it. 46 */ 47struct pnfs_layout_hdr * 48objlayout_alloc_layout_hdr(struct inode *inode, gfp_t gfp_flags) 49{ 50 struct objlayout *objlay; 51 52 objlay = kzalloc(sizeof(struct objlayout), gfp_flags); 53 if (objlay) { 54 spin_lock_init(&objlay->lock); 55 INIT_LIST_HEAD(&objlay->err_list); 56 } 57 dprintk("%s: Return %p\n", __func__, objlay); 58 return &objlay->pnfs_layout; 59} 60 61/* 62 * Free an objlayout layout structure 63 */ 64void 65objlayout_free_layout_hdr(struct pnfs_layout_hdr *lo) 66{ 67 struct objlayout *objlay = OBJLAYOUT(lo); 68 69 dprintk("%s: objlay %p\n", __func__, objlay); 70 71 WARN_ON(!list_empty(&objlay->err_list)); 72 kfree(objlay); 73} 74 75/* 76 * Unmarshall layout and store it in pnfslay. 77 */ 78struct pnfs_layout_segment * 79objlayout_alloc_lseg(struct pnfs_layout_hdr *pnfslay, 80 struct nfs4_layoutget_res *lgr, 81 gfp_t gfp_flags) 82{ 83 int status = -ENOMEM; 84 struct xdr_stream stream; 85 struct xdr_buf buf = { 86 .pages = lgr->layoutp->pages, 87 .page_len = lgr->layoutp->len, 88 .buflen = lgr->layoutp->len, 89 .len = lgr->layoutp->len, 90 }; 91 struct page *scratch; 92 struct pnfs_layout_segment *lseg; 93 94 dprintk("%s: Begin pnfslay %p\n", __func__, pnfslay); 95 96 scratch = alloc_page(gfp_flags); 97 if (!scratch) 98 goto err_nofree; 99 100 xdr_init_decode(&stream, &buf, NULL); 101 xdr_set_scratch_buffer(&stream, page_address(scratch), PAGE_SIZE); 102 103 status = objio_alloc_lseg(&lseg, pnfslay, &lgr->range, &stream, gfp_flags); 104 if (unlikely(status)) { 105 dprintk("%s: objio_alloc_lseg Return err %d\n", __func__, 106 status); 107 goto err; 108 } 109 110 __free_page(scratch); 111 112 dprintk("%s: Return %p\n", __func__, lseg); 113 return lseg; 114 115err: 116 __free_page(scratch); 117err_nofree: 118 dprintk("%s: Err Return=>%d\n", __func__, 
status); 119 return ERR_PTR(status); 120} 121 122/* 123 * Free a layout segement 124 */ 125void 126objlayout_free_lseg(struct pnfs_layout_segment *lseg) 127{ 128 dprintk("%s: freeing layout segment %p\n", __func__, lseg); 129 130 if (unlikely(!lseg)) 131 return; 132 133 objio_free_lseg(lseg); 134} 135 136/* 137 * I/O Operations 138 */ 139static inline u64 140end_offset(u64 start, u64 len) 141{ 142 u64 end; 143 144 end = start + len; 145 return end >= start ? end : NFS4_MAX_UINT64; 146} 147 148/* last octet in a range */ 149static inline u64 150last_byte_offset(u64 start, u64 len) 151{ 152 u64 end; 153 154 BUG_ON(!len); 155 end = start + len; 156 return end > start ? end - 1 : NFS4_MAX_UINT64; 157} 158 159void _fix_verify_io_params(struct pnfs_layout_segment *lseg, 160 struct page ***p_pages, unsigned *p_pgbase, 161 u64 offset, unsigned long count) 162{ 163 u64 lseg_end_offset; 164 165 BUG_ON(offset < lseg->pls_range.offset); 166 lseg_end_offset = end_offset(lseg->pls_range.offset, 167 lseg->pls_range.length); 168 BUG_ON(offset >= lseg_end_offset); 169 WARN_ON(offset + count > lseg_end_offset); 170 171 if (*p_pgbase > PAGE_SIZE) { 172 dprintk("%s: pgbase(0x%x) > PAGE_SIZE\n", __func__, *p_pgbase); 173 *p_pages += *p_pgbase >> PAGE_SHIFT; 174 *p_pgbase &= ~PAGE_MASK; 175 } 176} 177 178/* 179 * I/O done common code 180 */ 181static void 182objlayout_iodone(struct objlayout_io_res *oir) 183{ 184 if (likely(oir->status >= 0)) { 185 objio_free_result(oir); 186 } else { 187 struct objlayout *objlay = oir->objlay; 188 189 spin_lock(&objlay->lock); 190 objlay->delta_space_valid = OBJ_DSU_INVALID; 191 list_add(&objlay->err_list, &oir->err_list); 192 spin_unlock(&objlay->lock); 193 } 194} 195 196/* 197 * objlayout_io_set_result - Set an osd_error code on a specific osd comp. 198 * 199 * The @index component IO failed (error returned from target). Register 200 * the error for later reporting at layout-return. 
201 */ 202void 203objlayout_io_set_result(struct objlayout_io_res *oir, unsigned index, 204 struct pnfs_osd_objid *pooid, int osd_error, 205 u64 offset, u64 length, bool is_write) 206{ 207 struct pnfs_osd_ioerr *ioerr = &oir->ioerrs[index]; 208 209 BUG_ON(index >= oir->num_comps); 210 if (osd_error) { 211 ioerr->oer_component = *pooid; 212 ioerr->oer_comp_offset = offset; 213 ioerr->oer_comp_length = length; 214 ioerr->oer_iswrite = is_write; 215 ioerr->oer_errno = osd_error; 216 217 dprintk("%s: err[%d]: errno=%d is_write=%d dev(%llx:%llx) " 218 "par=0x%llx obj=0x%llx offset=0x%llx length=0x%llx\n", 219 __func__, index, ioerr->oer_errno, 220 ioerr->oer_iswrite, 221 _DEVID_LO(&ioerr->oer_component.oid_device_id), 222 _DEVID_HI(&ioerr->oer_component.oid_device_id), 223 ioerr->oer_component.oid_partition_id, 224 ioerr->oer_component.oid_object_id, 225 ioerr->oer_comp_offset, 226 ioerr->oer_comp_length); 227 } else { 228 /* User need not call if no error is reported */ 229 ioerr->oer_errno = 0; 230 } 231} 232 233/* Function scheduled on rpc workqueue to call ->nfs_readlist_complete(). 
234 * This is because the osd completion is called with ints-off from 235 * the block layer 236 */ 237static void _rpc_read_complete(struct work_struct *work) 238{ 239 struct rpc_task *task; 240 struct nfs_read_data *rdata; 241 242 dprintk("%s enter\n", __func__); 243 task = container_of(work, struct rpc_task, u.tk_work); 244 rdata = container_of(task, struct nfs_read_data, task); 245 246 pnfs_ld_read_done(rdata); 247} 248 249void 250objlayout_read_done(struct objlayout_io_res *oir, ssize_t status, bool sync) 251{ 252 struct nfs_read_data *rdata = oir->rpcdata; 253 254 oir->status = rdata->task.tk_status = status; 255 if (status >= 0) 256 rdata->res.count = status; 257 else 258 rdata->pnfs_error = status; 259 objlayout_iodone(oir); 260 /* must not use oir after this point */ 261 262 dprintk("%s: Return status=%zd eof=%d sync=%d\n", __func__, 263 status, rdata->res.eof, sync); 264 265 if (sync) 266 pnfs_ld_read_done(rdata); 267 else { 268 INIT_WORK(&rdata->task.u.tk_work, _rpc_read_complete); 269 schedule_work(&rdata->task.u.tk_work); 270 } 271} 272 273/* 274 * Perform sync or async reads. 
275 */ 276enum pnfs_try_status 277objlayout_read_pagelist(struct nfs_read_data *rdata) 278{ 279 loff_t offset = rdata->args.offset; 280 size_t count = rdata->args.count; 281 int err; 282 loff_t eof; 283 284 eof = i_size_read(rdata->inode); 285 if (unlikely(offset + count > eof)) { 286 if (offset >= eof) { 287 err = 0; 288 rdata->res.count = 0; 289 rdata->res.eof = 1; 290 /*FIXME: do we need to call pnfs_ld_read_done() */ 291 goto out; 292 } 293 count = eof - offset; 294 } 295 296 rdata->res.eof = (offset + count) >= eof; 297 _fix_verify_io_params(rdata->lseg, &rdata->args.pages, 298 &rdata->args.pgbase, 299 rdata->args.offset, rdata->args.count); 300 301 dprintk("%s: inode(%lx) offset 0x%llx count 0x%Zx eof=%d\n", 302 __func__, rdata->inode->i_ino, offset, count, rdata->res.eof); 303 304 err = objio_read_pagelist(rdata); 305 out: 306 if (unlikely(err)) { 307 rdata->pnfs_error = err; 308 dprintk("%s: Returned Error %d\n", __func__, err); 309 return PNFS_NOT_ATTEMPTED; 310 } 311 return PNFS_ATTEMPTED; 312} 313 314/* Function scheduled on rpc workqueue to call ->nfs_writelist_complete(). 
315 * This is because the osd completion is called with ints-off from 316 * the block layer 317 */ 318static void _rpc_write_complete(struct work_struct *work) 319{ 320 struct rpc_task *task; 321 struct nfs_write_data *wdata; 322 323 dprintk("%s enter\n", __func__); 324 task = container_of(work, struct rpc_task, u.tk_work); 325 wdata = container_of(task, struct nfs_write_data, task); 326 327 pnfs_ld_write_done(wdata); 328} 329 330void 331objlayout_write_done(struct objlayout_io_res *oir, ssize_t status, bool sync) 332{ 333 struct nfs_write_data *wdata = oir->rpcdata; 334 335 oir->status = wdata->task.tk_status = status; 336 if (status >= 0) { 337 wdata->res.count = status; 338 wdata->verf.committed = oir->committed; 339 } else { 340 wdata->pnfs_error = status; 341 } 342 objlayout_iodone(oir); 343 /* must not use oir after this point */ 344 345 dprintk("%s: Return status %zd committed %d sync=%d\n", __func__, 346 status, wdata->verf.committed, sync); 347 348 if (sync) 349 pnfs_ld_write_done(wdata); 350 else { 351 INIT_WORK(&wdata->task.u.tk_work, _rpc_write_complete); 352 schedule_work(&wdata->task.u.tk_work); 353 } 354} 355 356/* 357 * Perform sync or async writes. 
358 */ 359enum pnfs_try_status 360objlayout_write_pagelist(struct nfs_write_data *wdata, 361 int how) 362{ 363 int err; 364 365 _fix_verify_io_params(wdata->lseg, &wdata->args.pages, 366 &wdata->args.pgbase, 367 wdata->args.offset, wdata->args.count); 368 369 err = objio_write_pagelist(wdata, how); 370 if (unlikely(err)) { 371 wdata->pnfs_error = err; 372 dprintk("%s: Returned Error %d\n", __func__, err); 373 return PNFS_NOT_ATTEMPTED; 374 } 375 return PNFS_ATTEMPTED; 376} 377 378void 379objlayout_encode_layoutcommit(struct pnfs_layout_hdr *pnfslay, 380 struct xdr_stream *xdr, 381 const struct nfs4_layoutcommit_args *args) 382{ 383 struct objlayout *objlay = OBJLAYOUT(pnfslay); 384 struct pnfs_osd_layoutupdate lou; 385 __be32 *start; 386 387 dprintk("%s: Begin\n", __func__); 388 389 spin_lock(&objlay->lock); 390 lou.dsu_valid = (objlay->delta_space_valid == OBJ_DSU_VALID); 391 lou.dsu_delta = objlay->delta_space_used; 392 objlay->delta_space_used = 0; 393 objlay->delta_space_valid = OBJ_DSU_INIT; 394 lou.olu_ioerr_flag = !list_empty(&objlay->err_list); 395 spin_unlock(&objlay->lock); 396 397 start = xdr_reserve_space(xdr, 4); 398 399 BUG_ON(pnfs_osd_xdr_encode_layoutupdate(xdr, &lou)); 400 401 *start = cpu_to_be32((xdr->p - start - 1) * 4); 402 403 dprintk("%s: Return delta_space_used %lld err %d\n", __func__, 404 lou.dsu_delta, lou.olu_ioerr_flag); 405} 406 407static int 408err_prio(u32 oer_errno) 409{ 410 switch (oer_errno) { 411 case 0: 412 return 0; 413 414 case PNFS_OSD_ERR_RESOURCE: 415 return OSD_ERR_PRI_RESOURCE; 416 case PNFS_OSD_ERR_BAD_CRED: 417 return OSD_ERR_PRI_BAD_CRED; 418 case PNFS_OSD_ERR_NO_ACCESS: 419 return OSD_ERR_PRI_NO_ACCESS; 420 case PNFS_OSD_ERR_UNREACHABLE: 421 return OSD_ERR_PRI_UNREACHABLE; 422 case PNFS_OSD_ERR_NOT_FOUND: 423 return OSD_ERR_PRI_NOT_FOUND; 424 case PNFS_OSD_ERR_NO_SPACE: 425 return OSD_ERR_PRI_NO_SPACE; 426 default: 427 WARN_ON(1); 428 /* fallthrough */ 429 case PNFS_OSD_ERR_EIO: 430 return OSD_ERR_PRI_EIO; 431 } 432} 
433 434static void 435merge_ioerr(struct pnfs_osd_ioerr *dest_err, 436 const struct pnfs_osd_ioerr *src_err) 437{ 438 u64 dest_end, src_end; 439 440 if (!dest_err->oer_errno) { 441 *dest_err = *src_err; 442 /* accumulated device must be blank */ 443 memset(&dest_err->oer_component.oid_device_id, 0, 444 sizeof(dest_err->oer_component.oid_device_id)); 445 446 return; 447 } 448 449 if (dest_err->oer_component.oid_partition_id != 450 src_err->oer_component.oid_partition_id) 451 dest_err->oer_component.oid_partition_id = 0; 452 453 if (dest_err->oer_component.oid_object_id != 454 src_err->oer_component.oid_object_id) 455 dest_err->oer_component.oid_object_id = 0; 456 457 if (dest_err->oer_comp_offset > src_err->oer_comp_offset) 458 dest_err->oer_comp_offset = src_err->oer_comp_offset; 459 460 dest_end = end_offset(dest_err->oer_comp_offset, 461 dest_err->oer_comp_length); 462 src_end = end_offset(src_err->oer_comp_offset, 463 src_err->oer_comp_length); 464 if (dest_end < src_end) 465 dest_end = src_end; 466 467 dest_err->oer_comp_length = dest_end - dest_err->oer_comp_offset; 468 469 if ((src_err->oer_iswrite == dest_err->oer_iswrite) && 470 (err_prio(src_err->oer_errno) > err_prio(dest_err->oer_errno))) { 471 dest_err->oer_errno = src_err->oer_errno; 472 } else if (src_err->oer_iswrite) { 473 dest_err->oer_iswrite = true; 474 dest_err->oer_errno = src_err->oer_errno; 475 } 476} 477 478static void 479encode_accumulated_error(struct objlayout *objlay, __be32 *p) 480{ 481 struct objlayout_io_res *oir, *tmp; 482 struct pnfs_osd_ioerr accumulated_err = {.oer_errno = 0}; 483 484 list_for_each_entry_safe(oir, tmp, &objlay->err_list, err_list) { 485 unsigned i; 486 487 for (i = 0; i < oir->num_comps; i++) { 488 struct pnfs_osd_ioerr *ioerr = &oir->ioerrs[i]; 489 490 if (!ioerr->oer_errno) 491 continue; 492 493 printk(KERN_ERR "%s: err[%d]: errno=%d is_write=%d " 494 "dev(%llx:%llx) par=0x%llx obj=0x%llx " 495 "offset=0x%llx length=0x%llx\n", 496 __func__, i, ioerr->oer_errno, 
497 ioerr->oer_iswrite, 498 _DEVID_LO(&ioerr->oer_component.oid_device_id), 499 _DEVID_HI(&ioerr->oer_component.oid_device_id), 500 ioerr->oer_component.oid_partition_id, 501 ioerr->oer_component.oid_object_id, 502 ioerr->oer_comp_offset, 503 ioerr->oer_comp_length); 504 505 merge_ioerr(&accumulated_err, ioerr); 506 } 507 list_del(&oir->err_list); 508 objio_free_result(oir); 509 } 510 511 pnfs_osd_xdr_encode_ioerr(p, &accumulated_err); 512} 513 514void 515objlayout_encode_layoutreturn(struct pnfs_layout_hdr *pnfslay, 516 struct xdr_stream *xdr, 517 const struct nfs4_layoutreturn_args *args) 518{ 519 struct objlayout *objlay = OBJLAYOUT(pnfslay); 520 struct objlayout_io_res *oir, *tmp; 521 __be32 *start; 522 523 dprintk("%s: Begin\n", __func__); 524 start = xdr_reserve_space(xdr, 4); 525 BUG_ON(!start); 526 527 spin_lock(&objlay->lock); 528 529 list_for_each_entry_safe(oir, tmp, &objlay->err_list, err_list) { 530 __be32 *last_xdr = NULL, *p; 531 unsigned i; 532 int res = 0; 533 534 for (i = 0; i < oir->num_comps; i++) { 535 struct pnfs_osd_ioerr *ioerr = &oir->ioerrs[i]; 536 537 if (!ioerr->oer_errno) 538 continue; 539 540 dprintk("%s: err[%d]: errno=%d is_write=%d " 541 "dev(%llx:%llx) par=0x%llx obj=0x%llx " 542 "offset=0x%llx length=0x%llx\n", 543 __func__, i, ioerr->oer_errno, 544 ioerr->oer_iswrite, 545 _DEVID_LO(&ioerr->oer_component.oid_device_id), 546 _DEVID_HI(&ioerr->oer_component.oid_device_id), 547 ioerr->oer_component.oid_partition_id, 548 ioerr->oer_component.oid_object_id, 549 ioerr->oer_comp_offset, 550 ioerr->oer_comp_length); 551 552 p = pnfs_osd_xdr_ioerr_reserve_space(xdr); 553 if (unlikely(!p)) { 554 res = -E2BIG; 555 break; /* accumulated_error */ 556 } 557 558 last_xdr = p; 559 pnfs_osd_xdr_encode_ioerr(p, &oir->ioerrs[i]); 560 } 561 562 /* TODO: use xdr_write_pages */ 563 if (unlikely(res)) { 564 /* no space for even one error descriptor */ 565 BUG_ON(!last_xdr); 566 567 /* we've encountered a situation with lots and lots of 568 * errors and 
no space to encode them all. Use the last 569 * available slot to report the union of all the 570 * remaining errors. 571 */ 572 encode_accumulated_error(objlay, last_xdr); 573 goto loop_done; 574 } 575 list_del(&oir->err_list); 576 objio_free_result(oir); 577 } 578loop_done: 579 spin_unlock(&objlay->lock); 580 581 *start = cpu_to_be32((xdr->p - start - 1) * 4); 582 dprintk("%s: Return\n", __func__); 583} 584 585 586/* 587 * Get Device Info API for io engines 588 */ 589struct objlayout_deviceinfo { 590 struct page *page; 591 struct pnfs_osd_deviceaddr da; /* This must be last */ 592}; 593 594/* Initialize and call nfs_getdeviceinfo, then decode and return a 595 * "struct pnfs_osd_deviceaddr *" Eventually objlayout_put_deviceinfo() 596 * should be called. 597 */ 598int objlayout_get_deviceinfo(struct pnfs_layout_hdr *pnfslay, 599 struct nfs4_deviceid *d_id, struct pnfs_osd_deviceaddr **deviceaddr, 600 gfp_t gfp_flags) 601{ 602 struct objlayout_deviceinfo *odi; 603 struct pnfs_device pd; 604 struct super_block *sb; 605 struct page *page, **pages; 606 u32 *p; 607 int err; 608 609 page = alloc_page(gfp_flags); 610 if (!page) 611 return -ENOMEM; 612 613 pages = &page; 614 pd.pages = pages; 615 616 memcpy(&pd.dev_id, d_id, sizeof(*d_id)); 617 pd.layout_type = LAYOUT_OSD2_OBJECTS; 618 pd.pages = &page; 619 pd.pgbase = 0; 620 pd.pglen = PAGE_SIZE; 621 pd.mincount = 0; 622 623 sb = pnfslay->plh_inode->i_sb; 624 err = nfs4_proc_getdeviceinfo(NFS_SERVER(pnfslay->plh_inode), &pd); 625 dprintk("%s nfs_getdeviceinfo returned %d\n", __func__, err); 626 if (err) 627 goto err_out; 628 629 p = page_address(page); 630 odi = kzalloc(sizeof(*odi), gfp_flags); 631 if (!odi) { 632 err = -ENOMEM; 633 goto err_out; 634 } 635 pnfs_osd_xdr_decode_deviceaddr(&odi->da, p); 636 odi->page = page; 637 *deviceaddr = &odi->da; 638 return 0; 639 640err_out: 641 __free_page(page); 642 return err; 643} 644 645void objlayout_put_deviceinfo(struct pnfs_osd_deviceaddr *deviceaddr) 646{ 647 struct 
objlayout_deviceinfo *odi = container_of(deviceaddr, 648 struct objlayout_deviceinfo, 649 da); 650 651 __free_page(odi->page); 652 kfree(odi); 653}