| import os |
| import io |
| from contextlib import redirect_stdout |
| import pickle |
| import regex |
| import copy |
| from typing import Any, Dict, Optional |
| import multiprocess |
| from pebble import ProcessPool |
| from concurrent.futures import TimeoutError |
| from functools import partial |
| import traceback |
| from timeout_decorator import timeout |
|
|
|
|
| class GenericRuntime: |
| GLOBAL_DICT = {} |
| LOCAL_DICT = None |
| HEADERS = [] |
|
|
| def __init__(self): |
| self._global_vars = copy.copy(self.GLOBAL_DICT) |
| self._local_vars = copy.copy(self.LOCAL_DICT) if self.LOCAL_DICT else None |
|
|
| for c in self.HEADERS: |
| self.exec_code(c) |
|
|
| def exec_code(self, code_piece: str) -> None: |
| if regex.search(r"(\s|^)?input\(", code_piece) or regex.search( |
| r"(\s|^)?os.system\(", code_piece |
| ): |
| raise RuntimeError() |
| exec(code_piece, self._global_vars) |
|
|
| def eval_code(self, expr: str) -> Any: |
| return eval(expr, self._global_vars) |
|
|
| def inject(self, var_dict: Dict[str, Any]) -> None: |
| for k, v in var_dict.items(): |
| self._global_vars[k] = v |
|
|
| @property |
| def answer(self): |
| return self._global_vars["answer"] |
|
|
|
|
| class PythonExecutor: |
| def __init__( |
| self, |
| runtime: Optional[Any] = None, |
| get_answer_symbol: Optional[str] = None, |
| get_answer_expr: Optional[str] = None, |
| get_answer_from_stdout: bool = False, |
| ) -> None: |
| self.runtime = runtime if runtime else GenericRuntime() |
| self.answer_symbol = get_answer_symbol |
| self.answer_expr = get_answer_expr |
| self.get_answer_from_stdout = get_answer_from_stdout |
|
|
| def process_generation_to_code(self, gens: str): |
| batch_code = [] |
| for g in gens: |
| multiline_comments = False |
| code = [] |
| for line in g.split("\n"): |
| strip_line = line.strip() |
| if strip_line.startswith("#"): |
| line = line.split("#", 1)[0] + "# comments" |
| elif ( |
| not multiline_comments |
| and strip_line.startswith('"""') |
| and strip_line.endswith('"""') |
| and len(strip_line) >= 6 |
| ): |
| line = line.split('"""', 1)[0] + '"""comments"""' |
| elif not multiline_comments and strip_line.startswith('"""'): |
| multiline_comments = True |
| elif multiline_comments and strip_line.endswith('"""'): |
| multiline_comments = False |
| line = "" |
| if not multiline_comments: |
| code.append(line) |
| batch_code.append(code) |
| return batch_code |
|
|
| @staticmethod |
| def execute( |
| code, |
| get_answer_from_stdout=None, |
| runtime=None, |
| answer_symbol=None, |
| answer_expr=None, |
| timeout_length=10, |
| ): |
| try: |
| if get_answer_from_stdout: |
| program_io = io.StringIO() |
| with redirect_stdout(program_io): |
| timeout(timeout_length)(runtime.exec_code)("\n".join(code)) |
| program_io.seek(0) |
| result = "".join(program_io.readlines()) |
| elif answer_symbol: |
| timeout(timeout_length)(runtime.exec_code)("\n".join(code)) |
| result = runtime._global_vars[answer_symbol] |
| elif answer_expr: |
| timeout(timeout_length)(runtime.exec_code)("\n".join(code)) |
| result = timeout(timeout_length)(runtime.eval_code)(answer_expr) |
| else: |
| timeout(timeout_length)(runtime.exec_code)("\n".join(code[:-1])) |
| result = timeout(timeout_length)(runtime.eval_code)(code[-1]) |
| concise_exec_info = "" |
| exec_info = "" |
| str(result) |
| pickle.dumps(result) |
| except: |
| |
| result = "" |
| concise_exec_info = traceback.format_exc().split("\n")[-2] |
| exec_info = traceback.format_exc() |
| if ( |
| get_answer_from_stdout |
| and "exec(code_piece, self._global_vars)" in exec_info |
| ): |
| exec_info = exec_info.split("exec(code_piece, self._global_vars)")[ |
| -1 |
| ].strip() |
| msg = [] |
| for line in exec_info.split("\n"): |
| patt = regex.search( |
| r'(?P<start>.*)File "(?P<file>.*)", line (?P<lno>\d+), (?P<end>.*)', |
| line, |
| ) |
| if patt is not None: |
| if "<module>" in patt.group("end"): |
| continue |
| fname = patt.group("file") |
| if "site-packages" in fname: |
| fname = f"site-packages{fname.split('site-packages', 1)[1]}" |
| line = f'{patt.group("start")}File "{fname}", {patt.group("end")}' |
| else: |
| line = f'{patt.group("start")}{patt.group("end")}' |
| else: |
| patt = regex.search( |
| r"(?P<start>.*)(?P<file>/.*site-packages/.*\.py)(?P<end>.*)", |
| line, |
| ) |
| if patt is not None: |
| line = f'{patt.group("start")}site-packages{patt.group("file").split("site-packages", 1)[1]}{patt.group("end")}' |
| msg.append(line) |
| exec_info = "\n".join(msg) |
| return result, concise_exec_info, exec_info |
|
|
| def apply(self, code): |
| return self.batch_apply([code])[0] |
|
|
| def batch_apply(self, batch_code): |
| all_code_snippets = self.process_generation_to_code(batch_code) |
| all_exec_results = [] |
| executor = partial( |
| self.execute, |
| get_answer_from_stdout=self.get_answer_from_stdout, |
| runtime=self.runtime, |
| answer_symbol=self.answer_symbol, |
| answer_expr=self.answer_expr, |
| timeout_length=10, |
| ) |
| with ProcessPool(max_workers=multiprocess.cpu_count()) as pool: |
| iterator = pool.map(executor, all_code_snippets, timeout=10).result() |
|
|
| while True: |
| try: |
| result = next(iterator) |
| all_exec_results.append(result) |
| except StopIteration: |
| break |
| except TimeoutError as error: |
| all_exec_results.append(("", "Timeout Error", "Timeout Error")) |
| except Exception as error: |
| print(error) |
| exit() |
|
|
| batch_results = [] |
| for code, (result, concise_exec_info, exec_info) in zip( |
| all_code_snippets, all_exec_results |
| ): |
| metadata = { |
| "code": code, |
| "exec_result": result, |
| "concise_exec_info": concise_exec_info, |
| "exec_info": exec_info, |
| } |
| batch_results.append((result, metadata)) |
| return batch_results |
|
|