Skip to main content

Class 7: Subscriptions

Duration: 30 minutes Difficulty: Intermediate Prerequisites: Completed Classes 1-6

What You'll Learn

By the end of this class, you will:

  • Understand GraphQL subscriptions and when to use them
  • Configure WebSocket support in Spring GraphQL
  • Implement subscription resolvers
  • Push real-time updates to clients
  • Test subscriptions in GraphiQL

What Are Subscriptions?

Subscriptions are GraphQL's answer to real-time data. Unlike queries (request-response) and mutations (request-response with side effects), subscriptions maintain an open connection and push data to clients when events occur.

┌─────────────────────────────────────────────────────────────────────┐
│ QUERY vs SUBSCRIPTION │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ QUERY (Pull) SUBSCRIPTION (Push) │
│ ──────────────────────────────────────────────────────────────── │
│ Client: "Give me movies" Client: "Tell me when movies │
│ Server: [Movies] are added" │
│ (Connection closes) Server: (keeps connection open) │
│ ... │
│ Client: "Give me movies" Server: "New movie: Dune 2!" │
│ Server: [Movies + new] Server: "New movie: Oppenheimer!"│
│ (Connection closes) ... │
│ │
│ Use Case: Fetch data on demand Use Case: Real-time updates │
│ │
└─────────────────────────────────────────────────────────────────────┘

Step 1: Add WebSocket Dependency

Add the WebSocket dependency to your pom.xml:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

Step 2: Enable WebSocket in GraphQL

📁 Update src/main/resources/application.properties:

# Existing properties
spring.graphql.graphiql.enabled=true
spring.graphql.graphiql.path=/graphiql

# WebSocket for subscriptions
spring.graphql.websocket.path=/graphql
spring.graphql.websocket.connection-init-timeout=60s

That's it! Spring Boot auto-configures WebSocket support when the dependency is present.

Step 3: Update the Schema

Add subscription types to your schema:

📁 Update src/main/resources/graphql/schema.graphqls:

type Subscription {
"Receive notifications when a new movie is added"
movieAdded: Movie!

"Receive notifications when a movie is updated"
movieUpdated: Movie!

"Receive notifications when a movie is deleted"
movieDeleted: DeleteNotification!

"Receive notifications for a specific genre"
movieAddedByGenre(genre: Genre!): Movie!

"Receive all movie events"
movieEvents: MovieEvent!
}

type DeleteNotification {
id: ID!
title: String!
deletedAt: String!
}

type MovieEvent {
type: EventType!
movie: Movie
movieId: ID
timestamp: String!
}

enum EventType {
CREATED
UPDATED
DELETED
}

Step 4: Create Event Publisher Service

We need a service to publish events that subscriptions will receive:

📁 src/main/java/com/example/moviedb/service/MovieEventPublisher.java

package com.example.moviedb.service;

import com.example.moviedb.model.Genre;
import com.example.moviedb.model.Movie;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.time.Instant;

@Service
public class MovieEventPublisher {

private static final Logger log = LoggerFactory.getLogger(MovieEventPublisher.class);

// Sinks for different event types
private final Sinks.Many<Movie> movieAddedSink;
private final Sinks.Many<Movie> movieUpdatedSink;
private final Sinks.Many<DeleteNotification> movieDeletedSink;
private final Sinks.Many<MovieEvent> allEventsSink;

public MovieEventPublisher() {
// multicast() allows multiple subscribers
// onBackpressureBuffer() handles slow subscribers
this.movieAddedSink = Sinks.many().multicast().onBackpressureBuffer();
this.movieUpdatedSink = Sinks.many().multicast().onBackpressureBuffer();
this.movieDeletedSink = Sinks.many().multicast().onBackpressureBuffer();
this.allEventsSink = Sinks.many().multicast().onBackpressureBuffer();
}

// ===== PUBLISH METHODS (called from mutations) =====

public void publishMovieAdded(Movie movie) {
log.info("📢 Publishing movieAdded event: {}", movie.getTitle());
movieAddedSink.tryEmitNext(movie);
allEventsSink.tryEmitNext(new MovieEvent(EventType.CREATED, movie, movie.getId()));
}

public void publishMovieUpdated(Movie movie) {
log.info("📢 Publishing movieUpdated event: {}", movie.getTitle());
movieUpdatedSink.tryEmitNext(movie);
allEventsSink.tryEmitNext(new MovieEvent(EventType.UPDATED, movie, movie.getId()));
}

public void publishMovieDeleted(String movieId, String movieTitle) {
log.info("📢 Publishing movieDeleted event: {}", movieTitle);
DeleteNotification notification = new DeleteNotification(
movieId, movieTitle, Instant.now().toString()
);
movieDeletedSink.tryEmitNext(notification);
allEventsSink.tryEmitNext(new MovieEvent(EventType.DELETED, null, movieId));
}

// ===== SUBSCRIBE METHODS (used by subscription resolvers) =====

public Flux<Movie> getMovieAddedFlux() {
return movieAddedSink.asFlux();
}

public Flux<Movie> getMovieAddedByGenreFlux(Genre genre) {
return movieAddedSink.asFlux()
.filter(movie -> movie.getGenre() == genre);
}

public Flux<Movie> getMovieUpdatedFlux() {
return movieUpdatedSink.asFlux();
}

public Flux<DeleteNotification> getMovieDeletedFlux() {
return movieDeletedSink.asFlux();
}

public Flux<MovieEvent> getAllEventsFlux() {
return allEventsSink.asFlux();
}

// ===== INNER CLASSES =====

public static class DeleteNotification {
private final String id;
private final String title;
private final String deletedAt;

public DeleteNotification(String id, String title, String deletedAt) {
this.id = id;
this.title = title;
this.deletedAt = deletedAt;
}

public String getId() { return id; }
public String getTitle() { return title; }
public String getDeletedAt() { return deletedAt; }
}

public static class MovieEvent {
private final EventType type;
private final Movie movie;
private final String movieId;
private final String timestamp;

public MovieEvent(EventType type, Movie movie, String movieId) {
this.type = type;
this.movie = movie;
this.movieId = movieId;
this.timestamp = Instant.now().toString();
}

public EventType getType() { return type; }
public Movie getMovie() { return movie; }
public String getMovieId() { return movieId; }
public String getTimestamp() { return timestamp; }
}

public enum EventType {
CREATED, UPDATED, DELETED
}
}

