DEV Community

Cover image for Spring Cloud Stream: write once, run anywhere
AISSAM ASSOUIK
AISSAM ASSOUIK

Posted on

Spring Cloud Stream: write once, run anywhere

In modern microservices architectures, asynchronous communication isn't just an option – it's the backbone of scalable, resilient systems. But let's face it: working directly with message brokers like RabbitMQ or Kafka often means drowning in boilerplate code, broker-specific configurations, and operational complexity.

Enter Spring Cloud Stream – the game-changing framework that transforms messaging from a technical hurdle into a strategic advantage. With its elegant abstraction layer, you can implement robust event-driven communication between services without tying your code to a specific broker.

In this article, I'll demonstrate how Spring Cloud Stream enabled me to:

  1. Build broker-agnostic producers/consumers with identical code for RabbitMQ and Kafka.
  2. Implement enterprise-grade messaging patterns (retries, DLQs) in 3 lines of configuration.
  3. Switch messaging technologies with a single environment variable change.
  4. Maintain focus on business logic while the framework handles messaging plumbing.

Whether you're handling payment processing events, real-time analytics, or IoT data streams, Spring Cloud Stream turns asynchronous communication from a challenge into your system's superpower. Let’s dive in!

Context

Spring Cloud Stream

To be able to demonstrate the mentioned above, we are going to use two Spring Boot applications that both able to communicate asynchronously and synchronously.

  • TTS, exposing one endpoint that serves the purpose of converting text entered by user to speech using FreeTTS Java Library.
  • TTS Analytics, a microservice that serves the role of doing analytics on user IP addresses and User Agent in order to provide device and country info (for this lab, we only mock this behavior). In addition, it receives Post TTS Analytics messages for some post processing scenarios that we can face in real world application through a Queue (if we use RabbitMQ) or a Topic (if we use Kafka).

Key Components

┌──────────────────────────────┐
│       Business Logic         │
└──────────────┬───────────────┘
               │
┌──────────────▼───────────────┐
│    Spring Cloud Stream       │
│  ┌────────────────────────┐  │
│  │      StreamBridge      │◄───SEND
│  └────────────────────────┘  │
│  ┌────────────────────────┐  │
│  │  @Bean Supplier/       │◄───POLL
│  │    Consumer            │  │
│  └────────────────────────┘  │
└──────────────┬───────────────┘
               │
┌──────────────▼───────────────┐
│         Binders              │
│  ┌───────┐  ┌───────┐        │
│  │Rabbit │  │ Kafka │ ...    │
│  │MQ     │  │       │        │
│  └───────┘  └───────┘        │
└──────────────┬───────────────┘
               │
┌──────────────▼──────────────┐
│   Message Broker            │
│  (RabbitMQ/Kafka/PubSub)    │
└─────────────────────────────┘
Enter fullscreen mode Exit fullscreen mode

Key terminology in the world of Spring Cloud Stream includes but not limited to:

  • Binder: Plug-in connector to specific brokers (RabbitMQ/Kafka).
  • Binding: Configuration linking code to broker destinations.
  • Destination: Logical address (exchange/topic) for messages.
  • Message Channel: Virtual pipeline (input/output).
  • StreamBridge: Imperative message sending utility.
  • Supplier/Consumer: Functional interfaces for streams.
  • Consumer Group: Scaling mechanism for parallel processing.

Message Flow (Producer → Consumer)

                PRODUCER SIDE
┌───────────┐       ┌───────────┐       ┌───────────┐
│ Business  │       │ Stream    │       │ Message   │
│  Logic    ├──────►│ Bridge    ├──────►│ Broker    │
│           │       │           │       │           │
└───────────┘       └───────────┘       └─────┬─────┘
                                              │
                CONSUMER SIDE                 ▼
┌───────────┐       ┌───────────┐       ┌───────────┐
│ @Bean     │       │ Spring    │       │ Message   │
│ Consumer  │◄──────┤ Cloud     │◄──────┤ Broker    │
│           │       │ Stream    │       │           │
└───────────┘       └───────────┘       └───────────┘
Enter fullscreen mode Exit fullscreen mode

Business logic may include multiple methods or services that need to send messages to our exchange/topic, perfect use case of StreamBridge. The consumer application then receives messages from the channel for processing through the method that returns a Consumer that will process the message/event.

Consumer Group Scaling

           ┌───────────────────────┐
           │      Message Broker   │
           │  (post-tts-analytics) │
           └───────────┬───────────┘
                       │
           ┌───────────▼───────────┐
           │    Consumer Group     │
           │  "analytics-group"    │
           └───────────┬───────────┘
         ┌─────────────┼─────────────┐
┌────────▼───┐   ┌─────▼─────┐   ┌───▼───────┐
│ Consumer   │   │ Consumer  │   │ Consumer  │
│ Instance 1 │   │ Instance2 │   │ Instance3 │
└────────────┘   └───────────┘   └───────────┘
Enter fullscreen mode Exit fullscreen mode

