Streaming Client
Streaming Client documentation for JavaScript SDK
The StreamingClient class provides real-time streaming support for memory retrieval operations in GoodMem. This is the recommended approach for memory retrieval as it enables efficient handling of large result sets.
Overview
The StreamingClient supports two streaming formats:
- NDJSON (application/x-ndjson) - Newline-delimited JSON (default, recommended)
- SSE (text/event-stream) - Server-Sent Events
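To make the difference between the two wire formats concrete, the sketch below parses a small payload in each format. It only illustrates the formats themselves: `parseNdjson` and `parseSse` are hypothetical helpers that are not part of the SDK, and the sample payloads are invented. The examples in this document iterate over already-parsed event objects, so application code should not normally need this.

```javascript
// Hypothetical helpers illustrating the two wire formats; not part of the SDK.

// NDJSON: one complete JSON document per line.
function parseNdjson(text) {
  return text
    .split('\n')
    .map((line) => line.trim())
    .filter((line) => line.length > 0)
    .map((line) => JSON.parse(line));
}

// SSE: events are separated by a blank line; payload lines start with "data:".
function parseSse(text) {
  return text
    .split(/\r?\n\r?\n/)
    .map((block) =>
      block
        .split(/\r?\n/)
        .filter((line) => line.startsWith('data:'))
        .map((line) => line.slice(5).replace(/^ /, '')) // drop "data:" and one optional space
        .join('\n')
    )
    .filter((data) => data.length > 0)
    .map((data) => JSON.parse(data));
}

// Invented sample payloads carrying the same two events in each format.
const ndjsonBody = '{"n":1}\n{"n":2}\n';
const sseBody = 'data: {"n":1}\n\ndata: {"n":2}\n\n';
```

Both `parseNdjson(ndjsonBody)` and `parseSse(sseBody)` yield the same two objects, which is why the choice of format does not change how events are consumed.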
Supported Formats
const StreamingFormat = {
  NDJSON: 'ndjson',
  SSE: 'sse'
};
Basic Streaming with ChatPostProcessor
Use retrieveMemoryStreamChat() for streaming with automatic ChatPostProcessor configuration:
const GoodMemClient = require('@pairsystems/goodmem-client');
const { StreamingClient } = GoodMemClient;
// Configure client
const defaultClient = GoodMemClient.ApiClient.instance;
defaultClient.basePath = 'http://localhost:8080';
defaultClient.defaultHeaders = {
  'X-API-Key': 'your-api-key'
};
// Create streaming client
const streamingClient = new StreamingClient(defaultClient);
// Create abort controller for cancellation
const controller = new AbortController();
// Stream with ChatPostProcessor
streamingClient.retrieveMemoryStreamChat(
  controller.signal,
  'your search query', // message
  ['550e8400-e29b-41d4-a716-446655440000'], // space IDs
  10, // requested size
  true, // fetch memory
  false, // fetch memory content
  'ndjson', // format (ndjson or sse)
  'llm-uuid', // LLM ID
  'reranker-uuid', // reranker ID
  0.5, // relevance threshold
  0.3, // LLM temperature
  10, // max results
  true // chronological resort
).then(async (stream) => {
  for await (const event of stream) {
    if (event.abstractReply) {
      console.log(`Abstract: ${event.abstractReply.text}`);
    } else if (event.retrievedItem && event.retrievedItem.memory) {
      console.log(`Memory: `, event.retrievedItem.memory);
    }
  }
}).catch(error => {
  console.error('Streaming error:', error);
});
// Cancel streaming if needed
// controller.abort();
Advanced Streaming with Custom Post-Processor
Use retrieveMemoryStreamAdvanced() for streaming with custom post-processor configuration:
const GoodMemClient = require('@pairsystems/goodmem-client');
const { StreamingClient } = GoodMemClient;
// Configure client
const defaultClient = GoodMemClient.ApiClient.instance;
defaultClient.basePath = 'http://localhost:8080';
defaultClient.defaultHeaders = {
  'X-API-Key': 'your-api-key'
};
// Create streaming client
const streamingClient = new StreamingClient(defaultClient);
// Create abort controller
const controller = new AbortController();
// Create advanced request with custom post-processor
const request = {
  message: 'your search query',
  spaceIds: ['space-uuid'],
  requestedSize: 10,
  fetchMemory: true,
  fetchMemoryContent: false,
  format: 'ndjson', // or 'sse'
  postProcessorName: 'com.goodmem.retrieval.postprocess.ChatPostProcessorFactory',
  postProcessorConfig: {
    llm_id: 'llm-uuid',
    reranker_id: 'reranker-uuid',
    relevance_threshold: 0.5,
    llm_temp: 0.3,
    max_results: 10,
    chronological_resort: true
  }
};
// Execute streaming
streamingClient.retrieveMemoryStreamAdvanced(controller.signal, request)
  .then(async (stream) => {
    for await (const event of stream) {
      if (event.abstractReply) {
        console.log(`Abstract: ${event.abstractReply.text}`);
      } else if (event.retrievedItem) {
        console.log(`Retrieved item: `, event.retrievedItem);
      }
    }
  })
  .catch(error => {
    console.error('Streaming error:', error);
  });
Types Reference
StreamingClient
Main client for streaming operations.
Constructor:
new StreamingClient(apiClient)
Methods:
- retrieveMemoryStreamChat(signal, message, spaceIds, requestedSize, fetchMemory, fetchMemoryContent, format, llmId, rerankerId, relevanceThreshold, llmTemp, maxResults, chronologicalResort) - Returns Promise<AsyncIterable<MemoryStreamResponse>>
- retrieveMemoryStreamAdvanced(signal, request) - Returns Promise<AsyncIterable<MemoryStreamResponse>>
MemoryStreamResponse
Individual streaming event object:
{
  resultSetBoundary: ResultSetBoundary, // optional
  retrievedItem: StreamRetrievedItem, // optional
  abstractReply: AbstractReply, // optional
  memoryDefinition: Object, // optional
  status: GoodMemStatus // optional
}
Properties:
- resultSetBoundary: Boundary markers for result sets
- retrievedItem: Retrieved memory item with chunk reference
- abstractReply: AI-generated abstract of results
- memoryDefinition: Memory definition data
- status: Status information including errors
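Because every field of a MemoryStreamResponse is optional, consumers usually branch on whichever field is present. The sketch below shows one way to do that. `classifyEvent` is a hypothetical helper, not part of the SDK, and the order in which the fields are checked is an assumption made for this example. Treating a non-zero `status.code` as an error follows the Error Handling example in this document.

```javascript
// Hypothetical helper, not part of the SDK: reports which optional field
// of a MemoryStreamResponse is populated.
function classifyEvent(event) {
  // A non-zero status code is treated as an error, as in the
  // Error Handling example in this document.
  if (event.status && event.status.code !== 0) return 'error';
  if (event.abstractReply) return 'abstractReply';
  if (event.retrievedItem) return 'retrievedItem';
  if (event.memoryDefinition) return 'memoryDefinition';
  if (event.resultSetBoundary) return 'resultSetBoundary';
  if (event.status) return 'status';
  return 'unknown';
}

// Invented sample events, shaped like the type shown above.
const sampleEvents = [
  { abstractReply: { text: 'Summary of results' } },
  { retrievedItem: { memory: {}, chunk: { relevanceScore: 0.8 } } },
  { status: { code: 13, message: 'internal error' } }
];
const kinds = sampleEvents.map(classifyEvent);
```

A `switch` on the returned string keeps the `for await` loop body flat as more event kinds are handled.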
StreamRetrievedItem
Retrieved item in streaming response:
{
  memory: Object, // Memory data as key-value pairs
  chunk: StreamChunkReference // Chunk reference with score
}
StreamChunkReference
Reference to a memory chunk with relevance score:
{
  resultSetId: string, // ID of result set
  chunk: Object, // Chunk data
  memoryIndex: number, // Index in memory
  relevanceScore: number // Relevance score (0.0 - 1.0)
}
AdvancedMemoryStreamRequest
Advanced streaming request with custom post-processor:
{
  message: string, // required: Query message
  spaceIds: string[], // optional: Space UUIDs
  requestedSize: number, // optional: Max memories
  fetchMemory: boolean, // optional: Fetch definitions
  fetchMemoryContent: boolean, // optional: Fetch content
  format: string, // optional: 'ndjson' or 'sse'
  postProcessorName: string, // optional: Post-processor name
  postProcessorConfig: Object // optional: Configuration
}
Parameters
ChatPostProcessor Parameters
- message (required): Query message for semantic search
- spaceIds: List of space UUIDs to search within
- requestedSize: Maximum number of memories to retrieve
- fetchMemory: Whether to fetch memory definitions (default: true)
- fetchMemoryContent: Whether to fetch original content (default: false)
- format: Streaming format - 'ndjson' (default) or 'sse'
- llmId: UUID of LLM for abstract generation
- rerankerId: UUID of reranker for result reranking
- relevanceThreshold: Minimum relevance score (default: 0.5)
- llmTemp: LLM temperature for generation (default: 0.3)
- maxResults: Maximum results to return (default: 10)
- chronologicalResort: Whether to resort by creation time (default: true)
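The parameters above correspond to the fields of the advanced request shown earlier: the camelCase chat parameters map onto the snake_case keys of `postProcessorConfig`. The sketch below builds an advanced request from chat-style options and applies the documented defaults. `buildChatRequest` is a hypothetical helper, not part of the SDK, and whether `retrieveMemoryStreamChat()` assembles exactly this request internally is an assumption.

```javascript
// Hypothetical helper, not part of the SDK.
// The factory name is taken from the advanced example in this document.
const CHAT_POST_PROCESSOR =
  'com.goodmem.retrieval.postprocess.ChatPostProcessorFactory';

function buildChatRequest(options) {
  if (!options || typeof options.message !== 'string' || options.message === '') {
    throw new Error('message is required');
  }
  // Defaults follow the parameter list above.
  const {
    message,
    spaceIds,
    requestedSize,
    fetchMemory = true,
    fetchMemoryContent = false,
    format = 'ndjson',
    llmId,
    rerankerId,
    relevanceThreshold = 0.5,
    llmTemp = 0.3,
    maxResults = 10,
    chronologicalResort = true
  } = options;

  return {
    message,
    spaceIds,
    requestedSize,
    fetchMemory,
    fetchMemoryContent,
    format,
    postProcessorName: CHAT_POST_PROCESSOR,
    postProcessorConfig: {
      llm_id: llmId,
      reranker_id: rerankerId,
      relevance_threshold: relevanceThreshold,
      llm_temp: llmTemp,
      max_results: maxResults,
      chronological_resort: chronologicalResort
    }
  };
}

const exampleRequest = buildChatRequest({
  message: 'your search query',
  spaceIds: ['space-uuid'],
  llmId: 'llm-uuid'
});
```

The result can be passed as the `request` argument of `retrieveMemoryStreamAdvanced(signal, request)`. Options left unset stay `undefined` and are dropped if the object is serialized with `JSON.stringify`.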
Cancellation
Streaming operations can be cancelled using AbortController:
const controller = new AbortController();
const streamPromise = streamingClient.retrieveMemoryStreamChat(
  controller.signal, // Pass the signal
  ...
);
// Cancel the streaming after 5 seconds
setTimeout(() => {
  controller.abort();
}, 5000);
streamPromise.then(async (stream) => {
  try {
    for await (const event of stream) {
      // Process events
    }
  } catch (error) {
    if (error.name === 'AbortError') {
      console.log('Streaming cancelled');
    }
  }
});
Error Handling
streamingClient.retrieveMemoryStreamChat(signal, ...)
  .then(async (stream) => {
    for await (const event of stream) {
      // Check for errors
      if (event.status && event.status.code !== 0) {
        console.error(`Error: ${event.status.message}`);
        continue;
      }
      // Process event
      if (event.abstractReply) {
        console.log(`Reply: ${event.abstractReply.text}`);
      }
    }
  })
  .catch(error => {
    console.error('Streaming failed:', error.message);
  });
Authorization
Streaming operations require authentication using an API key:
const defaultClient = GoodMemClient.ApiClient.instance;
defaultClient.defaultHeaders = {
  'X-API-Key': 'your-api-key'
};
See Authentication for more details.
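In practice the API key is usually loaded from the environment rather than hard-coded. The sketch below shows one way to build the header object. `buildAuthHeaders` is a hypothetical helper, not part of the SDK, and `GOODMEM_API_KEY` is an environment variable name assumed for this example.

```javascript
// Hypothetical helper, not part of the SDK. GOODMEM_API_KEY is an assumed
// environment variable name chosen for this example.
function buildAuthHeaders(env) {
  const apiKey = env.GOODMEM_API_KEY;
  if (!apiKey) {
    // Fail early instead of sending unauthenticated requests.
    throw new Error('GOODMEM_API_KEY is not set');
  }
  return { 'X-API-Key': apiKey };
}

// Usage (commented out so the snippet has no side effects):
// defaultClient.defaultHeaders = buildAuthHeaders(process.env);
```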
Return Type
Promise<AsyncIterable<MemoryStreamResponse>> - A promise that resolves to an async iterable of streaming events.
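The sketch below shows how an async iterable of events is consumed with `for await`, and how an AbortController stops it early. It runs without a server: `mockStream` is an invented stand-in for the stream returned by the client, and `collectEvents` is a hypothetical helper. Neither is part of the SDK. Throwing an error named `AbortError` follows the Cancellation example in this document.

```javascript
// Stand-in for a real stream: an async generator that yields invented events
// and throws an AbortError if the signal is already aborted.
async function* mockStream(signal, events) {
  for (const event of events) {
    if (signal.aborted) {
      const error = new Error('The operation was aborted');
      error.name = 'AbortError';
      throw error;
    }
    yield event;
  }
}

// Collects events until the stream ends, `limit` is reached, or it is aborted.
async function collectEvents(stream, controller, limit) {
  const collected = [];
  try {
    for await (const event of stream) {
      collected.push(event);
      if (collected.length >= limit) {
        controller.abort(); // Signal the producer to stop.
        break; // Leaving the loop also closes the iterator.
      }
    }
  } catch (error) {
    // Cancellation is expected here; anything else is a real failure.
    if (error.name !== 'AbortError') throw error;
  }
  return collected;
}

// Usage with invented events:
// const controller = new AbortController();
// const stream = mockStream(controller.signal, [{ id: 1 }, { id: 2 }, { id: 3 }]);
// collectEvents(stream, controller, 2).then((events) => console.log(events.length));
```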
Browser Compatibility
The streaming client requires:
- Support for the fetch API with streaming response support
- Support for async/await
- Modern JavaScript engines (ES2018+)
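A runtime check can report missing requirements before any streaming call is made. The sketch below is one way to do it. `missingStreamingFeatures` is a hypothetical helper, not part of the SDK, and checking for `ReadableStream` and `AbortController` is an assumption about what streaming responses and cancellation need in practice.

```javascript
// Hypothetical helper, not part of the SDK: lists the features that the
// given global object (for example globalThis or window) is missing.
function missingStreamingFeatures(globalObject) {
  const missing = [];
  if (typeof globalObject.fetch !== 'function') missing.push('fetch');
  if (typeof globalObject.ReadableStream !== 'function') missing.push('ReadableStream');
  if (typeof globalObject.AbortController !== 'function') missing.push('AbortController');
  // Symbol.asyncIterator was introduced with async iteration in ES2018.
  if (typeof Symbol.asyncIterator !== 'symbol') missing.push('async iteration (ES2018)');
  return missing;
}

// Usage: an empty array means every checked feature is available.
// const missing = missingStreamingFeatures(globalThis);
// if (missing.length > 0) console.warn('Streaming unsupported, missing:', missing);
```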
See Also
- MemoriesApi - Memory management API
- AbstractReply - AI-generated response
- README - Client documentation