Distributed File System written in C
1#include <stdio.h>
2#include <sys/types.h>
3#include <sys/stat.h>
4#include "lib/lib.h"
5#include "lib/enchufe.h"
6#include "lib/uuid.h"
7#include "lib/log.h"
8#define BUF_LEN 0x1000
9
10/// Handle when a client wants to read a block
11void lire(Enchufe cliente, UUID cid, SafeStr data_dir_path) {
12 // Create path to black file in data directory
13 Str data_fpath = calloc(data_dir_path.len + 1 + sizeof(UUID_Str) + 5, sizeof(char));
14 UUID_Str cid_str = utoa(cid);
15 exists(data_fpath);
16 memcpy((void*)data_fpath, data_dir_path.str, data_dir_path.len);
17 memcpy((void*)(data_fpath + data_dir_path.len), "/", 1);
18 memcpy((void*)(data_fpath + data_dir_path.len + 1), cid_str.uuid, sizeof(UUID_Str));
19 memcpy((void*)(data_fpath + data_dir_path.len + 1 + sizeof(UUID_Str) - 1), ".blk", 4);
20
21 log(INFO, "Opening data file: %s\n", data_fpath);
22 FILE* f = fopen(data_fpath, "r");
23 exists(f);
24
25 // Calculate size of block file
26 fseek(f, 0L, SEEK_END);
27 size_t nbytes = (size_t)ftell(f);
28 fseek(f, 0L, SEEK_SET);
29
30 // Send nbytes :: size_t to client
31 Buffer nbytes_buf = size_t_serialize(nbytes);
32 zumba(cliente, nbytes_buf);
33 free(nbytes_buf.buf);
34
35 // Send all the bytes in the block file to the client
36 size_t bytes_sent = 0;
37 LazyBuffer b = open(data_fpath);
38 log(INFO, "Sending %zu bytes\n", nbytes);
39 while (true) {
40 b = next_chunk(b);
41 if (b.len == 0) break;
42
43 // Send the bytes
44 zumba(cliente, (Buffer){ .buf = b.buf, .len = b.len });
45
46 printf("\r");
47 log(INFO, "Sent %zu bytes out of %zu", bytes_sent, nbytes);
48 fflush(stdout);
49 bytes_sent += b.len;
50 }
51 printf("\n");
52 fclose(b.fptr);
53}
54
55/// Handle when a cllient wants to write a block
56void ecriver(Enchufe cliente, size_t nbytes, SafeStr data_dir_path) {
57 // Send cid :: UUID to client
58 UUID cid = new_uuid();
59 Buffer uuid_buf = UUID_serialize(cid);
60 log(INFO, "Responding with cid: %s\n", utoa(cid).uuid);
61 zumba(cliente, uuid_buf);
62 free(uuid_buf.buf);
63
64 // create path for block file
65 Str data_fpath = calloc(data_dir_path.len + 1 + sizeof(UUID_Str) + 5, sizeof(char));
66 UUID_Str cid_str = utoa(cid);
67 exists(data_fpath);
68 memcpy((void*)data_fpath, data_dir_path.str, data_dir_path.len);
69 memcpy((void*)(data_fpath + data_dir_path.len), "/", 1);
70 memcpy((void*)(data_fpath + data_dir_path.len + 1), cid_str.uuid, sizeof(UUID_Str));
71 memcpy((void*)(data_fpath + data_dir_path.len + 1 + sizeof(UUID_Str) - 1), ".blk", 4);
72
73 log(INFO, "Opening data file: %s\n", data_fpath);
74 FILE* f = fopen(data_fpath, "w");
75 exists(f);
76
77 // Receive all the bytes from the client
78 log(INFO, "Receiving %zu bytes\n", nbytes);
79 for (size_t bytes_received = 0; bytes_received < nbytes;) {
80 // Receive buf :: Buffer from client
81 Byte buf[CHUNK_SIZE] = {0};
82 Buffer outside_buf = (Buffer){ .buf = buf, .len = CHUNK_SIZE };
83 size_t len = recibe(cliente, outside_buf);
84 outside_buf.len = len;
85
86 bytes_received += len;
87 printf("\r");
88 log(INFO, "Received %zu bytes out of %zu", bytes_received, nbytes);
89 fflush(stdout);
90
91 Buffer_write(outside_buf, f);
92 }
93 printf("\n");
94 fclose(f);
95}
96
97/// This is the data node implementation
98int main(int argc, Str* argv) {
99 // Parse command line arguments
100 if (argc < 4 || argc > 5) {
101 fprintf(stderr, "USAGE: %s IPv4 PORT PATH [PORT]\n", argv[0]);
102 exit(1);
103 }
104
105 IPv4 this_address = (IPv4){ .bytes = {127,0,0,1}};
106 Port this_port = (Port)atoi(argv[2]);
107 Str data_dir_path = argv[3];
108 IPv4 metadata_address = parse_address(argv[1]);
109 Port metadata_port = argc == 5 ? (Port)atoi(argv[4]) : 8000;
110
111 log(INFO, "Creating data node directory inside: %s\n", data_dir_path);
112 mkdir_p(atoss(data_dir_path), 0700);
113
114 // Connect to the metadata server
115 Enchufe metadataserver = enchufa(metadata_address, metadata_port);
116 conecta(metadataserver);
117
118 // Send REG(address :: IPv4, port :: Port) to metadata server
119 Command reg = (Command){
120 .tag = REG,
121 .as.reg = {
122 .address = this_address,
123 .port = this_port,
124 },
125 };
126 Buffer reg_buf = Command_serialize(reg);
127 zumba(metadataserver, reg_buf);
128 free(reg_buf.buf);
129
130 desenchufa(metadataserver);
131
132 // Open a socket to listen for other connections.
133 Enchufe enchufe = enchufa(this_address, this_port);
134 amarra(enchufe);
135 escucha(enchufe, 3);
136
137 // Wait for connections
138 while (true) {
139 log(INFO, "Waiting for clients\n");
140 Enchufe cliente = acepta(enchufe);
141 log(INFO, "Received a client\n");
142
143 // Receive Command from client
144 Byte buf[BUF_LEN] = {0};
145 Buffer outside_buf = { .buf = buf, .len = BUF_LEN };
146 size_t len = recibe(cliente, outside_buf);
147 outside_buf.len = len;
148 Command cmd = Command_deserialize(outside_buf);
149
150 switch (cmd.tag) {
151 case WRITE: {
152 log(INFO, "Received WRITE Command\n");
153 ecriver(cliente, cmd.as.write.nbytes, atoss(data_dir_path));
154 log(INFO, "Finished WRITE Command\n");
155 } break;
156 case READ: {
157 log(INFO, "Received READ Command\n");
158 lire(cliente, cmd.as.read.cid, atoss(data_dir_path));
159 log(INFO, "Finished READ Command\n");
160 } break;
161 default: {
162 log(ERROR, "That command was incorrect, proceeding to ignore\n");
163 } break;
164 }
165 }
166}