When you have many texts to check, check_batch() processes them together with reduced overhead. Combine with thread or process pools for 2-10x throughput improvements over sequential checking.

Why Batch Processing?

Processing texts individually has overhead:
  • Repeated initialization
  • No parallelization
  • Inefficient memory usage
Batch processing provides:
  • Parallelization: Process multiple texts concurrently
  • Reduced overhead: Share resources across texts
  • Better throughput: 2-10x faster than sequential
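The overhead point can be made concrete with a stub (hypothetical; `check_one` and `check_batch_stub` are stand-ins, not library APIs) in which every call pays a fixed setup cost that a batch pays only once:

```python
import time

SETUP_OVERHEAD = 0.001  # seconds; assumed fixed per-call setup cost

def check_one(text):
    time.sleep(SETUP_OVERHEAD)  # setup paid on every call
    return len(text)            # stand-in for a real Response

def check_batch_stub(texts):
    time.sleep(SETUP_OVERHEAD)  # setup paid once for the whole batch
    return [len(t) for t in texts]

texts = ["one", "two", "three"] * 100

start = time.perf_counter()
sequential = [check_one(t) for t in texts]
t_seq = time.perf_counter() - start

start = time.perf_counter()
batched = check_batch_stub(texts)
t_batch = time.perf_counter() - start
```

This only models shared setup; real speedups also come from parallelization, covered below.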

Basic Usage

Simple Batch Check

from myspellchecker import SpellChecker

checker = SpellChecker()

texts = [
    "မြန်မာနိုင်ငံ",
    "ကျေးဇူးတင်ပါသည်",
    "နေကောင်းလား",
]

# Check all texts
results = checker.check_batch(texts)

for text, result in zip(texts, results):
    print(f"{text}: {len(result.errors)} errors")

With Validation Level

from myspellchecker.core.constants import ValidationLevel

# Check with syllable-level validation (faster)
results = checker.check_batch(texts, level=ValidationLevel.SYLLABLE)

# Check with word-level validation (more thorough)
results = checker.check_batch(texts, level=ValidationLevel.WORD)

Parallelization

OpenMP Parallelization

Cython extensions use OpenMP for parallel processing:
# Requires libomp on macOS: brew install libomp
# Automatically uses all available cores

results = checker.check_batch(texts)

Thread Pool Parallelization

Python-level parallelization for I/O-bound operations:
from concurrent.futures import ThreadPoolExecutor

checker = SpellChecker()

def check_single(text):
    return checker.check(text)

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(check_single, texts))

Process Pool Parallelization

For CPU-bound work, where the GIL prevents threads from running in parallel:
from concurrent.futures import ProcessPoolExecutor
import multiprocessing

# Each process gets its own checker
def check_in_process(text):
    checker = SpellChecker()
    return checker.check(text)

with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(check_in_process, texts))

Configuration

Batch Configuration

from myspellchecker import SpellChecker
from myspellchecker.core.config import SpellCheckerConfig, get_profile

# Use a preset optimized for batch processing
config = get_profile("fast")

checker = SpellChecker(config=config)
results = checker.check_batch(texts)

Memory-Efficient Processing

# For very large datasets, use streaming
def check_stream(file_handle, batch_size=100):
    """Process file in memory-efficient batches."""
    batch = []

    for line in file_handle:
        batch.append(line.strip())

        if len(batch) >= batch_size:
            yield from checker.check_batch(batch)
            batch = []

    if batch:
        yield from checker.check_batch(batch)

# Usage
with open("large_file.txt", encoding="utf-8") as f:
    for result in check_stream(f):
        if result.has_errors:
            print(result.text)

Performance Optimization

Optimal Batch Size

Text Length             Optimal Batch Size
Short (<50 chars)       500-1000
Medium (50-200 chars)   100-500
Long (>200 chars)       50-100

def get_optimal_batch_size(texts):
    if not texts:
        return 100  # sensible default for an empty list
    avg_length = sum(len(t) for t in texts) / len(texts)

    if avg_length < 50:
        return 500
    elif avg_length < 200:
        return 100
    else:
        return 50

Worker Count

import os

# Rule of thumb: 1-2 workers per CPU core, at least 10 texts per worker
num_cores = os.cpu_count() or 4
optimal_workers = max(1, min(num_cores, len(texts) // 10))  # never fewer than 1
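Wrapped as a helper (hypothetical name `optimal_workers`), with a floor of one worker so small inputs never yield zero:

```python
import os

def optimal_workers(num_texts, num_cores=None, texts_per_worker=10):
    """Cap workers at the core count, keeping >= texts_per_worker texts each."""
    num_cores = num_cores or os.cpu_count() or 4
    return max(1, min(num_cores, num_texts // texts_per_worker))
```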

Cython Acceleration

Ensure Cython extensions are compiled:
# Check if Cython is being used
python -c "from myspellchecker.text.normalize_c import remove_zero_width_chars; print('Cython OK')"

# Rebuild if needed
python setup.py build_ext --inplace

Benchmarks

Throughput Comparison

Test: 10,000 texts, average 100 chars each
Hardware: 8-core CPU

Sequential (no batch):
  Time: 45.2s
  Throughput: 221 texts/sec

Batch (1 worker):
  Time: 38.1s
  Throughput: 262 texts/sec

Batch (4 workers):
  Time: 12.3s
  Throughput: 813 texts/sec

Batch (8 workers):
  Time: 8.7s
  Throughput: 1,149 texts/sec

Batch (8 workers + Cython):
  Time: 4.2s
  Throughput: 2,381 texts/sec
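The throughput figures above are simply count divided by wall time; for instance, the end-to-end speedup from sequential to 8 workers + Cython works out to roughly 10.8x:

```python
n_texts = 10_000  # from the benchmark setup above

def throughput(seconds):
    return n_texts / seconds

seq_tps = throughput(45.2)   # sequential run
best_tps = throughput(4.2)   # 8 workers + Cython
speedup = 45.2 / 4.2         # overall wall-time speedup
```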

Memory Usage

Sequential: ~50MB base + 0.1MB per text
Batch (100): ~50MB base + 10MB buffer
Batch (1000): ~50MB base + 100MB buffer
Streaming: ~50MB base + 10MB buffer (constant)
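A back-of-envelope estimator matching those rows (assumed model: a fixed ~50MB base plus ~0.1MB of buffer per text in the batch; the constants are illustrative, not measured API values):

```python
BASE_MB = 50.0      # baseline process footprint, from the figures above
PER_TEXT_MB = 0.1   # assumed buffer per in-flight text

def estimate_peak_mb(batch_size):
    """Rough peak memory for one batch of the given size."""
    return BASE_MB + batch_size * PER_TEXT_MB
```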

API Reference

check_batch

def check_batch(
    texts: list[str],
    level: ValidationLevel = ValidationLevel.SYLLABLE,
) -> list[Response]:
    """
    Check multiple texts.

    Args:
        texts: List of texts to check
        level: Validation level (SYLLABLE or WORD)

    Returns:
        List of Response objects
    """

StreamingChecker

For memory-efficient streaming, use StreamingChecker:
from myspellchecker.core.streaming import StreamingChecker

streaming = StreamingChecker(checker)
for result in streaming.check_stream(file_handle):
    process_result(result)

Common Patterns

Progress Tracking

from tqdm import tqdm

def check_with_progress(texts):
    """Check texts with progress bar using streaming."""
    pbar = tqdm(total=len(texts))
    results = []

    # check_batch() takes no progress callback, so process in
    # fixed-size chunks and advance the bar after each batch
    for i in range(0, len(texts), 100):
        batch = texts[i:i+100]
        batch_results = checker.check_batch(batch)
        results.extend(batch_results)
        pbar.update(len(batch))

    pbar.close()
    return results
Note: For true streaming with callbacks, use StreamingChecker.check_stream() with on_progress.

Error Aggregation

from collections import Counter

def aggregate_errors(results):
    """Aggregate errors across all results."""
    all_errors = []
    error_type_counts = Counter()

    for result in results:
        for error in result.errors:
            error_type_counts[error.error_type] += 1
            all_errors.append(error)

    return {
        "total": len(all_errors),
        "by_type": dict(error_type_counts),
        # Possible keys: "invalid_syllable", "invalid_word",
        # "context_probability", "grammar_error", etc.
        "errors": all_errors,
    }

Parallel File Processing

from pathlib import Path
from concurrent.futures import ProcessPoolExecutor

def process_file(file_path: Path) -> dict:
    """Process single file."""
    checker = SpellChecker()

    with open(file_path, encoding="utf-8") as f:
        texts = [line.strip() for line in f]  # strip trailing newlines

    results = checker.check_batch(texts)

    return {
        "file": str(file_path),
        "total_lines": len(texts),
        "errors": sum(len(r.errors) for r in results),
    }

def process_directory(dir_path: Path, pattern: str = "*.txt") -> list:
    """Process all files in directory."""
    files = list(dir_path.glob(pattern))

    with ProcessPoolExecutor() as executor:
        results = list(executor.map(process_file, files))

    return results

Chunked Processing for Very Large Files

def process_large_file(file_path: str, chunk_size: int = 10000):
    """Process very large file in chunks."""
    all_results = []
    chunk = []

    with open(file_path, encoding="utf-8") as f:
        for line in f:
            chunk.append(line.strip())

            if len(chunk) >= chunk_size:
                results = checker.check_batch(chunk)
                all_results.extend(results)
                chunk = []

                # SpellChecker exposes no cache-clearing API; keep
                # chunk_size modest to bound peak memory

        if chunk:
            results = checker.check_batch(chunk)
            all_results.extend(results)

    return all_results

Troubleshooting

Issue: No speedup with batch processing

Cause: GIL contention or I/O bottleneck
Solution: Use process-based parallelization:
from concurrent.futures import ProcessPoolExecutor

def check_in_process(text):
    checker = SpellChecker()  # each worker process builds its own checker
    return checker.check(text)

with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(check_in_process, texts))

Issue: Out of memory

Cause: Batch too large or results not processed
Solution: Use streaming:
from myspellchecker.core.streaming import StreamingChecker

streaming = StreamingChecker(checker)
for result in streaming.check_stream(text_iterator):
    process_result(result)  # Process immediately
    # Results are garbage collected

Issue: Slow with many small texts

Cause: Worker overhead dominates
Solution: Process larger batches at once:
# Process texts in larger chunks manually
chunk_size = 1000
for i in range(0, len(texts), chunk_size):
    chunk = texts[i:i + chunk_size]
    results = checker.check_batch(chunk)
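The slicing loop above generalizes to a small reusable helper (hypothetical name `chunked`):

```python
def chunked(items, size):
    """Yield successive slices of at most `size` items."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

chunks = list(chunked(list(range(7)), 3))  # → [[0, 1, 2], [3, 4, 5], [6]]
```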

Next Steps