GoodMem

Streaming Client

Streaming Client documentation for Go SDK

The StreamingClient provides real-time streaming for memory retrieval operations in GoodMem. It is the recommended approach for memory retrieval because events are delivered as they stream from the server, which handles large result sets efficiently.

Overview

The StreamingClient supports two streaming formats:

  • NDJSON (application/x-ndjson) - Newline-delimited JSON (default, recommended)
  • SSE (text/event-stream) - Server-Sent Events

Supported Formats

const (
    FormatNDJSON StreamingFormat = "ndjson"
    FormatSSE    StreamingFormat = "sse"
)

Basic Streaming with ChatPostProcessor

Use RetrieveMemoryStreamChat() for streaming with automatic ChatPostProcessor configuration:

package main

import (
    "context"
    "fmt"
    "log"

    goodmem_client "github.com/PAIR-Systems-Inc/goodmem/clients/go"
)

func main() {
    // Configure client
    configuration := goodmem_client.NewConfiguration()
    configuration.Host = "http://localhost:8080"
    configuration.DefaultHeader["X-API-Key"] = "your-api-key"

    apiClient := goodmem_client.NewAPIClient(configuration)
    streamingClient := goodmem_client.NewStreamingClient(apiClient)

    // Helper values
    requestedSize := int32(10)
    fetchMemory := true
    fetchMemoryContent := false
    llmID := "550e8400-e29b-41d4-a716-446655440001"
    rerankerID := "550e8400-e29b-41d4-a716-446655440000"
    relevanceThreshold := 0.5
    llmTemp := 0.3
    maxResults := int32(10)
    chronologicalResort := true

    // Stream with ChatPostProcessor
    ctx := context.Background()
    stream, err := streamingClient.RetrieveMemoryStreamChat(
        ctx,
        "your search query",
        []string{"space-uuid"},
        &requestedSize,
        &fetchMemory,
        &fetchMemoryContent,
        goodmem_client.FormatNDJSON,  // or FormatSSE
        &llmID,
        &rerankerID,
        &relevanceThreshold,
        &llmTemp,
        &maxResults,
        &chronologicalResort,
    )
    if err != nil {
        log.Fatalf("Error: %v", err)
    }

    // Process events
    for event := range stream {
        if event.AbstractReply != nil {
            fmt.Printf("Abstract: %s\n", event.AbstractReply.Text)
        } else if event.RetrievedItem != nil && event.RetrievedItem.Memory != nil {
            fmt.Printf("Memory: %v\n", event.RetrievedItem.Memory)
        }
    }
}

Advanced Streaming with Custom Post-Processor

Use RetrieveMemoryStreamAdvanced() for streaming with custom post-processor configuration:

package main

import (
    "context"
    "fmt"
    "log"

    goodmem_client "github.com/PAIR-Systems-Inc/goodmem/clients/go"
)

func main() {
    // Configure client
    configuration := goodmem_client.NewConfiguration()
    configuration.Host = "http://localhost:8080"
    configuration.DefaultHeader["X-API-Key"] = "your-api-key"

    apiClient := goodmem_client.NewAPIClient(configuration)
    streamingClient := goodmem_client.NewStreamingClient(apiClient)

    // Optional fields are pointers, so take the address of local values
    requestedSize := int32(10)
    fetchMemory := true
    fetchMemoryContent := false

    // Create advanced request with custom post-processor
    request := &goodmem_client.AdvancedMemoryStreamRequest{
        Message:            "your search query",
        SpaceIDs:           []string{"space-uuid"},
        RequestedSize:      &requestedSize,
        FetchMemory:        &fetchMemory,
        FetchMemoryContent: &fetchMemoryContent,
        Format:             goodmem_client.FormatNDJSON, // or FormatSSE
        PostProcessorName:  "com.goodmem.retrieval.postprocess.ChatPostProcessorFactory",
        PostProcessorConfig: map[string]interface{}{
            "llm_id":               "550e8400-e29b-41d4-a716-446655440001",
            "reranker_id":          "550e8400-e29b-41d4-a716-446655440000",
            "relevance_threshold":  0.5,
            "llm_temp":             0.3,
            "max_results":          10,
            "chronological_resort": true,
        },
    }

    // Stream with advanced configuration
    ctx := context.Background()
    stream, err := streamingClient.RetrieveMemoryStreamAdvanced(ctx, request)
    if err != nil {
        log.Fatalf("Error: %v", err)
    }

    // Process events
    for event := range stream {
        if event.AbstractReply != nil {
            fmt.Printf("Abstract: %s\n", event.AbstractReply.Text)
        } else if event.RetrievedItem != nil && event.RetrievedItem.Memory != nil {
            fmt.Printf("Memory: %v\n", event.RetrievedItem.Memory)
        }
    }
}

Types Reference

StreamingClient

Main client for streaming operations:

type StreamingClient struct {
    client     *APIClient
    httpClient *http.Client
}

Methods:

  • NewStreamingClient(client *APIClient) *StreamingClient - Create new streaming client
  • RetrieveMemoryStream(ctx context.Context, req *MemoryStreamRequest) (MemoryStreamChannel, error) - Basic streaming
  • RetrieveMemoryStreamChat(ctx context.Context, ...) (MemoryStreamChannel, error) - With ChatPostProcessor
  • RetrieveMemoryStreamAdvanced(ctx context.Context, req *AdvancedMemoryStreamRequest) (MemoryStreamChannel, error) - Advanced streaming

MemoryStreamRequest

Basic streaming request structure:

type MemoryStreamRequest struct {
    Message            string          `json:"message"`
    SpaceIDs           []string        `json:"spaceIds,omitempty"`
    RequestedSize      *int32          `json:"requestedSize,omitempty"`
    FetchMemory        *bool           `json:"fetchMemory,omitempty"`
    FetchMemoryContent *bool           `json:"fetchMemoryContent,omitempty"`
    GenerateAbstract   *bool           `json:"generateAbstract,omitempty"`
    Format             StreamingFormat `json:"format"`

    // ChatPostProcessor parameters
    PpLlmID               *string  `json:"ppLlmId,omitempty"`
    PpRerankerID          *string  `json:"ppRerankerId,omitempty"`
    PpRelevanceThreshold  *float64 `json:"ppRelevanceThreshold,omitempty"`
    PpLlmTemp             *float64 `json:"ppLlmTemp,omitempty"`
    PpMaxResults          *int32   `json:"ppMaxResults,omitempty"`
    PpChronologicalResort *bool    `json:"ppChronologicalResort,omitempty"`
}
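Because the optional fields are pointers tagged with `omitempty`, a nil pointer and a pointer to a zero value behave differently when the struct is encoded with `encoding/json`. The sketch below illustrates this with a local `request` type that mirrors three of the fields above; `ptr` is a hypothetical generic helper (Go 1.18+), not part of the SDK, and the sketch says nothing about how the SDK itself transmits the request.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// request mirrors a subset of MemoryStreamRequest's fields and JSON tags.
type request struct {
	Message       string `json:"message"`
	RequestedSize *int32 `json:"requestedSize,omitempty"`
	FetchMemory   *bool  `json:"fetchMemory,omitempty"`
}

// ptr returns a pointer to a copy of v (hypothetical helper).
func ptr[T any](v T) *T { return &v }

// encode marshals the request to its JSON text.
func encode(r request) string {
	b, err := json.Marshal(r)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	// Nil pointers are omitted from the output entirely.
	fmt.Println(encode(request{Message: "q"}))
	// A non-nil pointer is always emitted, even when it points at false.
	fmt.Println(encode(request{Message: "q", RequestedSize: ptr(int32(5)), FetchMemory: ptr(false)}))
}
```

This is why the examples declare local variables and pass their addresses: leaving a field nil is distinct from explicitly setting it to `false` or `0`.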

AdvancedMemoryStreamRequest

Advanced streaming request with custom post-processor:

type AdvancedMemoryStreamRequest struct {
    Message             string                 `json:"message"`
    SpaceIDs            []string               `json:"spaceIds,omitempty"`
    RequestedSize       *int32                 `json:"requestedSize,omitempty"`
    FetchMemory         *bool                  `json:"fetchMemory,omitempty"`
    FetchMemoryContent  *bool                  `json:"fetchMemoryContent,omitempty"`
    Format              StreamingFormat        `json:"format"`
    PostProcessorName   string                 `json:"postProcessorName,omitempty"`
    PostProcessorConfig map[string]interface{} `json:"postProcessorConfig,omitempty"`
}

MemoryStreamResponse

Individual streaming event:

type MemoryStreamResponse struct {
    ResultSetBoundary *ResultSetBoundary     `json:"resultSetBoundary,omitempty"`
    RetrievedItem     *StreamRetrievedItem   `json:"retrievedItem,omitempty"`
    AbstractReply     *AbstractReply         `json:"abstractReply,omitempty"`
    MemoryDefinition  map[string]interface{} `json:"memoryDefinition,omitempty"`
    Status            *GoodMemStatus         `json:"status,omitempty"`
}
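Every field of MemoryStreamResponse is optional, so consumers typically branch on which one is populated. The following sketch shows one way to do that with a `switch`; the types are local stand-ins for the SDK's, `kind` is a hypothetical helper, and the idea that each event carries a single payload is an assumption rather than something this page states.

```go
package main

import "fmt"

// Local stand-ins for the SDK's payload types; only nil checks matter here.
type (
	boundary struct{}
	item     struct{}
	reply    struct{}
	status   struct{}
)

// response mirrors the field names of MemoryStreamResponse.
type response struct {
	ResultSetBoundary *boundary
	RetrievedItem     *item
	AbstractReply     *reply
	MemoryDefinition  map[string]interface{}
	Status            *status
}

// kind names the first populated field, checking Status first so that
// error events are never mistaken for data. It returns "empty" if none is set.
func kind(ev response) string {
	switch {
	case ev.Status != nil:
		return "status"
	case ev.ResultSetBoundary != nil:
		return "resultSetBoundary"
	case ev.RetrievedItem != nil:
		return "retrievedItem"
	case ev.AbstractReply != nil:
		return "abstractReply"
	case ev.MemoryDefinition != nil:
		return "memoryDefinition"
	default:
		return "empty"
	}
}

func main() {
	fmt.Println(kind(response{AbstractReply: &reply{}}))
	fmt.Println(kind(response{}))
}
```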

StreamRetrievedItem

Retrieved item in streaming response:

type StreamRetrievedItem struct {
    Memory map[string]interface{} `json:"memory,omitempty"`
    Chunk  *StreamChunkReference  `json:"chunk,omitempty"`
}

StreamChunkReference

Reference to a memory chunk with relevance score:

type StreamChunkReference struct {
    ResultSetId    string                 `json:"resultSetId"`
    Chunk          map[string]interface{} `json:"chunk"`
    MemoryIndex    int                    `json:"memoryIndex"`
    RelevanceScore float64                `json:"relevanceScore"`
}
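If chunk references are collected from the stream, they can be ordered by RelevanceScore before display. This is a minimal sketch with a local `chunkRef` type mirroring two of the fields above; `byRelevance` is a hypothetical helper, and it assumes that a higher score means a more relevant chunk.

```go
package main

import (
	"fmt"
	"sort"
)

// chunkRef mirrors two fields of StreamChunkReference.
type chunkRef struct {
	MemoryIndex    int
	RelevanceScore float64
}

// byRelevance returns a copy of refs sorted by descending RelevanceScore,
// keeping arrival order for equal scores. The input slice is not modified.
func byRelevance(refs []chunkRef) []chunkRef {
	out := append([]chunkRef(nil), refs...)
	sort.SliceStable(out, func(i, j int) bool {
		return out[i].RelevanceScore > out[j].RelevanceScore
	})
	return out
}

func main() {
	refs := []chunkRef{{0, 0.42}, {1, 0.91}, {2, 0.42}}
	for _, r := range byRelevance(refs) {
		fmt.Printf("memory %d score %.2f\n", r.MemoryIndex, r.RelevanceScore)
	}
}
```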

MemoryStreamChannel

Named receive-only channel type that delivers streaming events:

type MemoryStreamChannel <-chan MemoryStreamResponse

StreamingFormat

Enumeration of supported streaming formats:

type StreamingFormat string

const (
    FormatNDJSON StreamingFormat = "ndjson"  // Newline-delimited JSON
    FormatSSE    StreamingFormat = "sse"     // Server-Sent Events
)

StreamError

Error structure for streaming errors:

type StreamError struct {
    Message string
    Code    int
}

Error Handling

stream, err := streamingClient.RetrieveMemoryStreamChat(ctx, ...)
if err != nil {
    log.Printf("Failed to start streaming: %v", err)
    return
}

// Process events with error checking
for event := range stream {
    // Check for error events
    if event.Status != nil && event.Status.Code != 0 {
        log.Printf("Stream error: %v", event.Status.Message)
        continue
    }

    // Process event
    if event.AbstractReply != nil {
        fmt.Printf("Reply: %s\n", event.AbstractReply.Text)
    }
}
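The loop above logs error events and keeps reading. An alternative is to stop at the first error and return it to the caller, sketched below. All types here are local stand-ins: the `Code` and `Message` field types of the status are assumptions, and `streamErr` and `consume` are hypothetical names, not SDK API.

```go
package main

import "fmt"

// status and event are local stand-ins; the field types are assumptions.
type status struct {
	Code    int
	Message string
}

type event struct {
	Status *status
	Reply  string
}

// streamErr turns a non-zero status into a Go error value.
type streamErr struct {
	Code    int
	Message string
}

func (e *streamErr) Error() string {
	return fmt.Sprintf("stream error %d: %s", e.Code, e.Message)
}

// consume gathers replies until the channel closes or an error status
// arrives. Events after the first error are left unread.
func consume(ch <-chan event) ([]string, error) {
	var replies []string
	for ev := range ch {
		if ev.Status != nil && ev.Status.Code != 0 {
			return replies, &streamErr{ev.Status.Code, ev.Status.Message}
		}
		if ev.Reply != "" {
			replies = append(replies, ev.Reply)
		}
	}
	return replies, nil
}

func main() {
	ch := make(chan event, 3)
	ch <- event{Reply: "first"}
	ch <- event{Status: &status{Code: 13, Message: "backend unavailable"}}
	ch <- event{Reply: "never read"}
	close(ch)

	replies, err := consume(ch)
	fmt.Println(replies, err)
}
```

When returning early like this against a live stream, a caller would usually also cancel the request context so that nothing keeps producing events that are never read.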

Parameters

Shared Parameters

  • message (required): Query message for semantic search
  • spaceIds: List of space UUIDs to search within
  • requestedSize: Maximum number of memories to retrieve
  • fetchMemory: Whether to fetch memory definitions (default: true)
  • fetchMemoryContent: Whether to fetch original content (default: false)
  • format: Streaming format - FormatNDJSON (default) or FormatSSE

ChatPostProcessor Parameters

  • ppLlmId: UUID of LLM for abstract generation
  • ppRerankerId: UUID of reranker for result reranking
  • ppRelevanceThreshold: Minimum relevance score (default: 0.5)
  • ppLlmTemp: LLM temperature for generation (default: 0.3)
  • ppMaxResults: Maximum results to return (default: 10)
  • ppChronologicalResort: Whether to resort by creation time (default: true)
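The advanced example on this page passes what appear to be the same settings as snake_case keys in PostProcessorConfig. A small sketch of building such a map from optional values, including only those that were set, might look like this. `chatParams` and `chatConfig` are hypothetical; the key names are taken from the advanced example, and whether omitted keys fall back to the defaults listed above is an assumption.

```go
package main

import "fmt"

// chatParams holds optional ChatPostProcessor settings; nil means "not set".
type chatParams struct {
	LlmID              *string
	RelevanceThreshold *float64
	MaxResults         *int32
}

// chatConfig builds a config map containing only the values that were set.
func chatConfig(p chatParams) map[string]interface{} {
	cfg := map[string]interface{}{}
	if p.LlmID != nil {
		cfg["llm_id"] = *p.LlmID
	}
	if p.RelevanceThreshold != nil {
		cfg["relevance_threshold"] = *p.RelevanceThreshold
	}
	if p.MaxResults != nil {
		cfg["max_results"] = *p.MaxResults
	}
	return cfg
}

func main() {
	threshold := 0.7
	cfg := chatConfig(chatParams{RelevanceThreshold: &threshold})
	fmt.Println(len(cfg), cfg["relevance_threshold"])
}
```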

Authorization

Streaming operations require authentication using an API key:

configuration.DefaultHeader["X-API-Key"] = "your-api-key"

See ApiKeyAuth for more details.

Return Type

MemoryStreamChannel - A channel that yields MemoryStreamResponse events as they stream from the server.
