The Problem That Bugged Me
We had a pipeline pulling 240 documents into our search engine. It worked. It was just painfully slow, taking over an hour to finish. And the reason was simple: it did everything one document at a time. Download a PDF, split it into chunks, get embeddings for each chunk, write each one to the database, then move on to the next file. Repeat 240 times.
It is like going to the shop, buying one item, walking home, then heading back for the next one. Technically correct. Nobody actually does that.
What Was Going On Under the Hood
Each document went through three steps:
- Download the PDF from S3
- Embed the text chunks (turning text into vectors via an API)
- Write everything to OpenSearch
The old code did all three in order. Document 1 had to finish all three before Document 2 even started downloading. A 100-page PDF that split into 50 chunks meant 50 separate embedding calls and 50 separate writes, one after another.
Old Flow (Sequential):
```text
Doc 1: Download → Embed all chunks → Write to OpenSearch → Done
Doc 2: Download → Embed all chunks → Write to OpenSearch → Done
Doc 3: Download → Embed all chunks → Write to OpenSearch → Done
... (240 times, one after another)
```
Multiply that by 240 documents and you get why it took over an hour.
The Fix: Let Things Overlap
The core idea was straightforward: while one document is being embedded, another can be downloading, and a third can be writing to the database. Across different documents, these stages don't depend on each other.
Think of it like a laundry assembly line. You don't wait for one load to wash, dry, and fold before starting the next. You put load 2 in the washer while load 1 is in the dryer.
New Flow (Concurrent):
```text
Doc 1: Download ─────→ Embed chunks ─────→ Bulk write to OpenSearch
Doc 2: Download ─────→ Embed chunks ─────→ Bulk write
Doc 3: Download ─────→ Embed chunks ─────→ Bulk write
Doc 4: Download ─────→ Embed chunks ─────→ Bulk write
... (all 240 docs overlap across stages)
```
I rewrote the pipeline using Python's asyncio to handle this concurrency. Here's what changed:
Downloads happen 8 at a time
Instead of downloading files one by one, we keep several downloads in flight, and the moment one finishes, the next one starts. A semaphore caps this at 8 simultaneous S3 downloads so we don't overwhelm the connection pool.
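A minimal sketch of the bounded-download idea, using only the standard library. `fake_download` is a hypothetical stand-in for the blocking S3 call (in the pipeline described here that call runs on a thread pool); the sketch counts how many downloads are active at once to show that the semaphore never lets more than 8 through.

```python
import asyncio

MAX_CONCURRENT_DOWNLOADS = 8  # same limit as described above


async def fake_download(doc_id: int, stats: dict) -> str:
    """Hypothetical stand-in for an S3 download: sleeps instead of doing I/O."""
    stats["active"] += 1
    stats["peak"] = max(stats["peak"], stats["active"])
    await asyncio.sleep(0.01)  # pretend network time
    stats["active"] -= 1
    return f"doc-{doc_id}"


async def download_all(doc_ids, limit=MAX_CONCURRENT_DOWNLOADS):
    sem = asyncio.Semaphore(limit)
    stats = {"active": 0, "peak": 0}

    async def bounded(doc_id):
        # At most `limit` coroutines can be inside this block at once;
        # when one leaves, a waiting coroutine is let in right away.
        async with sem:
            return await fake_download(doc_id, stats)

    # gather returns results in the same order as doc_ids
    results = await asyncio.gather(*(bounded(d) for d in doc_ids))
    return results, stats["peak"]


if __name__ == "__main__":
    docs, peak = asyncio.run(download_all(range(20)))
    print(len(docs), peak)  # 20 8
```

All 20 tasks are created up front, but only 8 ever hold the semaphore at the same time; the rest simply wait their turn instead of piling onto the connection pool.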
Embeddings fire in batches of 20 chunks
Instead of one chunk per API call, we batch 20 chunks together. And up to 50 embedding calls can be in-flight across all documents simultaneously. This respects Azure OpenAI's rate limits (1000 RPM / 1M TPM) while keeping the pipeline moving fast.
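Here is a sketch of batching plus a global in-flight cap, assuming a hypothetical `fake_embed_batch` in place of the real Azure OpenAI call. The key detail is that one semaphore is created for the whole run and passed to every document, so the limit of 50 applies across all documents rather than per document. This sketch submits all of a document's batches at once and lets the shared semaphore decide how many actually run; the real pipeline may schedule them differently.

```python
import asyncio

EMBEDDING_BATCH_SIZE = 20        # chunks per embedding request
MAX_CONCURRENT_EMBEDDINGS = 50   # requests in flight across ALL documents


def batched(items, size):
    """Split a list into consecutive slices of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


async def fake_embed_batch(texts):
    """Hypothetical stand-in for one embedding API call (one vector per text)."""
    await asyncio.sleep(0.01)
    return [[float(len(t))] for t in texts]


async def embed_document(chunks, embed_sem, batch_size=EMBEDDING_BATCH_SIZE):
    async def one_call(batch):
        # embed_sem is shared by every document, so it caps total
        # in-flight calls for the whole run, not per document.
        async with embed_sem:
            return await fake_embed_batch(batch)

    per_batch = await asyncio.gather(
        *(one_call(b) for b in batched(chunks, batch_size))
    )
    # Flatten back to one vector per chunk, preserving chunk order.
    return [vec for batch_vectors in per_batch for vec in batch_vectors]


async def main():
    embed_sem = asyncio.Semaphore(MAX_CONCURRENT_EMBEDDINGS)
    chunks = [f"chunk {i}" for i in range(50)]
    vectors = await embed_document(chunks, embed_sem)
    return len(batched(chunks, EMBEDDING_BATCH_SIZE)), len(vectors)


if __name__ == "__main__":
    print(asyncio.run(main()))  # (3, 50)
```

A 50-chunk document becomes 3 API calls (20 + 20 + 10 chunks) instead of 50.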
Database writes use bulk operations
Instead of inserting chunks one at a time (which for 240 documents meant nearly 12,000 individual writes), each document now sends all its chunks in a single bulk request.
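A sketch of what one document's bulk request can look like. The OpenSearch `_bulk` format pairs an action line with a source line for each chunk; the chunk id format `"<doc_id>_<n>"` and the field names (`chunk_id`, `text`, `vector`) are illustrative assumptions, not the pipeline's actual schema. Bodies are capped at 200 chunks each, matching the `BULK_BATCH_SIZE` setting, so very long documents are split across a few requests.

```python
BULK_BATCH_SIZE = 200  # chunks per bulk request


def build_bulk_bodies(index, doc_id, chunks, batch_size=BULK_BATCH_SIZE):
    """Turn one document's chunks into a list of OpenSearch _bulk bodies.

    Each chunk contributes two entries: an "index" action and its source.
    The id format and field names here are hypothetical.
    """
    bodies = []
    for start in range(0, len(chunks), batch_size):
        body = []
        # `start=start` keeps chunk numbering continuous across batches
        for n, chunk in enumerate(chunks[start:start + batch_size], start=start):
            chunk_id = f"{doc_id}_{n}"
            body.append({"index": {"_index": index, "_id": chunk_id}})
            body.append({
                "chunk_id": chunk_id,
                "text": chunk["text"],
                "vector": chunk["vector"],
            })
        bodies.append(body)
    return bodies


if __name__ == "__main__":
    chunks = [{"text": "some text", "vector": [0.1, 0.2]}] * 50
    bodies = build_bulk_bodies("docs", "report-42", chunks)
    print(len(bodies), len(bodies[0]))  # 1 100
```

With opensearch-py, each body can be passed as `client.bulk(body=body)`; the client serializes the list of dicts into the newline-delimited format the `_bulk` endpoint expects.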
The Architecture: Shared Embedder, Async Pipeline
The main entry point changed from a synchronous loop to a fully async pipeline:
```python
import asyncio


# OLD: synchronous loop, one doc at a time
def extract_data(self):
    for cleaned_text_pdf, meta_data in self.fetch_docs(json_data, processes=7):
        # A fresh embedder (and auth call) for every single document
        embedding_function = AzureEmbeddings(self.user_id, self.consumer)
        document_chunks = self.text_split_to_docs(cleaned_text_pdf, meta_data)
        result = self.add_or_update_document(document_chunks)


# NEW: all documents launched concurrently
async def extract_data(self):
    # One embedder, created once and shared by every document
    embedding_function = AzureEmbeddings(self.user_id, self.consumer)
    shared_embedder = vector.DEmbedding(embedding_function)

    tasks = []
    for meta_data in json_data:
        task = asyncio.create_task(
            self._process_single_document(meta_data, shared_embedder)
        )
        tasks.append(task)

    # Wait for every document; results come back in input order
    results = await asyncio.gather(*tasks)
```
Why the shared embedder matters: the old code created a new AzureEmbeddings() for each document, and each one calls refresh_auth_header() (an HTTP call). For 240 docs that's 240 auth calls, roughly 4-8 minutes wasted. Now one embedder is created and shared across all documents.
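To make the "create once, share everywhere" point concrete, here is a hypothetical embedder that fetches its auth header on first use and reuses it afterwards. `SharedAuthEmbedder` and the injected `fetch_auth_header` callable are illustrative names, not the real AzureEmbeddings API; a production version would also need to refresh the token when it expires.

```python
import threading


class SharedAuthEmbedder:
    """Hypothetical embedder that fetches its auth header once and reuses it.

    `fetch_auth_header` stands in for an HTTP call like refresh_auth_header();
    it is injected so this sketch runs without network access.
    """

    def __init__(self, fetch_auth_header):
        self._fetch_auth_header = fetch_auth_header
        self._header = None
        self._lock = threading.Lock()  # embed calls may come from several threads

    def auth_header(self):
        with self._lock:
            if self._header is None:  # only the first caller pays for the HTTP call
                self._header = self._fetch_auth_header()
            return self._header

    def embed(self, texts):
        header = self.auth_header()  # a real client would send this with the request
        # Dummy vectors instead of a real embedding request
        return [[float(len(t))] for t in texts]


if __name__ == "__main__":
    calls = []

    def fetch():
        calls.append(1)
        return {"Authorization": "Bearer example"}

    embedder = SharedAuthEmbedder(fetch)
    for doc in range(240):
        embedder.embed([f"doc {doc}"])
    print(len(calls))  # 1
```

Creating one instance per document would call `fetch` 240 times; sharing one instance calls it once.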
Three-Stage Pipeline Per Document
Each document flows through a three-stage async pipeline:
```python
async def _process_single_document(self, meta_data, shared_embedder):
    # download_sem, embed_sem, executor and loop are shared objects created
    # once for the whole run (their setup is not shown in this excerpt)

    # Stage 1: Download PDF (bounded by download_sem)
    async with download_sem:
        cleaned_text_pdf, returned_meta = await loop.run_in_executor(
            executor, self.download_file, meta_data
        )
    # download_sem released here - next doc can start downloading

    # Stage 2: Split into chunks + embed asynchronously (bounded by embed_sem)
    document_chunks = await self.text_split_to_docs(
        cleaned_text_pdf, meta_data, local_embedder=shared_embedder,
        embed_semaphore=embed_sem,
    )

    # Stage 3: Bulk write to OpenSearch on the thread pool
    result = await loop.run_in_executor(
        executor, self.add_or_update_document, document_chunks
    )
    return result
```
The key design: the download semaphore is released before embedding starts. This means while Doc 1 is embedding its chunks, Doc 2 can already start downloading. The stages overlap across documents.
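A toy version of the same three-stage shape shows why releasing the semaphore early matters. `asyncio.sleep` stands in for S3, the embedding API and OpenSearch (all timings are made up). With `download_limit=1`, a document that has finished downloading embeds while the next document downloads, and the sketch records whether those two stages were ever active at the same time.

```python
import asyncio


async def run_pipeline(n_docs, download_limit=1):
    """Toy three-stage pipeline; sleeps stand in for real I/O."""
    download_sem = asyncio.Semaphore(download_limit)
    active = {"download": 0, "embed": 0}
    overlap_seen = False

    def enter(stage):
        nonlocal overlap_seen
        active[stage] += 1
        # Record whether a download and an embed are in progress at the same time.
        if active["download"] and active["embed"]:
            overlap_seen = True

    async def process(doc_id):
        async with download_sem:      # Stage 1: only this stage is bounded
            enter("download")
            await asyncio.sleep(0.02)
            active["download"] -= 1
        # Semaphore released: the next document can start downloading now.
        enter("embed")                # Stage 2
        await asyncio.sleep(0.02)
        active["embed"] -= 1
        await asyncio.sleep(0.005)    # Stage 3: bulk write
        return doc_id

    results = await asyncio.gather(*(process(d) for d in range(n_docs)))
    return results, overlap_seen


if __name__ == "__main__":
    print(asyncio.run(run_pipeline(3)))  # ([0, 1, 2], True)
```

If the whole of `process` ran inside the `async with download_sem:` block, `overlap_seen` would stay `False` with `download_limit=1`: each document would hold the download slot through embedding and writing, and the run would be fully sequential again.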
The Small Wins That Added Up
Removed a redundant pre-scan
Before processing anything, the old pipeline checked OpenSearch for all 240 documents to see which ones were new or updated. But the processing code already performed this check during its write step. We were doing the same work twice: 240 sequential existence checks for nothing.
Stopped refreshing the index after every write
We were refreshing the OpenSearch index after every single chunk was written, forcing it to make each new chunk searchable immediately. That's like reorganizing your entire bookshelf every time you add one book. Now we write everything with refresh=False and call refresh_index() once at the end.
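A sketch of the "write without refreshing, refresh once" pattern. It assumes an opensearch-py style client, whose `bulk()` accepts the `refresh` query parameter and whose `indices.refresh(index=...)` triggers a refresh; this is not the author's refresh_index() helper. A small stub client is included so the sketch runs without a cluster.

```python
def write_all(client, index, bodies):
    """Send each bulk body with refresh=False, then refresh the index once.

    `client` is assumed to behave like opensearch-py's OpenSearch client.
    Returns the number of bulk requests that reported item errors.
    """
    failed_batches = 0
    for body in bodies:
        # refresh=False: don't force a refresh for this request
        response = client.bulk(body=body, refresh=False)
        if response.get("errors"):
            failed_batches += 1
    # A single refresh makes all the new chunks searchable at once.
    client.indices.refresh(index=index)
    return failed_batches


class _StubIndices:
    """Records refresh calls instead of talking to a cluster."""

    def __init__(self):
        self.refreshed = []

    def refresh(self, index):
        self.refreshed.append(index)


class StubClient:
    """Minimal stand-in for an OpenSearch client, for running this sketch offline."""

    def __init__(self):
        self.bulk_refresh_args = []
        self.indices = _StubIndices()

    def bulk(self, body, refresh):
        self.bulk_refresh_args.append(refresh)
        return {"errors": False, "items": []}


if __name__ == "__main__":
    client = StubClient()
    body = [{"index": {"_index": "docs", "_id": "a_0"}}, {"text": "hi"}]
    write_all(client, "docs", [body] * 3)
    print(client.bulk_refresh_args, client.indices.refreshed)
    # [False, False, False] ['docs']
```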
Targeted deletes instead of full scans
When updating a document, the old code scanned the entire index and filtered client-side. Now we use a prefix query to find just the chunks belonging to that document, then bulk-delete them.
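A sketch of the targeted delete, assuming each chunk stores a keyword field named `chunk_id` in the form `"<doc_id>_<n>"` (both the field name and the id format are assumptions). Including the `_` separator in the prefix means deleting `report-4` does not also catch chunks of `report-42`.

```python
def prefix_query(doc_id, field="chunk_id"):
    """Query matching only chunks whose id starts with '<doc_id>_'.

    The field name and id format are illustrative assumptions.
    """
    return {"query": {"prefix": {field: f"{doc_id}_"}}}


def delete_actions(index, hit_ids):
    """Bulk body that deletes each matched chunk by _id (delete actions have no source line)."""
    return [{"delete": {"_index": index, "_id": hit_id}} for hit_id in hit_ids]


if __name__ == "__main__":
    print(prefix_query("report-42"))
    # {'query': {'prefix': {'chunk_id': 'report-42_'}}}
    print(delete_actions("docs", ["report-42_0", "report-42_1"]))
```

With opensearch-py, the ids could come from `client.search(index=index, body=prefix_query(doc_id), size=...)` (search returns only 10 hits by default, so the size, or a scroll, has to cover every chunk), and the resulting list can be sent with `client.bulk(body=delete_actions(index, ids))`.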
Removed a random one-second sleep
There was a time.sleep(1) after every PDF read, with no comment explaining why. It added over four minutes of just... waiting for nothing.
Disabled image extraction
We were extracting images from PDFs even though our pipeline only processes text. Turning this off sped up the PDF parsing step noticeably.
Controlling the Chaos
Running everything at once sounds great until you hit rate limits or crash your database. The key was using semaphores: counters that limit how many operations can run simultaneously.
| Setting | Value | What it controls |
|---|---|---|
| max_concurrent_downloads | 8 | PDFs downloading from S3 at the same time |
| max_concurrent_embeddings | 50 | Embedding API calls in flight at once |
| embedding_batch_size | 20 | Chunks sent per embedding call |
| executor_threads | 10 | Thread pool for sync operations (S3, OpenSearch) |
| BULK_BATCH_SIZE | 200 | Chunks per OpenSearch bulk write |
All these numbers are configurable through AWS Secrets Manager, so we can tune them without deploying new code.
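A sketch of how these settings might be loaded from a JSON secret with the table's values as defaults. The `PipelineSettings` class is hypothetical, and the lowercase key `bulk_batch_size` (the table writes it as `BULK_BATCH_SIZE`) is an assumption about how the secret is laid out.

```python
import json
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class PipelineSettings:
    # Defaults mirror the values in the settings table.
    max_concurrent_downloads: int = 8
    max_concurrent_embeddings: int = 50
    embedding_batch_size: int = 20
    executor_threads: int = 10
    bulk_batch_size: int = 200  # hypothetical key name for BULK_BATCH_SIZE

    @classmethod
    def from_secret_string(cls, secret_string):
        """Build settings from a JSON secret; missing keys keep their defaults."""
        raw = json.loads(secret_string or "{}")
        known = {f.name for f in fields(cls)}
        # Ignore unrelated keys stored in the same secret; coerce values to int.
        return cls(**{k: int(v) for k, v in raw.items() if k in known})


if __name__ == "__main__":
    s = PipelineSettings.from_secret_string('{"max_concurrent_downloads": "12"}')
    print(s.max_concurrent_downloads, s.bulk_batch_size)  # 12 200
```

In AWS, the secret string would typically come from `boto3.client("secretsmanager").get_secret_value(SecretId=...)["SecretString"]`, so a tuning change is just an update to the secret.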
How It All Works Together
```text
Time 0s:   Docs 1-8 start downloading from S3 (download_sem=8)
Time 2s:   Doc 1 finishes download, starts embedding. Doc 9 starts downloading.
Time 3s:   Doc 2 finishes download, starts embedding. Doc 10 starts downloading.
           Doc 1 has 50 chunks and sends its first batch of 20 (batch 1 of 3).
Time 4s:   Doc 1 batch 1 done, sends batch 2. Doc 3 starts embedding.
Time 5s:   Doc 1 all batches done, bulk-writes 50 chunks to OpenSearch.
           Docs 4-8 are all embedding. Docs 9-16 are downloading.
...
Time 780s: All 240 docs processed. Final refresh_index(). Done.
```
The key insight: downloading, embedding, and writing overlap across documents. While some docs are downloading, others are embedding, and others are writing. No stage blocks the others.
The Result
Before: 110+ minutes for 240 documents.
After: ~13 minutes for the same 240 documents.
That's roughly an 8.5x speedup. Most of the remaining time is spent in embedding API calls, which is the one thing we can't speed up without a faster model or higher rate limits.
What I Took Away
The biggest lesson here was not really about async. It was about questioning what was already there. The old pipeline was written to be simple and correct, and it was. But "simple" had quietly turned into "nobody bothered to look at it again" as the document count grew from 20 to 240.
Second lesson: measure first. I had assumed embeddings were the only bottleneck. They were one of them, sure. But the extra pre-scan, the repeated auth calls, the one-at-a-time writes: together, those cost just as much. If I had only fixed the embeddings, I would have got maybe 2x and called it a day.
Sometimes the biggest win is not making the slow thing faster. It is finding the work you should not have been doing in the first place.