The retrieveChunks / retrieve_chunks method allows you to retrieve relevant document chunks from your ingested documents using semantic search. It returns the most relevant content for your query, making it ideal for building custom RAG (Retrieval-Augmented Generation) applications with your preferred LLM.
Method Overview
Python (sync): client.sources.retrieve_chunks()
Python (async): await client.sources.retrieve_chunks() (using AsyncGraphor)
TypeScript: await client.sources.retrieveChunks(). All TypeScript methods are async and return a Promise.
Method Signature
client.sources.retrieve_chunks(
    query: str,                           # Required
    file_ids: list[str] | None = None,    # Preferred
    file_names: list[str] | None = None,  # Deprecated
    timeout: float | None = None
) -> SourceRetrieveChunksResponse

await client.sources.retrieveChunks({
  query: string,                 // Required
  file_ids?: string[] | null,    // Preferred
  file_names?: string[] | null,  // Deprecated
}): Promise<SourceRetrieveChunksResponse>
Parameters
| Parameter | Type | Description | Required |
| --- | --- | --- | --- |
| query | str | The search query to retrieve relevant chunks | Yes |
| file_ids | list[str] \| None | Restrict retrieval to specific documents by file ID (preferred) | No |
| file_names | list[str] \| None | Restrict retrieval to specific documents by file name (deprecated, use file_ids) | No |
| timeout | float \| None | Request timeout in seconds | No |
| Parameter | Type | Description | Required |
| --- | --- | --- | --- |
| query | string | The search query to retrieve relevant chunks | Yes |
| file_ids | string[] \| null | Restrict retrieval to specific documents by file ID (preferred) | No |
| file_names | string[] \| null | Restrict retrieval to specific documents by file name (deprecated, use file_ids) | No |
Response Object
The method returns a SourceRetrieveChunksResponse object with the following properties:
| Property | Type | Description |
| --- | --- | --- |
| query | str | The original search query |
| total | int | Total number of chunks retrieved |
| chunks | list[Chunk] \| None | List of retrieved document chunks |
Chunk Object
Each chunk in the chunks list contains:
| Property | Type | Description |
| --- | --- | --- |
| text | str | The text content of the chunk |
| file_id | str \| None | The unique identifier of the source file |
| file_name | str \| None | The source file name |
| page_number | int \| None | The page number where the chunk was found |
| score | float \| None | The relevance score of the chunk (higher is more relevant) |
| metadata | dict \| None | Additional metadata for the chunk |
Code Examples
Basic Retrieval
from graphor import Graphor

client = Graphor()

# Retrieve relevant chunks for a query
result = client.sources.retrieve_chunks(
    query="What are the payment terms?"
)

print(f"Found {result.total} relevant chunks")
for chunk in result.chunks or []:
    print(f"\n--- {chunk.file_name} (page {chunk.page_number}) ---")
    print(chunk.text)
    print(f"Relevance: {chunk.score:.2f}")
import Graphor from 'graphor';

const client = new Graphor();

// Retrieve relevant chunks for a query
const result = await client.sources.retrieveChunks({
  query: 'What are the payment terms?',
});

console.log(`Found ${result.total} relevant chunks`);
for (const chunk of result.chunks ?? []) {
  console.log(`\n--- ${chunk.file_name} (page ${chunk.page_number}) ---`);
  console.log(chunk.text);
  console.log(`Relevance: ${chunk.score?.toFixed(2)}`);
}
Retrieval from Specific Documents (using file_ids)
from graphor import Graphor

client = Graphor()

# Restrict retrieval to specific files using file_ids (preferred)
result = client.sources.retrieve_chunks(
    query="What is the total amount due?",
    file_ids=["file_abc123", "file_def456"]
)

print(f"Found {result.total} chunks from specified files")
for chunk in result.chunks or []:
    print(f"[{chunk.file_id}] {chunk.text[:100]}...")
import Graphor from 'graphor';

const client = new Graphor();

// Restrict retrieval to specific files using file_ids (preferred)
const result = await client.sources.retrieveChunks({
  query: 'What is the total amount due?',
  file_ids: ['file_abc123', 'file_def456'],
});

console.log(`Found ${result.total} chunks from specified files`);
for (const chunk of result.chunks ?? []) {
  console.log(`[${chunk.file_id}] ${chunk.text.slice(0, 100)}...`);
}
Retrieval from Specific Documents (using file_names - deprecated)
from graphor import Graphor

client = Graphor()

# Restrict retrieval to specific files using file_names (deprecated)
result = client.sources.retrieve_chunks(
    query="What is the total amount due?",
    file_names=["invoice-2024.pdf", "invoice-2023.pdf"]
)

print(f"Found {result.total} chunks from specified files")
for chunk in result.chunks or []:
    print(f"[{chunk.file_name}] {chunk.text[:100]}...")
import Graphor from 'graphor';

const client = new Graphor();

// Restrict retrieval to specific files using file_names (deprecated)
const result = await client.sources.retrieveChunks({
  query: 'What is the total amount due?',
  file_names: ['invoice-2024.pdf', 'invoice-2023.pdf'],
});

console.log(`Found ${result.total} chunks from specified files`);
for (const chunk of result.chunks ?? []) {
  console.log(`[${chunk.file_name}] ${chunk.text.slice(0, 100)}...`);
}
Async Retrieval
import asyncio

from graphor import AsyncGraphor

async def search_documents(query: str):
    client = AsyncGraphor()
    result = await client.sources.retrieve_chunks(
        query=query
    )
    print(f"Found {result.total} relevant chunks")
    return result.chunks

# Run the async function
chunks = asyncio.run(search_documents("What are the key contract terms?"))
import Graphor from 'graphor';

const client = new Graphor();

async function searchDocuments(query: string) {
  const result = await client.sources.retrieveChunks({ query });
  console.log(`Found ${result.total} relevant chunks`);
  return result.chunks;
}

const chunks = await searchDocuments('What are the key contract terms?');
Error Handling
import graphor
from graphor import Graphor

client = Graphor()

try:
    result = client.sources.retrieve_chunks(
        query="What are the payment terms?"
    )
    print(f"Found {result.total} chunks")
except graphor.BadRequestError as e:
    print(f"Invalid request: {e}")
except graphor.AuthenticationError as e:
    print(f"Invalid API key: {e}")
except graphor.NotFoundError as e:
    print(f"File not found: {e}")
except graphor.RateLimitError as e:
    print(f"Rate limit exceeded. Please wait and retry: {e}")
except graphor.APIConnectionError as e:
    print(f"Connection error: {e}")
except graphor.APITimeoutError as e:
    print(f"Request timed out: {e}")
import Graphor from 'graphor';

const client = new Graphor();

try {
  const result = await client.sources.retrieveChunks({
    query: 'What are the payment terms?',
  });
  console.log(`Found ${result.total} chunks`);
} catch (err) {
  if (err instanceof Graphor.BadRequestError) {
    console.log(`Invalid request: ${err.message}`);
  } else if (err instanceof Graphor.AuthenticationError) {
    console.log(`Invalid API key: ${err.message}`);
  } else if (err instanceof Graphor.NotFoundError) {
    console.log(`File not found: ${err.message}`);
  } else if (err instanceof Graphor.RateLimitError) {
    console.log(`Rate limit exceeded. Please wait and retry: ${err.message}`);
  } else if (err instanceof Graphor.APIConnectionError) {
    console.log(`Connection error: ${err.message}`);
  } else if (err instanceof Graphor.APIError) {
    console.log(`API error (status ${err.status}): ${err.message}`);
  } else {
    throw err;
  }
}
Building Custom RAG Pipelines
The retrieveChunks / retrieve_chunks method is designed to give you full control over your RAG pipeline. Here's how to integrate it with your preferred LLM.
With OpenAI
from graphor import Graphor
from openai import OpenAI

graphor_client = Graphor()
openai_client = OpenAI()

def retrieve_chunks(query: str, file_names: list[str] | None = None):
    """Step 1: Retrieve relevant chunks using semantic search."""
    result = graphor_client.sources.retrieve_chunks(
        query=query,
        file_names=file_names
    )
    return result.chunks or []

def build_context(chunks) -> str:
    """Step 2: Build context from retrieved chunks."""
    context = ""
    for chunk in chunks:
        context += f"\n[Source: {chunk.file_name}, Page {chunk.page_number}]\n"
        context += chunk.text + "\n"
    return context

def generate_answer(question: str, context: str) -> str:
    """Step 3: Generate answer with OpenAI."""
    response = openai_client.chat.completions.create(
        model="gpt-4",
        messages=[
            {
                "role": "system",
                "content": "Answer questions based on the provided context. Cite sources when possible."
            },
            {
                "role": "user",
                "content": f"Context:\n{context}\n\nQuestion: {question}"
            }
        ]
    )
    return response.choices[0].message.content

def ask_question(question: str, file_names: list[str] | None = None) -> dict:
    """Complete RAG pipeline."""
    chunks = retrieve_chunks(question, file_names)
    context = build_context(chunks)
    answer = generate_answer(question, context)
    return {
        "answer": answer,
        "sources": [
            {"file": c.file_name, "page": c.page_number, "score": c.score}
            for c in chunks
        ]
    }

# Usage
result = ask_question("What are the payment terms?")
print(result["answer"])
print("Sources:", result["sources"])
import Graphor from 'graphor';
import OpenAI from 'openai';

const graphorClient = new Graphor();
const openaiClient = new OpenAI();

/** Step 1: Retrieve relevant chunks using semantic search. */
async function retrieveChunks(query: string, fileNames?: string[]) {
  const result = await graphorClient.sources.retrieveChunks({
    query,
    file_names: fileNames,
  });
  return result.chunks ?? [];
}

/** Step 2: Build context from retrieved chunks. */
function buildContext(chunks: Awaited<ReturnType<typeof retrieveChunks>>): string {
  return chunks
    .map((chunk) => `[Source: ${chunk.file_name}, Page ${chunk.page_number}]\n${chunk.text}`)
    .join('\n\n');
}

/** Step 3: Generate answer with OpenAI. */
async function generateAnswer(question: string, context: string): Promise<string> {
  const response = await openaiClient.chat.completions.create({
    model: 'gpt-4',
    messages: [
      {
        role: 'system',
        content: 'Answer questions based on the provided context. Cite sources when possible.',
      },
      {
        role: 'user',
        content: `Context:\n${context}\n\nQuestion: ${question}`,
      },
    ],
  });
  return response.choices[0].message.content ?? '';
}

/** Complete RAG pipeline. */
async function askQuestion(question: string, fileNames?: string[]) {
  const chunks = await retrieveChunks(question, fileNames);
  const context = buildContext(chunks);
  const answer = await generateAnswer(question, context);
  return {
    answer,
    sources: chunks.map((c) => ({
      file: c.file_name,
      page: c.page_number,
      score: c.score,
    })),
  };
}

// Usage
const result = await askQuestion('What are the payment terms?');
console.log(result.answer);
console.log('Sources:', result.sources);
With Anthropic Claude
from graphor import Graphor
import anthropic

graphor_client = Graphor()
claude_client = anthropic.Anthropic()

def rag_with_claude(question: str, file_names: list[str] | None = None) -> str:
    """RAG pipeline using Claude."""
    # Retrieve chunks
    result = graphor_client.sources.retrieve_chunks(
        query=question,
        file_names=file_names
    )

    # Build context
    context = "\n\n".join([
        f"[{chunk.file_name}, Page {chunk.page_number}]\n{chunk.text}"
        for chunk in result.chunks or []
    ])

    # Generate answer with Claude
    message = claude_client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": f"""Based on the following context, answer the question.
Context:
{context}
Question: {question}
Please cite sources when answering."""
            }
        ]
    )
    return message.content[0].text

# Usage
answer = rag_with_claude("What are the contract obligations?")
print(answer)
import Graphor from 'graphor';
import Anthropic from '@anthropic-ai/sdk';

const graphorClient = new Graphor();
const claudeClient = new Anthropic();

async function ragWithClaude(question: string, fileNames?: string[]): Promise<string> {
  // Retrieve chunks
  const result = await graphorClient.sources.retrieveChunks({
    query: question,
    file_names: fileNames,
  });

  // Build context
  const context = (result.chunks ?? [])
    .map((chunk) => `[${chunk.file_name}, Page ${chunk.page_number}]\n${chunk.text}`)
    .join('\n\n');

  // Generate answer with Claude
  const message = await claudeClient.messages.create({
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024,
    messages: [
      {
        role: 'user',
        content: `Based on the following context, answer the question.
Context:
${context}
Question: ${question}
Please cite sources when answering.`,
      },
    ],
  });

  const block = message.content[0];
  return block.type === 'text' ? block.text : '';
}

// Usage
const answer = await ragWithClaude('What are the contract obligations?');
console.log(answer);
With Google Gemini
from graphor import Graphor
import google.generativeai as genai

graphor_client = Graphor()
genai.configure(api_key="YOUR_GOOGLE_API_KEY")
model = genai.GenerativeModel("gemini-pro")

def rag_with_gemini(question: str, file_names: list[str] | None = None) -> str:
    """RAG pipeline using Google Gemini."""
    # Retrieve chunks
    result = graphor_client.sources.retrieve_chunks(
        query=question,
        file_names=file_names
    )

    # Build context
    context = "\n\n".join([
        f"[Source: {chunk.file_name}, Page {chunk.page_number}]\n{chunk.text}"
        for chunk in result.chunks or []
    ])

    # Generate answer with Gemini
    prompt = f"""Based on the following context, answer the question.
Context:
{context}
Question: {question}
Please provide a clear answer and cite the sources."""
    response = model.generate_content(prompt)
    return response.text

# Usage
answer = rag_with_gemini("What products are mentioned in the catalog?")
print(answer)
import Graphor from 'graphor';
import { GoogleGenerativeAI } from '@google/generative-ai';

const graphorClient = new Graphor();
const genAI = new GoogleGenerativeAI('YOUR_GOOGLE_API_KEY');
const model = genAI.getGenerativeModel({ model: 'gemini-pro' });

async function ragWithGemini(question: string, fileNames?: string[]): Promise<string> {
  // Retrieve chunks
  const result = await graphorClient.sources.retrieveChunks({
    query: question,
    file_names: fileNames,
  });

  // Build context
  const context = (result.chunks ?? [])
    .map((chunk) => `[Source: ${chunk.file_name}, Page ${chunk.page_number}]\n${chunk.text}`)
    .join('\n\n');

  // Generate answer with Gemini
  const prompt = `Based on the following context, answer the question.
Context:
${context}
Question: ${question}
Please provide a clear answer and cite the sources.`;
  const response = await model.generateContent(prompt);
  return response.response.text();
}

// Usage
const answer = await ragWithGemini('What products are mentioned in the catalog?');
console.log(answer);
Advanced Examples
Document Search Interface
Build a document search that shows relevant excerpts:
from graphor import Graphor
from dataclasses import dataclass
import re

@dataclass
class SearchResult:
    text: str
    file_name: str
    page_number: int
    score: float
    highlight: str  # Text with query highlighted

class DocumentSearcher:
    def __init__(self, api_key: str | None = None):
        self.client = Graphor(api_key=api_key) if api_key else Graphor()

    def search(
        self,
        query: str,
        file_names: list[str] | None = None,
        min_score: float = 0.0,
        max_results: int = 10
    ) -> list[SearchResult]:
        """Search documents and return formatted results."""
        result = self.client.sources.retrieve_chunks(
            query=query,
            file_names=file_names
        )

        results = []
        for chunk in result.chunks or []:
            # Filter by minimum score
            if chunk.score and chunk.score < min_score:
                continue

            # Highlight query terms in text
            highlight = self._highlight_text(chunk.text, query)

            results.append(SearchResult(
                text=chunk.text,
                file_name=chunk.file_name or "Unknown",
                page_number=chunk.page_number or 0,
                score=chunk.score or 0.0,
                highlight=highlight
            ))

            if len(results) >= max_results:
                break

        return results

    def _highlight_text(self, text: str, query: str) -> str:
        """Simple highlighting of query terms (preserves the original casing)."""
        highlighted = text
        for word in query.lower().split():
            pattern = re.compile(re.escape(word), re.IGNORECASE)
            highlighted = pattern.sub(lambda m: f"**{m.group(0)}**", highlighted)
        return highlighted

# Usage
searcher = DocumentSearcher()
results = searcher.search(
    query="payment terms",
    min_score=0.7,
    max_results=5
)

for result in results:
    print(f"\n{result.file_name} (Page {result.page_number})")
    print(f"Score: {result.score:.2f}")
    print(f"Text: {result.text[:200]}...")
import Graphor from 'graphor';

interface SearchResult {
  text: string;
  fileName: string;
  pageNumber: number;
  score: number;
  highlight: string;
}

class DocumentSearcher {
  private client: Graphor;

  constructor(apiKey?: string) {
    this.client = apiKey ? new Graphor({ apiKey }) : new Graphor();
  }

  async search(
    query: string,
    options: { fileNames?: string[]; minScore?: number; maxResults?: number } = {},
  ): Promise<SearchResult[]> {
    const { fileNames, minScore = 0.0, maxResults = 10 } = options;

    const result = await this.client.sources.retrieveChunks({
      query,
      file_names: fileNames,
    });

    const results: SearchResult[] = [];
    for (const chunk of result.chunks ?? []) {
      if (chunk.score != null && chunk.score < minScore) continue;

      results.push({
        text: chunk.text,
        fileName: chunk.file_name ?? 'Unknown',
        pageNumber: chunk.page_number ?? 0,
        score: chunk.score ?? 0.0,
        highlight: this.highlightText(chunk.text, query),
      });

      if (results.length >= maxResults) break;
    }
    return results;
  }

  private highlightText(text: string, query: string): string {
    let highlighted = text;
    for (const word of query.toLowerCase().split(/\s+/)) {
      const pattern = new RegExp(word.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'), 'gi');
      // '$&' keeps the matched text's original casing
      highlighted = highlighted.replace(pattern, '**$&**');
    }
    return highlighted;
  }
}

// Usage
const searcher = new DocumentSearcher();
const results = await searcher.search('payment terms', {
  minScore: 0.7,
  maxResults: 5,
});

for (const result of results) {
  console.log(`\n${result.fileName} (Page ${result.pageNumber})`);
  console.log(`Score: ${result.score.toFixed(2)}`);
  console.log(`Text: ${result.text.slice(0, 200)}...`);
}
Async Batch Search
Search multiple queries efficiently with async:
import asyncio

from graphor import AsyncGraphor

async def search_single(client: AsyncGraphor, query: str):
    """Search for a single query."""
    result = await client.sources.retrieve_chunks(query=query)
    return {
        "query": query,
        "total": result.total,
        "top_chunk": result.chunks[0].text if result.chunks else None
    }

async def batch_search(queries: list[str], max_concurrent: int = 5):
    """Search multiple queries with controlled concurrency."""
    client = AsyncGraphor()
    semaphore = asyncio.Semaphore(max_concurrent)

    async def search_with_semaphore(query: str):
        async with semaphore:
            return await search_single(client, query)

    tasks = [search_with_semaphore(q) for q in queries]
    results = await asyncio.gather(*tasks)
    return results

# Usage
queries = [
    "What are the payment terms?",
    "What is the contract duration?",
    "Who are the parties involved?",
    "What are the termination conditions?"
]

results = asyncio.run(batch_search(queries))
for result in results:
    print(f"Query: {result['query']}")
    print(f"Found: {result['total']} chunks")
    print(f"Top result: {result['top_chunk'][:100] if result['top_chunk'] else 'None'}...")
    print("---")
import Graphor from 'graphor';

const client = new Graphor();

async function searchSingle(query: string) {
  const result = await client.sources.retrieveChunks({ query });
  return {
    query,
    total: result.total,
    topChunk: result.chunks?.[0]?.text ?? null,
  };
}

async function batchSearch(queries: string[], maxConcurrent = 5) {
  // Process in batches to control concurrency
  const results: Awaited<ReturnType<typeof searchSingle>>[] = [];
  for (let i = 0; i < queries.length; i += maxConcurrent) {
    const batch = queries.slice(i, i + maxConcurrent);
    const batchResults = await Promise.all(batch.map(searchSingle));
    results.push(...batchResults);
  }
  return results;
}

// Usage
const queries = [
  'What are the payment terms?',
  'What is the contract duration?',
  'Who are the parties involved?',
  'What are the termination conditions?',
];

const results = await batchSearch(queries);
for (const result of results) {
  console.log(`Query: ${result.query}`);
  console.log(`Found: ${result.total} chunks`);
  console.log(`Top result: ${result.topChunk?.slice(0, 100) ?? 'None'}...`);
  console.log('---');
}
RAG with Source Citations
Build a RAG system that properly cites sources:
from graphor import Graphor
from dataclasses import dataclass
from typing import Any

@dataclass
class Citation:
    file_name: str
    page_number: int
    text_snippet: str

@dataclass
class RAGResponse:
    answer: str
    citations: list[Citation]
    confidence: float

class CitedRAG:
    def __init__(self, llm_client: Any):
        self.graphor = Graphor()
        self.llm = llm_client  # Your LLM client (OpenAI, Anthropic, etc.)

    def ask(
        self,
        question: str,
        file_names: list[str] | None = None
    ) -> RAGResponse:
        """Ask a question with proper source citations."""
        # Retrieve relevant chunks
        result = self.graphor.sources.retrieve_chunks(
            query=question,
            file_names=file_names
        )

        if not result.chunks:
            return RAGResponse(
                answer="I couldn't find relevant information to answer this question.",
                citations=[],
                confidence=0.0
            )

        # Build numbered context for citation
        context_parts = []
        citations = []
        for i, chunk in enumerate(result.chunks, 1):
            context_parts.append(f"[{i}] {chunk.text}")
            citations.append(Citation(
                file_name=chunk.file_name or "Unknown",
                page_number=chunk.page_number or 0,
                text_snippet=chunk.text[:100] + "..."
            ))
        context = "\n\n".join(context_parts)

        # Generate answer with citation instructions
        prompt = f"""Based on the following numbered sources, answer the question.
When citing information, use the source number in brackets like [1], [2], etc.
Sources:
{context}
Question: {question}
Answer with citations:"""
        answer = self._generate_with_llm(prompt)

        # Estimate confidence from the average chunk score
        avg_score = sum(c.score or 0 for c in result.chunks) / len(result.chunks)

        return RAGResponse(
            answer=answer,
            citations=citations,
            confidence=avg_score
        )

    def _generate_with_llm(self, prompt: str) -> str:
        """Generate response with your LLM."""
        response = self.llm.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content

# Usage
from openai import OpenAI

rag = CitedRAG(llm_client=OpenAI())
response = rag.ask("What are the payment terms?")

print(f"Answer: {response.answer}")
print(f"\nConfidence: {response.confidence:.2%}")
print("\nCitations:")
for i, citation in enumerate(response.citations, 1):
    print(f"  [{i}] {citation.file_name}, Page {citation.page_number}")
import Graphor from 'graphor';
import OpenAI from 'openai';

interface Citation {
  fileName: string;
  pageNumber: number;
  textSnippet: string;
}

interface RAGResponse {
  answer: string;
  citations: Citation[];
  confidence: number;
}

class CitedRAG {
  private graphor: Graphor;
  private llm: OpenAI;

  constructor(llmClient: OpenAI) {
    this.graphor = new Graphor();
    this.llm = llmClient;
  }

  async ask(question: string, fileNames?: string[]): Promise<RAGResponse> {
    // Retrieve relevant chunks
    const result = await this.graphor.sources.retrieveChunks({
      query: question,
      file_names: fileNames,
    });

    const chunks = result.chunks ?? [];
    if (chunks.length === 0) {
      return {
        answer: "I couldn't find relevant information to answer this question.",
        citations: [],
        confidence: 0.0,
      };
    }

    // Build numbered context for citation
    const contextParts: string[] = [];
    const citations: Citation[] = [];
    chunks.forEach((chunk, i) => {
      contextParts.push(`[${i + 1}] ${chunk.text}`);
      citations.push({
        fileName: chunk.file_name ?? 'Unknown',
        pageNumber: chunk.page_number ?? 0,
        textSnippet: chunk.text.slice(0, 100) + '...',
      });
    });
    const context = contextParts.join('\n\n');

    // Generate answer with citation instructions
    const prompt = `Based on the following numbered sources, answer the question.
When citing information, use the source number in brackets like [1], [2], etc.
Sources:
${context}
Question: ${question}
Answer with citations:`;
    const answer = await this.generateWithLLM(prompt);

    // Estimate confidence from the average chunk score
    const avgScore =
      chunks.reduce((sum, c) => sum + (c.score ?? 0), 0) / chunks.length;

    return { answer, citations, confidence: avgScore };
  }

  private async generateWithLLM(prompt: string): Promise<string> {
    const response = await this.llm.chat.completions.create({
      model: 'gpt-4',
      messages: [{ role: 'user', content: prompt }],
    });
    return response.choices[0].message.content ?? '';
  }
}

// Usage
const rag = new CitedRAG(new OpenAI());
const response = await rag.ask('What are the payment terms?');

console.log(`Answer: ${response.answer}`);
console.log(`\nConfidence: ${(response.confidence * 100).toFixed(1)}%`);
console.log('\nCitations:');
response.citations.forEach((citation, i) => {
  console.log(`  [${i + 1}] ${citation.fileName}, Page ${citation.pageNumber}`);
});
Multi-Step Reasoning
Build a system that retrieves, reasons, and retrieves again if needed:
from graphor import Graphor

client = Graphor()

def multi_step_rag(question: str, max_steps: int = 3) -> dict:
    """Multi-step RAG with iterative retrieval."""
    all_chunks = []
    queries_used = [question]

    for step in range(max_steps):
        # Retrieve chunks for the current query
        current_query = queries_used[-1]
        result = client.sources.retrieve_chunks(query=current_query)

        if not result.chunks:
            break
        all_chunks.extend(result.chunks)

        # Analyze if we need more information (simplified logic)
        if len(all_chunks) >= 5 or step == max_steps - 1:
            break

        # Generate a follow-up query based on what we found
        follow_up = generate_follow_up_query(question, all_chunks)
        if follow_up and follow_up not in queries_used:
            queries_used.append(follow_up)
        else:
            break

    return {
        "chunks": all_chunks,
        "queries_used": queries_used,
        "steps": len(queries_used)
    }

def generate_follow_up_query(original_question: str, chunks: list) -> str | None:
    """Generate a follow-up query. Simplified example; use your LLM for better results."""
    if "payment" in original_question.lower():
        return "invoice due date late fees"
    return None

# Usage
result = multi_step_rag("What are the payment terms and penalties?")
print(f"Used {result['steps']} retrieval steps")
print(f"Queries: {result['queries_used']}")
print(f"Total chunks: {len(result['chunks'])}")
import Graphor from 'graphor';

const client = new Graphor();

type ChunkType = NonNullable<
  Awaited<ReturnType<typeof client.sources.retrieveChunks>>['chunks']
>[number];

async function multiStepRag(question: string, maxSteps = 3) {
  const allChunks: ChunkType[] = [];
  const queriesUsed = [question];

  for (let step = 0; step < maxSteps; step++) {
    const currentQuery = queriesUsed[queriesUsed.length - 1];
    const result = await client.sources.retrieveChunks({ query: currentQuery });

    if (!result.chunks || result.chunks.length === 0) break;
    allChunks.push(...result.chunks);

    if (allChunks.length >= 5 || step === maxSteps - 1) break;

    // Generate a follow-up query based on what we found
    const followUp = generateFollowUpQuery(question, allChunks);
    if (followUp && !queriesUsed.includes(followUp)) {
      queriesUsed.push(followUp);
    } else {
      break;
    }
  }

  return {
    chunks: allChunks,
    queriesUsed,
    steps: queriesUsed.length,
  };
}

function generateFollowUpQuery(
  originalQuestion: string,
  chunks: ChunkType[],
): string | null {
  // Simplified example - in practice, use an LLM
  if (originalQuestion.toLowerCase().includes('payment')) {
    return 'invoice due date late fees';
  }
  return null;
}

// Usage
const result = await multiStepRag('What are the payment terms and penalties?');
console.log(`Used ${result.steps} retrieval steps`);
console.log(`Queries: ${result.queriesUsed.join(', ')}`);
console.log(`Total chunks: ${result.chunks.length}`);
Use Cases
Custom RAG Applications
Use this method to build custom RAG pipelines with your preferred LLM:
Retrieve — Get relevant chunks using semantic search
Build context — Format chunks for your LLM prompt
Generate — Create answers using any LLM (OpenAI, Anthropic, Google, etc.)
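The three steps above can be condensed into one LLM-agnostic skeleton. This is a minimal sketch: plain dicts stand in for the Chunk objects, and the `retrieve` and `generate` callables are placeholders you would swap for `client.sources.retrieve_chunks` and your LLM call.

```python
def rag_pipeline(question, retrieve, generate, max_chunks=5):
    """Minimal retrieve/build/generate skeleton.

    retrieve(query) -> list of dicts with "text", "file_name",
    "page_number", "score"; generate(prompt) -> answer string.
    """
    # Step 1: Retrieve, keeping only the highest-scoring chunks
    chunks = sorted(retrieve(question),
                    key=lambda c: c.get("score") or 0,
                    reverse=True)[:max_chunks]

    # Step 2: Build context, prefixing each chunk with its source metadata
    context = "\n\n".join(
        f"[{c.get('file_name', 'Unknown')}, Page {c.get('page_number', '?')}]\n{c['text']}"
        for c in chunks
    )

    # Step 3: Generate an answer with any LLM
    prompt = f"Context:\n{context}\n\nQuestion: {question}"
    return generate(prompt)

# Stub usage: replace the lambdas with real retrieval and LLM calls
answer = rag_pipeline(
    "What are the payment terms?",
    retrieve=lambda q: [{"text": "Net 30 days.", "file_name": "contract.pdf",
                         "page_number": 4, "score": 0.91}],
    generate=lambda prompt: f"(LLM answer from a {len(prompt)}-char prompt)",
)
```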
Document Search
Build document search interfaces that show relevant excerpts:
Query for relevant content
Display chunks with file names and page numbers
Allow users to navigate to source documents
Knowledge Base Q&A
Create custom Q&A systems with full control over:
Prompt engineering
Response formatting
Source citations
Multi-step reasoning
Comparison with Chat SDK
| Feature | retrieve_chunks() / retrieveChunks() | ask() |
| --- | --- | --- |
| Returns | Raw document chunks | Generated answer |
| LLM Integration | Bring your own | Built-in |
| Conversation Memory | No | Yes (with conversation_id) |
| Customization | Full control | Limited |
| Use Case | Custom RAG pipelines | Quick Q&A |
Use retrieveChunks() / retrieve_chunks() when you need full control over your RAG pipeline. Use ask() for quick Q&A with built-in answer generation.
Best Practices
Query Optimization
Be specific — Clear, specific queries return more relevant chunks
Use natural language — Write queries as you would ask a person
Include key terms — Include important keywords from the domain
Filtering and Scoring
Use file_ids — Restrict to specific documents when you know the source
Filter by score — Consider filtering chunks with low relevance scores
Limit results — Process only the top N most relevant chunks
# Example: Filter low-score chunks
result = client.sources.retrieve_chunks(query="payment terms")

# Only use chunks with high relevance
high_quality_chunks = [
    chunk for chunk in result.chunks or []
    if chunk.score and chunk.score > 0.7
]
// Example: Filter low-score chunks
const result = await client.sources.retrieveChunks({ query: 'payment terms' });

// Only use chunks with high relevance
const highQualityChunks = (result.chunks ?? []).filter(
  (chunk) => chunk.score != null && chunk.score > 0.7,
);
Context Building
Include metadata — Add file names and page numbers to your context
Order by relevance — Put highest-scoring chunks first
Limit context size — Don’t exceed your LLM’s context window
# Example: Build context with metadata
def build_context(chunks, max_chunks: int = 5) -> str:
    # Sort by score (highest first)
    sorted_chunks = sorted(
        chunks,
        key=lambda c: c.score or 0,
        reverse=True
    )[:max_chunks]

    context_parts = []
    for chunk in sorted_chunks:
        source = f"[{chunk.file_name}, Page {chunk.page_number}]"
        context_parts.append(f"{source}\n{chunk.text}")
    return "\n\n".join(context_parts)
// Example: Build context with metadata
function buildContext(chunks: typeof result.chunks, maxChunks = 5): string {
  const sortedChunks = [...(chunks ?? [])]
    .sort((a, b) => (b.score ?? 0) - (a.score ?? 0))
    .slice(0, maxChunks);

  return sortedChunks
    .map((chunk) => `[${chunk.file_name}, Page ${chunk.page_number}]\n${chunk.text}`)
    .join('\n\n');
}
Error Handling
Handle empty results — Check if chunks were returned
Implement retries — Use the SDK’s built-in retry mechanism
Set timeouts — Configure appropriate timeouts for your use case
# Configure retries and timeout
client = Graphor(max_retries=3, timeout=60.0)

# Check for empty results
result = client.sources.retrieve_chunks(query="...")
if not result.chunks:
    print("No relevant chunks found")
// Configure retries and timeout
const client = new Graphor({
  maxRetries: 3,
  timeout: 60 * 1000, // 60 seconds (milliseconds)
});

// Check for empty results
const result = await client.sources.retrieveChunks({ query: '...' });
if (!result.chunks || result.chunks.length === 0) {
  console.log('No relevant chunks found');
}
Error Reference
| Error Type | Status Code | Description |
| --- | --- | --- |
| BadRequestError | 400 | Invalid parameters |
| AuthenticationError | 401 | Invalid or missing API key |
| PermissionDeniedError | 403 | Access denied to the project |
| NotFoundError | 404 | Specified file not found |
| RateLimitError | 429 | Too many requests, please retry after waiting |
| InternalServerError | ≥500 | Server-side error |
| APIConnectionError | N/A | Network connectivity issues |
| APITimeoutError | N/A | Request timed out |
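For transient failures such as rate limits or connection errors, the SDK's built-in max_retries is usually sufficient. If you need custom retry behavior, a generic exponential-backoff wrapper can be sketched like this; the retryable exception tuple is a stand-in you would fill with the graphor error types above.

```python
import time

def with_backoff(call, retryable=(Exception,), max_attempts=4, base_delay=1.0):
    """Retry call() with exponential backoff on retryable errors."""
    for attempt in range(max_attempts):
        try:
            return call()
        except retryable:
            if attempt == max_attempts - 1:
                raise  # Out of attempts; surface the error
            time.sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ...

# Usage sketch (assuming a configured client):
# result = with_backoff(
#     lambda: client.sources.retrieve_chunks(query="payment terms"),
#     retryable=(graphor.RateLimitError, graphor.APIConnectionError),
# )
```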
Troubleshooting
No chunks returned
Causes: Query too specific, no matching content, or files not processed
Solutions:
Try a broader, more general query
Verify files have been uploaded and processed
Remove file_names filter to search all documents
Check document processing status with client.sources.list()
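The first two solutions can be combined into a small fallback helper: try the original query, then progressively broader variants until something comes back. This is a sketch with the search call injected; in practice `search` would wrap `client.sources.retrieve_chunks`.

```python
def retrieve_with_fallback(query, search, fallbacks=()):
    """Try query first, then each broader fallback query in turn.

    search(query) should return the list of chunks (possibly empty).
    Returns (query_that_matched, chunks); chunks is [] if nothing matched.
    """
    for q in (query, *fallbacks):
        chunks = search(q) or []
        if chunks:
            return q, chunks
    return query, []

# Usage sketch (assuming a configured client):
# used_query, chunks = retrieve_with_fallback(
#     "late fee schedule for Q3 invoices",
#     search=lambda q: client.sources.retrieve_chunks(query=q).chunks,
#     fallbacks=["invoice late fees", "payment terms"],
# )
```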
Low relevance scores
Causes: Query doesn't match document content well
Solutions:
Rephrase the query using terms from your documents
Be more specific about what you’re looking for
Check if documents contain the information you need
File not found errors
Causes: Incorrect file name or file not in project
Solutions:
Verify exact file names (case-sensitive)
Use client.sources.list() to see available files
Ensure files have been successfully processed
Slow or timed-out requests
Causes: Large document corpus or complex queries
Solutions:
Use file_names or file_ids to limit search scope
Increase timeout value
Process queries in batches if needed
client = Graphor(timeout=120.0)  # 2 minutes
const client = new Graphor({ timeout: 120 * 1000 }); // 2 minutes
Next Steps
After mastering document retrieval:
Chat with Documents Use built-in LLM for quick Q&A without custom integration
Extract Structured Data Extract structured data from documents using JSON Schema
Upload Documents Add more documents to your knowledge base
RAG Quickstart Guide Learn best practices for building RAG applications