Threads and Scheduling
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

docs: Add documentation.

+142 -12
+24 -2
src/cluster.c
··· 20 20 21 21 22 22 int consume(void*args) { 23 - 23 + // Turns the strange looking thing thrd_current() into a smaller number. 24 24 size_t tid = ((thrd_current() & 0xFFFF) >> 12) % 12; 25 25 26 26 log(INFO, "Consumer %zu started.\n", tid); ··· 31 31 size_t total_time = 0; 32 32 Proc proc = {0}; 33 33 while (true) { 34 - 34 + // Enter crit region 35 35 mtx_lock(&pq_mtx); // down 36 36 if (pq_empty(pq)) { 37 37 mtx_unlock(&pq_mtx); // up ··· 42 42 pq_delete(&pq); 43 43 mtx_unlock(&pq_mtx); // up 44 44 } 45 + // Left crit region 45 46 47 + // Let the client know what just happened. 46 48 log(INFO, "Consumer %zu:\n\tProcessing: %s for %d seconds.\n", tid, (const char*)proc.program.buf, proc.time); 47 49 zumba(cliente, proc.program); 48 50 51 + // Perform work. 49 52 sleep(proc.time); 50 53 total_time += proc.time; 54 + // Free the buffer. 51 55 free(proc.program.buf); 52 56 } 53 57 58 + // Report back to the user. 54 59 log(INFO, "Consumer %zu consumed %zu seconds of CPU time.\n", tid, total_time); 55 60 56 61 return thrd_success; ··· 67 72 }; 68 73 69 74 while (true) { 75 + // Get the message. 70 76 size_t msg_len = recibe(cliente, out_buf); 71 77 78 + // The client closed the connection. 72 79 if (msg_len == 0) break; 73 80 81 + // Deserialize the message 74 82 Procs procs = deserialize(out_buf, msg_len); 75 83 for (size_t i = 0; i < procs.len; ++i) { 76 84 Proc proc = procs.procs[i]; 85 + 86 + // Put the proc in the queue (Enter crit region) 77 87 mtx_lock(&pq_mtx); // down 78 88 pq_insert(&pq, proc); 79 89 mtx_unlock(&pq_mtx); // up 90 + // Leave crit region 80 91 } 81 92 82 93 memset(out_buf.buf, 0, out_buf.len); 83 94 } 95 + // Let the rest of the threads know that the client is closed. 84 96 client_closed = true; 85 97 86 98 ··· 90 102 int main(int argc, const char** argv) { 91 103 assert(argc == 3 && "You must provide three arguments."); 92 104 105 + // Get a new enchufe. 93 106 IPv4 ip = parse_address(argv[1]); 94 107 Port port = (Port)atoi(argv[2]); 95 108 Enchufe enchufe = enchufa(ip, port); 96 109 amarra(enchufe); 97 110 111 + // Listen to incoming clients. 98 112 escucha(enchufe, 1); 99 113 log(INFO, "Listening to: %d.%d.%d.%d on port: %d\n", ip.bytes[0], ip.bytes[1], ip.bytes[2], ip.bytes[3], port); 100 114 115 + // Accept connection from client. 101 116 Enchufe cliente = acepta(enchufe); 102 117 log(INFO, "Accepted a connection.\n"); 103 118 119 + // Initialize data for program. 104 120 mtx_init(&pq_mtx, mtx_plain); 105 121 mtx_init(&so_mtx, mtx_plain); 106 122 pq = pq_init(); 107 123 124 + // Start producer. 108 125 thrd_t producer = {0}; 109 126 thrd_create(&producer, produce, (void*)&cliente); 110 127 111 128 log(INFO, "Producer started.\n"); 112 129 130 + // Start consumers. 113 131 thrd_t consumers[THREAD_COUNT] = {0}; 114 132 for (size_t i = 0; i < THREAD_COUNT; ++i) { 115 133 thrd_create(&consumers[i], consume, (void*)&cliente); 116 134 } 117 135 log(INFO, "Consumers started.\n"); 118 136 137 + // Join the producer. 119 138 thrd_join(producer, NULL); 139 + 140 + // Join the consumers. 120 141 for (size_t i = 0; i < THREAD_COUNT; ++i) { 121 142 thrd_join(consumers[i], NULL); 122 143 } 123 144 124 145 log(INFO, "Done.\n"); 125 146 147 + // Deinitialize all the program's data. 126 148 pq_deinit(&pq); 127 149 mtx_destroy(&pq_mtx); 128 150 mtx_destroy(&so_mtx);
+9
src/lib/enchufe.c
··· 4 4 #include <netinet/in.h> // sockaddr_in 5 5 #include <arpa/inet.h> // inet_pton 6 6 7 + // Esta funcion llama a tres otras funciones inline. 7 8 Enchufe enchufa(IPv4 ip, Port port) { 8 9 return aplasta(nuevo(), receptaculo(ip, port)); 9 10 } 10 11 12 + // Wrapper para connect. 11 13 void conecta(Enchufe enchufe) { 12 14 try (connect(enchufe.fd, (const struct sockaddr*)&enchufe.addr, enchufe.addrlen)); 13 15 } 14 16 17 + // Wrapper para bind. 15 18 void amarra(Enchufe enchufe) { 16 19 try (bind(enchufe.fd, (struct sockaddr*)&enchufe.addr, enchufe.addrlen)); 17 20 } 18 21 22 + // Wrapper para liste. 19 23 void escucha(Enchufe enchufe, size_t len) { 20 24 listen(enchufe.fd, (int)len); 21 25 } 22 26 27 + // Wrapper para acepta. 23 28 Enchufe acepta(Enchufe enchufe) { 24 29 FD fd = accept(enchufe.fd, (struct sockaddr*)&enchufe.addr, &enchufe.addrlen); 25 30 try (fd); ··· 30 35 }; 31 36 } 32 37 38 + // Wrapper para zumba. 33 39 void zumba(Enchufe enchufe, Buffer buf) { 34 40 try (write(enchufe.fd, buf.buf, buf.len)); 35 41 } 36 42 43 + // Wrapper para recibe. 37 44 size_t recibe(Enchufe enchufe, Buffer buf) { 38 45 int64_t bytes_read = read(enchufe.fd, buf.buf, buf.len); 39 46 try (bytes_read); 40 47 return (size_t)bytes_read; 41 48 } 42 49 50 + // Wrapper para close. 43 51 void desenchufa(Enchufe enchufe) { 44 52 close(enchufe.fd); 45 53 } 46 54 55 + // Esta funcion convierte un string a un buffer. 47 56 Buffer atob(const char* str) { 48 57 return (Buffer){ 49 58 .buf = (Byte*)str,
+39
src/lib/enchufe.h
··· 8 8 #include <string.h> 9 9 #include <sys/socket.h> 10 10 11 + // Macro para probar si un numero es negativo. En general, esta libreria 12 + // prefiere crashear el programa que dejar que el usuario arregle un error. 11 13 #define try(a) do { \ 12 14 if ((a) < 0) { \ 13 15 fprintf(stderr, "[ERROR]: %s:%d %s\n", __FILE__, __LINE__, strerror(errno)); \ ··· 19 21 typedef uint16_t Port; 20 22 typedef uint8_t Byte; 21 23 24 + // Buffer de bytes, lo puedes usar para lo que sea. 22 25 typedef struct { 23 26 size_t len; 24 27 Byte* buf; 25 28 } Buffer; 26 29 30 + // Convierte un string de C a un buffer de bytes. 27 31 Buffer atob(const char* str); 28 32 33 + 34 + // Tipo para un IPv4. Ademas te ayuda convertir entre little endian y big 35 + // endian. La data de bytes aparece como bytes[3]bytes[2]bytes[1]bytes[0] en 36 + // memoria. 29 37 typedef union { 30 38 Byte bytes[4]; 31 39 uint32_t ip; 32 40 } IPv4; 33 41 42 + // Enchufe. 34 43 typedef struct { 35 44 FD fd; 36 45 struct sockaddr_in addr; 37 46 socklen_t addrlen; 38 47 } Enchufe; 39 48 49 + // Receptaculo. 40 50 typedef struct { 41 51 struct sockaddr_in addr; 42 52 socklen_t addrlen; 43 53 } Receptaculo; 44 54 55 + // Crea un file descriptor nuevo para un enchufe. 45 56 inline FD nuevo() { 46 57 FD fd = socket(PF_INET, SOCK_STREAM, 0); 47 58 try (fd); 48 59 return fd; 49 60 } 50 61 62 + // Crea un receptaculo. 51 63 inline Receptaculo receptaculo(IPv4 ip, Port port) { 52 64 struct sockaddr_in name = { 53 65 .sin_family = AF_INET, ··· 62 74 }; 63 75 } 64 76 77 + // Coge un file descriptor y un receptaculo y los junta. En otras palabras, los 78 + // aplasta. Un enchufe es basicamente, un file descriptor con un IP. 65 79 inline Enchufe aplasta(FD fd, Receptaculo rec) { 66 80 return (Enchufe){ 67 81 .fd = fd, ··· 70 84 }; 71 85 } 72 86 87 + // Esta funcion te crea un enchufe. 73 88 Enchufe enchufa(IPv4 ip, Port port); 89 + 90 + // Esta funcion crea la conexion desde tu computadora hasta donde sea que este 91 + // el enchufe. 74 92 void conecta(Enchufe enchufe); 93 + 94 + // Esta funcion amarra la direccion de IP que se le dio al enchufe, al file 95 + // descriptor. Hay casos donde no vas a querer que esten amarrados, como cuando 96 + // no te importa la direccion que tendra un cliente conectandose a un servidor, 97 + // por eso el default es que la funcion enchufa(ip, port) no amarre el file 98 + // descriptor al puerto. 75 99 void amarra(Enchufe enchufe); 100 + 101 + // Le deja saber al enchufe cuantas conexiones se pueden hacer. El default es 102 + // que no se puedan hacer conexiones. Asi que si estas codificando un servidor, 103 + // tienes que llamar esta funcion. 76 104 void escucha(Enchufe enchufe, size_t len); 105 + 106 + // Esta funcion bloquea el thread hasta que un cliente se conecte. Devuelve el 107 + // enchufe del cliente para poder comunicarse con el cliente. Tienes que 108 + // desenchufarlo cuando termines la direccion. 77 109 Enchufe acepta(Enchufe enchufe); 110 + 111 + // Envia un buffer de bytes a un cliente. 78 112 void zumba(Enchufe enchufe, Buffer in_buf); 113 + 114 + // Recibe un buffer de bytes de un cliente. Devuelve la cantidad de bytes que se 115 + // leyeron. Si devuelve 0, entonces el cliente cerro la conexion. 79 116 size_t recibe(Enchufe enchufe, Buffer out_buf); 117 + 118 + // Esta funcion se encarga de liberar los recursos que ocupan los echufes. 80 119 void desenchufa(Enchufe enchufe); 81 120 82 121 #endif // ENCHUFE_H_ header
+16 -6
src/lib/lib.c
··· 21 21 if (str.len != calculated_len) { 22 22 log(ERROR, "%s:%d String's length (%zu) is not equal to given length (%zu).", __FILE__, __LINE__, calculated_len, str.len); 23 23 24 - printf("\n"); 24 + printf("\nBuffer contains: "); 25 25 for (size_t i = 0; i < max_len; ++i) printf("[%d] ", str.buf[i]); 26 26 printf("\n"); 27 27 ··· 30 30 if (str.len > max_len) { 31 31 log(ERROR, "%s:%d String's length (%zu) is larger than the buffer that contains it (%zu).\n", str.len, calculated_len); 32 32 33 - printf("\n"); 33 + printf("\nBuffer contains: "); 34 34 for (size_t i = 0; i < max_len; ++i) printf("[%d] ", str.buf[i]); 35 35 printf("\n"); 36 36 ··· 39 39 return str; 40 40 } 41 41 42 - // Convertr the buffer received from a socket into an array of processes. The 42 + // Converts the buffer received from a socket into an array of processes. The 43 43 // user must free this memory. 44 44 Procs deserialize(Buffer out_buf, size_t msg_len) { 45 45 Procs procs = { ··· 48 48 }; 49 49 exists(procs.procs); 50 50 51 + // This loop will continue until all Proc's have been deserialized. 51 52 size_t buf_idx = 0; 52 53 for (size_t j = 0; buf_idx < msg_len; ++j) { 53 54 // Reallocate new Proc if more than one proc was received. ··· 64 65 size_t len = *(size_t*)(curr + sizeof(Time)); 65 66 Byte* str_buf = curr + sizeof(Time) + sizeof(size_t); 66 67 68 + // Validate the string in the buffer. 67 69 Buffer program = validate_str((Buffer){.len = len, .buf = str_buf}, msg_len - buf_idx); 68 70 71 + // Insert everything into the proc list. 69 72 procs.procs[j] = (Proc){ 70 73 .time = time, 71 74 .program = bufcpy(program), ··· 77 80 return procs; 78 81 } 79 82 83 + // This function turns a proc into a buffer. In order to do that, this function 84 + // reinterprets everything on the proc as a sequence of bytes. 80 85 Buffer serialize(Proc proc) { 86 + // First, determine how long the buffer has to be. 81 87 size_t len = sizeof(Time) + sizeof(size_t) + proc.program.len + 1; 88 + 89 + // Allocate the bytes in the buffer. 82 90 Buffer buf = { 83 91 .len = len, 84 92 .buf = (Byte*)calloc(len, sizeof(Byte)), 85 93 }; 86 94 exists(buf.buf); 87 95 96 + // Copy everything in the buffer. 88 97 memcpy((void*)buf.buf, (void*)&proc.time, sizeof(Time)); 89 98 memcpy((void*)(buf.buf + sizeof(Time)), (void*)&proc.program.len, sizeof(size_t)); 90 99 memcpy((void*)(buf.buf + sizeof(Time) + sizeof(size_t)), (void*)proc.program.buf, proc.program.len); 91 100 return buf; 92 101 } 93 102 103 + // This function takes a string representing an IPv4 address and converts it 104 + // into an IPv4 type. 94 105 IPv4 parse_address(const char* str) { 95 - size_t len = strlen(str); 96 - 97 - assert(len <= 15 && "What you have entered cannot possibly be an IP address."); 106 + size_t len = safe_strlen(str, 15); 98 107 99 108 IPv4 ip = {0}; 100 109 size_t curr_byte = 0; ··· 109 118 return ip; 110 119 } 111 120 121 + // uses memchr to calculate strlen. 112 122 size_t safe_strlen(const char* str, size_t max_len) { 113 123 return (size_t)memchr(str, '\0', max_len) - (size_t)str; 114 124 }
+17 -2
src/lib/lib.h
··· 3 3 #include "enchufe.h" 4 4 #include <assert.h> 5 5 6 + // Macro para detectar si un pointer es NULL. Usa esta funcion si prefieres 7 + // crashear el programa cuando encuetras un puntero NULL. 6 8 #define exists(a) do { \ 7 9 if ((a) == NULL) { \ 8 10 fprintf(stderr, "[ERROR]: %s:%d Null pointer encountered, %s\n", __FILE__, __LINE__, strerror(errno)); \ ··· 10 12 } \ 11 13 } while(0) 12 14 15 + // Esto es necesario por si prefieres que Proc aguante una unidad de tiempo 16 + // distinta. 13 17 typedef Byte Time; 14 18 19 + // Esto sera lo que se envia y recibe por el socket. 15 20 typedef struct { 16 21 Time time; 17 22 Buffer program; 18 23 } Proc; 19 24 25 + // Esto es para crear una lista dinamica de Procs. 20 26 typedef struct { 21 27 Proc* procs; 22 28 size_t len; 23 29 } Procs; 24 30 25 - typedef Buffer SafeStr; 31 + // Esta funcion convierte un buffer en una lista de Proc's. 32 + Procs deserialize(Buffer out_buf, size_t msg_len); 26 33 27 - Procs deserialize(Buffer out_buf, size_t msg_len); 34 + // Esta funcion convierte un Proc en un buffer. 28 35 Buffer serialize(Proc); 36 + 37 + // Esta funcion convierte un string que representa un IPv4 en un IPv4. 29 38 IPv4 parse_address(const char* str); 39 + 40 + // Esta funcion verifica que el string enviado por el socket sea valido. 30 41 Buffer validate_str(Buffer str, size_t max_len); 42 + 43 + // Esta funcion crea una copia de un buffer en memoria y lo devuelve. 31 44 Buffer bufcpy(Buffer in); 45 + 46 + // Esta funcion es mejor que strlen. 32 47 size_t safe_strlen(const char* str, size_t max_len); 33 48 34 49 #endif // LIB_H_
+1
src/lib/log.c
··· 2 2 #include <stdarg.h> 3 3 #include "log.h" 4 4 5 + // LOGGGG 5 6 void log(LogLevel level, const char* format, ...) { 6 7 FILE* out = stdout; 7 8
+3 -1
src/lib/log.h
··· 1 1 #ifndef LOG_H_ 2 2 #define LOG_H_ 3 3 4 + // ¿Cuál log prefieres hacer? 4 5 typedef enum { 5 6 INFO, 6 7 WARN, 7 8 ERROR, 8 9 } LogLevel; 9 10 10 - void log(LogLevel level, const char* format, ...); 11 + // logggg 12 + void log(LogLevel level, const char* format, ...);;;; 11 13 12 14 #endif // LOG_H_ header
+14 -1
src/lib/pq.c
··· 1 1 #include "pq.h" 2 2 #include "log.h" 3 3 4 - #include <string.h> 4 + // Just sets the head to NULL. 5 5 PQ pq_init() { 6 6 return (PQ){.head = NULL}; 7 7 } 8 8 9 + // Goes node by node freeing all the memory. 9 10 void pq_deinit(PQ* pq) { 10 11 exists(pq); 11 12 struct Node* curr = pq->head; ··· 18 19 } 19 20 } 20 21 22 + 23 + // I spent way too long trying to implement this in C. . . So I tried it in 24 + // Haskell. The Haskell implementation looks like this: 25 + // insert :: a -> [a] -> [a] 26 + // insert p [] = [p] 27 + // insert p (x:xs) = if x > p then p:x:xs else x:insert p xs 28 + // The C implementation looks like this: 21 29 Node* n_insert(Proc proc, Node* node) { 22 30 if (node == NULL) { 23 31 Node* n = malloc(sizeof(Node)); ··· 39 47 } 40 48 } 41 49 50 + // This function just calls n_insert. 42 51 void pq_insert(PQ* pq, Proc proc) { 43 52 exists(pq); 44 53 45 54 pq->head = n_insert(proc, pq->head); 46 55 } 47 56 57 + // Free node and buffer. 48 58 void pq_delete(PQ* pq) { 49 59 exists(pq); 50 60 if (pq->head != NULL) { ··· 55 65 } 56 66 } 57 67 68 + // This function creates a copy of the Proc. 58 69 Proc pq_access(PQ pq) { 59 70 exists(pq.head); 60 71 Proc proc = pq.head->proc; ··· 70 81 }; 71 82 } 72 83 84 + // This function. 73 85 void pq_print(PQ pq) { 74 86 struct Node* curr = pq.head; 75 87 printf("\n"); ··· 87 99 printf("\n"); 88 100 } 89 101 102 + // When the head is NULL the list is empty. 90 103 bool pq_empty(PQ pq) { 91 104 return pq.head == NULL; 92 105 }
+19
src/lib/pq.h
··· 3 3 #include "lib.h" 4 4 #include <stdbool.h> 5 5 6 + // Nodo que aguanta cada proceso. 6 7 struct Node { 7 8 Proc proc; 8 9 struct Node* next; 9 10 }; 11 + // Para crear un struct recursivo no se puede hacer un typedef. Aqui esta el 12 + // typedef. 10 13 typedef struct Node Node; 11 14 15 + // Funcion helper para insertar un proceso en la lista y respetar el orden de 16 + // tiempo de procesos. 12 17 Node* n_insert(Proc proc, Node* node); 13 18 19 + // La estructura para priority queue. 14 20 typedef struct { 15 21 Node* head; 16 22 } PQ; 17 23 24 + // Inicializar el prio queue. 18 25 PQ pq_init(); 26 + 27 + // La funcion para borrar el prio queue. 19 28 void pq_deinit(PQ* pq); 29 + 30 + // La funcion para insertar en el prio queue. 20 31 void pq_insert(PQ* pq, Proc proc); 32 + 33 + // la funcion para borrar el primer elemento del prio queue. 21 34 void pq_delete(PQ* pq); 35 + 36 + // La funcion para accesar el tope del prio queue. 22 37 Proc pq_access(PQ pq); 38 + 39 + // La funcion para imprimir el prio queue. 23 40 void pq_print(PQ pq); 41 + 42 + // La funcion para verificar si el prio queue esta vacio. 24 43 bool pq_empty(PQ pq); 25 44 26 45 #endif // PQ_H_