# Memory replacement algorithms
1import sys
2from typing import NamedTuple, Callable
3
4# Folding function.
5def fold[T, U](f: Callable[[U, T], U], z: U, l: list[T]) -> U:
6 # This is a simple for loop that accumulates a value and then returns it.
7 for x in l:
8 z = f(z, x)
9 return z
10
11# This is the state of the memory.
class State(NamedTuple):
    """Immutable snapshot of the simulated memory.

    Every method returns a new ``State``; the receiver is never modified.
    """

    # Number of page faults counted so far.
    faults: int
    # Number of page hits counted so far.
    hits: int
    # For each page, the positions of its pending future accesses, nearest
    # first. ``update`` moves every position one step closer per access.
    order: dict[int, list[int]]
    # Pages currently held in the page table.
    page_table: set[int]
    # Number of pages the page table may hold. This class never reads it;
    # enforcing the limit is left to the caller.
    physical_capacity: int

    def hit(self):
        """Return a copy of this state with ``hits`` increased by 1."""
        return self._replace(hits=self.hits + 1)

    def fault(self):
        """Return a copy of this state with ``faults`` increased by 1."""
        return self._replace(faults=self.faults + 1)

    def update(self):
        """Return a copy with every pending position in ``order`` moved one step closer.

        Each position is decreased by 1. If a page's nearest position reaches
        0 it is dropped from that page's list. Pages with no pending positions
        are left untouched.
        """
        def advance(pending: list[int]) -> list[int]:
            # A page that is never accessed again has nothing to shift.
            if not pending:
                return pending
            shifted = [position - 1 for position in pending]
            # Only the nearest position is checked against 0, so the lists
            # are expected to be in increasing order.
            return shifted[1:] if shifted[0] == 0 else shifted

        # A single pass over the dict; key order is preserved.
        return self._replace(
            order={page: advance(pending) for page, pending in self.order.items()}
        )

    def evict(self):
        """Return a copy with one page removed from the page table.

        A page with no pending accesses is removed if there is one. Otherwise
        the page whose pending positions compare greatest (i.e. whose nearest
        access is farthest away) is removed, and its nearest pending position
        is dropped from ``order``.

        Raises:
            IndexError: if the page table is empty.
            KeyError: if a page in the page table has no entry in ``order``.
        """
        if not self.page_table:
            raise IndexError("cannot evict from an empty page table")

        # These are the candidates to get removed.
        candidates = [(page, self.order[page]) for page in self.page_table]

        # A candidate that never appears again can be removed for free.
        for page, pending in candidates:
            if not pending:
                return self._replace(page_table=self.page_table - {page})

        # Otherwise pick the candidate that appears last. Lists compare
        # element by element, so the nearest position decides; on a tie
        # ``max`` keeps the first candidate encountered.
        victim, pending = max(candidates, key=lambda candidate: candidate[1])
        # The victim's nearest position is dropped along with the page. Only
        # pages in the page table are consulted when evicting, so this entry
        # would not be looked at while the victim is out of the table.
        return self._replace(
            order=self.order | {victim: pending[1:]},
            page_table=self.page_table - {victim},
        )

    def insert(self, page: int):
        """Return a copy of this state with ``page`` added to the page table."""
        return self._replace(page_table=self.page_table | {page})
66
67# This is the algorithm that is used to calculate the faults and hits.
def optimal(state: State, access: tuple[str,int]) -> State:
    """Apply one memory access to ``state`` and return the resulting state.

    Only the page number (the second element of ``access``) is used. A page
    already in the page table counts as a hit. Any other page counts as a
    fault and is inserted, after evicting one page first when the page table
    already holds ``physical_capacity`` pages. In every case the pending
    positions are advanced by one step.
    """
    requested = access[1]

    # Page hit: nothing to load, just advance the bookkeeping.
    if requested in state.page_table:
        return state.update().hit()

    # Page fault: make room only when the page table is exactly full.
    table_is_full = len(state.page_table) == state.physical_capacity
    with_room = state.evict() if table_is_full else state
    return with_room.insert(requested).update().fault()
81
# This function calculates when each page will be accessed again. The
# reliance on this function in order to execute the rest of the algorithm is
# what makes this algorithm impossible to implement in practice: it needs to
# know the future accesses in advance.
def preprocess(accesses: list[tuple[str,int]]) -> dict[int,list[int]]:
    """Map every page to the positions at which it is accessed again.

    The position of a page's first access is not recorded; every later
    access contributes its index in ``accesses``, in increasing order. A page
    that is accessed only once maps to an empty list. Keys appear in order of
    first access.
    """
    later_accesses: dict[int, list[int]] = {}
    # One pass, appending in place, instead of copying the whole dict and the
    # page's list for every access.
    for index, (_, page) in enumerate(accesses):
        if page in later_accesses:
            later_accesses[page].append(index)
        else:
            # First time this page is seen: start with no later accesses.
            later_accesses[page] = []
    return later_accesses
97
98
def main():
    """Run the page-replacement simulation from the command line.

    Expects exactly two arguments: the number of pages (an int) and the path
    of a file holding whitespace-separated ``<label>:<page>`` tokens, where
    ``<page>`` is an int. Prints the number of faults and hits.

    Raises:
        SystemExit: with status 1 when the argument count is wrong.
    """
    # Please provide the correct number of arguments. Stop here rather than
    # falling through to the sys.argv accesses below.
    if len(sys.argv) != 3:
        print("USAGE:", sys.argv[0], "number_of_pages", "file_path")
        print("\twhere:\n\t\tnumber_of_pages : int\n\t\tfile_path : string")
        sys.exit(1)

    # The file path is the second argument.
    file_path = sys.argv[2]

    # This opens the file and parses the accesses, splitting each token once.
    with open(file_path) as f:
        accesses = [
            (fields[0], int(fields[1]))
            for fields in (token.split(":") for token in f.read().split())
        ]

    # The limit is the first argument.
    limit = int(sys.argv[1])

    # The result of this computation will be a fold over the accesses, with
    # the following state and using the optimal algorithm.
    initial = State(
        faults=0,
        hits=0,
        order=preprocess(accesses),
        page_table=set(),
        physical_capacity=limit,
    )
    res = fold(optimal, initial, accesses)

    # Print the results!
    print("Faults:", res.faults, "\nHits:", res.hits)
123
# Run the simulation only when this file is executed as a script, so that
# importing it does not start the command-line program.
if __name__ == "__main__":
    main()