List all sources in your project using the Graphor SDK
The `list` method returns every source in the project. Each item includes file metadata (ID, name, size, type, origin), the current processing status, and a human-readable message. You can optionally pass `file_ids` to filter the result.
from graphor import Graphor

client = Graphor()

# List all sources in the project
all_sources = client.sources.list()
print(f"Found {len(all_sources)} sources")

# One line per source: file name followed by its processing status.
for item in all_sources:
    print(f"{item.file_name} - {item.status}")
Copy
import Graphor from 'graphor';

const client = new Graphor();

// List all sources in the project
const allSources = await client.sources.list();
console.log(`Found ${allSources.length} sources`);

// One line per source: file name followed by its processing status.
for (const { file_name, status } of allSources) {
  console.log(`${file_name} - ${status}`);
}
from graphor import Graphor

client = Graphor()
sources = client.sources.list()


def with_status(status):
    """Return the listed sources whose ``status`` equals *status*."""
    return [src for src in sources if src.status == status]


# Filter by status
completed = with_status("Completed")
processing = with_status("Processing")
failed = with_status("Failed")
new = with_status("New")

# Report the size of each group, in the same order as before.
for label, group in (
    ("Completed", completed),
    ("Processing", processing),
    ("Failed", failed),
    ("New", new),
):
    print(f"{label}: {len(group)}")
from graphor import Graphor

client = Graphor()
sources = client.sources.list()

# File types counted as images in this example.
image_types = ("png", "jpg", "jpeg")

# Filter by file type
pdf_files = [src for src in sources if src.file_type == "pdf"]
docx_files = [src for src in sources if src.file_type == "docx"]
images = [src for src in sources if src.file_type in image_types]

print(f"PDFs: {len(pdf_files)}")
print(f"Word docs: {len(docx_files)}")
print(f"Images: {len(images)}")

# List all PDF files, with file_size scaled by 1024 * 1024 and shown as MB.
scale = 1024 * 1024
for pdf in pdf_files:
    size_mb = pdf.file_size / scale
    print(f" {pdf.file_name} ({size_mb:.2f} MB)")
Copy
import Graphor from 'graphor';

const client = new Graphor();
const sources = await client.sources.list();

// File types counted as images in this example.
const imageTypes = ['png', 'jpg', 'jpeg'];

// Filter by file type
const pdfFiles = sources.filter(({ file_type }) => file_type === 'pdf');
const docxFiles = sources.filter(({ file_type }) => file_type === 'docx');
const images = sources.filter(({ file_type }) => imageTypes.includes(file_type));

console.log(`PDFs: ${pdfFiles.length}`);
console.log(`Word docs: ${docxFiles.length}`);
console.log(`Images: ${images.length}`);

// List all PDF files, with file_size scaled by 1024 * 1024 and shown as MB.
const scale = 1024 * 1024;
for (const { file_name, file_size } of pdfFiles) {
  const sizeMb = file_size / scale;
  console.log(` ${file_name} (${sizeMb.toFixed(2)} MB)`);
}
from graphor import Graphor

client = Graphor()
sources = client.sources.list()


def from_origin(origin):
    """Return the listed sources whose ``file_source`` equals *origin*."""
    return [src for src in sources if src.file_source == origin]


# Filter by source type
local_files = from_origin("local file")
url_sources = from_origin("url")
github_sources = from_origin("github")
youtube_sources = from_origin("youtube")

# Report the size of each group.
for label, group in (
    ("Local files", local_files),
    ("URL sources", url_sources),
    ("GitHub repos", github_sources),
    ("YouTube videos", youtube_sources),
):
    print(f"{label}: {len(group)}")
from graphor import Graphor

client = Graphor()


def monitor_processing_status():
    """Print a processing status report and return the sources grouped by status.

    Returns:
        A dict with the keys ``"completed"``, ``"processing"``, ``"new"`` and
        ``"failed"``, each mapping to the list of sources in that status.
    """
    listing = client.sources.list()

    def in_state(state):
        return [src for src in listing if src.status == state]

    processing = in_state("Processing")
    failed = in_state("Failed")
    completed = in_state("Completed")
    new = in_state("New")

    rule = "=" * 50
    print(rule)
    print("Processing Status Report")
    print(rule)
    print(f"Completed: {len(completed)}")
    print(f"Processing: {len(processing)}")
    print(f"New: {len(new)}")
    print(f"Failed: {len(failed)}")
    print(rule)

    # Files currently processing, shown with their partition method.
    if processing:
        print("\nCurrently Processing:")
        for src in processing:
            print(f" - {src.file_name} ({src.partition_method})")

    # Failed files that need attention, shown with their message.
    if failed:
        print("\nFailed Files (need attention):")
        for src in failed:
            print(f" - {src.file_name}: {src.message}")

    return {
        "completed": completed,
        "processing": processing,
        "new": new,
        "failed": failed,
    }


# Usage
status = monitor_processing_status()
Copy
import Graphor from 'graphor';

const client = new Graphor();

/**
 * Print a processing status report and return the sources grouped by status
 * under the keys `completed`, `processing`, `new` and `failed`.
 */
