The list method retrieves every source (document) in your Graphor project. Each entry reports the source’s processing status, file details, and project metadata, so you can monitor and manage your document collection programmatically.

Method Overview

Sync Method

client.sources.list()

Async Method

await client.sources.list()

Method Signature

client.sources.list(
    timeout: float | None = None
) -> list[PublicSource]

Parameters

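| Parameter | Type | Description | Required |
|-----------|------|-------------|----------|
| timeout | float | Request timeout in seconds | No |

This method returns all sources in the project associated with your API key. It takes no filtering parameters; filter the returned list in your own code.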
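As a minimal sketch of the timeout parameter, the helper below forwards a per-call timeout to sources.list(). Only the timeout keyword comes from the signature above; the helper name list_sources_with_timeout, the 30-second default, and the positive-value check are illustrative assumptions.

```python
def list_sources_with_timeout(client, seconds: float = 30.0):
    """Call client.sources.list() with a per-request timeout in seconds.

    `client` is expected to be a Graphor (sync) client. The helper name and
    the 30-second default are illustrative, not part of the SDK.
    """
    if seconds <= 0:
        raise ValueError("timeout must be a positive number of seconds")
    # `timeout` is the only keyword documented in the method signature
    return client.sources.list(timeout=seconds)
```

Usage would look like `sources = list_sources_with_timeout(Graphor(), 10.0)`.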

Response Object

The method returns list[PublicSource], a list of PublicSource objects with the following properties:

| Property | Type | Description |
|----------|------|-------------|
| status | str | Current processing status (see status values below) |
| message | str | Human-readable status description |
| file_id | str \| None | Unique identifier for the source (use this for subsequent API calls) |
| file_name | str | Name of the source file or identifier |
| file_size | int | Size of the file in bytes (0 for URLs) |
| file_type | str | File extension or type |
| file_source | str | Source type: local file, url, github, or youtube |
| project_id | str | UUID of the project |
| project_name | str | Name of the project |
| partition_method | str \| None | Processing method used or applied |
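Because file_id is typed as str | None, code that feeds IDs into later calls should skip entries without one. The following is a small sketch of that check; the helper name completed_file_ids is hypothetical, and it relies only on the status and file_id properties listed above.

```python
def completed_file_ids(sources):
    """Return file_id values for sources that finished processing.

    Entries whose file_id is None are skipped, since the field is optional.
    """
    return [
        s.file_id
        for s in sources
        if s.status == "Completed" and s.file_id is not None
    ]
```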

Status Values

New
Description: Document has been uploaded but processing hasn’t started yet
Next Steps: Processing will begin automatically or can be triggered manually

Processing
Description: Document is currently being processed
Duration: Can take from minutes to hours depending on document complexity and method

Completed
Description: Document has been successfully processed and is ready for use
Ready for: Chunking, retrieval, and integration in RAG pipelines

Failed
Description: Document processing encountered an error
Actions: Check document format, try different processing method, or contact support

unknown
Description: Status information is not available
Causes: System error or data inconsistency
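When deciding what to do next, it can help to fold the statuses into coarser buckets. The sketch below is one hedged way to do that. The status strings are the ones used in this page's code examples; the bucket names and the helpers status_bucket and all_settled are illustrative assumptions, not part of the SDK.

```python
# Status strings as they appear in this page's code examples
IN_PROGRESS = {"New", "Processing"}
READY = {"Completed"}

def status_bucket(status):
    """Map a source status to a coarse bucket (bucket names are illustrative)."""
    if status in READY:
        return "ready"
    if status in IN_PROGRESS:
        return "in_progress"
    # "Failed", "unknown", None, or any unrecognized value
    return "needs_attention"

def all_settled(sources):
    """True when no source is still waiting for or undergoing processing."""
    return all(s.status not in IN_PROGRESS for s in sources)
```

A polling loop could stop once all_settled(client.sources.list()) returns True.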

File Source Types

| Source Type | Description | Typical Use Cases |
|-------------|-------------|-------------------|
| local file | Files uploaded directly from your computer | Documents, PDFs, images, spreadsheets |
| url | Content imported from web URLs | Web pages, articles, online documents |
| github | Content imported from GitHub repositories | Code documentation, README files, wikis |
| youtube | Content imported from YouTube videos | Video transcripts, educational content |

Code Examples

Basic Usage

from graphor import Graphor

client = Graphor()

# List all sources in the project
sources = client.sources.list()

print(f"Found {len(sources)} sources")

for source in sources:
    print(f"{source.file_name} - {source.status}")

Async Usage

import asyncio
from graphor import AsyncGraphor

async def list_all_sources():
    client = AsyncGraphor()
    
    sources = await client.sources.list()
    
    print(f"Found {len(sources)} sources")
    
    for source in sources:
        print(f"{source.file_name} - {source.status}")
    
    return sources

asyncio.run(list_all_sources())

Filter by Status

from graphor import Graphor

client = Graphor()

sources = client.sources.list()

# Filter by status
completed = [s for s in sources if s.status == "Completed"]
processing = [s for s in sources if s.status == "Processing"]
failed = [s for s in sources if s.status == "Failed"]
new = [s for s in sources if s.status == "New"]

print(f"Completed: {len(completed)}")
print(f"Processing: {len(processing)}")
print(f"Failed: {len(failed)}")
print(f"New: {len(new)}")

Filter by File Type

from graphor import Graphor

client = Graphor()

sources = client.sources.list()

# Filter by file type
pdf_files = [s for s in sources if s.file_type == "pdf"]
docx_files = [s for s in sources if s.file_type == "docx"]
images = [s for s in sources if s.file_type in ("png", "jpg", "jpeg")]

print(f"PDFs: {len(pdf_files)}")
print(f"Word docs: {len(docx_files)}")
print(f"Images: {len(images)}")

# List all PDF files
for pdf in pdf_files:
    size_mb = pdf.file_size / (1024 * 1024)
    print(f"  {pdf.file_name} ({size_mb:.2f} MB)")

Filter by Source Type

from graphor import Graphor

client = Graphor()

sources = client.sources.list()

# Filter by source type
local_files = [s for s in sources if s.file_source == "local file"]
url_sources = [s for s in sources if s.file_source == "url"]
github_sources = [s for s in sources if s.file_source == "github"]
youtube_sources = [s for s in sources if s.file_source == "youtube"]

print(f"Local files: {len(local_files)}")
print(f"URL sources: {len(url_sources)}")
print(f"GitHub repos: {len(github_sources)}")
print(f"YouTube videos: {len(youtube_sources)}")

Error Handling

import graphor
from graphor import Graphor

client = Graphor()

try:
    sources = client.sources.list()
    print(f"Found {len(sources)} sources")
    
except graphor.AuthenticationError as e:
    print(f"Invalid API key: {e}")
    
except graphor.PermissionDeniedError as e:
    print(f"Access denied to project: {e}")
    
except graphor.RateLimitError as e:
    print(f"Rate limit exceeded. Please wait and retry: {e}")
    
except graphor.APIConnectionError as e:
    print(f"Connection error: {e}")
    
except graphor.InternalServerError as e:
    print(f"Server error: {e}")

Advanced Examples

Source Analysis

Analyze your project’s sources with detailed statistics:
from graphor import Graphor
from collections import defaultdict

client = Graphor()

def analyze_sources():
    """Analyze sources by status, type, and size."""
    sources = client.sources.list()
    
    status_counts = defaultdict(int)
    type_counts = defaultdict(int)
    source_counts = defaultdict(int)
    total_size = 0
    
    for source in sources:
        status_counts[source.status] += 1
        type_counts[source.file_type] += 1
        source_counts[source.file_source] += 1
        total_size += source.file_size
    
    return {
        "total_sources": len(sources),
        "total_size_mb": round(total_size / (1024 * 1024), 2),
        "by_status": dict(status_counts),
        "by_type": dict(type_counts),
        "by_source": dict(source_counts)
    }

# Usage
analysis = analyze_sources()
print(f"Total sources: {analysis['total_sources']}")
print(f"Total size: {analysis['total_size_mb']} MB")
print(f"By status: {analysis['by_status']}")
print(f"By type: {analysis['by_type']}")
print(f"By source: {analysis['by_source']}")

Status Monitoring

Monitor the processing status of your documents:
from graphor import Graphor

client = Graphor()

def monitor_processing_status():
    """Monitor and report on processing status."""
    sources = client.sources.list()
    
    processing = [s for s in sources if s.status == "Processing"]
    failed = [s for s in sources if s.status == "Failed"]
    completed = [s for s in sources if s.status == "Completed"]
    new = [s for s in sources if s.status == "New"]
    
    print("=" * 50)
    print("Processing Status Report")
    print("=" * 50)
    print(f"✅ Completed: {len(completed)}")
    print(f"⏳ Processing: {len(processing)}")
    print(f"🆕 New: {len(new)}")
    print(f"❌ Failed: {len(failed)}")
    print("=" * 50)
    
    # List files currently processing
    if processing:
        print("\n📋 Currently Processing:")
        for source in processing:
            print(f"  • {source.file_name} ({source.partition_method})")
    
    # List failed files that need attention
    if failed:
        print("\n⚠️ Failed Files (need attention):")
        for source in failed:
            print(f"  • {source.file_name}: {source.message}")
    
    return {
        "completed": completed,
        "processing": processing,
        "new": new,
        "failed": failed
    }

# Usage
status = monitor_processing_status()

Find Source by Name

Search for a specific source by filename:
from graphor import Graphor

client = Graphor()

def find_source(file_name: str):
    """Find a source by exact file name."""
    sources = client.sources.list()
    
    for source in sources:
        if source.file_name == file_name:
            return source
    
    return None

def search_sources(query: str):
    """Search sources by partial name match."""
    sources = client.sources.list()
    
    matches = [s for s in sources if query.lower() in s.file_name.lower()]
    return matches

# Usage
# Find exact match
source = find_source("document.pdf")
if source:
    print(f"Found: {source.file_name} - {source.status}")
else:
    print("Source not found")

# Search by partial name
matches = search_sources("report")
print(f"Found {len(matches)} sources matching 'report'")
for match in matches:
    print(f"  • {match.file_name}")
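Both helpers above rescan the whole list on every call. If many lookups are needed against one fetched list, an index keyed by file name avoids the repeated scans. This is a sketch under the assumption that file names may repeat, so each key maps to a list; the helper name index_by_name is hypothetical.

```python
from collections import defaultdict

def index_by_name(sources):
    """Group sources by file_name so repeated lookups avoid rescanning the list.

    Values are lists because this sketch does not assume file names are unique.
    """
    index = defaultdict(list)
    for source in sources:
        index[source.file_name].append(source)
    return dict(index)
```

For example, `index_by_name(client.sources.list()).get("document.pdf", [])` returns every source with that exact name.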

Project Health Check

Perform a comprehensive health check of your project:
from graphor import Graphor
from datetime import datetime
import graphor

client = Graphor()

def project_health_check():
    """Perform a comprehensive health check of the project."""
    try:
        sources = client.sources.list()
        
        health_report = {
            "timestamp": datetime.now().isoformat(),
            "total_sources": len(sources),
            "status_summary": {},
            "issues": [],
            "recommendations": []
        }
        
        # Analyze status distribution
        for source in sources:
            status = source.status or "unknown"
            health_report["status_summary"][status] = health_report["status_summary"].get(status, 0) + 1
            
            # Identify issues (compare the normalized status, so a missing
            # status is reported as "unknown")
            if status == "Failed":
                health_report["issues"].append(f"Failed processing: {source.file_name}")
            elif status == "unknown":
                health_report["issues"].append(f"Unknown status: {source.file_name}")
        
        # Generate recommendations
        failed_count = health_report["status_summary"].get("Failed", 0)
        if failed_count > 0:
            health_report["recommendations"].append(
                f"Reprocess {failed_count} failed documents using client.sources.parse()"
            )
        
        processing_count = health_report["status_summary"].get("Processing", 0)
        if processing_count > 5:
            health_report["recommendations"].append(
                "Monitor processing queue - high volume detected"
            )
        
        new_count = health_report["status_summary"].get("New", 0)
        if new_count > 0:
            health_report["recommendations"].append(
                f"{new_count} documents awaiting processing"
            )
        
        return health_report
        
    except graphor.APIStatusError as e:
        return {
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }

# Usage
health = project_health_check()
print("Project Health Report")
print(f"Timestamp: {health['timestamp']}")

if "error" in health:
    # The API call failed, so only the error and timestamp are available
    print(f"Health check failed: {health['error']}")
else:
    print(f"Total Sources: {health['total_sources']}")
    print(f"Status Summary: {health['status_summary']}")

    if health["issues"]:
        print("\nIssues:")
        for issue in health["issues"]:
            print(f"  ⚠️ {issue}")

    if health["recommendations"]:
        print("\nRecommendations:")
        for rec in health["recommendations"]:
            print(f"  💡 {rec}")

Async Batch Operations

Use the list to perform batch operations efficiently:
import asyncio
from graphor import AsyncGraphor
import graphor

async def reprocess_failed_sources(method: str = "hi_res"):
    """Find and reprocess all failed sources."""
    client = AsyncGraphor(timeout=300.0)
    
    # Get all sources
    sources = await client.sources.list()
    
    # Find failed sources
    failed = [s for s in sources if s.status == "Failed"]
    
    if not failed:
        print("No failed sources to reprocess")
        return []
    
    print(f"Found {len(failed)} failed sources to reprocess")
    
    # Reprocess each failed source
    results = []
    for source in failed:
        try:
            print(f"Reprocessing: {source.file_name}...")
            await client.sources.parse(
                file_name=source.file_name,
                partition_method=method
            )
            results.append({"file_name": source.file_name, "status": "success"})
            print(f"  ✅ {source.file_name} reprocessed")
        except graphor.APIStatusError as e:
            results.append({"file_name": source.file_name, "status": "failed", "error": str(e)})
            print(f"  ❌ {source.file_name} failed: {e}")
    
    return results

# Usage
results = asyncio.run(reprocess_failed_sources("hi_res"))
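The loop above awaits each parse call in turn. A bounded-concurrency variant is sketched below using asyncio.Semaphore and asyncio.gather. The helper name reprocess_concurrently and the default limit of 3 are assumptions, and this page does not say whether the service tolerates concurrent parse calls, so a small limit is the cautious choice. The parse argument stands in for client.sources.parse on an AsyncGraphor client.

```python
import asyncio

async def reprocess_concurrently(parse, file_names, method="hi_res", limit=3):
    """Run parse(file_name=..., partition_method=...) for each name,
    with at most `limit` calls in flight at once.
    """
    semaphore = asyncio.Semaphore(limit)

    async def run_one(name):
        async with semaphore:
            try:
                await parse(file_name=name, partition_method=method)
                return {"file_name": name, "status": "success"}
            except Exception as exc:  # narrow to graphor.APIStatusError in real code
                return {"file_name": name, "status": "failed", "error": str(exc)}

    # gather preserves input order, so results line up with file_names
    return await asyncio.gather(*(run_one(n) for n in file_names))
```

With the SDK this would be called as `await reprocess_concurrently(client.sources.parse, [s.file_name for s in failed])`.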

Source Management Class

A complete class for managing sources:
from graphor import Graphor
import graphor
from dataclasses import dataclass
from typing import Optional

@dataclass
class SourceSummary:
    total: int
    completed: int
    processing: int
    failed: int
    new: int
    total_size_mb: float

class SourceManager:
    def __init__(self, api_key: Optional[str] = None):
        self.client = Graphor(api_key=api_key) if api_key else Graphor()
        self._cache = None
    
    def refresh(self):
        """Refresh the sources cache."""
        self._cache = self.client.sources.list()
        return self._cache
    
    @property
    def sources(self):
        """Get sources (cached)."""
        if self._cache is None:
            self.refresh()
        return self._cache
    
    def get_summary(self) -> SourceSummary:
        """Get a summary of all sources."""
        sources = self.sources
        
        total_size = sum(s.file_size for s in sources)
        
        return SourceSummary(
            total=len(sources),
            completed=len([s for s in sources if s.status == "Completed"]),
            processing=len([s for s in sources if s.status == "Processing"]),
            failed=len([s for s in sources if s.status == "Failed"]),
            new=len([s for s in sources if s.status == "New"]),
            total_size_mb=round(total_size / (1024 * 1024), 2)
        )
    
    def find_by_name(self, name: str):
        """Find a source by exact name."""
        for source in self.sources:
            if source.file_name == name:
                return source
        return None
    
    def search(self, query: str):
        """Search sources by partial name match."""
        return [s for s in self.sources if query.lower() in s.file_name.lower()]
    
    def filter_by_status(self, status: str):
        """Filter sources by status."""
        return [s for s in self.sources if s.status == status]
    
    def filter_by_type(self, file_type: str):
        """Filter sources by file type."""
        return [s for s in self.sources if s.file_type == file_type]
    
    def get_failed(self):
        """Get all failed sources."""
        return self.filter_by_status("Failed")
    
    def get_processing(self):
        """Get all processing sources."""
        return self.filter_by_status("Processing")

# Usage
manager = SourceManager()

# Get summary
summary = manager.get_summary()
print(f"Total: {summary.total}, Completed: {summary.completed}, Failed: {summary.failed}")

# Find a specific source
source = manager.find_by_name("document.pdf")
if source:
    print(f"Found: {source.file_name} - {source.status}")

# Search sources
matches = manager.search("report")
print(f"Found {len(matches)} matches for 'report'")

# Get failed sources
failed = manager.get_failed()
print(f"Failed sources: {len(failed)}")

Continuous Monitoring

Set up continuous monitoring of your sources:
import time
from typing import Optional

from graphor import Graphor
import graphor

client = Graphor()

def continuous_monitoring(interval_seconds: int = 60, max_iterations: Optional[int] = None):
    """Continuously monitor source processing status."""
    iteration = 0
    
    while max_iterations is None or iteration < max_iterations:
        try:
            sources = client.sources.list()
            
            processing = len([s for s in sources if s.status == "Processing"])
            failed = len([s for s in sources if s.status == "Failed"])
            completed = len([s for s in sources if s.status == "Completed"])
            
            print(f"[{time.strftime('%H:%M:%S')}] "
                  f"✅ {completed} | ⏳ {processing} | ❌ {failed}")
            
            # Alert if any sources are currently in the Failed state
            if failed > 0:
                failed_sources = [s for s in sources if s.status == "Failed"]
                print(f"  ⚠️ Failed sources: {[s.file_name for s in failed_sources]}")
            
            time.sleep(interval_seconds)
            iteration += 1
            
        except graphor.APIConnectionError as e:
            print(f"[{time.strftime('%H:%M:%S')}] Connection error: {e}")
            # Count failed attempts too, so max_iterations still bounds the loop
            iteration += 1
            time.sleep(interval_seconds)
        except KeyboardInterrupt:
            print("\nMonitoring stopped")
            break

# Usage (monitor every 30 seconds, 10 times)
# continuous_monitoring(interval_seconds=30, max_iterations=10)
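The monitor above reports every failed source on each pass. To alert only on failures that appeared since the previous poll, the set of failed names can be compared between cycles. The helper below is a sketch of that comparison; its name detect_new_failures is hypothetical, and it assumes file names are a usable key for a source.

```python
def detect_new_failures(previously_failed, sources):
    """Return (newly_failed_names, current_failed_names) for one polling cycle.

    `previously_failed` is the set of file names that were already in the
    "Failed" state on the last poll.
    """
    current_failed = {s.file_name for s in sources if s.status == "Failed"}
    newly_failed = sorted(current_failed - set(previously_failed))
    return newly_failed, current_failed
```

Inside the loop, start with `seen = set()` and call `new, seen = detect_new_failures(seen, sources)` after each fetch, printing `new` only when it is non-empty.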

Error Reference

| Error Type | Status Code | Description |
|------------|-------------|-------------|
| AuthenticationError | 401 | Invalid or missing API key |
| PermissionDeniedError | 403 | Access denied to the specified project |
| RateLimitError | 429 | Too many requests, please retry after waiting |
| InternalServerError | ≥500 | Server-side error retrieving sources |
| APIConnectionError | N/A | Network connectivity issues |
| APITimeoutError | N/A | Request timed out |

Best Practices

Performance Optimization

  • Cache results: Store the response locally when making multiple queries
  • Filter client-side: The SDK returns all sources; filter in your code as needed
  • Use async: For applications that need to perform other work while waiting
# Example: Cache sources for multiple operations
sources = client.sources.list()

# Now perform multiple filter operations without re-fetching
pdfs = [s for s in sources if s.file_type == "pdf"]
completed = [s for s in sources if s.status == "Completed"]
large_files = [s for s in sources if s.file_size > 10 * 1024 * 1024]
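A cached list goes stale as sources finish processing, so a long-running application may want the cache to expire. Below is a minimal time-based cache sketch. The class name TTLSourceCache, the 60-second default, and the injectable clock are all assumptions made for illustration; fetch stands in for client.sources.list.

```python
import time

class TTLSourceCache:
    """Cache the result of a fetch callable for `ttl_seconds` (illustrative helper)."""

    def __init__(self, fetch, ttl_seconds=60.0, clock=time.monotonic):
        self._fetch = fetch        # e.g. client.sources.list
        self._ttl = ttl_seconds
        self._clock = clock        # injectable so the expiry logic can be tested
        self._value = None
        self._fetched_at = None

    def get(self, force=False):
        """Return cached sources, refetching when stale or when force=True."""
        now = self._clock()
        stale = self._fetched_at is None or now - self._fetched_at >= self._ttl
        if force or stale:
            self._value = self._fetch()
            self._fetched_at = now
        return self._value
```

Usage: `cache = TTLSourceCache(client.sources.list, ttl_seconds=30)` and then `cache.get()` wherever the list is needed.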

Data Management

  • Track processing times: Monitor how long documents take to process
  • Identify patterns: Look for file types or sizes that frequently fail
  • Plan capacity: Use file counts and sizes for storage planning
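To illustrate the "identify patterns" point, the sketch below computes the share of failed sources per file type from a fetched list. The helper name failure_rate_by_type is hypothetical; it uses only the file_type and status properties described earlier.

```python
from collections import Counter

def failure_rate_by_type(sources):
    """Return {file_type: fraction of sources with status "Failed"}."""
    totals = Counter(s.file_type for s in sources)
    failures = Counter(s.file_type for s in sources if s.status == "Failed")
    # Counter returns 0 for types with no failures, so every type gets a rate
    return {file_type: failures[file_type] / totals[file_type] for file_type in totals}
```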

Error Handling

  • Configure retries: Handle temporary network issues with the SDK’s built-in retry mechanism (max_retries)
  • Monitor status: Regularly check for failed processing jobs
  • Graceful degradation: Have fallback plans when the API is unavailable
from graphor import Graphor

# Configure retries
client = Graphor(max_retries=5)

# Or per-request
sources = client.with_options(max_retries=5).sources.list()
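For graceful degradation, one option is to serve the last successfully fetched list when the API cannot be reached. The following is a sketch under stated assumptions: the helper name list_with_fallback and its parameters are hypothetical, and the default recoverable exceptions are Python built-ins so the block runs without the SDK. With the SDK, passing (graphor.APIConnectionError, graphor.InternalServerError) would be a natural choice.

```python
def list_with_fallback(fetch, last_known, recoverable=(ConnectionError, TimeoutError)):
    """Return (sources, is_fresh); fall back to `last_known` on recoverable errors.

    `fetch` is a zero-argument callable such as client.sources.list.
    Exceptions outside `recoverable` (e.g. authentication errors) still propagate.
    """
    try:
        return fetch(), True
    except recoverable:
        return last_known, False
```

The is_fresh flag lets the caller mark displayed data as possibly outdated.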

Troubleshooting

Causes: Large number of sources, server load, or network issues
Solutions:
  • Implement request timeouts
  • Use response caching for non-critical applications
  • Consider filtering client-side after initial fetch
client = Graphor(timeout=60.0)  # Increase timeout

Causes: No sources in project, wrong API key, or permission issues
Solutions:
  • Verify you have uploaded documents to your project
  • Check that your API key is correct and active
  • Ensure you’re accessing the right project

Causes: Processing lag, system sync issues, or database inconsistencies
Solutions:
  • Wait a few minutes and retry the request
  • Call sources.list() again to refresh the data
  • Contact support if inconsistencies persist

Causes: Invalid token, expired token, or revoked access
Solutions:
  • Verify API key format and validity
  • Check token hasn’t been revoked in dashboard
  • Generate a new API key if necessary

Next Steps

After successfully listing your sources: