Streaming API
The streaming API processes large text files line-by-line with bounded memory, yielding results incrementally instead of loading everything into memory. It supports synchronous generators, async iterators, and progress callbacks.

Quick Start

from myspellchecker import SpellChecker
from myspellchecker.core.streaming import StreamingChecker

checker = SpellChecker()
streaming = StreamingChecker(checker)

# Process large file with bounded memory
with open("large_file.txt", encoding="utf-8") as f:
    for result in streaming.check_stream(f):
        if result.response.has_errors:
            for error in result.response.errors:
                print(f"Line {result.line_number}: {error.text}")

StreamingConfig

from myspellchecker.core.streaming import StreamingConfig

config = StreamingConfig(
    chunk_size=100,                      # Lines per chunk
    max_memory_mb=100,                   # Memory limit before backpressure
    sentence_boundary_pattern=r"[။!?]+", # Myanmar sentence endings
    enable_cross_sentence_context=True,  # Context across sentences
    progress_interval=1000,              # Lines between callbacks
    timeout_per_chunk=30.0,              # Async timeout per chunk
)

streaming = StreamingChecker(checker, config=config)
| Option                          | Default     | Description                          |
|---------------------------------|-------------|--------------------------------------|
| `chunk_size`                    | `100`       | Lines processed per chunk            |
| `max_memory_mb`                 | `100`       | Max memory before backpressure       |
| `sentence_boundary_pattern`     | `r"[။!?]+"` | Regex for sentence boundaries        |
| `enable_cross_sentence_context` | `True`      | Enable cross-sentence context        |
| `progress_interval`             | `1000`      | Lines between progress callbacks     |
| `timeout_per_chunk`             | `30.0`      | Async processing timeout (seconds)   |

Configuration Presets

# High-throughput (large files on powerful hardware)
high_throughput = StreamingConfig(chunk_size=500, max_memory_mb=500, progress_interval=5000)

# Low-memory (constrained environments)
low_memory = StreamingConfig(chunk_size=10, max_memory_mb=50, progress_interval=100)

# Real-time (per-line processing)
realtime = StreamingConfig(chunk_size=1, max_memory_mb=100, timeout_per_chunk=5.0)

Synchronous Streaming

Processing Files

from myspellchecker.core.streaming import StreamingChecker

streaming = StreamingChecker(checker)

with open("document.txt", encoding="utf-8") as f:
    for result in streaming.check_stream(f):
        if result.response.has_errors:
            for error in result.response.errors:
                print(f"Line {result.line_number}: {error.text}")

Processing Different Input Types

# From file handle
with open("file.txt") as f:
    for result in streaming.check_stream(f):
        pass

# From StringIO
from io import StringIO
text_io = StringIO("Line 1\nLine 2\nLine 3")
for result in streaming.check_stream(text_io):
    pass

# From any iterable
lines = ["Line 1", "Line 2", "Line 3"]
for result in streaming.check_stream(iter(lines)):
    pass

Sentence-by-Sentence Processing

Process text by sentences with context preservation:
text = """
ဤစာကြောင်းသည် ပထမစာကြောင်းဖြစ်သည်။
ဒုတိယစာကြောင်းသည် ပိုရှည်သည်။
"""

for result in streaming.check_sentences(text):
    print(f"Sentence {result.chunk_index}: {result.response.text}")
    if result.response.has_errors:
        print(f"  Errors: {result.response.errors}")

Cross-Sentence Context

When enable_cross_sentence_context=True, the checker validates using context from the previous sentence:
config = StreamingConfig(enable_cross_sentence_context=True)
streaming = StreamingChecker(checker, config=config)

for result in streaming.check_sentences(text):
    pass

# Reset context between documents
streaming.reset_context()

Async Streaming

Basic Async

import asyncio

import aiofiles

from myspellchecker import SpellChecker
from myspellchecker.core.streaming import StreamingChecker

async def process_file():
    """Stream-check a large file asynchronously with bounded memory."""
    checker = SpellChecker()
    streaming = StreamingChecker(checker)

    # aiofiles yields lines lazily, so the whole file never sits in memory.
    # (The original snippet used aiofiles without importing it.)
    async with aiofiles.open("large_file.txt") as f:
        async for result in streaming.check_stream_async(f):
            if result.response.has_errors:
                await handle_errors(result)

asyncio.run(process_file())

Async with Timeout

config = StreamingConfig(timeout_per_chunk=10.0)
streaming = StreamingChecker(checker, config=config)

async for result in streaming.check_stream_async(reader):
    if "error" in result.response.metadata:
        print(f"Timeout on line {result.line_number}")

Cancellation