In real world scenarios, we probably have multiple instances of consumer application. The different instances end up as consumer competing between each other, and we expect that each message is handled by only one instance. Here where Consumer Group concept is important. Each consumer binding can specify a group name, each group that subscribe to a given destination will receive a copy of message/event and only one instance/member from that group will receive the message/event.

Binder Abstraction Layer

┌───────────────────────────────────┐
│        Your Application Code      │
│  ┌──────────────────────────────┐ │
│  │  Spring Cloud Stream         │ │
│  │  ┌───────────────────────┐   │ │
│  │  │     Bindings          │   │ │
│  │  │ (Logical Destinations)│   │ │
│  │  └───────────┬───────────┘   │ │
│  └──────────────│───────────────┘ │
│                 ▼                 │
│  ┌──────────────────────────────┐ │
│  │        Binder Bridge         │ │
│  └──────────────┬───────────────┘ │
└─────────────────│─────────────────┘
                  │
       ┌──────────┴─────────────┐
┌──────▼─────────┐    ┌─────────▼───────┐
│ RabbitMQ       │    │ Kafka           │
│ Implementation │    │ Implementation  │
└────────────────┘    └─────────────────┘
Enter fullscreen mode Exit fullscreen mode

The Binder Service Provider Interface (SPI) is the foundation of Spring Cloud Stream's broker independence. It defines a contract for connecting application logic to messaging systems while abstracting broker-specific details.

public interface Binder<T> {
    Binding<T> bindProducer(String name, T outboundTarget, 
                           ProducerProperties producerProperties);

    Binding<T> bindConsumer(String name, String group, 
                           T inboundTarget, 
                           ConsumerProperties consumerProperties);
}
Enter fullscreen mode Exit fullscreen mode
  • bindProducer(): Connects output channels to broker destinations.
  • bindConsumer(): Links input channels to broker queues/topics.

Kafka binder implementation maps the destination and consumer group to a Kafka topic. And for RabbitMQ binder implementation, it maps the destination to a TopicExchange and for each consumer group a queue will be bound to that TopicExchange.

Producer Application

@Service
public class PostTtsAnalyticsPublisherService {
    private final StreamBridge streamBridge;

    public void sendAnalytics(PostTtsAnalyticsRequest request) {
        streamBridge.send("postTtsAnalytics-out-0", request);
    }
}
Enter fullscreen mode Exit fullscreen mode
  • Imperative Sending: StreamBridge allows ad-hoc message sending from any business logic.
  • Binding Abstraction: postTtsAnalytics-out-0 maps to broker destinations via configuration.
  • Zero Broker-Specific Code: No RabbitMQ/Kafka API dependencies.
spring:
  cloud:
    stream:
      bindings:
        postTtsAnalytics-out-0:
          destination: post-tts-analytics # Exchange/topic name
          content-type: application/json
      default-binder: ${BINDER_TYPE:rabbit} # Magic switch
Enter fullscreen mode Exit fullscreen mode
  • Binding Name: postTtsAnalytics-out-0, defines an output channel for sending messages and follow convention typically as -out-.
  • destination: post-tts-analytics, specifies the target destination in the messaging system. For RabbitMQ, this would be an exchange named post-tts-analytics; for Kafka, a topic with the same name.
  • content-type: application/json, indicates that messages sent through this binding will be serialized as JSON. Spring Cloud Stream uses this to handle message conversion appropriately.
  • default-binder: ${BINDER_TYPE:rabbit}, sets the default binder to use for message communication. The BINDER_TYPE environment variable is set, its value will be used; otherwise, it defaults to rabbit. This flexibility enables easy switching between different messaging systems (e.g., RabbitMQ or Kafka) without changing the codebase.
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
Enter fullscreen mode Exit fullscreen mode

By providing binders dependencies for both Kafka and RabbitMQ, we can easily switch between Kafka and RabbitMQ without any codebase changes.

Consumer Application

@Service
public class PostTtsAnalyticsConsumerService {

    @Bean
    public Consumer<PostTtsAnalyticsRequest> postTtsAnalytics() {
        return this::processAnalytics;
    }
}
Enter fullscreen mode Exit fullscreen mode
  • Declarative Consumption: Functional Consumer bean handles incoming messages.
  • Automatic Binding: Method name postTtsAnalytics matches binding config.
  • Retry/DLQ Support: Configured via simple YAML.
spring:
  cloud:
    stream:
      bindings:
        postTtsAnalytics-in-0:
          destination: post-tts-analytics
          content-type: application/json
          group: analytics-group
          consumer:
            max-attempts: 3                 # Retry attempts
            back-off-initial-interval: 1000 # Retry delay
      default-binder: ${BINDER_TYPE:rabbit} # Magic switch
Enter fullscreen mode Exit fullscreen mode
  • Binding Name: postTtsAnalytics-in-0, defines an input binding, indicating that this application will consume messages. The naming convention typically follows -in-.

  • destination: post-tts-analytics, specifies the target destination in the messaging system. For RabbitMQ, this would correspond to an exchange named post-tts-analytics; for Kafka, a topic with the same name.

  • group: analytics-group, defines a consumer group. In Kafka, this ensures that messages are distributed among consumers in the same group. In RabbitMQ, it helps in creating a shared queue for the group.

  • max-attempts: 3, specifies that the application will attempt to process a message up to three times before considering it failed (this includes the initial attempt and two retries).

  • back-off-initial-interval: 1000, sets the initial delay (in milliseconds) between retry attempts. In this case, there's a 1-second delay before the first retry.

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
Enter fullscreen mode Exit fullscreen mode

