MeMOry RePlacement alGorithms
1import sys
2from typing import NamedTuple
3
4# Folding function.
5def fold(f, z, l):
6 # This is a simple for loop that accumulates a value and then returns it.
7 for x in l:
8 z = f(z, x)
9 return z
10
11# This holds information about the page.
12class Info(NamedTuple):
13 age: int
14 read: bool
15 write: bool
16
17# This class manages the state of physical/virtual memory.
18class State(NamedTuple):
19 faults: int
20 hits: int
21 order: list[int]
22 page_table: dict[int, Info]
23 physical_capacity: int
24 tau: int
25
26 # Increase hits by 1.
27 def hit(s):
28 return State(s.faults, s.hits + 1, s.order, s.page_table, s.physical_capacity, s.tau)
29
30 # Increase faults by 1.
31 def fault(s):
32 return State(s.faults + 1, s.hits, s.order, s.page_table, s.physical_capacity, s.tau)
33
34 def update(s):
35 # Focus on the list comprehension here. The list comprehension is simply
36 # increasing the age of each page by 1.
37 return State(s.faults, s.hits, s.order, {k: Info(v.age + 1, v.read, v.write) for k, v in s.page_table.items()}, s.physical_capacity, s.tau)
38
39 # Change the mode on a page.
40 def modify(s, page, read, write):
41 return State(s.faults, s.hits, s.order, s.page_table | {page: Info(s.page_table[page].age, read, write)}, s.physical_capacity, s.tau)
42
43 # Make space in physical memory depending on the order.
44 def evict(s):
45 def find(page_table, order, cur, done):
46 if done == len(order):
47 return cur, order
48 else:
49 if page_table[cur].age < page_table[order[0]].age:
50 return cur, order
51 else:
52 return find(page_table, order[1:] + [order[0]], order[0], done + 1)
53 def rec(page_table, order, page, tau, original_page):
54 age, read, write = page_table[page]
55 # If the page wasn't read, check if it's old.
56 if not read:
57 # When the page is old and not read, return it.
58 if age > tau:
59 # INFO: Usually you would check if the page was written to
60 # and write it to disk here.
61 return page, order
62 # When the page is new but not read, skip it.
63 else:
64 # In the case where all of the pages are new but not read,
65 # find the oldest and return it.
66 if original_page == page:
67 return find(page_table, order, page, 0)
68 # Skip the current page when it's new and not read.
69 return rec(page_table, order[1:] + [order[0]], order[0], tau, original_page)
70 else:
71 # If the page was read, set its R bit to 0 and continue.
72 return rec(page_table | {page: Info(age, False, write)}, order[1:] + [order[0]], order[0], tau, original_page)
73
74 assert False
75
76 last_page, new_order = rec(s.page_table, s.order[1:] + [s.order[0]], s.order[0], s.tau)
77 # Finally, return the new state, without that page.
78 return State(s.faults, s.hits, new_order[:-1], {k: v for k, v in s.page_table.items() if k != last_page}, s.physical_capacity)
79
80 # This function simply inserts a page into the page table.
81 def insert(s, page):
82 return State(s.faults, s.hits, s.order + [page], s.page_table | {page: Info(-1, True, False)}, s.physical_capacity, s.tau)
83
84# This is the algorithm for second chance page replacement.
85def wsclock(state, access):
86 mode, page = access
87 # If the page is in the page table, that is a page hit.
88 if page in state.page_table:
89 # Also, the page's R bit must be set to 1.
90 return state.modify(page, True, mode == "W").update().hit()
91 else:
92 # If there isn't any space in the page table, call make_space()
93 if len(state.page_table) == state.physical_capacity:
94 return state.evict().insert(page).update().fault()
95 else:
96 # If there is space in the page table, we can simply insert the page.
97 return state.insert(page).update().fault()
98
99def main():
100 # The user must provide the physical capacity and the file path to the
101 # access sequence.
102 if len(sys.argv) != 4:
103 print("USAGE:", sys.argv[0], "number_of_pages", "tau", "file_path")
104 print("\twhere:\n\t\tnumber_of_pages : int\n\t\ttau : int\n\t\tfile_path : string")
105
106 # The second argument is the file path.
107 file_path = sys.argv[3]
108
109 # This simply parses the access sequence file.
110 accesses = []
111 with open(file_path) as f:
112 accesses = [(x.split(":")[0], int(x.split(":")[1])) for x in f.read().split()]
113
114 # The physical capacity is the first argument.
115 limit = int(sys.argv[1])
116 tau = int(sys.argv[2])
117 # The result is simply the evaluation of the second algorithm on the access
118 # sequence.
119 res = fold(wsclock, State(0, 0, [], dict(), limit, tau), accesses)
120
121 # Print the result.
122 print("Faults:", res.faults, "\nHits:", res.hits)
123
124main()