async function monitorProcessingStatus() {
  const listing = await client.sources.list();
  const inState = (state: string) => listing.filter((src) => src.status === state);

  const processing = inState('Processing');
  const failed = inState('Failed');
  const completed = inState('Completed');
  const newSources = inState('New');

  const rule = '='.repeat(50);
  console.log(rule);
  console.log('Processing Status Report');
  console.log(rule);
  console.log(`Completed: ${completed.length}`);
  console.log(`Processing: ${processing.length}`);
  console.log(`New: ${newSources.length}`);
  console.log(`Failed: ${failed.length}`);
  console.log(rule);

  // Files currently processing, shown with their partition method.
  if (processing.length > 0) {
    console.log('\nCurrently Processing:');
    for (const src of processing) {
      console.log(` - ${src.file_name} (${src.partition_method})`);
    }
  }

  // Failed files that need attention, shown with their message.
  if (failed.length > 0) {
    console.log('\nFailed Files (need attention):');
    for (const src of failed) {
      console.log(` - ${src.file_name}: ${src.message}`);
    }
  }

  return { completed, processing, new: newSources, failed };
}

// Usage
const status = await monitorProcessingStatus();
from graphor import Graphor

client = Graphor()


def find_source(file_name: str):
    """Return the first source whose file name equals *file_name*, else None."""
    listing = client.sources.list()
    return next((src for src in listing if src.file_name == file_name), None)


def search_sources(query: str):
    """Return every source whose file name contains *query*, ignoring case."""

    def name_matches(src):
        return query.lower() in src.file_name.lower()

    return [src for src in client.sources.list() if name_matches(src)]


# Usage

# Find exact match
source = find_source("document.pdf")
if source:
    print(f"Found: {source.file_name} - {source.status}")
else:
    print("Source not found")

# Search by partial name
matches = search_sources("report")
print(f"Found {len(matches)} sources matching 'report'")
for match in matches:
    print(f" - {match.file_name}")
Copy
import Graphor from 'graphor';

const client = new Graphor();

/** Return the first source whose file name equals `fileName`, or null. */
async function findSource(fileName: string) {
  const listing = await client.sources.list();
  const hit = listing.find(({ file_name }) => file_name === fileName);
  return hit ?? null;
}

/** Return every source whose file name contains `query`, ignoring case. */
async function searchSources(query: string) {
  const listing = await client.sources.list();
  return listing.filter(({ file_name }) =>
    file_name.toLowerCase().includes(query.toLowerCase()),
  );
}

// Usage

// Find exact match
const source = await findSource('document.pdf');
if (source) {
  console.log(`Found: ${source.file_name} - ${source.status}`);
} else {
  console.log('Source not found');
}

// Search by partial name
const matches = await searchSources('report');
console.log(`Found ${matches.length} sources matching 'report'`);
for (const match of matches) {
  console.log(` - ${match.file_name}`);
}
from graphor import Graphor
import graphor
from dataclasses import dataclass
from typing import Optional


@dataclass
class SourceSummary:
    # Number of sources overall and per status, plus their combined size
    # (sum of file_size divided by 1024 * 1024, rounded to 2 decimals).
    total: int
    completed: int
    processing: int
    failed: int
    new: int
    total_size_mb: float


class SourceManager:
    """Wrap a Graphor client and cache its source listing between queries."""

    def __init__(self, api_key: Optional[str] = None):
        # Only pass api_key through when one was given; otherwise let the
        # client be constructed with no arguments.
        if api_key:
            self.client = Graphor(api_key=api_key)
        else:
            self.client = Graphor()
        self._cache = None

    def refresh(self):
        """Re-fetch the listing, store it in the cache and return it."""
        self._cache = self.client.sources.list()
        return self._cache

    @property
    def sources(self):
        """Cached listing; fetched on first access."""
        if self._cache is None:
            self.refresh()
        return self._cache

    def get_summary(self) -> SourceSummary:
        """Build a SourceSummary from the cached listing."""
        listing = self.sources

        def count(status):
            return len([src for src in listing if src.status == status])

        size_total = sum(src.file_size for src in listing)
        return SourceSummary(
            total=len(listing),
            completed=count("Completed"),
            processing=count("Processing"),
            failed=count("Failed"),
            new=count("New"),
            total_size_mb=round(size_total / (1024 * 1024), 2),
        )

    def find_by_name(self, name: str):
        """Return the first source whose file name equals *name*, else None."""
        return next((src for src in self.sources if src.file_name == name), None)

    def search(self, query: str):
        """Return sources whose file name contains *query*, ignoring case."""
        return [
            src for src in self.sources
            if query.lower() in src.file_name.lower()
        ]

    def filter_by_status(self, status: str):
        """Return sources whose status equals *status*."""
        return [src for src in self.sources if src.status == status]

    def filter_by_type(self, file_type: str):
        """Return sources whose file type equals *file_type*."""
        return [src for src in self.sources if src.file_type == file_type]

    def get_failed(self):
        """Return sources with status "Failed"."""
        return self.filter_by_status("Failed")

    def get_processing(self):
        """Return sources with status "Processing"."""
        return self.filter_by_status("Processing")


