#include #include #include #include "lib/lib.h" #include "lib/enchufe.h" #include "lib/uuid.h" #include "lib/log.h" #define BUF_LEN 0x1000 /// Handle when a client wants to read a block void lire(Enchufe cliente, UUID cid, SafeStr data_dir_path) { // Create path to black file in data directory Str data_fpath = calloc(data_dir_path.len + 1 + sizeof(UUID_Str) + 5, sizeof(char)); UUID_Str cid_str = utoa(cid); exists(data_fpath); memcpy((void*)data_fpath, data_dir_path.str, data_dir_path.len); memcpy((void*)(data_fpath + data_dir_path.len), "/", 1); memcpy((void*)(data_fpath + data_dir_path.len + 1), cid_str.uuid, sizeof(UUID_Str)); memcpy((void*)(data_fpath + data_dir_path.len + 1 + sizeof(UUID_Str) - 1), ".blk", 4); log(INFO, "Opening data file: %s\n", data_fpath); FILE* f = fopen(data_fpath, "r"); exists(f); // Calculate size of block file fseek(f, 0L, SEEK_END); size_t nbytes = (size_t)ftell(f); fseek(f, 0L, SEEK_SET); // Send nbytes :: size_t to client Buffer nbytes_buf = size_t_serialize(nbytes); zumba(cliente, nbytes_buf); free(nbytes_buf.buf); // Send all the bytes in the block file to the client size_t bytes_sent = 0; LazyBuffer b = open(data_fpath); log(INFO, "Sending %zu bytes\n", nbytes); while (true) { b = next_chunk(b); if (b.len == 0) break; // Send the bytes zumba(cliente, (Buffer){ .buf = b.buf, .len = b.len }); printf("\r"); log(INFO, "Sent %zu bytes out of %zu", bytes_sent, nbytes); fflush(stdout); bytes_sent += b.len; } printf("\n"); fclose(b.fptr); } /// Handle when a cllient wants to write a block void ecriver(Enchufe cliente, size_t nbytes, SafeStr data_dir_path) { // Send cid :: UUID to client UUID cid = new_uuid(); Buffer uuid_buf = UUID_serialize(cid); log(INFO, "Responding with cid: %s\n", utoa(cid).uuid); zumba(cliente, uuid_buf); free(uuid_buf.buf); // create path for block file Str data_fpath = calloc(data_dir_path.len + 1 + sizeof(UUID_Str) + 5, sizeof(char)); UUID_Str cid_str = utoa(cid); exists(data_fpath); memcpy((void*)data_fpath, data_dir_path.str, data_dir_path.len); memcpy((void*)(data_fpath + data_dir_path.len), "/", 1); memcpy((void*)(data_fpath + data_dir_path.len + 1), cid_str.uuid, sizeof(UUID_Str)); memcpy((void*)(data_fpath + data_dir_path.len + 1 + sizeof(UUID_Str) - 1), ".blk", 4); log(INFO, "Opening data file: %s\n", data_fpath); FILE* f = fopen(data_fpath, "w"); exists(f); // Receive all the bytes from the client log(INFO, "Receiving %zu bytes\n", nbytes); for (size_t bytes_received = 0; bytes_received < nbytes;) { // Receive buf :: Buffer from client Byte buf[CHUNK_SIZE] = {0}; Buffer outside_buf = (Buffer){ .buf = buf, .len = CHUNK_SIZE }; size_t len = recibe(cliente, outside_buf); outside_buf.len = len; bytes_received += len; printf("\r"); log(INFO, "Received %zu bytes out of %zu", bytes_received, nbytes); fflush(stdout); Buffer_write(outside_buf, f); } printf("\n"); fclose(f); } /// This is the data node implementation int main(int argc, Str* argv) { // Parse command line arguments if (argc < 4 || argc > 5) { fprintf(stderr, "USAGE: %s IPv4 PORT PATH [PORT]\n", argv[0]); exit(1); } IPv4 this_address = (IPv4){ .bytes = {127,0,0,1}}; Port this_port = (Port)atoi(argv[2]); Str data_dir_path = argv[3]; IPv4 metadata_address = parse_address(argv[1]); Port metadata_port = argc == 5 ? (Port)atoi(argv[4]) : 8000; log(INFO, "Creating data node directory inside: %s\n", data_dir_path); mkdir_p(atoss(data_dir_path), 0700); // Connect to the metadata server Enchufe metadataserver = enchufa(metadata_address, metadata_port); conecta(metadataserver); // Send REG(address :: IPv4, port :: Port) to metadata server Command reg = (Command){ .tag = REG, .as.reg = { .address = this_address, .port = this_port, }, }; Buffer reg_buf = Command_serialize(reg); zumba(metadataserver, reg_buf); free(reg_buf.buf); desenchufa(metadataserver); // Open a socket to listen for other connections. Enchufe enchufe = enchufa(this_address, this_port); amarra(enchufe); escucha(enchufe, 3); // Wait for connections while (true) { log(INFO, "Waiting for clients\n"); Enchufe cliente = acepta(enchufe); log(INFO, "Received a client\n"); // Receive Command from client Byte buf[BUF_LEN] = {0}; Buffer outside_buf = { .buf = buf, .len = BUF_LEN }; size_t len = recibe(cliente, outside_buf); outside_buf.len = len; Command cmd = Command_deserialize(outside_buf); switch (cmd.tag) { case WRITE: { log(INFO, "Received WRITE Command\n"); ecriver(cliente, cmd.as.write.nbytes, atoss(data_dir_path)); log(INFO, "Finished WRITE Command\n"); } break; case READ: { log(INFO, "Received READ Command\n"); lire(cliente, cmd.as.read.cid, atoss(data_dir_path)); log(INFO, "Finished READ Command\n"); } break; default: { log(ERROR, "That command was incorrect, proceeding to ignore\n"); } break; } } }