-
Notifications
You must be signed in to change notification settings - Fork 1.8k
[ENH] A control interface for GC. #5218
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Reviewer ChecklistPlease leverage this checklist to ensure your code review is thorough before approving Testing, Bugs, Errors, Logs, Documentation
System Compatibility
Quality
|
// This is a placeholder service. The garbage collector currently only exposes a health service. | ||
service GarbageCollector {} | ||
message KickoffGarbageCollectionRequest { | ||
string collection = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[CompanyBestPractice]
The variable name 'collection' in the proto field and related code doesn't specify the identifier type being used. According to our naming convention guideline, specify whether this is a collection ID, name, or other identifier type for better readability and maintainability.
Consider renaming to be more specific:
collection_id
if using UUID/IDcollection_name
if using string namecollection_identifier
if the type varies
Affected files: idl/chromadb/proto/garbage_collector.proto:6, rust/garbage_collector/src/lib.rs:42
rust/garbage_collector/src/lib.rs
Outdated
&self, | ||
req: Request<KickoffGarbageCollectionRequest>, | ||
) -> Result<Response<KickoffGarbageCollectionResponse>, Status> { | ||
Err(Status::not_found("resource not found")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
The implementation returns a generic "resource not found" error which doesn't provide useful information to callers. Consider using a more descriptive error message that indicates this is a placeholder implementation.
Err(Status::not_found("resource not found")) | |
Err(Status::unimplemented("garbage collection endpoint not yet implemented")) |
⚡ Committable suggestion
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
Add RPC Control Endpoint for Manual Collection Garbage Collection This PR introduces a new RPC endpoint and supporting infrastructure to trigger garbage collection (GC) of collections via a command-line tool. It extends the Key Changes• Extended Affected Areas• This summary was automatically generated by @propel-code-bot |
#[tokio::main] | ||
async fn main() { | ||
let args = std::env::args().skip(1).collect::<Vec<_>>(); | ||
if args.len() != 2 { | ||
eprintln!("USAGE: chroma-manual-gc [HOST] [COLLECTION_UUID]"); | ||
std::process::exit(13); | ||
} | ||
let gcservice = Channel::from_shared(args[0].clone()) | ||
.expect("could not create channel") | ||
.connect() | ||
.await | ||
.expect("could not connect to gc service"); | ||
let mut client = GarbageCollectorClient::new(gcservice); | ||
client | ||
.kickoff_garbage_collection(KickoffGarbageCollectionRequest { | ||
collection_id: args[1].clone(), | ||
}) | ||
.await | ||
.expect("could not kickoff gc"); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
This new command-line tool has a few issues:
- It's not registered as a binary in
rust/garbage_collector/Cargo.toml
, so it won't be built. You'll need to add a[[bin]]
section for it. - The error handling relies on
.expect()
, which will cause the tool to panic on errors like network issues or an invalid host. This isn't very user-friendly. Channel::from_shared()
expects a valid URI (e.g.,http://localhost:50055
). If a user provides justlocalhost:50055
, it will panic. The tool should prependhttp://
to the host argument.
Here's a suggestion that addresses the error handling and URI issues to make the tool more robust:
#[tokio::main] | |
async fn main() { | |
let args = std::env::args().skip(1).collect::<Vec<_>>(); | |
if args.len() != 2 { | |
eprintln!("USAGE: chroma-manual-gc [HOST] [COLLECTION_UUID]"); | |
std::process::exit(13); | |
} | |
let gcservice = Channel::from_shared(args[0].clone()) | |
.expect("could not create channel") | |
.connect() | |
.await | |
.expect("could not connect to gc service"); | |
let mut client = GarbageCollectorClient::new(gcservice); | |
client | |
.kickoff_garbage_collection(KickoffGarbageCollectionRequest { | |
collection_id: args[1].clone(), | |
}) | |
.await | |
.expect("could not kickoff gc"); | |
} | |
#[tokio::main] | |
async fn main() { | |
let args: Vec<String> = std::env::args().skip(1).collect(); | |
if args.len() != 2 { | |
eprintln!("USAGE: chroma-manual-gc [HOST:PORT] [COLLECTION_UUID]"); | |
std::process::exit(1); | |
} | |
let host = &args[0]; | |
let collection_id = &args[1]; | |
let endpoint = format!("http://{}", host); | |
let channel = match Channel::from_shared(endpoint) { | |
Ok(channel) => channel, | |
Err(e) => { | |
eprintln!("Invalid host format '{}': {}", host, e); | |
std::process::exit(1); | |
} | |
}; | |
let gcservice = match channel.connect().await { | |
Ok(service) => service, | |
Err(e) => { | |
eprintln!("Could not connect to gc service at '{}': {}", host, e); | |
std::process::exit(1); | |
} | |
}; | |
let mut client = GarbageCollectorClient::new(gcservice); | |
if let Err(e) = client | |
.kickoff_garbage_collection(KickoffGarbageCollectionRequest { | |
collection_id: collection_id.to_string(), | |
}) | |
.await | |
{ | |
eprintln!("Failed to kickoff gc: {}", e); | |
std::process::exit(1); | |
} | |
println!("Successfully kicked off garbage collection for collection {}", collection_id); | |
} |
⚡ Committable suggestion
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
5e198ff
to
3ce50cd
Compare
query := s.read_db.Table("collections"). | ||
Select("collections.id, collections.name, collections.version_file_name, sub.min_oldest_version_ts AS oldest_version_ts, databases.tenant_id, NULLIF(collections.lineage_file_name, '') AS lineage_file_name"). | ||
Joins("INNER JOIN databases ON collections.database_id = databases.id"). | ||
Joins("INNER JOIN (?) AS sub ON collections.id = sub.id", sub) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[CriticalError]
Missing WHERE clause for collection ID filter in FetchCollectionForGc query. The method accepts a collectionID
parameter but doesn't use it in the database query, which will return all collections instead of the specific one requested:
query := s.read_db.Table("collections"). | |
Select("collections.id, collections.name, collections.version_file_name, sub.min_oldest_version_ts AS oldest_version_ts, databases.tenant_id, NULLIF(collections.lineage_file_name, '') AS lineage_file_name"). | |
Joins("INNER JOIN databases ON collections.database_id = databases.id"). | |
Joins("INNER JOIN (?) AS sub ON collections.id = sub.id", sub) | |
query := s.read_db.Table("collections"). | |
Select("collections.id, collections.name, collections.version_file_name, sub.min_oldest_version_ts AS oldest_version_ts, databases.tenant_id, NULLIF(collections.lineage_file_name, '') AS lineage_file_name"). | |
Joins("INNER JOIN databases ON collections.database_id = databases.id"). | |
Joins("INNER JOIN (?) AS sub ON collections.id = sub.id", sub). | |
Where("collections.id = ?", *collectionID) |
⚡ Committable suggestion
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
Context for Agents
[**CriticalError**]
Missing WHERE clause for collection ID filter in FetchCollectionForGc query. The method accepts a `collectionID` parameter but doesn't use it in the database query, which will return all collections instead of the specific one requested:
```suggestion
query := s.read_db.Table("collections").
Select("collections.id, collections.name, collections.version_file_name, sub.min_oldest_version_ts AS oldest_version_ts, databases.tenant_id, NULLIF(collections.lineage_file_name, '') AS lineage_file_name").
Joins("INNER JOIN databases ON collections.database_id = databases.id").
Joins("INNER JOIN (?) AS sub ON collections.id = sub.id", sub).
Where("collections.id = ?", *collectionID)
```
⚡ **Committable suggestion**
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
File: go/pkg/sysdb/metastore/db/dao/collection.go
Line: 158
return nil, grpcutils.BuildInternalGrpcError("request cannot be nil") | ||
} | ||
|
||
if *req.CollectionId == "" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[CriticalError]
Potential nil pointer dereference on line 606. The code checks if *req.CollectionId == ""
without first verifying that req.CollectionId
is not nil. If req.CollectionId
is nil, this will panic:
if *req.CollectionId == "" { | |
if req.CollectionId == nil || *req.CollectionId == "" { | |
return nil, grpcutils.BuildInternalGrpcError("collection_id is required") | |
} |
⚡ Committable suggestion
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
Context for Agents
[**CriticalError**]
Potential nil pointer dereference on line 606. The code checks if `*req.CollectionId == ""` without first verifying that `req.CollectionId` is not nil. If `req.CollectionId` is nil, this will panic:
```suggestion
if req.CollectionId == nil || *req.CollectionId == "" {
return nil, grpcutils.BuildInternalGrpcError("collection_id is required")
}
```
⚡ **Committable suggestion**
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
File: go/pkg/sysdb/grpc/collection_service.go
Line: 606
3ce50cd
to
a35f5cc
Compare
collections := convertCollectionToGcToModel([]*dbmodel.CollectionToGc{collectionToGc}) | ||
return collections[0], nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[CriticalError]
Potential null pointer dereference: In FetchCollectionForGc
, line 549 directly accesses collections[0]
without verifying that the conversion function convertCollectionToGcToModel
actually returned any elements. If convertCollectionToGcToModel
returns an empty slice for any reason, this will cause a panic.
collections := convertCollectionToGcToModel([]*dbmodel.CollectionToGc{collectionToGc}) | |
return collections[0], nil | |
collections := convertCollectionToGcToModel([]*dbmodel.CollectionToGc{collectionToGc}) | |
if len(collections) == 0 { | |
return nil, fmt.Errorf("failed to convert collection to model") | |
} | |
return collections[0], nil |
⚡ Committable suggestion
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
Context for Agents
[**CriticalError**]
Potential null pointer dereference: In `FetchCollectionForGc`, line 549 directly accesses `collections[0]` without verifying that the conversion function `convertCollectionToGcToModel` actually returned any elements. If `convertCollectionToGcToModel` returns an empty slice for any reason, this will cause a panic.
```suggestion
collections := convertCollectionToGcToModel([]*dbmodel.CollectionToGc{collectionToGc})
if len(collections) == 0 {
return nil, fmt.Errorf("failed to convert collection to model")
}
return collections[0], nil
```
⚡ **Committable suggestion**
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
File: go/pkg/sysdb/coordinator/table_catalog.go
Line: 550
if err != nil { | ||
log.Error("FetchCollectionForGc failed", zap.Error(err)) | ||
return nil, grpcutils.BuildInternalGrpcError(err.Error()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
To provide more meaningful error responses to the client, this handler should check for specific error types from the coordinator. When a collection is not found, it should return a NotFound
gRPC status instead of a generic Internal
error. This relies on the underlying layers propagating a structured "not found" error (which is addressed in a separate comment for go/pkg/sysdb/metastore/db/dao/collection.go
).
Context for Agents
[**BestPractice**]
To provide more meaningful error responses to the client, this handler should check for specific error types from the coordinator. When a collection is not found, it should return a `NotFound` gRPC status instead of a generic `Internal` error. This relies on the underlying layers propagating a structured "not found" error (which is addressed in a separate comment for `go/pkg/sysdb/metastore/db/dao/collection.go`).
File: go/pkg/sysdb/grpc/collection_service.go
Line: 619
let mut manual = vec![]; | ||
{ | ||
let manual_collections = self.manual_collections.lock(); | ||
for collection_id in manual_collections.iter() { | ||
manual.push(*collection_id); | ||
} | ||
} | ||
for collection_id in manual { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
To ensure a manually requested garbage collection is a one-time attempt, it's better to drain the manual_collections
set here. The current implementation can lead to repeated failed attempts if fetching collection info fails (e.g., collection not found), as the ID is never removed from the set in that case. Draining the set makes the behavior more predictable: if a manual GC fails, the user can re-issue the command to retry.
This change also requires removing the manual_collections.remove()
call later in the job processing loop.
let mut manual = vec![]; | |
{ | |
let manual_collections = self.manual_collections.lock(); | |
for collection_id in manual_collections.iter() { | |
manual.push(*collection_id); | |
} | |
} | |
for collection_id in manual { | |
let manual_collections_to_process; | |
{ | |
let mut manual_collections = self.manual_collections.lock(); | |
manual_collections_to_process = manual_collections.drain().collect::<Vec<_>>(); | |
} | |
for collection_id in manual_collections_to_process { |
⚡ Committable suggestion
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
Context for Agents
[**BestPractice**]
To ensure a manually requested garbage collection is a one-time attempt, it's better to drain the `manual_collections` set here. The current implementation can lead to repeated failed attempts if fetching collection info fails (e.g., collection not found), as the ID is never removed from the set in that case. Draining the set makes the behavior more predictable: if a manual GC fails, the user can re-issue the command to retry.
This change also requires removing the `manual_collections.remove()` call later in the job processing loop.
```suggestion
let manual_collections_to_process;
{
let mut manual_collections = self.manual_collections.lock();
manual_collections_to_process = manual_collections.drain().collect::<Vec<_>>();
}
for collection_id in manual_collections_to_process {
```
⚡ **Committable suggestion**
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
File: rust/garbage_collector/src/garbage_collector_component.rs
Line: 426
2b801f6
to
81949cf
Compare
let collection = collections.remove(0); | ||
|
||
Ok(CollectionToGcInfo { | ||
id: collection.collection_id, | ||
tenant: collection.tenant, | ||
name: collection.name, | ||
version_file_path: collection.version_file_path.unwrap_or_default(), | ||
lineage_file_path: collection.lineage_file_path, | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
Using unwrap_or_default()
on version_file_path
could hide a potential issue where a collection is missing this crucial information. If version_file_path
is None
, it would be better to return an error instead of proceeding with an empty path, which would likely cause failures later in the garbage collection process.
To make this more robust, I suggest adding a new error variant for a malformed collection and using it here.
First, please update GetCollectionsToGcError
enum and its ChromaError
implementation in this file:
#[derive(Debug, Error)]
pub enum GetCollectionsToGcError {
#[error("No such collection")]
NoSuchCollection,
#[error("Collection is malformed: {0}")]
MalformedCollection(String),
#[error("Failed to parse uuid")]
ParsingError(#[from] Error),
#[error("Grpc request failed")]
RequestFailed(#[from] tonic::Status),
}
impl ChromaError for GetCollectionsToGcError {
fn code(&self) -> ErrorCodes {
match self {
GetCollectionsToGcError::NoSuchCollection => ErrorCodes::NotFound,
GetCollectionsToGcError::MalformedCollection(_) => ErrorCodes::Internal,
GetCollectionsToGcError::ParsingError(_) => ErrorCodes::Internal,
GetCollectionsToGcError::RequestFailed(_) => ErrorCodes::Internal,
}
}
}
Then, you can update get_collection_to_gc
to use this new error:
let collection = collections.remove(0); | |
Ok(CollectionToGcInfo { | |
id: collection.collection_id, | |
tenant: collection.tenant, | |
name: collection.name, | |
version_file_path: collection.version_file_path.unwrap_or_default(), | |
lineage_file_path: collection.lineage_file_path, | |
}) | |
let collection = collections.remove(0); | |
let version_file_path = collection.version_file_path.ok_or_else(|| { | |
GetCollectionsToGcError::MalformedCollection(format!( | |
"Collection {} is missing version_file_path", | |
collection_id | |
)) | |
})?; | |
Ok(CollectionToGcInfo { | |
id: collection.collection_id, | |
tenant: collection.tenant, | |
name: collection.name, | |
version_file_path, | |
lineage_file_path: collection.lineage_file_path, | |
}) |
⚡ Committable suggestion
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
Context for Agents
[**BestPractice**]
Using `unwrap_or_default()` on `version_file_path` could hide a potential issue where a collection is missing this crucial information. If `version_file_path` is `None`, it would be better to return an error instead of proceeding with an empty path, which would likely cause failures later in the garbage collection process.
To make this more robust, I suggest adding a new error variant for a malformed collection and using it here.
First, please update `GetCollectionsToGcError` enum and its `ChromaError` implementation in this file:
```rust
#[derive(Debug, Error)]
pub enum GetCollectionsToGcError {
#[error("No such collection")]
NoSuchCollection,
#[error("Collection is malformed: {0}")]
MalformedCollection(String),
#[error("Failed to parse uuid")]
ParsingError(#[from] Error),
#[error("Grpc request failed")]
RequestFailed(#[from] tonic::Status),
}
impl ChromaError for GetCollectionsToGcError {
fn code(&self) -> ErrorCodes {
match self {
GetCollectionsToGcError::NoSuchCollection => ErrorCodes::NotFound,
GetCollectionsToGcError::MalformedCollection(_) => ErrorCodes::Internal,
GetCollectionsToGcError::ParsingError(_) => ErrorCodes::Internal,
GetCollectionsToGcError::RequestFailed(_) => ErrorCodes::Internal,
}
}
}
```
Then, you can update `get_collection_to_gc` to use this new error:
```suggestion
let collection = collections.remove(0);
let version_file_path = collection.version_file_path.ok_or_else(|| {
GetCollectionsToGcError::MalformedCollection(format!(
"Collection {} is missing version_file_path",
collection_id
))
})?;
Ok(CollectionToGcInfo {
id: collection.collection_id,
tenant: collection.tenant,
name: collection.name,
version_file_path,
lineage_file_path: collection.lineage_file_path,
})
```
⚡ **Committable suggestion**
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
File: rust/sysdb/src/sysdb.rs
Line: 1269
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe empty string is correct default for NULL version file. Asking other reviewer to confirm too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
approach makes sense
@@ -450,6 +457,36 @@ async fn main() { | |||
tracing::info!("Exported {} collections to {}", len, output_file.display()); | |||
} | |||
|
|||
GarbageCollectorCommand::ManualCollection { collection_id } => { | |||
// Connect to the garbage collector service | |||
let gc_endpoint = std::env::var("GARBAGE_COLLECTOR_ENDPOINT") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be a CLI arg rather than env var?
id: collection.collection_id, | ||
tenant: collection.tenant, | ||
name: collection.name, | ||
version_file_path: collection.version_file_path.unwrap_or_default(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should never be None for collections that we can GC
this method should probably return an error if it's none
Description of changes
This introduces an endpoint for garbage collecting collections from the
command line. The goal is to inject a collection provided on the
command line in the list of collections to clean up.
Once provided such a collection, gc will collect it at most once.
Test plan
CI
Migration plan
N/A
Observability plan
I plan to add the endpoint and then observe the world as I use it.
Documentation Changes
N/A