# Usage
manager = SourceManager()

# Get summary
summary = manager.get_summary()
print(f"Total: {summary.total}, Completed: {summary.completed}, Failed: {summary.failed}")

# Find a specific source
source = manager.find_by_name("document.pdf")
if source:
    print(f"Found: {source.file_name} - {source.status}")

# Search sources
matches = manager.search("report")
print(f"Found {len(matches)} matches for 'report'")

# Get failed sources
failed = manager.get_failed()
print(f"Failed sources: {len(failed)}")
import time
from typing import Optional

from graphor import Graphor
import graphor

client = Graphor()


def continuous_monitoring(interval_seconds: int = 60, max_iterations: Optional[int] = None):
    """Poll the source listing repeatedly and print a one-line status summary.

    Args:
        interval_seconds: Seconds to wait between polls, and before retrying
            after a connection error.
        max_iterations: Number of successful polls to perform. ``None`` means
            poll until interrupted. Polls that end in a connection error are
            retried and do not count towards this limit.

    The loop stops when ``max_iterations`` successful polls have completed or
    when a ``KeyboardInterrupt`` is raised inside the loop. Exceptions other
    than ``graphor.APIConnectionError`` propagate to the caller.
    """
    iteration = 0
    while max_iterations is None or iteration < max_iterations:
        try:
            sources = client.sources.list()

            # Build the failed list once; it feeds both the count and the warning.
            failed_sources = [s for s in sources if s.status == "Failed"]
            processing = len([s for s in sources if s.status == "Processing"])
            completed = len([s for s in sources if s.status == "Completed"])

            print(f"[{time.strftime('%H:%M:%S')}] "
                  f"Completed: {completed} | Processing: {processing} | Failed: {len(failed_sources)}")

            # Warn on every poll that reports at least one failed source.
            if failed_sources:
                print(f" Warning - Failed sources: {[s.file_name for s in failed_sources]}")

            iteration += 1
            # Skip the wait once the last requested poll is done, so the
            # function returns promptly instead of sleeping one more interval.
            if max_iterations is None or iteration < max_iterations:
                time.sleep(interval_seconds)
        except graphor.APIConnectionError as e:
            print(f"[{time.strftime('%H:%M:%S')}] Connection error: {e}")
            time.sleep(interval_seconds)
        except KeyboardInterrupt:
            print("\nMonitoring stopped")
            break


# Usage (monitor every 30 seconds, 10 times)
# continuous_monitoring(interval_seconds=30, max_iterations=10)
Copy
import Graphor from 'graphor';

const client = new Graphor();

/**
 * Poll the source listing repeatedly and log a one-line status summary.
 *
 * @param intervalSeconds Seconds to wait between polls, and before retrying
 *   after a connection error.
 * @param maxIterations Number of successful polls to perform. When omitted,
 *   polling continues indefinitely. Polls that end in a connection error are
 *   retried and do not count towards this limit.
 *
 * Errors other than `Graphor.APIConnectionError` are rethrown.
 */
async function continuousMonitoring(intervalSeconds = 60, maxIterations?: number) {
  const pause = () => new Promise((r) => setTimeout(r, intervalSeconds * 1000));
  let iteration = 0;

  while (maxIterations === undefined || iteration < maxIterations) {
    try {
      const sources = await client.sources.list();

      // Build the failed list once; it feeds both the count and the warning.
      const failedSources = sources.filter((s) => s.status === 'Failed');
      const processing = sources.filter((s) => s.status === 'Processing').length;
      const completed = sources.filter((s) => s.status === 'Completed').length;

      const time = new Date().toLocaleTimeString();
      console.log(
        `[${time}] Completed: ${completed} | Processing: ${processing} | Failed: ${failedSources.length}`,
      );

      // Warn on every poll that reports at least one failed source.
      if (failedSources.length > 0) {
        console.log(
          ` Warning - Failed sources: ${failedSources.map((s) => s.file_name).join(', ')}`,
        );
      }

      iteration++;
      // Skip the wait once the last requested poll is done, so the function
      // resolves promptly instead of sleeping one more interval.
      if (maxIterations === undefined || iteration < maxIterations) {
        await pause();
      }
    } catch (err) {
      if (err instanceof Graphor.APIConnectionError) {
        const time = new Date().toLocaleTimeString();
        console.log(`[${time}] Connection error: ${err.message}`);
        await pause();
      } else {
        throw err;
      }
    }
  }
}

// Usage (monitor every 30 seconds, 10 times)
// await continuousMonitoring(30, 10);
Cache results: Store the response locally when making multiple queries
Filter client-side: The SDK returns all sources; filter in your code as needed
Use async: For applications that need to perform other work while waiting
Python
TypeScript
Copy
# Example: Cache sources for multiple operations
sources = client.sources.list()

# Now perform multiple filter operations without re-fetching;
# each filter below reads from the list fetched above.
size_threshold = 10 * 1024 * 1024
pdfs = [item for item in sources if item.file_type == "pdf"]
completed = [item for item in sources if item.status == "Completed"]
large_files = [item for item in sources if item.file_size > size_threshold]