Step 5: Create Subscription Controller

📁 src/main/java/com/example/moviedb/controller/SubscriptionController.java

package com.example.moviedb.controller;

import com.example.moviedb.model.Genre;
import com.example.moviedb.model.Movie;
import com.example.moviedb.service.MovieEventPublisher;
import com.example.moviedb.service.MovieEventPublisher.DeleteNotification;
import com.example.moviedb.service.MovieEventPublisher.MovieEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.graphql.data.method.annotation.Argument;
import org.springframework.graphql.data.method.annotation.SubscriptionMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;

@Controller
public class SubscriptionController {

private static final Logger log = LoggerFactory.getLogger(SubscriptionController.class);

private final MovieEventPublisher eventPublisher;

public SubscriptionController(MovieEventPublisher eventPublisher) {
this.eventPublisher = eventPublisher;
}

@SubscriptionMapping
public Flux<Movie> movieAdded() {
log.info("🔔 New subscriber to movieAdded");
return eventPublisher.getMovieAddedFlux()
.doOnSubscribe(s -> log.info("Client subscribed to movieAdded"))
.doOnCancel(() -> log.info("Client unsubscribed from movieAdded"));
}

@SubscriptionMapping
public Flux<Movie> movieUpdated() {
log.info("🔔 New subscriber to movieUpdated");
return eventPublisher.getMovieUpdatedFlux();
}

@SubscriptionMapping
public Flux<DeleteNotification> movieDeleted() {
log.info("🔔 New subscriber to movieDeleted");
return eventPublisher.getMovieDeletedFlux();
}

@SubscriptionMapping
public Flux<Movie> movieAddedByGenre(@Argument Genre genre) {
log.info("🔔 New subscriber to movieAddedByGenre: {}", genre);
return eventPublisher.getMovieAddedByGenreFlux(genre);
}

@SubscriptionMapping
public Flux<MovieEvent> movieEvents() {
log.info("🔔 New subscriber to all movieEvents");
return eventPublisher.getAllEventsFlux();
}
}

Step 6: Integrate with Mutations

Update MutationController to publish events:

📁 Update src/main/java/com/example/moviedb/controller/MutationController.java:

@Controller
public class MutationController {

private final MovieRepository movieRepository;
private final ActorRepository actorRepository;
private final DirectorRepository directorRepository;
private final MovieEventPublisher eventPublisher; // Add this

public MutationController(MovieRepository movieRepository,
ActorRepository actorRepository,
DirectorRepository directorRepository,
MovieEventPublisher eventPublisher) { // Add this
this.movieRepository = movieRepository;
this.actorRepository = actorRepository;
this.directorRepository = directorRepository;
this.eventPublisher = eventPublisher; // Add this
}

@MutationMapping
public Movie createMovie(@Argument CreateMovieInput input) {
// ... validation code ...

Movie movie = new Movie(/* ... */);
Movie savedMovie = movieRepository.save(movie);

// 🔔 Publish event
eventPublisher.publishMovieAdded(savedMovie);

return savedMovie;
}

@MutationMapping
public Movie updateMovie(@Argument String id, @Argument UpdateMovieInput input) {
Movie existing = movieRepository.findById(id)
.orElseThrow(() -> new NotFoundException("Movie", id));

Movie updated = new Movie(/* ... */);
Movie savedMovie = movieRepository.save(updated);

// 🔔 Publish event
eventPublisher.publishMovieUpdated(savedMovie);

return savedMovie;
}

@MutationMapping
public DeleteResponse deleteMovie(@Argument String id) {
Movie movie = movieRepository.findById(id)
.orElseThrow(() -> new NotFoundException("Movie", id));

String title = movie.getTitle();
movieRepository.delete(id);

// 🔔 Publish event
eventPublisher.publishMovieDeleted(id, title);

return DeleteResponse.success(id);
}

// ... other mutations
}

