Distributed File System written in C
at main 7.0 kB view raw
1#include <stdio.h> 2#include <assert.h> 3#include <stdlib.h> 4#include <unistd.h> 5#include "lib/lib.h" 6#include "lib/enchufe.h" 7#include "lib/dnode.h" 8#include "lib/log.h" 9#define BUF_LEN 0x1000 10 11/// Copies from the server to the client. 12void server_to_client(Enchufe dnode, UUID cid, FILE* f) { 13 conecta(dnode); 14 15 // Send READ(cid :: UID) to dnode 16 Command read = (Command){ 17 .tag = READ, 18 .as.read.cid = cid, 19 }; 20 Buffer read_buf = Command_serialize(read); 21 zumba(dnode, read_buf); 22 free(read_buf.buf); 23 24 // Receive nbytes :: size_t from dnode 25 Byte fsize_bytes[sizeof(size_t)] = {0}; 26 Buffer fsize_buf = (Buffer){ .buf = fsize_bytes, .len = sizeof(size_t) }; 27 size_t len = recibe(dnode, fsize_buf); 28 29 // Crash if metadata server sent something wrong 30 if (len != sizeof(size_t)) { 31 log(FATAL, "Metadata server sent wrong amount of bytes. Expected %zu, got %zu\n", sizeof(size_t), len); 32 exit(EXIT_FAILURE); 33 } 34 35 size_t nbytes = size_t_deserialize(fsize_buf); 36 37 // Receive all bytes from dnode 38 for (size_t bytes_received = 0; bytes_received < nbytes;) { 39 Byte buf[CHUNK_SIZE] = {0}; 40 Buffer outside_buf = (Buffer){ .buf = buf, .len = CHUNK_SIZE }; 41 len = recibe(dnode, outside_buf); 42 outside_buf.len = len; 43 44 Buffer_write(outside_buf, f); 45 46 bytes_received += len; 47 printf("\r"); 48 log(INFO, "Received %zu bytes out of %zu", bytes_received, nbytes); 49 fflush(stdout); 50 } 51 printf("\n"); 52} 53 54/// Copy from the client to the server 55UUID client_to_server(Enchufe dnode, LazyBuffer* b, size_t nbytes, size_t chunks_per_dnode) { 56 exists(b); 57 conecta(dnode); 58 59 // Send WRITE(nbytes :: size_t) to dnode 60 Command write = (Command){ 61 .tag = WRITE, 62 .as.write = { 63 .nbytes = nbytes, 64 }, 65 }; 66 Buffer write_buf = Command_serialize(write); 67 zumba(dnode, write_buf); 68 free(write_buf.buf); 69 70 // Receive cid :: UUID from dnode 71 Buffer cid_buf = UUID_serialize((UUID){0}); 72 recibe(dnode, cid_buf); 73 UUID cid = UUID_deserialize(cid_buf); 74 75 size_t bytes_sent = 0; 76 // Send all bytes to dnode 77 for (size_t sent_chunks = 0; sent_chunks < chunks_per_dnode; sent_chunks++) { 78 // Get the next chunk of bytes 79 *b = next_chunk(*b); 80 if (b->len == 0) break; 81 82 // Send buf :: Buffer to dnode 83 zumba(dnode, (Buffer){ .buf = b->buf, .len = b->len }); 84 85 bytes_sent += b->len; 86 printf("\r"); 87 log(INFO, "Sent %zu bytes out of %zu", bytes_sent, nbytes); 88 fflush(stdout); 89 } 90 printf("\n"); 91 desenchufa(dnode); 92 93 return cid; 94} 95 96/// This is the copy client implementation 97int main(int argc, char** argv) { 98 // Parse command line arguments 99 if (argc != 6) { 100 for (int i = 0; i < argc; ++i) { 101 fprintf(stderr, "%s ", argv[i]); 102 } 103 fprintf(stderr, "\n"); 104 fprintf(stderr, "USAGE: %s IPv4 Port [-s] path/to/source_file.ext [-s] path/to/dest_file.ext\n", argv[0]); 105 fprintf(stderr, "USAGE: use the -s flag to indicate which file is in the server\n"); 106 exit(1); 107 } 108 109 // Depending on this flag, we can determine whether the file is being copied 110 // from the server or not. 111 bool from_server = Str_eq(argv[3], "-s"); 112 113 IPv4 ip = parse_address(argv[1]); 114 Port port = (Port)atoi(argv[2]); 115 Str dest = argv[5]; 116 Str source = from_server ? argv[4] : argv[3]; 117 118 if (from_server) { 119 // Connect to metadata server 120 Enchufe metadata = enchufa(ip, port); 121 conecta(metadata); 122 123 // Send GET(fname :: SafeStr) to metadata server 124 Command get = (Command){ 125 .tag = GET, 126 .as.get.fname = atoss(source), // INFO: This is safe because source is a command line argument 127 }; 128 Buffer get_buf = Command_serialize(get); 129 zumba(metadata, get_buf); 130 free(get_buf.buf); 131 132 // Receive dinfo :: ListDNodeInfo from metadata server 133 Byte buf[BUF_LEN] = {0}; 134 Buffer outside_buf = (Buffer){ 135 .buf = buf, 136 .len = BUF_LEN, 137 }; 138 size_t len = recibe(metadata, outside_buf); 139 outside_buf.len = len; 140 ListDNodeInfo dinfos = ListDNodeInfo_deserialize(outside_buf); 141 142 // Sort the ListDNodeInfo depending on the sequence number. 143 dinfos = sort(dinfos); 144 145 desenchufa(metadata); 146 147 FILE* f = fopen(dest, "w"); 148 exists(f); 149 150 // Get data from each dnode and copy it to the client. 151 for (ListDNodeInfo dptr = dinfos; dptr != NULL; dptr = dptr->rest) { 152 DNodeInfo dinfo = dptr->head; 153 154 // Do exchange with data node 155 Enchufe dnode = enchufa(dinfo.dnode.address, dinfo.dnode.port); 156 server_to_client(dnode, dinfo.cid, f); 157 desenchufa(dnode); 158 } 159 fclose(f); 160 } else { 161 // Check if file exists 162 if (access(source, F_OK) == 0) { 163 // Connect to metadata server 164 Enchufe metadata = enchufa(ip, port); 165 conecta(metadata); 166 167 // Open lazyBuffer for reading 168 LazyBuffer b = open(source); 169 exists(b.fptr); 170 171 // Send PUT(fsize :: size_t, fname :: SafeStr) to metadata server 172 Command put = { 173 .tag = PUT, 174 .as.put = { 175 .fsize = b.size, 176 .fname = atoss(dest), // INFO: This is safe because dest is a command line argument 177 }, 178 }; 179 Buffer put_buf = Command_serialize(put); 180 zumba(metadata, put_buf); 181 free(put_buf.buf); 182 183 // Receive dnodes :: ListDNode from metadata server 184 Byte buf1[BUF_LEN] = {0}; 185 Buffer outside_buf = { .buf = buf1, .len = BUF_LEN }; 186 size_t len = recibe(metadata, outside_buf); 187 outside_buf.len = len; 188 189 if (len == 0) { 190 log(FATAL, "Metadata server closed the connection right after sending a PUT request\n"); 191 exit(EXIT_FAILURE); 192 } 193 194 ListDNode dnodes = ListDNode_deserialize(outside_buf); 195 196 // INFO: This will calculate how many chunks will have to be sent. In 197 // the case where the amount of bytes that have to be sent aren't the a 198 // multiple of CHUNK_SIZE, this will have an extra 1 added. Hence, the slightly 199 // funny looking line: 200 // \code 201 // !!(b.size % CHUNK_SIZE) 202 // \endcode 203 size_t chunk_count = (b.size / CHUNK_SIZE) + !!(b.size % CHUNK_SIZE); 204 205 // INFO: Additionally, the chunks_per_dnode line looks like that because I 206 // wanted a fast ceiling integer division. 207 // from https://stackoverflow.com/questions/2745074/fast-ceiling-of-an-integer-division-in-c-c 208 size_t dnode_count = ListDNode_length(dnodes); 209 size_t chunks_per_dnode = 1 + (chunk_count - 1) / dnode_count; 210 211 // Connect to each data node and do the exchange 212 ListDNodeInfo dinfos = NULL; 213 size_t seq = 1; 214 for (ListDNode dptr = dnodes; dptr != NULL; dptr = dptr ->rest) { 215 DNode d = dptr->head; 216 217 // Connect to data node 218 Enchufe dnode = enchufa(d.address, d.port); 219 220 // Calculate remaining bytes 221 size_t bytes_remaining = b.size - b.position; 222 size_t nbytes = min(bytes_remaining, chunks_per_dnode * CHUNK_SIZE); 223 224 // Do the exchange 225 UUID cid = client_to_server(dnode, &b, nbytes, chunks_per_dnode); 226 dinfos = ListDNodeInfo_cons((DNodeInfo){ 227 .dnode = d, 228 .cid = cid, 229 .seq = seq, 230 }, dinfos); 231 seq += 1; 232 desenchufa(dnode); 233 } 234 fclose(b.fptr); 235 236 // Send dinfos :: ListDNodeInfo to metadata server 237 Buffer dinfo_buf = ListDNodeInfo_serialize(dinfos); 238 zumba(metadata, dinfo_buf); 239 desenchufa(metadata); 240 } else { 241 log(FATAL, "'%s': No such file or directory\n", source); 242 exit(EXIT_FAILURE); 243 } 244 } 245}