[SCSI] libosd: Support for scatter gather write/read commands

This patch adds the Scatter-Gather (sg) API to libosd.
Scatter-gather enables a write/read of multiple non-contiguous
areas of an object, in a single call. The extents may overlap
and/or be in any order.

The Scatter-Gather list is sent to the target in what is called
a "cdb continuation segment". This is yet another possible segment
in the osd-out-buffer. It is unlike all other segments in that it
sits before the actual "data" segment (which until now was always
first), and that it is signed by itself and not part of the data
buffer. This is because the cdb-continuation-segment is considered
a spill-over of the CDB data, and is therefore signed under
OSD_SEC_CAPKEY and higher.

TODO: A new osd_finalize_request_ex version should be supplied so
the @caps received on the network also contains a size parameter
and can be spilled over into the "cdb continuation segment".

Thanks to John Chandy <john.chandy@uconn.edu> for the original
code and investigations, and for the implementation of SG support
in the osd-target.

Original-coded-by: John Chandy <john.chandy@uconn.edu>
Signed-off-by: Boaz Harrosh <bharrosh@panasas.com>
Signed-off-by: James Bottomley <James.Bottomley@suse.de>

authored by Boaz Harrosh and committed by James Bottomley e96e72c4 c4df46c4

+198 -6
+143 -5
drivers/scsi/osd/osd_initiator.c
··· 464 _osd_free_seg(or, &or->get_attr); 465 _osd_free_seg(or, &or->enc_get_attr); 466 _osd_free_seg(or, &or->set_attr); 467 468 _osd_request_free(or); 469 } ··· 547 seg->buff = buff; 548 seg->alloc_size = max_bytes; 549 return 0; 550 } 551 552 static int _alloc_set_attr_list(struct osd_request *or, ··· 892 return 0; 893 } 894 EXPORT_SYMBOL(osd_req_read_kern); 895 896 void osd_req_get_attributes(struct osd_request *or, 897 const struct osd_obj_id *obj) ··· 1410 } 1411 1412 static int _osd_req_finalize_data_integrity(struct osd_request *or, 1413 - bool has_in, bool has_out, u64 out_data_bytes, const u8 *cap_key) 1414 { 1415 struct osd_security_parameters *sec_parms = _osd_req_sec_params(or); 1416 int ret; ··· 1442 or->out.last_seg = NULL; 1443 1444 /* they are now all chained to request sign them all together */ 1445 - osd_sec_sign_data(&or->out_data_integ, or->out.req->bio, 1446 cap_key); 1447 } 1448 ··· 1538 { 1539 struct osd_cdb_head *cdbh = osd_cdb_head(&or->cdb); 1540 bool has_in, has_out; 1541 u64 out_data_bytes = or->out.total_bytes; 1542 int ret; 1543 ··· 1555 osd_set_caps(&or->cdb, cap); 1556 1557 has_in = or->in.bio || or->get_attr.total_bytes; 1558 - has_out = or->out.bio || or->set_attr.total_bytes || 1559 - or->enc_get_attr.total_bytes; 1560 1561 ret = _init_blk_request(or, has_in, has_out); 1562 if (ret) { 1563 OSD_DEBUG("_init_blk_request failed\n"); ··· 1600 } 1601 1602 ret = _osd_req_finalize_data_integrity(or, has_in, has_out, 1603 - out_data_bytes, cap_key); 1604 if (ret) 1605 return ret; 1606
··· 464 _osd_free_seg(or, &or->get_attr); 465 _osd_free_seg(or, &or->enc_get_attr); 466 _osd_free_seg(or, &or->set_attr); 467 + _osd_free_seg(or, &or->cdb_cont); 468 469 _osd_request_free(or); 470 } ··· 546 seg->buff = buff; 547 seg->alloc_size = max_bytes; 548 return 0; 549 + } 550 + 551 + static int _alloc_cdb_cont(struct osd_request *or, unsigned total_bytes) 552 + { 553 + OSD_DEBUG("total_bytes=%d\n", total_bytes); 554 + return _osd_realloc_seg(or, &or->cdb_cont, total_bytes); 555 } 556 557 static int _alloc_set_attr_list(struct osd_request *or, ··· 885 return 0; 886 } 887 EXPORT_SYMBOL(osd_req_read_kern); 888 + 889 + static int _add_sg_continuation_descriptor(struct osd_request *or, 890 + const struct osd_sg_entry *sglist, unsigned numentries, u64 *len) 891 + { 892 + struct osd_sg_continuation_descriptor *oscd; 893 + u32 oscd_size; 894 + unsigned i; 895 + int ret; 896 + 897 + oscd_size = sizeof(*oscd) + numentries * sizeof(oscd->entries[0]); 898 + 899 + if (!or->cdb_cont.total_bytes) { 900 + /* First time, jump over the header, we will write to: 901 + * cdb_cont.buff + cdb_cont.total_bytes 902 + */ 903 + or->cdb_cont.total_bytes = 904 + sizeof(struct osd_continuation_segment_header); 905 + } 906 + 907 + ret = _alloc_cdb_cont(or, or->cdb_cont.total_bytes + oscd_size); 908 + if (unlikely(ret)) 909 + return ret; 910 + 911 + oscd = or->cdb_cont.buff + or->cdb_cont.total_bytes; 912 + oscd->hdr.type = cpu_to_be16(SCATTER_GATHER_LIST); 913 + oscd->hdr.pad_length = 0; 914 + oscd->hdr.length = cpu_to_be32(oscd_size - sizeof(*oscd)); 915 + 916 + *len = 0; 917 + /* copy the sg entries and convert to network byte order */ 918 + for (i = 0; i < numentries; i++) { 919 + oscd->entries[i].offset = cpu_to_be64(sglist[i].offset); 920 + oscd->entries[i].len = cpu_to_be64(sglist[i].len); 921 + *len += sglist[i].len; 922 + } 923 + 924 + or->cdb_cont.total_bytes += oscd_size; 925 + OSD_DEBUG("total_bytes=%d oscd_size=%d numentries=%d\n", 926 + or->cdb_cont.total_bytes, oscd_size, 
numentries); 927 + return 0; 928 + } 929 + 930 + static int _osd_req_finalize_cdb_cont(struct osd_request *or, const u8 *cap_key) 931 + { 932 + struct request_queue *req_q = osd_request_queue(or->osd_dev); 933 + struct bio *bio; 934 + struct osd_cdb_head *cdbh = osd_cdb_head(&or->cdb); 935 + struct osd_continuation_segment_header *cont_seg_hdr; 936 + 937 + if (!or->cdb_cont.total_bytes) 938 + return 0; 939 + 940 + cont_seg_hdr = or->cdb_cont.buff; 941 + cont_seg_hdr->format = CDB_CONTINUATION_FORMAT_V2; 942 + cont_seg_hdr->service_action = cdbh->varlen_cdb.service_action; 943 + 944 + /* create a bio for continuation segment */ 945 + bio = bio_map_kern(req_q, or->cdb_cont.buff, or->cdb_cont.total_bytes, 946 + GFP_KERNEL); 947 + if (unlikely(!bio)) 948 + return -ENOMEM; 949 + 950 + bio->bi_rw |= REQ_WRITE; 951 + 952 + /* integrity check the continuation before the bio is linked 953 + * with the other data segments since the continuation 954 + * integrity is separate from the other data segments. 955 + */ 956 + osd_sec_sign_data(cont_seg_hdr->integrity_check, bio, cap_key); 957 + 958 + cdbh->v2.cdb_continuation_length = cpu_to_be32(or->cdb_cont.total_bytes); 959 + 960 + /* we can't use _req_append_segment, because we need to link in the 961 + * continuation bio to the head of the bio list - the 962 + * continuation segment (if it exists) is always the first segment in 963 + * the out data buffer. 964 + */ 965 + bio->bi_next = or->out.bio; 966 + or->out.bio = bio; 967 + or->out.total_bytes += or->cdb_cont.total_bytes; 968 + 969 + return 0; 970 + } 971 + 972 + /* osd_req_write_sg: Takes a @bio that points to the data out buffer and an 973 + * @sglist that has the scatter gather entries. Scatter-gather enables a write 974 + * of multiple none-contiguous areas of an object, in a single call. The extents 975 + * may overlap and/or be in any order. 
The only constrain is that: 976 + * total_bytes(sglist) >= total_bytes(bio) 977 + */ 978 + int osd_req_write_sg(struct osd_request *or, 979 + const struct osd_obj_id *obj, struct bio *bio, 980 + const struct osd_sg_entry *sglist, unsigned numentries) 981 + { 982 + u64 len; 983 + int ret = _add_sg_continuation_descriptor(or, sglist, numentries, &len); 984 + 985 + if (ret) 986 + return ret; 987 + osd_req_write(or, obj, 0, bio, len); 988 + 989 + return 0; 990 + } 991 + EXPORT_SYMBOL(osd_req_write_sg); 992 + 993 + /* osd_req_read_sg: Read multiple extents of an object into @bio 994 + * See osd_req_write_sg 995 + */ 996 + int osd_req_read_sg(struct osd_request *or, 997 + const struct osd_obj_id *obj, struct bio *bio, 998 + const struct osd_sg_entry *sglist, unsigned numentries) 999 + { 1000 + u64 len; 1001 + int ret = _add_sg_continuation_descriptor(or, sglist, numentries, &len); 1002 + 1003 + if (ret) 1004 + return ret; 1005 + osd_req_read(or, obj, 0, bio, len); 1006 + 1007 + return 0; 1008 + } 1009 + EXPORT_SYMBOL(osd_req_read_sg); 1010 1011 void osd_req_get_attributes(struct osd_request *or, 1012 const struct osd_obj_id *obj) ··· 1281 } 1282 1283 static int _osd_req_finalize_data_integrity(struct osd_request *or, 1284 + bool has_in, bool has_out, struct bio *out_data_bio, u64 out_data_bytes, 1285 + const u8 *cap_key) 1286 { 1287 struct osd_security_parameters *sec_parms = _osd_req_sec_params(or); 1288 int ret; ··· 1312 or->out.last_seg = NULL; 1313 1314 /* they are now all chained to request sign them all together */ 1315 + osd_sec_sign_data(&or->out_data_integ, out_data_bio, 1316 cap_key); 1317 } 1318 ··· 1408 { 1409 struct osd_cdb_head *cdbh = osd_cdb_head(&or->cdb); 1410 bool has_in, has_out; 1411 + /* Save for data_integrity without the cdb_continuation */ 1412 + struct bio *out_data_bio = or->out.bio; 1413 u64 out_data_bytes = or->out.total_bytes; 1414 int ret; 1415 ··· 1423 osd_set_caps(&or->cdb, cap); 1424 1425 has_in = or->in.bio || or->get_attr.total_bytes; 
1426 + has_out = or->out.bio || or->cdb_cont.total_bytes || 1427 + or->set_attr.total_bytes || or->enc_get_attr.total_bytes; 1428 1429 + ret = _osd_req_finalize_cdb_cont(or, cap_key); 1430 + if (ret) { 1431 + OSD_DEBUG("_osd_req_finalize_cdb_cont failed\n"); 1432 + return ret; 1433 + } 1434 ret = _init_blk_request(or, has_in, has_out); 1435 if (ret) { 1436 OSD_DEBUG("_init_blk_request failed\n"); ··· 1463 } 1464 1465 ret = _osd_req_finalize_data_integrity(or, has_in, has_out, 1466 + out_data_bio, out_data_bytes, 1467 + cap_key); 1468 if (ret) 1469 return ret; 1470
+8 -1
include/scsi/osd_initiator.h
··· 137 void *buff; 138 unsigned alloc_size; /* 0 here means: don't call kfree */ 139 unsigned total_bytes; 140 - } set_attr, enc_get_attr, get_attr; 141 142 struct _osd_io_info { 143 struct bio *bio; ··· 448 int osd_req_read_kern(struct osd_request *or, 449 const struct osd_obj_id *obj, u64 offset, void *buff, u64 len); 450 451 /* 452 * Root/Partition/Collection/Object Attributes commands 453 */
··· 137 void *buff; 138 unsigned alloc_size; /* 0 here means: don't call kfree */ 139 unsigned total_bytes; 140 + } cdb_cont, set_attr, enc_get_attr, get_attr; 141 142 struct _osd_io_info { 143 struct bio *bio; ··· 448 int osd_req_read_kern(struct osd_request *or, 449 const struct osd_obj_id *obj, u64 offset, void *buff, u64 len); 450 451 + /* Scatter/Gather write/read commands */ 452 + int osd_req_write_sg(struct osd_request *or, 453 + const struct osd_obj_id *obj, struct bio *bio, 454 + const struct osd_sg_entry *sglist, unsigned numentries); 455 + int osd_req_read_sg(struct osd_request *or, 456 + const struct osd_obj_id *obj, struct bio *bio, 457 + const struct osd_sg_entry *sglist, unsigned numentries); 458 /* 459 * Root/Partition/Collection/Object Attributes commands 460 */
+42
include/scsi/osd_protocol.h
··· 631 put_unaligned_le16(bit_mask, &cap->permissions_bit_mask); 632 } 633 634 #endif /* ndef __OSD_PROTOCOL_H__ */
··· 631 put_unaligned_le16(bit_mask, &cap->permissions_bit_mask); 632 } 633 634 + /* osd2r05a sec 5.3: CDB continuation segment formats */ 635 + enum osd_continuation_segment_format { 636 + CDB_CONTINUATION_FORMAT_V2 = 0x01, 637 + }; 638 + 639 + struct osd_continuation_segment_header { 640 + u8 format; 641 + u8 reserved1; 642 + __be16 service_action; 643 + __be32 reserved2; 644 + u8 integrity_check[OSDv2_CRYPTO_KEYID_SIZE]; 645 + } __packed; 646 + 647 + /* osd2r05a sec 5.4.1: CDB continuation descriptors */ 648 + enum osd_continuation_descriptor_type { 649 + NO_MORE_DESCRIPTORS = 0x0000, 650 + SCATTER_GATHER_LIST = 0x0001, 651 + QUERY_LIST = 0x0002, 652 + USER_OBJECT = 0x0003, 653 + COPY_USER_OBJECT_SOURCE = 0x0101, 654 + EXTENSION_CAPABILITIES = 0xFFEE 655 + }; 656 + 657 + struct osd_continuation_descriptor_header { 658 + __be16 type; 659 + u8 reserved; 660 + u8 pad_length; 661 + __be32 length; 662 + } __packed; 663 + 664 + 665 + /* osd2r05a sec 5.4.2: Scatter/gather list */ 666 + struct osd_sg_list_entry { 667 + __be64 offset; 668 + __be64 len; 669 + }; 670 + 671 + struct osd_sg_continuation_descriptor { 672 + struct osd_continuation_descriptor_header hdr; 673 + struct osd_sg_list_entry entries[]; 674 + }; 675 + 676 #endif /* ndef __OSD_PROTOCOL_H__ */
+5
include/scsi/osd_types.h
··· 37 void *val_ptr; /* in network order */ 38 }; 39 40 #endif /* ndef __OSD_TYPES_H__ */
··· 37 void *val_ptr; /* in network order */ 38 }; 39 40 + struct osd_sg_entry { 41 + u64 offset; 42 + u64 len; 43 + }; 44 + 45 #endif /* ndef __OSD_TYPES_H__ */