Skip to main content

Real-Time GraphQL with Spring - Subscriptions and WebSockets

· 6 min read
GraphQL Guy

GraphQL Subscriptions

Push live updates to clients with GraphQL subscriptions. Build a real-time notification system using Spring GraphQL and WebSocket.

What Are GraphQL Subscriptions?

While queries fetch data once and mutations change data, subscriptions maintain a persistent connection for real-time updates:

subscription {
bookAdded {
id
title
author { name }
}
}

When a new book is added, all subscribed clients receive an update automatically. No polling required.

Use Cases

  • Live notifications
  • Chat applications
  • Real-time dashboards
  • Collaborative editing
  • Live sports scores
  • Stock tickers

Setting Up WebSocket Support

Dependencies

Add WebSocket support to your project:

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-graphql</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>

Configuration

Enable WebSocket endpoint in application.yml:

spring:
graphql:
websocket:
path: /graphql
connection-init-timeout: 60s
graphiql:
enabled: true

WebSocket Configuration Class

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/topic");
registry.setApplicationDestinationPrefixes("/app");
}

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws")
.setAllowedOrigins("*")
.withSockJS();
}
}

Defining Subscriptions in Schema

type Query {
books: [Book!]!
}

type Mutation {
createBook(input: CreateBookInput!): Book!
}

type Subscription {
bookAdded: Book!
bookUpdated(id: ID): Book!
notifications(userId: ID!): Notification!
}

type Book {
id: ID!
title: String!
author: Author!
}

type Notification {
id: ID!
type: NotificationType!
message: String!
createdAt: String!
}

enum NotificationType {
BOOK_ADDED
BOOK_UPDATED
COMMENT_ADDED
MENTION
}

input CreateBookInput {
title: String!
authorId: ID!
}

Implementing Subscriptions

Using Reactor's Flux

Spring GraphQL uses Project Reactor. Subscriptions return Flux<T>:

@Controller
public class BookSubscriptionController {

private final Sinks.Many<Book> bookSink;

public BookSubscriptionController() {
// Create a sink that replays the last event to new subscribers
this.bookSink = Sinks.many().multicast().onBackpressureBuffer();
}

@SubscriptionMapping
public Flux<Book> bookAdded() {
return bookSink.asFlux();
}

// Called by other parts of the app to publish events
public void publishBookAdded(Book book) {
bookSink.tryEmitNext(book);
}
}

Publishing Events from Mutations

@Controller
public class BookMutationController {

private final BookService bookService;
private final BookSubscriptionController subscriptionController;

public BookMutationController(BookService bookService,
BookSubscriptionController subscriptionController) {
this.bookService = bookService;
this.subscriptionController = subscriptionController;
}

@MutationMapping
public Book createBook(@Argument CreateBookInput input) {
Book book = bookService.createBook(input);

// Publish to subscribers
subscriptionController.publishBookAdded(book);

return book;
}
}

Filtered Subscriptions

Allow clients to subscribe to specific events:

@Controller
public class NotificationController {

private final Sinks.Many<Notification> notificationSink;

public NotificationController() {
this.notificationSink = Sinks.many().multicast().onBackpressureBuffer();
}

@SubscriptionMapping
public Flux<Notification> notifications(@Argument String userId) {
return notificationSink.asFlux()
.filter(notification -> notification.targetUserId().equals(userId));
}

@SubscriptionMapping
public Flux<Book> bookUpdated(@Argument String id) {
return bookUpdateSink.asFlux()
.filter(book -> id == null || book.id().equals(id));
}

public void publishNotification(Notification notification) {
notificationSink.tryEmitNext(notification);
}
}

Event-Driven Architecture

For production systems, use Spring's event system or a message broker:

Using Spring Events

// Event class
public record BookCreatedEvent(Book book, Instant timestamp) {}

// Publisher
@Service
public class BookService {

private final ApplicationEventPublisher eventPublisher;

public Book createBook(CreateBookInput input) {
Book book = // ... create book

eventPublisher.publishEvent(new BookCreatedEvent(book, Instant.now()));

return book;
}
}

