Skip to content

mcp/streamable: use event store to fix unbounded memory issues #335

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

samthanawalla
Copy link
Contributor

This CL utilizes the event store to write outgoing messages and removes the unbounded outgoing data structure.

For #190

This CL utilizes the event store to write outgoing messages and removes
the unbounded outgoing data structure.

For modelcontextprotocol#190
@samthanawalla samthanawalla marked this pull request as ready for review August 20, 2025 15:47
@samthanawalla samthanawalla requested review from findleyr and jba August 20, 2025 15:47
for data, err := range c.eventStore.After(ctx, c.SessionID(), stream.id, lastIndex) {
if err != nil {
// Wait for session initialization before yielding.
if errors.Is(err, ErrUnknownSession) || errors.Is(err, ErrUnknownStream) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of doing it this way, I would avoid calling After at all if there is no session or stream.

If there is a session and stream and After returns one of these errors, I think it is a real error and should be yielded.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure there is an easy way to do that because the session and stream may exist but it may not exist in the event store yet.

Append could happen before or after the After call which is why we need After to report the error to us.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should only call After when the client sends Last-Event-ID. If they send it too early, the server should return an error. I don't understand the state where After is called before Append.

Copy link
Contributor Author

@samthanawalla samthanawalla Aug 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should only call After when the client sends Last-Event-ID.

We can call After with an index of -1 to start writing from the beginning of the stream which allows us to simplify the logic even if last-event-id is not sent.

I don't understand the state where After is called before Append.

After is called when respondSSE is called which is disjoint from when Append is called in Write. These events can happen in any order which is why we case on ErrUnknownSession and ErrUnknownStream to skip to the logic below which waits for a stream signal.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we just call Append with nil data when we create the stream?

Here's the problem: I'd like the eventstore to be able to completely clean up the stream or session at will, and so when we get an unknown session or stream, we should fail this connection because it will never be recoverable.

@@ -283,6 +283,12 @@ func (s *MemoryEventStore) Append(_ context.Context, sessionID string, streamID
// index is no longer available.
var ErrEventsPurged = errors.New("data purged")

// ErrUnknownSession is the error that [EventStore.After] should return if the session ID is unknown.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: s/return/wrap: we don't return this error value exactly, but one that wraps it.


// If all requests have been handled and replied to, we should terminate this connection.
// "After the JSON-RPC response has been sent, the server SHOULD close the SSE stream."
// §6.4, https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#sending-messages-to-the-server
// We only want to terminate POSTs, and GETs that are replaying. The general-purpose GET
// (stream ID 0) will never have requests, and should remain open indefinitely.
if nOutstanding == 0 && !persistent {
if nOutstanding == 0 && !persistent && lastIndex >= int(stream.lastWriteIndex.Load()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally thought that this commit reintroduced the bug that was fixed in findleyr@f4a9396. However, I see that it probably doesn't, because of this atomic check.

I think it would be simpler to just move the check for nOutstanding above the After loop above. Then you don't need lastWriteIndex. WDYT?

I prefer to avoid atomics when there's already a synchronization mechanism (mu), because it's hard to reason about the relationship between the atomics and critical sections.

for data, err := range c.eventStore.After(ctx, c.SessionID(), stream.id, lastIndex) {
if err != nil {
// Wait for session initialization before yielding.
if errors.Is(err, ErrUnknownSession) || errors.Is(err, ErrUnknownStream) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we just call Append with nil data when we create the stream?

Here's the problem: I'd like the eventstore to be able to completely clean up the stream or session at will, and so when we get an unknown session or stream, we should fail this connection because it will never be recoverable.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants