-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathAiQuery.py
More file actions
314 lines (273 loc) · 13.2 KB
/
AiQuery.py
File metadata and controls
314 lines (273 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
# archive_agent/ai/query/AiQuery.py
# Copyright © 2025 Dr.-Ing. Paul Wilhelm <paul@wilhelm.dev>
# This file is part of Archive Agent. See LICENSE for details.
import hashlib
from logging import Logger
from typing import List, Optional, Tuple
from pydantic import BaseModel, ConfigDict
from qdrant_client.http.models import ScoredPoint
from archive_agent.util.format import get_point_reference_info
from archive_agent.db.QdrantSchema import parse_payload
# === Reference repair configuration (module-level) ===
# Enable/disable soft repair of corrupted 16-char hex reference hashes.
HASH_REPAIR_ENABLED: bool = True
# Maximum allowed Hamming distance for a repair to be accepted.
HASH_REPAIR_MAX_DIST: int = 2
# Hex charset used for validation (lowercase only; inputs are normalized to lowercase).
_HEX_CHARS: str = "0123456789abcdef"
class AnswerItem(BaseModel):
answer: str
chunk_ref_list: List[str]
model_config = ConfigDict(extra='forbid') # Ensures additionalProperties: false — DO NOT REMOVE THIS
class QuerySchema(BaseModel):
"""
This is the format returned by MCP (`#get_answer_rag`) and for JSON output (`--to-json`, `--to-json-auto`).
"""
question_rephrased: str
answer_list: List[AnswerItem]
answer_conclusion: str
follow_up_questions_list: List[str]
is_rejected: bool
rejection_reason: str
model_config = ConfigDict(extra='forbid') # Ensures additionalProperties: false — DO NOT REMOVE THIS
class AiQuery:
@staticmethod
def get_prompt_query(question: str, context: str) -> str:
return "\n".join([
"Act as a query agent for a semantic retrieval system (RAG).",
"Your task is to answer the question using only the provided context as source of truth.",
"Adapt your response to suit any use case: factual lookups, analysis, or technical deep-dives.",
"Use a neutral, encyclopedic tone like Wikipedia—precise, structured, and comprehensive—",
"while being engaging and helpful like ChatGPT: conversational, practical, and user-focused.",
"You must output structured information using the exact response fields described below.",
"Do not return any explanations, commentary, or additional fields.",
"",
"RESPONSE STRATEGY (APPLIES TO ALL FIELDS):",
"- Prefer multi-chunk synthesis over single-chunk paraphrase.",
"- Connect dots across the 8 provided dossiers; favor depth and logical density.",
"- Target 6–8 dense `answer_list` items. Quality and technical precision over quantity.",
"- Use more references: for each answer, include 2–4 chunk references if available.",
"- Ensure every 16-character hex hash is copied exactly byte-for-byte.",
"",
"RESPONSE FIELDS:",
"",
"- `question_rephrased`:",
" Rephrase the original question in clear, context-aware language.",
"",
"- `answer_list`:",
" A list of objects, each containing a detailed, self-contained answer and its references.",
" - `answer`:",
" A detailed, narrative answer based solely on the context.",
" Explain thoroughly with examples, steps, or physical implications.",
" Use light Markdown (**bold**, *italic*), but no headings.",
" Each answer must be \"flat\" (no internal bullet points or hierarchy).",
" Integrate all content narratively. Avoid starting with bolded titles.",
" Optional: append one short sentence labeled \"Speculative — What if: ...\".",
" DO NOT mention references or provenance in this text field.",
" - `chunk_ref_list`:",
" List of reference designators: `<<< 0123456789ABCDEF >>>`.",
" Accuracy is mission-critical: exactly 16 hex characters required.",
"",
"- `answer_conclusion`:",
" A concise, integrative summary synthesizing the main ideas from `answer_list`.",
" Highlight connections and key takeaways. No new information.",
"",
"- `follow_up_questions_list`:",
" A list of 4–6 specific, well-formed follow-up questions that extend the topic.",
" Each must be self-contained and include all required context.",
"",
"- `is_rejected`:",
" Boolean flag. Set `true` ONLY if the context has zero relevant information.",
"",
"- `rejection_reason`:",
" Short factual reason for rejection. Required only if `is_rejected` is `true`.",
"",
"IMPORTANT GLOBAL CONSTRAINTS:",
"- NO references or chunks mentioned in `answer` or `answer_conclusion` text.",
"- References ONLY go into the `chunk_ref_list` in each `answer_list` item.",
"- Accuracy check: verify that every hash in the output is exactly 16 characters.",
"- Speed & Intelligence: provide the most impactful insights from the context.",
"",
"Context:\n\"\"\"\n" + context + "\n\"\"\"\n\n",
"Question:\n\"\"\"\n" + question + "\n\"\"\"",
])
@staticmethod
def get_point_hash(point: ScoredPoint) -> str:
"""
Get point hash.
:param point: Point.
:return: Point hash (16-character hex, SHA-1).
"""
model = parse_payload(point.payload)
chunk_index = str(model.chunk_index)
chunks_total = str(model.chunks_total)
file_path = str(model.file_path)
file_mtime = str(model.file_mtime)
line_range = str(model.line_range or '')
page_range = str(model.page_range or '')
point_str = "".join([
chunk_index,
chunks_total,
file_path,
file_mtime,
line_range,
page_range,
])
# noinspection PyTypeChecker
return hashlib.sha1(point_str.encode('utf-8')).hexdigest()[:16]
@staticmethod
def get_context_from_points(points: List[ScoredPoint]) -> str:
"""
Get context from points.
:param points: Points.
:return: Context string.
"""
return "\n\n\n\n".join([
"\n\n".join([
f"<<< {AiQuery.get_point_hash(point)} >>>",
f"{parse_payload(point.payload).chunk_text}\n",
])
for point in points
])
@staticmethod
def format_query_references(
logger: Logger,
query_result: QuerySchema,
points: List[ScoredPoint],
) -> QuerySchema:
"""
Format reference designators in query result as human-readable reference infos.
Broken/unresolvable references are **discarded** (not replaced with placeholders)
and a failure is logged. Soft repairs (Hamming-nearest within the configured
radius) are attempted when enabled.
:param logger: Logger.
:param query_result: Query result.
:param points: Points.
:return: Query result with reference designators formatted as human-readable
reference infos; invalid references removed.
"""
# Build a mapping: hash -> ScoredPoint (store keys lowercase for robustness)
points_by_hash = {
AiQuery.get_point_hash(point).lower(): point
for point in points
if point.payload is not None
}
# Extracts 16-char token from '<<< 0123456789ABCDEF >>>'
def extract_hash(ref: str) -> str:
# Let's allow some slack from weaker or overloaded LLMs here...
hash_str = ref.replace("<<<", "").replace(">>>", "").strip()
hash_str = hash_str.lower()
if len(hash_str) == 16 and all(c in _HEX_CHARS for c in hash_str):
return hash_str
logger.critical(f"⚠️ Invalid reference format: '{ref}'")
return hash_str # Return whatever we found; may be repaired below.
def hamming_distance(a: str, b: str) -> int:
"""
Compute the Hamming distance of two equal-length strings.
Preconditions: len(a) == len(b).
"""
return sum((ch1 != ch2) for ch1, ch2 in zip(a, b))
def try_repair_hash(maybe_hash: str, max_dist: int) -> Optional[str]:
"""
Attempt to repair a corrupted 16-char hash by nearest-neighbor search
in Hamming space over known point hashes.
:param maybe_hash: The (possibly corrupted) 16-char token extracted from the reference.
:param max_dist: Maximum Hamming distance allowed for a repair to be accepted.
:return: Repaired lowercase hash if confidently matched; otherwise None.
"""
token = (maybe_hash or "").lower()
if len(token) != 16:
return None
# Exact match — nothing to repair.
if token in points_by_hash:
return token
best: Optional[Tuple[int, str]] = None
for candidate in points_by_hash.keys():
dist = hamming_distance(token, candidate)
if best is None or dist < best[0]:
best = (dist, candidate)
# Early stopping is intentionally conservative; we keep scanning to avoid ties ambiguity.
if best is None:
return None
dist, winner = best
if dist <= max_dist:
logger.warning(
f"🔧 Repaired reference hash '{maybe_hash}' -> '{winner}' (Hamming distance={dist})."
)
return winner
return None
for answer in query_result.answer_list:
# Build a new list to **discard** broken references instead of emitting '???'.
formatted_refs: List[str] = []
for chunk_ref in answer.chunk_ref_list:
hash_id = extract_hash(chunk_ref)
point = points_by_hash.get(hash_id.lower())
if point is not None:
formatted_refs.append(
get_point_reference_info(logger, point, verbose=False)
)
continue
# Attempt a soft repair via Hamming-nearest neighbor within a small radius.
repaired_hash: Optional[str] = None
if HASH_REPAIR_ENABLED:
repaired_hash = try_repair_hash(hash_id, max_dist=HASH_REPAIR_MAX_DIST)
if repaired_hash is not None:
repaired_point = points_by_hash.get(repaired_hash)
if repaired_point is not None:
formatted_refs.append(
get_point_reference_info(logger, repaired_point, verbose=False)
)
continue
# Log and **discard** the unresolved reference token.
logger.error(f"❌ Unresolvable reference; discarding token '{hash_id}'.")
# Overwrite with only successfully resolved (or repaired) references.
answer.chunk_ref_list = formatted_refs
return query_result
@staticmethod
def get_answer_text(query_result: QuerySchema) -> str:
"""
Get answer text.
:param query_result: Query result.
:return: Formatted answer, or empty string if rejected.
"""
if query_result.is_rejected:
return ""
# Create a list of unique references in order of appearance
all_refs_ordered = []
ref_map = {}
for item in query_result.answer_list:
for ref in item.chunk_ref_list:
if ref not in ref_map:
ref_map[ref] = len(all_refs_ordered) + 1
all_refs_ordered.append(ref)
answers_formatted = []
for item in query_result.answer_list:
ref_markers = ""
if item.chunk_ref_list:
# Sort the references by their appearance order for this answer
sorted_refs = sorted(list(set(item.chunk_ref_list)), key=lambda r: ref_map[r])
ref_numbers = [ref_map[ref] for ref in sorted_refs]
ref_markers = " " + " ".join(f"**[{num}]**" for num in ref_numbers)
answers_formatted.append(f"- {item.answer}{ref_markers}")
answer_list_text = "\n".join(answers_formatted)
chunk_ref_list_text = "\n".join([
f"- **[{i + 1}]** {ref}"
for i, ref in enumerate(all_refs_ordered)
])
follow_up_questions_list_text = "\n".join([
f"- {follow_up}"
for follow_up in query_result.follow_up_questions_list
])
answer_text = "\n\n".join(filter(None, [
f"### Question",
f"**{query_result.question_rephrased}**",
f"### Answers",
f"{answer_list_text}",
f"### Conclusion",
f"**{query_result.answer_conclusion}**",
f"### References" if chunk_ref_list_text else "",
chunk_ref_list_text if chunk_ref_list_text else "",
f"### Follow-Up Questions",
f"{follow_up_questions_list_text}",
]))
return answer_text