async def cancellable_check(filepath: str, cancel_event: asyncio.Event):
    """Stream-check *filepath*, stopping promptly once *cancel_event* is set.

    Cancellation is checked in two places: inside the line reader (so no
    further lines are read from disk) and in the consumer loop (so already
    buffered results stop being processed).
    """
    checker = SpellChecker()
    streaming = StreamingChecker(checker)

    async def read_lines():
        # Local import keeps the aiofiles dependency scoped to this coroutine.
        import aiofiles
        async with aiofiles.open(filepath) as f:
            async for line in f:
                if cancel_event.is_set():
                    return
                yield line

    try:
        async for result in streaming.check_stream_async(read_lines()):
            if cancel_event.is_set():
                break
            process_result(result)
    except asyncio.CancelledError:
        # Log, then re-raise so outer tasks still observe the cancellation.
        print("Processing was cancelled")
        raise

Graceful Shutdown

import signal

async def graceful_streaming():
    """Stream-check lines until SIGINT/SIGTERM requests a graceful stop."""
    checker = SpellChecker()
    streaming = StreamingChecker(checker)
    shutdown_event = asyncio.Event()

    def handle_signal():
        shutdown_event.set()

    # get_running_loop() is the supported way to reach the loop from inside
    # a coroutine; get_event_loop() is deprecated for this since Python 3.10.
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGINT, handle_signal)
    loop.add_signal_handler(signal.SIGTERM, handle_signal)

    async def read_lines():
        # Simulated line source; a real app would read from a file or socket.
        for i in range(10000):
            if shutdown_event.is_set():
                return
            yield f"Line {i}\n"
            await asyncio.sleep(0.001)

    async for result in streaming.check_stream_async(read_lines()):
        if shutdown_event.is_set():
            print("Shutting down gracefully...")
            break
        process_result(result)

Progress Tracking

Progress Callbacks

from myspellchecker.core.streaming import StreamingConfig, StreamingStats

def on_progress(stats: StreamingStats):
    """Print a one-line progress summary; called every progress_interval lines."""
    print(f"Progress: {stats.lines_processed} lines, "
          f"{stats.lines_per_second:.1f} lines/sec, "
          f"Memory: {stats.current_memory_mb:.1f}MB")

config = StreamingConfig(progress_interval=100)
streaming = StreamingChecker(checker, config=config)

stats = StreamingStats()
with open("large_file.txt") as f:
    for result in streaming.check_stream(f, on_progress=on_progress, stats=stats):
        pass

# Final statistics
print(f"Total time: {stats.elapsed_time:.2f}s")
print(f"Total errors: {stats.errors_found}")

With tqdm Progress Bar

from tqdm import tqdm
from myspellchecker.core.streaming import StreamingStats

def check_with_progress(filepath: str):
    """Check *filepath* line by line with a tqdm bar showing errors and rate."""
    checker = SpellChecker()
    streaming = StreamingChecker(checker)

    # First pass: count lines so tqdm can render a bounded bar.
    # Explicit UTF-8 matches the rest of this guide and avoids relying on the
    # platform default encoding for Myanmar text.
    with open(filepath, encoding="utf-8") as f:
        total_lines = sum(1 for _ in f)

    stats = StreamingStats()

    with open(filepath, encoding="utf-8") as f:
        with tqdm(total=total_lines, desc="Checking") as pbar:
            for result in streaming.check_stream(f, stats=stats):
                pbar.update(1)
                pbar.set_postfix({
                    "errors": stats.errors_found,
                    "rate": f"{stats.lines_per_second:.0f}/s"
                })

With Rich Progress Bar

from rich.progress import Progress

with Progress() as progress:
    task = progress.add_task("Checking...", total=total_lines)

    def on_progress(stats):
        progress.update(task, completed=stats.lines_processed)

    for result in streaming.check_stream(f, on_progress=on_progress):
        pass

StreamingStats

from myspellchecker.core.streaming import StreamingStats

stats = StreamingStats()

for result in streaming.check_stream(f, stats=stats):
    pass

# Available metrics
print(stats.bytes_processed)       # Total bytes processed
print(stats.lines_processed)       # Total lines processed
print(stats.sentences_processed)   # Sentences processed
print(stats.errors_found)          # Total errors found
print(stats.chunks_processed)      # Chunks processed
print(stats.elapsed_time)          # Seconds since start
print(stats.lines_per_second)      # Processing rate
print(stats.current_memory_mb)     # Current memory usage

# Serialize to dict
stats_dict = stats.to_dict()

ChunkResult

Each iteration yields a ChunkResult:
from myspellchecker.core.streaming import ChunkResult

