The delete method (same name as the API endpoint) permanently removes a source from your project. You must pass the source’s file_id, which is returned by list or get build status. The operation is irreversible and removes all associated data, including partition nodes and metadata.

Method overview

client.sources.delete()

Method signature

client.sources.delete(
    file_id: str,                    # Required
    timeout: float | None = None
) -> SourceDeleteResponse

Parameters

| Parameter | Type | Description | Required |
| --- | --- | --- | --- |
| file_id | str | Unique identifier of the source to delete | Yes |
| timeout | float | Request timeout in seconds | No |

Important considerations

Warning: This operation is irreversible.
  • All document content and metadata are permanently removed
  • Associated partition nodes and embeddings are deleted
  • Flows using this document are automatically updated
  • No backup or recovery is available
  • Use the source’s unique file_id from list or get build status
  • Do not rely on file name; only file_id is supported
  • Dataset nodes using this document are updated automatically
  • Successor nodes in affected flows may be marked as outdated
  • Flow execution may be impacted until nodes are reconfigured
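
Because only file_id is accepted, code that starts from a file name has to translate it first. The following is a minimal sketch of that lookup, assuming the items returned by `client.sources.list()` expose `file_id` and `file_name` attributes as in the examples on this page. The helper name `resolve_file_id` is hypothetical and not part of the SDK.

```python
from typing import Any, Iterable


def resolve_file_id(sources: Iterable[Any], file_name: str) -> str:
    """Return the file_id of the single source named `file_name`.

    `sources` is assumed to be the result of client.sources.list().
    Raises LookupError when the name matches zero or several sources,
    so an irreversible delete is never issued on a guess.
    """
    matches = [s.file_id for s in sources if s.file_name == file_name]
    if not matches:
        raise LookupError(f"No source named {file_name!r}")
    if len(matches) > 1:
        # File names are not guaranteed to be unique; refuse to pick one.
        raise LookupError(f"{len(matches)} sources named {file_name!r}: {matches}")
    return matches[0]


# Intended use (requires a configured client):
# file_id = resolve_file_id(client.sources.list(), "report.pdf")
# client.sources.delete(file_id=file_id)
```

Failing on ambiguous names is a deliberate choice here: the caller must then pick the file_id explicitly.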

Response

The method returns a response object with:
| Property | Type | Description |
| --- | --- | --- |
| status | str | Deletion result (e.g. “success”) |
| message | str | Human-readable confirmation |
| file_id | str | Identifier of the deleted source |
| file_name | str | Name of the deleted file |
| project_id | str | UUID of the project |
| project_name | str | Name of the project |
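
The properties above can be copied into a plain dict for logging or storage. This is a rough sketch: the helper names `deletion_record` and `was_deleted` are hypothetical, and "success" is only the example status value given in the table, so other values are treated as "not confirmed". A `SimpleNamespace` stands in for a real `SourceDeleteResponse`.

```python
from types import SimpleNamespace
from typing import Any

# Property names taken from the response table above.
RESPONSE_FIELDS = (
    "status", "message", "file_id", "file_name", "project_id", "project_name",
)


def deletion_record(result: Any) -> dict:
    """Copy the documented response properties into a plain dict."""
    return {name: getattr(result, name, None) for name in RESPONSE_FIELDS}


def was_deleted(result: Any) -> bool:
    """True only when status equals the documented example value "success"."""
    return getattr(result, "status", None) == "success"


# Stand-in object with the same attributes as the documented response.
example = SimpleNamespace(
    status="success",
    message="Source deleted",
    file_id="file_abc123",
    file_name="report.pdf",
    project_id="00000000-0000-0000-0000-000000000000",
    project_name="Demo",
)
print(deletion_record(example)["file_name"], was_deleted(example))
```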

Code examples

Basic usage

from graphor import Graphor

client = Graphor()
file_id = "file_abc123"  # from list() or get_build_status

result = client.sources.delete(file_id=file_id)

print(f"Status: {result.status}")
print(f"Deleted: {result.file_name}")

Async usage

import asyncio
from graphor import AsyncGraphor

async def delete_document(file_id: str):
    client = AsyncGraphor()
    result = await client.sources.delete(file_id=file_id)
    print(f"Deleted: {result.file_name}")
    return result

asyncio.run(delete_document("file_abc123"))

Delete with verification

Verify the source exists (by file_id) before deleting:
from graphor import Graphor
import graphor

client = Graphor()

def safe_delete(file_id: str) -> bool:
    sources = client.sources.list()
    file_ids = [s.file_id for s in sources]
    if file_id not in file_ids:
        print(f"Source '{file_id}' not found")
        return False
    info = next(s for s in sources if s.file_id == file_id)
    print(f"Source to delete: {info.file_name} ({info.file_id})")
    confirm = input("Proceed? (yes/no): ")
    if confirm.lower() != "yes":
        return False
    try:
        result = client.sources.delete(file_id=file_id)
        print(f"Success: {result.message}")
        return True
    except graphor.APIStatusError as e:
        print(f"Deletion failed: {e}")
        return False

safe_delete("file_abc123")

Error handling

from graphor import Graphor
import graphor

client = Graphor()

try:
    result = client.sources.delete(file_id="file_abc123")
    print(f"Deleted: {result.file_name}")
except graphor.NotFoundError as e:
    print(f"File not found: {e}")
except graphor.BadRequestError as e:
    print(f"Invalid request: {e}")
except graphor.AuthenticationError as e:
    print(f"Invalid API key: {e}")
except graphor.PermissionDeniedError as e:
    print(f"Access denied: {e}")
except graphor.RateLimitError as e:
    print(f"Rate limit exceeded. Please wait and retry: {e}")
except graphor.APIConnectionError as e:
    print(f"Connection error: {e}")
except graphor.APIStatusError as e:
    # Catch-all for any other non-success HTTP status
    print(f"API error (status {e.status_code}): {e}")

Advanced Examples

Batch Deletion

Delete multiple files with safety checks:
from graphor import Graphor
import graphor
import time

client = Graphor()

def batch_delete(file_ids: list[str], confirm: bool = True) -> dict:
    """Delete multiple sources by file_id."""
    sources = client.sources.list()
    by_id = {s.file_id: s for s in sources}
    missing = [f for f in file_ids if f not in by_id]
    if missing:
        print(f"Sources not found: {missing}")
        return {"error": f"Missing: {missing}"}
    total_size = sum(by_id[f].file_size for f in file_ids)
    print(f"Deletion: {len(file_ids)} sources, {total_size / (1024*1024):.2f} MB")
    for fid in file_ids:
        print(f"  - {by_id[fid].file_name} ({by_id[fid].file_id})")
    if confirm:
        if input("Type 'DELETE' to confirm: ") != "DELETE":
            return {"cancelled": True}
    results = {"successful": [], "failed": []}
    for i, fid in enumerate(file_ids, 1):
        try:
            result = client.sources.delete(file_id=fid)
            results["successful"].append({"file_id": fid, "message": result.message})
        except graphor.APIStatusError as e:
            results["failed"].append({"file_id": fid, "error": str(e)})
        if i < len(file_ids):
            time.sleep(0.5)
    return results

file_ids_to_delete = ["file_1", "file_2", "file_3"]
results = batch_delete(file_ids_to_delete)
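
The dict returned by `batch_delete` takes one of three shapes (`{"error": ...}`, `{"cancelled": True}`, or `{"successful": [...], "failed": [...]}`), so callers may want to normalise it before acting on it. A small sketch, with hypothetical helper names, that summarises the outcome and extracts the file_ids that failed:

```python
def failed_file_ids(results: dict) -> list[str]:
    """Return the file_ids that batch_delete() reported as failed."""
    return [entry["file_id"] for entry in results.get("failed", [])]


def describe_batch(results: dict) -> str:
    """Summarise any of the three result shapes batch_delete() can return."""
    if results.get("cancelled"):
        return "cancelled by user"
    if "error" in results:
        # Nothing was deleted: the pre-check found missing file_ids.
        return f"aborted before deleting: {results['error']}"
    ok = len(results.get("successful", []))
    bad = len(results.get("failed", []))
    return f"{ok} deleted, {bad} failed"


# Intended use:
# results = batch_delete(["file_1", "file_2", "file_3"])
# print(describe_batch(results))
# retry_ids = failed_file_ids(results)
```

Note that `batch_delete` aborts when any file_id is missing, so IDs that were deleted in the first pass must be left out of a second call.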

Async Batch Deletion

Delete multiple files concurrently:
import asyncio
from graphor import AsyncGraphor
import graphor

async def delete_single(client: AsyncGraphor, file_id: str) -> dict:
    try:
        result = await client.sources.delete(file_id=file_id)
        return {"file_id": file_id, "status": "success", "message": result.message}
    except graphor.APIStatusError as e:
        return {"file_id": file_id, "status": "failed", "error": str(e)}

async def batch_delete_async(file_ids: list[str], max_concurrent: int = 3):
    client = AsyncGraphor()
    semaphore = asyncio.Semaphore(max_concurrent)

    async def delete_with_semaphore(fid: str):
        async with semaphore:
            result = await delete_single(client, fid)
            print(f"{'OK' if result['status'] == 'success' else 'FAIL'} {fid}")
            return result

    tasks = [delete_with_semaphore(f) for f in file_ids]
    return await asyncio.gather(*tasks)

results = asyncio.run(batch_delete_async(["file_1", "file_2", "file_3"], max_concurrent=2))

Cleanup Failed Sources

Automatically clean up sources that failed processing:
from graphor import Graphor
import graphor

client = Graphor()

def cleanup_failed_sources(confirm: bool = True) -> dict:
    """Find and delete all failed sources."""
    
    # Get all sources
    print("Finding failed sources...")
    sources = client.sources.list()
    
    # Filter failed sources
    failed_sources = [s for s in sources if s.status == "Failed"]
    
    if not failed_sources:
        print("No failed sources found")
        return {"deleted": 0}
    
    print(f"Found {len(failed_sources)} failed sources:")
    for source in failed_sources:
        print(f"  - {source.file_name} ({source.file_type})")
    
    # Confirm deletion
    if confirm:
        confirmation = input(f"\nDelete all {len(failed_sources)} failed sources? (yes/no): ")
        if confirmation.lower() != "yes":
            print("Cleanup cancelled")
            return {"cancelled": True}
    
    # Delete failed sources
    deleted = []
    errors = []
    
    for source in failed_sources:
        try:
            client.sources.delete(file_id=source.file_id)
            deleted.append(source.file_name)
            print(f"Deleted: {source.file_name}")
        except graphor.APIStatusError as e:
            errors.append({"file_name": source.file_name, "error": str(e)})
            print(f"Failed to delete {source.file_name}: {e}")
    
    print(f"\nCleanup complete: {len(deleted)} deleted, {len(errors)} errors")
    return {"deleted": deleted, "errors": errors}

# Usage
cleanup_failed_sources()

Document Lifecycle Manager

A complete class for managing document lifecycle:
from graphor import Graphor
import graphor
from dataclasses import dataclass
from typing import Optional

@dataclass
class DeletionResult:
    success: bool
    file_name: str
    message: str
    error: Optional[str] = None

class DocumentManager:
    def __init__(self, api_key: Optional[str] = None):
        self.client = Graphor(api_key=api_key) if api_key else Graphor()
    
    def list_sources(self):
        """Get all sources."""
        return self.client.sources.list()
    
    def find_source(self, file_id: str):
        """Find a source by file_id."""
        sources = self.list_sources()
        for source in sources:
            if source.file_id == file_id:
                return source
        return None

    def delete(self, file_id: str, verify: bool = True) -> DeletionResult:
        """Delete a source by file_id with optional verification."""
        if verify:
            source = self.find_source(file_id)
            if not source:
                return DeletionResult(
                    success=False,
                    file_name=file_id,
                    message="Source not found",
                    error="NotFoundError"
                )
        try:
            result = self.client.sources.delete(file_id=file_id)
            return DeletionResult(
                success=True,
                file_name=result.file_name,
                message=result.message
            )
        except graphor.NotFoundError as e:
            return DeletionResult(success=False, file_name=file_id, message="Source not found", error=str(e))
        except graphor.APIStatusError as e:
            return DeletionResult(success=False, file_name=file_id, message=f"API error: {e.status_code}", error=str(e))
    
    def delete_by_type(self, file_type: str, confirm: bool = True) -> list[DeletionResult]:
        """Delete all sources of a specific type."""
        sources = self.list_sources()
        targets = [s for s in sources if s.file_type == file_type]
        
        if not targets:
            print(f"No sources with type '{file_type}' found")
            return []
        
        print(f"Found {len(targets)} sources with type '{file_type}':")
        for source in targets:
            print(f"  - {source.file_name}")
        
        if confirm:
            confirmation = input(f"\nDelete all {len(targets)} {file_type} files? (yes/no): ")
            if confirmation.lower() != "yes":
                print("Cancelled")
                return []
        
        results = []
        for source in targets:
            result = self.delete(source.file_id, verify=False)
            results.append(result)
            status = "OK" if result.success else "FAIL"
            print(f"{status} {source.file_name}: {result.message}")
        
        return results
    
    def delete_by_status(self, status: str, confirm: bool = True) -> list[DeletionResult]:
        """Delete all sources with a specific status."""
        sources = self.list_sources()
        targets = [s for s in sources if s.status == status]
        
        if not targets:
            print(f"No sources with status '{status}' found")
            return []
        
        print(f"Found {len(targets)} sources with status '{status}':")
        for source in targets:
            print(f"  - {source.file_name}")
        
        if confirm:
            confirmation = input(f"\nDelete all {len(targets)} {status} sources? (yes/no): ")
            if confirmation.lower() != "yes":
                print("Cancelled")
                return []
        
        results = []
        for source in targets:
            result = self.delete(source.file_id, verify=False)
            results.append(result)
            status_icon = "OK" if result.success else "FAIL"
            print(f"{status_icon} {source.file_name}: {result.message}")
        
        return results

# Usage
manager = DocumentManager()

# Delete a single file
result = manager.delete("file_abc123")
print(f"Deleted: {result.success}")

# Delete all PDFs
results = manager.delete_by_type("pdf")

# Delete all failed sources
results = manager.delete_by_status("Failed")

Archival Tool with Dry Run

Test deletions before executing them:
from graphor import Graphor
import graphor
from typing import Optional

client = Graphor()

def archive_sources(
    file_types: Optional[list[str]] = None,
    statuses: Optional[list[str]] = None,
    min_size_mb: Optional[float] = None,
    max_size_mb: Optional[float] = None,
    dry_run: bool = True
) -> dict:
    """
    Archive sources based on criteria.
    Use dry_run=True to see what would be deleted without actually deleting.
    """
    
    print("Analyzing sources for archival...")
    sources = client.sources.list()
    
    # Apply filters
    candidates = []
    for source in sources:
        # File type filter
        if file_types and source.file_type not in file_types:
            continue
        
        # Status filter
        if statuses and source.status not in statuses:
            continue
        
        # Size filters
        size_mb = source.file_size / (1024 * 1024)
        if min_size_mb is not None and size_mb < min_size_mb:
            continue
        if max_size_mb is not None and size_mb > max_size_mb:
            continue
        
        candidates.append(source)
    
    if not candidates:
        print("No sources match the criteria")
        return {"candidates": 0}
    
    # Show summary
    total_size = sum(s.file_size for s in candidates)
    total_size_mb = total_size / (1024 * 1024)
    
    print(f"\nFound {len(candidates)} sources matching criteria:")
    for source in candidates:
        size_mb = source.file_size / (1024 * 1024)
        print(f"  - {source.file_name} ({source.file_type}, {size_mb:.1f}MB, {source.status})")
    
    print(f"\nTotal size to be freed: {total_size_mb:.1f}MB")
    
    if dry_run:
        print("\nDRY RUN - No files will be deleted")
        print("Set dry_run=False to perform actual deletion")
        return {
            "dry_run": True,
            "candidates": len(candidates),
            "total_size_mb": total_size_mb,
            "files": [s.file_name for s in candidates]
        }
    
    # Confirm and delete
    confirmation = input(f"\nDelete {len(candidates)} sources permanently? (type 'DELETE'): ")
    if confirmation != "DELETE":
        print("Archival cancelled")
        return {"cancelled": True}
    
    deleted = []
    failed = []
    freed_bytes = 0  # Summed per successful delete; file names may not be unique

    for source in candidates:
        try:
            client.sources.delete(file_id=source.file_id)
            deleted.append(source.file_name)
            freed_bytes += source.file_size
            print(f"Archived: {source.file_name}")
        except graphor.APIStatusError as e:
            failed.append({"file_name": source.file_name, "error": str(e)})
            print(f"Failed: {source.file_name} - {e}")

    return {
        "deleted": deleted,
        "failed": failed,
        "total_deleted": len(deleted),
        "size_freed_mb": freed_bytes / (1024 * 1024)
    }

# Usage examples

# Dry run: see what would be deleted
archive_sources(
    file_types=["tmp", "test"],
    dry_run=True
)

# Archive all failed sources (dry run first)
archive_sources(
    statuses=["Failed"],
    dry_run=True
)

# Archive large files over 50MB (dry run)
archive_sources(
    min_size_mb=50,
    dry_run=True
)

# Actually delete failed sources
archive_sources(
    statuses=["Failed"],
    dry_run=False  # This will delete!
)

Error Reference

| Error Type | Status Code | Description |
| --- | --- | --- |
| BadRequestError | 400 | Invalid request format or missing file_id |
| AuthenticationError | 401 | Invalid or missing API key |
| PermissionDeniedError | 403 | Access denied to the specified project |
| NotFoundError | 404 | Source not found for the given file_id |
| RateLimitError | 429 | Too many requests; wait before retrying |
| InternalServerError | ≥500 | Server-side error during deletion |
| APIConnectionError | N/A | Network connectivity issues |
| APITimeoutError | N/A | Request timed out |

Best Practices

Pre-deletion verification

Verify the source exists (by file_id) before deleting:
sources = client.sources.list()
file_ids = [s.file_id for s in sources]
if file_id in file_ids:
    client.sources.delete(file_id=file_id)
else:
    print("Source not found")

Safety Measures

  • Implement confirmation prompts in interactive applications
  • Log all deletion operations for audit trails
  • Use dry_run patterns to preview deletions before executing
  • Test with non-production data when implementing deletion features
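
One way to keep an audit trail is to wrap the delete call so that every attempt writes one JSON line through Python's standard `logging` module. This is only a sketch: the wrapper name `delete_with_audit`, the logger name, and the log fields are illustrative choices, and `client` is assumed to be any object exposing `sources.delete(file_id=...)` as documented above.

```python
import json
import logging
from datetime import datetime, timezone
from typing import Any

# Logger name is illustrative; attach handlers to suit your application.
audit_logger = logging.getLogger("graphor.deletions")


def delete_with_audit(client: Any, file_id: str) -> Any:
    """Delete a source and log one JSON audit line for success or failure."""
    entry = {
        "event": "source_delete",
        "file_id": file_id,
        "at": datetime.now(timezone.utc).isoformat(),
    }
    try:
        result = client.sources.delete(file_id=file_id)
    except Exception as exc:
        # Record the failure, then re-raise so callers still see the error.
        entry.update(outcome="error", error=f"{type(exc).__name__}: {exc}")
        audit_logger.error(json.dumps(entry))
        raise
    entry.update(outcome=result.status, file_name=result.file_name)
    audit_logger.info(json.dumps(entry))
    return result


# Intended use (requires a configured client):
# logging.basicConfig(level=logging.INFO)
# delete_with_audit(client, "file_abc123")
```

The file name is logged from the response because it can no longer be looked up once the source is gone.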

Error Handling

  • Implement retry logic for transient network errors (not for 404/403 errors)
  • Validate file_id (e.g. from list) before deleting
  • In batch operations, record the outcome of each deletion so that partial failures can be identified and retried
# Retry logic for transient errors
from graphor import Graphor
import graphor

client = Graphor(max_retries=3)  # Automatic retries for transient errors

try:
    result = client.sources.delete(file_id="file_abc123")
except graphor.NotFoundError:
    # Don't retry - file doesn't exist
    pass
except graphor.APIConnectionError:
    # Already retried by SDK
    pass
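
Where more control is needed than the client's `max_retries` setting gives, the retry rule above (retry transient errors, never 404/403) can be made explicit. The sketch below is a generic helper, not part of the SDK: `call_with_backoff` and its parameters are hypothetical, and the caller decides which exception types count as transient. The SDK's own retries would run inside each attempt, so the two mechanisms compound unless one of them is reduced.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_backoff(
    func: Callable[[], T],
    transient: Tuple[Type[BaseException], ...],
    attempts: int = 4,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func(), retrying only on exception types listed in `transient`.

    Any other exception (for example a not-found error) propagates at once.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except transient:
            if attempt == attempts:
                raise  # Out of attempts: surface the last transient error
            # Exponential backoff (0.5 s, 1 s, 2 s, ...) plus up to 100 ms jitter
            sleep(base_delay * 2 ** (attempt - 1) + random.uniform(0, 0.1))
    raise AssertionError("unreachable")


# Intended use, with error types from the Error Reference table:
# result = call_with_backoff(
#     lambda: client.sources.delete(file_id="file_abc123"),
#     transient=(graphor.RateLimitError, graphor.APIConnectionError,
#                graphor.InternalServerError),
# )
```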

Troubleshooting

Causes: Source doesn’t exist for the given file_id, wrong file_id, or already deleted.
Solutions:
  • Use client.sources.list() to get current file_ids before deleting
  • Verify you’re using the correct project/API key
sources = client.sources.list()
for s in sources:
    print(s.file_id, s.file_name)

Causes: Invalid token, token revoked, or wrong project access.
Solutions:
  • Verify API key format (should start with “grlm_”)
  • Check token status in the Graphor dashboard
  • Ensure token has delete permissions for the project

Causes: Large files, complex cleanup operations, or server load.
Solutions:
  • Increase request timeout
client = Graphor(timeout=120.0)
# Or per-request
client.with_options(timeout=120.0).sources.delete(file_id=file_id)

Causes: Flows updating asynchronously after deletion.
Solutions:
  • Allow time for flow updates to propagate
  • Refresh flow data in your application
  • Reconfigure affected flows manually if needed

Next steps

After deleting a source:

List sources

View remaining sources and their file_ids

Upload

Ingest new files, URLs, GitHub, or YouTube

Reprocess source

Re-process a source with a different partition method

Get elements

Retrieve parsed elements from a source