Streaming Client
Streaming Client documentation for Go SDK
The StreamingClient provides real-time streaming support for memory retrieval operations in GoodMem. This is the recommended approach for memory retrieval as it enables efficient handling of large result sets.
Overview
The StreamingClient supports two streaming formats:
- NDJSON (application/x-ndjson) - Newline-delimited JSON (default, recommended)
- SSE (text/event-stream) - Server-Sent Events
Supported Formats
const (
	FormatNDJSON StreamingFormat = "ndjson"
	FormatSSE    StreamingFormat = "sse"
)

Basic Streaming with ChatPostProcessor
Use RetrieveMemoryStreamChat() for streaming with automatic ChatPostProcessor configuration:
package main

import (
	"context"
	"fmt"
	"log"

	goodmem_client "github.com/PAIR-Systems-Inc/goodmem/clients/go"
)

func main() {
	// Configure client
	configuration := goodmem_client.NewConfiguration()
	configuration.Host = "http://localhost:8080"
	configuration.DefaultHeader["X-API-Key"] = "your-api-key"
	apiClient := goodmem_client.NewAPIClient(configuration)
	streamingClient := goodmem_client.NewStreamingClient(apiClient)

	// Helper values
	requestedSize := int32(10)
	fetchMemory := true
	fetchMemoryContent := false
	llmID := "550e8400-e29b-41d4-a716-446655440001"
	rerankerID := "550e8400-e29b-41d4-a716-446655440000"
	relevanceThreshold := 0.5
	llmTemp := 0.3
	maxResults := int32(10)
	chronologicalResort := true

	// Stream with ChatPostProcessor
	ctx := context.Background()
	stream, err := streamingClient.RetrieveMemoryStreamChat(
		ctx,
		"your search query",
		[]string{"space-uuid"},
		&requestedSize,
		&fetchMemory,
		&fetchMemoryContent,
		goodmem_client.FormatNDJSON, // or FormatSSE
		&llmID,
		&rerankerID,
		&relevanceThreshold,
		&llmTemp,
		&maxResults,
		&chronologicalResort,
	)
	if err != nil {
		log.Fatalf("Error: %v", err)
	}

	// Process events
	for event := range stream {
		if event.AbstractReply != nil {
			fmt.Printf("Abstract: %s\n", event.AbstractReply.Text)
		} else if event.RetrievedItem != nil && event.RetrievedItem.Memory != nil {
			fmt.Printf("Memory: %v\n", event.RetrievedItem.Memory)
		}
	}
}

Advanced Streaming with Custom Post-Processor
Use RetrieveMemoryStreamAdvanced() for streaming with custom post-processor configuration:
package main

import (
	"context"
	"fmt"
	"log"

	goodmem_client "github.com/PAIR-Systems-Inc/goodmem/clients/go"
)

func main() {
	// Configure client
	configuration := goodmem_client.NewConfiguration()
	configuration.Host = "http://localhost:8080"
	configuration.DefaultHeader["X-API-Key"] = "your-api-key"
	apiClient := goodmem_client.NewAPIClient(configuration)
	streamingClient := goodmem_client.NewStreamingClient(apiClient)

	// RequestedSize, FetchMemory, and FetchMemoryContent are pointer fields,
	// so they are set from addressable local values
	requestedSize := int32(10)
	fetchMemory := true
	fetchMemoryContent := false

	// Create advanced request with custom post-processor
	request := &goodmem_client.AdvancedMemoryStreamRequest{
		Message:            "your search query",
		SpaceIDs:           []string{"space-uuid"},
		RequestedSize:      &requestedSize,
		FetchMemory:        &fetchMemory,
		FetchMemoryContent: &fetchMemoryContent,
		Format:             goodmem_client.FormatNDJSON, // or FormatSSE
		PostProcessorName:  "com.goodmem.retrieval.postprocess.ChatPostProcessorFactory",
		PostProcessorConfig: map[string]interface{}{
			"llm_id":               "550e8400-e29b-41d4-a716-446655440001",
			"reranker_id":          "550e8400-e29b-41d4-a716-446655440000",
			"relevance_threshold":  0.5,
			"llm_temp":             0.3,
			"max_results":          10,
			"chronological_resort": true,
		},
	}

	// Stream with advanced configuration
	ctx := context.Background()
	stream, err := streamingClient.RetrieveMemoryStreamAdvanced(ctx, request)
	if err != nil {
		log.Fatalf("Error: %v", err)
	}

	// Process events
	for event := range stream {
		if event.AbstractReply != nil {
			fmt.Printf("Abstract: %s\n", event.AbstractReply.Text)
		} else if event.RetrievedItem != nil && event.RetrievedItem.Memory != nil {
			fmt.Printf("Memory: %v\n", event.RetrievedItem.Memory)
		}
	}
}

Types Reference
StreamingClient
Main client for streaming operations:
type StreamingClient struct {
	client     *APIClient
	httpClient *http.Client
}

Methods:
- NewStreamingClient(client *APIClient) *StreamingClient - Create new streaming client
- RetrieveMemoryStream(ctx context.Context, req *MemoryStreamRequest) (MemoryStreamChannel, error) - Basic streaming
- RetrieveMemoryStreamChat(ctx context.Context, ...) (MemoryStreamChannel, error) - With ChatPostProcessor
- RetrieveMemoryStreamAdvanced(ctx context.Context, req *AdvancedMemoryStreamRequest) (MemoryStreamChannel, error) - Advanced streaming
MemoryStreamRequest
Basic streaming request structure:
type MemoryStreamRequest struct {
	Message            string          `json:"message"`
	SpaceIDs           []string        `json:"spaceIds,omitempty"`
	RequestedSize      *int32          `json:"requestedSize,omitempty"`
	FetchMemory        *bool           `json:"fetchMemory,omitempty"`
	FetchMemoryContent *bool           `json:"fetchMemoryContent,omitempty"`
	GenerateAbstract   *bool           `json:"generateAbstract,omitempty"`
	Format             StreamingFormat `json:"format"`

	// ChatPostProcessor parameters
	PpLlmID               *string  `json:"ppLlmId,omitempty"`
	PpRerankerID          *string  `json:"ppRerankerId,omitempty"`
	PpRelevanceThreshold  *float64 `json:"ppRelevanceThreshold,omitempty"`
	PpLlmTemp             *float64 `json:"ppLlmTemp,omitempty"`
	PpMaxResults          *int32   `json:"ppMaxResults,omitempty"`
	PpChronologicalResort *bool    `json:"ppChronologicalResort,omitempty"`
}

AdvancedMemoryStreamRequest
Advanced streaming request with custom post-processor:
type AdvancedMemoryStreamRequest struct {
	Message             string                 `json:"message"`
	SpaceIDs            []string               `json:"spaceIds,omitempty"`
	RequestedSize       *int32                 `json:"requestedSize,omitempty"`
	FetchMemory         *bool                  `json:"fetchMemory,omitempty"`
	FetchMemoryContent  *bool                  `json:"fetchMemoryContent,omitempty"`
	Format              StreamingFormat        `json:"format"`
	PostProcessorName   string                 `json:"postProcessorName,omitempty"`
	PostProcessorConfig map[string]interface{} `json:"postProcessorConfig,omitempty"`
}

MemoryStreamResponse
Individual streaming event:
type MemoryStreamResponse struct {
	ResultSetBoundary *ResultSetBoundary     `json:"resultSetBoundary,omitempty"`
	RetrievedItem     *StreamRetrievedItem   `json:"retrievedItem,omitempty"`
	AbstractReply     *AbstractReply         `json:"abstractReply,omitempty"`
	MemoryDefinition  map[string]interface{} `json:"memoryDefinition,omitempty"`
	Status            *GoodMemStatus         `json:"status,omitempty"`
}

StreamRetrievedItem
Retrieved item in streaming response:
type StreamRetrievedItem struct {
	Memory map[string]interface{} `json:"memory,omitempty"`
	Chunk  *StreamChunkReference  `json:"chunk,omitempty"`
}

StreamChunkReference
Reference to a memory chunk with relevance score:
type StreamChunkReference struct {
	ResultSetId    string                 `json:"resultSetId"`
	Chunk          map[string]interface{} `json:"chunk"`
	MemoryIndex    int                    `json:"memoryIndex"`
	RelevanceScore float64                `json:"relevanceScore"`
}

MemoryStreamChannel
Named receive-only channel type that carries streaming events:
type MemoryStreamChannel <-chan MemoryStreamResponse

StreamingFormat
Enumeration of supported streaming formats:
type StreamingFormat string
const (
	FormatNDJSON StreamingFormat = "ndjson" // Newline-delimited JSON
	FormatSSE    StreamingFormat = "sse"    // Server-Sent Events
)

StreamError
Error structure for streaming errors:
type StreamError struct {
	Message string
	Code    int
}

Error Handling
stream, err := streamingClient.RetrieveMemoryStreamChat(ctx, ...)
if err != nil {
	log.Printf("Failed to start streaming: %v", err)
	return
}

// Process events with error checking
for event := range stream {
	// Check for error events
	if event.Status != nil && event.Status.Code != 0 {
		log.Printf("Stream error: %v", event.Status.Message)
		continue
	}
	// Process event
	if event.AbstractReply != nil {
		fmt.Printf("Reply: %s\n", event.AbstractReply.Text)
	}
}

Parameters
Shared Parameters
- message (required): Query message for semantic search
- spaceIds: List of space UUIDs to search within
- requestedSize: Maximum number of memories to retrieve
- fetchMemory: Whether to fetch memory definitions (default: true)
- fetchMemoryContent: Whether to fetch original content (default: false)
- format: Streaming format - FormatNDJSON (default) or FormatSSE
ChatPostProcessor Parameters
- ppLlmId: UUID of LLM for abstract generation
- ppRerankerId: UUID of reranker for result reranking
- ppRelevanceThreshold: Minimum relevance score (default: 0.5)
- ppLlmTemp: LLM temperature for generation (default: 0.3)
- ppMaxResults: Maximum results to return (default: 10)
- ppChronologicalResort: Whether to resort by creation time (default: true)
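Most of these parameters are optional pointer fields tagged with omitempty in MemoryStreamRequest, so a nil pointer is left out of the encoded request while an explicit value (including false or zero) is kept. The sketch below illustrates that behavior with a local stand-in struct. The `ptr` helper and the `chatParams` type are hypothetical and not part of the SDK, and whether the SDK serializes requests exactly this way is an assumption based on the struct tags in the Types Reference.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ptr returns a pointer to v. Hypothetical helper, not part of the SDK.
func ptr[T any](v T) *T { return &v }

// chatParams mirrors a subset of the documented MemoryStreamRequest fields.
// It is a local stand-in so the example compiles without the SDK.
type chatParams struct {
	Message               string   `json:"message"`
	PpRelevanceThreshold  *float64 `json:"ppRelevanceThreshold,omitempty"`
	PpMaxResults          *int32   `json:"ppMaxResults,omitempty"`
	PpChronologicalResort *bool    `json:"ppChronologicalResort,omitempty"`
}

// encode marshals the parameters to JSON; nil pointer fields are omitted.
func encode(p chatParams) (string, error) {
	b, err := json.Marshal(p)
	return string(b), err
}

func main() {
	// Only the threshold is overridden; the other options stay unset.
	s, err := encode(chatParams{Message: "q", PpRelevanceThreshold: ptr(0.7)})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s) // {"message":"q","ppRelevanceThreshold":0.7}

	// An explicit false is still sent because the pointer itself is non-nil.
	s, err = encode(chatParams{Message: "q", PpChronologicalResort: ptr(false)})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s) // {"message":"q","ppChronologicalResort":false}
}
```

Pointer fields are what make "not set" distinguishable from "set to false", which matters here because ppChronologicalResort defaults to true.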
Authorization
Streaming operations require authentication using an API key:
configuration.DefaultHeader["X-API-Key"] = "your-api-key"

See ApiKeyAuth for more details.
Return Type
MemoryStreamChannel - A channel that yields MemoryStreamResponse events as they stream from the server.
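Because the return value is a receive-only channel, a range loop ends when the producer closes it, and a consumer that stops early should cancel its context so the producer can exit. The following self-contained sketch simulates that pattern with local types. `streamEvent`, `eventChannel`, `produce`, and `firstN` are hypothetical stand-ins, and whether the SDK's producer reacts to context cancellation in this way is an assumption rather than documented behavior.

```go
package main

import (
	"context"
	"fmt"
)

// streamEvent is a hypothetical stand-in for MemoryStreamResponse.
type streamEvent struct {
	Text string
}

// eventChannel mirrors the shape of MemoryStreamChannel: receive-only for callers.
type eventChannel <-chan streamEvent

// produce simulates a stream: it sends each text in order, then closes the channel.
// It returns early when ctx is cancelled so the goroutine does not leak.
func produce(ctx context.Context, texts []string) eventChannel {
	ch := make(chan streamEvent)
	go func() {
		defer close(ch)
		for _, text := range texts {
			select {
			case ch <- streamEvent{Text: text}:
			case <-ctx.Done():
				return
			}
		}
	}()
	return ch
}

// firstN reads at most n events and then cancels the producer.
func firstN(texts []string, n int) []string {
	if n <= 0 {
		return nil
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // signals the producer if we stop before the stream ends

	var got []string
	for ev := range produce(ctx, texts) {
		got = append(got, ev.Text)
		if len(got) == n {
			break
		}
	}
	return got
}

func main() {
	fmt.Println(firstN([]string{"a", "b", "c"}, 2)) // [a b]
	fmt.Println(firstN([]string{"a"}, 5))           // [a]
}
```

The second call shows the normal case: the loop ends on its own once the channel is closed, with no explicit end-of-stream check.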
See Also
- MemoriesAPI - Memory management API
- RetrieveMemoryRequest - Standard retrieve request
- RetrieveMemoryEvent - Standard retrieve response
- AbstractReply - AI-generated response