for result in streaming.check_stream(f):
    result.response       # Response object with errors
    result.line_number    # Line number in source file
    result.chunk_index    # Sequential chunk number
    result.is_final       # True for last chunk

Memory Management

Backpressure

When memory exceeds max_memory_mb, the streaming checker automatically applies backpressure:
  1. Sync mode: Triggers garbage collection and adds a small delay
  2. Async mode: Adds an async sleep to allow cleanup
config = StreamingConfig(max_memory_mb=50)
streaming = StreamingChecker(checker, config=config)

# Automatic GC and delay when limit exceeded
for result in streaming.check_stream(f):
    pass

Bounded Memory Usage

# Process 10GB file with ~100MB memory
config = StreamingConfig(chunk_size=50, max_memory_mb=100)
streaming = StreamingChecker(checker, config=config)

with open("huge_file.txt") as f:
    for result in streaming.check_stream(f):
        pass  # Memory stays bounded

Error Handling

Sync Error Recovery

def safe_process(filepath: str):
    """Check *filepath*, reporting per-chunk errors and file-level failures.

    Chunks whose response metadata carries an "error" key are reported and
    skipped; I/O failures opening or reading the file are caught separately.
    """
    checker = SpellChecker()
    streaming = StreamingChecker(checker)

    try:
        with open(filepath, encoding="utf-8") as f:
            for result in streaming.check_stream(f):
                if "error" not in result.response.metadata:
                    process_result(result)
                else:
                    print(f"Line {result.line_number} had an error")
    except IOError as e:
        print(f"File error: {e}")

Async Timeout Recovery

config = StreamingConfig(timeout_per_chunk=5.0)

async for result in streaming.check_stream_async(reader):
    if result.response.metadata.get("error") == "Timeout exceeded":
        print(f"Timeout on line {result.line_number}")
        continue

Best Practices

  1. Reuse StreamingChecker instances — creating a new SpellChecker per file is expensive
  2. Reset context between documents — call streaming.reset_context() between unrelated files
  3. Choose chunk size by use case — 500 for throughput, 1 for real-time
  4. Use async for I/O-bound workloads — network sources, file I/O with aiofiles
  5. Set an appropriate validation level — SYLLABLE for speed, WORD for thoroughness

Integration Examples

WebSocket Streaming

import websockets
import json

async def websocket_spellcheck(websocket):
    """Spell-check incoming WebSocket messages, replying with JSON per message."""
    checker = SpellChecker()
    streaming = StreamingChecker(checker)

    async def receive_lines():
        # Adapt the websocket message stream into the async-iterator shape
        # that check_stream_async() consumes.
        async for message in websocket:
            yield message

    async for result in streaming.check_stream_async(receive_lines()):
        await websocket.send(json.dumps({
            "line": result.line_number,
            "has_errors": result.response.has_errors,
            "errors": [e.to_dict() for e in result.response.errors],
        }))

FastAPI File Upload

from fastapi import FastAPI, UploadFile
from fastapi.responses import StreamingResponse
import json

app = FastAPI()

@app.post("/check-file")
async def check_file(file: UploadFile):
    """Spell-check an uploaded file, streaming one NDJSON result per line."""
    checker = SpellChecker()

    async def generate_results():
        # UploadFile is not an async line iterator — `async for line in file`
        # raises TypeError. Read the upload and split into lines instead.
        # For very large uploads, chunked reads via file.read(size) would
        # keep memory bounded.
        content = await file.read()
        for raw_line in content.decode("utf-8").splitlines():
            text = raw_line.strip()
            if text:
                response = checker.check(text)
                yield json.dumps({
                    "text": text,
                    "errors": [e.to_dict() for e in response.errors],
                }) + "\n"

    return StreamingResponse(generate_results(), media_type="application/x-ndjson")

Batch File Processing

import glob

def process_files(pattern: str):
    """Check every file matching glob *pattern*; return a list of error records.

    The checker and streaming wrapper are built once and reused across files
    (per the guide's best practices); context is reset between files so one
    document never influences another.
    """
    checker = SpellChecker()
    streaming = StreamingChecker(checker)
    all_errors = []

    for filepath in glob.glob(pattern):
        # Treat each file as an independent document.
        streaming.reset_context()

        # Explicit UTF-8 keeps behavior consistent across platforms, matching
        # the other examples in this guide.
        with open(filepath, encoding="utf-8") as f:
            for result in streaming.check_stream(f):
                if result.response.has_errors:
                    all_errors.append({
                        "file": filepath,
                        "line": result.line_number,
                        "errors": result.response.errors,
                    })

    return all_errors

See Also