Distributed File System written in C
at main 5.0 kB view raw
1#include <stdio.h> 2#include <sys/types.h> 3#include <sys/stat.h> 4#include "lib/lib.h" 5#include "lib/enchufe.h" 6#include "lib/uuid.h" 7#include "lib/log.h" 8#define BUF_LEN 0x1000 9 10/// Handle when a client wants to read a block 11void lire(Enchufe cliente, UUID cid, SafeStr data_dir_path) { 12 // Create path to black file in data directory 13 Str data_fpath = calloc(data_dir_path.len + 1 + sizeof(UUID_Str) + 5, sizeof(char)); 14 UUID_Str cid_str = utoa(cid); 15 exists(data_fpath); 16 memcpy((void*)data_fpath, data_dir_path.str, data_dir_path.len); 17 memcpy((void*)(data_fpath + data_dir_path.len), "/", 1); 18 memcpy((void*)(data_fpath + data_dir_path.len + 1), cid_str.uuid, sizeof(UUID_Str)); 19 memcpy((void*)(data_fpath + data_dir_path.len + 1 + sizeof(UUID_Str) - 1), ".blk", 4); 20 21 log(INFO, "Opening data file: %s\n", data_fpath); 22 FILE* f = fopen(data_fpath, "r"); 23 exists(f); 24 25 // Calculate size of block file 26 fseek(f, 0L, SEEK_END); 27 size_t nbytes = (size_t)ftell(f); 28 fseek(f, 0L, SEEK_SET); 29 30 // Send nbytes :: size_t to client 31 Buffer nbytes_buf = size_t_serialize(nbytes); 32 zumba(cliente, nbytes_buf); 33 free(nbytes_buf.buf); 34 35 // Send all the bytes in the block file to the client 36 size_t bytes_sent = 0; 37 LazyBuffer b = open(data_fpath); 38 log(INFO, "Sending %zu bytes\n", nbytes); 39 while (true) { 40 b = next_chunk(b); 41 if (b.len == 0) break; 42 43 // Send the bytes 44 zumba(cliente, (Buffer){ .buf = b.buf, .len = b.len }); 45 46 printf("\r"); 47 log(INFO, "Sent %zu bytes out of %zu", bytes_sent, nbytes); 48 fflush(stdout); 49 bytes_sent += b.len; 50 } 51 printf("\n"); 52 fclose(b.fptr); 53} 54 55/// Handle when a cllient wants to write a block 56void ecriver(Enchufe cliente, size_t nbytes, SafeStr data_dir_path) { 57 // Send cid :: UUID to client 58 UUID cid = new_uuid(); 59 Buffer uuid_buf = UUID_serialize(cid); 60 log(INFO, "Responding with cid: %s\n", utoa(cid).uuid); 61 zumba(cliente, uuid_buf); 62 free(uuid_buf.buf); 63 64 // create path for block file 65 Str data_fpath = calloc(data_dir_path.len + 1 + sizeof(UUID_Str) + 5, sizeof(char)); 66 UUID_Str cid_str = utoa(cid); 67 exists(data_fpath); 68 memcpy((void*)data_fpath, data_dir_path.str, data_dir_path.len); 69 memcpy((void*)(data_fpath + data_dir_path.len), "/", 1); 70 memcpy((void*)(data_fpath + data_dir_path.len + 1), cid_str.uuid, sizeof(UUID_Str)); 71 memcpy((void*)(data_fpath + data_dir_path.len + 1 + sizeof(UUID_Str) - 1), ".blk", 4); 72 73 log(INFO, "Opening data file: %s\n", data_fpath); 74 FILE* f = fopen(data_fpath, "w"); 75 exists(f); 76 77 // Receive all the bytes from the client 78 log(INFO, "Receiving %zu bytes\n", nbytes); 79 for (size_t bytes_received = 0; bytes_received < nbytes;) { 80 // Receive buf :: Buffer from client 81 Byte buf[CHUNK_SIZE] = {0}; 82 Buffer outside_buf = (Buffer){ .buf = buf, .len = CHUNK_SIZE }; 83 size_t len = recibe(cliente, outside_buf); 84 outside_buf.len = len; 85 86 bytes_received += len; 87 printf("\r"); 88 log(INFO, "Received %zu bytes out of %zu", bytes_received, nbytes); 89 fflush(stdout); 90 91 Buffer_write(outside_buf, f); 92 } 93 printf("\n"); 94 fclose(f); 95} 96 97/// This is the data node implementation 98int main(int argc, Str* argv) { 99 // Parse command line arguments 100 if (argc < 4 || argc > 5) { 101 fprintf(stderr, "USAGE: %s IPv4 PORT PATH [PORT]\n", argv[0]); 102 exit(1); 103 } 104 105 IPv4 this_address = (IPv4){ .bytes = {127,0,0,1}}; 106 Port this_port = (Port)atoi(argv[2]); 107 Str data_dir_path = argv[3]; 108 IPv4 metadata_address = parse_address(argv[1]); 109 Port metadata_port = argc == 5 ? (Port)atoi(argv[4]) : 8000; 110 111 log(INFO, "Creating data node directory inside: %s\n", data_dir_path); 112 mkdir_p(atoss(data_dir_path), 0700); 113 114 // Connect to the metadata server 115 Enchufe metadataserver = enchufa(metadata_address, metadata_port); 116 conecta(metadataserver); 117 118 // Send REG(address :: IPv4, port :: Port) to metadata server 119 Command reg = (Command){ 120 .tag = REG, 121 .as.reg = { 122 .address = this_address, 123 .port = this_port, 124 }, 125 }; 126 Buffer reg_buf = Command_serialize(reg); 127 zumba(metadataserver, reg_buf); 128 free(reg_buf.buf); 129 130 desenchufa(metadataserver); 131 132 // Open a socket to listen for other connections. 133 Enchufe enchufe = enchufa(this_address, this_port); 134 amarra(enchufe); 135 escucha(enchufe, 3); 136 137 // Wait for connections 138 while (true) { 139 log(INFO, "Waiting for clients\n"); 140 Enchufe cliente = acepta(enchufe); 141 log(INFO, "Received a client\n"); 142 143 // Receive Command from client 144 Byte buf[BUF_LEN] = {0}; 145 Buffer outside_buf = { .buf = buf, .len = BUF_LEN }; 146 size_t len = recibe(cliente, outside_buf); 147 outside_buf.len = len; 148 Command cmd = Command_deserialize(outside_buf); 149 150 switch (cmd.tag) { 151 case WRITE: { 152 log(INFO, "Received WRITE Command\n"); 153 ecriver(cliente, cmd.as.write.nbytes, atoss(data_dir_path)); 154 log(INFO, "Finished WRITE Command\n"); 155 } break; 156 case READ: { 157 log(INFO, "Received READ Command\n"); 158 lire(cliente, cmd.as.read.cid, atoss(data_dir_path)); 159 log(INFO, "Finished READ Command\n"); 160 } break; 161 default: { 162 log(ERROR, "That command was incorrect, proceeding to ignore\n"); 163 } break; 164 } 165 } 166}