// Subscription controller
@Controller
public class BookSubscriptionController {

private final Sinks.Many<Book> bookSink = Sinks.many().multicast().onBackpressureBuffer();

@EventListener
public void handleBookCreated(BookCreatedEvent event) {
bookSink.tryEmitNext(event.book());
}

@SubscriptionMapping
public Flux<Book> bookAdded() {
return bookSink.asFlux();
}
}

Using Redis Pub/Sub (Scalable)

For multiple server instances:

@Configuration
public class RedisConfig {

@Bean
public ReactiveRedisTemplate<String, Book> reactiveRedisTemplate(
ReactiveRedisConnectionFactory factory) {
RedisSerializationContext<String, Book> context = RedisSerializationContext
.<String, Book>newSerializationContext(new StringRedisSerializer())
.value(new Jackson2JsonRedisSerializer<>(Book.class))
.build();
return new ReactiveRedisTemplate<>(factory, context);
}
}

@Controller
public class BookSubscriptionController {

private final ReactiveRedisTemplate<String, Book> redisTemplate;

@SubscriptionMapping
public Flux<Book> bookAdded() {
return redisTemplate.listenToChannel("books:created")
.map(message -> message.getMessage());
}
}

@Service
public class BookService {

private final ReactiveRedisTemplate<String, Book> redisTemplate;

public Book createBook(CreateBookInput input) {
Book book = // ... create book

redisTemplate.convertAndSend("books:created", book).subscribe();

return book;
}
}

Client Implementation

JavaScript with graphql-ws

import { createClient } from 'graphql-ws';

const client = createClient({
url: 'ws://localhost:8080/graphql',
});

// Subscribe to new books
const unsubscribe = client.subscribe(
{
query: `subscription {
bookAdded {
id
title
author { name }
}
}`,
},
{
next: (data) => {
console.log('New book:', data.data.bookAdded);
// Update UI
addBookToList(data.data.bookAdded);
},
error: (err) => {
console.error('Subscription error:', err);
},
complete: () => {
console.log('Subscription completed');
},
}
);

// Later: unsubscribe
unsubscribe();

React with Apollo Client

import { useSubscription, gql } from '@apollo/client';

const BOOK_ADDED = gql`
subscription OnBookAdded {
bookAdded {
id
title
author { name }
}
}
`;

function BookList() {
const [books, setBooks] = useState([]);

const { data, loading, error } = useSubscription(BOOK_ADDED, {
onData: ({ data }) => {
setBooks(prev => [...prev, data.data.bookAdded]);
}
});

if (loading) return <p>Connecting...</p>;
if (error) return <p>Error: {error.message}</p>;

return (
<ul>
{books.map(book => (
<li key={book.id}>{book.title} by {book.author.name}</li>
))}
</ul>
);
}

Connection Lifecycle

Understanding the WebSocket connection lifecycle:

┌─────────────────────────────────────────────────────────────────┐
│ WebSocket Connection │
└─────────────────────────────────────────────────────────────────┘

Client Server
│ │
│──────────── WebSocket Handshake ──────────────────▶│
│◀─────────── Connection Accepted ──────────────────│
│ │
│──────────── connection_init ──────────────────────▶│
│◀─────────── connection_ack ───────────────────────│
│ │
│──────────── subscribe (id: "1") ──────────────────▶│
│ │
│◀─────────── next (id: "1", data) ─────────────────│
│◀─────────── next (id: "1", data) ─────────────────│
│◀─────────── next (id: "1", data) ─────────────────│
│ │
│──────────── complete (id: "1") ───────────────────▶│
│ │
│──────────── connection_terminate ─────────────────▶│
│ │

Handling Connection Authentication

Authenticate WebSocket connections:

@Component
public class SubscriptionInterceptor implements WebSocketInterceptor {

private final AuthService authService;

@Override
public Mono<Object> handleConnectionInitialization(
WebSocketSessionInfo info, Map<String, Object> payload) {

String token = (String) payload.get("authToken");

if (token == null) {
return Mono.error(new UnauthorizedException("Auth token required"));
}

return authService.validateToken(token)
.map(user -> Map.of("user", user)) // Store in session
.switchIfEmpty(Mono.error(new UnauthorizedException("Invalid token")));
}
}

