This document provides a comprehensive API reference for the QuData core components.

## QuDataPipeline

The main pipeline class for orchestrating the entire data processing workflow.
```python
from qudata import QuDataPipeline, load_config

# Initialize pipeline
config = load_config("pipeline.yaml")
pipeline = QuDataPipeline(config)

# Process documents
result = pipeline.process_directory("/path/to/documents")
print(f"Processed {result.processed_count} documents")
```

### `__init__(config)`

Initialize the pipeline with configuration.
Parameters:
- `config` (`PipelineConfig`): Pipeline configuration object
Example:
```python
from qudata import QuDataPipeline, PipelineConfig, IngestConfig, CleanConfig, ExportConfig

config = PipelineConfig(
    ingest=IngestConfig(formats=["pdf", "docx", "txt"]),
    clean=CleanConfig(remove_duplicates=True),
    export=ExportConfig(formats=["jsonl", "chatml"])
)
pipeline = QuDataPipeline(config)
```

### `process_directory(input_path, output_path=None)`

Process all documents in a directory.
Parameters:
- `input_path` (`str`): Path to input directory
- `output_path` (`str`, optional): Path to output directory
Returns:
- `PipelineResult`: Processing results with statistics and metadata
Example:
```python
result = pipeline.process_directory(
    input_path="/data/raw",
    output_path="/data/processed"
)

print(f"Success: {result.success}")
print(f"Processed: {result.processed_count}")
print(f"Failed: {result.failed_count}")
print(f"Quality Score: {result.average_quality}")
```

### `process_file(file_path)`

Process a single file.
Parameters:
- `file_path` (`str`): Path to the file to process
Returns:
- `ProcessingResult`: Individual file processing result
Example:
```python
result = pipeline.process_file("/path/to/document.pdf")

if result.success:
    document = result.document
    print(f"Title: {document.metadata.title}")
    print(f"Quality: {document.quality_score}")
    print(f"Content length: {len(document.content)}")
else:
    for error in result.errors:
        print(f"Error: {error.message}")
```

### `process_documents(documents)`

Process a list of documents.
Parameters:
- `documents` (`List[Document]`): List of documents to process
Returns:
- `List[ProcessingResult]`: List of processing results
Example:
```python
documents = [
    Document(content="Sample text 1", metadata=DocumentMetadata()),
    Document(content="Sample text 2", metadata=DocumentMetadata())
]

results = pipeline.process_documents(documents)
successful_results = [r for r in results if r.success]
```
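Batch results are ordinary `ProcessingResult` objects (documented below), so summary statistics fall out of a list comprehension. A small sketch using only fields shown elsewhere in this reference:

```python
# Summarize a batch run: counts plus mean quality of the survivors.
failed_results = [r for r in results if not r.success]
avg_quality = sum(r.document.quality_score for r in successful_results) / len(successful_results)
print(f"{len(successful_results)} succeeded, {len(failed_results)} failed, "
      f"mean quality {avg_quality:.2f}")
```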
## ConfigManager

Manages configuration loading, validation, and access.

```python
from qudata import ConfigManager, get_config_manager

# Get singleton config manager
config_manager = get_config_manager()

# Load configuration
config = config_manager.load_config("pipeline.yaml")

# Validate configuration
validation_result = config_manager.validate_config(config)
if not validation_result.is_valid:
    for error in validation_result.errors:
        print(f"Config error: {error}")
```

### `load_config(config_path)`

Load configuration from file.
Parameters:
- `config_path` (`str`): Path to configuration file
Returns:
- `PipelineConfig`: Loaded configuration object
### `validate_config(config)`

Validate a configuration object.
Parameters:
- `config` (`PipelineConfig`): Configuration to validate
Returns:
- `ValidationResult`: Validation result with errors, if any
## PipelineConfig

Main pipeline configuration container.
```python
from qudata import PipelineConfig, IngestConfig, CleanConfig

config = PipelineConfig(
    ingest=IngestConfig(
        formats=["pdf", "docx", "txt", "html"],
        max_file_size="100MB",
        parallel_processing=True
    ),
    clean=CleanConfig(
        remove_duplicates=True,
        min_quality_score=0.7,
        language_filter=["en", "es"]
    )
)
```

## IngestConfig

Configuration for data ingestion.
Attributes:
- `formats` (`List[str]`): Supported file formats
- `max_file_size` (`str`): Maximum file size to process
- `parallel_processing` (`bool`): Enable parallel processing
- `ocr_enabled` (`bool`): Enable OCR for images and scanned PDFs
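A minimal sketch of an `IngestConfig` covering all four attributes; the values are illustrative, not library defaults:

```python
from qudata import IngestConfig

ingest = IngestConfig(
    formats=["pdf", "docx", "txt"],  # file formats to accept
    max_file_size="250MB",           # skip anything larger
    parallel_processing=True,        # process files in parallel
    ocr_enabled=True                 # OCR images and scanned PDFs
)
```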
## CleanConfig

Configuration for data cleaning.
Attributes:
- `remove_duplicates` (`bool`): Remove duplicate content
- `min_quality_score` (`float`): Minimum quality threshold
- `language_filter` (`List[str]`): Languages to keep
- `remove_boilerplate` (`bool`): Remove headers/footers
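The same pattern applies here; a sketch with illustrative values:

```python
from qudata import CleanConfig

clean = CleanConfig(
    remove_duplicates=True,   # drop duplicate content
    min_quality_score=0.6,    # discard documents scoring below this
    language_filter=["en"],   # keep English only
    remove_boilerplate=True   # strip headers/footers
)
```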
## Document

Core document data model.
```python
from qudata import Document, DocumentMetadata, DocumentStructure

document = Document(
    id="doc_001",
    source_path="/path/to/file.pdf",
    content="Document content here...",
    metadata=DocumentMetadata(
        title="Sample Document",
        author="John Doe",
        language="en",
        file_type="pdf"
    ),
    quality_score=0.85
)
```

Attributes:

- `id` (`str`): Unique document identifier
- `source_path` (`str`): Original file path
- `content` (`str`): Extracted text content
- `metadata` (`DocumentMetadata`): Document metadata
- `structure` (`DocumentStructure`): Document structure information
- `quality_score` (`float`): Quality assessment score (0.0-1.0)
- `processing_timestamp` (`datetime`): When the document was processed
- `version` (`str`): Processing version
### `to_dict()`

Convert the document to a dictionary.

```python
doc_dict = document.to_dict()
print(doc_dict["metadata"]["title"])
```

### `from_dict(doc_dict)`

Create a document from a dictionary.

```python
document = Document.from_dict(doc_dict)
```

### `get_word_count()`

Get the word count of the document content.

```python
word_count = document.get_word_count()
print(f"Document has {word_count} words")
```

## DocumentMetadata

Document metadata container.
```python
from datetime import datetime
from qudata import DocumentMetadata, Entity

metadata = DocumentMetadata(
    title="Research Paper",
    author="Dr. Smith",
    creation_date=datetime.now(),
    language="en",
    file_type="pdf",
    domain="academic",
    topics=["machine learning", "AI"],
    entities=[
        Entity(text="OpenAI", label="ORG", confidence=0.95),
        Entity(text="GPT-4", label="PRODUCT", confidence=0.90)
    ]
)
```

Attributes:

- `title` (`Optional[str]`): Document title
- `author` (`Optional[str]`): Document author
- `creation_date` (`Optional[datetime]`): Creation timestamp
- `language` (`str`): Detected language code
- `file_type` (`str`): Original file format
- `domain` (`str`): Content domain/category
- `topics` (`List[str]`): Identified topics
- `entities` (`List[Entity]`): Named entities
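Because each `Entity` carries a confidence score, a common follow-up is to keep only high-confidence spans. A short sketch using just the fields shown above:

```python
# Keep entities the NER stage is reasonably sure about.
confident = [e for e in metadata.entities if e.confidence >= 0.9]
for entity in confident:
    print(f"{entity.text} ({entity.label}): {entity.confidence:.2f}")
```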
## ProcessingResult

Result of a document processing operation.
```python
from qudata import ProcessingResult, ProcessingError

result = ProcessingResult(
    success=True,
    document=processed_document,
    errors=[],
    warnings=["Low quality score"],
    processing_time=2.5,
    stage_results={
        "ingest": {"success": True, "time": 0.5},
        "clean": {"success": True, "time": 1.2},
        "annotate": {"success": True, "time": 0.8}
    }
)
```

Attributes:

- `success` (`bool`): Whether processing succeeded
- `document` (`Optional[Document]`): Processed document, if successful
- `errors` (`List[ProcessingError]`): Processing errors
- `warnings` (`List[str]`): Processing warnings
- `processing_time` (`float`): Total processing time in seconds
- `stage_results` (`Dict[str, Any]`): Per-stage processing results
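`stage_results` is keyed by stage name, so per-stage timings can be reported directly. A small sketch based on the dictionary shape in the example above:

```python
# Report how long each pipeline stage took for this document.
for stage, info in result.stage_results.items():
    status = "ok" if info["success"] else "failed"
    print(f"{stage}: {status} in {info['time']:.1f}s")
```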
## FileTypeDetector

Automatic file type detection.
```python
from qudata.ingest import FileTypeDetector

detector = FileTypeDetector()
file_type = detector.detect_file_type("/path/to/document.pdf")
print(f"Detected type: {file_type}")  # Output: pdf

# Check if format is supported
if detector.is_supported(file_type):
    print("Format is supported for processing")
```

Methods:

- `detect_file_type(file_path)`: Detect file type from file path and content.
- `is_supported(file_type)`: Check if a file type is supported for processing.
- A companion method returns the list of all supported file formats.
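The two named methods compose naturally into a directory scan. A minimal sketch; the input path is illustrative:

```python
from pathlib import Path
from qudata.ingest import FileTypeDetector

detector = FileTypeDetector()

# Partition a folder into processable and skipped files.
for path in sorted(Path("/data/raw").iterdir()):
    if not path.is_file():
        continue
    file_type = detector.detect_file_type(str(path))
    if detector.is_supported(file_type):
        print(f"queue: {path.name} ({file_type})")
    else:
        print(f"skip: {path.name}")
```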
## PDFExtractor

Extract content from PDF files.
```python
from qudata.ingest import PDFExtractor

extractor = PDFExtractor()
result = extractor.extract("/path/to/document.pdf")

print(f"Content: {result.content}")
print(f"Tables: {len(result.tables)}")
print(f"Images: {len(result.images)}")
```

## DocumentExtractor

Extract content from DOCX and other document formats.
```python
from qudata.ingest import DocumentExtractor

extractor = DocumentExtractor()
result = extractor.extract("/path/to/document.docx")

print(f"Title: {result.metadata.title}")
print(f"Author: {result.metadata.author}")
print(f"Content: {result.content[:200]}...")
```

## WebExtractor

Extract content from HTML and web pages.
```python
from qudata.ingest import WebExtractor

extractor = WebExtractor()
result = extractor.extract_from_url("https://example.com/article")

print(f"Title: {result.metadata.title}")
print(f"Clean content: {result.content}")
```

## ComprehensiveCleaningPipeline

Runs the complete cleaning pipeline over a document.

```python
from qudata.clean import ComprehensiveCleaningPipeline
cleaner = ComprehensiveCleaningPipeline()
cleaned_document = cleaner.clean_document(document)
print(f"Original length: {len(document.content)}")
print(f"Cleaned length: {len(cleaned_document.content)}")
print(f"Quality score: {cleaned_document.quality_score}")from qudata.annotate import TaxonomyClassifier, MetadataExtractor
# Classify content
classifier = TaxonomyClassifier()
categories = classifier.classify(document.content)
print(f"Categories: {categories}")
# Extract metadata
extractor = MetadataExtractor()
metadata = extractor.extract_metadata(document)
print(f"Entities: {metadata.entities}")from qudata.export import ContentSegmenter
from qudata.pack import JSONLFormatter, ChatMLFormatter
# Segment content for training
segmenter = ContentSegmenter()
segments = segmenter.segment_document(document, format="instruction")
# Export to JSONL
jsonl_formatter = JSONLFormatter()
jsonl_formatter.export_to_file(
    documents=[document],
    output_path="training_data.jsonl"
)
# Export to ChatML
chatml_formatter = ChatMLFormatter()
chatml_data = chatml_formatter.format_documents([document])
```

## AnalysisEngine

Text statistics, topic modeling, and sentiment analysis over processed documents.

```python
from qudata.analyze import AnalysisEngine
analyzer = AnalysisEngine()
# Analyze text statistics
stats = analyzer.analyze_text_statistics([document])
print(f"Total words: {stats.total_words}")
print(f"Unique tokens: {stats.unique_tokens}")
# Perform topic modeling
topics = analyzer.perform_topic_modeling(
    texts=[doc.content for doc in documents],
    method="bertopic"
)
print(f"Found {len(topics.topics)} topics")
# Analyze sentiment
sentiment = analyzer.analyze_sentiment([doc.content for doc in documents])
print(f"Average sentiment: {sentiment.average_polarity}")All API methods use consistent error handling patterns:
```python
from qudata import ProcessingError, ErrorSeverity

try:
    result = pipeline.process_file("document.pdf")
    if not result.success:
        for error in result.errors:
            if error.severity == ErrorSeverity.CRITICAL:
                print(f"Critical error: {error.message}")
                print(f"Stage: {error.stage}")
                print(f"Suggestion: {error.suggestion}")
except ProcessingError as e:
    print(f"Processing failed: {e}")
```
## Async Processing

For high-throughput scenarios, async versions are available:

```python
import asyncio
from qudata import AsyncQuDataPipeline
async def process_documents_async():
    pipeline = AsyncQuDataPipeline(config)

    # Process multiple files concurrently
    tasks = [
        pipeline.process_file_async(f"document_{i}.pdf")
        for i in range(100)
    ]
    results = await asyncio.gather(*tasks)

    successful = [r for r in results if r.success]
    print(f"Successfully processed {len(successful)} documents")

# Run async processing
asyncio.run(process_documents_async())
```
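Firing all 100 tasks at once can exhaust file handles or memory on large batches. A common refinement, sketched here against the same `AsyncQuDataPipeline` methods used above, bounds concurrency with a semaphore:

```python
import asyncio

async def process_bounded(pipeline, paths, limit=8):
    # Allow at most `limit` files to be in flight at any moment.
    semaphore = asyncio.Semaphore(limit)

    async def process_one(path):
        async with semaphore:
            return await pipeline.process_file_async(path)

    return await asyncio.gather(*(process_one(p) for p in paths))
```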
## Configuration Examples

### Basic Configuration (`pipeline.yaml`)

```yaml
# pipeline.yaml
ingest:
  formats: ["pdf", "docx", "txt", "html"]
  max_file_size: "100MB"
  parallel_processing: true
  ocr_enabled: true

clean:
  remove_duplicates: true
  min_quality_score: 0.7
  language_filter: ["en"]
  remove_boilerplate: true

annotate:
  enable_ner: true
  enable_classification: true
  taxonomy_file: "configs/taxonomy.yaml"

export:
  formats: ["jsonl", "chatml"]
  output_dir: "/data/processed"
  split_ratios: [0.8, 0.1, 0.1]  # train, val, test
```
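A configuration file like this plugs straight into the pipeline API shown earlier:

```python
from qudata import QuDataPipeline, load_config

# Load the YAML configuration and run the full pipeline with it.
config = load_config("pipeline.yaml")
pipeline = QuDataPipeline(config)
result = pipeline.process_directory("/data/raw", output_path="/data/processed")
```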
### Advanced Configuration (`advanced-pipeline.yaml`)

```yaml
# advanced-pipeline.yaml
ingest:
  formats: ["pdf", "docx", "txt", "html", "csv", "json"]
  max_file_size: "500MB"
  parallel_processing: true
  max_workers: 8
  ocr_enabled: true
  ocr_languages: ["eng", "spa", "fra"]
  web_scraping:
    enabled: true
    rate_limit: 10  # requests per second
    user_agent: "QuData/1.0"
  database:
    enabled: true
    connections:
      - type: "postgresql"
        host: "localhost"
        database: "content_db"

clean:
  remove_duplicates: true
  similarity_threshold: 0.9
  min_quality_score: 0.7
  max_quality_score: 1.0
  language_filter: ["en", "es", "fr"]
  remove_boilerplate: true
  text_normalization:
    unicode_normalization: "NFKC"
    remove_accents: false
    lowercase: false
  html_cleaning:
    remove_scripts: true
    remove_styles: true
    preserve_links: false

annotate:
  enable_ner: true
  enable_classification: true
  enable_topic_modeling: true
  ner_model: "en_core_web_sm"
  classification_model: "custom"
  taxonomy_file: "configs/taxonomy.yaml"
  quality_scoring:
    weights:
      length: 0.2
      language: 0.3
      coherence: 0.3
      uniqueness: 0.2

export:
  formats: ["jsonl", "chatml", "parquet"]
  output_dir: "/data/processed"
  jsonl:
    fields: ["content", "metadata", "quality_score"]
    filter_low_quality: true
  chatml:
    system_message: "You are a helpful assistant."
    include_metadata: true
  splits:
    enabled: true
    ratios: [0.8, 0.1, 0.1]
    stratify_by: "domain"
```

This API reference documents all major components of the QuData system. Each section includes practical examples and configuration options to help developers integrate and use the system effectively.