-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathprocess_newspaper_images.py
More file actions
118 lines (95 loc) · 4.23 KB
/
process_newspaper_images.py
File metadata and controls
118 lines (95 loc) · 4.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#!/usr/bin/env python3
import os
import glob
import json
from function_modules.pipeline import NewspaperPipeline
from typing import Optional, Tuple
def find_latest_pipeline_state() -> Tuple[Optional[str], Optional[dict]]:
    """
    Find and load the most recently modified pipeline state file.

    Searches ``data/output/pipeline_state`` (relative to the current working
    directory) for files named ``pipeline_*.json`` and loads the one with the
    newest modification time.

    Returns:
        Tuple of (pipeline_id, state_data), where pipeline_id is the state
        file's base name without its extension. Returns (None, None) if no
        state file exists, or if the newest one cannot be read or does not
        contain a JSON object.
    """
    state_dir = os.path.join('data', 'output', 'pipeline_state')

    # glob yields an empty list when the directory is missing, so no separate
    # existence check is needed.
    state_files = glob.glob(os.path.join(state_dir, 'pipeline_*.json'))
    if not state_files:
        return None, None

    latest_state = None
    try:
        # getmtime is inside the try: a file can vanish between the glob and
        # the mtime lookup, which raises OSError.
        latest_state = max(state_files, key=os.path.getmtime)
        # NOTE(review): the file is read with the platform default encoding,
        # as before - confirm this matches how the pipeline writes its state.
        with open(latest_state, 'r') as f:
            state_data = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        # Loading state is best-effort: report the problem and let the caller
        # fall back to a new pipeline.
        print(f"Warning: could not load pipeline state from {latest_state or state_dir}: {e}")
        return None, None

    # Callers treat the state as a mapping; reject any other JSON value.
    if not isinstance(state_data, dict):
        print(f"Warning: ignoring pipeline state in {latest_state}: expected a JSON object")
        return None, None

    pipeline_id = os.path.splitext(os.path.basename(latest_state))[0]
    return pipeline_id, state_data
def get_or_create_pipeline() -> Tuple[NewspaperPipeline, bool]:
    """
    Build a pipeline instance, restoring saved state when any is found.

    A fresh NewspaperPipeline is always constructed; if a previous state
    file was located and loaded, its contents replace the new instance's
    ``state`` attribute.

    Returns:
        Tuple of (pipeline, is_new_pipeline); is_new_pipeline is False only
        when saved state was loaded into the pipeline.
    """
    saved_id, saved_state = find_latest_pipeline_state()
    pipeline = NewspaperPipeline()

    # Both the id and a non-empty state are required to count as a resume.
    is_new_pipeline = not (saved_id and saved_state)
    if not is_new_pipeline:
        # Replace the freshly initialised state with the one read from disk.
        pipeline.state = saved_state

    return pipeline, is_new_pipeline
def process_folder(folder_path: str, api_key: Optional[str] = None, max_workers: int = 4, model: str = "pixtral-12b-2409") -> None:
    """
    Process all newspaper images in the specified folder.

    If a saved pipeline state is found, an attempt is made to resume it; when
    resuming raises, a fresh pipeline is created and a new run is started
    instead. A new run is started with ``wait_for_batch=False``.

    Args:
        folder_path: Path to the folder containing newspaper images. The
            folder's name is used as the periodical name.
        api_key: Optional Mistral API key (defaults to environment variable)
        max_workers: Number of parallel workers for batch processing (default: 4)
        model: The mistral model to use for OCR (default pixtral-12b-2409)
    """
    def _apply_config(target) -> None:
        # Push the caller's settings into a pipeline's config.
        target.config["max_workers"] = max_workers
        target.config["model"] = model

    # Get or create pipeline
    pipeline, is_new_pipeline = get_or_create_pipeline()

    # Use the folder name as the periodical name. normpath strips trailing
    # separators first: basename("images/") would otherwise be "".
    periodical = os.path.basename(os.path.normpath(folder_path))

    _apply_config(pipeline)

    if not is_new_pipeline:
        print(f"Resuming pipeline {pipeline.state['pipeline_id']}")
        try:
            # Try to resume the pipeline
            result = pipeline.resume_pipeline(periodical, api_key)
            print(f"Pipeline resumed and completed with status: {result['status']}")
            return
        except Exception as e:
            # Any failure while resuming falls back to a brand-new run.
            print(f"Failed to resume pipeline: {str(e)}")
            print("Starting new pipeline run...")
            # Discard the restored state by creating a fresh pipeline.
            pipeline = NewspaperPipeline()
            _apply_config(pipeline)

    # Start a new pipeline run
    pipeline.run_pipeline(
        periodical=periodical,
        image_folder=folder_path,
        api_key=api_key,
        wait_for_batch=False
    )
    print(f"Pipeline started for images in {folder_path}")
    print("The script has terminated. Run again later to continue processing from the last checkpoint.")
if __name__ == "__main__":
    # Command-line entry point: parse arguments, validate the image folder,
    # then hand off to process_folder.
    import argparse
    import sys

    parser = argparse.ArgumentParser(description="Process newspaper images using the pipeline")
    parser.add_argument("folder", help="Path to the folder containing newspaper images")
    parser.add_argument("--api-key", help="Mistral API key (optional, defaults to environment variable)")
    parser.add_argument("--max-workers", type=int, default=4,
                        help="Number of parallel workers for batch processing (default: 4)")
    parser.add_argument("--model", default="pixtral-12b-2409", help="Model to use for OCR processing")
    args = parser.parse_args()

    # Ensure the folder exists
    if not os.path.isdir(args.folder):
        # sys.exit with a string writes it to stderr and exits with status 1.
        # The bare exit() helper comes from the site module and is not
        # guaranteed to exist (e.g. under `python -S`).
        sys.exit(f"Error: {args.folder} is not a valid directory")

    process_folder(args.folder, args.api_key, args.max_workers, args.model)