The List sources endpoint returns every source in the authenticated project's knowledge graph. Each item includes file metadata (ID, name, size, type, origin), the current processing status, and a human-readable message. You can optionally filter by file_ids.
Endpoint overview
GET https://sources.graphorlm.com
Authentication
This endpoint requires authentication using an API token. Include your API token as a Bearer token in the Authorization header.
| Header | Value | Required |
| --- | --- | --- |
| Authorization | Bearer YOUR_API_TOKEN | Yes |
Query parameters
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| file_ids | list of strings | No | If provided, only sources whose file_id is in this list are returned. Repeat the parameter for multiple IDs (e.g. ?file_ids=id1&file_ids=id2). |
When file_ids is omitted, the response includes all sources in the project.
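As a quick illustration of the repeated-parameter format, the standard-library encoding below produces the query string shape described above (the IDs are placeholder values):

```python
from urllib.parse import urlencode

# Repeating the key once per ID yields file_ids=...&file_ids=...,
# which is the format the endpoint expects.
params = [("file_ids", "file_abc123"), ("file_ids", "file_def456")]
query = urlencode(params)
url = f"https://sources.graphorlm.com?{query}"
print(url)
# https://sources.graphorlm.com?file_ids=file_abc123&file_ids=file_def456
```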
Success response (200 OK)
The endpoint returns a JSON array of source objects. Each object includes file metadata, status, and message:
[
  {
    "status": "New",
    "message": "Source uploaded, awaiting processing",
    "file_id": "file_abc123",
    "file_name": "document.pdf",
    "file_size": 2048576,
    "file_type": "pdf",
    "file_source": "local file",
    "project_id": "550e8400-e29b-41d4-a716-446655440000",
    "project_name": "My Project",
    "method": "fast"
  },
  {
    "status": "Completed",
    "message": "Source processed successfully",
    "file_id": "file_def456",
    "file_name": "presentation.pptx",
    "file_size": 1024000,
    "file_type": "pptx",
    "file_source": "local file",
    "project_id": "550e8400-e29b-41d4-a716-446655440000",
    "project_name": "My Project",
    "method": "balanced"
  },
  {
    "status": "Processing",
    "message": "Source is being processed",
    "file_id": "file_ghi789",
    "file_name": "webpage-content",
    "file_size": 0,
    "file_type": "pdf",
    "file_source": "url",
    "project_id": "550e8400-e29b-41d4-a716-446655440000",
    "project_name": "My Project",
    "method": "accurate"
  }
]
Response fields
| Field | Type | Description |
| --- | --- | --- |
| status | string | Current processing status (from backend): e.g. New, Processing, Processed, Completed, Failed, Processing failed, or unknown |
| message | string | Human-readable status message |
| file_id | string | Unique identifier for the source (use for subsequent API calls) |
| file_name | string | Display name of the source file or identifier |
| file_size | integer | File size in bytes (0 for URL/GitHub/YouTube sources) |
| file_type | string | File extension or type |
| file_source | string | Origin: local file, url, github, or youtube |
| project_id | string | UUID of the project |
| project_name | string | Name of the project |
| method | string or null | Partitioning strategy used: fast, balanced, accurate, or agentic (when available) |
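For typed access in Python, these fields map directly onto a small dataclass. This is an illustrative sketch, not an official client model; the field names simply mirror the response:

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Source:
    """One item from the list response; fields mirror the response schema."""
    status: str
    message: str
    file_id: str
    file_name: str
    file_size: int
    file_type: str
    file_source: str
    project_id: str
    project_name: str
    method: Optional[str] = None  # may be null until a method is recorded

def parse_sources(payload: List[dict]) -> List[Source]:
    """Convert the raw JSON array into typed Source objects."""
    return [Source(**item) for item in payload]
```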
Status values
The status field is returned with capitalisation as stored in the backend (e.g. New, Completed). Typical values:
| Status | Message | Description |
| --- | --- | --- |
| New | "Source uploaded, awaiting processing" | Upload/ingestion accepted; processing has not started yet |
| Processing | "Source is being processed" | Pipeline is running (partitioning, chunking, embedding) |
| Processed | "Source has been processed" | Source has been processed (intermediate or final state) |
| Completed | "Source processed successfully" | Ready for ask, extract, retrieve, and get elements |
| Failed | "Source processing failed" | Processing encountered an error; consider reprocessing |
| Processing failed | "Source status: Processing failed" | Pipeline failed during processing |
| unknown | "Source status: …" | Status could not be determined (e.g. when status is null) |
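When branching on these values in code, it can help to collapse them into a few buckets. The grouping below is an interpretation of the table above, not an official taxonomy:

```python
# Assumed grouping: "Processed" and "Completed" both count as ready,
# and both failure variants count as failed.
READY = {"Processed", "Completed"}
FAILED = {"Failed", "Processing failed"}
PENDING = {"New", "Processing"}

def classify_status(status):
    """Collapse a raw status string into ready / failed / pending / unknown."""
    if status in READY:
        return "ready"
    if status in FAILED:
        return "failed"
    if status in PENDING:
        return "pending"
    return "unknown"
```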
File Source Types
| Source Type | Description | Typical Use Cases |
| --- | --- | --- |
| local file | Files uploaded directly from your computer | Documents, PDFs, images, spreadsheets |
| url | Content imported from web URLs | Web pages, articles, online documents |
| github | Content imported from GitHub repositories | Code documentation, README files, wikis |
| youtube | Content imported from YouTube videos | Video transcripts, educational content |
Code examples
JavaScript/Node.js
const listSources = async (apiToken, fileIds = null) => {
  const url = new URL("https://sources.graphorlm.com");
  if (fileIds && fileIds.length > 0) {
    fileIds.forEach((id) => url.searchParams.append("file_ids", id));
  }
  const response = await fetch(url, {
    method: "GET",
    headers: { Authorization: `Bearer ${apiToken}` },
  });
  if (!response.ok) {
    const error = await response.text();
    throw new Error(`Failed to list sources: ${response.status} ${error}`);
  }
  const sources = await response.json();
  console.log(`Found ${sources.length} sources`);
  return sources;
};

// Usage: list all sources
listSources("grlm_your_api_token_here").then((sources) => {
  sources.forEach((s) => console.log(`${s.file_name} - ${s.status}`));
});

// Usage: filter by file_ids
listSources("grlm_your_api_token_here", ["file_abc123", "file_def456"]);
Python
import requests

def list_sources(api_token, file_ids=None):
    url = "https://sources.graphorlm.com"
    headers = {"Authorization": f"Bearer {api_token}"}
    params = {}
    if file_ids:
        params["file_ids"] = file_ids  # list: requests sends as file_ids=id1&file_ids=id2
    response = requests.get(url, headers=headers, params=params, timeout=30)
    response.raise_for_status()
    return response.json()

# Usage: list all sources
sources = list_sources("grlm_your_api_token_here")
for source in sources:
    print(f"{source['file_name']} - {source['status']}")

# Usage: filter by file_ids
sources = list_sources("grlm_your_api_token_here", file_ids=["file_abc123", "file_def456"])
cURL
# List all sources
curl -X GET https://sources.graphorlm.com \
-H "Authorization: Bearer grlm_your_api_token_here"
# List only specific sources (repeat file_ids for each)
curl -X GET "https://sources.graphorlm.com?file_ids=file_abc123&file_ids=file_def456" \
-H "Authorization: Bearer grlm_your_api_token_here"
Error responses
Common error codes
| Status code | Description |
| --- | --- |
| 401 | Invalid or missing API token |
| 403 | Access denied to the project |
| 500 | Unexpected internal error while retrieving sources |

Error responses carry a JSON body with a detail field, for example:
{
  "detail": "Failed to retrieve sources"
}
Response analysis
Filtering and processing results
def analyze_sources(sources):
    """Analyze sources by status and type."""
    status_counts = {}
    type_counts = {}
    total_size = 0
    for source in sources:
        status = source.get("status", "unknown")
        status_counts[status] = status_counts.get(status, 0) + 1
        file_type = source.get("file_type", "unknown")
        type_counts[file_type] = type_counts.get(file_type, 0) + 1
        total_size += source.get("file_size", 0)
    return {
        "total_sources": len(sources),
        "status_breakdown": status_counts,
        "type_breakdown": type_counts,
        "total_size_mb": round(total_size / (1024 * 1024), 2),
    }

sources = list_sources("grlm_your_token")
analysis = analyze_sources(sources)
print("Analysis:", analysis)
Status monitoring
const monitorProcessingStatus = async (apiToken) => {
  const sources = await listSources(apiToken);
  const processing = sources.filter((s) => s.status === "Processing");
  const failed = sources.filter((s) => s.status === "Failed" || s.status === "Processing failed");
  const completed = sources.filter((s) => s.status === "Completed" || s.status === "Processed");
  console.log(`Processing: ${processing.length}, Failed: ${failed.length}, Completed: ${completed.length}`);
  if (failed.length > 0) {
    failed.forEach((s) => console.log(`Failed: ${s.file_name} (${s.file_type})`));
  }
  return { processing, failed, completed };
};
Integration Examples
Project Health Check
import requests
from datetime import datetime

def project_health_check(api_token):
    """Perform a comprehensive health check of the project."""
    try:
        sources = list_sources(api_token)
        health_report = {
            'timestamp': datetime.now().isoformat(),
            'total_sources': len(sources),
            'status_summary': {},
            'issues': [],
            'recommendations': []
        }
        # Analyze status distribution
        for source in sources:
            status = source.get('status', 'unknown')
            health_report['status_summary'][status] = health_report['status_summary'].get(status, 0) + 1
            # Identify issues
            if status in ("Failed", "Processing failed"):
                health_report["issues"].append(f"Failed processing: {source['file_name']}")
            elif status == "unknown":
                health_report["issues"].append(f"Unknown status: {source['file_name']}")
        failed_count = (
            health_report["status_summary"].get("Failed", 0)
            + health_report["status_summary"].get("Processing failed", 0)
        )
        if failed_count > 0:
            health_report["recommendations"].append(f"Reprocess {failed_count} failed documents")
        processing_count = health_report["status_summary"].get("Processing", 0)
        if processing_count > 5:
            health_report["recommendations"].append("Monitor processing queue - high volume detected")
        return health_report
    except Exception as e:
        return {'error': str(e), 'timestamp': datetime.now().isoformat()}

# Usage
health = project_health_check("grlm_your_token")
print(f"Project Health Report: {health}")
Source Management Dashboard
class SourceManager {
  constructor(apiToken) {
    this.apiToken = apiToken;
  }

  async getDashboardData() {
    const sources = await listSources(this.apiToken);
    return {
      overview: this.getOverview(sources),
      recentUploads: this.getRecentUploads(sources),
      processingQueue: this.getProcessingQueue(sources),
      failedSources: this.getFailedSources(sources)
    };
  }

  getOverview(sources) {
    const totalSize = sources.reduce((sum, s) => sum + (s.file_size || 0), 0);
    return {
      totalSources: sources.length,
      totalSizeMB: Math.round(totalSize / (1024 * 1024)),
      byStatus: this.groupByField(sources, 'status'),
      byType: this.groupByField(sources, 'file_type'),
      bySource: this.groupByField(sources, 'file_source')
    };
  }

  getRecentUploads(sources, limit = 10) {
    return sources
      .filter(s => s.status === "New" || s.status === "Processed" || s.status === "Completed")
      .slice(0, limit)
      .map(s => ({
        name: s.file_name,
        status: s.status,
        type: s.file_type,
        sizeMB: Math.round((s.file_size || 0) / (1024 * 1024))
      }));
  }

  getProcessingQueue(sources) {
    return sources
      .filter(s => s.status === "Processing")
      .map(s => ({
        name: s.file_name,
        type: s.file_type,
        method: s.method
      }));
  }

  getFailedSources(sources) {
    return sources
      .filter(s => s.status === "Failed" || s.status === "Processing failed")
      .map(s => ({
        name: s.file_name,
        type: s.file_type,
        message: s.message
      }));
  }

  groupByField(sources, field) {
    return sources.reduce((acc, source) => {
      const value = source[field] || 'unknown';
      acc[value] = (acc[value] || 0) + 1;
      return acc;
    }, {});
  }
}

// Usage
const manager = new SourceManager('grlm_your_token');
manager.getDashboardData()
  .then(dashboard => console.log('Dashboard:', dashboard))
  .catch(error => console.error('Error:', error));
Automated Processing Pipeline
import time
import logging

class ProcessingPipeline:
    def __init__(self, api_token):
        self.api_token = api_token
        self.logger = logging.getLogger(__name__)

    def monitor_and_process(self, check_interval=60):
        """Monitor sources and automatically handle processing."""
        while True:
            try:
                sources = list_sources(self.api_token)
                new_sources = [s for s in sources if s["status"] == "New"]
                if new_sources:
                    self.logger.info(f"Found {len(new_sources)} new sources")
                failed_sources = [s for s in sources if s["status"] in ("Failed", "Processing failed")]
                if failed_sources:
                    self.logger.warning(f"Found {len(failed_sources)} failed sources")
                    self.handle_failed_sources(failed_sources)
                processing_sources = [s for s in sources if s["status"] == "Processing"]
                if processing_sources:
                    self.logger.info(f"{len(processing_sources)} sources currently processing")
                time.sleep(check_interval)
            except Exception as e:
                self.logger.error(f"Pipeline error: {e}")
                time.sleep(check_interval)

    def handle_failed_sources(self, failed_sources):
        """Handle failed sources - could implement retry logic."""
        for source in failed_sources:
            self.logger.info(f"Failed source: {source['file_name']} - {source['message']}")
            # Implement retry logic here if needed

# Usage
pipeline = ProcessingPipeline("grlm_your_token")
# pipeline.monitor_and_process()  # Uncomment to run monitoring
Best Practices
Cache results: Store the response locally for a reasonable period
Filter client-side: Process the full list to extract the specific information you need
Monitor regularly: Set up automated checks for processing status
Batch operations: Use the list to plan batch processing operations
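The "cache results" advice can be as simple as a wrapper with a time-to-live. A minimal sketch (the 60-second TTL is an arbitrary default; fetch is any callable returning the source list, such as list_sources):

```python
import time

class CachedSourceList:
    """Cache the list-sources response for a short TTL to avoid repeated calls."""

    def __init__(self, fetch, ttl_seconds=60):
        self._fetch = fetch          # callable returning the source list
        self._ttl = ttl_seconds      # how long a cached response stays valid
        self._cached = None
        self._fetched_at = 0.0

    def get(self):
        """Return the cached list, refreshing it once the TTL has expired."""
        now = time.monotonic()
        if self._cached is None or now - self._fetched_at > self._ttl:
            self._cached = self._fetch()
            self._fetched_at = now
        return self._cached
```

Because the wrapper takes any callable, you can pass a lambda that bakes in your token, e.g. CachedSourceList(lambda: list_sources("grlm_your_token")).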
Data Management
Track processing times: Monitor how long documents take to process
Identify patterns: Look for file types or sizes that frequently fail
Maintain logs: Keep records of source management activities
Plan capacity: Use file counts and sizes for storage planning
Error Handling
Implement retries: Handle temporary network issues with exponential backoff
Log failures: Keep detailed logs of API failures for debugging
Monitor status: Regularly check for failed processing jobs
Graceful degradation: Have fallback plans when the API is unavailable
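The retry advice above can be sketched as a thin wrapper around the list call. This sketch assumes only network errors and 5xx responses are worth retrying; 4xx responses (bad token, forbidden project) fail immediately because retrying them will not help:

```python
import time
import requests

def list_sources_with_retry(api_token, max_attempts=4, base_delay=1.0):
    """List sources, retrying transient failures with exponential backoff."""
    last_error = None
    for attempt in range(max_attempts):
        try:
            response = requests.get(
                "https://sources.graphorlm.com",
                headers={"Authorization": f"Bearer {api_token}"},
                timeout=30,
            )
            if response.status_code < 500:
                response.raise_for_status()  # 4xx: not transient, propagate
                return response.json()
            last_error = f"server returned {response.status_code}"
        except (requests.ConnectionError, requests.Timeout) as exc:
            last_error = exc  # transient network error: retry
        time.sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ...
    raise RuntimeError(f"Failed to list sources after {max_attempts} attempts: {last_error}")
```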
Troubleshooting
Slow responses
Causes: Large number of sources, server load, or network issues
Solutions:
Implement request timeouts (30+ seconds recommended)
Use response caching for non-critical applications
Consider pagination if available in future API versions
Empty response
Causes: No sources in project, wrong API token, or permission issues
Solutions:
Verify you have uploaded documents to your project
Check that your API token is correct and active
Ensure you're accessing the right project
Authentication errors
Causes: Invalid token, expired token, or revoked access
Solutions:
Verify token format and validity
Check the token hasn't been revoked in the dashboard
Generate a new token if necessary
Next steps
After listing your sources:
Upload sources Ingest files, URLs, GitHub repos, or YouTube videos (async)
Reprocess source Re-process an existing source with a different partition method (async)
Delete source Remove a source from your project
Get build status Poll status and optional elements for an async ingestion or reprocess