Skip to main content
Synchronous spell checking blocks the event loop for ~50-200ms per call. The async API offloads CPU-bound checking to a thread pool, keeping your web server responsive under concurrent load.

Why Async?

Synchronous spell checking blocks the event loop:
# Blocking - can't handle other requests
result = checker.check(text)  # Blocks for ~50-200ms
Async spell checking allows concurrent handling:
# Non-blocking - event loop can handle other tasks
result = await checker.check_async(text)

Basic Usage

Async Check

import asyncio
from myspellchecker import SpellChecker

async def main():
    """Run one asynchronous spell check and print how many errors it found."""
    spell_checker = SpellChecker()

    # Awaiting the check lets the event loop keep serving other tasks meanwhile.
    outcome = await spell_checker.check_async("မြန်မာနိုင်ငံ")
    error_total = len(outcome.errors)
    print(f"Errors: {error_total}")

asyncio.run(main())

Async Batch

async def check_multiple():
    """Check several texts with one batch call and print an error count for each."""
    checker = SpellChecker()

    samples = [
        "မြန်မာနိုင်ငံ",
        "ကျေးဇူးတင်ပါသည်",
        "နေကောင်းလား",
    ]

    # A single awaited call covers the whole list.
    batch_results = await checker.check_batch_async(samples)

    # Pair every input with its result to report per-text error counts.
    for sample, outcome in zip(samples, batch_results):
        print(f"{sample}: {len(outcome.errors)} errors")

asyncio.run(check_multiple())

Concurrent Checks

async def concurrent_checks():
    """Check four placeholder texts concurrently and return their results."""
    checker = SpellChecker()

    texts = ["text1", "text2", "text3", "text4"]

    # One check_async coroutine per text; gather() awaits them all together
    # and returns the results in the same order as the inputs.
    return await asyncio.gather(*map(checker.check_async, texts))

Web Framework Integration

FastAPI

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from myspellchecker import SpellChecker

app = FastAPI()
checker = SpellChecker()

class CheckRequest(BaseModel):
    # Request body accepted by the POST /check endpoint.
    text: str  # the text to spell-check

class CheckResponse(BaseModel):
    # Response body returned by the POST /check endpoint.
    has_errors: bool  # copied from the checker result's has_errors
    error_count: int  # number of entries in the result's errors
    # One dict per error; check_spelling fills each with the keys
    # "position", "text", "type" and "suggestions".
    errors: list

@app.post("/check", response_model=CheckResponse)
async def check_spelling(request: CheckRequest):
    # Spell-check the submitted text without blocking the event loop, then
    # report every error with at most three suggestions.
    outcome = await checker.check_async(request.text)

    def _to_payload(err):
        # Flatten one error object into a JSON-friendly dict.
        return {
            "position": err.position,
            "text": err.text,
            "type": str(err.error_type),
            "suggestions": err.suggestions[:3],
        }

    return CheckResponse(
        has_errors=outcome.has_errors,
        error_count=len(outcome.errors),
        errors=[_to_payload(err) for err in outcome.errors],
    )

@app.post("/check/batch")
async def check_batch(texts: list[str]):
    # Spell-check every submitted text in one batch call and return a short
    # summary (text, error flag, error count) per result.
    outcomes = await checker.check_batch_async(texts)

    summaries = []
    for outcome in outcomes:
        summaries.append(
            {
                "text": outcome.text,
                "has_errors": outcome.has_errors,
                "error_count": len(outcome.errors),
            }
        )
    return summaries

Starlette

from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route
from myspellchecker import SpellChecker

checker = SpellChecker()

async def check_endpoint(request):
    # Read "text" from the JSON body (empty string when absent), spell-check
    # it asynchronously and answer with the error flag plus serialized errors.
    payload = await request.json()
    outcome = await checker.check_async(payload.get("text", ""))

    body = {
        "has_errors": outcome.has_errors,
        "errors": [err.to_dict() for err in outcome.errors],
    }
    return JSONResponse(body)

app = Starlette(routes=[
    Route("/check", check_endpoint, methods=["POST"]),
])

aiohttp

from aiohttp import web
from myspellchecker import SpellChecker

checker = SpellChecker()

async def check_handler(request):
    """Spell-check the "text" field of a JSON request and return the errors as JSON."""
    payload = await request.json()
    # A missing "text" key is treated as an empty string.
    outcome = await checker.check_async(payload.get("text", ""))

    body = {
        "has_errors": outcome.has_errors,
        "errors": [err.to_dict() for err in outcome.errors],
    }
    return web.json_response(body)

