Distributed File System written in C
1#include <stdio.h>
2#include <assert.h>
3#include <stdlib.h>
4#include <unistd.h>
5#include "lib/lib.h"
6#include "lib/enchufe.h"
7#include "lib/dnode.h"
8#include "lib/log.h"
9#define BUF_LEN 0x1000
10
11/// Copies from the server to the client.
12void server_to_client(Enchufe dnode, UUID cid, FILE* f) {
13 conecta(dnode);
14
15 // Send READ(cid :: UID) to dnode
16 Command read = (Command){
17 .tag = READ,
18 .as.read.cid = cid,
19 };
20 Buffer read_buf = Command_serialize(read);
21 zumba(dnode, read_buf);
22 free(read_buf.buf);
23
24 // Receive nbytes :: size_t from dnode
25 Byte fsize_bytes[sizeof(size_t)] = {0};
26 Buffer fsize_buf = (Buffer){ .buf = fsize_bytes, .len = sizeof(size_t) };
27 size_t len = recibe(dnode, fsize_buf);
28
29 // Crash if metadata server sent something wrong
30 if (len != sizeof(size_t)) {
31 log(FATAL, "Metadata server sent wrong amount of bytes. Expected %zu, got %zu\n", sizeof(size_t), len);
32 exit(EXIT_FAILURE);
33 }
34
35 size_t nbytes = size_t_deserialize(fsize_buf);
36
37 // Receive all bytes from dnode
38 for (size_t bytes_received = 0; bytes_received < nbytes;) {
39 Byte buf[CHUNK_SIZE] = {0};
40 Buffer outside_buf = (Buffer){ .buf = buf, .len = CHUNK_SIZE };
41 len = recibe(dnode, outside_buf);
42 outside_buf.len = len;
43
44 Buffer_write(outside_buf, f);
45
46 bytes_received += len;
47 printf("\r");
48 log(INFO, "Received %zu bytes out of %zu", bytes_received, nbytes);
49 fflush(stdout);
50 }
51 printf("\n");
52}
53
54/// Copy from the client to the server
55UUID client_to_server(Enchufe dnode, LazyBuffer* b, size_t nbytes, size_t chunks_per_dnode) {
56 exists(b);
57 conecta(dnode);
58
59 // Send WRITE(nbytes :: size_t) to dnode
60 Command write = (Command){
61 .tag = WRITE,
62 .as.write = {
63 .nbytes = nbytes,
64 },
65 };
66 Buffer write_buf = Command_serialize(write);
67 zumba(dnode, write_buf);
68 free(write_buf.buf);
69
70 // Receive cid :: UUID from dnode
71 Buffer cid_buf = UUID_serialize((UUID){0});
72 recibe(dnode, cid_buf);
73 UUID cid = UUID_deserialize(cid_buf);
74
75 size_t bytes_sent = 0;
76 // Send all bytes to dnode
77 for (size_t sent_chunks = 0; sent_chunks < chunks_per_dnode; sent_chunks++) {
78 // Get the next chunk of bytes
79 *b = next_chunk(*b);
80 if (b->len == 0) break;
81
82 // Send buf :: Buffer to dnode
83 zumba(dnode, (Buffer){ .buf = b->buf, .len = b->len });
84
85 bytes_sent += b->len;
86 printf("\r");
87 log(INFO, "Sent %zu bytes out of %zu", bytes_sent, nbytes);
88 fflush(stdout);
89 }
90 printf("\n");
91 desenchufa(dnode);
92
93 return cid;
94}
95
96/// This is the copy client implementation
97int main(int argc, char** argv) {
98 // Parse command line arguments
99 if (argc != 6) {
100 for (int i = 0; i < argc; ++i) {
101 fprintf(stderr, "%s ", argv[i]);
102 }
103 fprintf(stderr, "\n");
104 fprintf(stderr, "USAGE: %s IPv4 Port [-s] path/to/source_file.ext [-s] path/to/dest_file.ext\n", argv[0]);
105 fprintf(stderr, "USAGE: use the -s flag to indicate which file is in the server\n");
106 exit(1);
107 }
108
109 // Depending on this flag, we can determine whether the file is being copied
110 // from the server or not.
111 bool from_server = Str_eq(argv[3], "-s");
112
113 IPv4 ip = parse_address(argv[1]);
114 Port port = (Port)atoi(argv[2]);
115 Str dest = argv[5];
116 Str source = from_server ? argv[4] : argv[3];
117
118 if (from_server) {
119 // Connect to metadata server
120 Enchufe metadata = enchufa(ip, port);
121 conecta(metadata);
122
123 // Send GET(fname :: SafeStr) to metadata server
124 Command get = (Command){
125 .tag = GET,
126 .as.get.fname = atoss(source), // INFO: This is safe because source is a command line argument
127 };
128 Buffer get_buf = Command_serialize(get);
129 zumba(metadata, get_buf);
130 free(get_buf.buf);
131
132 // Receive dinfo :: ListDNodeInfo from metadata server
133 Byte buf[BUF_LEN] = {0};
134 Buffer outside_buf = (Buffer){
135 .buf = buf,
136 .len = BUF_LEN,
137 };
138 size_t len = recibe(metadata, outside_buf);
139 outside_buf.len = len;
140 ListDNodeInfo dinfos = ListDNodeInfo_deserialize(outside_buf);
141
142 // Sort the ListDNodeInfo depending on the sequence number.
143 dinfos = sort(dinfos);
144
145 desenchufa(metadata);
146
147 FILE* f = fopen(dest, "w");
148 exists(f);
149
150 // Get data from each dnode and copy it to the client.
151 for (ListDNodeInfo dptr = dinfos; dptr != NULL; dptr = dptr->rest) {
152 DNodeInfo dinfo = dptr->head;
153
154 // Do exchange with data node
155 Enchufe dnode = enchufa(dinfo.dnode.address, dinfo.dnode.port);
156 server_to_client(dnode, dinfo.cid, f);
157 desenchufa(dnode);
158 }
159 fclose(f);
160 } else {
161 // Check if file exists
162 if (access(source, F_OK) == 0) {
163 // Connect to metadata server
164 Enchufe metadata = enchufa(ip, port);
165 conecta(metadata);
166
167 // Open lazyBuffer for reading
168 LazyBuffer b = open(source);
169 exists(b.fptr);
170
171 // Send PUT(fsize :: size_t, fname :: SafeStr) to metadata server
172 Command put = {
173 .tag = PUT,
174 .as.put = {
175 .fsize = b.size,
176 .fname = atoss(dest), // INFO: This is safe because dest is a command line argument
177 },
178 };
179 Buffer put_buf = Command_serialize(put);
180 zumba(metadata, put_buf);
181 free(put_buf.buf);
182
183 // Receive dnodes :: ListDNode from metadata server
184 Byte buf1[BUF_LEN] = {0};
185 Buffer outside_buf = { .buf = buf1, .len = BUF_LEN };
186 size_t len = recibe(metadata, outside_buf);
187 outside_buf.len = len;
188
189 if (len == 0) {
190 log(FATAL, "Metadata server closed the connection right after sending a PUT request\n");
191 exit(EXIT_FAILURE);
192 }
193
194 ListDNode dnodes = ListDNode_deserialize(outside_buf);
195
196 // INFO: This will calculate how many chunks will have to be sent. In
197 // the case where the amount of bytes that have to be sent aren't the a
198 // multiple of CHUNK_SIZE, this will have an extra 1 added. Hence, the slightly
199 // funny looking line:
200 // \code
201 // !!(b.size % CHUNK_SIZE)
202 // \endcode
203 size_t chunk_count = (b.size / CHUNK_SIZE) + !!(b.size % CHUNK_SIZE);
204
205 // INFO: Additionally, the chunks_per_dnode line looks like that because I
206 // wanted a fast ceiling integer division.
207 // from https://stackoverflow.com/questions/2745074/fast-ceiling-of-an-integer-division-in-c-c
208 size_t dnode_count = ListDNode_length(dnodes);
209 size_t chunks_per_dnode = 1 + (chunk_count - 1) / dnode_count;
210
211 // Connect to each data node and do the exchange
212 ListDNodeInfo dinfos = NULL;
213 size_t seq = 1;
214 for (ListDNode dptr = dnodes; dptr != NULL; dptr = dptr ->rest) {
215 DNode d = dptr->head;
216
217 // Connect to data node
218 Enchufe dnode = enchufa(d.address, d.port);
219
220 // Calculate remaining bytes
221 size_t bytes_remaining = b.size - b.position;
222 size_t nbytes = min(bytes_remaining, chunks_per_dnode * CHUNK_SIZE);
223
224 // Do the exchange
225 UUID cid = client_to_server(dnode, &b, nbytes, chunks_per_dnode);
226 dinfos = ListDNodeInfo_cons((DNodeInfo){
227 .dnode = d,
228 .cid = cid,
229 .seq = seq,
230 }, dinfos);
231 seq += 1;
232 desenchufa(dnode);
233 }
234 fclose(b.fptr);
235
236 // Send dinfos :: ListDNodeInfo to metadata server
237 Buffer dinfo_buf = ListDNodeInfo_serialize(dinfos);
238 zumba(metadata, dinfo_buf);
239 desenchufa(metadata);
240 } else {
241 log(FATAL, "'%s': No such file or directory\n", source);
242 exit(EXIT_FAILURE);
243 }
244 }
245}