Merge tag 'ceph-for-4.7-rc8' of git://github.com/ceph/ceph-client

Pull ceph fix from Ilya Dryomov:
"A fix for a long-standing bug in the incremental osdmap handling code
that caused misdirected requests, tagged for stable.

The tag is signed with a brand new key - Sage is on vacation and I
didn't anticipate this"

* tag 'ceph-for-4.7-rc8' of git://github.com/ceph/ceph-client:
libceph: apply new_state before new_up_client on incrementals

+113 -43
+113 -43
net/ceph/osdmap.c
··· 1261 1261 } 1262 1262 1263 1263 /* 1264 + * Encoding order is (new_up_client, new_state, new_weight). Need to 1265 + * apply in the (new_weight, new_state, new_up_client) order, because 1266 + * an incremental map may look like e.g. 1267 + * 1268 + * new_up_client: { osd=6, addr=... } # set osd_state and addr 1269 + * new_state: { osd=6, xorstate=EXISTS } # clear osd_state 1270 + */ 1271 + static int decode_new_up_state_weight(void **p, void *end, 1272 + struct ceph_osdmap *map) 1273 + { 1274 + void *new_up_client; 1275 + void *new_state; 1276 + void *new_weight_end; 1277 + u32 len; 1278 + 1279 + new_up_client = *p; 1280 + ceph_decode_32_safe(p, end, len, e_inval); 1281 + len *= sizeof(u32) + sizeof(struct ceph_entity_addr); 1282 + ceph_decode_need(p, end, len, e_inval); 1283 + *p += len; 1284 + 1285 + new_state = *p; 1286 + ceph_decode_32_safe(p, end, len, e_inval); 1287 + len *= sizeof(u32) + sizeof(u8); 1288 + ceph_decode_need(p, end, len, e_inval); 1289 + *p += len; 1290 + 1291 + /* new_weight */ 1292 + ceph_decode_32_safe(p, end, len, e_inval); 1293 + while (len--) { 1294 + s32 osd; 1295 + u32 w; 1296 + 1297 + ceph_decode_need(p, end, 2*sizeof(u32), e_inval); 1298 + osd = ceph_decode_32(p); 1299 + w = ceph_decode_32(p); 1300 + BUG_ON(osd >= map->max_osd); 1301 + pr_info("osd%d weight 0x%x %s\n", osd, w, 1302 + w == CEPH_OSD_IN ? "(in)" : 1303 + (w == CEPH_OSD_OUT ? "(out)" : "")); 1304 + map->osd_weight[osd] = w; 1305 + 1306 + /* 1307 + * If we are marking in, set the EXISTS, and clear the 1308 + * AUTOOUT and NEW bits. 
1309 + */ 1310 + if (w) { 1311 + map->osd_state[osd] |= CEPH_OSD_EXISTS; 1312 + map->osd_state[osd] &= ~(CEPH_OSD_AUTOOUT | 1313 + CEPH_OSD_NEW); 1314 + } 1315 + } 1316 + new_weight_end = *p; 1317 + 1318 + /* new_state (up/down) */ 1319 + *p = new_state; 1320 + len = ceph_decode_32(p); 1321 + while (len--) { 1322 + s32 osd; 1323 + u8 xorstate; 1324 + int ret; 1325 + 1326 + osd = ceph_decode_32(p); 1327 + xorstate = ceph_decode_8(p); 1328 + if (xorstate == 0) 1329 + xorstate = CEPH_OSD_UP; 1330 + BUG_ON(osd >= map->max_osd); 1331 + if ((map->osd_state[osd] & CEPH_OSD_UP) && 1332 + (xorstate & CEPH_OSD_UP)) 1333 + pr_info("osd%d down\n", osd); 1334 + if ((map->osd_state[osd] & CEPH_OSD_EXISTS) && 1335 + (xorstate & CEPH_OSD_EXISTS)) { 1336 + pr_info("osd%d does not exist\n", osd); 1337 + map->osd_weight[osd] = CEPH_OSD_IN; 1338 + ret = set_primary_affinity(map, osd, 1339 + CEPH_OSD_DEFAULT_PRIMARY_AFFINITY); 1340 + if (ret) 1341 + return ret; 1342 + memset(map->osd_addr + osd, 0, sizeof(*map->osd_addr)); 1343 + map->osd_state[osd] = 0; 1344 + } else { 1345 + map->osd_state[osd] ^= xorstate; 1346 + } 1347 + } 1348 + 1349 + /* new_up_client */ 1350 + *p = new_up_client; 1351 + len = ceph_decode_32(p); 1352 + while (len--) { 1353 + s32 osd; 1354 + struct ceph_entity_addr addr; 1355 + 1356 + osd = ceph_decode_32(p); 1357 + ceph_decode_copy(p, &addr, sizeof(addr)); 1358 + ceph_decode_addr(&addr); 1359 + BUG_ON(osd >= map->max_osd); 1360 + pr_info("osd%d up\n", osd); 1361 + map->osd_state[osd] |= CEPH_OSD_EXISTS | CEPH_OSD_UP; 1362 + map->osd_addr[osd] = addr; 1363 + } 1364 + 1365 + *p = new_weight_end; 1366 + return 0; 1367 + 1368 + e_inval: 1369 + return -EINVAL; 1370 + } 1371 + 1372 + /* 1264 1373 * decode and apply an incremental map update. 
1265 1374 */ 1266 1375 struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, ··· 1467 1358 __remove_pg_pool(&map->pg_pools, pi); 1468 1359 } 1469 1360 1470 - /* new_up */ 1471 - ceph_decode_32_safe(p, end, len, e_inval); 1472 - while (len--) { 1473 - u32 osd; 1474 - struct ceph_entity_addr addr; 1475 - ceph_decode_32_safe(p, end, osd, e_inval); 1476 - ceph_decode_copy_safe(p, end, &addr, sizeof(addr), e_inval); 1477 - ceph_decode_addr(&addr); 1478 - pr_info("osd%d up\n", osd); 1479 - BUG_ON(osd >= map->max_osd); 1480 - map->osd_state[osd] |= CEPH_OSD_UP | CEPH_OSD_EXISTS; 1481 - map->osd_addr[osd] = addr; 1482 - } 1483 - 1484 - /* new_state */ 1485 - ceph_decode_32_safe(p, end, len, e_inval); 1486 - while (len--) { 1487 - u32 osd; 1488 - u8 xorstate; 1489 - ceph_decode_32_safe(p, end, osd, e_inval); 1490 - xorstate = **(u8 **)p; 1491 - (*p)++; /* clean flag */ 1492 - if (xorstate == 0) 1493 - xorstate = CEPH_OSD_UP; 1494 - if (xorstate & CEPH_OSD_UP) 1495 - pr_info("osd%d down\n", osd); 1496 - if (osd < map->max_osd) 1497 - map->osd_state[osd] ^= xorstate; 1498 - } 1499 - 1500 - /* new_weight */ 1501 - ceph_decode_32_safe(p, end, len, e_inval); 1502 - while (len--) { 1503 - u32 osd, off; 1504 - ceph_decode_need(p, end, sizeof(u32)*2, e_inval); 1505 - osd = ceph_decode_32(p); 1506 - off = ceph_decode_32(p); 1507 - pr_info("osd%d weight 0x%x %s\n", osd, off, 1508 - off == CEPH_OSD_IN ? "(in)" : 1509 - (off == CEPH_OSD_OUT ? "(out)" : "")); 1510 - if (osd < map->max_osd) 1511 - map->osd_weight[osd] = off; 1512 - } 1361 + /* new_up_client, new_state, new_weight */ 1362 + err = decode_new_up_state_weight(p, end, map); 1363 + if (err) 1364 + goto bad; 1513 1365 1514 1366 /* new_pg_temp */ 1515 1367 err = decode_new_pg_temp(p, end, map);