Streaming Client
Streaming Client documentation for Java SDK
The StreamingClient class provides real-time streaming support for memory retrieval operations in GoodMem. This is the recommended approach for memory retrieval as it enables efficient handling of large result sets.
Overview
The StreamingClient supports two streaming formats:
- NDJSON (application/x-ndjson) - Newline-delimited JSON (default, recommended)
- SSE (text/event-stream) - Server-Sent Events
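To make the difference between the two wire formats concrete, here is a minimal sketch of how a raw response body could be split into JSON payloads. It uses only the JDK. `StreamFramingSketch` and `extractPayloads` are hypothetical names for illustration and are not part of the GoodMem SDK, which is expected to handle framing for you. The JSON payloads are placeholders, not real GoodMem events.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the GoodMem SDK.
class StreamFramingSketch {

    /** Splits a raw response body into JSON payload strings, one per line-level event. */
    static List<String> extractPayloads(String body, boolean sse) {
        List<String> payloads = new ArrayList<>();
        for (String line : body.split("\r?\n")) {
            if (line.isBlank()) {
                continue; // NDJSON: nothing to parse; SSE: a blank line only terminates an event
            }
            if (!sse) {
                payloads.add(line.trim()); // NDJSON: every non-empty line is one JSON document
            } else if (line.startsWith("data:")) {
                payloads.add(line.substring("data:".length()).trim()); // SSE: payload follows "data:"
            }
            // Other SSE fields (event:, id:, retry:) and comment lines are ignored in this sketch.
        }
        return payloads;
    }

    public static void main(String[] args) {
        String ndjson = "{\"n\":1}\n{\"n\":2}\n";
        String sse = "event: message\ndata: {\"n\":1}\n\ndata: {\"n\":2}\n\n";
        System.out.println(extractPayloads(ndjson, false));
        System.out.println(extractPayloads(sse, true));
    }
}
```

The sketch treats each SSE `data:` line as a complete payload. An SSE event may span several `data:` lines, which a full parser would join before decoding.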
Supported Formats
public enum StreamingFormat {
    NDJSON("application/x-ndjson"),
    SSE("text/event-stream")
}
Basic Streaming with ChatPostProcessor
Use retrieveMemoryStreamChat() for streaming with automatic ChatPostProcessor configuration:
import ai.pairsys.goodmem.client.ApiClient;
import ai.pairsys.goodmem.client.StreamingClient;
import ai.pairsys.goodmem.client.StreamingClient.*;
import java.util.List;
import java.util.stream.Stream;
public class BasicStreamingExample {
    public static void main(String[] args) {
        // Configure client
        ApiClient apiClient = new ApiClient();
        apiClient.setBasePath("http://localhost:8080");
        apiClient.setApiKey("your-api-key");

        // Create streaming client
        StreamingClient streamingClient = new StreamingClient(apiClient);

        // Stream with ChatPostProcessor (NDJSON format)
        Stream<MemoryStreamResponse> stream = streamingClient.retrieveMemoryStreamChat(
            "your search query",
            List.of("space-uuid"),
            10,                     // requested size
            true,                   // fetch memory
            false,                  // fetch memory content
            StreamingFormat.NDJSON, // format (NDJSON or SSE)
            "llm-uuid",             // LLM ID
            "reranker-uuid",        // reranker ID
            0.5,                    // relevance threshold
            0.3,                    // LLM temperature
            10,                     // max results
            true                    // chronological resort
        );

        // Process events
        stream.forEach(event -> {
            if (event.getAbstractReply() != null) {
                System.out.println("Abstract: " + event.getAbstractReply().getText());
            } else if (event.getRetrievedItem() != null && event.getRetrievedItem().getMemory() != null) {
                System.out.println("Memory: " + event.getRetrievedItem().getMemory());
            }
        });
    }
}
Advanced Streaming with Custom Post-Processor
Use retrieveMemoryStreamAdvanced() for streaming with custom post-processor configuration:
import ai.pairsys.goodmem.client.ApiClient;
import ai.pairsys.goodmem.client.StreamingClient;
import ai.pairsys.goodmem.client.StreamingClient.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
public class AdvancedStreamingExample {
    public static void main(String[] args) {
        // Configure client
        ApiClient apiClient = new ApiClient();
        apiClient.setBasePath("http://localhost:8080");
        apiClient.setApiKey("your-api-key");

        // Create streaming client
        StreamingClient streamingClient = new StreamingClient(apiClient);

        // Create advanced request with custom post-processor
        AdvancedMemoryStreamRequest request = new AdvancedMemoryStreamRequest();
        request.setMessage("your search query");
        request.setSpaceIds(List.of("space-uuid"));
        request.setRequestedSize(10);
        request.setFetchMemory(true);
        request.setFetchMemoryContent(false);
        request.setFormat(StreamingFormat.NDJSON);
        request.setPostProcessorName("com.goodmem.retrieval.postprocess.ChatPostProcessorFactory");

        // Configure post-processor
        Map<String, Object> config = new HashMap<>();
        config.put("llm_id", "llm-uuid");
        config.put("reranker_id", "reranker-uuid");
        config.put("relevance_threshold", 0.5);
        config.put("llm_temp", 0.3);
        config.put("max_results", 10);
        config.put("chronological_resort", true);
        request.setPostProcessorConfig(config);

        // Execute streaming
        Stream<MemoryStreamResponse> stream = streamingClient.retrieveMemoryStreamAdvanced(request);

        // Process events
        stream.forEach(event -> {
            if (event.getAbstractReply() != null) {
                System.out.println("Abstract: " + event.getAbstractReply().getText());
            } else if (event.getRetrievedItem() != null) {
                System.out.println("Retrieved item: " + event.getRetrievedItem());
            }
        });
    }
}
Types Reference
StreamingClient
Main client for streaming operations:
Methods:
- new StreamingClient(ApiClient apiClient) - Create new streaming client
- Stream<MemoryStreamResponse> retrieveMemoryStreamChat(...) - With ChatPostProcessor
- Stream<MemoryStreamResponse> retrieveMemoryStreamAdvanced(AdvancedMemoryStreamRequest) - Advanced streaming
MemoryStreamResponse
Individual streaming event:
public class MemoryStreamResponse {
    private ResultSetBoundary resultSetBoundary;
    private StreamRetrievedItem retrievedItem;
    private AbstractReply abstractReply;
    private Map<String, Object> memoryDefinition;
    private GoodMemStatus status;
}
Fields:
- resultSetBoundary: Boundary markers for result sets
- retrievedItem: Retrieved memory item with chunk reference
- abstractReply: AI-generated abstract of results
- memoryDefinition: Memory definition data
- status: Status information including errors
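Because an event can carry any of these fields, consumer code typically checks them in a fixed order. The sketch below illustrates that dispatch pattern with a stand-in record so it compiles without the SDK (Java 16 or later). `EventDispatchSketch`, `Event`, and `describe` are hypothetical names, every field is simplified to a `String`, and the assumption that each event populates a single field is inferred from the examples on this page, not confirmed by the SDK.

```java
import java.util.List;

// Illustration only: Event is a simplified stand-in for MemoryStreamResponse.
class EventDispatchSketch {

    // The real class uses the field types listed above; String keeps the sketch self-contained.
    record Event(String resultSetBoundary, String retrievedItem,
                 String abstractReply, String memoryDefinition, String status) {}

    /** Returns a label for the first populated field, checking status first. */
    static String describe(Event e) {
        if (e.status() != null) return "status: " + e.status();
        if (e.abstractReply() != null) return "abstract: " + e.abstractReply();
        if (e.retrievedItem() != null) return "item: " + e.retrievedItem();
        if (e.memoryDefinition() != null) return "definition: " + e.memoryDefinition();
        if (e.resultSetBoundary() != null) return "boundary: " + e.resultSetBoundary();
        return "empty event";
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("BEGIN", null, null, null, null),
            new Event(null, "chunk-1", null, null, null),
            new Event(null, null, "summary text", null, null),
            new Event("END", null, null, null, null));
        // Print one line per event in arrival order.
        events.stream().map(EventDispatchSketch::describe).forEach(System.out::println);
    }
}
```

Checking `status` first means an error event is never mistaken for a normal result, which matches the order used in the Error Handling section.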
StreamRetrievedItem
Retrieved item in streaming response:
public class StreamRetrievedItem {
    private Map<String, Object> memory;
    private StreamChunkReference chunk;
}
Fields:
- memory: Memory data as key-value pairs
- chunk: Chunk reference with relevance score
StreamChunkReference
Reference to a memory chunk with relevance score:
public class StreamChunkReference {
    private String resultSetId;
    private Map<String, Object> chunk;
    private Integer memoryIndex;
    private Double relevanceScore;
}
Fields:
- resultSetId: ID of the result set
- chunk: Chunk data
- memoryIndex: Index in memory
- relevanceScore: Relevance score (0.0 - 1.0)
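Since `relevanceScore` is a boxed `Double`, client-side filtering has to allow for a missing score. The following sketch shows one way to keep only sufficiently relevant chunks and order them best-first. `RelevanceFilterSketch`, `ChunkRef`, and `topChunks` are hypothetical names, `ChunkRef` is a stand-in that omits the `chunk` map, and treating a null score as "not relevant" is an assumption of this example.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Illustration only: ChunkRef is a simplified stand-in for StreamChunkReference.
class RelevanceFilterSketch {

    record ChunkRef(String resultSetId, Integer memoryIndex, Double relevanceScore) {}

    /** Keeps chunks scoring at least minScore, sorted from highest to lowest score. */
    static List<ChunkRef> topChunks(List<ChunkRef> chunks, double minScore) {
        return chunks.stream()
            // Drop chunks with no score before unboxing, to avoid a NullPointerException.
            .filter(c -> c.relevanceScore() != null && c.relevanceScore() >= minScore)
            .sorted(Comparator.comparing(ChunkRef::relevanceScore).reversed())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<ChunkRef> chunks = List.of(
            new ChunkRef("rs-1", 0, 0.6),
            new ChunkRef("rs-1", 1, 0.2),
            new ChunkRef("rs-1", 2, null),
            new ChunkRef("rs-1", 3, 0.9));
        topChunks(chunks, 0.5).forEach(System.out::println);
    }
}
```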
AdvancedMemoryStreamRequest
Advanced streaming request with custom post-processor:
public class AdvancedMemoryStreamRequest {
    private String message;
    private List<String> spaceIds;
    private Integer requestedSize;
    private Boolean fetchMemory;
    private Boolean fetchMemoryContent;
    private StreamingFormat format;
    private String postProcessorName;
    private Map<String, Object> postProcessorConfig;
}
Fields:
- message (required): Query message for semantic search
- spaceIds: List of space UUIDs to search within
- requestedSize: Maximum number of memories to retrieve
- fetchMemory: Whether to fetch memory definitions
- fetchMemoryContent: Whether to fetch original content
- format: Streaming format (NDJSON or SSE)
- postProcessorName: Name of custom post-processor
- postProcessorConfig: Configuration dictionary for post-processor
StreamingFormat
Enumeration of supported streaming formats:
public enum StreamingFormat {
    NDJSON("application/x-ndjson"), // Newline-delimited JSON
    SSE("text/event-stream")        // Server-Sent Events
}
Parameters
ChatPostProcessor Parameters
- message (required): Query message for semantic search
- spaceIds: List of space UUIDs to search within
- requestedSize: Maximum number of memories to retrieve
- fetchMemory: Whether to fetch memory definitions (default: true)
- fetchMemoryContent: Whether to fetch original content (default: false)
- format: Streaming format - NDJSON (default) or SSE
- llmId: UUID of LLM for abstract generation
- rerankerId: UUID of reranker for result reranking
- relevanceThreshold: Minimum relevance score (default: 0.5)
- llmTemp: LLM temperature for generation (default: 0.3)
- maxResults: Maximum results to return (default: 10)
- chronologicalResort: Whether to re-sort results by creation time (default: true)
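The post-processor parameters listed here line up with the configuration keys used in the advanced example (`llm_id`, `reranker_id`, `relevance_threshold`, `llm_temp`, `max_results`, `chronological_resort`). As a rough sketch, the helper below builds such a map and falls back to the documented defaults when a value is omitted. `ChatConfigSketch` and `chatConfig` are hypothetical names, and the idea that `retrieveMemoryStreamChat()` assembles an equivalent map internally is an assumption, not something this page confirms.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the GoodMem SDK.
class ChatConfigSketch {

    /** Builds a post-processor config map; null arguments fall back to the documented defaults. */
    static Map<String, Object> chatConfig(String llmId, String rerankerId,
                                          Double relevanceThreshold, Double llmTemp,
                                          Integer maxResults, Boolean chronologicalResort) {
        Map<String, Object> config = new LinkedHashMap<>();
        // No defaults are documented for the two IDs, so they are omitted when absent.
        if (llmId != null) config.put("llm_id", llmId);
        if (rerankerId != null) config.put("reranker_id", rerankerId);
        config.put("relevance_threshold", relevanceThreshold != null ? relevanceThreshold : 0.5);
        config.put("llm_temp", llmTemp != null ? llmTemp : 0.3);
        config.put("max_results", maxResults != null ? maxResults : 10);
        config.put("chronological_resort", chronologicalResort != null ? chronologicalResort : true);
        return config;
    }

    public static void main(String[] args) {
        // Override only the temperature; everything else uses the defaults.
        System.out.println(chatConfig("llm-uuid", null, null, 0.7, null, null));
    }
}
```

A map built this way could be passed to `request.setPostProcessorConfig(...)` as in the advanced example.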
Error Handling
try {
    Stream<MemoryStreamResponse> stream = streamingClient.retrieveMemoryStreamChat(...);
    stream.forEach(event -> {
        // Check for errors reported inside the stream
        if (event.getStatus() != null && event.getStatus().getCode() != 0) {
            System.err.println("Error: " + event.getStatus().getMessage());
            return; // skips this event only; forEach continues with the next one
        }
        // Process event
        if (event.getAbstractReply() != null) {
            System.out.println("Reply: " + event.getAbstractReply().getText());
        }
    });
} catch (Exception e) {
    // Failures raised while opening or reading the stream
    System.err.println("Streaming failed: " + e.getMessage());
    e.printStackTrace();
}
Authorization
Streaming operations require authentication using an API key:
ApiClient apiClient = new ApiClient();
apiClient.setApiKey("your-api-key");
See ApiKeyAuth for more details.
Return Type
Stream<MemoryStreamResponse> - A Java stream that yields MemoryStreamResponse events as they arrive from the server.
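A `java.util.stream.Stream` is `AutoCloseable`, so a stream that may be backed by an open connection can be wrapped in try-with-resources to release it even when you stop reading early. The sketch below demonstrates the pattern with a stand-in `Stream<String>`. `StreamCloseSketch` and `firstEvents` are hypothetical names, and whether the SDK's returned stream registers a close handler is an assumption you should verify.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Illustration only: a Stream<String> stands in for Stream<MemoryStreamResponse>.
class StreamCloseSketch {

    /** Reads at most `limit` events, then closes the source stream. */
    static List<String> firstEvents(Stream<String> source, int limit) {
        // try-with-resources calls close(), which runs any handlers registered via onClose.
        try (Stream<String> stream = source) {
            return stream.limit(limit).collect(Collectors.toList());
        }
    }

    public static void main(String[] args) {
        AtomicBoolean closed = new AtomicBoolean(false);
        // The onClose handler simulates releasing a network connection.
        Stream<String> fake = Stream.of("event-1", "event-2", "event-3")
            .onClose(() -> closed.set(true));
        System.out.println(firstEvents(fake, 2));
        System.out.println("closed = " + closed.get());
    }
}
```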
See Also
- MemoriesApi - Memory management API
- AbstractReply - AI-generated response
- ApiKeyAuth - Authentication