
Torah Study AI

Status: in progress

Production RAG pipeline on 3.5M sacred texts. Hybrid search, Cohere reranking, strict anti-hallucination guardrails. Built with FastAPI, Weaviate, and Gemini.

Python · FastAPI · Weaviate · Gemini 2.5 Flash · Cohere Rerank · Next.js · shadcn/ui · Docker

Step 11: Embeddings + Weaviate (the cost disaster) - How I Build

How a 25x pricing miscalculation led to a pivot, and the parallel ingest script that saved the project.

6 min read

The Goal

Take the 886K Sefaria texts (English and Hebrew) and embed them into Weaviate using Gemini Embedding 001 (3072 dimensions). Build a fast ingest pipeline that can handle hundreds of thousands of texts without crashing.

The Cost Disaster

Here is what happened. My cost estimation from Step 9:

174M tokens x $0.006/1M tokens = $1.04
With a 150% buffer = ~$2.60

The actual Gemini embedding price: $0.15 per 1M tokens. Not $0.006.

Where did I get $0.006? From an old blog post about text-embedding-004 that was either outdated or wrong. I did not double-check against the official pricing page.

The real math:

174M tokens x $0.15/1M tokens = $26.10

That is 25x more than my estimate. For a side project with no revenue.
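The whole disaster fits in four lines of arithmetic. A pre-launch sanity-check script would have surfaced the gap instantly (prices here are hardcoded from this post, not fetched from anywhere):

```python
# Sanity-check embedding cost before launching a batch job.
# Prices are hardcoded from this post; always verify against
# the official pricing page before trusting them.
TOKENS = 174_000_000

assumed_price = 0.006 / 1_000_000   # the bad number from an old blog post
actual_price = 0.15 / 1_000_000     # gemini-embedding-001, per token

estimate = TOKENS * assumed_price
reality = TOKENS * actual_price
print(f"estimate=${estimate:.2f} reality=${reality:.2f} ratio={reality / estimate:.0f}x")
# estimate=$1.04 reality=$26.10 ratio=25x
```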

What Actually Happened

I started the ingest script on a Saturday night. It ran for a few hours, processing 246K texts (a mix of English and Hebrew). When I checked the Gemini billing dashboard the next morning:

Tokens used: ~130M
Cost: ~20 shekels ($5.50)

Not catastrophic, but 25x my per-token estimate. And I was only 28% through the dataset.

I stopped the script immediately.

The Pivot: English Only

After the cost shock, I made a product decision: MVP is English-only.

Reasoning:

  • The target audience (beginners) mostly reads English
  • 94K high-quality English texts are enough for a great experience
  • Hebrew support can come later with a cheaper embedding model
  • 94K texts at $0.15/1M tokens costs about 5 shekels total
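The "about 5 shekels" bullet checks out when you back out the implied token budget. The token count below is my back-of-envelope inference from the ~$1.40 figure, not a measured number:

```python
# Back out the implied token budget for the English-only MVP.
# COST_USD (~5 shekels) comes from the post; the token count
# is inferred from it, not measured.
PRICE_PER_M = 0.15   # USD per 1M tokens, gemini-embedding-001
COST_USD = 1.40      # ~5 shekels
TEXTS = 94_635

implied_tokens = COST_USD / PRICE_PER_M * 1_000_000
print(f"~{implied_tokens / 1e6:.1f}M tokens, ~{implied_tokens / TEXTS:.0f} tokens/text")
# ~9.3M tokens, ~99 tokens/text
```

Roughly 100 tokens per text is plausible for short Sefaria segments, so the pivot math holds.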

I deleted the 246K mixed-language vectors from Weaviate and re-ingested only filtered English texts:

# Filter: English only, non-empty, reasonable length
filtered = english_clean.filter(
    lambda x: len(x["text"].strip()) > 20  # Skip very short fragments
    and len(x["text"]) < 8000              # Skip extremely long texts
)
print(f"Filtered English texts: {len(filtered):,}")  # ~94,635

The Parallel Ingest Script

Embedding 94K texts one by one would take 26+ hours. I built a parallel ingest pipeline using ThreadPoolExecutor with a Queue for rate limiting.

import weaviate
from google import genai
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from threading import Lock
import time

# Configuration
BATCH_SIZE = 100
MAX_WORKERS = 5
MAX_RETRIES = 3
BACKOFF_BASE = 2  # seconds


class SefariaIngester:
    """Parallel ingest pipeline for Sefaria texts into Weaviate."""

    def __init__(self, weaviate_client, genai_client):
        self.weaviate = weaviate_client
        self.genai = genai_client
        self.stats_lock = Lock()
        self.processed = 0
        self.failed = 0
        self.total = 0

    def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Embed a batch of texts with retry and exponential backoff."""
        for attempt in range(MAX_RETRIES):
            try:
                result = self.genai.models.embed_content(
                    model="models/gemini-embedding-001",
                    contents=texts,
                    config={"output_dimensionality": 3072},
                )
                return [e.values for e in result.embeddings]
            except Exception as e:
                if attempt < MAX_RETRIES - 1:
                    wait = BACKOFF_BASE ** (attempt + 1)
                    print(f"  Retry {attempt + 1}/{MAX_RETRIES} in {wait}s: {e}")
                    time.sleep(wait)
                else:
                    raise

    def ingest_batch(self, batch: list[dict]):
        """Embed and insert a batch of texts into Weaviate."""
        texts = [item["text"] for item in batch]

        try:
            embeddings = self.embed_batch(texts)

            # Insert into Weaviate
            with self.weaviate.batch as wb:
                for item, embedding in zip(batch, embeddings):
                    wb.add_data_object(
                        data_object={
                            "ref": item["ref"],
                            "text": item["text"],
                            "category": item["category"],
                            "book": item["book"],
                        },
                        class_name="SefariaText",
                        vector=embedding,
                    )

            with self.stats_lock:
                self.processed += len(batch)
                if self.processed % 1000 == 0:
                    elapsed = time.time() - self.start_time
                    rate = self.processed / elapsed
                    remaining = (self.total - self.processed) / rate
                    print(
                        f"  Progress: {self.processed:,}/{self.total:,} "
                        f"({self.processed/self.total*100:.1f}%) "
                        f"- {rate:.0f} texts/sec "
                        f"- ETA: {remaining/60:.0f} min"
                    )

        except Exception as e:
            with self.stats_lock:
                self.failed += len(batch)
            print(f"  FAILED batch: {e}")

    def run(self, dataset: list[dict]):
        """Run parallel ingest on the full dataset."""
        self.total = len(dataset)
        self.start_time = time.time()

        # Split into batches
        batches = [
            dataset[i : i + BATCH_SIZE]
            for i in range(0, len(dataset), BATCH_SIZE)
        ]

        print(f"Starting ingest: {self.total:,} texts in {len(batches):,} batches")
        print(f"Workers: {MAX_WORKERS}, Batch size: {BATCH_SIZE}")

        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            executor.map(self.ingest_batch, batches)

        elapsed = time.time() - self.start_time
        print(f"\nDone in {elapsed/60:.1f} minutes")
        print(f"Processed: {self.processed:,}")
        print(f"Failed: {self.failed:,}")
        print(f"Rate: {self.processed/elapsed:.0f} texts/sec")

Running the Ingest

# Set environment variables
export GOOGLE_API_KEY="your-key"
export WEAVIATE_URL="https://your-weaviate.elestio.app"
export WEAVIATE_API_KEY="your-weaviate-key"

# Run ingest
python ingest_sefaria.py

Output:

Starting ingest: 94,635 texts in 947 batches
Workers: 5, Batch size: 100
  Progress: 1,000/94,635 (1.1%) - 42 texts/sec - ETA: 37 min
  Progress: 2,000/94,635 (2.1%) - 45 texts/sec - ETA: 34 min
  ...
  Progress: 94,000/94,635 (99.3%) - 48 texts/sec - ETA: 0 min

Done in 32.8 minutes
Processed: 94,512
Failed: 123
Rate: 48 texts/sec

The 123 failures were texts that hit Gemini's token limit (over 2048 tokens). I handled these separately by truncating to 2000 tokens before embedding.
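The truncation step can be sketched like this. The ~4-characters-per-token ratio is a crude heuristic for English text, not an API guarantee; the exact cut should be verified with count_tokens before re-embedding:

```python
# Rough truncation for texts over the embedding model's 2048-token
# input limit. Assumes ~4 characters per token -- a crude English-text
# heuristic; verify the final count with count_tokens before embedding.
CHARS_PER_TOKEN = 4
TOKEN_BUDGET = 2000  # leave headroom under the 2048-token limit

def truncate_for_embedding(text: str) -> str:
    max_chars = TOKEN_BUDGET * CHARS_PER_TOKEN
    if len(text) <= max_chars:
        return text
    # Cut at the last whitespace before the limit so we
    # never split a word in half.
    cut = text.rfind(" ", 0, max_chars)
    return text[: cut if cut > 0 else max_chars]

print(len(truncate_for_embedding("word " * 3000)))  # stays within the 8000-char budget
```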

Weaviate Schema

class_obj = {
    "class": "SefariaText",
    "vectorizer": "none",  # We provide our own embeddings
    "properties": [
        {"name": "ref", "dataType": ["text"]},
        {"name": "text", "dataType": ["text"]},
        {"name": "category", "dataType": ["text"]},
        {"name": "book", "dataType": ["text"]},
    ],
}

client.schema.create_class(class_obj)

Setting vectorizer: "none" means Weaviate stores vectors but does not generate them. We handle embedding externally with Gemini. This gives us full control over the embedding model and lets us switch to a local model later without changing the Weaviate schema.

The Plan: Migrate to Local Embeddings

The long-term plan is to replace Gemini embeddings with Qwen3-Embedding-0.6B:

  • Cost: Free (runs locally)
  • Quality: Rank 3 on Sefaria's own benchmark at 89.4% recall
  • Speed: 420 texts/sec on M1 Max (tested)
  • Dimensions: 1024 (smaller vectors = less storage, faster search)

The migration path:

  1. Download Qwen3-Embedding-0.6B (600MB)
  2. Re-embed all 94K texts locally (~4 minutes at 420 texts/sec)
  3. Create new Weaviate collection with 1024 dimensions
  4. Swap the search endpoint to use the new collection
  5. Delete the old Gemini-embedded collection

Total cost of migration: $0. Total time: ~30 minutes.
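Step 3 of that path is essentially the existing schema under a new class name. A sketch (SefariaTextQwen is my placeholder name; with vectorizer "none", the vector dimensionality is fixed by the first inserted vector rather than declared in the schema, so the 1024 dims come from the new embeddings themselves):

```python
# Sketch of the migration-target collection. "SefariaTextQwen" is a
# placeholder name. With vectorizer "none", the 1024-dim vectors from
# Qwen3-Embedding-0.6B define the index dimensionality on first insert.
qwen_class = {
    "class": "SefariaTextQwen",
    "vectorizer": "none",
    "properties": [
        {"name": "ref", "dataType": ["text"]},
        {"name": "text", "dataType": ["text"]},
        {"name": "category", "dataType": ["text"]},
        {"name": "book", "dataType": ["text"]},
    ],
}

# client.schema.create_class(qwen_class)  # same call as the original schema
```

Because the properties are identical, the search endpoint only needs the class name swapped once the re-embed finishes.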

Lessons Learned

  • Always benchmark on a small sample first. Embed 1000 texts, check the bill, multiply. Never trust mental math on pricing.
  • Use count_tokens() before embedding. Gemini provides count_tokens() to measure exact token count. Use it on a sample to verify your estimate.
  • Set spending caps. Google Cloud lets you set budget alerts. Set one at $5 before running any batch job. I did not, and I got lucky it was only 20 shekels.
  • Parallel ingest with backoff is essential. Without ThreadPoolExecutor, the 94K ingest would take 26 hours instead of 33 minutes. Without retry with backoff, transient API errors would leave gaps in your index.
  • Product decisions save money. Pivoting to English-only was not just a cost decision. It focused the MVP on a single audience (English-speaking beginners) and simplified evaluation.
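The first two lessons combine into a simple pre-flight check: count tokens on a small sample, extrapolate to the full dataset, and compare against a budget cap before running anything. In practice the sample counts would come from count_tokens; the extrapolation itself is plain arithmetic (the 196-token average below is illustrative, chosen to match the 174M-token total from this post):

```python
# Pre-flight cost check: extrapolate total cost from a measured sample.
# sample_token_counts would come from calling count_tokens on ~1000
# real texts; the function itself is pure arithmetic.
def projected_cost(sample_token_counts: list[int],
                   total_texts: int,
                   price_per_million: float) -> float:
    avg_tokens = sum(sample_token_counts) / len(sample_token_counts)
    return avg_tokens * total_texts * price_per_million / 1_000_000

# Illustrative 1000-text sample averaging ~196 tokens, extrapolated to
# 886K texts at the real gemini-embedding-001 price:
cost = projected_cost([196] * 1000, 886_000, 0.15)
print(f"projected: ${cost:.2f}")  # projected: $26.05 -- would have caught the disaster
```

Run this before the ingest, compare against a $5 cap, and the script refuses to start instead of burning money overnight.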

Cost Summary

  • Initial estimate: $2.60 for all 886K texts
  • Reality: $0.15/1M tokens, not $0.006/1M
  • Wasted on 246K mixed texts: 20 shekels ($5.50)
  • Final cost for 94K English texts: 5 shekels ($1.40)
  • Future cost with local Qwen3: $0

What's Next

Step 12 connects this vector database to the chat pipeline: Weaviate search (20 candidates), Cohere rerank (top 5), and Gemini generation with the retrieved sources.