Testing Strategy

@SpringBootTest
@Import(TestChannelBinderConfiguration.class)
class PostTtsAnalyticsPublisherServiceTest {

    @Autowired
    private OutputDestination outputDestination;

    @Autowired
    private CompositeMessageConverter converter;

    @Autowired
    private PostTtsAnalyticsPublisherService analyticsService;

    @Test
    void testSendMessage() {
        PostTtsAnalyticsRequest request = new PostTtsAnalyticsRequest(
                "123", LocalDateTime.now(), 1500
        );

        analyticsService.sendAnalytics(request);

        var message = outputDestination.receive(1000, "post-tts-analytics");
        assert message != null;

        PostTtsAnalyticsRequest received = (PostTtsAnalyticsRequest) converter.fromMessage(message, PostTtsAnalyticsRequest.class);

        assert Objects.requireNonNull(received).id().equals(request.id());
    }

}
Enter fullscreen mode Exit fullscreen mode

This test effectively ensures that the PostTtsAnalyticsPublisherService publishes messages as expected, validating both the sending mechanism and the message content.

  • @import(TestChannelBinderConfiguration.class): Imports the TestChannelBinderConfiguration, which sets up the in-memory test binder provided by Spring Cloud Stream. This binder simulates message broker interactions within the JVM, eliminating the need for an actual message broker during testing.
  • OutputDestination: An abstraction provided by the test binder to capture messages sent by the application. It allows the test to retrieve messages that would have been sent to an external message broker.
  • CompositeMessageConverter: A composite of multiple MessageConverter instances. It facilitates the conversion of message payloads to and from different formats, such as JSON **to **POJO, based on the content type.
@SpringBootTest
@Import(TestChannelBinderConfiguration.class)
class PostTtsAnalyticsConsumerServiceTest {

    @Autowired
    private InputDestination inputDestination;

    @MockitoSpyBean
    private PostTtsAnalyticsConsumerService consumerService;

    @Test
    void testReceiveMessage() {
        PostTtsAnalyticsRequest request = new PostTtsAnalyticsRequest(
                "456", LocalDateTime.now(), 2000
        );

        inputDestination.send(new GenericMessage<>(request), "post-tts-analytics");

        // Verify the handler was called
        verify(consumerService, timeout(1000)).processAnalytics(request);
    }

    @Test
    void testRetryMechanism() throws Exception {
        PostTtsAnalyticsRequest request = new PostTtsAnalyticsRequest(
                "456", LocalDateTime.now(), 2000
        );

        // Mock failure scenario
        doThrow(new RuntimeException("Simulated processing failure"))
                .when(consumerService).processAnalytics(request);

        // Send test message
        inputDestination.send(new GenericMessage<>(request), "post-tts-analytics");

        // Verify retry attempts
        verify(consumerService, timeout(4000).times(3)).processAnalytics(request);
    }

}

Enter fullscreen mode Exit fullscreen mode

testReceiveMessage(), validates that the consumer correctly processes an incoming message. testRetryMechanism(), Tests the consumer's retry mechanism when message processing fails.

  • InputDestination: An abstraction provided by the test binder to send messages to the application, simulating incoming messages from a message broker.

  • @MockitoSpyBean: Creates a spy of the PostTtsAnalyticsConsumerService bean, allowing the test to verify interactions with its methods while preserving the original behavior.

RabbitMQ Demo

We run both applications without setting BINDER_TYPE environment variable.

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/

 :: Spring Boot ::                (v3.4.5)