Step 7: Test Subscriptions

Restart your application. Here's how to test:

Using GraphiQL

  1. Open http://localhost:8080/graphiql
  2. In one tab, start a subscription:
subscription {
movieAdded {
id
title
genre
rating
}
}
  1. Click Play - the subscription will stay open waiting for events
  2. In another browser tab, run a mutation:
mutation {
createMovie(input: {
title: "Test Movie"
releaseYear: 2024
genre: ACTION
rating: 8.0
directorId: "1"
}) {
id
title
}
}
  1. Check the first tab - you should see the new movie appear!

Testing Filtered Subscriptions

Subscribe to only SCIFI movies:

subscription {
movieAddedByGenre(genre: SCIFI) {
title
genre
}
}

Then create movies with different genres - only SCIFI movies will appear.

Testing All Events

subscription {
movieEvents {
type
movieId
movie {
title
}
timestamp
}
}

Now create, update, and delete movies to see all events.

Understanding Reactive Streams

┌─────────────────────────────────────────────────────────────────────┐
│ REACTIVE FLOW │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Mutation Sink Subscriber │
│ ───────────────────────────────────────────────────────────────── │
│ │
│ createMovie() ──────▶ movieAddedSink.tryEmitNext(movie) │
│ │ │
│ ▼ │
│ movieAddedSink.asFlux() │
│ │ │
│ ┌──────────┼──────────┐ │
│ ▼ ▼ ▼ │
│ Client 1 Client 2 Client 3 │
│ (WebSocket connections) │
│ │
│ Sinks.Many = "Hot" publisher │
│ - multicast() = all subscribers get all events │
│ - onBackpressureBuffer() = buffer if subscriber is slow │
│ │
└─────────────────────────────────────────────────────────────────────┘

Advanced: Subscription with Authentication

Add user context to subscriptions:

@SubscriptionMapping
public Flux<Movie> movieAdded(DataFetchingEnvironment env) {
// Get user from context (if authenticated)
String userId = env.getGraphQlContext().get("userId");
log.info("User {} subscribed to movieAdded", userId);

return eventPublisher.getMovieAddedFlux()
.doOnNext(movie -> log.info("Sending movie {} to user {}", movie.getTitle(), userId));
}

Advanced: Heartbeat for Connection Health

Keep connections alive with periodic heartbeats:

type Subscription {
# ... other subscriptions
heartbeat: Heartbeat!
}

type Heartbeat {
timestamp: String!
serverTime: String!
}
@SubscriptionMapping
public Flux<Heartbeat> heartbeat() {
return Flux.interval(Duration.ofSeconds(30))
.map(tick -> new Heartbeat(
Instant.now().toString(),
LocalDateTime.now().toString()
));
}

Handling Subscription Errors

@SubscriptionMapping
public Flux<Movie> movieAdded() {
return eventPublisher.getMovieAddedFlux()
.onErrorResume(error -> {
log.error("Subscription error", error);
return Flux.empty(); // End gracefully
})
.doOnError(error -> log.error("Error in subscription", error))
.doFinally(signalType -> log.info("Subscription ended: {}", signalType));
}

Exercises

Exercise 1: Actor Subscriptions

Add subscriptions for actors:

  • actorAdded: Actor!
  • actorAddedToMovie(movieId: ID!): Actor!

Exercise 2: Countdown Subscription

Create a countdown subscription:

subscription {
countdown(from: 10) # Returns 10, 9, 8, ... 1, 0
}
Solution
@SubscriptionMapping
public Flux<Integer> countdown(@Argument int from) {
return Flux.range(0, from + 1)
.map(i -> from - i)
.delayElements(Duration.ofSeconds(1));
}

Exercise 3: Rate Limiting

Limit events to 1 per second per client:

@SubscriptionMapping
public Flux<Movie> movieAddedThrottled() {
return eventPublisher.getMovieAddedFlux()
.sampleFirst(Duration.ofSeconds(1)); // Max 1 event per second
}

Common Issues

Issue: Subscription doesn't receive events

  • Check that you're publishing to the sink
  • Verify WebSocket is enabled in properties
  • Check browser console for connection errors

Issue: Events received multiple times

  • Make sure you don't have multiple sinks
  • Use Sinks.many().multicast(), not replay()

Issue: Subscription closes immediately

  • Return a Flux, not a Mono
  • Don't call .block() on the Flux

Summary

In this class, you learned:

✅ GraphQL subscriptions for real-time updates ✅ Configuring WebSocket in Spring GraphQL ✅ Using Reactor Sinks for event publishing ✅ @SubscriptionMapping annotation ✅ Filtering subscription events ✅ Testing subscriptions in GraphiQL

What's Next?

In Class 8: Security, we'll learn:

  • Authentication and authorization
  • Securing queries and mutations
  • Field-level permissions
  • Protecting sensitive data

Time to lock down your API!