
Streaming Client

Streaming Client documentation for Java SDK

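The StreamingClient class provides real-time streaming support for memory retrieval operations in GoodMem. It is the recommended approach for memory retrieval because results arrive as a sequence of events while the server produces them, so large result sets can be processed incrementally rather than buffered into a single response.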
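
Streaming helps with large result sets because consumption is lazy: each event is handled as it is read, and the consumer can stop early without reading the rest. The following sketch uses only JDK classes, not the GoodMem SDK. A StringReader holding a few NDJSON lines stands in for a response body, and the hypothetical firstN helper counts how many lines were actually pulled when only the first two are requested.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

class LazyStreamSketch {
    // Returns the first n lines of body; pulled counts how many lines entered the pipeline.
    static List<String> firstN(String body, int n, AtomicInteger pulled) {
        try (BufferedReader reader = new BufferedReader(new StringReader(body))) {
            return reader.lines()                          // lazy: a line is read only when pulled
                .peek(line -> pulled.incrementAndGet())    // side effect for demonstration only
                .limit(n)                                  // short-circuits after n lines
                .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Simulated NDJSON body standing in for a large streamed response.
        String body = "{\"n\":1}\n{\"n\":2}\n{\"n\":3}\n{\"n\":4}\n";
        AtomicInteger pulled = new AtomicInteger();
        List<String> first = firstN(body, 2, pulled);
        System.out.println(first + " (lines pulled: " + pulled.get() + ")");
    }
}
```

Because limit(2) short-circuits the pipeline, only two of the four lines are pulled from the reader.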

Overview

The StreamingClient supports two streaming formats:

  • NDJSON (application/x-ndjson) - Newline-delimited JSON (default, recommended)
  • SSE (text/event-stream) - Server-Sent Events
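
The two formats differ in how each event is framed on the wire. NDJSON puts one JSON document on each line. SSE, as defined in the WHATWG HTML standard, sends each event as one or more data: lines followed by a blank line. The sketch below is illustrative JDK-only code, not the SDK's parser. It extracts the payload strings from either framing, assumes each payload is a single JSON document, and ignores SSE comments and other fields such as event: and id: (Java 11+).

```java
import java.util.ArrayList;
import java.util.List;

class StreamFraming {
    // NDJSON: every non-blank line is one JSON document.
    static List<String> ndjsonPayloads(String body) {
        List<String> payloads = new ArrayList<>();
        for (String line : body.split("\r?\n")) {
            if (!line.isBlank()) {
                payloads.add(line);
            }
        }
        return payloads;
    }

    // SSE: "data:" lines accumulate until a blank line dispatches the event.
    static List<String> ssePayloads(String body) {
        List<String> payloads = new ArrayList<>();
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        for (String line : body.split("\r?\n", -1)) {
            if (line.isEmpty()) {                    // blank line ends the current event
                if (hasData) {
                    payloads.add(data.toString());
                }
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith("data:")) {
                String value = line.substring(5);
                if (value.startsWith(" ")) {
                    value = value.substring(1);      // the SSE spec strips one leading space
                }
                if (hasData) {
                    data.append('\n');               // multiple data lines are joined with '\n'
                }
                data.append(value);
                hasData = true;
            }
            // Comment lines (":...") and other fields (event:, id:, retry:) are ignored here.
        }
        return payloads;
    }

    public static void main(String[] args) {
        String ndjson = "{\"a\":1}\n{\"a\":2}\n";
        String sse = "data: {\"a\":1}\n\n: keep-alive\n\ndata: {\"a\":2}\n\n";
        System.out.println(ndjsonPayloads(ndjson));
        System.out.println(ssePayloads(sse));
    }
}
```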

Supported Formats

public enum StreamingFormat {
    NDJSON("application/x-ndjson"),
    SSE("text/event-stream")
}
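
The listing above shows each constant carrying its media type but not how that value is read back. The sketch below is a self-contained stand-in (StreamingFormatSketch.Format with a hypothetical mediaType() accessor, not the SDK enum). It exposes the media type, maps a Content-Type header back to a constant, and builds, without sending, a java.net.http request whose Accept header names the chosen format. That the SDK negotiates the format through Accept is an assumption, and the URL is a placeholder (Java 11+).

```java
import java.net.URI;
import java.net.http.HttpRequest;

class StreamingFormatSketch {
    // Stand-in mirroring the documented enum, plus an accessor for the media type.
    enum Format {
        NDJSON("application/x-ndjson"),
        SSE("text/event-stream");

        private final String mediaType;

        Format(String mediaType) {
            this.mediaType = mediaType;
        }

        String mediaType() {
            return mediaType;
        }

        // Maps a Content-Type value back to a format, ignoring parameters such as "; charset=utf-8".
        static Format fromContentType(String contentType) {
            String base = contentType.split(";", 2)[0].trim();
            for (Format f : values()) {
                if (f.mediaType.equalsIgnoreCase(base)) {
                    return f;
                }
            }
            throw new IllegalArgumentException("Unsupported content type: " + contentType);
        }
    }

    // Builds (but does not send) a request asking for the given format.
    // The URL is a hypothetical placeholder, not a documented GoodMem endpoint.
    static HttpRequest acceptRequest(Format format) {
        return HttpRequest.newBuilder(URI.create("http://localhost:8080/hypothetical/stream"))
            .header("Accept", format.mediaType())
            .build();
    }

    public static void main(String[] args) {
        System.out.println(acceptRequest(Format.SSE).headers().firstValue("Accept").orElse("(none)"));
        System.out.println(Format.fromContentType("application/x-ndjson; charset=utf-8"));
    }
}
```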

Basic Streaming with ChatPostProcessor

Use retrieveMemoryStreamChat() for streaming with automatic ChatPostProcessor configuration:

import ai.pairsys.goodmem.client.ApiClient;
import ai.pairsys.goodmem.client.StreamingClient;
import ai.pairsys.goodmem.client.StreamingClient.*;

import java.util.List;
import java.util.stream.Stream;

public class BasicStreamingExample {
    public static void main(String[] args) {
        // Configure client
        ApiClient apiClient = new ApiClient();
        apiClient.setBasePath("http://localhost:8080");
        apiClient.setApiKey("your-api-key");

        // Create streaming client
        StreamingClient streamingClient = new StreamingClient(apiClient);

        // Stream with ChatPostProcessor (NDJSON format); Stream is AutoCloseable,
        // so try-with-resources closes it once processing ends
        try (Stream<MemoryStreamResponse> stream = streamingClient.retrieveMemoryStreamChat(
                "your search query",
                List.of("space-uuid"),
                10,                                           // requested size
                true,                                         // fetch memory
                false,                                        // fetch memory content
                StreamingFormat.NDJSON,                       // format (NDJSON or SSE)
                "llm-uuid",                                   // LLM ID
                "reranker-uuid",                              // reranker ID
                0.5,                                          // relevance threshold
                0.3,                                          // LLM temperature
                10,                                           // max results
                true                                          // chronological re-sort
        )) {
            // Process events as they arrive
            stream.forEach(event -> {
                if (event.getAbstractReply() != null) {
                    System.out.println("Abstract: " + event.getAbstractReply().getText());
                } else if (event.getRetrievedItem() != null && event.getRetrievedItem().getMemory() != null) {
                    System.out.println("Memory: " + event.getRetrievedItem().getMemory());
                }
            });
        }
    }
}

Advanced Streaming with Custom Post-Processor

Use retrieveMemoryStreamAdvanced() for streaming with custom post-processor configuration:

import ai.pairsys.goodmem.client.ApiClient;
import ai.pairsys.goodmem.client.StreamingClient;
import ai.pairsys.goodmem.client.StreamingClient.*;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

public class AdvancedStreamingExample {
    public static void main(String[] args) {
        // Configure client
        ApiClient apiClient = new ApiClient();
        apiClient.setBasePath("http://localhost:8080");
        apiClient.setApiKey("your-api-key");

        // Create streaming client
        StreamingClient streamingClient = new StreamingClient(apiClient);

        // Create advanced request with custom post-processor
        AdvancedMemoryStreamRequest request = new AdvancedMemoryStreamRequest();
        request.setMessage("your search query");
        request.setSpaceIds(List.of("space-uuid"));
        request.setRequestedSize(10);
        request.setFetchMemory(true);
        request.setFetchMemoryContent(false);
        request.setFormat(StreamingFormat.NDJSON);
        request.setPostProcessorName("com.goodmem.retrieval.postprocess.ChatPostProcessorFactory");

        // Configure post-processor
        Map<String, Object> config = new HashMap<>();
        config.put("llm_id", "llm-uuid");
        config.put("reranker_id", "reranker-uuid");
        config.put("relevance_threshold", 0.5);
        config.put("llm_temp", 0.3);
        config.put("max_results", 10);
        config.put("chronological_resort", true);
        request.setPostProcessorConfig(config);

        // Execute streaming; try-with-resources closes the stream once processing ends
        try (Stream<MemoryStreamResponse> stream = streamingClient.retrieveMemoryStreamAdvanced(request)) {
            // Process events as they arrive
            stream.forEach(event -> {
                if (event.getAbstractReply() != null) {
                    System.out.println("Abstract: " + event.getAbstractReply().getText());
                } else if (event.getRetrievedItem() != null) {
                    System.out.println("Retrieved item: " + event.getRetrievedItem());
                }
            });
        }
    }
}
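
The advanced example spells out the ChatPostProcessor settings as a string-keyed map. A small helper such as the hypothetical chatConfig below keeps those snake_case keys (copied from the example) in one place and leaves out any setting passed as null, on the assumption that an omitted key falls back to the default listed under Parameters. Only the key names come from this page; the helper is not part of the SDK.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ChatConfigSketch {
    // Hypothetical helper that builds a postProcessorConfig map for ChatPostProcessorFactory.
    static Map<String, Object> chatConfig(String llmId, String rerankerId, Double relevanceThreshold,
                                          Double llmTemp, Integer maxResults, Boolean chronologicalResort) {
        Map<String, Object> config = new LinkedHashMap<>();
        putIfPresent(config, "llm_id", llmId);
        putIfPresent(config, "reranker_id", rerankerId);
        putIfPresent(config, "relevance_threshold", relevanceThreshold);
        putIfPresent(config, "llm_temp", llmTemp);
        putIfPresent(config, "max_results", maxResults);
        putIfPresent(config, "chronological_resort", chronologicalResort);
        return config;
    }

    // A null value means "not set": the key is left out entirely.
    private static void putIfPresent(Map<String, Object> map, String key, Object value) {
        if (value != null) {
            map.put(key, value);
        }
    }

    public static void main(String[] args) {
        // Only the LLM and a stricter threshold are set; the other keys are omitted.
        System.out.println(chatConfig("llm-uuid", null, 0.7, null, null, null));
    }
}
```

The resulting map can be passed to request.setPostProcessorConfig(...) in place of the hand-built HashMap.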

Types Reference

StreamingClient

Main client for streaming operations:

Methods:

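  • new StreamingClient(ApiClient apiClient) - Creates a streaming client backed by the configured ApiClient
  • Stream<MemoryStreamResponse> retrieveMemoryStreamChat(...) - Streams retrieval results processed by the ChatPostProcessor, configured through positional arguments (see ChatPostProcessor Parameters)
  • Stream<MemoryStreamResponse> retrieveMemoryStreamAdvanced(AdvancedMemoryStreamRequest) - Streams retrieval results using the post-processor named and configured in the request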

MemoryStreamResponse

Individual streaming event:

public class MemoryStreamResponse {
    private ResultSetBoundary resultSetBoundary;
    private StreamRetrievedItem retrievedItem;
    private AbstractReply abstractReply;
    private Map<String, Object> memoryDefinition;
    private GoodMemStatus status;
}

Fields:

  • resultSetBoundary: Boundary markers for result sets
  • retrievedItem: Retrieved memory item with chunk reference
  • abstractReply: AI-generated abstract of results
  • memoryDefinition: Memory definition data
  • status: Status information including errors
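
Because each event exposes several nullable fields, a consumer usually branches on whichever one is set, as the examples do with getAbstractReply() and getRetrievedItem(). The sketch below isolates that dispatch using a hypothetical Event record in place of MemoryStreamResponse (Object stands in for the SDK field types). It assumes, as the examples' if/else-if chains suggest but this page does not state, that at most one field is populated per event (Java 16+ for records).

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

class EventKindSketch {
    enum Kind { STATUS, RESULT_SET_BOUNDARY, MEMORY_DEFINITION, RETRIEVED_ITEM, ABSTRACT_REPLY, EMPTY }

    // Hypothetical stand-in for MemoryStreamResponse.
    record Event(Object resultSetBoundary, Object retrievedItem, Object abstractReply,
                 Object memoryDefinition, Object status) {}

    // Status is checked first so status events (which may carry errors) are handled before data.
    static Kind kindOf(Event e) {
        if (e.status() != null) return Kind.STATUS;
        if (e.resultSetBoundary() != null) return Kind.RESULT_SET_BOUNDARY;
        if (e.memoryDefinition() != null) return Kind.MEMORY_DEFINITION;
        if (e.retrievedItem() != null) return Kind.RETRIEVED_ITEM;
        if (e.abstractReply() != null) return Kind.ABSTRACT_REPLY;
        return Kind.EMPTY;
    }

    // Counts how many events of each kind were seen.
    static Map<Kind, Integer> tally(List<Event> events) {
        Map<Kind, Integer> counts = new EnumMap<>(Kind.class);
        for (Event e : events) {
            counts.merge(kindOf(e), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("boundary", null, null, null, null),
            new Event(null, "item-1", null, null, null),
            new Event(null, "item-2", null, null, null),
            new Event(null, null, "summary", null, null));
        System.out.println(tally(events));
    }
}
```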

StreamRetrievedItem

Retrieved item in streaming response:

public class StreamRetrievedItem {
    private Map<String, Object> memory;
    private StreamChunkReference chunk;
}

Fields:

  • memory: Memory data as key-value pairs
  • chunk: Chunk reference with relevance score

StreamChunkReference

Reference to a memory chunk with relevance score:

public class StreamChunkReference {
    private String resultSetId;
    private Map<String, Object> chunk;
    private Integer memoryIndex;
    private Double relevanceScore;
}

Fields:

  • resultSetId: ID of the result set
  • chunk: Chunk data
  • memoryIndex: Index of the memory this chunk belongs to
  • relevanceScore: Relevance score (0.0 - 1.0)
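
Because each chunk reference carries a relevanceScore, results gathered from the stream can also be ranked or filtered on the client. The sketch below uses a hypothetical ChunkRef record in place of StreamChunkReference (the chunk map is omitted). It drops references with a null score, keeps those at or above a minimum score, and orders them from highest to lowest. This page does not say whether the server already sends chunks in score order, so the sort is defensive (Java 16+ for records).

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class RelevanceFilterSketch {
    // Hypothetical stand-in for StreamChunkReference.
    record ChunkRef(String resultSetId, Integer memoryIndex, Double relevanceScore) {}

    // Keeps references scoring at least minScore, highest first; null scores are dropped.
    static List<ChunkRef> topByScore(List<ChunkRef> refs, double minScore) {
        return refs.stream()
            .filter(r -> r.relevanceScore() != null && r.relevanceScore() >= minScore)
            .sorted(Comparator.comparing(ChunkRef::relevanceScore, Comparator.reverseOrder()))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<ChunkRef> refs = List.of(
            new ChunkRef("rs-1", 0, 0.42),
            new ChunkRef("rs-1", 1, 0.91),
            new ChunkRef("rs-1", 2, null),
            new ChunkRef("rs-1", 3, 0.77));
        topByScore(refs, 0.5).forEach(System.out::println);
    }
}
```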

AdvancedMemoryStreamRequest

Advanced streaming request with custom post-processor:

public class AdvancedMemoryStreamRequest {
    private String message;
    private List<String> spaceIds;
    private Integer requestedSize;
    private Boolean fetchMemory;
    private Boolean fetchMemoryContent;
    private StreamingFormat format;
    private String postProcessorName;
    private Map<String, Object> postProcessorConfig;
}

Fields:

  • message (required): Query message for semantic search
  • spaceIds: List of space UUIDs to search within
  • requestedSize: Maximum number of memories to retrieve
  • fetchMemory: Whether to fetch memory definitions
  • fetchMemoryContent: Whether to fetch original content
  • format: Streaming format (NDJSON or SSE)
  • postProcessorName: Fully qualified name of the post-processor factory (for example, com.goodmem.retrieval.postprocess.ChatPostProcessorFactory)
  • postProcessorConfig: Configuration map passed to the post-processor

StreamingFormat

Enumeration of supported streaming formats:

public enum StreamingFormat {
    NDJSON("application/x-ndjson"),      // Newline-delimited JSON
    SSE("text/event-stream")              // Server-Sent Events
}

Parameters

ChatPostProcessor Parameters

  • message (required): Query message for semantic search
  • spaceIds: List of space UUIDs to search within
  • requestedSize: Maximum number of memories to retrieve
  • fetchMemory: Whether to fetch memory definitions (default: true)
  • fetchMemoryContent: Whether to fetch original content (default: false)
  • format: Streaming format - NDJSON (default) or SSE
  • llmId: UUID of LLM for abstract generation
  • rerankerId: UUID of reranker for result reranking
  • relevanceThreshold: Minimum relevance score (default: 0.5)
  • llmTemp: LLM temperature for generation (default: 0.3)
  • maxResults: Maximum results to return (default: 10)
  • chronologicalResort: Whether to re-sort results by creation time (default: true)
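
With twelve positional arguments, it can help to collect the values and their defaults in one place before calling retrieveMemoryStreamChat(). The record below is a hypothetical convenience type, not part of the SDK. Its component order follows the list above, its default values are copied from that list, and requestedSize, llmId and rerankerId, which have no documented default, start as null. The local Format enum stands in for StreamingFormat (Java 16+ for records).

```java
import java.util.List;
import java.util.Objects;

class ChatParamsSketch {
    enum Format { NDJSON, SSE }   // stand-in for StreamingClient.StreamingFormat

    // Hypothetical parameter bundle; component order mirrors the parameter list.
    record ChatParams(String message, List<String> spaceIds, Integer requestedSize,
                      boolean fetchMemory, boolean fetchMemoryContent, Format format,
                      String llmId, String rerankerId, double relevanceThreshold,
                      double llmTemp, int maxResults, boolean chronologicalResort) {

        ChatParams {
            Objects.requireNonNull(message, "message is required");
        }

        // Applies every documented default; parameters without one start as null.
        static ChatParams withDefaults(String message, List<String> spaceIds) {
            return new ChatParams(message, spaceIds, null,
                true, false, Format.NDJSON,
                null, null, 0.5,
                0.3, 10, true);
        }

        // Returns a copy with a different LLM ID.
        ChatParams withLlm(String id) {
            return new ChatParams(message, spaceIds, requestedSize, fetchMemory, fetchMemoryContent,
                format, id, rerankerId, relevanceThreshold, llmTemp, maxResults, chronologicalResort);
        }
    }

    public static void main(String[] args) {
        ChatParams p = ChatParams.withDefaults("your search query", List.of("space-uuid"))
            .withLlm("llm-uuid");
        System.out.println(p);
    }
}
```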

Error Handling

try (Stream<MemoryStreamResponse> stream = streamingClient.retrieveMemoryStreamChat(...)) {
    stream.forEach(event -> {
        // In-stream errors arrive as events whose status carries a non-zero code
        if (event.getStatus() != null && event.getStatus().getCode() != 0) {
            System.err.println("Error: " + event.getStatus().getMessage());
            return; // skips this event only; forEach continues with the next one
        }

        // Process event
        if (event.getAbstractReply() != null) {
            System.out.println("Reply: " + event.getAbstractReply().getText());
        }
    });
} catch (Exception e) {
    // Request or transport failures (for example, connection errors) surface as exceptions
    System.err.println("Streaming failed: " + e.getMessage());
    e.printStackTrace();
}
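
A return inside the forEach lambda only skips the current event, so events after an error status are still delivered. If processing should stop at the first error, an explicit loop over stream.iterator() allows a break. The sketch below shows that pattern with hypothetical stand-in records (Status, Event) in place of GoodMemStatus and MemoryStreamResponse, and treats a non-zero code as an error, as the example above does. The status code and message in main are made-up demo values (Java 16+ for records).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

class StopOnErrorSketch {
    record Status(int code, String message) {}     // stand-in for GoodMemStatus
    record Event(String text, Status status) {}    // stand-in for MemoryStreamResponse

    // Collects event text until the first non-zero status, then stops consuming the stream.
    static List<String> consume(Stream<Event> events, List<String> errors) {
        List<String> texts = new ArrayList<>();
        try (events) {                             // Java 9+: closes the stream on exit
            Iterator<Event> it = events.iterator();
            while (it.hasNext()) {
                Event e = it.next();
                if (e.status() != null && e.status().code() != 0) {
                    errors.add(e.status().message());
                    break;                         // unlike return in forEach, this ends the loop
                }
                if (e.text() != null) {
                    texts.add(e.text());
                }
            }
        }
        return texts;
    }

    public static void main(String[] args) {
        List<String> errors = new ArrayList<>();
        Stream<Event> events = Stream.of(
            new Event("first", null),
            new Event(null, new Status(5, "backend unavailable")),
            new Event("never read", null));
        System.out.println(consume(events, errors) + " errors=" + errors);
    }
}
```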

Authorization

Streaming operations require authentication using an API key:

ApiClient apiClient = new ApiClient();
apiClient.setApiKey("your-api-key");

See ApiKeyAuth for more details.

Return Type

Stream<MemoryStreamResponse> - A Java stream that yields MemoryStreamResponse events as they arrive from the server.
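
Because the return value is an ordinary java.util.stream.Stream, it is AutoCloseable, and a stream backed by an open resource typically releases it through an onClose handler. This page does not document whether the SDK registers such a handler, so closing the stream with try-with-resources is a cautious default. The JDK-only sketch below shows the mechanism: the flag set by the onClose handler stands in for releasing a connection, and the handler runs when the try block exits even though iteration stopped after the first element.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

class StreamCloseSketch {
    // Builds a stream whose onClose handler flips a flag, standing in for releasing a connection.
    static Stream<String> openEvents(AtomicBoolean released) {
        return Stream.of("event-1", "event-2", "event-3")
            .onClose(() -> released.set(true));
    }

    // Reads only the first event; close() still runs when the try block exits.
    static String firstEvent(AtomicBoolean released) {
        try (Stream<String> events = openEvents(released)) {
            return events.findFirst().orElse(null);
        }
    }

    public static void main(String[] args) {
        AtomicBoolean released = new AtomicBoolean(false);
        String first = firstEvent(released);
        System.out.println(first + ", released=" + released.get());
    }
}
```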

See Also