MeMOry RePlacement alGorithms
at main 4.9 kB view raw
1import sys 2from typing import NamedTuple, Callable 3 4# Folding function. 5def fold[T, U](f: Callable[[U, T], U], z: U, l: list[T]) -> U: 6 # This is a simple for loop that accumulates a value and then returns it. 7 for x in l: 8 z = f(z, x) 9 return z 10 11# This is the state of the memory. 12class State(NamedTuple): 13 faults: int 14 hits: int 15 order: dict[int, list[int]] 16 page_table: set[int] 17 physical_capacity: int 18 19 # Increase hits by 1. 20 def hit(s): 21 return State(s.faults, s.hits + 1, s.order, s.page_table, s.physical_capacity) 22 23 # Increase faults by 1. 24 def fault(s): 25 return State(s.faults + 1, s.hits, s.order, s.page_table, s.physical_capacity) 26 27 # Updates the indices in the state. 28 def update(s): 29 # This is the recursive function that decreases the indices. 30 def rec(order: dict[int, list[int]], page: int) -> dict[int, list[int]]: 31 # If the page appears again: 32 if order[page] != []: 33 # Decrease all of the indices 34 tmp = [x - 1 for x in order[page]] 35 # If the first index is 0, skip it 36 if tmp[0] == 0: 37 return order | {page: tmp[1:]} 38 # Else, just use the decreased indices 39 else: 40 return order | {page: tmp} 41 # In the case where the page doesn't appear again, just return the 42 # order. 43 else: 44 return order 45 return State(s.faults, s.hits, fold(rec, s.order.copy(), s.order), s.page_table, s.physical_capacity) 46 47 # Frees up a spot in the page table depending on the order. 48 def evict(s): 49 # These are the candidates to get removed. 50 opts = [(page, s.order[page]) for page in s.page_table] 51 52 # If some of the candidates don't appear again, simply remove them from 53 # the page table. 54 empty = [a for a in opts if a[1] == []] 55 if empty != []: 56 page = empty[0][0] 57 return State(s.faults, s.hits, s.order, s.page_table - {page}, s.physical_capacity) 58 else: 59 # Otherwise, pick the candidate that appears last. 60 page, lst = sorted([a for a in opts if a[1] != []], key = lambda x: x[1], reverse = True)[0] 61 return State(s.faults, s.hits, s.order | {page: lst[1:]}, s.page_table - {page}, s.physical_capacity) 62 63 # Inserts a page into the page table. 64 def insert(s, page: int): 65 return State(s.faults, s.hits, s.order, s.page_table | {page}, s.physical_capacity) 66 67# This is the algorithm that is used to calculate the faults and hits. 68def optimal(state: State, access: tuple[str,int]) -> State: 69 # The read 70 page = access[1] 71 # If the page is in the page table, then this is a page hit. 72 if page in state.page_table: 73 return state.update().hit() 74 else: 75 # Otherwise this is a page fault, and depending on whether the page 76 # table is full or not, some space has to be made. 77 if len(state.page_table) == state.physical_capacity: 78 return state.evict().insert(page).update().fault() 79 else: 80 return state.insert(page).update().fault() 81 82# This function calculates when they next page will be loaded into physical 83# memory. The existence and reliance on this function in order to execute the 84# rest of the algorithm is what make this algorithm impossible to implement. 85def preprocess(accesses: list[tuple[str,int]]) -> dict[int,list[int]]: 86 # The accumulating function: if the page is not in the pages that have been 87 # seen, initialize it with 0. Otherwise, add the newest index to it. 88 def f(seen, x): 89 index, (_, page) = x 90 if page not in seen: 91 return seen | {page: []} 92 else: 93 return seen | {page: seen[page] + [index]} 94 95 # Fold over the accesses with the accumulating function. 96 return fold(f, dict(), list(enumerate(accesses))) 97 98 99def main(): 100 # Please provide the correct number of arguments. 101 if len(sys.argv) != 3: 102 print("USAGE:", sys.argv[0], "number_of_pages", "file_path") 103 print("\twhere:\n\t\tnumber_of_pages : int\n\t\tfile_path : string") 104 105 # The file path is the second argument. 106 file_path = sys.argv[2] 107 108 # This opens the file and parses the accesses. 109 accesses = [] 110 with open(file_path) as f: 111 accesses = [(x.split(":")[0], int(x.split(":")[1])) for x in f.read().split()] 112 113 # The limit is the first argument. 114 limit = int(sys.argv[1]) 115 # The result of this computation will be a fold over the accesses, with the 116 # the following state and using the optimal algorithm. 117 res = fold(optimal, 118 State(faults = 0, hits = 0, order = preprocess(accesses), page_table = set(), physical_capacity = limit), 119 accesses) 120 121 # Print the results! 122 print("Faults:", res.faults, "\nHits:", res.hits) 123 124main()