One of the challenges of adding messaging to an existing system is evolving from a request/response design to one that is more event driven. To support moving request/response interactions (more akin to RPC, such as a web service or WCF call) to messaging, MassTransit provides a convenient syntax for performing these types of operations. For example, an external web service may need to communicate with internal services via messaging, and this syntax makes that easier.
To initiate a request from a client to a service, the following syntax is a good example:
IEndpoint serviceEndpoint = _endpointFactory.GetEndpoint(serviceUri);
_bus.MakeRequest(bus => serviceEndpoint.Send(ping, headers => headers.SetResponseAddress(bus.Endpoint.Uri)))
    .When<PongMessage>().RelatedTo(ping.CorrelationId).IsReceived(pong =>
    {
        // do something with the pong
    })
    .TimeoutAfter(TimeSpan.FromSeconds(4))
    .OnTimeout(() =>
    {
        // handle the timeout appropriately
    })
    .Send();
If the command can produce more than one kind of response, additional '.When()' handlers can be specified. The message handlers are dynamically created and subscribed to the bus to handle the response. With this type of structure, correlated messages are pretty much required in order to tie a response to a specific request.
When '.Send()' is called, the handlers for the '.When()' expressions are built and subscribed to the bus, after which the expression passed to MakeRequest is invoked. The calling thread then blocks until either a response is received or the timeout expires (you did set a timeout, didn't you?).
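The ordering described above (subscribe first, then send, then block with a timeout) can be illustrated with a minimal, self-contained sketch. It does not use MassTransit and is not how the library is implemented: TinyBus, RequestSketch, and SendAndWait are hypothetical names invented for this illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical in-memory stand-in for a bus; not a MassTransit type.
public class TinyBus
{
    private readonly List<Action<object>> _handlers = new List<Action<object>>();

    // Registers a handler and returns an action that removes it again.
    public Action Subscribe(Action<object> handler)
    {
        lock (_handlers) _handlers.Add(handler);
        return () => { lock (_handlers) _handlers.Remove(handler); };
    }

    // Invokes every handler while holding the lock, so that once an
    // unsubscribe call returns, no handler is still running.
    public void Deliver(object message)
    {
        lock (_handlers)
        {
            foreach (var handler in _handlers.ToArray()) handler(message);
        }
    }
}

public static class RequestSketch
{
    // Returns true if a TResponse arrived before the timeout, false otherwise.
    public static bool SendAndWait<TResponse>(TinyBus bus, Action sendRequest,
        Action<TResponse> onResponse, TimeSpan timeout, Action onTimeout)
        where TResponse : class
    {
        using (var received = new ManualResetEvent(false))
        {
            // 1. Subscribe the response handler first so a fast reply is not missed.
            Action unsubscribe = bus.Subscribe(message =>
            {
                var response = message as TResponse;
                if (response == null) return; // not the message type we are waiting for
                onResponse(response);
                received.Set();
            });
            try
            {
                // 2. Only then send the request.
                sendRequest();

                // 3. Block until a response is signalled or the timeout expires.
                if (received.WaitOne(timeout)) return true;
                onTimeout();
                return false;
            }
            finally
            {
                // Remove the temporary handler whether or not a response arrived.
                unsubscribe();
            }
        }
    }
}
```

Subscribing before sending matters because a service on a fast transport could reply before the caller starts listening. The sketch matches responses by type only; a real request would also need to check the correlation identifier, as noted above.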
If you need to perform the request asynchronously, such as within a Begin/End web method or a PageAsyncTask, you can use the .BeginSend() call to return an IAsyncResult for asynchronous processing.
For a service that is meant to handle these types of commands, there are some helper methods to make responding to the request originator easier. For example:
public void Consume(PingMessage message)
{
    CurrentMessage.Respond(new PongMessage(message.CorrelationId));
}
This looks at the message being handled and determines whether a response address was specified. If it was, the response message is sent directly to that endpoint. If no response address was specified, the message is published instead. If you had a response that needed to be published in addition to being sent to the originator, you could use the Publish interceptor to check whether the originating endpoint was sent the message and send it directly if it wasn't.
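The send-or-publish decision described above can be sketched in isolation as follows. This is only an illustration of the branching, not the library's code: RespondSketch, its Respond method, and the sendTo and publish delegates are hypothetical names, and the response address is assumed to have already been read from the incoming message's headers.

```csharp
using System;

public static class RespondSketch
{
    // responseAddress: the address taken from the request's headers,
    // or null if the requester did not set one (assumption of this sketch).
    // Returns a label describing which path was taken.
    public static string Respond(Uri responseAddress, Action<Uri> sendTo, Action publish)
    {
        if (responseAddress != null)
        {
            // A response address was specified: reply to that endpoint directly.
            sendTo(responseAddress);
            return "sent";
        }

        // No response address: fall back to publishing the response.
        publish();
        return "published";
    }
}
```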