import sys from typing import NamedTuple, Callable # Folding function. def fold[T, U](f: Callable[[U, T], U], z: U, l: list[T]) -> U: # This is a simple for loop that accumulates a value and then returns it. for x in l: z = f(z, x) return z # This holds information about the page. class Info(NamedTuple): age: int read: bool write: bool # This class manages the state of physical/virtual memory. class State(NamedTuple): faults: int hits: int order: list[int] page_table: dict[int, Info] physical_capacity: int tau: int # Increase hits by 1. def hit(s): return State(s.faults, s.hits + 1, s.order, s.page_table, s.physical_capacity, s.tau) # Increase faults by 1. def fault(s): return State(s.faults + 1, s.hits, s.order, s.page_table, s.physical_capacity, s.tau) def update(s): # Focus on the list comprehension here. The list comprehension is simply # increasing the age of each page by 1. return State(s.faults, s.hits, s.order, {k: Info(v.age + 1, v.read, v.write) for k, v in s.page_table.items()}, s.physical_capacity, s.tau) # Change the mode on a page. def modify(s, page: int, read: bool, write: bool): return State(s.faults, s.hits, s.order, s.page_table | {page: Info(s.page_table[page].age, read, write)}, s.physical_capacity, s.tau) # Make space in physical memory depending on the order. def evict(s): def find(page_table: dict[int, Info], order: list[int], cur: int, done: int) -> tuple[int, list[int]]: if done == len(order): return cur, order else: if page_table[cur].age < page_table[order[0]].age: return cur, order else: return find(page_table, order[1:] + [order[0]], order[0], done + 1) def rec(page_table: dict[int, Info], order: list[int], page: int, tau: int, original_page: int) -> tuple[int, list[int]]: age, read, write = page_table[page] # If the page wasn't read, check if it's old. if not read: # When the page is old and not read, return it. if age > tau: # INFO: Usually you would check if the page was written to # and write it to disk here. return page, order # When the page is new but not read, skip it. else: # In the case where all of the pages are new but not read, # find the oldest and return it. if original_page == page: return find(page_table, order, page, 0) # Skip the current page when it's new and not read. return rec(page_table, order[1:] + [order[0]], order[0], tau, original_page) else: # If the page was read, set its R bit to 0 and continue. return rec(page_table | {page: Info(age, False, write)}, order[1:] + [order[0]], order[0], tau, original_page) assert False last_page, new_order = rec(s.page_table, s.order[1:] + [s.order[0]], s.order[0], s.tau) # Finally, return the new state, without that page. return State(s.faults, s.hits, new_order[:-1], {k: v for k, v in s.page_table.items() if k != last_page}, s.physical_capacity) # This function simply inserts a page into the page table. def insert(s, page: int): return State(s.faults, s.hits, s.order + [page], s.page_table | {page: Info(-1, True, False)}, s.physical_capacity, s.tau) # This is the algorithm for second chance page replacement. def wsclock(state: State, access: tuple[str,int]) -> State: mode, page = access # If the page is in the page table, that is a page hit. if page in state.page_table: # Also, the page's R bit must be set to 1. return state.modify(page, True, mode == "W").update().hit() else: # If there isn't any space in the page table, call make_space() if len(state.page_table) == state.physical_capacity: return state.evict().insert(page).update().fault() else: # If there is space in the page table, we can simply insert the page. return state.insert(page).update().fault() def main(): # The user must provide the physical capacity and the file path to the # access sequence. if len(sys.argv) != 4: print("USAGE:", sys.argv[0], "number_of_pages", "tau", "file_path") print("\twhere:\n\t\tnumber_of_pages : int\n\t\ttau : int\n\t\tfile_path : string") # The second argument is the file path. file_path = sys.argv[3] # This simply parses the access sequence file. accesses = [] with open(file_path) as f: accesses = [(x.split(":")[0], int(x.split(":")[1])) for x in f.read().split()] # The physical capacity is the first argument. limit = int(sys.argv[1]) tau = int(sys.argv[2]) # The result is simply the evaluation of the second algorithm on the access # sequence. res = fold(wsclock, State(0, 0, [], dict(), limit, tau), accesses) # Print the result. print("Faults:", res.faults, "\nHits:", res.hits) main()