Class 7: Subscriptions
Duration: 30 minutes
Difficulty: Intermediate
Prerequisites: Completed Classes 1-6
What You'll Learn
By the end of this class, you will:
- Understand GraphQL subscriptions and when to use them
- Configure WebSocket support in Spring GraphQL
- Implement subscription resolvers
- Push real-time updates to clients
- Test subscriptions in GraphiQL
What Are Subscriptions?
Subscriptions are GraphQL's answer to real-time data. Unlike queries (request-response) and mutations (request-response with side effects), subscriptions maintain an open connection and push data to clients when events occur.
┌─────────────────────────────────────────────────────────────────────┐
│ QUERY vs SUBSCRIPTION │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ QUERY (Pull) SUBSCRIPTION (Push) │
│ ──────────────────────────────────────────────────────────────── │
│ Client: "Give me movies" Client: "Tell me when movies │
│ Server: [Movies] are added" │
│ (Connection closes) Server: (keeps connection open) │
│ ... │
│ Client: "Give me movies" Server: "New movie: Dune 2!" │
│ Server: [Movies + new] Server: "New movie: Oppenheimer!"│
│ (Connection closes) ... │
│ │
│ Use Case: Fetch data on demand Use Case: Real-time updates │
│ │
└─────────────────────────────────────────────────────────────────────┘
Step 1: Add WebSocket Dependency
Add the WebSocket dependency to your pom.xml:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
Step 2: Enable WebSocket in GraphQL
📁 Update src/main/resources/application.properties:
# Existing properties
spring.graphql.graphiql.enabled=true
spring.graphql.graphiql.path=/graphiql
# WebSocket for subscriptions
spring.graphql.websocket.path=/graphql
spring.graphql.websocket.connection-init-timeout=60s
That's it! Spring Boot auto-configures GraphQL over WebSocket once the dependency is on the classpath and spring.graphql.websocket.path is set.
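To make the connection-init-timeout setting more concrete, here is a rough sketch of the text frames a client sends over the WebSocket. It assumes the client speaks the graphql-ws protocol (WebSocket subprotocol graphql-transport-ws), which Spring GraphQL's WebSocket transport supports. GraphQlWsMessages and its methods are hypothetical names used only for illustration; in practice a client library such as the one bundled with GraphiQL builds these messages for you.

```java
// Illustrative only: builds the JSON text frames a graphql-ws client would send.
// GraphQlWsMessages is a hypothetical helper, not part of Spring GraphQL.
final class GraphQlWsMessages {

    // First frame after the socket opens. The server waits for it for up to
    // spring.graphql.websocket.connection-init-timeout before closing the socket.
    static String connectionInit() {
        return "{\"type\":\"connection_init\"}";
    }

    // Starts one subscription; the id ties later "next" frames back to it.
    static String subscribe(String id, String query) {
        return "{\"id\":\"" + id + "\",\"type\":\"subscribe\","
                + "\"payload\":{\"query\":\"" + escape(query) + "\"}}";
    }

    // Client-initiated stop for a single subscription.
    static String complete(String id) {
        return "{\"id\":\"" + id + "\",\"type\":\"complete\"}";
    }

    // Minimal JSON string escaping, enough for this sketch.
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n");
    }

    public static void main(String[] args) {
        System.out.println(connectionInit());
        System.out.println(subscribe("1", "subscription { movieAdded { id title } }"));
        System.out.println(complete("1"));
    }
}
```

The server answers connection_init with a connection_ack frame, then sends one next frame per published event, tagged with the subscription's id. That id is what lets several subscriptions share a single socket.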
Step 3: Update the Schema
Add subscription types to your schema:
📁 Update src/main/resources/graphql/schema.graphqls:
type Subscription {
"Receive notifications when a new movie is added"
movieAdded: Movie!
"Receive notifications when a movie is updated"
movieUpdated: Movie!
"Receive notifications when a movie is deleted"
movieDeleted: DeleteNotification!
"Receive notifications for a specific genre"
movieAddedByGenre(genre: Genre!): Movie!
"Receive all movie events"
movieEvents: MovieEvent!
}
type DeleteNotification {
id: ID!
title: String!
deletedAt: String!
}
type MovieEvent {
type: EventType!
movie: Movie
movieId: ID
timestamp: String!
}
enum EventType {
CREATED
UPDATED
DELETED
}
Step 4: Create Event Publisher Service
We need a service to publish events that subscriptions will receive:
📁 src/main/java/com/example/moviedb/service/MovieEventPublisher.java
package com.example.moviedb.service;
import com.example.moviedb.model.Genre;
import com.example.moviedb.model.Movie;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import java.time.Instant;
@Service
public class MovieEventPublisher {
private static final Logger log = LoggerFactory.getLogger(MovieEventPublisher.class);
// Sinks for different event types
private final Sinks.Many<Movie> movieAddedSink;
private final Sinks.Many<Movie> movieUpdatedSink;
private final Sinks.Many<DeleteNotification> movieDeletedSink;
private final Sinks.Many<MovieEvent> allEventsSink;
public MovieEventPublisher() {
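// multicast() allows multiple subscribers
// onBackpressureBuffer(256, false) buffers up to 256 events for slow subscribers;
// autoCancel=false keeps the sink alive after its last subscriber disconnects
// (with the default, true, the sink terminates and later subscriptions complete immediately)
this.movieAddedSink = Sinks.many().multicast().onBackpressureBuffer(256, false);
this.movieUpdatedSink = Sinks.many().multicast().onBackpressureBuffer(256, false);
this.movieDeletedSink = Sinks.many().multicast().onBackpressureBuffer(256, false);
this.allEventsSink = Sinks.many().multicast().onBackpressureBuffer(256, false);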
}
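// ===== PUBLISH METHODS (called from mutations) =====
// tryEmitNext returns an EmitResult, ignored here for brevity; check it if you need to detect dropped events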
public void publishMovieAdded(Movie movie) {
log.info("📢 Publishing movieAdded event: {}", movie.getTitle());
movieAddedSink.tryEmitNext(movie);
allEventsSink.tryEmitNext(new MovieEvent(EventType.CREATED, movie, movie.getId()));
}
public void publishMovieUpdated(Movie movie) {
log.info("📢 Publishing movieUpdated event: {}", movie.getTitle());
movieUpdatedSink.tryEmitNext(movie);
allEventsSink.tryEmitNext(new MovieEvent(EventType.UPDATED, movie, movie.getId()));
}
public void publishMovieDeleted(String movieId, String movieTitle) {
log.info("📢 Publishing movieDeleted event: {}", movieTitle);
DeleteNotification notification = new DeleteNotification(
movieId, movieTitle, Instant.now().toString()
);
movieDeletedSink.tryEmitNext(notification);
allEventsSink.tryEmitNext(new MovieEvent(EventType.DELETED, null, movieId));
}
// ===== SUBSCRIBE METHODS (used by subscription resolvers) =====
public Flux<Movie> getMovieAddedFlux() {
return movieAddedSink.asFlux();
}
public Flux<Movie> getMovieAddedByGenreFlux(Genre genre) {
return movieAddedSink.asFlux()
.filter(movie -> movie.getGenre() == genre);
}
public Flux<Movie> getMovieUpdatedFlux() {
return movieUpdatedSink.asFlux();
}
public Flux<DeleteNotification> getMovieDeletedFlux() {
return movieDeletedSink.asFlux();
}
public Flux<MovieEvent> getAllEventsFlux() {
return allEventsSink.asFlux();
}
// ===== INNER CLASSES =====
public static class DeleteNotification {
private final String id;
private final String title;
private final String deletedAt;
public DeleteNotification(String id, String title, String deletedAt) {
this.id = id;
this.title = title;
this.deletedAt = deletedAt;
}
public String getId() { return id; }
public String getTitle() { return title; }
public String getDeletedAt() { return deletedAt; }
}
public static class MovieEvent {
private final EventType type;
private final Movie movie;
private final String movieId;
private final String timestamp;
public MovieEvent(EventType type, Movie movie, String movieId) {
this.type = type;
this.movie = movie;
this.movieId = movieId;
this.timestamp = Instant.now().toString();
}
public EventType getType() { return type; }
public Movie getMovie() { return movie; }
public String getMovieId() { return movieId; }
public String getTimestamp() { return timestamp; }
}
public enum EventType {
CREATED, UPDATED, DELETED
}
}
Step 5: Create Subscription Controller
📁 src/main/java/com/example/moviedb/controller/SubscriptionController.java
package com.example.moviedb.controller;
import com.example.moviedb.model.Genre;
import com.example.moviedb.model.Movie;
import com.example.moviedb.service.MovieEventPublisher;
import com.example.moviedb.service.MovieEventPublisher.DeleteNotification;
import com.example.moviedb.service.MovieEventPublisher.MovieEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.graphql.data.method.annotation.Argument;
import org.springframework.graphql.data.method.annotation.SubscriptionMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
@Controller
public class SubscriptionController {
private static final Logger log = LoggerFactory.getLogger(SubscriptionController.class);
private final MovieEventPublisher eventPublisher;
public SubscriptionController(MovieEventPublisher eventPublisher) {
this.eventPublisher = eventPublisher;
}
@SubscriptionMapping
public Flux<Movie> movieAdded() {
log.info("🔔 New subscriber to movieAdded");
return eventPublisher.getMovieAddedFlux()
.doOnSubscribe(s -> log.info("Client subscribed to movieAdded"))
.doOnCancel(() -> log.info("Client unsubscribed from movieAdded"));
}
@SubscriptionMapping
public Flux<Movie> movieUpdated() {
log.info("🔔 New subscriber to movieUpdated");
return eventPublisher.getMovieUpdatedFlux();
}
@SubscriptionMapping
public Flux<DeleteNotification> movieDeleted() {
log.info("🔔 New subscriber to movieDeleted");
return eventPublisher.getMovieDeletedFlux();
}
@SubscriptionMapping
public Flux<Movie> movieAddedByGenre(@Argument Genre genre) {
log.info("🔔 New subscriber to movieAddedByGenre: {}", genre);
return eventPublisher.getMovieAddedByGenreFlux(genre);
}
@SubscriptionMapping
public Flux<MovieEvent> movieEvents() {
log.info("🔔 New subscriber to all movieEvents");
return eventPublisher.getAllEventsFlux();
}
}
Step 6: Integrate with Mutations
Update MutationController to publish events:
📁 Update src/main/java/com/example/moviedb/controller/MutationController.java:
@Controller
public class MutationController {
private final MovieRepository movieRepository;
private final ActorRepository actorRepository;
private final DirectorRepository directorRepository;
private final MovieEventPublisher eventPublisher; // Add this
public MutationController(MovieRepository movieRepository,
ActorRepository actorRepository,
DirectorRepository directorRepository,
MovieEventPublisher eventPublisher) { // Add this
this.movieRepository = movieRepository;
this.actorRepository = actorRepository;
this.directorRepository = directorRepository;
this.eventPublisher = eventPublisher; // Add this
}
@MutationMapping
public Movie createMovie(@Argument CreateMovieInput input) {
// ... validation code ...
Movie movie = new Movie(/* ... */);
Movie savedMovie = movieRepository.save(movie);
// 🔔 Publish event
eventPublisher.publishMovieAdded(savedMovie);
return savedMovie;
}
@MutationMapping
public Movie updateMovie(@Argument String id, @Argument UpdateMovieInput input) {
Movie existing = movieRepository.findById(id)
.orElseThrow(() -> new NotFoundException("Movie", id));
Movie updated = new Movie(/* ... */);
Movie savedMovie = movieRepository.save(updated);
// 🔔 Publish event
eventPublisher.publishMovieUpdated(savedMovie);
return savedMovie;
}
@MutationMapping
public DeleteResponse deleteMovie(@Argument String id) {
Movie movie = movieRepository.findById(id)
.orElseThrow(() -> new NotFoundException("Movie", id));
String title = movie.getTitle();
movieRepository.delete(id);
// 🔔 Publish event
eventPublisher.publishMovieDeleted(id, title);
return DeleteResponse.success(id);
}
// ... other mutations
}
Step 7: Test Subscriptions
Restart your application. Here's how to test:
Using GraphiQL
- Open http://localhost:8080/graphiql
- In one tab, start a subscription:
subscription {
movieAdded {
id
title
genre
rating
}
}
- Click Play - the subscription will stay open waiting for events
- In another browser tab, run a mutation:
mutation {
createMovie(input: {
title: "Test Movie"
releaseYear: 2024
genre: ACTION
rating: 8.0
directorId: "1"
}) {
id
title
}
}
- Check the first tab - you should see the new movie appear!
Testing Filtered Subscriptions
Subscribe to only SCIFI movies:
subscription {
movieAddedByGenre(genre: SCIFI) {
title
genre
}
}
Then create movies with different genres - only SCIFI movies will appear.
Testing All Events
subscription {
movieEvents {
type
movieId
movie {
title
}
timestamp
}
}
Now create, update, and delete movies to see all events.
Understanding Reactive Streams
┌─────────────────────────────────────────────────────────────────────┐
│ REACTIVE FLOW │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Mutation Sink Subscriber │
│ ───────────────────────────────────────────────────────────────── │
│ │
│ createMovie() ──────▶ movieAddedSink.tryEmitNext(movie) │
│ │ │
│ ▼ │
│ movieAddedSink.asFlux() │
│ │ │
│ ┌──────────┼──────────┐ │
│ ▼ ▼ ▼ │
│ Client 1 Client 2 Client 3 │
│ (WebSocket connections) │
│ │
│ Sinks.Many = "Hot" publisher │
│ - multicast() = all subscribers get all events │
│ - onBackpressureBuffer() = buffer if subscriber is slow │
│ │
└─────────────────────────────────────────────────────────────────────┘
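The "hot" behaviour in the diagram can be sketched without Reactor. The class below is a deliberately simplified, synchronous model: HotBroadcaster is a hypothetical teaching class, not how Sinks.Many is implemented, and it has no buffering, backpressure, or thread-safety guarantees beyond the list it uses. It only shows the core idea that an event goes to whoever is subscribed at the moment it is emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Simplified, synchronous model of a "hot" multicast source.
// HotBroadcaster is a hypothetical teaching class, not Reactor's Sinks.Many.
final class HotBroadcaster<T> {

    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

    // Registers a listener and returns a handle that unsubscribes it.
    Runnable subscribe(Consumer<T> listener) {
        subscribers.add(listener);
        return () -> subscribers.remove(listener);
    }

    // Delivers the event to everyone subscribed right now; nothing is replayed later.
    void emit(T event) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }
}

final class HotBroadcasterDemo {
    public static void main(String[] args) {
        HotBroadcaster<String> movies = new HotBroadcaster<>();
        List<String> client1 = new ArrayList<>();
        List<String> client2 = new ArrayList<>();

        Runnable cancel1 = movies.subscribe(client1::add);
        movies.emit("Dune 2");        // only client 1 is listening
        movies.subscribe(client2::add);
        movies.emit("Oppenheimer");   // both clients receive this
        cancel1.run();
        movies.emit("Test Movie");    // only client 2 is still subscribed

        System.out.println(client1);  // [Dune 2, Oppenheimer]
        System.out.println(client2);  // [Oppenheimer, Test Movie]
    }
}
```

One difference worth knowing: a real multicast sink created with onBackpressureBuffer() holds events emitted before its first subscriber arrives and delivers them to that first subscriber, which this model does not do.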
Advanced: Subscription with Authentication
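Read user information from the GraphQL context inside a subscription. This assumes something upstream (for example, an interceptor) has already stored a userId entry in the context: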
@SubscriptionMapping
public Flux<Movie> movieAdded(DataFetchingEnvironment env) {
// Get user from context (if authenticated)
String userId = env.getGraphQlContext().get("userId");
log.info("User {} subscribed to movieAdded", userId);
return eventPublisher.getMovieAddedFlux()
.doOnNext(movie -> log.info("Sending movie {} to user {}", movie.getTitle(), userId));
}
Advanced: Heartbeat for Connection Health
Keep connections alive with periodic heartbeats:
type Subscription {
# ... other subscriptions
heartbeat: Heartbeat!
}
type Heartbeat {
timestamp: String!
serverTime: String!
}
@SubscriptionMapping
public Flux<Heartbeat> heartbeat() {
return Flux.interval(Duration.ofSeconds(30))
.map(tick -> new Heartbeat(
Instant.now().toString(),
LocalDateTime.now().toString()
));
}
Handling Subscription Errors
@SubscriptionMapping
public Flux<Movie> movieAdded() {
return eventPublisher.getMovieAddedFlux()
// Log first: once onErrorResume swaps in a fallback, operators after it never see the error
.doOnError(error -> log.error("Error in subscription", error))
.onErrorResume(error -> Flux.empty()) // End gracefully
.doFinally(signalType -> log.info("Subscription ended: {}", signalType));
}
Exercises
Exercise 1: Actor Subscriptions
Add subscriptions for actors:
actorAdded: Actor!
actorAddedToMovie(movieId: ID!): Actor!
Exercise 2: Countdown Subscription
Create a countdown subscription:
subscription {
countdown(from: 10) # Returns 10, 9, 8, ... 1, 0
}
Solution
@SubscriptionMapping
public Flux<Integer> countdown(@Argument int from) {
return Flux.range(0, from + 1)
.map(i -> from - i)
.delayElements(Duration.ofSeconds(1));
}
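The index arithmetic in the solution can be checked without Reactor. The sketch below uses a plain IntStream in place of the Flux; CountdownValues is a hypothetical helper written for this check. Note that Flux.range takes (start, count) while IntStream.range takes (start, endExclusive); with a start of 0 the two produce the same indices.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Checks the index-to-value mapping used by the countdown solution,
// using a plain IntStream instead of a Flux. CountdownValues is a hypothetical helper.
final class CountdownValues {

    // range(0, from + 1) yields from + 1 indices; from - i turns them into from..0.
    static List<Integer> values(int from) {
        return IntStream.range(0, from + 1)
                .map(i -> from - i)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(values(3)); // [3, 2, 1, 0]
    }
}
```

In the Flux version, delayElements also delays the first value, so the client sees the starting number about one second after subscribing.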
Exercise 3: Rate Limiting
Limit events to 1 per second per client:
@SubscriptionMapping
public Flux<Movie> movieAddedThrottled() {
return eventPublisher.getMovieAddedFlux()
.sampleFirst(Duration.ofSeconds(1)); // Max 1 event per second
}
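To see which events sampleFirst lets through, here is a simplified model that works on recorded arrival times instead of a live Flux. SampleFirstModel is a hypothetical class for illustration, and the arrival times are made up. It captures the rule "pass one event, then drop everything that arrives during the following window", not Reactor's actual timer-based implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of sampleFirst: let one event through, then drop everything
// that arrives during the following window. SampleFirstModel is hypothetical and
// works on recorded arrival times (in milliseconds) rather than a live Flux.
final class SampleFirstModel {

    static List<Long> sampleFirst(List<Long> arrivalMillis, long windowMillis) {
        List<Long> passed = new ArrayList<>();
        long windowEnd = Long.MIN_VALUE; // no window open yet
        for (long t : arrivalMillis) {
            if (t >= windowEnd) {
                passed.add(t);               // first event after the window closed
                windowEnd = t + windowMillis; // open a new window
            }
        }
        return passed;
    }

    public static void main(String[] args) {
        List<Long> arrivals = List.of(0L, 300L, 900L, 1200L, 1500L, 2100L, 2500L);
        System.out.println(sampleFirst(arrivals, 1000)); // [0, 1200, 2500]
    }
}
```

Dropped events are discarded, not delayed, so a throttled client can miss movies entirely. That trade-off suits notifications where only "something changed" matters, and is less suitable when every event must arrive.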
Common Issues
Issue: Subscription doesn't receive events
- Check that you're publishing to the sink
- Verify that spring.graphql.websocket.path is set in application.properties
- Check browser console for connection errors
Issue: Events received multiple times
- Make sure you don't have multiple sinks
- Use Sinks.many().multicast(), not replay()
Issue: Subscription closes immediately
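- Return a Flux, not a Mono
- Don't call .block() on the Flux
- If it only happens after an earlier client has disconnected, check the sink's autoCancel setting: a multicast sink created with the default terminates when its last subscriber cancels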
Summary
In this class, you learned:
✅ GraphQL subscriptions for real-time updates
✅ Configuring WebSocket in Spring GraphQL
✅ Using Reactor Sinks for event publishing
✅ @SubscriptionMapping annotation
✅ Filtering subscription events
✅ Testing subscriptions in GraphiQL
What's Next?
In Class 8: Security, we'll learn:
- Authentication and authorization
- Securing queries and mutations
- Field-level permissions
- Protecting sensitive data
Time to lock down your API!