Streaming Client

Streaming Client documentation for the JavaScript SDK.

The StreamingClient class provides real-time streaming for memory retrieval operations in GoodMem. It is the recommended way to retrieve memories because events can be processed as they arrive, which makes large result sets efficient to handle.

Overview

The StreamingClient supports two streaming formats:

  • NDJSON (application/x-ndjson) - Newline-delimited JSON (default, recommended)
  • SSE (text/event-stream) - Server-Sent Events
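To make the NDJSON format concrete, the sketch below parses newline-delimited JSON from a sequence of text chunks. It is only an illustration of the wire format: parseNdjson is a hypothetical helper, not part of the SDK, and StreamingClient is expected to do this parsing for you.

```javascript
// Sketch: turn an (async) iterable of text chunks into parsed NDJSON objects.
// `parseNdjson` is a hypothetical helper, not part of the GoodMem SDK.
async function* parseNdjson(chunks) {
    let buffer = '';
    for await (const chunk of chunks) {
        buffer += chunk;
        // Every complete line (terminated by '\n') is one JSON document.
        let newlineIndex;
        while ((newlineIndex = buffer.indexOf('\n')) !== -1) {
            const line = buffer.slice(0, newlineIndex).trim();
            buffer = buffer.slice(newlineIndex + 1);
            if (line) {
                yield JSON.parse(line);
            }
        }
    }
    // Flush a final line that was not newline-terminated.
    const rest = buffer.trim();
    if (rest) {
        yield JSON.parse(rest);
    }
}
```

Because a chunk boundary can fall in the middle of a line, the helper buffers text and emits an object only once a full line is available.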

Supported Formats

const StreamingFormat = {
    NDJSON: 'ndjson',
    SSE: 'sse'
};
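As a small illustration, the format constants can be mapped to the media types listed in the Overview. The mediaTypeFor helper below is hypothetical and is not claimed to exist in the SDK; it only shows how the two names relate.

```javascript
const StreamingFormat = {
    NDJSON: 'ndjson',
    SSE: 'sse'
};

// Media types as listed in the Overview section.
const MEDIA_TYPES = new Map([
    [StreamingFormat.NDJSON, 'application/x-ndjson'],
    [StreamingFormat.SSE, 'text/event-stream']
]);

// Hypothetical helper: resolve a format string to its media type,
// falling back to NDJSON (the documented default) when none is given.
function mediaTypeFor(format = StreamingFormat.NDJSON) {
    const mediaType = MEDIA_TYPES.get(format);
    if (!mediaType) {
        throw new Error(`Unsupported streaming format: ${format}`);
    }
    return mediaType;
}
```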

Basic Streaming with ChatPostProcessor

Use retrieveMemoryStreamChat() for streaming with automatic ChatPostProcessor configuration:

const GoodMemClient = require('@pairsystems/goodmem-client');
const { StreamingClient } = GoodMemClient;

// Configure client
const defaultClient = GoodMemClient.ApiClient.instance;
defaultClient.basePath = 'http://localhost:8080';
defaultClient.defaultHeaders = {
    'X-API-Key': 'your-api-key'
};

// Create streaming client
const streamingClient = new StreamingClient(defaultClient);

// Create abort controller for cancellation
const controller = new AbortController();

// Stream with ChatPostProcessor
streamingClient.retrieveMemoryStreamChat(
    controller.signal,
    'your search query',                          // message
    ['550e8400-e29b-41d4-a716-446655440000'],     // space IDs
    10,                                            // requested size
    true,                                          // fetch memory
    false,                                         // fetch memory content
    'ndjson',                                      // format (ndjson or sse)
    'llm-uuid',                                    // LLM ID
    'reranker-uuid',                               // reranker ID
    0.5,                                           // relevance threshold
    0.3,                                           // LLM temperature
    10,                                            // max results
    true                                           // chronological resort
).then(async (stream) => {
    for await (const event of stream) {
        if (event.abstractReply) {
            console.log(`Abstract: ${event.abstractReply.text}`);
        } else if (event.retrievedItem && event.retrievedItem.memory) {
            console.log('Memory:', event.retrievedItem.memory);
        }
    }
}).catch(error => {
    console.error('Streaming error:', error);
});

// Cancel streaming if needed
// controller.abort();
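When the results are needed as a whole rather than logged one by one, the loop above can be wrapped in a function that drains the stream. The sketch below assumes only the event shape used in the example; collectChatStream is a hypothetical name.

```javascript
// Hypothetical helper: drain a stream into abstract texts and memories.
// Works with any (async) iterable of events shaped like the ones above.
async function collectChatStream(stream) {
    const result = { abstracts: [], memories: [] };
    for await (const event of stream) {
        if (event.abstractReply) {
            result.abstracts.push(event.abstractReply.text);
        } else if (event.retrievedItem && event.retrievedItem.memory) {
            result.memories.push(event.retrievedItem.memory);
        }
    }
    return result;
}
```

Note that collecting everything gives up the incremental processing that streaming provides, so this is best suited to small result sets.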

Advanced Streaming with Custom Post-Processor

Use retrieveMemoryStreamAdvanced() for streaming with custom post-processor configuration:

const GoodMemClient = require('@pairsystems/goodmem-client');
const { StreamingClient } = GoodMemClient;

// Configure client
const defaultClient = GoodMemClient.ApiClient.instance;
defaultClient.basePath = 'http://localhost:8080';
defaultClient.defaultHeaders = {
    'X-API-Key': 'your-api-key'
};

// Create streaming client
const streamingClient = new StreamingClient(defaultClient);

// Create abort controller
const controller = new AbortController();

// Create advanced request with custom post-processor
const request = {
    message: 'your search query',
    spaceIds: ['space-uuid'],
    requestedSize: 10,
    fetchMemory: true,
    fetchMemoryContent: false,
    format: 'ndjson',  // or 'sse'
    postProcessorName: 'com.goodmem.retrieval.postprocess.ChatPostProcessorFactory',
    postProcessorConfig: {
        llm_id: 'llm-uuid',
        reranker_id: 'reranker-uuid',
        relevance_threshold: 0.5,
        llm_temp: 0.3,
        max_results: 10,
        chronological_resort: true
    }
};

// Execute streaming
streamingClient.retrieveMemoryStreamAdvanced(controller.signal, request)
    .then(async (stream) => {
        for await (const event of stream) {
            if (event.abstractReply) {
                console.log(`Abstract: ${event.abstractReply.text}`);
            } else if (event.retrievedItem) {
                console.log('Retrieved item:', event.retrievedItem);
            }
        }
    })
    .catch(error => {
        console.error('Streaming error:', error);
    });
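The same flow can be written with async/await instead of .then() chaining. In this sketch, runAdvancedQuery is a hypothetical wrapper; it relies only on the retrieveMemoryStreamAdvanced(signal, request) signature documented on this page, and it takes the client as a parameter so that it can be exercised with a stub.

```javascript
// Sketch: async/await version of the advanced streaming call.
// `streamingClient` is any object exposing
// retrieveMemoryStreamAdvanced(signal, request) as documented on this page.
async function runAdvancedQuery(streamingClient, request, signal) {
    const items = [];
    let abstract = null;
    const stream = await streamingClient.retrieveMemoryStreamAdvanced(signal, request);
    for await (const event of stream) {
        if (event.abstractReply) {
            abstract = event.abstractReply.text;
        } else if (event.retrievedItem) {
            items.push(event.retrievedItem);
        }
    }
    return { abstract, items };
}
```

Errors raised while opening or reading the stream propagate to the caller, who can handle them with an ordinary try/catch.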

Types Reference

StreamingClient

Main client for streaming operations.

Constructor:

new StreamingClient(apiClient)

Methods:

  • retrieveMemoryStreamChat(signal, message, spaceIds, requestedSize, fetchMemory, fetchMemoryContent, format, llmId, rerankerId, relevanceThreshold, llmTemp, maxResults, chronologicalResort) - Returns Promise<AsyncIterable<MemoryStreamResponse>>

  • retrieveMemoryStreamAdvanced(signal, request) - Returns Promise<AsyncIterable<MemoryStreamResponse>>

MemoryStreamResponse

Individual streaming event object:

{
    resultSetBoundary: ResultSetBoundary,      // optional
    retrievedItem: StreamRetrievedItem,        // optional
    abstractReply: AbstractReply,              // optional
    memoryDefinition: Object,                  // optional
    status: GoodMemStatus                      // optional
}

Properties:

  • resultSetBoundary: Boundary markers for result sets
  • retrievedItem: Retrieved memory item with chunk reference
  • abstractReply: AI-generated abstract of results
  • memoryDefinition: Memory definition data
  • status: Status information including errors
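Since every field is optional, consumers typically branch on whichever field is present. The sketch below is one way to do that; eventKind is a hypothetical helper, and it assumes an event carries one populated field (if several are set, the first one in the list wins).

```javascript
// Field names taken from the MemoryStreamResponse shape above.
const EVENT_FIELDS = [
    'resultSetBoundary',
    'retrievedItem',
    'abstractReply',
    'memoryDefinition',
    'status'
];

// Hypothetical helper: name the first populated field of an event,
// or 'unknown' when none of the documented fields is set.
function eventKind(event) {
    for (const field of EVENT_FIELDS) {
        if (event[field] !== undefined && event[field] !== null) {
            return field;
        }
    }
    return 'unknown';
}
```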

StreamRetrievedItem

Retrieved item in streaming response:

{
    memory: Object,                            // Memory data as key-value pairs
    chunk: StreamChunkReference                // Chunk reference with score
}

StreamChunkReference

Reference to a memory chunk with relevance score:

{
    resultSetId: string,                       // ID of result set
    chunk: Object,                             // Chunk data
    memoryIndex: number,                       // Index in memory
    relevanceScore: number                     // Relevance score (0.0 - 1.0)
}
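Once retrieved items have been collected, the relevanceScore on each chunk reference can be used to filter and order them on the client. The following is a minimal sketch; rankByRelevance is a hypothetical helper, and it assumes items follow the StreamRetrievedItem shape above.

```javascript
// Hypothetical helper: keep items whose chunk score is at least `minScore`
// and return them ordered from most to least relevant.
function rankByRelevance(items, minScore = 0) {
    return items
        .filter((item) =>
            item.chunk &&
            typeof item.chunk.relevanceScore === 'number' &&
            item.chunk.relevanceScore >= minScore)
        // filter() returned a new array, so sorting does not mutate `items`.
        .sort((a, b) => b.chunk.relevanceScore - a.chunk.relevanceScore);
}
```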

AdvancedMemoryStreamRequest

Advanced streaming request with custom post-processor:

{
    message: string,                           // required: Query message
    spaceIds: string[],                        // optional: Space UUIDs
    requestedSize: number,                     // optional: Max memories
    fetchMemory: boolean,                      // optional: Fetch definitions
    fetchMemoryContent: boolean,               // optional: Fetch content
    format: string,                            // optional: 'ndjson' or 'sse'
    postProcessorName: string,                 // optional: Post-processor name
    postProcessorConfig: Object                // optional: Configuration
}
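A request object can be checked against this shape before it is sent. The sketch below is a hypothetical client-side check (validateAdvancedRequest is not an SDK function) and covers only the constraints stated above: message is required, and format is 'ndjson' or 'sse'.

```javascript
// Hypothetical helper: return a list of problems found in a request object.
// An empty list means the request satisfies the constraints documented above.
function validateAdvancedRequest(request) {
    const errors = [];
    if (typeof request.message !== 'string' || request.message.trim() === '') {
        errors.push('message is required');
    }
    if (request.spaceIds !== undefined && !Array.isArray(request.spaceIds)) {
        errors.push('spaceIds must be an array of strings');
    }
    if (request.format !== undefined && !['ndjson', 'sse'].includes(request.format)) {
        errors.push("format must be 'ndjson' or 'sse'");
    }
    return errors;
}
```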

Parameters

ChatPostProcessor Parameters

  • message (required): Query message for semantic search
  • spaceIds: List of space UUIDs to search within
  • requestedSize: Maximum number of memories to retrieve
  • fetchMemory: Whether to fetch memory definitions (default: true)
  • fetchMemoryContent: Whether to fetch original content (default: false)
  • format: Streaming format - 'ndjson' (default) or 'sse'
  • llmId: UUID of LLM for abstract generation
  • rerankerId: UUID of reranker for result reranking
  • relevanceThreshold: Minimum relevance score (default: 0.5)
  • llmTemp: LLM temperature for generation (default: 0.3)
  • maxResults: Maximum results to return (default: 10)
  • chronologicalResort: Whether to re-sort results by creation time (default: true)
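Because retrieveMemoryStreamChat() takes these parameters positionally, a named-options wrapper can make call sites easier to read. The sketch below is an assumption-laden illustration: toChatArgs is hypothetical, it applies the defaults listed above on the client side, and it leaves unspecified parameters without a documented default as undefined.

```javascript
// Defaults as documented in the parameter list above.
const CHAT_DEFAULTS = {
    fetchMemory: true,
    fetchMemoryContent: false,
    format: 'ndjson',
    relevanceThreshold: 0.5,
    llmTemp: 0.3,
    maxResults: 10,
    chronologicalResort: true
};

// Positional order of retrieveMemoryStreamChat() arguments after `signal`.
const CHAT_ARG_ORDER = [
    'message', 'spaceIds', 'requestedSize', 'fetchMemory',
    'fetchMemoryContent', 'format', 'llmId', 'rerankerId',
    'relevanceThreshold', 'llmTemp', 'maxResults', 'chronologicalResort'
];

// Hypothetical helper: convert named options into the positional list.
function toChatArgs(params) {
    if (!params || typeof params.message !== 'string' || params.message === '') {
        throw new Error('message is required');
    }
    // Drop explicit undefined values so they do not override defaults.
    const provided = Object.fromEntries(
        Object.entries(params).filter(([, value]) => value !== undefined)
    );
    const merged = { ...CHAT_DEFAULTS, ...provided };
    return CHAT_ARG_ORDER.map((name) => merged[name]);
}
```

A call would then look like streamingClient.retrieveMemoryStreamChat(controller.signal, ...toChatArgs({ message: 'your search query', spaceIds: [...] })), where the spread operator expands the list into positional arguments.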

Cancellation

Streaming operations can be cancelled using AbortController:

const controller = new AbortController();

const streamPromise = streamingClient.retrieveMemoryStreamChat(
    controller.signal,  // Pass the signal
    /* ...remaining arguments, as in the basic example */
);

// Cancel the streaming after 5 seconds
setTimeout(() => {
    controller.abort();
}, 5000);

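streamPromise.then(async (stream) => {
    try {
        for await (const event of stream) {
            // Process events
        }
    } catch (error) {
        if (error.name === 'AbortError') {
            console.log('Streaming cancelled');
        } else {
            throw error;  // Let the .catch() below report other failures
        }
    }
}).catch(error => {
    // Reached for non-abort stream errors and for failures before the stream opens
    console.error('Streaming error:', error);
});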
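The timeout pattern above can be packaged into reusable helpers. In the sketch below, abortAfter and drainUntilAborted are hypothetical names, and the code assumes that an aborted stream rejects with an error whose name is 'AbortError', as in the example above.

```javascript
// Hypothetical helper: an AbortController wrapper that aborts after `ms`
// milliseconds unless cancelTimer() is called first.
function abortAfter(ms) {
    const controller = new AbortController();
    const timer = setTimeout(() => controller.abort(), ms);
    return {
        signal: controller.signal,
        abort: () => controller.abort(),
        cancelTimer: () => clearTimeout(timer)
    };
}

// Drain a stream and report whether it finished or was cancelled.
// Any error other than an AbortError is rethrown to the caller.
async function drainUntilAborted(stream, onEvent) {
    try {
        for await (const event of stream) {
            onEvent(event);
        }
        return 'completed';
    } catch (error) {
        if (error.name === 'AbortError') {
            return 'cancelled';
        }
        throw error;
    }
}
```

Calling cancelTimer() once the stream completes prevents the pending timer from keeping a Node.js process alive longer than necessary.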

Error Handling

streamingClient.retrieveMemoryStreamChat(signal, /* ...remaining arguments */)
    .then(async (stream) => {
        for await (const event of stream) {
            // Check for errors
            if (event.status && event.status.code !== 0) {
                console.error(`Error: ${event.status.message}`);
                continue;
            }

            // Process event
            if (event.abstractReply) {
                console.log(`Reply: ${event.abstractReply.text}`);
            }
        }
    })
    .catch(error => {
        console.error('Streaming failed:', error.message);
    });
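If an error status should stop processing rather than be skipped, the stream can be wrapped so that the first error event becomes a thrown exception. This is a sketch under the same assumption as the example above, namely that a non-zero status.code indicates an error; failOnErrorStatus is a hypothetical helper.

```javascript
// Hypothetical wrapper: re-yield events unchanged, but throw on the first
// event whose status carries a non-zero code.
async function* failOnErrorStatus(stream) {
    for await (const event of stream) {
        if (event.status && event.status.code !== 0) {
            const error = new Error(event.status.message || 'Stream reported an error');
            error.name = 'StreamStatusError';
            error.status = event.status;  // keep the original status for callers
            throw error;
        }
        yield event;
    }
}
```

A consumer can then iterate with for await (const event of failOnErrorStatus(stream)) and handle failures in a single catch block.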

Authorization

Streaming operations require authentication using an API key:

const defaultClient = GoodMemClient.ApiClient.instance;
defaultClient.defaultHeaders = {
    'X-API-Key': 'your-api-key'
};

See Authentication for more details.
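To avoid hard-coding the key, the header object can be built from the environment. The sketch below is illustrative only: buildAuthHeaders and the GOODMEM_API_KEY variable name are assumptions, not names defined by the SDK.

```javascript
// Hypothetical helper: build the X-API-Key header from an environment-like
// object. GOODMEM_API_KEY is an assumed variable name, not an SDK convention.
function buildAuthHeaders(env = process.env) {
    const apiKey = env.GOODMEM_API_KEY;
    if (!apiKey) {
        throw new Error('GOODMEM_API_KEY is not set');
    }
    return { 'X-API-Key': apiKey };
}
```

The result can be assigned to defaultClient.defaultHeaders in place of the literal object shown above.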

Return Type

Both retrieveMemoryStreamChat() and retrieveMemoryStreamAdvanced() return Promise<AsyncIterable<MemoryStreamResponse>>: a promise that resolves to an async iterable of streaming events.

Browser Compatibility

The streaming client requires:

  • A fetch API implementation that supports streaming response bodies
  • Support for async/await
  • Modern JavaScript engines (ES2018+)
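These requirements can be checked at runtime before a stream is opened. The following is a rough feature-detection sketch: missingStreamingFeatures is a hypothetical helper, and the exact set of globals the SDK depends on is an assumption based on the list above and on the AbortController usage in the Cancellation section.

```javascript
// Hypothetical helper: list the globals that appear to be missing from `env`.
// The feature set is an assumption inferred from this page, not an SDK contract.
function missingStreamingFeatures(env = globalThis) {
    const missing = [];
    if (typeof env.fetch !== 'function') missing.push('fetch');
    if (typeof env.ReadableStream !== 'function') missing.push('ReadableStream');
    if (typeof env.AbortController !== 'function') missing.push('AbortController');
    if (typeof env.Symbol !== 'function' || !env.Symbol.asyncIterator) {
        missing.push('Symbol.asyncIterator');
    }
    return missing;
}
```

An empty result suggests the environment can run the streaming examples; a non-empty result names what to polyfill or upgrade.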