app = web.Application()
app.router.add_post("/check", check_handler)

Configuration

Async Settings

The async API runs CPU-intensive logic in a thread pool to avoid blocking the event loop. Configuration is handled at the method level:
from myspellchecker import SpellChecker
from myspellchecker.core.constants import ValidationLevel

checker = SpellChecker()

# NOTE: the two `await` lines below must run inside a coroutine (`async def`);
# `text` and `texts` are placeholders the surrounding code is expected to define.

# Specify validation level for async calls
result = await checker.check_async(text, level=ValidationLevel.WORD)

# Control concurrency for batch async
# (4 is also the default shown in the check_batch_async signature)
results = await checker.check_batch_async(texts, max_concurrency=4)

Connection Pool for High Concurrency

from myspellchecker import SpellChecker
from myspellchecker.providers import SQLiteProvider

# Use connection pool for concurrent access
provider = SQLiteProvider(pool_max_size=10)
checker = SpellChecker(provider=provider)

Patterns

Rate Limiting

import asyncio
from asyncio import Semaphore

class RateLimitedChecker:
    """Spell checker wrapper that caps how many checks are in flight at once."""

    def __init__(self, max_concurrent: int = 10):
        """Create the checker and a semaphore with ``max_concurrent`` slots."""
        self.checker = SpellChecker()
        self.semaphore = Semaphore(max_concurrent)

    async def check(self, text: str):
        """Check one text, waiting for a free semaphore slot first."""
        async with self.semaphore:
            outcome = await self.checker.check_async(text)
        return outcome

    async def check_batch(self, texts: list[str]):
        """Check all texts concurrently; each one still goes through ``check``."""
        # Every coroutine acquires the semaphore itself, so gather() can be
        # handed the full list without exceeding the limit.
        return await asyncio.gather(*map(self.check, texts))

# Usage (run inside a coroutine; `texts` is a placeholder list of strings)
# At most 5 checks hold the semaphore at any moment.
limited_checker = RateLimitedChecker(max_concurrent=5)
results = await limited_checker.check_batch(texts)

Timeout Handling

import asyncio

async def check_with_timeout(text: str, timeout: float = 5.0):
    """Spell-check ``text``, giving up after ``timeout`` seconds.

    Returns the checker's result when it finishes in time; otherwise returns
    a dict with the keys ``"error"`` (set to ``"timeout"``) and ``"text"``.
    """
    try:
        # wait_for raises asyncio.TimeoutError once the deadline passes.
        return await asyncio.wait_for(checker.check_async(text), timeout)
    except asyncio.TimeoutError:
        timed_out = {"error": "timeout", "text": text}
        return timed_out

Background Processing

import asyncio
from collections import deque

class BackgroundChecker:
    """Queue texts and spell-check them one at a time in a background coroutine."""

    def __init__(self):
        """Set up the checker, an empty work queue and an empty result map."""
        self.checker = SpellChecker()
        self.queue = deque()
        self.results = {}
        self._running = False

    async def start(self):
        """Process queued texts until ``stop()`` is called."""
        self._running = True
        while self._running:
            if not self.queue:
                # Nothing to do: yield to the event loop briefly, then poll again.
                await asyncio.sleep(0.01)
                continue
            text_id, text = self.queue.popleft()
            self.results[text_id] = await self.checker.check_async(text)

    def submit(self, text_id: str, text: str):
        """Enqueue ``text`` under ``text_id`` for later checking."""
        job = (text_id, text)
        self.queue.append(job)

    def get_result(self, text_id: str):
        """Return the stored result for ``text_id``, or ``None`` if not ready."""
        return self.results.get(text_id, None)

    def stop(self):
        """Ask the processing loop to exit after its current iteration."""
        self._running = False

Streaming WebSocket

from fastapi import FastAPI, WebSocket
from myspellchecker import SpellChecker

app = FastAPI()
checker = SpellChecker()

@app.websocket("/ws/check")
async def websocket_check(websocket: WebSocket):
    """Spell-check every text message received on the socket and reply with JSON.

    Each reply carries ``has_errors`` and the serialized ``errors`` of the
    check. The loop ends when the client disconnects. Any other failure
    closes the socket with code 1011 (internal error) and is re-raised so
    the server can log it instead of it being silently swallowed.
    """
    # Local import: fastapi is already a dependency of this snippet, and this
    # keeps the fix self-contained.
    from fastapi import WebSocketDisconnect

    await websocket.accept()

    try:
        while True:
            text = await websocket.receive_text()
            result = await checker.check_async(text)

            await websocket.send_json({
                "has_errors": result.has_errors,
                "errors": [e.to_dict() for e in result.errors],
            })
    except WebSocketDisconnect:
        # Normal end of session: the peer already closed the connection, so
        # there is nothing left for us to close.
        return
    except Exception:
        # Unexpected error while the socket is still open: tell the client
        # with an explicit "internal error" close code, then surface it.
        await websocket.close(code=1011)
        raise

