delete method (same name as the API endpoint) permanently removes a source from your project. You must pass the source’s file_id (from list or get build status). The operation is irreversible and removes all associated data including partition nodes and metadata.
Method overview
- Python
- TypeScript
client.sources.delete()await client.sources.delete()Method signature
- Python
- TypeScript
Copy
client.sources.delete(
file_id: str, # Required
timeout: float | None = None
) -> SourceDeleteResponse
Copy
await client.sources.delete({
fileId: string, // Required
}): Promise<SourceDeleteResponse>
Parameters
- Python
- TypeScript
| Parameter | Type | Description | Required |
|---|---|---|---|
file_id | str | Unique identifier of the source to delete | Yes |
timeout | float | Request timeout in seconds | No |
| Parameter | Type | Description | Required |
|---|---|---|---|
fileId | string | Unique identifier of the source to delete | Yes |
Important considerations
Permanent deletion
Permanent deletion
Warning: This operation is irreversible.
- All document content and metadata are permanently removed
- Associated partition nodes and embeddings are deleted
- Flows using this document are automatically updated
- No backup or recovery is available
Using file_id
Using file_id
- Use the source’s unique
file_idfrom list or get build status - Do not rely on file name; only
file_idis supported
Flow impact
Flow impact
- Dataset nodes using this document are updated automatically
- Successor nodes in affected flows may be marked as outdated
- Flow execution may be impacted until nodes are reconfigured
Response
The method returns a response object with:| Property | Type | Description |
|---|---|---|
status | str | Deletion result (e.g. “success”) |
message | str | Human-readable confirmation |
file_id | str | Identifier of the deleted source |
file_name | str | Name of the deleted file |
project_id | str | UUID of the project |
project_name | str | Name of the project |
Code examples
Basic usage
- Python
- TypeScript
Copy
from graphor import Graphor
client = Graphor()
file_id = "file_abc123" # from list() or get_build_status
result = client.sources.delete(file_id=file_id)
print(f"Status: {result.status}")
print(f"Deleted: {result.file_name}")
Copy
const client = new Graphor();
const fileId = 'file_abc123';
const result = await client.sources.delete({ fileId });
console.log('Status:', result.status);
console.log('Deleted:', result.file_name);
Async usage
- Python
- TypeScript
Copy
import asyncio
from graphor import AsyncGraphor
async def delete_document(file_id: str):
client = AsyncGraphor()
result = await client.sources.delete(file_id=file_id)
print(f"Deleted: {result.file_name}")
return result
asyncio.run(delete_document("file_abc123"))
Copy
async function deleteDocument(fileId: string) {
const result = await client.sources.delete({ fileId });
console.log('Deleted:', result.file_name);
return result;
}
await deleteDocument('file_abc123');
Delete with verification
Verify the source exists (byfile_id) before deleting:
- Python
- TypeScript
Copy
from graphor import Graphor
import graphor
client = Graphor()
def safe_delete(file_id: str) -> bool:
sources = client.sources.list()
file_ids = [s.file_id for s in sources]
if file_id not in file_ids:
print(f"Source '{file_id}' not found")
return False
info = next(s for s in sources if s.file_id == file_id)
print(f"Source to delete: {info.file_name} ({info.file_id})")
confirm = input("Proceed? (yes/no): ")
if confirm.lower() != "yes":
return False
try:
result = client.sources.delete(file_id=file_id)
print(f"Success: {result.message}")
return True
except graphor.APIStatusError as e:
print(f"Deletion failed: {e}")
return False
safe_delete("file_abc123")
Copy
const sources = await client.sources.list();
const fileIds = sources.map((s) => s.file_id);
if (!fileIds.includes(fileId)) {
console.log('Source not found');
return;
}
const info = sources.find((s) => s.file_id === fileId)!;
console.log(`Source to delete: ${info.file_name} (${info.file_id})`);
const result = await client.sources.delete({ fileId });
console.log('Success:', result.message);
Error handling
- Python
- TypeScript
Copy
try:
result = client.sources.delete(file_id="file_abc123")
print(f"Deleted: {result.file_name}")
except graphor.NotFoundError as e:
print(f"File not found: {e}")
except graphor.BadRequestError as e:
print(f"Invalid request: {e}")
except graphor.AuthenticationError as e:
print(f"Invalid API key: {e}")
except graphor.PermissionDeniedError as e:
print(f"Access denied: {e}")
except graphor.RateLimitError as e:
print(f"Rate limit exceeded. Please wait and retry: {e}")
except graphor.APIConnectionError as e:
print(f"Connection error: {e}")
except graphor.APIStatusError as e:
print(f"API error (status {e.status_code}): {e}")
Copy
import Graphor from 'graphor';
const client = new Graphor();
try {
const result = await client.sources.delete({ fileId: 'file_abc123' });
console.log(`Deleted: ${result.file_name}`);
} catch (err) {
if (err instanceof Graphor.NotFoundError) {
console.log(`File not found: ${err.message}`);
} else if (err instanceof Graphor.BadRequestError) {
console.log(`Invalid request: ${err.message}`);
} else if (err instanceof Graphor.AuthenticationError) {
console.log(`Invalid API key: ${err.message}`);
} else if (err instanceof Graphor.PermissionDeniedError) {
console.log(`Access denied: ${err.message}`);
} else if (err instanceof Graphor.RateLimitError) {
console.log(`Rate limit exceeded. Please wait and retry: ${err.message}`);
} else if (err instanceof Graphor.APIConnectionError) {
console.log(`Connection error: ${err.message}`);
} else if (err instanceof Graphor.APIError) {
console.log(`API error (status ${err.status}): ${err.message}`);
} else {
throw err;
}
}
Advanced Examples
Batch Deletion
Delete multiple files with safety checks:- Python
- TypeScript
Copy
from graphor import Graphor
import graphor
import time
client = Graphor()
def batch_delete(file_ids: list[str], confirm: bool = True) -> dict:
"""Delete multiple sources by file_id."""
sources = client.sources.list()
by_id = {s.file_id: s for s in sources}
missing = [f for f in file_ids if f not in by_id]
if missing:
print(f"Sources not found: {missing}")
return {"error": f"Missing: {missing}"}
total_size = sum(by_id[f].file_size for f in file_ids)
print(f"Deletion: {len(file_ids)} sources, {total_size / (1024*1024):.2f} MB")
for fid in file_ids:
print(f" - {by_id[fid].file_name} ({by_id[fid].file_id})")
if confirm:
if input("Type 'DELETE' to confirm: ") != "DELETE":
return {"cancelled": True}
results = {"successful": [], "failed": []}
for i, fid in enumerate(file_ids, 1):
try:
result = client.sources.delete(file_id=fid)
results["successful"].append({"file_id": fid, "message": result.message})
except graphor.APIStatusError as e:
results["failed"].append({"file_id": fid, "error": str(e)})
if i < len(file_ids):
time.sleep(0.5)
return results
file_ids_to_delete = ["file_1", "file_2", "file_3"]
results = batch_delete(file_ids_to_delete)
Copy
import Graphor from 'graphor';
const client = new Graphor();
async function batchDelete(fileIds: string[]) {
const sources = await client.sources.list();
const byId = new Map(sources.map((s) => [s.file_id, s]));
const missing = fileIds.filter((f) => !byId.has(f));
if (missing.length > 0) {
console.log('Sources not found:', missing);
return { error: `Missing: ${missing}` };
}
const results: { successful: { fileId: string }[]; failed: { fileId: string; error: string }[] } = {
successful: [],
failed: [],
};
for (const fileId of fileIds) {
try {
await client.sources.delete({ fileId });
results.successful.push({ fileId });
} catch (err) {
const message = err instanceof Graphor.APIError ? err.message : String(err);
results.failed.push({ fileId, error: message });
}
await new Promise((r) => setTimeout(r, 500));
}
return results;
}
const results = await batchDelete(['file_1', 'file_2', 'file_3']);
Async Batch Deletion
Delete multiple files concurrently:- Python
- TypeScript
Copy
import asyncio
from graphor import AsyncGraphor
import graphor
async def delete_single(client: AsyncGraphor, file_id: str) -> dict:
try:
result = await client.sources.delete(file_id=file_id)
return {"file_id": file_id, "status": "success", "message": result.message}
except graphor.APIStatusError as e:
return {"file_id": file_id, "status": "failed", "error": str(e)}
async def batch_delete_async(file_ids: list[str], max_concurrent: int = 3):
client = AsyncGraphor()
semaphore = asyncio.Semaphore(max_concurrent)
async def delete_with_semaphore(fid: str):
async with semaphore:
result = await delete_single(client, fid)
print(f"{'OK' if result['status'] == 'success' else 'FAIL'} {fid}")
return result
tasks = [delete_with_semaphore(f) for f in file_ids]
return await asyncio.gather(*tasks)
results = asyncio.run(batch_delete_async(["file_1", "file_2", "file_3"], max_concurrent=2))
Copy
import Graphor from 'graphor';
const client = new Graphor();
async function deleteSingle(fileId: string) {
try {
const result = await client.sources.delete({ fileId });
return { fileId, status: 'success' as const, message: result.message };
} catch (err) {
const message = err instanceof Graphor.APIError ? err.message : String(err);
return { fileId, status: 'failed' as const, error: message };
}
}
async function batchDeleteAsync(fileIds: string[], maxConcurrent = 3) {
// Simple concurrency limiter
const results: Awaited<ReturnType<typeof deleteSingle>>[] = [];
for (let i = 0; i < fileIds.length; i += maxConcurrent) {
const batch = fileIds.slice(i, i + maxConcurrent);
const batchResults = await Promise.all(
batch.map((fileId) => deleteSingle(fileId)),
);
results.push(...batchResults);
}
return results;
}
const results = await batchDeleteAsync(['file_1', 'file_2', 'file_3'], 2);
Cleanup Failed Sources
Automatically clean up sources that failed processing:- Python
- TypeScript
Copy
from graphor import Graphor
import graphor
client = Graphor()
def cleanup_failed_sources(confirm: bool = True) -> dict:
"""Find and delete all failed sources."""
# Get all sources
print("Finding failed sources...")
sources = client.sources.list()
# Filter failed sources
failed_sources = [s for s in sources if s.status == "Failed"]
if not failed_sources:
print("No failed sources found")
return {"deleted": 0}
print(f"Found {len(failed_sources)} failed sources:")
for source in failed_sources:
print(f" - {source.file_name} ({source.file_type})")
# Confirm deletion
if confirm:
confirmation = input(f"\nDelete all {len(failed_sources)} failed sources? (yes/no): ")
if confirmation.lower() != "yes":
print("Cleanup cancelled")
return {"cancelled": True}
# Delete failed sources
deleted = []
errors = []
for source in failed_sources:
try:
client.sources.delete(file_id=source.file_id)
deleted.append(source.file_name)
print(f"Deleted: {source.file_name}")
except graphor.APIStatusError as e:
errors.append({"file_name": source.file_name, "error": str(e)})
print(f"Failed to delete {source.file_name}: {e}")
print(f"\nCleanup complete: {len(deleted)} deleted, {len(errors)} errors")
return {"deleted": deleted, "errors": errors}
# Usage
cleanup_failed_sources()
Copy
import Graphor from 'graphor';
const client = new Graphor();
async function cleanupFailedSources() {
// Get all sources
console.log('Finding failed sources...');
const sources = await client.sources.list();
// Filter failed sources
const failedSources = sources.filter((s) => s.status === 'Failed');
if (failedSources.length === 0) {
console.log('No failed sources found');
return { deleted: 0 };
}
console.log(`Found ${failedSources.length} failed sources:`);
for (const source of failedSources) {
console.log(` - ${source.file_name} (${source.file_type})`);
}
// Delete failed sources
const deleted: string[] = [];
const errors: { fileName: string; error: string }[] = [];
for (const source of failedSources) {
try {
await client.sources.delete({ fileId: source.file_id });
deleted.push(source.file_name);
console.log(`Deleted: ${source.file_name}`);
} catch (err) {
const message = err instanceof Graphor.APIError ? err.message : String(err);
errors.push({ fileName: source.file_name, error: message });
console.log(`Failed to delete ${source.file_name}: ${message}`);
}
}
console.log(`\nCleanup complete: ${deleted.length} deleted, ${errors.length} errors`);
return { deleted, errors };
}
// Usage
await cleanupFailedSources();
Document Lifecycle Manager
A complete class for managing document lifecycle:- Python
- TypeScript
Copy
from graphor import Graphor
import graphor
from dataclasses import dataclass
from typing import Optional
@dataclass
class DeletionResult:
success: bool
file_name: str
message: str
error: Optional[str] = None
class DocumentManager:
def __init__(self, api_key: Optional[str] = None):
self.client = Graphor(api_key=api_key) if api_key else Graphor()
def list_sources(self):
"""Get all sources."""
return self.client.sources.list()
def find_source(self, file_id: str):
"""Find a source by file_id."""
sources = self.list_sources()
for source in sources:
if source.file_id == file_id:
return source
return None
def delete(self, file_id: str, verify: bool = True) -> DeletionResult:
"""Delete a source by file_id with optional verification."""
if verify:
source = self.find_source(file_id)
if not source:
return DeletionResult(
success=False,
file_name=file_id,
message="Source not found",
error="NotFoundError"
)
try:
result = self.client.sources.delete(file_id=file_id)
return DeletionResult(
success=True,
file_name=result.file_name,
message=result.message
)
except graphor.NotFoundError as e:
return DeletionResult(success=False, file_name=file_id, message="Source not found", error=str(e))
except graphor.APIStatusError as e:
return DeletionResult(success=False, file_name=file_id, message=f"API error: {e.status_code}", error=str(e))
def delete_by_type(self, file_type: str, confirm: bool = True) -> list[DeletionResult]:
"""Delete all sources of a specific type."""
sources = self.list_sources()
targets = [s for s in sources if s.file_type == file_type]
if not targets:
print(f"No sources with type '{file_type}' found")
return []
print(f"Found {len(targets)} sources with type '{file_type}':")
for source in targets:
print(f" - {source.file_name}")
if confirm:
confirmation = input(f"\nDelete all {len(targets)} {file_type} files? (yes/no): ")
if confirmation.lower() != "yes":
print("Cancelled")
return []
results = []
for source in targets:
result = self.delete(source.file_id, verify=False)
results.append(result)
status = "OK" if result.success else "FAIL"
print(f"{status} {source.file_name}: {result.message}")
return results
def delete_by_status(self, status: str, confirm: bool = True) -> list[DeletionResult]:
"""Delete all sources with a specific status."""
sources = self.list_sources()
targets = [s for s in sources if s.status == status]
if not targets:
print(f"No sources with status '{status}' found")
return []
print(f"Found {len(targets)} sources with status '{status}':")
for source in targets:
print(f" - {source.file_name}")
if confirm:
confirmation = input(f"\nDelete all {len(targets)} {status} sources? (yes/no): ")
if confirmation.lower() != "yes":
print("Cancelled")
return []
results = []
for source in targets:
result = self.delete(source.file_id, verify=False)
results.append(result)
status_icon = "OK" if result.success else "FAIL"
print(f"{status_icon} {source.file_name}: {result.message}")
return results
# Usage
manager = DocumentManager()
# Delete a single file
result = manager.delete("file_abc123")
print(f"Deleted: {result.success}")
# Delete all PDFs
results = manager.delete_by_type("pdf")
# Delete all failed sources
results = manager.delete_by_status("Failed")
Copy
import Graphor from 'graphor';
interface DeletionResult {
success: boolean;
fileName: string;
message: string;
error?: string;
}
class DocumentManager {
private client: Graphor;
constructor(apiKey?: string) {
this.client = apiKey ? new Graphor({ apiKey }) : new Graphor();
}
async listSources() {
return this.client.sources.list();
}
async findSource(fileId: string) {
const sources = await this.listSources();
return sources.find((s) => s.file_id === fileId) ?? null;
}
async delete(fileId: string, verify = true): Promise<DeletionResult> {
if (verify) {
const source = await this.findSource(fileId);
if (!source) {
return { success: false, fileName: fileId, message: 'Source not found', error: 'NotFoundError' };
}
}
try {
const result = await this.client.sources.delete({ fileId });
return { success: true, fileName: result.file_name, message: result.message };
} catch (err) {
if (err instanceof Graphor.NotFoundError) {
return { success: false, fileName: fileId, message: 'Source not found', error: err.message };
}
if (err instanceof Graphor.APIError) {
return { success: false, fileName: fileId, message: `API error: ${err.status}`, error: err.message };
}
throw err;
}
}
async deleteByType(fileType: string): Promise<DeletionResult[]> {
const sources = await this.listSources();
const targets = sources.filter((s) => s.file_type === fileType);
if (targets.length === 0) {
console.log(`No sources with type '${fileType}' found`);
return [];
}
console.log(`Found ${targets.length} sources with type '${fileType}':`);
for (const source of targets) {
console.log(` - ${source.file_name}`);
}
const results: DeletionResult[] = [];
for (const source of targets) {
const result = await this.delete(source.file_id, false);
results.push(result);
const status = result.success ? 'OK' : 'FAIL';
console.log(`${status} ${source.file_name}: ${result.message}`);
}
return results;
}
async deleteByStatus(status: string): Promise<DeletionResult[]> {
const sources = await this.listSources();
const targets = sources.filter((s) => s.status === status);
if (targets.length === 0) {
console.log(`No sources with status '${status}' found`);
return [];
}
console.log(`Found ${targets.length} sources with status '${status}':`);
for (const source of targets) {
console.log(` - ${source.file_name}`);
}
const results: DeletionResult[] = [];
for (const source of targets) {
const result = await this.delete(source.file_id, false);
results.push(result);
const statusIcon = result.success ? 'OK' : 'FAIL';
console.log(`${statusIcon} ${source.file_name}: ${result.message}`);
}
return results;
}
}
// Usage
const manager = new DocumentManager();
// Delete a single file
const result = await manager.delete('file_abc123');
console.log(`Deleted: ${result.success}`);
// Delete all PDFs
await manager.deleteByType('pdf');
// Delete all failed sources
await manager.deleteByStatus('Failed');
Archival Tool with Dry Run
Test deletions before executing them:- Python
- TypeScript
Copy
from graphor import Graphor
import graphor
from typing import Optional
client = Graphor()
def archive_sources(
file_types: Optional[list[str]] = None,
statuses: Optional[list[str]] = None,
min_size_mb: Optional[float] = None,
max_size_mb: Optional[float] = None,
dry_run: bool = True
) -> dict:
"""
Archive sources based on criteria.
Use dry_run=True to see what would be deleted without actually deleting.
"""
print("Analyzing sources for archival...")
sources = client.sources.list()
# Apply filters
candidates = []
for source in sources:
# File type filter
if file_types and source.file_type not in file_types:
continue
# Status filter
if statuses and source.status not in statuses:
continue
# Size filters
size_mb = source.file_size / (1024 * 1024)
if min_size_mb and size_mb < min_size_mb:
continue
if max_size_mb and size_mb > max_size_mb:
continue
candidates.append(source)
if not candidates:
print("No sources match the criteria")
return {"candidates": 0}
# Show summary
total_size = sum(s.file_size for s in candidates)
total_size_mb = total_size / (1024 * 1024)
print(f"\nFound {len(candidates)} sources matching criteria:")
for source in candidates:
size_mb = source.file_size / (1024 * 1024)
print(f" - {source.file_name} ({source.file_type}, {size_mb:.1f}MB, {source.status})")
print(f"\nTotal size to be freed: {total_size_mb:.1f}MB")
if dry_run:
print("\nDRY RUN - No files will be deleted")
print("Set dry_run=False to perform actual deletion")
return {
"dry_run": True,
"candidates": len(candidates),
"total_size_mb": total_size_mb,
"files": [s.file_name for s in candidates]
}
# Confirm and delete
confirmation = input(f"\nDelete {len(candidates)} sources permanently? (type 'DELETE'): ")
if confirmation != "DELETE":
print("Archival cancelled")
return {"cancelled": True}
deleted = []
failed = []
for source in candidates:
try:
client.sources.delete(file_id=source.file_id)
deleted.append(source.file_name)
print(f"Archived: {source.file_name}")
except graphor.APIStatusError as e:
failed.append({"file_name": source.file_name, "error": str(e)})
print(f"Failed: {source.file_name} - {e}")
return {
"deleted": deleted,
"failed": failed,
"total_deleted": len(deleted),
"size_freed_mb": sum(
s.file_size for s in candidates if s.file_name in deleted
) / (1024 * 1024)
}
# Usage examples
# Dry run: see what would be deleted
archive_sources(
file_types=["tmp", "test"],
dry_run=True
)
# Archive all failed sources (dry run first)
archive_sources(
statuses=["Failed"],
dry_run=True
)
# Archive large files over 50MB (dry run)
archive_sources(
min_size_mb=50,
dry_run=True
)
# Actually delete failed sources
archive_sources(
statuses=["Failed"],
dry_run=False # This will delete!
)
Copy
import Graphor from 'graphor';
const client = new Graphor();
interface ArchiveOptions {
fileTypes?: string[];
statuses?: string[];
minSizeMb?: number;
maxSizeMb?: number;
dryRun?: boolean;
}
async function archiveSources(options: ArchiveOptions = {}) {
const { fileTypes, statuses, minSizeMb, maxSizeMb, dryRun = true } = options;
console.log('Analyzing sources for archival...');
const sources = await client.sources.list();
// Apply filters
const candidates = sources.filter((source) => {
if (fileTypes && !fileTypes.includes(source.file_type)) return false;
if (statuses && !statuses.includes(source.status)) return false;
const sizeMb = source.file_size / (1024 * 1024);
if (minSizeMb && sizeMb < minSizeMb) return false;
if (maxSizeMb && sizeMb > maxSizeMb) return false;
return true;
});
if (candidates.length === 0) {
console.log('No sources match the criteria');
return { candidates: 0 };
}
// Show summary
const totalSize = candidates.reduce((sum, s) => sum + s.file_size, 0);
const totalSizeMb = totalSize / (1024 * 1024);
console.log(`\nFound ${candidates.length} sources matching criteria:`);
for (const source of candidates) {
const sizeMb = source.file_size / (1024 * 1024);
console.log(
` - ${source.file_name} (${source.file_type}, ${sizeMb.toFixed(1)}MB, ${source.status})`,
);
}
console.log(`\nTotal size to be freed: ${totalSizeMb.toFixed(1)}MB`);
if (dryRun) {
console.log('\nDRY RUN - No files will be deleted');
console.log('Set dryRun: false to perform actual deletion');
return {
dryRun: true,
candidates: candidates.length,
totalSizeMb,
files: candidates.map((s) => s.file_name),
};
}
// Perform deletions
const deleted: string[] = [];
const failed: { fileName: string; error: string }[] = [];
for (const source of candidates) {
try {
await client.sources.delete({ fileId: source.file_id });
deleted.push(source.file_name);
console.log(`Archived: ${source.file_name}`);
} catch (err) {
const message = err instanceof Graphor.APIError ? err.message : String(err);
failed.push({ fileName: source.file_name, error: message });
console.log(`Failed: ${source.file_name} - ${message}`);
}
}
const sizeFreed =
candidates
.filter((s) => deleted.includes(s.file_name))
.reduce((sum, s) => sum + s.file_size, 0) /
(1024 * 1024);
return { deleted, failed, totalDeleted: deleted.length, sizeFreedMb: sizeFreed };
}
// Usage examples
// Dry run: see what would be deleted
await archiveSources({ fileTypes: ['tmp', 'test'], dryRun: true });
// Archive all failed sources (dry run first)
await archiveSources({ statuses: ['Failed'], dryRun: true });
// Archive large files over 50MB (dry run)
await archiveSources({ minSizeMb: 50, dryRun: true });
// Actually delete failed sources
await archiveSources({ statuses: ['Failed'], dryRun: false }); // This will delete!
Error Reference
| Error Type | Status Code | Description |
|---|---|---|
BadRequestError | 400 | Invalid request format or missing file name |
AuthenticationError | 401 | Invalid or missing API key |
PermissionDeniedError | 403 | Access denied to the specified project |
NotFoundError | 404 | Source not found for the given file_id |
RateLimitError | 429 | Too many requests, please retry after waiting |
InternalServerError | ≥500 | Server-side error during deletion |
APIConnectionError | N/A | Network connectivity issues |
APITimeoutError | N/A | Request timed out |
Best Practices
Pre-deletion verification
Verify the source exists (byfile_id) before deleting:
- Python
- TypeScript
Copy
sources = client.sources.list()
file_ids = [s.file_id for s in sources]
if file_id in file_ids:
client.sources.delete(file_id=file_id)
else:
print("Source not found")
Copy
const sources = await client.sources.list();
const fileIds = sources.map((s) => s.file_id);
if (fileIds.includes(fileId)) {
await client.sources.delete({ fileId });
} else {
console.log('Source not found');
}
Safety Measures
- Implement confirmation prompts in interactive applications
- Log all deletion operations for audit trails
- Use dry_run patterns to preview deletions before executing
- Test with non-production data when implementing deletion features
Error Handling
- Implement retry logic for transient network errors (not for 404/403 errors)
- Validate file_id (e.g. from list) before deleting
- Handle batch operations carefully to avoid partial failures
- Python
- TypeScript
Copy
# Retry logic for transient errors
from graphor import Graphor
import graphor
client = Graphor(max_retries=3) # Automatic retries for transient errors
try:
result = client.sources.delete(file_id="file_abc123")
except graphor.NotFoundError:
# Don't retry - file doesn't exist
pass
except graphor.APIConnectionError:
# Already retried by SDK
pass
Copy
// Retry logic for transient errors
import Graphor from 'graphor';
const client = new Graphor({ maxRetries: 3 }); // Automatic retries for transient errors
try {
const result = await client.sources.delete({ fileId: 'file_abc123' });
} catch (err) {
if (err instanceof Graphor.NotFoundError) {
// Don't retry - file doesn't exist
} else if (err instanceof Graphor.APIConnectionError) {
// Already retried by SDK
}
}
Troubleshooting
Source not found (404)
Source not found (404)
Causes: Source doesn’t exist for the given
file_id, wrong file_id, or already deleted.Solutions:- Use
client.sources.list()to get currentfile_ids before deleting - Verify you’re using the correct project/API key
- Python
- TypeScript
Copy
sources = client.sources.list()
for s in sources:
print(s.file_id, s.file_name)
Copy
const sources = await client.sources.list();
for (const s of sources) {
console.log(s.fileId, s.fileName);
}
Authentication failures
Authentication failures
Causes: Invalid token, token revoked, or wrong project accessSolutions:
- Verify API key format (should start with “grlm_”)
- Check token status in the Graphor dashboard
- Ensure token has delete permissions for the project
Deletion timeouts
Deletion timeouts
Causes: Large files, complex cleanup operations, or server loadSolutions:
- Increase request timeout
- Python
- TypeScript
Copy
client = Graphor(timeout=120.0)
# Or per-request
client.with_options(timeout=120.0).sources.delete(file_id=file_id)
Copy
const client = new Graphor({ timeout: 120 * 1000 });
// Or per-request
await client.sources.delete({ fileId }, { timeout: 120 * 1000 });
Flow synchronization issues
Flow synchronization issues
Causes: Flows updating asynchronously after deletionSolutions:
- Allow time for flow updates to propagate
- Refresh flow data in your application
- Reconfigure affected flows manually if needed
Next steps
After deleting a source:List sources
View remaining sources and their file_ids
Upload
Ingest new files, URLs, GitHub, or YouTube
Reprocess source
Re-process a source with a different partition method
Get elements
Retrieve parsed elements from a source

