Please check out these tutorials on streaming chat models using LangChain4j's low-level API.
Real-time Response Streaming
Streaming allows LLM responses to be delivered token by token as they are generated, providing a more interactive user experience. AI Services support streaming through the TokenStream return type, which offers callbacks for the different streaming events (a minimal interface sketch follows the list below). This is useful for:
- Building responsive chat interfaces
- Showing progress indicators during long generations
- Implementing typewriter-style text display
- Canceling long-running generations
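At its simplest, enabling streaming in an AI Service only requires declaring TokenStream as the return type of a service method. Here is a minimal sketch; the complete runnable example appears later in this article:

import dev.langchain4j.service.TokenStream;
import dev.langchain4j.service.UserMessage;

interface Assistant {

    // Returning TokenStream instead of String makes the AI Service stream the reply
    @UserMessage("{{it}}")
    TokenStream chat(String message);
}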
Streaming Components
- StreamingChatModel: Model that supports token streaming
- TokenStream: Handle for consuming streamed tokens
- Callbacks: Event handlers for partial responses, errors, completion
- StreamingHandle: Control mechanism for cancellation
Streaming Events
TokenStream provides the following callbacks (a minimal wiring sketch follows this list):
- onPartialResponse: invoked for each new token or chunk
- onCompleteResponse: invoked once with the final complete response
- onError: invoked on streaming errors
- onPartialToolCall: reports tool call generation progress
- onToolExecuted: reports tool execution results
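The core callbacks are chained fluently, as the full example later in this article shows. In this short sketch, assistant is an AI Service whose method returns TokenStream; the tool-related callbacks chain in the same way. Note that nothing happens until start() is called:

TokenStream stream = assistant.chatStream("Hello");

stream.onPartialResponse(System.out::print)                        // each new chunk
      .onCompleteResponse(response -> System.out.println("[done]")) // final ChatResponse
      .onError(Throwable::printStackTrace)                          // any streaming failure
      .start();                                                     // begin streaming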
Cancellation Support
Streaming can be canceled using StreamingHandle.cancel(), which stops token generation and closes the connection (a timeout sketch follows this list). This is useful for:
- User-initiated cancellation
- Timeout handling
- Resource management
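For instance, timeout handling can be layered on top of this cancellation API. The sketch below reuses the assistant and the onPartialResponseWithContext/streamingHandle() calls from the full example later in this article; the 5-second deadline and the scheduler are illustrative assumptions, not part of the LangChain4j API:

// Requires java.util.concurrent.* and java.util.concurrent.atomic.AtomicBoolean.
// Sketch: cancel a stream that runs past a deadline (5 seconds here is illustrative).
AtomicBoolean timedOut = new AtomicBoolean(false);
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.schedule(() -> timedOut.set(true), 5, TimeUnit.SECONDS);

assistant.chatStream("Write a long story about a dragon")
         .onPartialResponseWithContext((partial, context) -> {
             if (timedOut.get()) {
                 // stops token generation and closes the connection
                 context.streamingHandle().cancel();
             } else {
                 System.out.print(partial.text());
             }
         })
         .onCompleteResponse(response -> scheduler.shutdown())
         .onError(error -> scheduler.shutdown())
         .start();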
Example
The following example uses Ollama with the phi3:mini-128k model, which is well suited for demos and learning but not for production-grade applications because of its limited reasoning capability and accuracy on complex tasks. If you have Ollama running locally, you can download the model with: ollama pull phi3:mini-128k
This example demonstrates how to use LangChain4j with a streaming chat model to receive AI responses token by token instead of waiting for the full reply. It defines an AI service interface that returns a TokenStream, allowing partial responses to be processed as they arrive. The program shows three scenarios: basic streaming with live token output and counting, streaming while collecting the final response using a CompletableFuture, and streaming with simulated cancellation after a fixed number of tokens. Together, these examples illustrate how streaming enables real-time feedback, asynchronous handling, and controlled interruption of AI-generated responses.
package com.logicbig.example;
import dev.langchain4j.model.chat.StreamingChatModel;
import dev.langchain4j.model.ollama.OllamaStreamingChatModel;
import dev.langchain4j.service.AiServices;
import dev.langchain4j.service.SystemMessage;
import dev.langchain4j.service.TokenStream;
import dev.langchain4j.service.UserMessage;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
public class StreamingExample {
interface StreamingAssistant {
@SystemMessage("Always give short answers.")
@UserMessage("{{it}}")
TokenStream chatStream(String message);
}
public static void main(String[] args) throws Exception {
// Create streaming model
StreamingChatModel model =
OllamaStreamingChatModel.builder()
.baseUrl("http://localhost:11434")
.modelName("phi3:mini-128k")
.temperature(0.7)
.build();
StreamingAssistant assistant =
AiServices.create(StreamingAssistant.class, model);
System.out.println("=== Basic Streaming ===");
basicStreaming(assistant);
System.out.println("\n\n=== Streaming with Completion Future ===");
streamingWithFuture(assistant);
System.out.println("\n\n=== Simulated Cancellation ===");
streamingWithCancellation(assistant);
}
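// Streams the reply, printing each partial chunk as it arrives and counting the
// chunks; a CountDownLatch blocks until the stream completes or fails.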
private static void basicStreaming(StreamingAssistant assistant) {
CountDownLatch latch = new CountDownLatch(1);
String message = "Tell me about artificial intelligence";
System.out.println("message: " + message);
System.out.println("Streaming response:");
TokenStream stream = assistant.chatStream(message);
AtomicInteger tokenCount = new AtomicInteger(0);
stream.onPartialResponse(partial -> {
System.out.print(partial);
tokenCount.incrementAndGet();
})
.onCompleteResponse(response -> {
System.out.println("\n\nTotal tokens: " + tokenCount.get());
latch.countDown();
})
.onError(error -> {
System.err.println("\nError: " + error.getMessage());
latch.countDown();
})
.start();
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
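// Appends each partial chunk to a StringBuilder and exposes the full text through
// a CompletableFuture that completes (or fails) together with the stream.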
private static void streamingWithFuture(StreamingAssistant assistant) throws Exception {
CompletableFuture<String> completeResponse = new CompletableFuture<>();
StringBuilder fullResponse = new StringBuilder();
String message = "Explain machine learning in simple terms";
System.out.println("message: " + message);
System.out.println("streaming response:");
TokenStream stream = assistant.chatStream(message);
stream
.onPartialResponse(partial -> {
System.out.print(partial);
fullResponse.append(partial);
})
.onCompleteResponse(response -> {
completeResponse.complete(fullResponse.toString());
})
.onError(error -> {
completeResponse.completeExceptionally(error);
})
.start();
// Wait for completion
String finalResponse = completeResponse.get();
System.out.println("\n\nFinal response length: " + finalResponse.length() + " characters");
}
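// Cancels the stream from inside the partial-response callback after a fixed
// number of chunks, using the StreamingHandle available via the context.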
private static void streamingWithCancellation(StreamingAssistant assistant) {
CountDownLatch latch = new CountDownLatch(1);
String message = "Write a long story about a dragon";
System.out.println("message: " + message);
TokenStream stream = assistant.chatStream(message);
System.out.println("Streaming response:");
AtomicInteger tokenCount = new AtomicInteger(0);
stream.onPartialResponseWithContext((partial, context) -> {
System.out.print(partial.text());
int count = tokenCount.incrementAndGet();
// Simulate cancellation after 10 tokens
if (count >= 10) {
System.out.println("\n\n[CANCELLED after " + count + " tokens]");
latch.countDown();
context.streamingHandle().cancel();
}
}).onCompleteResponse(response -> {
System.out.println("\n\nCompleted: " + tokenCount.get() + " tokens");
latch.countDown();
})
.onError(error -> {
error.printStackTrace();
latch.countDown();
})
.start();
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Output:
=== Basic Streaming ===
message: Tell me about artificial intelligence
Streaming response:
AI refers to machines designed to mimic human intelligence and perform tasks such as recognizing speech, learning, solving puzzles, and making decisions.
Total tokens: 31
=== Streaming with Completion Future ===
message: Explain machine learning in simple terms
streaming response:
Machine learning is a type of artificial intelligence where computers learn from data, identify patterns, and make decisions with minimal human intervention. Imagine teaching someone new skills by showing them examples; that's what machines do here! They take loads of information (data), analyze it repeatedly to get better at tasks like recognizing faces in photos or predicting weather changes without being explicitly programmed for each specific task.
Final response length: 456 characters
=== Simulated Cancellation ===
message: Write a long story about a dragon
Streaming response:
Once upon a time, in the land of E
[CANCELLED after 10 tokens]
Conclusion
The output demonstrates real-time token streaming. Each token appears as it's generated, providing immediate feedback to users. The streaming continues until the complete response is received or manually canceled. The AI Service proxy manages the streaming connection, token buffering, and event dispatching, allowing you to focus on processing the streamed content.
Example Project
Dependencies and Technologies Used:
- langchain4j 1.10.0 (Build LLM-powered applications in Java: chatbots, agents, RAG, and much more)
- langchain4j-ollama 1.10.0 (LangChain4j :: Integration :: Ollama)
- slf4j-simple 2.0.9 (SLF4J Simple Provider)
- JDK 17
- Maven 3.9.11