Performance

Async vs Sync Comparison

Test: 100 concurrent requests
Hardware: 4-core CPU

Sync (sequential):
  Total time: 15.2s
  Throughput: 6.6 req/sec

Async (concurrent):
  Total time: 3.1s
  Throughput: 32.3 req/sec

Async (with connection pool):
  Total time: 1.8s
  Throughput: 55.6 req/sec

Optimization Tips

  1. Use connection pooling for database access
  2. Limit concurrency to prevent resource exhaustion
  3. Set appropriate timeouts for production
  4. Cache frequently checked texts
from functools import lru_cache

@lru_cache(maxsize=1000)
def get_cached_result(text: str):
    """Return the synchronous check result for ``text``, memoized per text.

    ``lru_cache`` keeps the results of up to 1000 distinct texts. A cache
    miss runs the blocking ``checker.check`` call.
    """
    # For sync fallback
    return checker.check(text)

async def check_with_cache(text: str):
    """Return the check result for ``text``, served from the LRU cache when possible.

    The previous version called ``get_cached_result.__wrapped__``, which
    bypasses the cache and runs the blocking check on the event loop.
    Calling the cached function itself in a worker thread fixes both
    problems: hits return the memoized result, and misses are computed off
    the event loop and stored for next time.

    Args:
        text: Text to check.

    Returns:
        Whatever ``get_cached_result`` returns for ``text``.
    """
    import asyncio  # local import keeps this snippet self-contained

    # functools.lru_cache is documented as thread-safe, so calling the cached
    # function from a worker thread is fine.
    return await asyncio.to_thread(get_cached_result, text)

API Reference

check_async

async def check_async(
    text: str,
    level: ValidationLevel = ValidationLevel.SYLLABLE,
    use_semantic: bool | None = None,
) -> Response:
    """
    Asynchronously check text for spelling errors.

    Awaiting this call leaves the event loop free to handle other tasks.

    Args:
        text: Text to check
        level: Validation level (SYLLABLE or WORD). Defaults to SYLLABLE.
        use_semantic: Override semantic checking for this call. Defaults to
            None (presumably "keep the checker's configured setting" -
            TODO confirm).

    Returns:
        Response object. The examples on this page read its ``errors`` and
        ``has_errors`` attributes.
    """

check_batch_async

async def check_batch_async(
    texts: list[str],
    level: ValidationLevel = ValidationLevel.SYLLABLE,
    max_concurrency: int = 4,
) -> list[Response]:
    """
    Asynchronously check multiple texts.

    Args:
        texts: List of texts to check
        level: Validation level (SYLLABLE or WORD). Defaults to SYLLABLE.
        max_concurrency: Maximum concurrent checks (default: 4)

    Returns:
        List of Response objects. The batch example on this page pairs them
        with the inputs via ``zip(texts, results)``, i.e. one result per
        text in input order.
    """

Troubleshooting

Issue: Event loop blocked

Cause: Synchronous code is being called from an async context. Solution: Call the async API so the CPU-bound check runs off the event loop:
# Bad: blocks event loop
result = checker.check(text)

# Good: non-blocking
result = await checker.check_async(text)

Issue: “RuntimeError: Event loop is closed”

Cause: The checker is used after the event loop has closed. Solution: Create the checker inside the async function's scope:
async def main():
    # The with block scopes the checker to this coroutine, so it is only used
    # while the event loop running main() is alive.
    with SpellChecker() as checker:
        # `text` is a placeholder for the string to check.
        result = await checker.check_async(text)
Note: SpellChecker is a synchronous context manager (use with, not async with). Its async methods can still be awaited inside the with block.

Issue: Database connection errors

Cause: Concurrent access without connection pooling. Solution: Enable connection pooling:
provider = SQLiteProvider(pool_max_size=10)
checker = SpellChecker(provider=provider)

Issue: High memory usage with many connections

Cause: Too many concurrent checks. Solution: Limit concurrency:
results = await checker.check_batch_async(
    texts,
    max_concurrency=5  # Limit concurrent checks
)

Next Steps