Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions src/gabriel/utils/openai_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3521,11 +3521,24 @@ async def get_all_responses(
df[col] = pd.NA
if reasoning_summary is None and "Reasoning Summary" in df.columns:
df = df.drop(columns=["Reasoning Summary"])
# Only skip identifiers that previously succeeded so failures can be retried
# Only skip identifiers that previously succeeded so failures can be retried.
# Older/hand-edited checkpoints can persist booleans as strings ("True",
# "true", "1", etc.). Normalize those values so resume works reliably.
if "Successful" in df.columns:
done = set(df.loc[df["Successful"] == True, "Identifier"])
success_raw = df["Successful"]
success_mask = pd.Series(False, index=df.index)
with contextlib.suppress(Exception):
success_mask = success_mask | success_raw.astype("boolean").fillna(False)
string_mask = (
success_raw.astype(str)
.str.strip()
.str.lower()
.isin({"true", "1", "yes", "y", "completed", "succeeded", "success"})
)
success_mask = success_mask | string_mask
done = set(df.loc[success_mask, "Identifier"].astype(str))
else:
done = set(df["Identifier"])
done = set(df["Identifier"].astype(str))
if message_verbose:
print(f"Loaded {len(df):,} rows; {len(done):,} already marked complete.")
else:
Expand All @@ -3545,7 +3558,7 @@ async def get_all_responses(
cols.insert(7, "Reasoning Summary")
df = pd.DataFrame(columns=cols)
done = set()
written_identifiers: Set[Any] = set(df["Identifier"]) if not df.empty else set()
written_identifiers: Set[str] = set(df["Identifier"].astype(str)) if not df.empty else set()
# Helper to calculate and report final run cost
def _report_cost() -> None:
nonlocal df
Expand Down Expand Up @@ -3574,7 +3587,7 @@ def _report_cost() -> None:
print(msg)
logger.info(msg)
# Filter prompts/identifiers based on what is already completed
todo_pairs = [(p, i) for p, i in zip(prompts, identifiers) if i not in done]
todo_pairs = [(p, i) for p, i in zip(prompts, identifiers) if str(i) not in done]
if not todo_pairs:
_report_cost()
return df
Expand Down Expand Up @@ -3838,7 +3851,7 @@ def _append_results(rows: List[Dict[str, Any]]) -> None:
batch_df = pd.DataFrame(rows)
if "Web Search Sources" not in batch_df.columns:
batch_df["Web Search Sources"] = pd.NA
batch_df = batch_df[~batch_df["Identifier"].isin(written_identifiers)]
batch_df = batch_df[~batch_df["Identifier"].astype(str).isin(written_identifiers)]
if batch_df.empty:
return
to_save = batch_df.copy()
Expand All @@ -3857,7 +3870,7 @@ def _append_results(rows: List[Dict[str, Any]]) -> None:
df = batch_df.reset_index(drop=True)
else:
df = pd.concat([df, batch_df], ignore_index=True)
written_identifiers.update(batch_df["Identifier"])
written_identifiers.update(batch_df["Identifier"].astype(str))

client = _get_client(base_url)
# Load existing state
Expand Down Expand Up @@ -4898,7 +4911,7 @@ async def flush() -> None:
batch_df = pd.DataFrame(results)
if "Web Search Sources" not in batch_df.columns:
batch_df["Web Search Sources"] = pd.NA
batch_df = batch_df[~batch_df["Identifier"].isin(written_identifiers)]
batch_df = batch_df[~batch_df["Identifier"].astype(str).isin(written_identifiers)]
if not batch_df.empty:
to_save = batch_df.copy()
for col in ("Response", "Error Log", "Web Search Sources"):
Expand All @@ -4916,7 +4929,7 @@ async def flush() -> None:
df = batch_df.reset_index(drop=True)
else:
df = pd.concat([df, batch_df], ignore_index=True)
written_identifiers.update(batch_df["Identifier"])
written_identifiers.update(batch_df["Identifier"].astype(str))
results = []
if logger.isEnabledFor(logging.INFO) and processed:
logger.info(
Expand Down
33 changes: 33 additions & 0 deletions tests/test_reset_files.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import pandas as pd
from gabriel.utils import openai_utils


Expand All @@ -22,3 +23,35 @@ def test_get_all_responses_reset_files(tmp_path):
)
)
assert set(df["Identifier"]) == {"2"}


def test_resume_treats_string_success_values_as_completed(tmp_path):
    """Resuming from a checkpoint whose ``Successful`` column holds string
    truthy values ("True", "true", "1") must treat those rows as completed.

    Older or hand-edited CSV checkpoints persist booleans as strings; the
    resume logic normalizes them, so all three identifiers should be skipped
    and the previously saved rows returned untouched.
    """
    save_path = tmp_path / "out.csv"
    pd.DataFrame(
        {
            "Identifier": ["1", "2", "3"],
            "Response": ["[]", "[]", "[]"],
            "Web Search Sources": ["[]", "[]", "[]"],
            "Time Taken": [0.1, 0.1, 0.1],
            "Input Tokens": [1, 1, 1],
            "Reasoning Tokens": [0, 0, 0],
            "Output Tokens": [1, 1, 1],
            "Reasoning Effort": ["default", "default", "default"],
            # String-typed success markers, as produced by older checkpoints.
            "Successful": ["True", "true", "1"],
            "Error Log": ["[]", "[]", "[]"],
            "Response IDs": ["[]", "[]", "[]"],
            "Reasoning Summary": ["", "", ""],
        }
    ).to_csv(save_path, index=False)

    df = asyncio.run(
        openai_utils.get_all_responses(
            prompts=["a", "b", "c"],
            identifiers=["1", "2", "3"],
            save_path=str(save_path),
            use_dummy=True,
            reset_files=False,
        )
    )

    # Exactly the three checkpointed rows come back — nothing dropped,
    # nothing duplicated, and the persisted payloads are preserved
    # (i.e. the identifiers were recognized as already complete).
    assert len(df) == 3
    assert set(df["Identifier"].astype(str)) == {"1", "2", "3"}
    assert list(df["Response"]) == ["[]", "[]", "[]"]
Loading