The Graphor SDKs provide convenient access to the Graphor REST API from Python and TypeScript/JavaScript applications. Both libraries include type definitions for all request params and response fields.
The Python SDK supports Python 3.9+ and offers both synchronous and asynchronous clients.
This page provides a comprehensive overview of the SDK. It covers the full lifecycle:
  1. Data Ingestion (Sources): Ingest, poll status, list, get elements, and manage documents by file_id
  2. Document Chat: Ask questions about your documents with conversational memory
  3. Data Extraction: Extract structured data using JSON Schema
  4. Prebuilt RAG: Retrieve relevant document chunks for custom RAG pipelines

Python SDK Repository

View the Python SDK source code, report issues, and contribute.

TypeScript SDK Repository

View the TypeScript SDK source code, report issues, and contribute.

Installation

Install the Graphor SDK from PyPI:
pip install graphor
Python 3.9 or higher is required.

Data Ingestion (Sources)

The Sources methods cover the full ingestion lifecycle:

Ingest Source

Ingest documents from files, URLs, GitHub, and YouTube (returns build_id; poll for file_id)

Reprocess Source

Reprocess an existing source with a different partition method

List Sources

Retrieve all sources with status and metadata

List Source Elements

Retrieve structured elements/partitions from processed sources

Delete Source

Permanently remove sources from your project

Document Chat

Once your data is ingested, use the Chat method to ask questions:

Chat with Documents

Ask natural language questions about your documents with conversational memory and structured outputs

Data Extraction

Extract specific structured data from your documents using schemas:

Extract Structured Data

Extract structured information from documents using JSON Schema and natural language instructions

Prebuilt RAG

Build custom RAG pipelines with semantic document retrieval:

Retrieve Document Chunks

Retrieve relevant document chunks using semantic search for custom LLM integration

What “Data Ingestion” includes

  • Ingest: Create a new source (file, URL, GitHub, YouTube); returns build_id; poll get build status until ready, then use file_id
  • Reprocess: Reprocess an existing source with a different partition method (optional)
  • List: Monitor status and metadata; optionally filter by file_ids
  • Get elements: Retrieve structured elements/partitions by file_id after processing
  • Delete: Remove a source by file_id
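
The poll step in this lifecycle recurs in every example below, so it can be worth factoring out. The helper here is a generic sketch, not part of the SDK: it takes any status-check callable plus a readiness predicate and polls with an upper bound, so a build that never completes cannot loop forever.

```python
import time
from typing import Any, Callable


def poll_until_ready(
    check: Callable[[], Any],
    is_ready: Callable[[Any], bool],
    interval: float = 2.0,
    timeout: float = 300.0,
) -> Any:
    """Call check() every `interval` seconds until is_ready(status) is
    true; raise TimeoutError once `timeout` seconds have elapsed."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = check()
        if is_ready(status):
            return status
        time.sleep(interval)
    raise TimeoutError(f"Build not ready after {timeout:.0f}s")
```

With the SDK, this might be invoked as `poll_until_ready(lambda: client.sources.get_build_status(build_id), lambda s: s.success and getattr(s, "file_id", None))`.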

Authentication

All SDK methods require authentication using API tokens. You can provide your API key in two ways.

Environment Variable

Set the GRAPHOR_API_KEY environment variable:
export GRAPHOR_API_KEY="grlm_your_api_key_here"
from graphor import Graphor

# API key is automatically read from GRAPHOR_API_KEY
client = Graphor()

Direct Initialization

from graphor import Graphor

client = Graphor(api_key="grlm_your_api_key_here")
Learn how to generate and manage API tokens in the API Tokens guide.

Token Security

  • Never expose tokens in client-side code or public repositories
  • Use environment variables to store tokens securely
  • Rotate tokens regularly for enhanced security
  • Use different tokens for different environments (dev/staging/prod)

Async Usage

Simply import AsyncGraphor instead of Graphor and use await with each API call:
import asyncio
from graphor import AsyncGraphor

client = AsyncGraphor()

async def main():
    build_id = await client.sources.ingest_file(file=b"raw file contents")
    print(build_id)

asyncio.run(main())

Available Methods

Sources

Method                               Description
client.sources.ingest_file()         Ingest a local file (returns build_id)
client.sources.ingest_url()          Ingest from a web URL
client.sources.ingest_github()       Ingest from GitHub
client.sources.ingest_youtube()      Ingest from YouTube
client.sources.get_build_status()    Poll build status; returns file_id when ready
client.sources.reprocess()           Reprocess a source by file_id (returns build_id)
client.sources.list()                List all sources (optional file_ids filter)
client.sources.get_elements()        Get parsed elements by file_id
client.sources.delete()              Delete a source by file_id

Chat & Extraction

Method                               Description
client.sources.ask()                 Ask questions about your documents
client.sources.extract()             Extract structured data using JSON Schema
client.sources.retrieve_chunks()     Retrieve relevant chunks for custom RAG

Complete Workflow Example

Here’s the full “happy path”: ingest → get_build_status (poll) → list → get_elements → chat/extract/rag; optionally reprocess by file_id.

1. Ingest a source

from pathlib import Path
from graphor import Graphor
import time

client = Graphor()

# Ingest returns build_id; poll get_build_status until ready
build_id = client.sources.ingest_file(file=Path("./document.pdf"))
deadline = time.time() + 300  # give up after 5 minutes
while time.time() < deadline:
    status = client.sources.get_build_status(build_id)
    if status.success and getattr(status, "file_id", None):
        file_id = status.file_id
        print(f"Ready. file_id: {file_id}")
        break
    time.sleep(2)
else:
    raise TimeoutError("Build did not complete within 5 minutes")

2. Reprocess (optional)

build_id = client.sources.reprocess(file_id=file_id, method="balanced")
print(f"Reprocessing: {build_id}")

3. List sources

sources = client.sources.list()
for s in sources:
    print(f"{s.file_id} {s.file_name}: {s.status}")
target = next((s for s in sources if s.file_id == file_id), None)

4. Get elements (after processing)

elements = client.sources.get_elements(file_id=file_id, page=1, page_size=50)
print(f"Total elements: {elements.total}")
for item in elements.items:
    print(f"  [{item.element_type}] {item.text[:100]}...")

5. Ask questions (Chat)

response = client.sources.ask(
    question="What are the main topics in this document?",
    file_ids=[file_id]  # Optional: scope to specific sources
)
print(response.answer)
follow_up = client.sources.ask(
    question="Can you elaborate on the first topic?",
    conversation_id=response.conversation_id
)
print(follow_up.answer)

6. Extract data

result = client.sources.extract(
    file_ids=[file_id],
    user_instruction="Extract the invoice number and total amount.",
    output_schema={
        "type": "object",
        "properties": {
            "invoice_number": {"type": "string"},
            "total_amount": {"type": "number"}
        },
        "required": ["invoice_number", "total_amount"]
    }
)
print(result.structured_output)

7. Retrieve chunks (Prebuilt RAG)

chunks = client.sources.retrieve_chunks(
    query="What are the payment terms?",
    file_ids=[file_id]
)
for chunk in chunks.chunks:
    print(f"[{chunk.file_id}, Page {chunk.page_number}]", chunk.text[:80])

Integration Patterns

Complete SDK Client Wrapper

from graphor import Graphor
import graphor
from pathlib import Path
from typing import Any


class GraphorSDK:
    """Complete wrapper for common Graphor operations."""
    
    def __init__(self, api_key: str | None = None):
        self.client = Graphor(api_key=api_key) if api_key else Graphor()
    
    # ==================== Sources ====================
    
    def ingest_file(self, file_path: str | Path) -> str:
        """Ingest a file; returns build_id. Poll get_build_status for file_id."""
        return self.client.sources.ingest_file(file=Path(file_path))
    
    def get_build_status(self, build_id: str) -> Any:
        """Poll build status; when success, response has file_id."""
        return self.client.sources.get_build_status(build_id)
    
    def ingest_url(self, url: str, crawl: bool = False) -> str:
        """Ingest from a URL; returns build_id."""
        return self.client.sources.ingest_url(url=url, crawl_urls=crawl)
    
    def reprocess(self, file_id: str, method: str = "balanced") -> str:
        """Reprocess a source; returns build_id."""
        return self.client.sources.reprocess(file_id=file_id, method=method)
    
    def list_sources(self) -> list[dict[str, Any]]:
        """List all sources."""
        sources = self.client.sources.list()
        return [
            {"file_id": s.file_id, "file_name": s.file_name, "status": s.status}
            for s in sources
        ]
    
    def get_elements(
        self, file_id: str, page: int = 1, page_size: int = 50
    ) -> dict[str, Any]:
        """Get parsed elements from a source."""
        result = self.client.sources.get_elements(
            file_id=file_id, page=page, page_size=page_size
        )
        return {
            "total": result.total,
            "page": result.page,
            "total_pages": result.total_pages,
            "items": [
                {"type": item.element_type, "content": item.text, "page": item.page_number}
                for item in result.items
            ]
        }
    
    def delete(self, file_id: str) -> dict[str, Any]:
        """Delete a source by file_id."""
        result = self.client.sources.delete(file_id=file_id)
        return {"message": result.message}
    
    # ==================== Chat ====================
    
    def ask(
        self,
        question: str,
        file_ids: list[str] | None = None,
        conversation_id: str | None = None
    ) -> dict[str, Any]:
        """Ask a question about documents."""
        kwargs = {"question": question}
        if file_ids:
            kwargs["file_ids"] = file_ids
        if conversation_id:
            kwargs["conversation_id"] = conversation_id
        response = self.client.sources.ask(**kwargs)
        return {"answer": response.answer, "conversation_id": response.conversation_id}
    
    # ==================== Extraction ====================
    
    def extract(
        self, file_ids: list[str], instruction: str, schema: dict[str, Any]
    ) -> dict[str, Any]:
        """Extract structured data from documents."""
        result = self.client.sources.extract(
            file_ids=file_ids,
            user_instruction=instruction,
            output_schema=schema
        )
        return {"data": result.structured_output, "raw": result.raw_json}
    
    # ==================== RAG ====================
    
    def retrieve(
        self, query: str, file_ids: list[str] | None = None
    ) -> dict[str, Any]:
        """Retrieve relevant chunks for RAG."""
        kwargs = {"query": query}
        if file_ids:
            kwargs["file_ids"] = file_ids
        result = self.client.sources.retrieve_chunks(**kwargs)
        return {
            "query": result.query,
            "total": result.total,
            "chunks": [
                {"text": c.text, "file_id": c.file_id, "page": c.page_number, "score": c.score}
                for c in result.chunks or []
            ]
        }


# Usage example
import time

sdk = GraphorSDK()

def full_workflow(file_path: str):
    """Complete ingestion, chat, and extraction workflow."""
    try:
        build_id = sdk.ingest_file(file_path)
        while True:
            status = sdk.get_build_status(build_id)
            if status.success and getattr(status, "file_id", None):
                file_id = status.file_id
                break
            time.sleep(2)
        print(f"Ready: {file_id}")
        chat_result = sdk.ask("Summarize this document", [file_id])
        print(f"Summary: {chat_result['answer']}")
        extract_result = sdk.extract(
            [file_id],
            "Extract key information",
            {"type": "object", "properties": {"title": {"type": "string"}, "summary": {"type": "string"}}}
        )
        print(f"Extracted: {extract_result['data']}")
        return {"success": True, "file_id": file_id}
    except graphor.APIStatusError as e:
        print(f"Error: {e}")
        return {"success": False, "error": str(e)}

Async Integration

import asyncio
from graphor import AsyncGraphor
import graphor


class AsyncGraphorSDK:
    """Async wrapper for Graphor operations."""
    
    def __init__(self, api_key: str | None = None):
        self.client = AsyncGraphor(api_key=api_key) if api_key else AsyncGraphor()
    
    async def process_multiple(
        self, file_paths: list[str], method: str = "balanced"
    ) -> list[dict]:
        """Ingest multiple files concurrently; poll for file_id when needed."""
        from pathlib import Path

        async def ingest_one(file_path: str) -> dict:
            try:
                build_id = await self.client.sources.ingest_file(file=Path(file_path))
                while True:
                    status = await self.client.sources.get_build_status(build_id)
                    if status.success and getattr(status, "file_id", None):
                        return {"file": file_path, "status": "success", "file_id": status.file_id}
                    await asyncio.sleep(2)
            except graphor.APIStatusError as e:
                return {"file": file_path, "status": "failed", "error": str(e)}

        return await asyncio.gather(*[ingest_one(fp) for fp in file_paths])

    async def batch_ask(
        self, questions: list[str], file_ids: list[str] | None = None
    ) -> list[dict]:
        """Ask multiple questions concurrently."""
        async def ask_one(question: str) -> dict:
            response = await self.client.sources.ask(
                question=question, file_ids=file_ids
            )
            return {"question": question, "answer": response.answer}
        return await asyncio.gather(*[ask_one(q) for q in questions])


# Usage
async def main():
    sdk = AsyncGraphorSDK()
    
    # Process multiple files
    results = await sdk.process_multiple([
        "doc1.pdf",
        "doc2.pdf",
        "doc3.pdf"
    ])
    
    for r in results:
        status = "OK" if r["status"] == "success" else "FAIL"
        print(f"{status} {r['file']}")
    
    # Ask multiple questions
    answers = await sdk.batch_ask([
        "What is the main topic?",
        "Who are the key people mentioned?",
        "What are the conclusions?"
    ])
    
    for a in answers:
        print(f"Q: {a['question']}")
        print(f"A: {a['answer']}\n")

asyncio.run(main())

Error Handling

The SDK provides typed exceptions for different error scenarios:
import graphor
from graphor import Graphor

client = Graphor()

try:
    build_id = client.sources.ingest_file(file=b"raw file contents")
except graphor.APIConnectionError as e:
    print("The server could not be reached")
    print(e.__cause__)
except graphor.RateLimitError as e:
    print("Rate limit exceeded. Back off and retry.")
except graphor.BadRequestError as e:
    print(f"Invalid request: {e}")
except graphor.AuthenticationError as e:
    print(f"Invalid API key: {e}")
except graphor.NotFoundError as e:
    print(f"Resource not found: {e}")
except graphor.APIStatusError as e:
    print(f"API error (status {e.status_code}): {e}")

Error Types

Status Code    Error Type                 Description
400            BadRequestError            Invalid parameters or malformed request
401            AuthenticationError        Invalid or missing API key
403            PermissionDeniedError      Access denied to resource
404            NotFoundError              Resource doesn't exist
422            UnprocessableEntityError   Validation error
429            RateLimitError             Too many requests
≥500           InternalServerError        Server-side error
N/A            APIConnectionError         Network connectivity issues
N/A            APITimeoutError            Request timed out

Configuration

Retries

Certain errors are automatically retried 2 times by default with exponential backoff:
from graphor import Graphor

# Configure default retries
client = Graphor(max_retries=0)  # Disable retries

# Or per-request
client.with_options(max_retries=5).sources.ingest_file(file=b"...")

Timeouts

By default, requests time out after 1 minute:
from graphor import Graphor

# Configure default timeout (in seconds)
client = Graphor(timeout=120.0)  # 2 minutes

# Or per-request
client.with_options(timeout=300.0).sources.reprocess(
    file_id="file_abc123",
    method="agentic"
)

Using aiohttp for Better Concurrency (Python only)

For high-concurrency async operations in Python, use the aiohttp client:
import asyncio
from graphor import AsyncGraphor, DefaultAioHttpClient

async def main():
    async with AsyncGraphor(
        http_client=DefaultAioHttpClient()
    ) as client:
        # Your async operations here
        sources = await client.sources.list()
        print(f"Found {len(sources)} sources")

# Install aiohttp first: pip install graphor[aiohttp]
asyncio.run(main())

Rate Limits and Best Practices

Performance Guidelines

  • Batch Operations: Process multiple files sequentially or with controlled concurrency
  • Async Processing: Use AsyncGraphor (Python) or Promise.all (TypeScript) for concurrent operations
  • Retry Logic: The SDK handles retries automatically; configure max_retries / maxRetries as needed
  • Timeout Handling: Increase timeouts for large documents or complex processing
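
The "controlled concurrency" advice above can be implemented with an asyncio.Semaphore, capping how many requests are in flight at once. This helper is a generic sketch over any coroutine factories; the default limit of 5 is an arbitrary choice, not a documented rate limit:

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def gather_bounded(
    factories: list[Callable[[], Awaitable[T]]], limit: int = 5
) -> list[T]:
    """Run coroutine factories with at most `limit` executing concurrently,
    preserving input order in the results."""
    sem = asyncio.Semaphore(limit)

    async def run(factory: Callable[[], Awaitable[T]]) -> T:
        async with sem:
            return await factory()

    return await asyncio.gather(*(run(f) for f in factories))
```

With AsyncGraphor this could bound a batch ingest, e.g. `await gather_bounded([lambda p=p: client.sources.ingest_file(file=p) for p in paths], limit=3)`.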

Best Practices

from graphor import Graphor
import graphor
import time

client = Graphor(
    max_retries=3,
    timeout=120.0  # 2 minutes for processing operations
)

def robust_upload(file_path: str, max_attempts: int = 3) -> dict | None:
    """Ingest with custom retry logic; poll until ready and return file_id."""
    from pathlib import Path
    for attempt in range(max_attempts):
        try:
            build_id = client.sources.ingest_file(file=Path(file_path))
            while True:
                status = client.sources.get_build_status(build_id)
                if status.success and getattr(status, "file_id", None):
                    return {"success": True, "file_id": status.file_id}
                time.sleep(2)
        except graphor.RateLimitError:
            wait_time = 2 ** attempt  # Exponential backoff
            print(f"Rate limited. Waiting {wait_time}s...")
            time.sleep(wait_time)
        except graphor.APIConnectionError as e:
            print(f"Connection error (attempt {attempt + 1}): {e}")
            time.sleep(1)
        except graphor.APIStatusError as e:
            print(f"API error: {e}")
            return {"success": False, "error": str(e)}
    
    return {"success": False, "error": "Max retries exceeded"}

Common Use Cases

Document Processing Pipeline

from graphor import Graphor
from pathlib import Path
import graphor
import time

client = Graphor(timeout=300.0)

def document_pipeline(directory: str, partition_method: str = "balanced"):
    """Ingest all PDFs in a directory; poll until ready."""
    results = []
    for file_path in Path(directory).glob("*.pdf"):
        try:
            build_id = client.sources.ingest_file(file=file_path)
            while True:
                status = client.sources.get_build_status(build_id)
                if status.success and getattr(status, "file_id", None):
                    results.append({"file": str(file_path), "status": "success", "file_id": status.file_id})
                    print(f"Ready: {status.file_id}")
                    break
                time.sleep(2)
        except graphor.APIStatusError as e:
            results.append({"file": str(file_path), "status": "failed", "error": str(e)})
    print(f"\nProcessed {sum(1 for r in results if r['status'] == 'success')}/{len(results)} files")
    return results

Q&A System

from graphor import Graphor

client = Graphor()

class DocumentQA:
    """Simple Q&A system with conversation history."""
    
    def __init__(self, file_ids: list[str] | None = None):
        self.file_ids = file_ids
        self.conversation_id = None
    
    def ask(self, question: str) -> str:
        """Ask a question, maintaining conversation history."""
        response = client.sources.ask(
            question=question,
            file_ids=self.file_ids,
            conversation_id=self.conversation_id
        )
        
        # Store conversation ID for follow-up questions
        self.conversation_id = response.conversation_id
        
        return response.answer
    
    def reset(self):
        """Reset conversation history."""
        self.conversation_id = None


# Usage (file_ids from list())
qa = DocumentQA(file_ids=["file_abc123"])

print(qa.ask("What is this document about?"))
print(qa.ask("What are the main findings?"))  # Follow-up
print(qa.ask("Can you summarize the conclusions?"))  # Follow-up

qa.reset()  # Start new conversation

Custom RAG with Your LLM

from graphor import Graphor
from openai import OpenAI

graphor_client = Graphor()
openai_client = OpenAI()

def custom_rag(question: str, file_ids: list[str] | None = None) -> dict:
    """Custom RAG pipeline with OpenAI."""
    chunks = graphor_client.sources.retrieve_chunks(
        query=question, file_ids=file_ids
    )
    context = "\n\n".join([
        f"[{c.file_id}, Page {c.page_number}]\n{c.text}"
        for c in chunks.chunks or []
    ])
    response = openai_client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "Answer questions based on the provided context. Cite sources."},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
        ]
    )
    return {
        "answer": response.choices[0].message.content,
        "sources": [{"file_id": c.file_id, "page": c.page_number} for c in chunks.chunks or []]
    }

# Usage (pass file_ids from list())
result = custom_rag("What are the payment terms?", ["file_abc123"])
print(result["answer"])
print("Sources:", result["sources"])

Support and Resources

Getting Help

Contact Support

Direct support for technical questions and issues

API Tokens Guide

Learn how to generate and manage authentication tokens

Data Ingestion Guide

Best practices for document upload and processing

REST API Reference

Full REST API documentation for advanced use cases

Next Steps

Ready to start building with the Graphor SDK? Choose your path:

For Beginners

Ingest Sources

Ingest documents from files, URLs, GitHub, and YouTube; poll for file_id

Chat with Documents

Ask natural language questions about your documents

API Tokens

Set up authentication for API access

For Advanced Users

Data Extraction

Extract structured data using JSON Schema

Prebuilt RAG

Build custom RAG pipelines with semantic search

Reprocess Source

Reprocess sources with different partition methods

List Elements

Access structured document elements and metadata

The Graphor SDKs provide a powerful foundation for building intelligent, document-driven applications. With support for document ingestion, conversational chat, structured extraction, and custom RAG pipelines, the Python and TypeScript SDKs scale from simple document search to complex analysis systems.