The Graphor Python SDK provides convenient access to the Graphor REST API from any Python 3.9+ application. The library includes type definitions for all request params and response fields, and offers both synchronous and asynchronous clients. This page provides a comprehensive overview of the SDK. It covers the full lifecycle:
  1. Data Ingestion (Sources): Upload, process, list, and manage documents
  2. Document Chat: Ask questions about your documents with conversational memory
  3. Data Extraction: Extract structured data using JSON Schema
  4. Prebuilt RAG: Retrieve relevant document chunks for custom RAG pipelines

GitHub Repository

View the source code, report issues, and contribute to the SDK.

Installation

Install the Graphor SDK from PyPI:
pip install graphor
Python 3.9 or higher is required.

Data Ingestion (Sources)

The Sources methods cover the full ingestion lifecycle: upload, parse, list, inspect elements, and delete.
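
A minimal first call, uploading a local document (the Complete Workflow Example below walks through every step):
from pathlib import Path
from graphor import Graphor

client = Graphor()

source = client.sources.upload(file=Path("./document.pdf"))
print(f"{source.file_name}: {source.status}")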

Document Chat

Once your data is ingested, use the Chat method to ask questions.
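
For example, a one-off question (step 5 of the workflow below shows follow-ups with conversational memory):
from graphor import Graphor

client = Graphor()

response = client.sources.ask(question="What are the main topics in this document?")
print(response.answer)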

Data Extraction

Extract specific structured data from your documents using schemas.
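
For example, pulling two invoice fields (step 6 below shows a fuller schema with field descriptions):
from graphor import Graphor

client = Graphor()

result = client.sources.extract(
    file_names=["document.pdf"],
    user_instruction="Extract the invoice number and total amount.",
    output_schema={
        "type": "object",
        "properties": {
            "invoice_number": {"type": "string"},
            "total_amount": {"type": "number"}
        }
    }
)
print(result.structured_output)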

Prebuilt RAG

Build custom RAG pipelines with semantic document retrieval.
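
For example, fetching the chunks most relevant to a query (step 7 below and the Custom RAG recipe build on this):
from graphor import Graphor

client = Graphor()

chunks = client.sources.retrieve_chunks(query="What are the payment terms?")
for chunk in chunks.chunks or []:
    print(f"[{chunk.file_name}, page {chunk.page_number}] {chunk.text[:100]}")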

What “Data Ingestion” includes

  • Upload: Create a new source (file / web page / GitHub / YouTube)
  • Parse: Choose OCR/parsing method; reprocess when needed
  • List: Monitor status and metadata
  • Elements: Retrieve structured elements/partitions after processing
  • Delete: Remove a source permanently
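
Each step maps to a single SDK call. For example, the Delete step:
from graphor import Graphor

client = Graphor()

result = client.sources.delete(file_name="document.pdf")
print(result.message)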

Authentication

All SDK methods require authentication using API tokens. You can provide your API key in two ways.

Environment Variable

Set the GRAPHOR_API_KEY environment variable:
export GRAPHOR_API_KEY="grlm_your_api_key_here"

from graphor import Graphor

# API key is automatically read from GRAPHOR_API_KEY
client = Graphor()

Direct Initialization

from graphor import Graphor

client = Graphor(api_key="grlm_your_api_key_here")
Learn how to generate and manage API tokens in the API Tokens guide.

Token Security

  • Never expose tokens in client-side code or public repositories
  • Use environment variables to store tokens securely
  • Rotate tokens regularly for enhanced security
  • Use different tokens for different environments (dev/staging/prod)

Async Usage

Simply import AsyncGraphor instead of Graphor and use await with each API call:
import asyncio
from graphor import AsyncGraphor

client = AsyncGraphor()

async def main():
    source = await client.sources.upload(file=b"raw file contents")
    print(source.project_id)

asyncio.run(main())

Available Methods

Sources

Method                            Description
client.sources.upload()           Upload a local file
client.sources.upload_url()       Upload from a web URL
client.sources.upload_github()    Upload from GitHub
client.sources.upload_youtube()   Upload from YouTube
client.sources.parse()            Reprocess a source with a different parsing method
client.sources.list()             List all sources in the project
client.sources.delete()           Delete a source permanently
client.sources.load_elements()    Get parsed elements from a source

Chat & Extraction

Method                            Description
client.sources.ask()              Ask questions about your documents
client.sources.extract()          Extract structured data using JSON Schema
client.sources.retrieve_chunks()  Retrieve relevant chunks for custom RAG

Complete Workflow Example

Here’s the full “happy path”: upload → parse → list → elements → chat/extract/rag.

1. Upload a source

from pathlib import Path
from graphor import Graphor

client = Graphor()

# Upload a document
source = client.sources.upload(file=Path("./document.pdf"))
print(f"Uploaded: {source.file_name}")
print(f"Status: {source.status}")

2. Parse (OCR/parsing)

# Reprocess with a different parsing method
source = client.sources.parse(
    file_name="document.pdf",
    partition_method="hi_res"
)
print(f"Processed with: {source.partition_method}")

3. Monitor status (List Sources)

# List all sources and check status
sources = client.sources.list()

for source in sources:
    print(f"{source.file_name}: {source.status}")

# Find specific source
target = next((s for s in sources if s.file_name == "document.pdf"), None)
if target:
    print(f"Status: {target.status}")

4. Retrieve structured elements (after processing)

# Get parsed elements with pagination
elements = client.sources.load_elements(
    file_name="document.pdf",
    page=1,
    page_size=50
)

print(f"Total elements: {elements.total}")
for item in elements.items:
    print(f"  [{item.metadata.element_type}] {item.page_content[:100]}...")

5. Ask Questions (Chat)

# Ask a question about your documents
response = client.sources.ask(
    question="What are the main topics in this document?",
    file_names=["document.pdf"]  # Optional: scope to specific files
)

print(f"Answer: {response.answer}")

# Continue the conversation
follow_up = client.sources.ask(
    question="Can you elaborate on the first topic?",
    conversation_id=response.conversation_id
)
print(f"Follow-up: {follow_up.answer}")

6. Extract Data (Extraction)

# Extract structured data using JSON Schema
result = client.sources.extract(
    file_names=["document.pdf"],
    user_instruction="Extract the invoice number and total amount.",
    output_schema={
        "type": "object",
        "properties": {
            "invoice_number": {"type": "string", "description": "Invoice ID"},
            "total_amount": {"type": "number", "description": "Total due"}
        },
        "required": ["invoice_number", "total_amount"]
    }
)

print(f"Invoice: {result.structured_output['invoice_number']}")
print(f"Amount: ${result.structured_output['total_amount']}")

7. Retrieve Chunks (Prebuilt RAG)

# Retrieve relevant chunks for custom RAG pipelines
chunks = client.sources.retrieve_chunks(
    query="What are the payment terms?",
    file_names=["document.pdf"]  # Optional: scope to specific files
)

print(f"Found {chunks.total} relevant chunks")
for chunk in chunks.chunks:
    print(f"\n[{chunk.file_name}, Page {chunk.page_number}]")
    print(chunk.text)
    print(f"Score: {chunk.score:.2f}")

Integration Patterns

Complete SDK Client Wrapper

from __future__ import annotations  # allow `X | Y` annotations on Python 3.9

from pathlib import Path
from typing import Any

import graphor
from graphor import Graphor


class GraphorSDK:
    """Complete wrapper for common Graphor operations."""
    
    def __init__(self, api_key: str | None = None):
        self.client = Graphor(api_key=api_key) if api_key else Graphor()
    
    # ==================== Sources ====================
    
    def upload_file(self, file_path: str | Path) -> dict[str, Any]:
        """Upload a file and return source info."""
        source = self.client.sources.upload(file=Path(file_path))
        return {
            "file_name": source.file_name,
            "status": source.status,
            "project_id": source.project_id
        }
    
    def upload_url(self, url: str, crawl: bool = False) -> dict[str, Any]:
        """Upload from a URL."""
        source = self.client.sources.upload_url(url=url, crawl_urls=crawl)
        return {"file_name": source.file_name, "status": source.status}
    
    def process(self, file_name: str, method: str = "hi_res") -> dict[str, Any]:
        """Process a source with specified method."""
        source = self.client.sources.parse(
            file_name=file_name,
            partition_method=method
        )
        return {
            "file_name": source.file_name,
            "status": source.status,
            "method": source.partition_method
        }
    
    def list_sources(self) -> list[dict[str, Any]]:
        """List all sources."""
        sources = self.client.sources.list()
        return [
            {
                "file_name": s.file_name,
                "status": s.status,
                "file_type": s.file_type,
                "file_size": s.file_size
            }
            for s in sources
        ]
    
    def get_elements(
        self, 
        file_name: str, 
        page: int = 1, 
        page_size: int = 50
    ) -> dict[str, Any]:
        """Get parsed elements from a source."""
        result = self.client.sources.load_elements(
            file_name=file_name,
            page=page,
            page_size=page_size
        )
        return {
            "total": result.total,
            "page": result.page,
            "total_pages": result.total_pages,
            "items": [
                {
                    "type": item.metadata.element_type,
                    "content": item.page_content,
                    "page": item.metadata.page_number
                }
                for item in result.items
            ]
        }
    
    def delete(self, file_name: str) -> dict[str, Any]:
        """Delete a source."""
        result = self.client.sources.delete(file_name=file_name)
        return {"file_name": result.file_name, "message": result.message}
    
    # ==================== Chat ====================
    
    def ask(
        self, 
        question: str, 
        file_names: list[str] | None = None,
        conversation_id: str | None = None
    ) -> dict[str, Any]:
        """Ask a question about documents."""
        kwargs: dict[str, Any] = {"question": question}
        if file_names:
            kwargs["file_names"] = file_names
        if conversation_id:
            kwargs["conversation_id"] = conversation_id
        
        response = self.client.sources.ask(**kwargs)
        return {
            "answer": response.answer,
            "conversation_id": response.conversation_id
        }
    
    # ==================== Extraction ====================
    
    def extract(
        self,
        file_names: list[str],
        instruction: str,
        schema: dict[str, Any]
    ) -> dict[str, Any]:
        """Extract structured data from documents."""
        result = self.client.sources.extract(
            file_names=file_names,
            user_instruction=instruction,
            output_schema=schema
        )
        return {
            "file_names": result.file_names,
            "data": result.structured_output,
            "raw": result.raw_json
        }
    
    # ==================== RAG ====================
    
    def retrieve(
        self,
        query: str,
        file_names: list[str] | None = None
    ) -> dict[str, Any]:
        """Retrieve relevant chunks for RAG."""
        kwargs: dict[str, Any] = {"query": query}
        if file_names:
            kwargs["file_names"] = file_names
        
        result = self.client.sources.retrieve_chunks(**kwargs)
        return {
            "query": result.query,
            "total": result.total,
            "chunks": [
                {
                    "text": c.text,
                    "file_name": c.file_name,
                    "page": c.page_number,
                    "score": c.score
                }
                for c in result.chunks or []
            ]
        }


# Usage example
sdk = GraphorSDK()

def full_workflow(file_path: str):
    """Complete ingestion, chat, and extraction workflow."""
    try:
        # 1. Upload
        upload_result = sdk.upload_file(file_path)
        file_name = upload_result["file_name"]
        print(f"✅ Uploaded: {file_name}")
        
        # 2. Process
        process_result = sdk.process(file_name, "hi_res")
        print(f"✅ Processed with: {process_result['method']}")
        
        # 3. Chat
        chat_result = sdk.ask("Summarize this document", [file_name])
        print(f"📝 Summary: {chat_result['answer']}")
        
        # 4. Extract
        extract_result = sdk.extract(
            file_names=[file_name],
            instruction="Extract key information",
            schema={
                "type": "object",
                "properties": {
                    "title": {"type": "string", "description": "Document title"},
                    "summary": {"type": "string", "description": "Brief summary"}
                }
            }
        )
        print(f"📊 Extracted: {extract_result['data']}")
        
        return {"success": True, "file_name": file_name}
        
    except graphor.APIStatusError as e:
        print(f"❌ Error: {e}")
        return {"success": False, "error": str(e)}

Async Integration

from __future__ import annotations  # allow `X | Y` annotations on Python 3.9

import asyncio
from pathlib import Path

import graphor
from graphor import AsyncGraphor


class AsyncGraphorSDK:
    """Async wrapper for Graphor operations."""
    
    def __init__(self, api_key: str | None = None):
        self.client = AsyncGraphor(api_key=api_key) if api_key else AsyncGraphor()
    
    async def process_multiple(
        self,
        file_paths: list[str],
        method: str = "hi_res"
    ) -> list[dict]:
        """Upload and process multiple files concurrently."""
        
        async def process_one(file_path: str) -> dict:
            try:
                # Upload
                source = await self.client.sources.upload(file=Path(file_path))
                
                # Process
                processed = await self.client.sources.parse(
                    file_name=source.file_name,
                    partition_method=method
                )
                
                return {
                    "file": file_path,
                    "status": "success",
                    "file_name": processed.file_name
                }
            except graphor.APIStatusError as e:
                return {"file": file_path, "status": "failed", "error": str(e)}
        
        # Process all files concurrently
        tasks = [process_one(fp) for fp in file_paths]
        results = await asyncio.gather(*tasks)
        
        return results
    
    async def batch_ask(
        self,
        questions: list[str],
        file_names: list[str] | None = None
    ) -> list[dict]:
        """Ask multiple questions concurrently."""
        async def ask_one(question: str) -> dict:
            response = await self.client.sources.ask(
                question=question,
                file_names=file_names
            )
            return {"question": question, "answer": response.answer}
        
        tasks = [ask_one(q) for q in questions]
        return await asyncio.gather(*tasks)


# Usage
async def main():
    sdk = AsyncGraphorSDK()
    
    # Process multiple files
    results = await sdk.process_multiple([
        "doc1.pdf",
        "doc2.pdf",
        "doc3.pdf"
    ])
    
    for r in results:
        status = "✅" if r["status"] == "success" else "❌"
        print(f"{status} {r['file']}")
    
    # Ask multiple questions
    answers = await sdk.batch_ask([
        "What is the main topic?",
        "Who are the key people mentioned?",
        "What are the conclusions?"
    ])
    
    for a in answers:
        print(f"Q: {a['question']}")
        print(f"A: {a['answer']}\n")

asyncio.run(main())

Error Handling

The SDK provides typed exceptions for different error scenarios:
import graphor
from graphor import Graphor

client = Graphor()

try:
    source = client.sources.upload(file=b"raw file contents")
except graphor.APIConnectionError as e:
    print("The server could not be reached")
    print(e.__cause__)
except graphor.RateLimitError as e:
    print("Rate limit exceeded. Back off and retry.")
except graphor.BadRequestError as e:
    print(f"Invalid request: {e}")
except graphor.AuthenticationError as e:
    print(f"Invalid API key: {e}")
except graphor.NotFoundError as e:
    print(f"Resource not found: {e}")
except graphor.APIStatusError as e:
    print(f"API error (status {e.status_code}): {e}")

Error Types

Status Code   Error Type                  Description
400           BadRequestError             Invalid parameters or malformed request
401           AuthenticationError         Invalid or missing API key
403           PermissionDeniedError       Access denied to resource
404           NotFoundError               Resource doesn’t exist
422           UnprocessableEntityError    Validation error
429           RateLimitError              Too many requests
≥500          InternalServerError         Server-side error
N/A           APIConnectionError          Network connectivity issues
N/A           APITimeoutError             Request timed out
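
The last two rows carry no HTTP status because the request never completed. A short sketch that tells them apart; the timeout is caught first since it may specialize the connection error:
import graphor
from graphor import Graphor

client = Graphor()

try:
    sources = client.sources.list()
except graphor.APITimeoutError:
    print("Request timed out; consider raising the timeout for this call")
except graphor.APIConnectionError as e:
    print(f"Network problem: {e.__cause__}")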

Configuration

Retries

Certain errors are automatically retried 2 times by default with exponential backoff:
from graphor import Graphor

# Configure default retries
client = Graphor(max_retries=0)  # Disable retries

# Or per-request
client.with_options(max_retries=5).sources.upload(file=b"...")

Timeouts

By default, requests time out after 1 minute:
from graphor import Graphor

# Configure default timeout (in seconds)
client = Graphor(timeout=120.0)  # 2 minutes

# Or per-request
client.with_options(timeout=300.0).sources.parse(
    file_name="large-document.pdf",
    partition_method="graphorlm"
)

Using aiohttp for Better Concurrency

For high-concurrency async operations, use the aiohttp client (install the extra first: pip install graphor[aiohttp]):
import asyncio
from graphor import AsyncGraphor, DefaultAioHttpClient

async def main():
    async with AsyncGraphor(
        http_client=DefaultAioHttpClient()
    ) as client:
        # Your async operations here
        sources = await client.sources.list()
        print(f"Found {len(sources)} sources")

asyncio.run(main())

Rate Limits and Best Practices

Performance Guidelines

  • Batch Operations: Process multiple files sequentially or with controlled concurrency (see the semaphore sketch after this list)
  • Async Processing: Use AsyncGraphor for concurrent operations
  • Retry Logic: The SDK handles retries automatically; configure max_retries as needed
  • Timeout Handling: Increase timeouts for large documents or complex processing
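
One way to get the controlled concurrency mentioned above is an asyncio.Semaphore around the async client. A sketch, reusing the upload call shown earlier:
import asyncio
from pathlib import Path
from graphor import AsyncGraphor

client = AsyncGraphor()

async def upload_with_limit(file_paths: list[str], limit: int = 4) -> list:
    """Upload files concurrently, capped at `limit` in flight at once."""
    semaphore = asyncio.Semaphore(limit)

    async def upload_one(path: str):
        async with semaphore:
            return await client.sources.upload(file=Path(path))

    return await asyncio.gather(*(upload_one(p) for p in file_paths))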

Best Practices

from pathlib import Path
from graphor import Graphor
import graphor
import time

client = Graphor(
    max_retries=3,
    timeout=120.0  # 2 minutes for processing operations
)

def robust_upload(file_path: str, max_attempts: int = 3) -> dict:
    """Upload with custom retry logic."""
    for attempt in range(max_attempts):
        try:
            source = client.sources.upload(file=Path(file_path))
            return {"success": True, "file_name": source.file_name}
        except graphor.RateLimitError:
            wait_time = 2 ** attempt  # Exponential backoff
            print(f"Rate limited. Waiting {wait_time}s...")
            time.sleep(wait_time)
        except graphor.APIConnectionError as e:
            print(f"Connection error (attempt {attempt + 1}): {e}")
            time.sleep(1)
        except graphor.APIStatusError as e:
            print(f"API error: {e}")
            return {"success": False, "error": str(e)}
    
    return {"success": False, "error": "Max retries exceeded"}

Common Use Cases

Document Processing Pipeline

from graphor import Graphor
from pathlib import Path
import graphor

client = Graphor(timeout=300.0)

def document_pipeline(directory: str, method: str = "hi_res"):
    """Process all documents in a directory."""
    results = []
    
    for file_path in Path(directory).glob("*.pdf"):
        try:
            # Upload
            source = client.sources.upload(file=file_path)
            print(f"📤 Uploaded: {source.file_name}")
            
            # Process
            processed = client.sources.parse(
                file_name=source.file_name,
                partition_method=method
            )
            print(f"⚙️ Processed: {processed.file_name}")
            
            results.append({
                "file": str(file_path),
                "status": "success",
                "file_name": processed.file_name
            })
            
        except graphor.APIStatusError as e:
            results.append({
                "file": str(file_path),
                "status": "failed",
                "error": str(e)
            })
    
    successful = sum(1 for r in results if r["status"] == "success")
    print(f"\n✅ Processed {successful}/{len(results)} files")
    
    return results

Q&A System

from __future__ import annotations  # allow `X | Y` annotations on Python 3.9

from graphor import Graphor

client = Graphor()

class DocumentQA:
    """Simple Q&A system with conversation history."""
    
    def __init__(self, file_names: list[str] | None = None):
        self.file_names = file_names
        self.conversation_id = None
    
    def ask(self, question: str) -> str:
        """Ask a question, maintaining conversation history."""
        response = client.sources.ask(
            question=question,
            file_names=self.file_names,
            conversation_id=self.conversation_id
        )
        
        # Store conversation ID for follow-up questions
        self.conversation_id = response.conversation_id
        
        return response.answer
    
    def reset(self):
        """Reset conversation history."""
        self.conversation_id = None


# Usage
qa = DocumentQA(file_names=["report.pdf"])

print(qa.ask("What is this document about?"))
print(qa.ask("What are the main findings?"))  # Follow-up
print(qa.ask("Can you summarize the conclusions?"))  # Follow-up

qa.reset()  # Start new conversation

Custom RAG with Your LLM

from __future__ import annotations  # allow `X | Y` annotations on Python 3.9

from graphor import Graphor
from openai import OpenAI

graphor_client = Graphor()
openai_client = OpenAI()

def custom_rag(question: str, file_names: list[str] | None = None) -> dict:
    """Custom RAG pipeline with OpenAI."""
    # 1. Retrieve relevant chunks
    chunks = graphor_client.sources.retrieve_chunks(
        query=question,
        file_names=file_names
    )
    
    # 2. Build context
    context = "\n\n".join([
        f"[{c.file_name}, Page {c.page_number}]\n{c.text}"
        for c in chunks.chunks or []
    ])
    
    # 3. Generate answer with your LLM
    response = openai_client.chat.completions.create(
        model="gpt-4",
        messages=[
            {
                "role": "system",
                "content": "Answer questions based on the provided context. Cite sources."
            },
            {
                "role": "user",
                "content": f"Context:\n{context}\n\nQuestion: {question}"
            }
        ]
    )
    
    return {
        "answer": response.choices[0].message.content,
        "sources": [
            {"file": c.file_name, "page": c.page_number}
            for c in chunks.chunks or []
        ]
    }


# Usage
result = custom_rag("What are the payment terms?", ["contract.pdf"])
print(result["answer"])
print("Sources:", result["sources"])

Next Steps

Ready to start building with the Graphor SDK? Start with Installation and the Complete Workflow Example above, then adapt the integration patterns to your own application.

The Graphor Python SDK provides a powerful foundation for building intelligent, document-driven applications. With comprehensive support for document ingestion, conversational AI, structured extraction, and custom RAG pipelines, the SDK gives you the flexibility to build sophisticated AI workflows that scale from simple document search to complex analysis systems.