2025-05-25T21:20:41.345+01:00  INFO 15156 --- [tts] [           main] com.example.tts.TtsApplication           : No active profile set, falling back to 1 default profile: "default"
2025-05-25T21:20:42.657+01:00  INFO 15156 --- [tts] [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2025-05-25T21:20:42.666+01:00  INFO 15156 --- [tts] [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2025-05-25T21:20:43.297+01:00  INFO 15156 --- [tts] [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port 8080 (http)
2025-05-25T21:20:43.312+01:00  INFO 15156 --- [tts] [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2025-05-25T21:20:43.313+01:00  INFO 15156 --- [tts] [           main] o.apache.catalina.core.StandardEngine    : Starting Servlet engine: [Apache Tomcat/10.1.40]
2025-05-25T21:20:43.387+01:00  INFO 15156 --- [tts] [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2025-05-25T21:20:43.387+01:00  INFO 15156 --- [tts] [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1977 ms
2025-05-25T21:20:45.676+01:00  INFO 15156 --- [tts] [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2025-05-25T21:20:45.676+01:00  INFO 15156 --- [tts] [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'tts.errorChannel' has 1 subscriber(s).
2025-05-25T21:20:45.676+01:00  INFO 15156 --- [tts] [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2025-05-25T21:20:45.767+01:00  INFO 15156 --- [tts] [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port 8080 (http) with context path '/'
2025-05-25T21:20:45.794+01:00  INFO 15156 --- [tts] [           main] com.example.tts.TtsApplication           : Started TtsApplication in 5.064 seconds (process running for 5.525)
Enter fullscreen mode Exit fullscreen mode

Above are logs from Producer application startup with default rabbit binder. We can see the following:

  • Auto-configures Spring Integration channels (errorChannel, integrationHeaderChannelRegistry)
  • No explicit RabbitMQ connection logs (binder initialized lazily)
  • Implicit errorChannel subscriber for logging
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/

 :: Spring Boot ::                (v3.4.5)

2025-05-25T21:20:46.613+01:00  INFO 23932 --- [tts-analytics] [           main] c.e.t.TtsAnalyticsApplication            : No active profile set, falling back to 1 default profile: "default"
2025-05-25T21:20:47.894+01:00  INFO 23932 --- [tts-analytics] [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2025-05-25T21:20:47.903+01:00  INFO 23932 --- [tts-analytics] [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2025-05-25T21:20:48.520+01:00  INFO 23932 --- [tts-analytics] [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port 8090 (http)
2025-05-25T21:20:48.535+01:00  INFO 23932 --- [tts-analytics] [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2025-05-25T21:20:48.535+01:00  INFO 23932 --- [tts-analytics] [           main] o.apache.catalina.core.StandardEngine    : Starting Servlet engine: [Apache Tomcat/10.1.40]
2025-05-25T21:20:48.599+01:00  INFO 23932 --- [tts-analytics] [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2025-05-25T21:20:48.599+01:00  INFO 23932 --- [tts-analytics] [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1913 ms
2025-05-25T21:20:50.354+01:00  INFO 23932 --- [tts-analytics] [           main] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'tts-analytics.postTtsAnalytics-in-0' has 1 subscriber(s).
2025-05-25T21:20:50.479+01:00  INFO 23932 --- [tts-analytics] [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2025-05-25T21:20:50.479+01:00  INFO 23932 --- [tts-analytics] [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'tts-analytics.errorChannel' has 1 subscriber(s).
2025-05-25T21:20:50.483+01:00  INFO 23932 --- [tts-analytics] [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2025-05-25T21:20:50.558+01:00  INFO 23932 --- [tts-analytics] [           main] o.s.c.s.binder.DefaultBinderFactory      : Creating binder: rabbit
2025-05-25T21:20:50.558+01:00  INFO 23932 --- [tts-analytics] [           main] o.s.c.s.binder.DefaultBinderFactory      : Constructing binder child context for rabbit
2025-05-25T21:20:50.760+01:00  INFO 23932 --- [tts-analytics] [           main] o.s.c.s.binder.DefaultBinderFactory      : Caching the binder: rabbit
2025-05-25T21:20:50.790+01:00  INFO 23932 --- [tts-analytics] [           main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: post-tts-analytics.analytics-group, bound to: post-tts-analytics
2025-05-25T21:20:50.804+01:00  INFO 23932 --- [tts-analytics] [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2025-05-25T21:20:50.876+01:00  INFO 23932 --- [tts-analytics] [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2055833f:0/SimpleConnection@1ff463bb [delegate=amqp://guest@127.0.0.1:5672/, localPort=54933]
2025-05-25T21:20:50.964+01:00  INFO 23932 --- [tts-analytics] [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'rabbit-230456842.postTtsAnalytics-in-0.errors' has 1 subscriber(s).
2025-05-25T21:20:50.966+01:00  INFO 23932 --- [tts-analytics] [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'rabbit-230456842.postTtsAnalytics-in-0.errors' has 2 subscriber(s).
2025-05-25T21:20:50.997+01:00  INFO 23932 --- [tts-analytics] [           main] o.s.i.a.i.AmqpInboundChannelAdapter      : started bean 'inbound.post-tts-analytics.analytics-group'
2025-05-25T21:20:51.020+01:00  INFO 23932 --- [tts-analytics] [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port 8090 (http) with context path '/'
2025-05-25T21:20:51.047+01:00  INFO 23932 --- [tts-analytics] [           main] c.e.t.TtsAnalyticsApplication            : Started TtsAnalyticsApplication in 5.096 seconds (process running for 5.797)
Enter fullscreen mode Exit fullscreen mode

And for Consumer application, from startup logs we see:

  • Binds to RabbitMQ queue: post-tts-analytics.analytics-group

  • Creates connection to localhost:5672

  • AmqpInboundChannelAdapter : Started message listener (bean 'inbound.post-tts-analytics.analytics-group')

  • BinderErrorChannel with 2 subscribers (retry + DLQ handling)

/ # rabbitmqctl list_connections name user state protocol
Listing connections ...
name    user    state   protocol
172.17.0.1:43720 -> 172.17.0.2:5672     guest   running {0,9,1}
/ # rabbitmqctl list_channels name connection user confirm consumer_count
Listing channels ...
name    connection      user    confirm consumer_count
172.17.0.1:43720 -> 172.17.0.2:5672 (1) <rabbit@6332dec50309.1748204432.670.0>  guest   false   1
/ # rabbitmqctl list_exchanges name type durable auto_delete
Listing exchanges for vhost / ...
name    type    durable auto_delete
post-tts-analytics      topic   true    false
amq.match       headers true    false
amq.fanout      fanout  true    false
amq.rabbitmq.trace      topic   true    false
amq.headers     headers true    false
        direct  true    false
amq.topic       topic   true    false
amq.direct      direct  true    false
/ # rabbitmqctl list_queues name durable auto_delete consumers state
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    durable auto_delete     consumers       state
post-tts-analytics.analytics-group      true    false   1       running
/ # rabbitmqctl list_bindings source_name routing_key destination_name
Listing bindings for vhost /...
source_name     routing_key     destination_name
        post-tts-analytics.analytics-group      post-tts-analytics.analytics-group
post-tts-analytics      #       post-tts-analytics.analytics-group
Enter fullscreen mode Exit fullscreen mode

We can see that all RabbitMQ resources are created correctly by Spring Cloud Stream.

2025-05-25T21:38:52.121+01:00  INFO 15156 --- [tts] [nio-8080-exec-1] c.e.t.controller.TextToSpeechController  : textToSpeech request: TtsRequest[text=Hi this is a complete test. From RabbitMQ]
2025-05-25T21:38:52.809+01:00  INFO 15156 --- [tts] [nio-8080-exec-1] c.e.tts.service.TextToSpeechService      : do Something with analytics response: TtsAnalyticsResponse[device=Bot, countryIso=ZA]
Wrote synthesized speech to \tts\output\14449f58-7140-45a5-b024-26021229dfb3.wav
2025-05-25T21:38:53.063+01:00  INFO 15156 --- [tts] [nio-8080-exec-1] c.e.t.m.PostTtsAnalyticsPublisherService : sendAnalytics: PostTtsAnalyticsRequest[id=14449f58-7140-45a5-b024-26021229dfb3, creationDate=2025-05-25T21:38:53.063039900, processTimeMs=937]
2025-05-25T21:38:53.068+01:00  INFO 15156 --- [tts] [nio-8080-exec-1] o.s.c.s.binder.DefaultBinderFactory      : Creating binder: rabbit
2025-05-25T21:38:53.068+01:00  INFO 15156 --- [tts] [nio-8080-exec-1] o.s.c.s.binder.DefaultBinderFactory      : Constructing binder child context for rabbit
2025-05-25T21:38:53.188+01:00  INFO 15156 --- [tts] [nio-8080-exec-1] o.s.c.s.binder.DefaultBinderFactory      : Caching the binder: rabbit
2025-05-25T21:38:53.207+01:00  INFO 15156 --- [tts] [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2025-05-25T21:38:53.255+01:00  INFO 15156 --- [tts] [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#54463380:0/SimpleConnection@18b2124b [delegate=amqp://guest@127.0.0.1:5672/, localPort=55076]
2025-05-25T21:38:53.293+01:00  INFO 15156 --- [tts] [nio-8080-exec-1] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'tts.postTtsAnalytics-out-0' has 1 subscriber(s).
2025-05-25T21:38:53.311+01:00  INFO 15156 --- [tts] [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2025-05-25T21:38:53.319+01:00  INFO 15156 --- [tts] [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory.publisher#105dbd15:0/SimpleConnection@7243df50 [delegate=amqp://guest@127.0.0.1:5672/, localPort=55077]
2025-05-25T21:38:55.309+01:00  INFO 15156 --- [tts] [nio-8080-exec-3] c.e.t.controller.TextToSpeechController  : textToSpeech request: TtsRequest[text=Hi this is a complete test. From RabbitMQ]
2025-05-25T21:38:55.315+01:00  INFO 15156 --- [tts] [nio-8080-exec-3] c.e.tts.service.TextToSpeechService      : do Something with analytics response: TtsAnalyticsResponse[device=Tablet, countryIso=AU]
Wrote synthesized speech to tts\output\a4021436-fd12-486b-9d65-cff25c5611c5.wav
2025-05-25T21:38:55.400+01:00  INFO 15156 --- [tts] [nio-8080-exec-3] c.e.t.m.PostTtsAnalyticsPublisherService : sendAnalytics: PostTtsAnalyticsRequest[id=a4021436-fd12-486b-9d65-cff25c5611c5, creationDate=2025-05-25T21:38:55.400422300, processTimeMs=91]

Enter fullscreen mode Exit fullscreen mode

By calling the "/tts" endpoint in TTS applicatoin, we can see the following:

  • The application processes text-to-speech requests, generates audio files, and publishes analytics data to a RabbitMQ exchange using Spring Cloud Stream.
  • Upon the first request, the application initializes the RabbitMQ binder, constructs the necessary context, and caches it for future use.
  • The application establishes connections to RabbitMQ at localhost:5672, confirming successful communication with the message broker.
  • The application confirms that the output channel has an active subscriber, ensuring that messages are being routed correctly.
  • Further requests are processed efficiently, with the application reusing the established binder and connections, demonstrating effective resource management. processTimeMs=91 for second request in comparison to processTimeMs=937 for first one.
2025-05-25T21:38:52.567+01:00  INFO 23932 --- [tts-analytics] [nio-8090-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2025-05-25T21:38:52.567+01:00  INFO 23932 --- [tts-analytics] [nio-8090-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2025-05-25T21:38:52.567+01:00  INFO 23932 --- [tts-analytics] [nio-8090-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 0 ms
2025-05-25T21:38:52.664+01:00  INFO 23932 --- [tts-analytics] [nio-8090-exec-1] c.e.t.controller.AnalyticsController     : doAnalytics request: TtsAnalyticsRequest[clientIp=0:0:0:0:0:0:0:1, userAgent=PostmanRuntime/7.44.0]
2025-05-25T21:38:53.381+01:00  INFO 23932 --- [tts-analytics] [alytics-group-1] c.e.t.m.PostTtsAnalyticsConsumerService  : processAnalytics: PostTtsAnalyticsRequest[id=14449f58-7140-45a5-b024-26021229dfb3, creationDate=2025-05-25T21:38:53.063039900, processTimeMs=937]
2025-05-25T21:38:55.313+01:00  INFO 23932 --- [tts-analytics] [nio-8090-exec-2] c.e.t.controller.AnalyticsController     : doAnalytics request: TtsAnalyticsRequest[clientIp=0:0:0:0:0:0:0:1, userAgent=PostmanRuntime/7.44.0]
2025-05-25T21:38:55.408+01:00  INFO 23932 --- [tts-analytics] [alytics-group-1] c.e.t.m.PostTtsAnalyticsConsumerService  : processAnalytics: PostTtsAnalyticsRequest[id=a4021436-fd12-486b-9d65-cff25c5611c5, creationDate=2025-05-25T21:38:55.400422300, processTimeMs=91]
Enter fullscreen mode Exit fullscreen mode

From correspond logs above of TTS Analytics following the messages sent from TTS application we see:

  • The application acts as a consumer in a Spring Cloud Stream setup, receiving messages from a RabbitMQ exchange named post-tts-analytics.
  • Upon receiving a PostTtsAnalyticsRequest, the PostTtsAnalyticsConsumerService processes the message, which likely involves business logic such as storing analytics data or triggering further actions.
  • The presence of the AnalyticsController handling TtsAnalyticsRequest indicates that the application also exposes REST endpoints, possibly for manual testing or additional data ingestion.

Kafka Demo

We run both applications now with setting the environment variable BINDER_TYPE=kafka. Producer application TTS run as previous example without any difference.

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/

 :: Spring Boot ::                (v3.4.5)

2025-05-26T23:31:21.842+01:00  INFO 17204 --- [tts-analytics] [           main] c.e.t.TtsAnalyticsApplication            : No active profile set, falling back to 1 default profile: "default"
2025-05-26T23:31:24.238+01:00  INFO 17204 --- [tts-analytics] [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2025-05-26T23:31:24.260+01:00  INFO 17204 --- [tts-analytics] [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2025-05-26T23:31:25.529+01:00  INFO 17204 --- [tts-analytics] [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port 8090 (http)
2025-05-26T23:31:25.556+01:00  INFO 17204 --- [tts-analytics] [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2025-05-26T23:31:25.556+01:00  INFO 17204 --- [tts-analytics] [           main] o.apache.catalina.core.StandardEngine    : Starting Servlet engine: [Apache Tomcat/10.1.40]
2025-05-26T23:31:25.673+01:00  INFO 17204 --- [tts-analytics] [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2025-05-26T23:31:25.676+01:00  INFO 17204 --- [tts-analytics] [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 3734 ms
2025-05-26T23:31:29.799+01:00  INFO 17204 --- [tts-analytics] [           main] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'tts-analytics.postTtsAnalytics-in-0' has 1 subscriber(s).
2025-05-26T23:31:30.167+01:00  INFO 17204 --- [tts-analytics] [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2025-05-26T23:31:30.167+01:00  INFO 17204 --- [tts-analytics] [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'tts-analytics.errorChannel' has 1 subscriber(s).
2025-05-26T23:31:30.167+01:00  INFO 17204 --- [tts-analytics] [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2025-05-26T23:31:30.373+01:00  INFO 17204 --- [tts-analytics] [           main] o.s.c.s.binder.DefaultBinderFactory      : Creating binder: kafka
2025-05-26T23:31:30.376+01:00  INFO 17204 --- [tts-analytics] [           main] o.s.c.s.binder.DefaultBinderFactory      : Constructing binder child context for kafka
2025-05-26T23:31:30.980+01:00  INFO 17204 --- [tts-analytics] [           main] o.s.c.s.binder.DefaultBinderFactory      : Caching the binder: kafka
2025-05-26T23:31:31.055+01:00  INFO 17204 --- [tts-analytics] [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
............
2025-05-26T23:31:37.664+01:00  INFO 17204 --- [tts-analytics] [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-analytics-group-2, groupId=analytics-group] Successfully joined group with generation Generation{generationId=1, memberId='consumer-analytics-group-2-2e0c644b-239a-444d-bedb-24be9d40df31', protocol='range'}
2025-05-26T23:31:37.674+01:00  INFO 17204 --- [tts-analytics] [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-analytics-group-2, groupId=analytics-group] Finished assignment for group at generation 1: {consumer-analytics-group-2-2e0c644b-239a-444d-bedb-24be9d40df31=Assignment(partitions=[post-tts-analytics-0])}
2025-05-26T23:31:37.691+01:00  INFO 17204 --- [tts-analytics] [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-analytics-group-2, groupId=analytics-group] Successfully synced group in generation Generation{generationId=1, memberId='consumer-analytics-group-2-2e0c644b-239a-444d-bedb-24be9d40df31', protocol='range'}
2025-05-26T23:31:37.692+01:00  INFO 17204 --- [tts-analytics] [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-analytics-group-2, groupId=analytics-group] Notifying assignor about the new Assignment(partitions=[post-tts-analytics-0])
2025-05-26T23:31:37.696+01:00  INFO 17204 --- [tts-analytics] [container-0-C-1] k.c.c.i.ConsumerRebalanceListenerInvoker : [Consumer clientId=consumer-analytics-group-2, groupId=analytics-group] Adding newly assigned partitions: post-tts-analytics-0
2025-05-26T23:31:37.713+01:00  INFO 17204 --- [tts-analytics] [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-analytics-group-2, groupId=analytics-group] Found no committed offset for partition post-tts-analytics-0
2025-05-26T23:31:37.736+01:00  INFO 17204 --- [tts-analytics] [container-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-analytics-group-2, groupId=analytics-group] Resetting offset for partition post-tts-analytics-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}.
2025-05-26T23:31:37.738+01:00  INFO 17204 --- [tts-analytics] [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$2  : analytics-group: partitions assigned: [post-tts-analytics-0]
...........
Enter fullscreen mode Exit fullscreen mode

We can see the following from logs above:

  • Spring Cloud Stream wires up the input binding channel named tts-analytics.postTtsAnalytics-in-0 and confirms it has one subscriber (consumer application).
  • Immediately after, an error‐logging endpoint is attached to the errorChannel—so any exceptions in message handling will be logged rather than silently dropped.
  • The Kafka binder is created, its child context constructed, and cached.
  • The Kafka consumer (clientId=consumer-analytics-group-2) successfully joins the analytics-group, receives its partition assignment (post-tts-analytics-0) and syncs the group and notifies the internal assignor.

Next Kafka commands are verifying that all Kafka resources are created properly:

/ $ /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
post-tts-analytics
/ $ /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic post-tts-analytics
Topic: post-tts-analytics       TopicId: bq8s8k8mTJCexcXcG_s-3A PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: post-tts-analytics       Partition: 0    Leader: 1       Replicas: 1     Isr: 1  Elr:    LastKnownElr: 
/ $ /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
analytics-group
/ $ /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092  --describe --group analytics-group

GROUP           TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                     HOST            CLIENT-ID
analytics-group post-tts-analytics 0          2               2               0               consumer-analytics-group-2-2e0c644b-239a-444d-bedb-24be9d40df31 /172.17.0.1     consumer-analytics-group-2
Enter fullscreen mode Exit fullscreen mode

As we did earlier, we are going to call same endpoint and test the whole flow.

2025-05-26T23:59:25.601+01:00  INFO 19336 --- [tts] [nio-8080-exec-2] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2025-05-26T23:59:25.602+01:00  INFO 19336 --- [tts] [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2025-05-26T23:59:25.604+01:00  INFO 19336 --- [tts] [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
2025-05-26T23:59:25.785+01:00  INFO 19336 --- [tts] [nio-8080-exec-2] c.e.t.controller.TextToSpeechController  : textToSpeech request: TtsRequest[text=Hi this is a complete test. From Kafka]
2025-05-26T23:59:26.992+01:00  INFO 19336 --- [tts] [nio-8080-exec-2] c.e.tts.service.TextToSpeechService      : do Something with analytics response: TtsAnalyticsResponse[device=Mobile, countryIso=ZA]
Wrote synthesized speech to tts\output\7e82f0d0-050f-4e05-8f50-f39fc8549ae2.wav
2025-05-26T23:59:27.492+01:00  INFO 19336 --- [tts] [nio-8080-exec-2] c.e.t.m.PostTtsAnalyticsPublisherService : sendAnalytics: PostTtsAnalyticsRequest[id=7e82f0d0-050f-4e05-8f50-f39fc8549ae2, creationDate=2025-05-26T23:59:27.491530100, processTimeMs=1701]
2025-05-26T23:59:27.504+01:00  INFO 19336 --- [tts] [nio-8080-exec-2] o.s.c.s.binder.DefaultBinderFactory      : Creating binder: kafka
2025-05-26T23:59:27.504+01:00  INFO 19336 --- [tts] [nio-8080-exec-2] o.s.c.s.binder.DefaultBinderFactory      : Constructing binder child context for kafka
2025-05-26T23:59:27.793+01:00  INFO 19336 --- [tts] [nio-8080-exec-2] o.s.c.s.binder.DefaultBinderFactory      : Caching the binder: kafka
2025-05-26T23:59:27.815+01:00  INFO 19336 --- [tts] [nio-8080-exec-2] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Using kafka topic for outbound: post-tts-analytics
2025-05-26T23:59:27.826+01:00  INFO 19336 --- [tts] [nio-8080-exec-2] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
..........
2025-05-26T23:59:28.970+01:00  INFO 19336 --- [tts] [nio-8080-exec-2] o.a.k.c.t.i.KafkaMetricsCollector        : initializing Kafka metrics collector
2025-05-26T23:59:29.023+01:00  INFO 19336 --- [tts] [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.8.1
2025-05-26T23:59:29.023+01:00  INFO 19336 --- [tts] [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 70d6ff42debf7e17
2025-05-26T23:59:29.023+01:00  INFO 19336 --- [tts] [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1748300369023
2025-05-26T23:59:29.044+01:00  INFO 19336 --- [tts] [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: 5L6g3nShT-eMCtK--X86sw
2025-05-26T23:59:29.066+01:00  INFO 19336 --- [tts] [nio-8080-exec-2] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'tts.postTtsAnalytics-out-0' has 1 subscriber(s).
2025-05-26T23:59:36.998+01:00  INFO 19336 --- [tts] [nio-8080-exec-3] c.e.t.controller.TextToSpeechController  : textToSpeech request: TtsRequest[text=Hi this is a complete test. From Kafka]
2025-05-26T23:59:37.007+01:00  INFO 19336 --- [tts] [nio-8080-exec-3] c.e.tts.service.TextToSpeechService      : do Something with analytics response: TtsAnalyticsResponse[device=SmartTV, countryIso=US]
Wrote synthesized speech to tts\output\fe504b51-5149-4f6b-a65f-86eecf6daa66.wav
2025-05-26T23:59:37.158+01:00  INFO 19336 --- [tts] [nio-8080-exec-3] c.e.t.m.PostTtsAnalyticsPublisherService : sendAnalytics: PostTtsAnalyticsRequest[id=fe504b51-5149-4f6b-a65f-86eecf6daa66, creationDate=2025-05-26T23:59:37.158350700, processTimeMs=159]

Enter fullscreen mode Exit fullscreen mode

We can see from logs above:

  • DefaultBinderFactory creates, constructs the child context for kafka then caches that binder instance—this is our "magic switch" picking Kafka.
  • KafkaTopicProvisioner confirms it will use the topic post-tts-analytics, ensuring the topic exists.
  • Create the messages and send them to the topic to be processed by the consumer as shown below.
2025-05-26T23:59:26.739+01:00  INFO 17204 --- [tts-analytics] [nio-8090-exec-1] c.e.t.controller.AnalyticsController     : doAnalytics request: TtsAnalyticsRequest[clientIp=0:0:0:0:0:0:0:1, userAgent=PostmanRuntime/7.44.0]
2025-05-26T23:59:29.272+01:00  INFO 17204 --- [tts-analytics] [container-0-C-1] c.e.t.m.PostTtsAnalyticsConsumerService  : processAnalytics: PostTtsAnalyticsRequest[id=7e82f0d0-050f-4e05-8f50-f39fc8549ae2, creationDate=2025-05-26T23:59:27.491530100, processTimeMs=1701]
2025-05-26T23:59:37.004+01:00  INFO 17204 --- [tts-analytics] [nio-8090-exec-2] c.e.t.controller.AnalyticsController     : doAnalytics request: TtsAnalyticsRequest[clientIp=0:0:0:0:0:0:0:1, userAgent=PostmanRuntime/7.44.0]
2025-05-26T23:59:37.168+01:00  INFO 17204 --- [tts-analytics] [container-0-C-1] c.e.t.m.PostTtsAnalyticsConsumerService  : processAnalytics: PostTtsAnalyticsRequest[id=fe504b51-5149-4f6b-a65f-86eecf6daa66, creationDate=2025-05-26T23:59:37.158350700, processTimeMs=159]

Enter fullscreen mode Exit fullscreen mode

For those tests, RabbitMQ and Kafka instances are running locally as containers with Docker using:

docker run -d --name=kafka -p 9092:9092 apache/kafka
docker run -d --name=rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4-management
Enter fullscreen mode Exit fullscreen mode

Summary

  • Broker-Agnostic Messaging: Spring Cloud Stream provides a thin abstraction over messaging systems (RabbitMQ, Kafka, etc.), so you can write your producer and consumer logic once and switch brokers simply by changing an environment variable.
  • Key Components: It introduces Binders (plug‐ins for specific brokers), Bindings (logical links to destinations), and the StreamBridge API for imperative sends—all wired via Spring Boot auto-configuration.
  • Lightweight Configuration: Complex patterns like retries and dead-letter queues require only a few YAML lines (max-attempts, back-off-initial-interval), enabling enterprise-grade messaging with minimal boilerplate.
  • Testing Strategy: Demonstrates in-JVM tests using TestChannelBinderConfiguration plus InputDestination/OutputDestination and CompositeMessageConverter to verify end-to-end message publishing and consumption without a real broker.

Project GitHub Repo

Top comments (0)