Client sends token during connection:

const client = createClient({
url: 'ws://localhost:8080/graphql',
connectionParams: {
authToken: 'your-jwt-token'
},
});

Testing Subscriptions

@SpringBootTest
@AutoConfigureWebTestClient
class BookSubscriptionTest {

@Autowired
private WebTestClient webTestClient;

@Autowired
private BookSubscriptionController subscriptionController;

@Test
void shouldReceiveBookAddedEvents() {
// Create a test client for WebSocket
Flux<Book> subscription = subscriptionController.bookAdded();

StepVerifier.create(subscription.take(2))
.then(() -> {
// Simulate adding books
subscriptionController.publishBookAdded(
new Book("1", "Book 1", "author-1", null, null));
subscriptionController.publishBookAdded(
new Book("2", "Book 2", "author-1", null, null));
})
.expectNextMatches(book -> book.title().equals("Book 1"))
.expectNextMatches(book -> book.title().equals("Book 2"))
.verifyComplete();
}
}

Production Considerations

1. Connection Limits

Limit connections per user:

@Component
public class ConnectionLimitInterceptor implements WebSocketInterceptor {

private final Map<String, AtomicInteger> connectionCounts = new ConcurrentHashMap<>();
private static final int MAX_CONNECTIONS = 5;

@Override
public Mono<Object> handleConnectionInitialization(
WebSocketSessionInfo info, Map<String, Object> payload) {

String userId = extractUserId(payload);
AtomicInteger count = connectionCounts.computeIfAbsent(
userId, k -> new AtomicInteger(0));

if (count.incrementAndGet() > MAX_CONNECTIONS) {
count.decrementAndGet();
return Mono.error(new TooManyConnectionsException());
}

return Mono.just(payload);
}
}

2. Heartbeat/Keep-Alive

Configure ping/pong intervals:

spring:
graphql:
websocket:
connection-init-timeout: 60s
# Keep-alive ping every 30 seconds
server:
servlet:
session:
timeout: 30m

3. Graceful Shutdown

Handle server restarts:

@PreDestroy
public void shutdown() {
bookSink.tryEmitComplete();
}

Complete Example: Live Notifications

Here's a complete notification system:

type Subscription {
notifications(userId: ID!): Notification!
}

type Notification {
id: ID!
type: NotificationType!
title: String!
message: String!
link: String
read: Boolean!
createdAt: String!
}

enum NotificationType {
NEW_BOOK
NEW_COMMENT
MENTION
SYSTEM
}
@Controller
public class NotificationSubscriptionController {

private final Sinks.Many<Notification> sink =
Sinks.many().multicast().onBackpressureBuffer();

@SubscriptionMapping
public Flux<Notification> notifications(@Argument String userId) {
return sink.asFlux()
.filter(n -> n.targetUserId().equals(userId))
.doOnSubscribe(s -> log.info("User {} subscribed to notifications", userId))
.doOnCancel(() -> log.info("User {} unsubscribed", userId));
}

@EventListener
public void handleNewBook(BookCreatedEvent event) {
// Notify followers of the author
event.author().followers().forEach(followerId -> {
Notification notification = new Notification(
UUID.randomUUID().toString(),
NotificationType.NEW_BOOK,
followerId,
"New Book Available",
event.author().name() + " published: " + event.book().title(),
"/books/" + event.book().id(),
false,
Instant.now()
);
sink.tryEmitNext(notification);
});
}
}

Summary

ConceptImplementation
Define subscriptiontype Subscription { ... } in schema
Return typeFlux<T> from controller method
Annotation@SubscriptionMapping
Event publishingSinks.Many<T> or Spring Events
ScalingRedis Pub/Sub or Kafka
AuthenticationWebSocketInterceptor

Subscriptions bring your GraphQL API to life with real-time capabilities. Combined with Spring's reactive support, you can build scalable, event-driven systems.

Next: DataLoader and Batch Loading - solving the N+1 problem in Spring GraphQL.