Distributed File System written in C
1#include <stdio.h>
2#include <stdlib.h>
3#include "lib/enchufe.h"
4#include "lib/inode.h"
5#include "lib/dnode.h"
6#include "lib/block.h"
7#include "lib/log.h"
8#define BUF_LEN 0x1000
9
10void add(DNode node) {
11 insert_into_dnode(node.address, node.port);
12}
13
14void list(Enchufe cliente) {
15 ListINode inodes = INode_get_db();
16
17 Buffer inside_buf = ListINode_serialize(inodes);
18 zumba(cliente, inside_buf);
19 free(inside_buf.buf);
20
21 for (ListINode iptr = inodes; iptr != NULL;) {
22 free((void*)iptr->head.fname.str);
23 ListINode kod = iptr;
24 iptr = iptr->rest;
25 free(kod);
26 }
27}
28
29/// Registers a data node
30void reg(IPv4 address, Port port) {
31 // Create data node tuple
32 DNode dnode = {
33 .nid = new_uuid(),
34 .address = address,
35 .port = port,
36 };
37
38 // Insert it in the data base
39 insert_into_dnode(dnode.address, dnode.port);
40
41 log(INFO, "Finished registering data node on: %d.%d.%d.%d:%d\n", dnode.address.bytes[0], dnode.address.bytes[1], dnode.address.bytes[2], dnode.address.bytes[3], dnode.port);
42}
43
44/// Puts file data on the server
45void put(Enchufe cliente, size_t fsize, SafeStr fname) {
46 // Send dnodes :: ListDNode to the client
47 log(INFO, "Sending the following data nodes:\n");
48 ListDNode dnodes = select_from_dnode(NULL, NULL, NULL);
49 for (ListDNode dptr = dnodes; dptr != NULL; dptr = dptr->rest) {
50 DNode d = dptr->head;
51 log(INFO, "\tdnode on %d.%d.%d.%d:%d\n", d.address.bytes[0], d.address.bytes[1], d.address.bytes[2], d.address.bytes[3], d.port);
52 }
53 Buffer inside_buf = ListDNode_serialize(dnodes);
54 zumba(cliente, inside_buf);
55 free(inside_buf.buf);
56
57 // Create an inode for the new file
58 insert_into_inode(fname, fsize);
59 // Get new file UUID
60 ListINode inodes = select_from_inode(NULL, &fname, &fsize);
61 exists(inodes);
62 UUID fid = inodes->head.fid;
63 // Free the memory
64 ListINode_deinit(inodes);
65 free((void*)fname.str);
66
67
68 // Receive dinfos :: ListDNodeInfo from client
69 Byte buf[BUF_LEN] = {0};
70 Buffer outside_buf = { .buf = buf, .len = BUF_LEN };
71 size_t len = recibe(cliente, outside_buf);
72 outside_buf.len = len;
73
74 ListDNodeInfo dinfo = ListDNodeInfo_deserialize(outside_buf);
75 exists(dinfo);
76
77 // Create tuples in Block table
78 for (ListDNodeInfo lptr = dinfo; lptr != NULL; lptr = lptr->rest) {
79 DNodeInfo d = lptr->head;
80
81 insert_into_block(&fid, &d.dnode.nid, &d.cid, &d.seq);
82
83 log(INFO, "Received the following Block: (cid: %s, seq: %zu, dnode: %d.%d.%d.%d:%d)\n", utoa(d.cid).uuid, d.seq, d.dnode.address.bytes[0], d.dnode.address.bytes[1], d.dnode.address.bytes[2], d.dnode.address.bytes[3], d.dnode.port);
84 }
85}
86
87/// Get file data from server
88void get(Enchufe cliente, SafeStr fname) {
89 // Get the inode associated to that file
90 ListINode inodes = select_from_inode(NULL, &fname, NULL);
91 exists(inodes);
92 INode inode = inodes->head;
93 //
94 // Get the blocks associated to that inode
95 ListBlock blocks = select_from_block(NULL, &inode.fid, NULL, NULL, NULL);
96 exists(blocks);
97
98 // Construct list of dnode info for client
99 ListDNodeInfo dinfos = NULL;
100 for (ListBlock bptr = blocks; bptr != NULL; bptr = bptr->rest) {
101 // Get datanode associated to th node UUID of the block
102 ListDNode dnodes = select_from_dnode(&bptr->head.nid, NULL, NULL);
103 exists(dnodes);
104
105 // construct the data node info
106 for (ListDNode dptr = dnodes; dptr != NULL; dptr = dptr->rest) {
107 log(INFO, "Sending DNodeInfo(dnode: ");
108 print_dnode(dptr->head);
109 printf(", cid: %s, seq: %zu)\n", utoa(bptr->head.cid).uuid, bptr->head.seq);
110 dinfos = ListDNodeInfo_cons((DNodeInfo){
111 .dnode = dptr->head,
112 .cid = bptr->head.cid,
113 .seq = bptr->head.seq,
114 }, dinfos);
115 }
116 }
117
118 // Send dinfos :: ListDNodeInfo to client
119 Buffer dinfo_buf = ListDNodeInfo_serialize(dinfos);
120 zumba(cliente, dinfo_buf);
121 free(dinfo_buf.buf);
122}
123
124/// Handles the connections when they come in
125bool handler(Enchufe enchufe) {
126 log(INFO, "Waiting for a connection\n");
127 Enchufe cliente = acepta(enchufe);
128 log(INFO, "Accepted a connection\n");
129
130 // Receive Command from client
131 Byte buf[BUF_LEN] = {0};
132 Buffer outside_buf = { .buf = buf, .len = BUF_LEN };
133 size_t len = recibe(cliente, outside_buf);
134 outside_buf.len = len;
135
136 // Call the correct sub-handler depending on the command
137 Command cmd = Command_deserialize(outside_buf);
138 switch (cmd.tag) {
139 case REG: {
140 log(INFO, "Received REG\n");
141 reg(cmd.as.reg.address, cmd.as.reg.port);
142 log(INFO, "Finished REG\n");
143 } break;
144 case PUT: {
145 log(INFO, "Received PUT(fsize: %zu, fname: \"%s\")\n", cmd.as.put.fsize, cmd.as.put.fname.str);
146 put(cliente, cmd.as.put.fsize, cmd.as.put.fname);
147 // Free memory allocated for file name
148 log(INFO, "Finished PUT\n");
149 } break;
150 case LIST: {
151 log(INFO, "Received LIST\n");
152 list(cliente);
153 log(INFO, "Finished LIST\n");
154 } break;
155 case GET: {
156 log(INFO, "Received GET(fname: \"%s\")\n", cmd.as.get.fname.str);
157 get(cliente, cmd.as.get.fname);
158 // Free memory allocated for file name
159 log(INFO, "Finished LIST\n");
160 } break;
161 default: {
162 log(ERROR, "That command was incorrect, proceeding to ignore\n");
163 } break;
164 }
165 desenchufa(cliente);
166
167 return true;
168}
169
170/// This is the main metadata server
171int main(int argc, char** argv) {
172 // Parse command line arguments
173 Port port;
174 switch (argc) {
175 case 1: {
176 port = 8000;
177 } break;
178 case 2: {
179 Str str = argv[1];
180 port = (Port)atoi(str);
181 } break;
182 default: {
183 log(ERROR, "USAGE: %s [PORT]\n", argv[0]);
184 log(ERROR, "When no port is issued, metadata server will listen on port 8000\n");
185 exit(EXIT_FAILURE);
186 } break;
187 }
188
189 // Wait for connections
190 log(INFO, "Creating socket for metadata server on 127.0.0.1:%d\n", port);
191 Enchufe enchufe = enchufa((IPv4){ .bytes = {127,0,0,1}}, port);
192 amarra(enchufe);
193 log(INFO, "Waiting for up to 10 connections\n");
194 escucha(enchufe, 10);
195
196 // Call the handler for ench connection
197 log(INFO, "Waiting for connections\n");
198 while (true) {
199 if (!handler(enchufe)) break;
200 }
201
202 desenchufa(enchufe);
203 return EXIT_SUCCESS;
204}