public final class ERequestFeed extends EFeed implements IERequestFeed
ERequestFeed
is the application entry point for
posting request messages
to repliers.
Follow these steps to use this feed:
Step 1: Implement the ERequestor
interface.
Step 2:
Use ERequestFeed.Builder
to create a request feed
instance for a given ERequestor
instance and
type+topic message key
.
Use ERequestFeed.Builder.statusCallback(FeedStatusCallback)
,
ERequestFeed.Builder.replyCallback(ReplyCallback)
and/or
ERequestFeed.Builder.replyCallback(java.lang.Class, ReplyCallback)
to set Java lambda expression used in place of
ERequestor
interface methods.
Step 3:
Subscribe
to reply message key.
Step 4:
Send
a request to one or
more advertised repliers. If there are no advertised repliers,
then request(ERequestMessage)
returns
ERequestFeed.RequestState.DONE
, signaling that the requestor will
not receive any replies.
Step 5: Wait for
replies
to arrive. When the remaining
parameter is zero, that
means this is the final reply from all repliers. When
EReplyMessage.replyStatus
is
EReplyMessage.ReplyStatus.ERROR
or EReplyMessage.ReplyStatus.OK_FINAL
,
then this is the final reply from that replier.
Step 6: A requestor may terminate a request
prior to completion by calling
ERequestFeed.ERequest.close()
. No more replies should
be received once this is done. (Note: there is a possibility
that an in-flight reply will still be posted to the
requestor.)
Step 7: When
requestor is shutting down, retract
the
subscription and close
the feed.
ERequestFeed
import java.util.ArrayList;
import java.util.List;
import net.sf.eBus.client.EFeed.FeedScope;
import net.sf.eBus.client.ERequestFeed;
import net.sf.eBus.client.ERequestor;
import net.sf.eBus.messages.EMessageKey;
import net.sf.eBus.messages.EReplyMessage;
Step 1: Implement the ERequestor interface.
public class CatalogRequestor implements ERequestor {
private final EMessageKey mKey;
private final FeedScope mScope;
private final List<ERequestFeed.ERequest> mRequests;
private ERequestFeed mFeed;
public CatalogRequestor(final String subject, final FeedScope scope) {
mKey = new EMessageKey(com.acme.CatalogOrder.class, subject);
mScope = scope;
mRequests = new ArrayList<>();
}
@Override
public void startup() {
try {
Step 2: Open request feed. May override ERequestor interface methods.
mFeed = (ERequestFeed.builder()).target(this)
.messageKey(mKey)
.scope(mScope)
.builder();
Step 3: Subscribe to reply message key.
mFeed.subscribe();
} catch (IllegalArgumentException argex) {
// Open failed. Place recovery code here.
}
}
@Override
public void shutdown() {
synchronized (mRequests) {
for (ERequestFeed.ERequest request : mRequests) {
Step 6: Cancel request by closing request.
request.close();
}
mRequests.clear();
}
Step 7: On shutdown, close request feed.
if (mFeed != null) {
mFeed.close();
mFeed = null;
}
}
@Override
public void feedStatus(final EFeedState feedState, final ERequestFeed feed) {
if (feedState == EFeedState.DOWN) {
// Down. There are no repliers.
} else {
// Up. There is at least one replier.
}
}
Step 4: Wait for replies to request.
@Override
public void reply(final int remaining,
final EReplyMessage reply,
final ERequestFeed.ERequest request) {
final String reason = msg.replyReason();
if (msg.replyStatus == EReplyMessage.ReplyStatus.ERROR)
{
// The replier rejected the request. Report the reason
} else if (msg.replyStatus == EReplyMessage.ReplyStatus.OK_CONTINUING) {
// The replier will be sending more replies.
} else {
// This is the replier's last reply.
}
if (remaining == 0) {
synchronized (mRequests) {
mRequests.remove(request);
}
}
}
Step 5: Send a request message.
public void placeOrder(final String product,
final int quantity,
final Price price,
final ShippingEnum shipping,
final ShippingAddress address) {
final CatalogOrder msg = (CatalogOrder.builder()).subject(mKey.subject())
.timestamp(Instant.now())
.product(product)
.price(price)
.shipping(shipping)
.address(address)
.build();
try {
synchronized (mRequests) {
mRequests.add(mFeed.request(msg));
}
} catch (Exception jex) {
// Request failed. Put recovery code here.
}
}
}
ERequestor
,
EReplier
,
EReplyFeed
Modifier and Type | Class and Description |
---|---|
static class |
ERequestFeed.Builder
EREquestFeed.Builder is now the preferred
mechanism for creating a ERequestFeed instance. |
static class |
ERequestFeed.ERequest
This class represents an individual request, tracking the
current request state and the remaining repliers.
|
static class |
ERequestFeed.RequestState
A request is either not placed, active, done, or canceled.
|
protected static class |
net.sf.eBus.client.ESingleFeed.FeedType
Enumerates the supported feed types.
|
EFeed.AbstractClientTask, EFeed.FeedScope, EFeed.NotifyTask, EFeed.StatusTask<T extends IEFeed>
Modifier and Type | Field and Description |
---|---|
static java.lang.String |
FEED_STATUS_METHOD
ERequestor.feedStatus(EFeedState, IERequestFeed)
method name. |
protected int |
mActivationCount
Tracks the number of contra-feeds matched to this feed.
|
protected net.sf.eBus.client.ESingleFeed.FeedType |
mFeedType
Specifies whether this is a publish, subscribe, request,
or reply feed.
|
protected net.sf.eBus.client.ESubject |
mSubject
The feed interfaces with this eBus subject.
|
static java.lang.String |
REPLY_METHOD
|
FEED_IS_INACTIVE, FEED_NOT_ADVERTISED, mEClient, mFeedId, mFeedState, mInPlace, mIsActive, mScope, NO_CONDITION, NOTIFY_METHOD, NULL_CLIENT
Modifier and Type | Method and Description |
---|---|
int |
activationCount()
Returns the feed activation count.
|
static ERequestFeed.Builder |
builder()
Returns a new
ERequestFeed builder instance. |
protected void |
inactivate()
Cancels this feed's active requests
|
EMessageKey |
key()
Returns the feed message key.
|
boolean |
mayClose()
Returns
true if a request may be unilaterally
canceled by calling ERequestFeed.ERequest.cancel() and
false if not. |
java.lang.String |
messageSubject()
Returns the feed
message key subect . |
void |
replyCallback(java.lang.Class<? extends EReplyMessage> mc,
ReplyCallback cb)
Sets the callback for a specific reply message class.
|
void |
replyCallback(ReplyCallback cb)
Puts the reply message callback in place
for all reply types.
|
ERequestFeed.ERequest |
request(ERequestMessage msg)
Forwards the request to all matching repliers, returning
the request instance.
|
void |
subscribe()
Subscribes this request feed to the eBus subject.
|
java.lang.String |
toString()
Returns a containing the feed message key and data member
values.
|
void |
unsubscribe()
Retracts this request feed from the associated subject.
|
addAllKeys, addKey, addListener, checkScopes, clientId, close, createDispatcher, defaultDispatcher, eClient, equals, feedId, feedState, findKeys, findKeys, findKeys, hashCode, inPlace, isActive, isFeedUp, isOverridden, loadKeys, location, register, register, register, removeListener, scope, setExhaust, shutdown, shutdown, shutdownAll, startup, startup, startupAll, storeKeys, storeKeys, storeKeys, subclassDistance
public static final java.lang.String FEED_STATUS_METHOD
ERequestor.feedStatus(EFeedState, IERequestFeed)
method name.public static final java.lang.String REPLY_METHOD
protected final net.sf.eBus.client.ESingleFeed.FeedType mFeedType
protected final net.sf.eBus.client.ESubject mSubject
feed type
.protected int mActivationCount
protected void inactivate()
inactivate
in class EFeed
public boolean mayClose()
true
if a request may be unilaterally
canceled by calling ERequestFeed.ERequest.cancel()
and
false
if not.true
if requests may be unilaterally
canceled.public void replyCallback(ReplyCallback cb)
cb
is
not null
, replies will be passed to cb
rather than
ERequestor.reply(int, EReplyMessage, ERequestFeed.ERequest)
.
A null cb
results in replies posted to the
ERequestor.reply(int, EReplyMessage, ERequestFeed.ERequest)
override.
Note that this method call overrides all previous calls
to replyCallback(Class, ReplyCallback)
. If
the goal is to use a generic callback for all replies
except one specific message, then use this method to put
the generic callback in place first and then use
replyCallback(EMessageKey, ReplyCallback)
.
cb
- reply message callback. May be null
.java.lang.IllegalStateException
- if this feed is either closed or subscribed.replyCallback(Class, ReplyCallback)
public void replyCallback(java.lang.Class<? extends EReplyMessage> mc, ReplyCallback cb)
cb
is not null
, replies will be passed to
cb
rather than
ERequestor.reply(int, EReplyMessage, ERequestFeed.ERequest)
.
A cb
results in replies posted to the
ERequestor.reply(int, EReplyMessage, ERequestFeed.ERequest)
override.
If the goal is to set a single callback method for all
reply message types, then use
replyCallback(ReplyCallback)
. Note that method
overrides all previous set reply callbacks.
mc
- the reply message class.cb
- callback for the reply message.java.lang.NullPointerException
- if mc
is null
.java.lang.IllegalArgumentException
- if mc
is not a reply for this request.java.lang.IllegalStateException
- if this feed is either closed or subscribed.public static ERequestFeed.Builder builder()
ERequestFeed
builder instance.
This instance should be used to build a single
request feed instance and not used to create multiple
such feeds.public void subscribe()
request(ERequestMessage)
may be called.subscribe
in interface IERequestFeed
java.lang.IllegalStateException
- if this feed is closed or the client did not override
ERequestor
methods nor put the required callback
in place.unsubscribe()
,
EFeed.close()
public void unsubscribe()
Note that un-subscribing does not cancel any active requests or prevent delivery of replies to those requests.
unsubscribe
in interface IERequestFeed
subscribe()
,
EFeed.close()
public ERequestFeed.ERequest request(ERequestMessage msg)
IllegalStateException
if no matching repliers are found and, consequently, no
replies will be received. Use the returned
ERequestFeed.ERequest
instance to query the request state or
cancel the request.
Note: once the ERequestFeed.ERequest
is
returned, the application is responsible for tracking
all active request instances. This request feed does
not store or track requests on behalf of the
application. Closing
this requests feed
does not automatically cancel active requests created by
this feed. The application is responsible for canceling
active requests.
msg
- send this request message to the repliers.java.lang.NullPointerException
- if msg
is null
.java.lang.IllegalArgumentException
- if msg
does not match the request subject key.java.lang.IllegalStateException
- if there are currently no repliers for this request.ERequestFeed.ERequest.close()
public java.lang.String toString()
public final EMessageKey key()
EPublishFeed
, then a
notification
message key is returned.public final java.lang.String messageSubject()
message key subect
.public final int activationCount()
Copyright © 2001 - 2024. Charles W. Rapp. All rights reserved.