Fix generator quality issues and run initial corpus pipeline
Pre-corpus fixes (from EVALUATION.md): - Clean 2,264 contaminated rows from augmented relations (bridge artifacts, full-sentence HasProperty values, null bytes, empty words) - Fix article logic: dynamic a/an across Deconstruction, FalseEquivalence, DenialOfConsequences, TautologicalWisdom templates - Tighten _short_concepts() default from max_words=3 to 2 - Fix FutilePreparation gerunding: filter vocab nouns and noun-suffix words from UsedFor targets; fix CVC doubling for 'y'-ending words - Add _looks_like_verb() heuristic, improve _a() for vowel-sound edges Pipeline hardening: - polish_corpus.py: context-size fallback (truncate chain, then minimal prompt), classified error types, consecutive-error circuit breaker, 10-entry flush granularity, ETA tracking, KeyboardInterrupt handling - generate_raw_batch.sh: fix python -> python3 Corpus generation run (9,835 raw -> 5,499 polished -> 2,312 filtered): - 44.1% discard rate, 0 errors, 82 minutes on RTX 4090 - 9,257 training pairs across 5 input framing types - 97.6% vocab coverage (609/624 words) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
356b62c6ea
commit
651ec3ffc6
10 changed files with 34853 additions and 2406 deletions
|
|
@ -34,7 +34,7 @@ total=0
|
|||
for template in "${TEMPLATES[@]}"; do
|
||||
echo -n " $template ($COUNT_PER_TEMPLATE)... "
|
||||
before=$(wc -l < "$OUTPUT")
|
||||
python "$GENERATOR" --template "$template" --count "$COUNT_PER_TEMPLATE" --json >> "$OUTPUT" 2>/dev/null
|
||||
python3 "$GENERATOR" --template "$template" --count "$COUNT_PER_TEMPLATE" --json >> "$OUTPUT" 2>/dev/null
|
||||
after=$(wc -l < "$OUTPUT")
|
||||
generated=$((after - before))
|
||||
total=$((total + generated))
|
||||
|
|
@ -47,7 +47,7 @@ echo ""
|
|||
|
||||
# Check template distribution
|
||||
echo "Template distribution:"
|
||||
python -c "
|
||||
python3 -c "
|
||||
import json, sys
|
||||
from collections import Counter
|
||||
counts = Counter()
|
||||
|
|
|
|||
|
|
@ -4,6 +4,13 @@
|
|||
Reads corpus_raw.jsonl, sends each to GLM4-32B for polish.
|
||||
Output file is the checkpoint — append mode with resume detection.
|
||||
|
||||
Robust error handling:
|
||||
- Context size errors: truncates chain data and retries
|
||||
- JSON parse errors: retries, then marks as error
|
||||
- Transient HTTP errors: exponential backoff retry
|
||||
- Keyboard interrupt: flushes and exits cleanly
|
||||
- Safe resume: skips entries already in output file
|
||||
|
||||
Usage:
|
||||
python scripts/polish_corpus.py
|
||||
python scripts/polish_corpus.py --input corpus/corpus_raw.jsonl --output corpus/corpus_polished.jsonl
|
||||
|
|
@ -62,8 +69,27 @@ Chain: canoe UsedFor transport, fire UsedFor boiling_food
|
|||
Polished: DISCARD"""
|
||||
|
||||
|
||||
class LLMError(Exception):
|
||||
"""Base class for LLM errors."""
|
||||
pass
|
||||
|
||||
|
||||
class ContextTooLong(LLMError):
|
||||
"""Prompt exceeded context window."""
|
||||
pass
|
||||
|
||||
|
||||
class TransientError(LLMError):
|
||||
"""Recoverable error (network, server overload, etc.)."""
|
||||
pass
|
||||
|
||||
|
||||
def llm_chat_completion(messages, max_retries=3):
|
||||
"""Chat completion with retry logic."""
|
||||
"""Chat completion with retry logic and error classification.
|
||||
|
||||
Returns (response_text, error_type) tuple.
|
||||
response_text is None on failure; error_type is None on success.
|
||||
"""
|
||||
import requests
|
||||
|
||||
for attempt in range(max_retries):
|
||||
|
|
@ -71,21 +97,94 @@ def llm_chat_completion(messages, max_retries=3):
|
|||
resp = requests.post(LLM_ENDPOINT, json={
|
||||
"model": LLM_MODEL,
|
||||
"messages": messages,
|
||||
"temperature": 0.7,
|
||||
}, timeout=120)
|
||||
|
||||
# Check for context length errors (HTTP 400 typically)
|
||||
if resp.status_code == 400:
|
||||
body = resp.text.lower()
|
||||
if any(kw in body for kw in ["context", "token", "length", "too long", "exceed"]):
|
||||
return None, "context_too_long"
|
||||
# Other 400 errors — log and retry
|
||||
print(f" HTTP 400 (attempt {attempt+1}): {resp.text[:200]}", file=sys.stderr)
|
||||
if attempt < max_retries - 1:
|
||||
time.sleep(2 ** attempt)
|
||||
continue
|
||||
return None, "http_400"
|
||||
|
||||
if resp.status_code == 503 or resp.status_code == 429:
|
||||
wait = 2 ** (attempt + 1)
|
||||
print(f" HTTP {resp.status_code} (attempt {attempt+1}), waiting {wait}s...",
|
||||
file=sys.stderr)
|
||||
if attempt < max_retries - 1:
|
||||
time.sleep(wait)
|
||||
continue
|
||||
return None, "server_overload"
|
||||
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
return data["choices"][0]["message"]["content"].strip()
|
||||
except Exception as e:
|
||||
wait = (2 ** attempt)
|
||||
print(f" LLM error (attempt {attempt+1}/{max_retries}): {e}", file=sys.stderr)
|
||||
|
||||
# Parse JSON response
|
||||
try:
|
||||
data = resp.json()
|
||||
except (json.JSONDecodeError, ValueError) as e:
|
||||
print(f" JSON parse error (attempt {attempt+1}): {e}", file=sys.stderr)
|
||||
print(f" Response body: {resp.text[:300]}", file=sys.stderr)
|
||||
if attempt < max_retries - 1:
|
||||
time.sleep(2 ** attempt)
|
||||
continue
|
||||
return None, "json_parse"
|
||||
|
||||
# Extract content from response
|
||||
try:
|
||||
content = data["choices"][0]["message"]["content"]
|
||||
if content is None:
|
||||
print(f" Null content in response (attempt {attempt+1})", file=sys.stderr)
|
||||
if attempt < max_retries - 1:
|
||||
time.sleep(1)
|
||||
continue
|
||||
return None, "null_content"
|
||||
return content.strip(), None
|
||||
except (KeyError, IndexError) as e:
|
||||
print(f" Unexpected JSON structure (attempt {attempt+1}): {e}", file=sys.stderr)
|
||||
print(f" Keys: {list(data.keys()) if isinstance(data, dict) else type(data)}",
|
||||
file=sys.stderr)
|
||||
if attempt < max_retries - 1:
|
||||
time.sleep(1)
|
||||
continue
|
||||
return None, "json_structure"
|
||||
|
||||
except requests.exceptions.Timeout:
|
||||
wait = 2 ** (attempt + 1)
|
||||
print(f" Timeout (attempt {attempt+1}), waiting {wait}s...", file=sys.stderr)
|
||||
if attempt < max_retries - 1:
|
||||
time.sleep(wait)
|
||||
else:
|
||||
return None
|
||||
continue
|
||||
return None, "timeout"
|
||||
|
||||
except requests.exceptions.ConnectionError as e:
|
||||
wait = 2 ** (attempt + 2) # longer wait for connection errors
|
||||
print(f" Connection error (attempt {attempt+1}): {e}", file=sys.stderr)
|
||||
if attempt < max_retries - 1:
|
||||
time.sleep(wait)
|
||||
continue
|
||||
return None, "connection"
|
||||
|
||||
except Exception as e:
|
||||
print(f" Unexpected error (attempt {attempt+1}): {type(e).__name__}: {e}",
|
||||
file=sys.stderr)
|
||||
if attempt < max_retries - 1:
|
||||
time.sleep(2 ** attempt)
|
||||
continue
|
||||
return None, "unexpected"
|
||||
|
||||
return None, "exhausted_retries"
|
||||
|
||||
|
||||
def format_chain(chain_edges):
|
||||
"""Format chain_edges list into readable string for LLM context."""
|
||||
def format_chain(chain_edges, truncate=False):
|
||||
"""Format chain_edges list into readable string for LLM context.
|
||||
|
||||
If truncate=True, omit weights and surface_text to reduce token count.
|
||||
"""
|
||||
if not chain_edges:
|
||||
return "(no chain data)"
|
||||
parts = []
|
||||
|
|
@ -93,8 +192,11 @@ def format_chain(chain_edges):
|
|||
start = edge.get("start", "?")
|
||||
rel = edge.get("relation", "?")
|
||||
end = edge.get("end", "?")
|
||||
weight = edge.get("weight", 0)
|
||||
parts.append(f"{start} --{rel}--> {end} (w:{weight:.1f})")
|
||||
if truncate:
|
||||
parts.append(f"{start} --{rel}--> {end}")
|
||||
else:
|
||||
weight = edge.get("weight", 0)
|
||||
parts.append(f"{start} --{rel}--> {end} (w:{weight:.1f})")
|
||||
return ", ".join(parts)
|
||||
|
||||
|
||||
|
|
@ -103,9 +205,33 @@ def format_slots(slots):
|
|||
return ", ".join(f"{k}={v}" for k, v in slots.items())
|
||||
|
||||
|
||||
def build_messages(entry, truncate_chain=False):
|
||||
"""Build the messages list for a single entry."""
|
||||
raw_text = entry.get("raw_text", "")
|
||||
meta_template = entry.get("meta_template", "")
|
||||
chain = format_chain(entry.get("chain", []), truncate=truncate_chain)
|
||||
slots = format_slots(entry.get("slots", {}))
|
||||
|
||||
user_prompt = (
|
||||
f"Meta-template: {meta_template}\n"
|
||||
f"Relationship chain: {chain}\n"
|
||||
f"Slot fills: {slots}\n"
|
||||
f"Raw saying: {raw_text}"
|
||||
)
|
||||
|
||||
return [
|
||||
{"role": "system", "content": SYSTEM_PROMPT},
|
||||
{"role": "user", "content": user_prompt},
|
||||
]
|
||||
|
||||
|
||||
def load_already_processed(output_path):
|
||||
"""Load set of raw_text strings already processed (for resume)."""
|
||||
"""Load set of raw_text strings already processed (for resume).
|
||||
|
||||
Also returns counts of each status for accurate progress reporting.
|
||||
"""
|
||||
processed = set()
|
||||
counts = {"polished": 0, "discarded": 0, "error": 0}
|
||||
if output_path.exists():
|
||||
with open(output_path, encoding="utf-8") as f:
|
||||
for line in f:
|
||||
|
|
@ -115,9 +241,12 @@ def load_already_processed(output_path):
|
|||
try:
|
||||
entry = json.loads(line)
|
||||
processed.add(entry.get("raw_text", ""))
|
||||
status = entry.get("status", "")
|
||||
if status in counts:
|
||||
counts[status] += 1
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
return processed
|
||||
return processed, counts
|
||||
|
||||
|
||||
def main():
|
||||
|
|
@ -141,15 +270,21 @@ def main():
|
|||
for line in f:
|
||||
line = line.strip()
|
||||
if line:
|
||||
raw_entries.append(json.loads(line))
|
||||
try:
|
||||
raw_entries.append(json.loads(line))
|
||||
except json.JSONDecodeError as e:
|
||||
print(f"Warning: skipping malformed input line: {e}", file=sys.stderr)
|
||||
|
||||
print(f"Loaded {len(raw_entries)} raw entries from {input_path}")
|
||||
|
||||
# Check what's already been processed
|
||||
already_processed = load_already_processed(output_path)
|
||||
already_processed, prev_counts = load_already_processed(output_path)
|
||||
remaining = [e for e in raw_entries if e.get("raw_text", "") not in already_processed]
|
||||
|
||||
print(f"Already processed: {len(already_processed)}")
|
||||
print(f"Already processed: {len(already_processed)} "
|
||||
f"(polished={prev_counts['polished']}, "
|
||||
f"discarded={prev_counts['discarded']}, "
|
||||
f"errors={prev_counts['error']})")
|
||||
print(f"Remaining: {len(remaining)}")
|
||||
|
||||
if not remaining:
|
||||
|
|
@ -159,55 +294,105 @@ def main():
|
|||
discards = 0
|
||||
polished = 0
|
||||
errors = 0
|
||||
error_types = {}
|
||||
consecutive_errors = 0
|
||||
start_time = time.time()
|
||||
|
||||
with open(output_path, "a", encoding="utf-8") as out:
|
||||
for i, entry in enumerate(remaining):
|
||||
raw_text = entry.get("raw_text", "")
|
||||
meta_template = entry.get("meta_template", "")
|
||||
chain = format_chain(entry.get("chain", []))
|
||||
slots = format_slots(entry.get("slots", {}))
|
||||
try:
|
||||
with open(output_path, "a", encoding="utf-8") as out:
|
||||
for i, entry in enumerate(remaining):
|
||||
# First attempt with full chain data
|
||||
messages = build_messages(entry, truncate_chain=False)
|
||||
response, error_type = llm_chat_completion(messages)
|
||||
|
||||
user_prompt = (
|
||||
f"Meta-template: {meta_template}\n"
|
||||
f"Relationship chain: {chain}\n"
|
||||
f"Slot fills: {slots}\n"
|
||||
f"Raw saying: {raw_text}"
|
||||
)
|
||||
# If context too long, retry with truncated chain
|
||||
if error_type == "context_too_long":
|
||||
print(f" #{i+1}: context too long, retrying with truncated chain...",
|
||||
file=sys.stderr)
|
||||
messages = build_messages(entry, truncate_chain=True)
|
||||
response, error_type = llm_chat_completion(messages)
|
||||
|
||||
messages = [
|
||||
{"role": "system", "content": SYSTEM_PROMPT},
|
||||
{"role": "user", "content": user_prompt},
|
||||
]
|
||||
# If still too long, try with just the raw text
|
||||
if error_type == "context_too_long":
|
||||
print(f" #{i+1}: still too long, retrying with minimal prompt...",
|
||||
file=sys.stderr)
|
||||
raw_text = entry.get("raw_text", "")
|
||||
messages = [
|
||||
{"role": "system", "content": SYSTEM_PROMPT},
|
||||
{"role": "user", "content": f"Raw saying: {raw_text}"},
|
||||
]
|
||||
response, error_type = llm_chat_completion(messages)
|
||||
|
||||
response = llm_chat_completion(messages)
|
||||
if response is None:
|
||||
entry["status"] = "error"
|
||||
entry["error_type"] = error_type or "unknown"
|
||||
errors += 1
|
||||
consecutive_errors += 1
|
||||
error_types[error_type] = error_types.get(error_type, 0) + 1
|
||||
|
||||
if response is None:
|
||||
entry["status"] = "error"
|
||||
errors += 1
|
||||
elif response.strip().upper() == "DISCARD":
|
||||
entry["status"] = "discarded"
|
||||
discards += 1
|
||||
else:
|
||||
entry["polished_text"] = response.strip()
|
||||
entry["status"] = "polished"
|
||||
polished += 1
|
||||
# If we get 20 consecutive errors, something is seriously wrong
|
||||
if consecutive_errors >= 20:
|
||||
print(f"\nFATAL: {consecutive_errors} consecutive errors. "
|
||||
f"Last error type: {error_type}", file=sys.stderr)
|
||||
print("Flushing output and stopping. Re-run to resume.", file=sys.stderr)
|
||||
out.write(json.dumps(entry, ensure_ascii=False) + "\n")
|
||||
out.flush()
|
||||
sys.exit(1)
|
||||
elif response.strip().upper() == "DISCARD":
|
||||
entry["status"] = "discarded"
|
||||
discards += 1
|
||||
consecutive_errors = 0
|
||||
else:
|
||||
# Sanity check the response
|
||||
cleaned = response.strip()
|
||||
# Sometimes the LLM wraps in quotes
|
||||
if cleaned.startswith('"') and cleaned.endswith('"'):
|
||||
cleaned = cleaned[1:-1]
|
||||
# Sometimes the LLM prefixes with "Polished:" or similar
|
||||
for prefix in ["Polished:", "polished:", "Output:", "Result:"]:
|
||||
if cleaned.startswith(prefix):
|
||||
cleaned = cleaned[len(prefix):].strip()
|
||||
entry["polished_text"] = cleaned
|
||||
entry["status"] = "polished"
|
||||
polished += 1
|
||||
consecutive_errors = 0
|
||||
|
||||
out.write(json.dumps(entry, ensure_ascii=False) + "\n")
|
||||
out.write(json.dumps(entry, ensure_ascii=False) + "\n")
|
||||
|
||||
if (i + 1) % 100 == 0:
|
||||
out.flush()
|
||||
total_done = len(already_processed) + i + 1
|
||||
print(f" [{total_done}/{len(raw_entries)}] "
|
||||
f"polished={polished}, discarded={discards}, errors={errors}")
|
||||
# Flush every 10 entries for fine-grained resume safety
|
||||
if (i + 1) % 10 == 0:
|
||||
out.flush()
|
||||
|
||||
time.sleep(0.1)
|
||||
# Progress report every 100 entries
|
||||
if (i + 1) % 100 == 0:
|
||||
total_done = len(already_processed) + i + 1
|
||||
elapsed = time.time() - start_time
|
||||
rate = (i + 1) / elapsed
|
||||
eta_sec = (len(remaining) - (i + 1)) / rate if rate > 0 else 0
|
||||
eta_min = eta_sec / 60
|
||||
print(f" [{total_done}/{len(raw_entries)}] "
|
||||
f"polished={polished}, discarded={discards}, errors={errors} "
|
||||
f"({rate:.1f}/s, ETA {eta_min:.0f}m)")
|
||||
|
||||
total_done = len(already_processed) + len(remaining)
|
||||
print(f"\nDone: {total_done} total entries processed.")
|
||||
time.sleep(0.1)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print(f"\nInterrupted at entry {i+1}/{len(remaining)}. "
|
||||
f"Progress saved — re-run to resume.", file=sys.stderr)
|
||||
|
||||
# Final report
|
||||
elapsed = time.time() - start_time
|
||||
total_done = len(already_processed) + polished + discards + errors
|
||||
print(f"\nSession complete: {polished + discards + errors} entries processed "
|
||||
f"in {elapsed/60:.1f} minutes.")
|
||||
print(f" Polished: {polished}")
|
||||
print(f" Discarded: {discards}")
|
||||
print(f" Errors: {errors}")
|
||||
print(f" Discard rate: {discards/(polished+discards)*100:.1f}%" if (polished+discards) else " N/A")
|
||||
if error_types:
|
||||
print(f" Error breakdown: {error_types}")
|
||||
if polished + discards > 0:
|
||||
print(f" Discard rate: {discards/(polished+discards)*100:.1f}%")
|
||||
print(f" Total across all sessions: {total_done}/{len(raw_entries)}")
|
||||
print(f"Output: {output_path}")
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue