Distributed File System written in C
at main 5.9 kB view raw
1#include <stdio.h> 2#include <stdlib.h> 3#include "lib/enchufe.h" 4#include "lib/inode.h" 5#include "lib/dnode.h" 6#include "lib/block.h" 7#include "lib/log.h" 8#define BUF_LEN 0x1000 9 10void add(DNode node) { 11 insert_into_dnode(node.address, node.port); 12} 13 14void list(Enchufe cliente) { 15 ListINode inodes = INode_get_db(); 16 17 Buffer inside_buf = ListINode_serialize(inodes); 18 zumba(cliente, inside_buf); 19 free(inside_buf.buf); 20 21 for (ListINode iptr = inodes; iptr != NULL;) { 22 free((void*)iptr->head.fname.str); 23 ListINode kod = iptr; 24 iptr = iptr->rest; 25 free(kod); 26 } 27} 28 29/// Registers a data node 30void reg(IPv4 address, Port port) { 31 // Create data node tuple 32 DNode dnode = { 33 .nid = new_uuid(), 34 .address = address, 35 .port = port, 36 }; 37 38 // Insert it in the data base 39 insert_into_dnode(dnode.address, dnode.port); 40 41 log(INFO, "Finished registering data node on: %d.%d.%d.%d:%d\n", dnode.address.bytes[0], dnode.address.bytes[1], dnode.address.bytes[2], dnode.address.bytes[3], dnode.port); 42} 43 44/// Puts file data on the server 45void put(Enchufe cliente, size_t fsize, SafeStr fname) { 46 // Send dnodes :: ListDNode to the client 47 log(INFO, "Sending the following data nodes:\n"); 48 ListDNode dnodes = select_from_dnode(NULL, NULL, NULL); 49 for (ListDNode dptr = dnodes; dptr != NULL; dptr = dptr->rest) { 50 DNode d = dptr->head; 51 log(INFO, "\tdnode on %d.%d.%d.%d:%d\n", d.address.bytes[0], d.address.bytes[1], d.address.bytes[2], d.address.bytes[3], d.port); 52 } 53 Buffer inside_buf = ListDNode_serialize(dnodes); 54 zumba(cliente, inside_buf); 55 free(inside_buf.buf); 56 57 // Create an inode for the new file 58 insert_into_inode(fname, fsize); 59 // Get new file UUID 60 ListINode inodes = select_from_inode(NULL, &fname, &fsize); 61 exists(inodes); 62 UUID fid = inodes->head.fid; 63 // Free the memory 64 ListINode_deinit(inodes); 65 free((void*)fname.str); 66 67 68 // Receive dinfos :: ListDNodeInfo from client 69 Byte buf[BUF_LEN] = {0}; 70 Buffer outside_buf = { .buf = buf, .len = BUF_LEN }; 71 size_t len = recibe(cliente, outside_buf); 72 outside_buf.len = len; 73 74 ListDNodeInfo dinfo = ListDNodeInfo_deserialize(outside_buf); 75 exists(dinfo); 76 77 // Create tuples in Block table 78 for (ListDNodeInfo lptr = dinfo; lptr != NULL; lptr = lptr->rest) { 79 DNodeInfo d = lptr->head; 80 81 insert_into_block(&fid, &d.dnode.nid, &d.cid, &d.seq); 82 83 log(INFO, "Received the following Block: (cid: %s, seq: %zu, dnode: %d.%d.%d.%d:%d)\n", utoa(d.cid).uuid, d.seq, d.dnode.address.bytes[0], d.dnode.address.bytes[1], d.dnode.address.bytes[2], d.dnode.address.bytes[3], d.dnode.port); 84 } 85} 86 87/// Get file data from server 88void get(Enchufe cliente, SafeStr fname) { 89 // Get the inode associated to that file 90 ListINode inodes = select_from_inode(NULL, &fname, NULL); 91 exists(inodes); 92 INode inode = inodes->head; 93 // 94 // Get the blocks associated to that inode 95 ListBlock blocks = select_from_block(NULL, &inode.fid, NULL, NULL, NULL); 96 exists(blocks); 97 98 // Construct list of dnode info for client 99 ListDNodeInfo dinfos = NULL; 100 for (ListBlock bptr = blocks; bptr != NULL; bptr = bptr->rest) { 101 // Get datanode associated to th node UUID of the block 102 ListDNode dnodes = select_from_dnode(&bptr->head.nid, NULL, NULL); 103 exists(dnodes); 104 105 // construct the data node info 106 for (ListDNode dptr = dnodes; dptr != NULL; dptr = dptr->rest) { 107 log(INFO, "Sending DNodeInfo(dnode: "); 108 print_dnode(dptr->head); 109 printf(", cid: %s, seq: %zu)\n", utoa(bptr->head.cid).uuid, bptr->head.seq); 110 dinfos = ListDNodeInfo_cons((DNodeInfo){ 111 .dnode = dptr->head, 112 .cid = bptr->head.cid, 113 .seq = bptr->head.seq, 114 }, dinfos); 115 } 116 } 117 118 // Send dinfos :: ListDNodeInfo to client 119 Buffer dinfo_buf = ListDNodeInfo_serialize(dinfos); 120 zumba(cliente, dinfo_buf); 121 free(dinfo_buf.buf); 122} 123 124/// Handles the connections when they come in 125bool handler(Enchufe enchufe) { 126 log(INFO, "Waiting for a connection\n"); 127 Enchufe cliente = acepta(enchufe); 128 log(INFO, "Accepted a connection\n"); 129 130 // Receive Command from client 131 Byte buf[BUF_LEN] = {0}; 132 Buffer outside_buf = { .buf = buf, .len = BUF_LEN }; 133 size_t len = recibe(cliente, outside_buf); 134 outside_buf.len = len; 135 136 // Call the correct sub-handler depending on the command 137 Command cmd = Command_deserialize(outside_buf); 138 switch (cmd.tag) { 139 case REG: { 140 log(INFO, "Received REG\n"); 141 reg(cmd.as.reg.address, cmd.as.reg.port); 142 log(INFO, "Finished REG\n"); 143 } break; 144 case PUT: { 145 log(INFO, "Received PUT(fsize: %zu, fname: \"%s\")\n", cmd.as.put.fsize, cmd.as.put.fname.str); 146 put(cliente, cmd.as.put.fsize, cmd.as.put.fname); 147 // Free memory allocated for file name 148 log(INFO, "Finished PUT\n"); 149 } break; 150 case LIST: { 151 log(INFO, "Received LIST\n"); 152 list(cliente); 153 log(INFO, "Finished LIST\n"); 154 } break; 155 case GET: { 156 log(INFO, "Received GET(fname: \"%s\")\n", cmd.as.get.fname.str); 157 get(cliente, cmd.as.get.fname); 158 // Free memory allocated for file name 159 log(INFO, "Finished LIST\n"); 160 } break; 161 default: { 162 log(ERROR, "That command was incorrect, proceeding to ignore\n"); 163 } break; 164 } 165 desenchufa(cliente); 166 167 return true; 168} 169 170/// This is the main metadata server 171int main(int argc, char** argv) { 172 // Parse command line arguments 173 Port port; 174 switch (argc) { 175 case 1: { 176 port = 8000; 177 } break; 178 case 2: { 179 Str str = argv[1]; 180 port = (Port)atoi(str); 181 } break; 182 default: { 183 log(ERROR, "USAGE: %s [PORT]\n", argv[0]); 184 log(ERROR, "When no port is issued, metadata server will listen on port 8000\n"); 185 exit(EXIT_FAILURE); 186 } break; 187 } 188 189 // Wait for connections 190 log(INFO, "Creating socket for metadata server on 127.0.0.1:%d\n", port); 191 Enchufe enchufe = enchufa((IPv4){ .bytes = {127,0,0,1}}, port); 192 amarra(enchufe); 193 log(INFO, "Waiting for up to 10 connections\n"); 194 escucha(enchufe, 10); 195 196 // Call the handler for ench connection 197 log(INFO, "Waiting for connections\n"); 198 while (true) { 199 if (!handler(enchufe)) break; 200 } 201 202 desenchufa(enchufe); 203 return EXIT_SUCCESS; 204}