JetStream Consumer
JetStream Consumer in Shizuku provides a high-level abstraction for consuming messages from NATS JetStream. It includes a powerful routing system and automatic acknowledgment handling.
Core Components
FinalJetStreamProcessor
The FinalJetStreamProcessor
trait is the foundation of JetStream Consumers. It processes NATS messages and returns a Result indicating success or failure:
use shizuku::jetstream::FinalJetStreamProcessor;use async_nats::Message;
struct MyConsumer { // consumer state}
impl Processor<Message, Result<(), Error>> for MyConsumer { async fn process(&self, message: Message) -> Result<(), Error> { // Process the message Ok(()) }}
impl FinalJetStreamProcessor for MyConsumer {} // Implement the marker trait
JetStreamConsumer
The JetStreamConsumer
struct handles message consumption and acknowledgment:
use shizuku::jetstream::JetStreamConsumer;
let consumer = JetStreamConsumer::new( my_processor, nats_client, error_tracer);
// Run the consumerconsumer.run(message_stream).await;
You still need to get the message stream and combine it manually before running the consumer.
Message Routing
Shizuku provides a powerful jet_route!
macro for routing JetStream messages:
use shizuku::jet_route;
impl Processor<Message, Result<(), Error>> for OrderService { async fn process(&self, input: Message) -> Result<(), Error> { jet_route![ input, ["invoice"] => (&self.invoice_processor), ["order", "paid", "*"] => (&self.order_paid_processor), ["order", "cancelled"] => (&self.order_cancelled_processor) ] }}
Route Patterns
The routing system supports several pattern types:
- Static segments:
["order", "paid"]
- Single-level wildcard:
["order", "*", "paid"]
- Multi-level wildcard:
["order", ">"]
(must be last segment) - Nested routes: See example below
Best Practices
-
Message Organization
- Use hierarchical subjects (e.g.,
order.paid
,order.cancelled
) - Keep subject patterns consistent across related consumers
- Use hierarchical subjects (e.g.,
-
Error Handling
- Implement proper error tracing using
ErrorTracer
- Handle errors appropriately to ensure message acknowledgment
- Implement proper error tracing using
-
State Management
- Use
&'static
references for long-lived connections - Consider using
Arc
for shared state between processors
- Use
-
Processing Flow
- Keep processors focused on single responsibilities
- Use the routing system to direct messages to appropriate handlers
Complete Example
Here’s a full example of a JetStream consumer with routing:
use shizuku::jetstream::{FinalJetStreamProcessor, JetStreamConsumer};use shizuku::{jet_route, Error, Processor};use async_nats::Message;
struct OrderService { order_paid_processor: OrderPaidProcessor, order_cancelled_processor: OrderCancelledProcessor, invoice_processor: InvoiceProcessor,}
impl Processor<Message, Result<(), Error>> for OrderService { async fn process(&self, input: Message) -> Result<(), Error> { jet_route![ input, // Basic routes ["invoice"] => (&self.invoice_processor), ["order", "paid", "*"] => (&self.order_paid_processor), ["order", "cancelled"] => (&self.order_cancelled_processor),
// Nested routes example ["order"] => [ ["paid"] => (&self.order_paid_processor), ["cancelled"] => (&self.order_cancelled_processor) ] ] }}
impl FinalJetStreamProcessor for OrderService {}
// Start the consumerlet consumer = JetStreamConsumer::new( OrderService::new(), nats_client, error_tracer);
consumer.run(message_stream).await;
Publishing Events
When you need to publish events from your processors, use the JetStreamMessageSendTrait
:
use shizuku::core::message::JetStreamMessageSendTrait;
#[derive(ByteSerialize)]struct OrderPaidEvent { order_id: String, amount: f64,}
impl DynamicSubjectMessage for OrderPaidEvent { fn subject(&self) -> NatsSubjectPath { vec!["order", "paid", &self.order_id].into() }}
impl JetStreamMessageSendTrait for OrderPaidEvent {}
// Publishing the eventlet event = OrderPaidEvent { order_id: "123".to_string(), amount: 99.99,};event.publish(&js_context).await?;
Or, if the subject is static, after StaticSubjectMessage
is implemented, the JetStreamMessageSendTrait
can be implemented automatically.
Comparison with Traditional Message Queues
JetStream Consumers in Shizuku offer several advantages:
- Built-in subject-based routing with pattern matching
- Automatic acknowledgment handling
- Integrated error tracing
- High performance and reliability through NATS JetStream