eBus v. 7.2.0
API Specification

eBus
A middleware API
Updated: January 20, 2024
Version: 7.2.0

Packages 
Package Description
net.sf.eBus.client
This package contains the top-level eBus API which applications use to interact with eBus.
net.sf.eBus.client.monitor
This package is centered on the RQMonitor class.
net.sf.eBus.client.sysmessages
This package defines the eBus system messages.
net.sf.eBus.config
Provides classes needed to configure eBus network and eBus core modules.
net.sf.eBus.feed.historic
This package contains a specialized feed providing notification feeds which support both historic and live messages.
net.sf.eBus.feed.historic.store
This package contains two implementations of IEMessageStore interface: InMemoryMessageStore and SqlMessageStore.
net.sf.eBus.feed.pattern
This package contains specialized feeds extending the basic feeds in net.sf.eBus.client.
net.sf.eBus.messages
Messages are the heart of eBus.
net.sf.eBus.messages.type
Provides classes for translating Java built-in types to and from binary encoding.
net.sf.eBus.net
Provides asynchronous API for Java NIO package.
net.sf.eBus.test
Provides classes supporting JUnit testing.
net.sf.eBus.text
TokenLexer takes a given input and returns a series of analyzed tokens.
net.sf.eBus.util
Contains supporting utility Java classes for net.sf.eBus.
net.sf.eBus.util.logging
Supplements java.util.logging package with a rolling log file handler, a pattern formatter and a logged status report.
net.sf.eBus.util.regex
This regular expression package is designed to perform efficient pattern matching over a TernarySearchTree.
net.sf.eBusx.geo
This package defines eBus message fields implementing GeoJSON data types as defined in the GeoJSON specification.
net.sf.eBusx.io
This package provides the ability to monitor file/directory creation, modification, and deletion via the file notification.
net.sf.eBusx.monitor
This package provides messages to monitor both the on-going state and transient events of an eBus application at the object-level.
net.sf.eBusx.time
This package provides a single Java class: EInterval.
net.sf.eBusx.util
This package provides an eBus interface for accessing Timer API.

Introduction

eBus provides two services: message routing and object task execution. Messages are routed between objects that reside in the same process, in different processes on the same host, or in processes on different hosts, and the objects are unaware of where the other object resides. 1 There are two types of message routing: Notification and Request/Reply. eBus uses a broker-less, type+topic-based message routing architecture. Broker-less means that a separate message routing server is not used; rather, messages are transmitted directly between objects via the eBus API. Type+topic message routing means that the message class together with the message subject are used to route the message to subscribers. This technique provides the type safety missing from topic-only routing. eBus refers to the type+topic combination as a message key.

Object execution is when eBus passes inbound messages to an application object method asynchronously using a dispatcher thread. These threads may be grouped together and associated with one or more application classes. If an application class is not assigned to a dispatcher group, then it is assigned to the default dispatcher group. An eBus client is run by only one dispatcher thread at any given time. While the client may be run by different dispatchers over time, only one dispatcher will run the client at any given moment ("run a client" means executing the client callback tasks). See EConfigure.Dispatcher for more information on configuring eBus dispatcher threads.
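The "one dispatcher at a time" rule can be illustrated with a serial executor layered over a shared thread pool. This is only a minimal sketch of the general technique, not the eBus implementation: SerialClientExecutor and its members are hypothetical names, and the shared pool stands in for a dispatcher group.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

/** Hypothetical helper (not an eBus class): serializes one client's tasks. */
final class SerialClientExecutor implements Executor {
    private final Executor mPool;                         // shared dispatcher threads
    private final Queue<Runnable> mTasks = new ArrayDeque<>();
    private boolean mRunning;                             // true while some thread owns this client

    SerialClientExecutor(final Executor pool) {
        mPool = pool;
    }

    @Override
    public void execute(final Runnable task) {
        synchronized (this) {
            mTasks.add(task);
            if (mRunning) {
                return;                                   // current owner will run the task
            }
            mRunning = true;
        }
        mPool.execute(this::drain);
    }

    // Runs queued tasks in FIFO order, then releases ownership of the client.
    private void drain() {
        while (true) {
            final Runnable next;
            synchronized (this) {
                next = mTasks.poll();
                if (next == null) {
                    mRunning = false;
                    return;
                }
            }
            try {
                next.run();
            } catch (RuntimeException e) {
                e.printStackTrace();                      // keep draining after a failed task
            }
        }
    }
}
```

Different pool threads may drain the queue at different times, but the mRunning flag ensures that at most one thread runs the client's tasks at any moment.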

Messages are delivered to eBus clients asynchronously using dispatcher threads.

eBus maintains a weak reference to application objects which act as eBus clients. When eBus detects that a client is finalized, eBus automatically cleans up all the client's extant eBus feeds. Even so, it is better for an application to explicitly close its feeds when an application object is no longer needed rather than depend on automatic clean up.

The eBus Programmer's Manual covers the following information in greater detail.

eBus (as of release 5.0.0) is a Maven project deployed to Maven Central. All further releases will be available through Maven Central and may be accessed using the Maven dependency:

<dependency>
  <groupId>net.sf.ebus</groupId>
  <artifactId>core</artifactId>
  <version>7.2.0</version>
</dependency>

Gradle (long)

implementation group: 'net.sf.ebus', name: 'core', version: '7.2.0'

Gradle (short)

implementation 'net.sf.ebus:core:7.2.0'

Note: As of eBus release 6.0.0 this library is based on Java 11. Previous versions are based on Java 1.8. Use the version appropriate to your Java release.

eBus Messages

An eBus message is defined by extending one of the following classes and providing a "builder" method to create the message from the deserialized fields:

  1. ENotificationMessage,
  2. ERequestMessage,
  3. EReplyMessage.

Message fields must be public final data members whose types are from the following list:

eBus supported types.
boolean Boolean BigInteger BigDecimal byte Byte char Character Class Date
double Double Duration EField EFieldList EMessage EMessageList EMessageKey enum File
float Float net.sf.eBusx.geo GeoJSON Types InetAddress InetSocketAddress Instant int Integer LocalDate LocalDateTime
LocalTime long Long MonthDay OffsetTime OffsetDateTime Decimal Period short Short
String URI UUID Year YearMonth ZonedDateTime ZoneId ZoneOffset <field type>[]

Any eBus field type can be turned into a homogeneous 2 array by appending [] to the field type.

An eBus message class must provide a public static method literally named builder() which returns an instance of a public static inner builder class. This builder class must extend the builder class matching the message base. That means if the message base class is:

  1. ENotificationMessage, then the builder extends ENotificationMessage.Builder<M, B>,
  2. ERequestMessage, then the builder extends ERequestMessage.Builder<M, B>, and
  3. EReplyMessage, then the builder extends EReplyMessage.Builder<M, B>.

where M is the target class generated by the builder and B is the inner builder class. B exists for the benefit of setters defined in the super class. Builder setter methods return the builder instance, which allows setters to be chained (builder.setHeight(...).setWeight(...).setAge(...);). Super class setters maintain this chain by returning (B) this.
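The role of the B type parameter can be seen in a small stand-alone sketch. The classes below are hypothetical stand-ins written only for illustration (BaseBuilder plays the part of a super class builder); they do not use the eBus builder classes.

```java
/** Hypothetical base builder: M is the built type, B is the concrete builder type. */
abstract class BaseBuilder<M, B extends BaseBuilder<M, B>> {
    protected String mSubject;

    @SuppressWarnings("unchecked")
    public final B subject(final String subject) {
        mSubject = subject;
        return ((B) this);          // typed as the subclass builder, so the chain continues
    }

    public abstract M build();
}

final class Person {
    public final String subject;
    public final int height;
    public final int age;

    private Person(final PersonBuilder builder) {
        this.subject = builder.mSubject;
        this.height = builder.mHeight;
        this.age = builder.mAge;
    }

    public static PersonBuilder builder() {
        return (new PersonBuilder());
    }

    public static final class PersonBuilder extends BaseBuilder<Person, PersonBuilder> {
        private int mHeight;
        private int mAge;

        private PersonBuilder() {}

        public PersonBuilder height(final int height) { mHeight = height; return (this); }
        public PersonBuilder age(final int age) { mAge = age; return (this); }

        @Override
        public Person build() { return (new Person(this)); }
    }
}
```

Because subject(...) returns B rather than BaseBuilder, a call such as Person.builder().subject("sam").height(180).age(30).build() compiles without casts even though subject is defined in the super class.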

The builder paradigm is used for message de-serialization because eBus now determines message field serialization ordering based on field size. The longest fixed-size fields are serialized first, followed by ever shorter fields. Fields of variable size are serialized last. The goal is to maintain optimal byte alignment for each data type. Also, eBus now places all message boolean fields into a single long value where each boolean is a single bit within that long.
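The ordering rule and the boolean packing can be sketched in plain Java. This is an illustration of the idea only: FieldInfo, serializationOrder, packBooleans, and unpackBoolean are hypothetical names, the bit assignment (flag i in bit i) is an assumption, and the actual eBus wire layout may differ.

```java
import java.util.ArrayList;
import java.util.List;

final class FieldLayoutSketch {
    /** Hypothetical field descriptor: size in bytes, or -1 for a variable-size field. */
    static final class FieldInfo {
        final String name;
        final int size;

        FieldInfo(final String name, final int size) {
            this.name = name;
            this.size = size;
        }

        boolean isVariable() {
            return (size < 0);
        }
    }

    /** Orders fixed-size fields largest first; variable-size fields go last. */
    static List<FieldInfo> serializationOrder(final List<FieldInfo> fields) {
        final List<FieldInfo> ordered = new ArrayList<>(fields);

        ordered.sort((a, b) -> {
            if (a.isVariable() != b.isVariable()) {
                return (a.isVariable() ? 1 : -1);
            }
            return Integer.compare(b.size, a.size);   // descending by size
        });
        return ordered;
    }

    /** Packs up to 64 booleans into one long, flag i stored in bit i. */
    static long packBooleans(final boolean... flags) {
        if (flags.length > Long.SIZE) {
            throw new IllegalArgumentException("more than 64 flags");
        }
        long bits = 0L;
        for (int i = 0; i < flags.length; i++) {
            if (flags[i]) {
                bits |= (1L << i);
            }
        }
        return bits;
    }

    static boolean unpackBoolean(final long bits, final int index) {
        return ((bits & (1L << index)) != 0L);
    }
}
```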

Consider the following: a user message class OrderReply extends EReplyMessage. EReplyMessage has two fields of its own: replyStatus and replyReason. EReplyMessage extends EMessage which has another two fields: subject and timestamp.

The builder setter method names must exactly match the message field names. The OrderReply field names are parts, shippingCost, and totalPrice, so the builder must have setter methods whose names match those three.

The example OrderReply message class and OrderPart message field class are:

import java.io.Serializable;                                     import java.io.Serializable;
import java.math.BigDecimal;                                     import java.math.BigDecimal;
import net.sf.eBus.messages.EReplyMessage;                       import net.sf.eBus.messages.EField;

public final class OrderReply                                    public final class OrderPart
    extends EReplyMessage                                            extends EField
    implements Serializable                                          implements Serializable
{                                                                {
    @FieldDisplayIndex(index = 0)                                    @FieldDisplayIndex(index = 0)
    public final OrderPart[] parts;                                  public final String partId;

    @FieldDisplayIndex(index = 1)                                    @FieldDisplayIndex(index = 1)
    public final BigDecimal shippingCost;                            public final int quantity;

    @FieldDisplayIndex(index = 2)                                    @FieldDisplayIndex(index = 2)
    public final BigDecimal totalPrice;                              public final BigDecimal pricePerPart;

    private static final long serialVersionUID = 0x1L;               private static final long serialVersionUID = 0x1L;

    // Required "de-serialization" constructor.                      // Required "de-serialization" constructor.
    // Private because only the builder may access this              // Private because only the builder may access this
    // constructor.                                                  // constructor.
    private OrderReply(final OrderReplyBuilder builder) {            private OrderPart(final OrderPartBuilder builder) {
        super (builder);                                                 super(builder);

        this.parts = builder.mParts;                                     this.partId = builder.mPartId;
        this.shippingCost = builder.mShipping;                           this.quantity = builder.mQuantity;
        this.totalPrice = builder.mTotalPrice;                           this.pricePerPart = builder.mPrice;
    }                                                                }

                                                                     public BigDecimal totalPrice() {
                                                                         return (pricePerPart.multiply(BigDecimal.valueOf((long) quantity)));
                                                                     }

    public static OrderReplyBuilder builder() {                      public static OrderPartBuilder builder() {
        return (new OrderReplyBuilder());                                return (new OrderPartBuilder());
    }                                                                }

    public static final class OrderReplyBuilder                      public static final class OrderPartBuilder
        extends EReplyMessage.Builder<OrderReply, OrderReplyBuilder>     extends EField.Builder<OrderPart, OrderPartBuilder>
    {                                                                {
        private OrderPart[] mParts;                                      private String mPartId;
        private BigDecimal mShipping;                                    private int mQuantity;
        private BigDecimal mTotalPrice;                                  private BigDecimal mPrice;

        private OrderReplyBuilder() {                                    private OrderPartBuilder() {
            super (OrderReply.class);                                        super (OrderPart.class);
        }                                                                }

        public OrderReplyBuilder parts(final OrderPart[] parts) {        public OrderPartBuilder partId(final String id) {
            if (parts == null || parts.length == 0) {                        if (id == null || id.isEmpty()) {
                throw (                                                          throw (
                    new IllegalArgumentException(                                    new IllegalArgumentException(
                        "parts is null or empty"));                                      "id is null or empty"));
            }                                                                }

            mParts = parts;                                                  mPartId = id;
            return (this);                                                   return (this);
        }                                                                }

        public OrderReplyBuilder shippingCost(final double cost) {       public OrderPartBuilder quantity(final int quantity) {
            if (cost < 0d) {                                                 if (quantity < 0) {
                throw (new IllegalArgumentException("cost < zero"));             throw (new IllegalArgumentException("quantity < zero"));
            }                                                                }

            mShipping = BigDecimal.valueOf(cost);                            mQuantity = quantity;
            return (this);                                                   return (this);
        }                                                                }

        public OrderReplyBuilder totalPrice(final double px) {           public OrderPartBuilder pricePerPart(final double px) {
            if (px < 0d) {                                                   if (px < 0d) {
                throw (new IllegalArgumentException("px < zero"));               throw (new IllegalArgumentException("px < zero"));
            }                                                                }

            mTotalPrice = BigDecimal.valueOf(px);                            mPrice = BigDecimal.valueOf(px);
            return (this);                                                   return (this);
        }                                                                }

        // Checks if this builder contains a valid message.              // Checks if this builder contains a valid message.
        // This method is called by EMessageObject.Builder.build()       // This method is called by EMessageObject.Builder.build()
        // If there are no problems, then Builder.buildImpl()            // If there are no problems, then Builder.buildImpl()
        // is called which generates the target message;                 // is called which generates the target message;
        // otherwise a net.sf.eBus.messages.ValidationException is       // otherwise a net.sf.eBus.messages.ValidationException is
        // thrown.                                                       // thrown.
        // See EMessageObject for a
        // detailed message validation example.
        @Override                                                        @Override
        protected Validator validate(final Validator probs) {            protected Validator validate(final Validator probs) {
            return (super.validate(probs)                                    return (super.validate(probs)
                         .requireNotNull(mParts, "parts")                                 .requireNotNull(mPartId, "partId")
                         .requireNotNull(mShipping, "shippingCost")                       .requireTrue(v -> (mQuantity > 0), "quantity", Validator.NOT_SET)
                         .requireNotNull(mTotalPrice, "totalPrice"));                     .requireNotNull(mPrice, "pricePerPart"));
        }                                                                }

        // Builder contains valid message. Time to create the target     // Builder contains valid message. Time to create the target
        // message instance.                                             // message instance.
        @Override protected OrderReply buildImpl()                       @Override protected OrderPart buildImpl()
        {                                                                {
            return (new OrderReply(this));                                   return (new OrderPart(this));
        }                                                                }
    }                                                                }
}                                                                }

Request messages extend ERequestMessage and include an EReplyInfo annotation which declares which messages may be sent in reply to the request. EReplyInfo is cumulative: the allowed reply messages include those listed in the current request message and in its super classes. Because the ERequestMessage EReplyInfo annotation includes EReplyMessage, and because all request messages ultimately extend ERequestMessage, EReplyMessage may be sent in reply to any request message.

(eBus release 5.6.0 introduced a new EReplyInfo attribute mayClose. To learn more about how this attribute works see Canceling an Active Request)

import java.io.Serializable;
import net.sf.eBus.messages.EReplyInfo;
import net.sf.eBus.messages.EReplyMessage;
import net.sf.eBus.messages.ERequestMessage;

@EReplyInfo (replyMessageClasses = {OrderReply.class}, mayClose = false)
public final class OrderRequest
    extends ERequestMessage
    implements Serializable
{
    public final OrderPart[] parts;
    private static final long serialVersionUID = 0x1L;

    private OrderRequest(final OrderRequestBuilder builder)
    {
        super (builder);

        this.parts = builder.mParts;
    }

    public static OrderRequestBuilder builder() {
        return (new OrderRequestBuilder());
    }

    public static final class OrderRequestBuilder
        extends ERequestMessage.Builder<OrderRequest, OrderRequestBuilder>
    {
        private OrderPart[] mParts;

        private OrderRequestBuilder() {
            super (OrderRequest.class);
        }

        public OrderRequestBuilder parts(final OrderPart[] parts) {
            if (parts == null || parts.length == 0) {
                throw (new IllegalArgumentException("parts is null or empty"));
            }

            mParts = parts;
            return (this);
        }

        @Override protected OrderRequest buildImpl() {
            return (new OrderRequest(this));
        }

        @Override protected Validator validate(final Validator problems) {
            return (super.validate(problems)
                         .requireNotNull(mParts, "parts"));
        }
    }
}

The messages which may be sent in reply to an OrderRequest are EReplyMessage and OrderReply. If EReplyFeed.ERequest.reply(EReplyMessage) is passed a message that is not one of the two supported reply message types, then an IllegalArgumentException is thrown.
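How a cumulative annotation can be resolved is sketched below using only the JDK. The @ReplyClasses annotation and the stub classes are hypothetical stand-ins for EReplyInfo and the message classes, and the exact-class membership test is an assumption made for brevity; eBus may apply a different matching rule.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.LinkedHashSet;
import java.util.Set;

final class ReplyInfoSketch {
    /** Hypothetical stand-in for a reply-info annotation. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface ReplyClasses {
        Class<?>[] value();
    }

    static class BaseReply {}
    static final class OrderReplyStub extends BaseReply {}
    static final class InvoiceStub extends BaseReply {}

    @ReplyClasses({BaseReply.class})
    static class BaseRequest {}

    @ReplyClasses({OrderReplyStub.class})
    static final class OrderRequestStub extends BaseRequest {}

    /** Collects reply classes declared on the request class and on each super class. */
    static Set<Class<?>> allowedReplies(final Class<?> requestClass) {
        final Set<Class<?>> allowed = new LinkedHashSet<>();

        for (Class<?> c = requestClass; c != null; c = c.getSuperclass()) {
            final ReplyClasses info = c.getDeclaredAnnotation(ReplyClasses.class);

            if (info != null) {
                for (Class<?> replyClass : info.value()) {
                    allowed.add(replyClass);
                }
            }
        }
        return allowed;
    }

    /** Rejects a reply whose class is not in the cumulative list (exact match assumed). */
    static void checkReply(final Class<?> requestClass, final Object reply) {
        if (!allowedReplies(requestClass).contains(reply.getClass())) {
            throw new IllegalArgumentException(
                reply.getClass().getSimpleName() + " is not an allowed reply");
        }
    }
}
```

In this sketch OrderRequestStub accepts both OrderReplyStub and BaseReply, while passing an InvoiceStub to checkReply raises IllegalArgumentException.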

Local Message

At the start of this section, it was stated that message fields must be one of the supported eBus data types. This is not completely true: the restriction applies only to messages intended for transmission to another eBus application. If the message is intended for use within the local JVM only, then any valid Java type may be used. Such messages are marked by the @ELocalOnly class-level annotation. Local-only messages are not required to define a public static builder method, a public static inner builder class, or a builder-accessible constructor, but the message fields must still be public final. The following code shows how to implement a local-only eBus message.

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import net.sf.eBus.messages.ELocalOnly;
import net.sf.eBus.messages.ENotificationMessage;

@ELocalOnly public final class LocalMessage
    extends ENotificationMessage
    implements Serializable
{
    // Map is not an eBus-supported type but allowed in this local message
    public final Map<String, Integer> studentIds;
    private static final long serialVersionUID = 0x1L;

    private LocalMessage(final Builder builder) {
        super (builder);

        studentIds = Collections.unmodifiableMap(builder.mStudentIds);
    }

    public static Builder builder() {
        return (new Builder());
    }

    public static final class Builder
        extends ENotificationMessage.Builder<LocalMessage, Builder>
    {
        private final Map<String, Integer> mStudentIds;

        private Builder() {
            super (LocalMessage.class);

            mStudentIds = new HashMap<>();
        }

        public Builder studentIds(final Map<String, Integer> ids) {
            mStudentIds.putAll(ids);
            return (this);
        }

        public Builder addStudentId(final String name, final int id) {
            mStudentIds.put(name, id);
            return (this);
        }

        public Validator validate(final Validator problems) {
            return (super.validate(problems)
                         .requireTrue(v -> (!v.isEmpty()), mStudentIds, "studentIds", Validator.NOT_SET));
        }

        public LocalMessage buildImpl() {
            return (new LocalMessage(this));
        }
    }
}

Message Field Annotations

The following field-level annotations may be applied to message fields of the specified type.

String: @EStringInfo

eBus uses java.nio.charset.StandardCharsets.UTF_8 as the default character set for serializing/de-serializing java.lang.String fields. This default character set can be overridden using the @EStringInfo annotation for String message fields:

    @EStringInfo(charset="latin1") public final String lastName;

where charset should be a character set name or alias known to the java.nio.charset.Charset.forName method. If forName throws an exception for the given charset name, then eBus quietly uses UTF_8 instead. No exception is thrown and no error is logged.

@EStringInfo has a second attribute, lineCount, used to specify the number of lines that may be in the string field. If not defined, the line count defaults to a single line. lineCount is not used by eBus and is provided solely for application use. One possible use for this attribute is deciding whether to use a JavaFX TextField to input single-line text or a TextArea for multi-line text.
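The quiet fallback to UTF_8 described for the charset attribute can be sketched as follows. CharsetFallback.resolve is a hypothetical helper written for illustration; only Charset.forName and StandardCharsets come from the JDK.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

final class CharsetFallback {
    /** Returns the named charset, or UTF-8 when the name is illegal or unsupported. */
    static Charset resolve(final String name) {
        try {
            return Charset.forName(name);
        } catch (IllegalArgumentException e) {
            // IllegalCharsetNameException and UnsupportedCharsetException both
            // extend IllegalArgumentException, so one catch covers every failure.
            return StandardCharsets.UTF_8;
        }
    }
}
```

An application that wants to detect a misspelled charset name early may prefer to call Charset.forName itself during start up, since the fallback hides the error.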

All: @FieldDisplayIndex

eBus 5.3.2 introduced the annotation @FieldDisplayIndex which is used to define field ordering for display purposes. The problem is that MessageType.fields() returns fields in serialization order. That ordering is chosen for serialization performance and makes little sense to a human reader. MessageType.displayFields() corrects this. The returned field ordering runs from the base class EMessageObject fields down to the leaf class, meaning that super class fields are listed first and the leaf message class fields last.

Usage: @FieldDisplayIndex(index = n) where n ≥ zero. Field display indices do not have to start at zero or have to be strictly sequential but must be in increasing order. If multiple field display indices have the same value, the ordering is undefined among those fields.
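One way to produce such a display ordering is sketched below with plain reflection. The @DisplayIndex annotation and the Base and Leaf classes are hypothetical stand-ins, and the rule used here (super class fields first, then ascending index within each class) is an assumption drawn from the description above rather than the MessageType implementation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;

final class DisplayOrderSketch {
    /** Hypothetical stand-in for a display index annotation. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface DisplayIndex {
        int index();
    }

    static class Base {
        @DisplayIndex(index = 0) public final String subject = "s";
        @DisplayIndex(index = 1) public final long timestamp = 0L;
    }

    static final class Leaf extends Base {
        @DisplayIndex(index = 5) public final int quantity = 1;
        @DisplayIndex(index = 2) public final String partId = "p";
    }

    /** Lists annotated field names, base class first, ascending index within a class. */
    static List<String> displayFields(final Class<?> leaf) {
        final Deque<Class<?>> chain = new ArrayDeque<>();

        // Push leaf first so the base class ends up at the head of the deque.
        for (Class<?> c = leaf; c != null && c != Object.class; c = c.getSuperclass()) {
            chain.push(c);
        }

        final List<String> names = new ArrayList<>();
        for (Class<?> c : chain) {
            final List<Field> fields = new ArrayList<>();
            for (Field f : c.getDeclaredFields()) {
                if (f.isAnnotationPresent(DisplayIndex.class)) {
                    fields.add(f);
                }
            }
            fields.sort(
                Comparator.comparingInt(
                    (Field f) -> f.getAnnotation(DisplayIndex.class).index()));
            for (Field f : fields) {
                names.add(f.getName());
            }
        }
        return names;
    }
}
```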

Message Keys

eBus subjects are identified by a message key. This key consists of a message class and String subject. This is also known as type+topic referencing.

An example message key is: (com.acme.ProductUpdate.class, "JetSkates")
where com.acme.ProductUpdate is a class extending EMessage.

There is no eBus-defined subject format. Message subject format (if any) is left entirely up to the application developer. The above example could be:

  • (com.acme.ProductUpdate.class, "/retail/sporting-goods/JetSkates")
  • (com.acme.ProductUpdate.class, ".sporting-goods.retail.JetSkates")
  • (com.acme.ProductUpdate.class, "Wil E. Coyote special: JetSkates")

Message Key Dictionary

eBus release 4.5.0 allows applications to add message keys to and retrieve keys from the message key dictionary. Prior to release 4.5.0, message keys were added to the dictionary indirectly by opening publisher, subscriber, and reply feeds. Even then, this dictionary was not accessible to applications.

Notification and request message keys are added using the methods EFeed.addKey(EMessageKey) and EFeed.addAllKeys(Collection<EMessageKey>). EFeed.addKey puts a single message key into the dictionary while EFeed.addAllKeys adds multiple keys to the dictionary. Neither call replaces or overwrites keys that are already in the dictionary: if a message key is already present, its existing dictionary entry is left unchanged. Attempting to add reply or system message keys results in an IllegalArgumentException because eBus does not store such keys in the message key dictionary.

Message keys are retrieved from the dictionary using the methods EFeed.findKeys(Class<? extends EMessage>) and EFeed.findKeys(Class<? extends EMessage>, Pattern). The first call returns all message keys associated with the given message class. The second call returns only those keys for the message class whose subject matches the pattern. Both methods return a non-null list. If the message class is either a SystemMessage or ReplyMessage subclass, then an empty list is returned.
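The add-without-overwrite and find-by-class-and-pattern behavior can be modeled with a small in-memory dictionary. KeyDictionarySketch and its methods are hypothetical and only mirror the behavior described above; they are not the EFeed implementation, and a key is reduced here to a class plus a subject string.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

final class KeyDictionarySketch {
    // message class -> subjects registered for that class (sorted for stable output)
    private final Map<Class<?>, Set<String>> mKeys = new HashMap<>();

    /** Adds the key if absent; returns false (and changes nothing) if already present. */
    synchronized boolean addKey(final Class<?> messageClass, final String subject) {
        return mKeys.computeIfAbsent(messageClass, c -> new TreeSet<>()).add(subject);
    }

    /** Returns all subjects for the class; never null. */
    synchronized List<String> findSubjects(final Class<?> messageClass) {
        final Set<String> subjects = mKeys.get(messageClass);

        return (subjects == null ? Collections.emptyList() : new ArrayList<>(subjects));
    }

    /** Returns subjects for the class which fully match the pattern; never null. */
    synchronized List<String> findSubjects(final Class<?> messageClass,
                                           final Pattern subjectPattern) {
        final List<String> matches = new ArrayList<>();

        for (String subject : findSubjects(messageClass)) {
            if (subjectPattern.matcher(subject).matches()) {
                matches.add(subject);
            }
        }
        return matches;
    }
}
```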

eBus provides the ability to store and load message key dictionaries with the methods EFeed.storeKeys(ObjectOutputStream), EFeed.storeKeys(Class<? extends EMessage>, ObjectOutputStream), and EFeed.storeKeys(Class<? extends EMessage>, Pattern, ObjectOutputStream). The first method stores the entire message key dictionary to the object output stream. The next two, like EFeed.findKeys, store only those keys which match the message class and (optional) subject pattern. The stored message key stream is re-loaded with EFeed.loadKeys(ObjectInputStream). Like EFeed.addAllKeys, this method does not replace or overwrite existing message keys.

The purpose of the storeKeys and loadKeys methods is to let applications re-create the message key dictionary quickly upon start up. This ability matters because it supports multi-subject feeds, also introduced in eBus release 4.5.0. Multi-subject feeds allow an eBus client to open multiple notification or reply feeds with a single feed.

Notification

Notification messages follow the advertise/subscribe paradigm together with type+topic-based routing. A notification feed is referenced by message key. A publisher opens a publish feed for a given message key and then advertises the capability of sending a notification message. When a publisher is capable of publishing the advertised notification message, it calls updateFeedState(EFeedState.UP) on the feed.

A subscriber informs eBus of interest in a notification message feed by opening a subscribe feed for the same message key and then subscribes to the notification feed.

eBus subject sits in the middle, tracking advertisements and subscriptions for each unique message key and deciding when the publishers should start and stop publishing based on the subscriber presence. The following table defines the states between publishers and subscribers:

                   0 Publishers                                 > 0 Publishers
0 Subscribers      No Feed                                      Stop publisher feed
> 0 Subscribers    Inform subscribers there are no publishers   Start publisher feed

eBus uses advertisements and subscriptions to track when to:

  1. Start a feed because there is a subscriber wanting to receive the notification.
  2. Stop a feed because there are no more subscribers.
  3. Inform subscribers when a notification feed is up and to expect notifications or the feed is down and so there will be no notifications forthcoming.
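The publisher/subscriber state table can be written as a small decision function. FeedMatrixSketch, FeedAction, and decide are hypothetical names used only to restate the table; they are not part of the eBus API.

```java
final class FeedMatrixSketch {
    /** Hypothetical names for the four cells of the state table. */
    enum FeedAction {
        NO_FEED,
        STOP_PUBLISHER_FEED,
        INFORM_NO_PUBLISHERS,
        START_PUBLISHER_FEED
    }

    /** Maps publisher and subscriber counts for one message key to a table cell. */
    static FeedAction decide(final int publishers, final int subscribers) {
        if (publishers < 0 || subscribers < 0) {
            throw new IllegalArgumentException("counts must be >= zero");
        }

        if (subscribers == 0) {
            return (publishers == 0 ?
                    FeedAction.NO_FEED :
                    FeedAction.STOP_PUBLISHER_FEED);
        }

        return (publishers == 0 ?
                FeedAction.INFORM_NO_PUBLISHERS :
                FeedAction.START_PUBLISHER_FEED);
    }
}
```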

Publishers and subscribers may retract their advertisements and subscriptions, respectively, at any time by calling EPublishFeed.unadvertise() and ESubscribeFeed.unsubscribe() for a particular advertisement or subscription. Both publish and subscribe feeds are still alive and may be re-advertised and re-subscribed. When an application no longer needs a feed, it should call EFeed.close() which permanently disposes of the feed. The application should no longer reference a feed after closing it.

If a publisher experiences a problem preventing updates to a specific message key feed, the publisher can inform subscribers that the feed is down by calling feed.updateFeedState(EFeedState.DOWN). When the problem clears, the publisher calls feed.updateFeedState(EFeedState.UP), informing the subscribers that the feed is back up.

Note: a subscriber will see an EFeedState.DOWN feed state until a publisher both advertises the feed and calls feed.updateFeedState(EFeedState.UP). eBus does not assume that a publisher can publish notifications simply because it advertised the feed.

ENotification Fields

eBus release 5.6.0 introduced two optional fields to the ENotificationMessage class: publisherId and position. publisherId may be used to uniquely identify the EPublisher instance responsible for generating a particular notification instance. It is the application's responsibility to define the meaning and uniqueness of publisher identifiers.

position is used to define message ordering when notification messages are published at a rate faster than wall clock time granularity. In other words, there is more than one message per timestamp. If these messages are persisted, then message ordering may be lost upon retrieval. The position field may be used as an index within a given timestamp to guarantee correct ordering when retrieved from the persistent store.

Combining subject, timestamp, publisherId, and position guarantees a unique index for any notification message placed into persistent store if publisher identifier and message position are correctly used.
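A composite index of this kind might be modeled as a comparable value. NotificationIndex is a hypothetical class, and the field types chosen here (long publisherId, int position) are assumptions made for the sketch; check the ENotificationMessage documentation for the actual types.

```java
import java.time.Instant;
import java.util.Comparator;

/** Hypothetical persistent-store index for one notification message. */
final class NotificationIndex implements Comparable<NotificationIndex> {
    // Orders by subject, then timestamp, then publisher, then position.
    private static final Comparator<NotificationIndex> ORDER =
        Comparator.comparing((NotificationIndex i) -> i.subject)
                  .thenComparing((NotificationIndex i) -> i.timestamp)
                  .thenComparingLong(i -> i.publisherId)
                  .thenComparingInt(i -> i.position);

    final String subject;
    final Instant timestamp;
    final long publisherId;   // assumed type
    final int position;       // assumed type

    NotificationIndex(final String subject,
                      final Instant timestamp,
                      final long publisherId,
                      final int position) {
        this.subject = subject;
        this.timestamp = timestamp;
        this.publisherId = publisherId;
        this.position = position;
    }

    @Override
    public int compareTo(final NotificationIndex other) {
        return ORDER.compare(this, other);
    }
}
```

Sorting retrieved rows by this index restores publication order for messages sharing a timestamp, provided the publisher assigned increasing positions within that timestamp.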

Request, Reply

eBus uses the advertise/request/reply paradigm to forward request messages to advertised repliers. Replies are sent directly back to the requestor. A replier advertises the ability to reply to a request by:

  1. building a reply feed,
  2. advertising the feed, and
  3. calling updateFeedState(EFeedState.UP) on the reply feed, marking the feed as available.

Like publish/subscribe, requests are routed to repliers using the advertised type+topic message key. A replier can unadvertise and re-advertise any number of times. But once the feed is closed, it may not be used again. Instead, a new feed must be opened.

A requester sends a request message by:

  1. building a request feed and
  2. posting the request message. If there are no matching repliers for the request message, then request(msg) throws an IllegalStateException. Otherwise, it returns an ERequestFeed.ERequest instance which tracks the request state and which the requestor may use to cancel the request by calling ERequestFeed.ERequest.close().

The replier receives the request via the method EReplier.request(request). The replier sends a reply message via the request by calling EReplyFeed.ERequest.reply(msg). The replier may send multiple replies for the same request by setting the reply status to EReplyMessage.ReplyStatus.OK_CONTINUING for all but the final reply. The last reply has the status EReplyMessage.ReplyStatus.OK_FINAL. If the request cannot be successfully handled, then the EReplyMessage.ReplyStatus.ERROR reply status should be sent. An error reply is also a final reply and no further replies may be sent afterwards. Also, an error reply may be sent even after ReplyStatus.OK_CONTINUING replies but not after ReplyStatus.OK_FINAL.
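The reply status rules form a small state machine, sketched below. ReplySequenceSketch and its Status enum are hypothetical and cover only the three statuses named above; throwing IllegalStateException for a reply after the final one is an assumption made for this sketch.

```java
final class ReplySequenceSketch {
    /** Hypothetical mirror of the three reply statuses discussed above. */
    enum Status { OK_CONTINUING, OK_FINAL, ERROR }

    private boolean mFinished;

    /** Accepts the next reply status; OK_FINAL and ERROR both end the sequence. */
    void accept(final Status status) {
        if (mFinished) {
            throw new IllegalStateException("request already finished");
        }

        if (status != Status.OK_CONTINUING) {
            mFinished = true;
        }
    }

    boolean isFinished() {
        return mFinished;
    }
}
```

For example, OK_CONTINUING, OK_CONTINUING, ERROR is a valid sequence, while OK_FINAL followed by ERROR is rejected.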

Replies are sent back to the requestor via the method ERequestor.reply(remaining, message, request) where the first argument, remaining, specifies the number of repliers which have not yet finished replying to the request. When remaining is zero, then this is the final reply and the request is finished. The request feed state is retrieved by calling ERequestFeed.ERequest.requestState().

The requestor may cancel a request any time prior to request completion. For a detailed explanation regarding request cancelation see Canceling an Active Request.

Request message definitions have an annotation: @EReplyInfo. This class-level annotation lists one or more reply message classes which may be sent in response to the request message. @EReplyInfo annotations are cumulative. This means that the list of allowed reply message classes includes those in this request message @EReplyInfo class list and those in the super class @EReplyInfo class list. Since EReplyMessage is in ERequestMessage @EReplyInfo annotation, and because ERequestMessage is the base class for all request message classes, EReplyMessage is a valid reply message for all requests.

In the following example, OrderReply and EReplyMessage messages may be sent in reply to an OrderRequest but not an Invoice reply message.

import net.sf.eBus.messages.EReplyInfo;
import net.sf.eBus.messages.ERequestMessage;

@EReplyInfo (replyMessageClasses = {OrderReply.class}, mayClose = false)
public final class OrderRequest
    extends ERequestMessage
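
The cumulative behavior can be illustrated by walking a request class and its super classes and merging the reply classes declared at each level. This is only a sketch under stated assumptions: ReplyInfo, BaseRequest, BaseReply, and the other nested types are hypothetical stand-ins for @EReplyInfo, ERequestMessage, and EReplyMessage, and the lookup shown here is not necessarily how eBus itself resolves the annotation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

final class ReplyInfoDemo {
    /** Hypothetical stand-in for the @EReplyInfo annotation. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface ReplyInfo {
        Class<?>[] replyMessageClasses();
    }

    // Hypothetical reply message classes.
    static class BaseReply {}
    static class OrderReply {}
    static class Invoice {}

    // Base request allows the base reply, just as described for ERequestMessage.
    @ReplyInfo(replyMessageClasses = {BaseReply.class})
    static class BaseRequest {}

    @ReplyInfo(replyMessageClasses = {OrderReply.class})
    static final class OrderRequest extends BaseRequest {}

    /** Merges the reply classes declared on the request class and each super class. */
    static Set<Class<?>> allowedReplies(final Class<?> requestClass) {
        final Set<Class<?>> retval = new LinkedHashSet<>();

        for (Class<?> c = requestClass; c != null; c = c.getSuperclass()) {
            final ReplyInfo info = c.getDeclaredAnnotation(ReplyInfo.class);

            if (info != null) {
                retval.addAll(Arrays.asList(info.replyMessageClasses()));
            }
        }

        return retval;
    }

    public static void main(final String[] args) {
        final Set<Class<?>> allowed = allowedReplies(OrderRequest.class);

        System.out.println(allowed.contains(OrderReply.class)); // true
        System.out.println(allowed.contains(BaseReply.class));  // true
        System.out.println(allowed.contains(Invoice.class));    // false
    }
}
```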

Canceling an Active Request

eBus release 5.6.0 introduced a new optional EReplyInfo attribute: mayClose. If true (which is the default setting), then a requestor may call ERequestFeed.ERequest.close(), which immediately and completely closes the request; repliers can do nothing about this. In the above example, mayClose is false, so an OrderRequest cannot be unilaterally closed. So how then may an OrderRequest be canceled?

Release 5.6.0 introduced a new method: ERequestFeed.ERequest.cancel(), which is an optional cancel request. A new boolean parameter, mayRespond, was added to the EReplier.cancelRequest method. If this flag is set to true, the replier may respond to the cancel request and either accept or reject the cancellation. If set to false, this is a unilateral cancellation and the replier may not respond.

If the replier may respond, that response is generally an EReplyMessage with the reply status set to either:

  • EReplyMessage.ReplyStatus.CANCELED: replier accepts the cancel and the request is now terminated (for the replier) or
  • EReplyMessage.ReplyStatus.CANCEL_REJECT: replier rejects the cancellation and is still working the request. More replies may be expected from this replier. EReplyMessage.replyReason should contain text explaining why the cancel request is rejected.

While a reply is not required if the cancellation is rejected, it is a good idea to send back a ReplyStatus.CANCEL_REJECT to let the requestor know that this is the case.

Please note that whether optionally cancel()ing or unilaterally close()ing a request, the requestor may still receive replies while the cancellation process completes. 3

eBus Roles

Objects need to implement one or more of the client interfaces in order to interact with eBus:

Role        Message
Subscriber  Receive Notification
Publisher   Send Notification
Requestor   Send Request, Receive Reply
Replier     Receive Request, Send Reply

Which interface(s) should be implemented depends on which eBus message types the object wants to send or receive. An application class may implement one or all four eBus interfaces.

Subscriber

The subscriber interface is implemented so a client may receive notification messages. The subscriber life cycle consists of:

  1. Opening a feed: call ESubscribeFeed.builder() obtaining a subscriber builder instance. Configure the subscribe feed via the Builder methods target, messageKey, scope, condition, statusCallback, and notifyCallback. The subscription condition is optional and may be null. If provided, then only those notifications matching the condition are forwarded to the subscriber. If statusCallback and notifyCallback are not defined, then the ESubscriber method overrides are used.

    Subscribe feed instance is created by calling Builder.build() method.

  2. Subscribing to a subject: call ESubscribeFeed.subscribe() to start receiving the messages from the notification feed.
  3. Receive feed state updates: eBus informs the subscriber of the subject's feed status via the feedStatus(feedState, feed) callback. If there are no publishers, then feedState is down. Otherwise, feedState is up and the subscriber can expect notifications from the feed. This method may be replaced using the ESubscribeFeed.Builder.statusCallback method.
  4. Receive notification messages: eBus passes the latest subject notification to a subscriber via the notify(message, feed) callback. This method may be replaced using the ESubscribeFeed.Builder.notifyCallback method.
  5. Unsubscribe from subject: call ESubscribeFeed.unsubscribe(). Once unsubscribed, the subscriber can re-subscribe by calling ESubscribeFeed.subscribe() again.
  6. Closing a feed: call EFeed.close(). If the subscription is in place when the subscribe feed is closed, the subscription is automatically retracted. Once a feed is closed, it may not be used again and should be dropped by the application.

Publisher

The publisher interface is implemented so a client may publish notification messages. The publisher life cycle is:

  1. Opening a feed: call EPublishFeed.builder() obtaining a publisher builder instance. Configure the feed target, message key, and scope. You can also override the default status callback method in the builder. Publish feed instance is created by calling Builder.build() method.
  2. Advertising a subject: call EPublishFeed.advertise() to register the ability to publish the given message key.
  3. Update the feed state: call EPublishFeed.updateFeedState(EFeedState) to inform eBus of the publisher's ability to publish notifications on this feed. If the publisher is capable of publishing this notification, pass in up; otherwise pass in down.
  4. Waiting for a publish status update: when the first subscriber for the notification feed arrives, eBus calls the publishStatus(feedState, feed) method with EFeedState.UP. The publisher is now clear to start sending notification messages on the feed. When there are no more subscribers for the feed, publishStatus is called with EFeedState.DOWN. The publisher should now stop posting notification messages to the feed. This mechanism results in notifications being posted to the feed only when there are subscribers registered to receive the message.
  5. Publishing notifications: notification messages are sent via EPublishFeed.publish(notification).
  6. Unadvertise when done: call EPublishFeed.unadvertise() to retract the advertisement. Once unadvertised, the publisher may re-advertise the notification feed.
  7. Closing a feed: call EFeed.close(). If the advertisement is in place when the publish feed is closed, the advertisement is automatically retracted. Once a feed is closed, it may not be used again and should be dropped by the application.
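
The publish status handshake in step 4 can be sketched with a toy model. The code below is plain Java and does not use eBus classes: Feed, Publisher, and FeedState are hypothetical names, and the subscriber count logic is an assumption about how the first-subscriber and last-subscriber transitions could be detected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class PublishStatusDemo {
    enum FeedState { UP, DOWN }

    /** Hypothetical feed: reports UP on first subscriber, DOWN when the last one leaves. */
    static final class Feed {
        private final Consumer<FeedState> mStatusCallback;
        private int mSubscribers = 0;

        Feed(final Consumer<FeedState> statusCallback) {
            mStatusCallback = statusCallback;
        }

        void subscribe() {
            if (++mSubscribers == 1) {
                mStatusCallback.accept(FeedState.UP);
            }
        }

        void unsubscribe() {
            if (mSubscribers > 0 && --mSubscribers == 0) {
                mStatusCallback.accept(FeedState.DOWN);
            }
        }
    }

    /** Hypothetical publisher which posts only while its publish status is UP. */
    static final class Publisher {
        private FeedState mPublishStatus = FeedState.DOWN;
        final List<String> mPosted = new ArrayList<>();

        void publishStatus(final FeedState state) {
            mPublishStatus = state;
        }

        /** Returns true if the message was posted, false if there are no subscribers. */
        boolean publish(final String message) {
            if (mPublishStatus != FeedState.UP) {
                return false;
            }

            mPosted.add(message);

            return true;
        }
    }

    public static void main(final String[] args) {
        final Publisher publisher = new Publisher();
        final Feed feed = new Feed(publisher::publishStatus);

        System.out.println(publisher.publish("ignored")); // false
        feed.subscribe();
        System.out.println(publisher.publish("posted"));  // true
        feed.unsubscribe();
        System.out.println(publisher.publish("ignored")); // false
    }
}
```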

Requestor

The requestor interface is implemented so a client may send requests to matching repliers. The requestor life cycle is:

  1. Opening a feed: call ERequestFeed.builder() obtaining a request feed builder instance. Use this builder to set the requestor, message key, and feed scope. The request feed instance is created by calling Builder.build() method.
  2. Subscribing to a feed: call subscribe(). Once subscribed, wait until feedStatus(EFeedState.UP, feed) is called, signifying that there is a replier for the request feed.
  3. Post a request: call request(message). This method returns the resulting ERequestFeed.RequestState. If ACTIVE is returned, then the request has repliers and the requestor will receive replies to the request. If DONE is returned, then there are no repliers to this request and the requestor will receive no replies.
  4. Receive replies: eBus sends replies back to the requestor via the reply(remaining, reply, request) callback where reply is the latest reply message and remaining parameter specifies the number of repliers still sending replies. When remaining is zero, then this represents the final reply.
  5. Cancel a request: see Canceling an Active Request for a detailed explanation regarding request cancellation.
  6. Unsubscribe: call unsubscribe() when finished posting requests to this feed.
Note: an ERequestFeed may be used only once. A new ERequestFeed instance must be opened for each new request.

Replier

The replier interface is implemented so a client may respond to requests. The replier life cycle is:

  1. Opening a feed: call EReplyFeed.builder() obtaining a reply feed builder instance. Calling builder set methods configures the reply feed. Once configured call Builder.build() to create the reply feed instance.
  2. Advertise a subject: call advertise() to register the ability to reply to the given request message key.
  3. Setting feed status: call updateFeedState(EFeedState.UP) to inform requestors that this replier is ready to handle requests. If a replier is temporarily prevented from responding to requests, then set the feed state to EFeedState.DOWN. When the issue is resolved, then mark the feed state UP.
  4. Wait for requests: requests are passed to this replier via the EReplier.request(request) callback. The replier does not have to reply immediately but, if replying asynchronously, then request must be stored away for later use since request is used to send replies. The EReplyFeed may receive multiple requests over time but an individual request is contained in an ERequest instance.
  5. Respond to requests: a replier sends a reply message via reply(msg).
  6. Handle canceled requests: see Canceling an Active Request for detailed explanation about how to handle a call to EReplier.cancelRequest(request, mayRespond).
  7. Unadvertise when done: call EReplyFeed.unadvertise(). Once un-advertised, a reply feed may be advertised again.
  8. Closing a feed: call EFeed.close(). If the advertisement is in place when the reply feed is closed, the advertisement is automatically retracted. Once a feed is closed, it may not be used again and should be dropped by the application.

Lambda Expressions Callbacks

By default, eBus passes messages to application objects by calling the appropriate role interface method, like ESubscriber.notify(ENotificationMessage, ESubscribeFeed). This is fine if a subscriber opens only a single feed. But if the subscriber opens multiple feeds, each feed's messages are posted to ESubscriber.notify. The notify method then becomes a switch, untangling the inbound notifications and routing each message to its destination method where the real work is performed. In this case, ESubscriber.notify is pure overhead.

eBus v. 4.2.0 fixes this problem by allowing applications to associate callback code with a feed. Application objects are still required to implement the role interface but do not have to override the role interface methods. Instead, the application registers callback code using Java lambda expressions. The steps for putting a notification subscription in place are now:

import net.sf.eBus.client.ECondition;
import static net.sf.eBus.client.EFeed.FeedScope;
import net.sf.eBus.client.EFeedState;
import net.sf.eBus.client.ESubscribeFeed;
import net.sf.eBus.client.ESubscriber;
import net.sf.eBus.client.FeedStatusCallback;
import net.sf.eBus.client.NotifyCallback;
import net.sf.eBus.messages.EMessageKey;

public class MyAppClass implements ESubscriber {

    // ESubscriber interface methods are not overridden.
    private ESubscribeFeed mFeed = null;

    public void startup() {
        EMessageKey messageKey = new EMessageKey(CatalogUpdate.class, "Spring"); // Receive updates to Spring catalog.

        // The subscription condition may now be defined using a lambda expression.
        // Only updates to the catalog camping section are accepted.
        mFeed = (ESubscribeFeed.builder()).target(this)
                                          .messageKey(messageKey)
                                          .scope(FeedScope.REMOTE_ONLY)
                                          .condition(m -> ((CatalogUpdate) m).section == CatalogSection.CAMPING)
                                          .statusCallback((feedState, feed) -> {
                                                             if (feedState == EFeedState.DOWN) {
                                                                 // Handle lost feed.
                                                             }
                                                             else {
                                                                 // Handle gained feed.
                                                             }
                                                          })
                                          .notifyCallback((msg, feed) -> {
                                                              // Process latest message.
                                                          })
                                          .build();
        mFeed.subscribe();
    }

    public void shutdown() {
        if (mFeed != null) {
            mFeed.close();
            mFeed = null;
        }
    }
}

As of eBus release 7.1.0, the notify callback may be declared using the target notification message type rather than ENotificationMessage. Please note that the method may be private, unlike an override of notify(ENotificationMessage, IESubscribeFeed).

private void onUpdate(final CatalogUpdate msg, final IESubscribeFeed feed) { ... }

Failure to either override the role interface methods or put the role callbacks in place results in eBus throwing a ValidationException when building the feed.

Hybrid Object Pattern

When using eBus in a major application the central eBus objects will contain so many feeds that the object becomes convoluted and difficult to understand. As an example consider an eBus object implementing a trading algorithm. This object implements the following roles:

  • EReplier: receives algo requests to buy or sell an instrument using the algorithm's logic.
  • ERequestor: algorithm places order on one or more exchanges in order to satisfy its own request.
  • ESubscriber: algorithm subscribes to market data, dynamic configuration changes to algorithm parameters, instrument and exchange dynamic performance statistics used to guide placing exchange orders.
  • EPublisher: algorithm publishes its own dynamic performance statistics.

Placing all these feeds and associated data members into a single eBus object results in code confusion, making it difficult to separate the essential algorithm data and logic from subsidiary information supporting the algorithm. This section describes a hybrid object pattern useful for reducing this code confusion.

Firstly, what is a hybrid object? There are two types of classes: active and passive. A passive object does not initiate actions but is only acted upon. Example passive classes are java.util.ArrayList or java.time.Duration. Instances of these classes do nothing until other code initiates a method call to the instance. A passive object is contained within an active object.

An active object can initiate action by sending an eBus message. An active object does not need to wait to be acted upon before acting. An active object does not exist within another object, standing alone within an application.

A hybrid object lies between active and passive. A hybrid object sends and/or receives messages but is contained within an active object. This active object treats the hybrid object as a passive object, interacting with the hybrid object using method calls. Please note that hybrid objects should not be shared among active objects.

Back to the trading algorithm active object. The following example concentrates on the market data hybrid object only but can be readily expanded to handle the other feeds as well.

public final class MakeMoneyAlgo
    implements EReplier, ERequestor, ESubscriber, EPublisher {
    // Hybrid objects created on active object start up.
    private MarketData mMktData;
    private MakeMoneyConfig mAlgoConfig;
    ...

    // Receives order requests on this feed.
    private EReplyFeed mOrderFeed;

    public MakeMoneyAlgo(...) { ... }

    @Override public void startup() {
        // Create hybrid instances and then have hybrid create its feeds.
        // Note that this MakeMoneyAlgo reference is passed to hybrid object
        // constructor.
        mMktData = new MarketData(this, ...);
        mMktData.startup();
        mOrderFeed = (EReplyFeed.builder()).target(this)
                                           .messageKey(order request key)
                                           .scope(EFeed.FeedScope.REMOTE_ONLY)
                                           .requestCallback(this::onOrder)
                                           .build();
        mOrderFeed.advertise();
        mOrderFeed.updateFeedState(EFeedState.UP);
    }

    private void onOrder(final EReplyFeed.ERequest request,
                         final EReplyFeed feed) {
        final OrderRequest newOrder = (OrderRequest) request;
        final OrderBook book =
            mMktData.getOrderBook(newOrder.instrument, mAlgoConfig.getBookDepth());

        // Validate and process order request using information retrieved from hybrid
        // objects.
    }
}

Note that MakeMoneyAlgo implements the EReplier, ERequestor, ESubscriber, and EPublisher interfaces and not the hybrid objects. This is because only active objects may implement eBus role interfaces. This includes the EObject interface.

Passing the MakeMoneyAlgo reference to the MarketData constructor is key to any hybrid object:

public final class MarketData {
    // MakeMoneyAlgo instance containing this hybrid object.
    private final MakeMoneyAlgo mOwner;

    // Market data subscription made on start up.
    private ESubscribeFeed mMktDataFeed;

    // Place order book data members here.

    public MarketData(final MakeMoneyAlgo owner, ...) {
        mOwner = owner;
        ...
    }

    public void startup() {
        // Note: hybrid feed uses MakeMoneyAlgo reference as target but uses
        // MarketData methods for feed status and message delivery.
        // This is what makes MarketData a hybrid object.
        mMktDataFeed = (ESubscribeFeed.builder()).target(mOwner)
                                                 .messageKey(market data message key)
                                                 .scope(EFeed.FeedScope.REMOTE_ONLY)
                                                 .statusCallback(this::onFeedStatus)
                                                 .notifyCallback(this::onMarketData)
                                                 .build();
        mMktDataFeed.subscribe();
    }

    public OrderBook getOrderBook(final Instrument instrument, final int depth) {
        // Returns instrument's order book up to the given depth. MakeMoneyAlgo uses
        // this order book combined with other parameters to decide what orders to
        // place on what exchanges.
    }

    private void onFeedStatus(final EFeedState state, final IESubscribeFeed feed) {
        // Place feed status update code here especially when market data feed goes
        // down.
    }

    private void onMarketData(final ENotificationMessage msg, final IESubscribeFeed feed) {
        // Update market data members with latest update.
    }
}

See the Dispatcher section for why a hybrid object's feeds must target its active object owner.

eBus Feeds

Simple Feeds

Simple feeds 1) contain a single message key and 2) connect subscribers/requestors to all the simple publisher/replier feeds that exist for the same message key. What a publisher posts to a simple publish feed is forwarded to all existing and subscribed simple subscription feeds. Similarly, requests posted to a simple request feed are forwarded to all existing and advertised simple reply feeds. Messages are exchanged between simple feeds only when both sides are open and in place (advertised or subscribed).

The above roles section showed application classes interacting with simple eBus feeds: EPublishFeed, ESubscribeFeed, ERequestFeed, and EReplyFeed.

eBus release 6.1.0 introduced feed Builder class for EPublishFeed, ESubscribeFeed, ERequestFeed, and EReplyFeed feed classes. Feed builders are now the preferred technique for creating single feed instances. The open methods are now deprecated and will be removed in a future release.

Multi-Subject Feeds

Multi-Subject feeds act as a proxy between the application object and multiple, subordinate simple feeds. The multi-subject feed opens, configures, advertises/subscribes, and closes these subordinate feeds in unison. The multi-subject feed makes sure the subordinate feeds are in the same state at the same time.

But it is the subordinate simple feeds which interact with the application object. The multi-subject feed opens a subordinate simple feed, passing in the application object as the feed client. That means the subordinate feed calls back to the application object. If the application object creates a multi-subject feed with 1,000 subordinate feeds, then the application object receives callbacks from 1,000 subordinate feeds.

Multi-subject feeds behave in a similar manner to simple feeds. They are opened, may have callbacks configured, advertised/subscribed, un-advertised/un-subscribed, and closed. Note the multi-subject feed configures the callback methods for the subordinate feeds based on how the multi-subject feed is configured. This means that each subordinate feed calls back to the same method. It is not possible for subordinate feeds belonging to the same multi-subject feed to call back to different methods.

Multi-subject feeds use the same role interfaces as simple feeds:

Multi-Subject Feed            Role
Multi-Subject Publish Feed    EPublisher
Multi-Subject Subscribe Feed  ESubscriber
Multi-Subject Reply Feed      EReplier
Multi-Subject Request Feed    ERequestor

New subordinate feeds may be added to or removed from an open multi-subject feed by specifying the message subject to be added or removed. The newly added subordinate feed references the same message class as the other feeds.

Multi-subject feeds are homogeneous with respect to message class but heterogeneous with respect to message subject. When a subscriber opens and subscribes a multi-subject subscribe feed, all messages received from the subordinate feeds will be the same message class. It is not possible to open a multi-subject feed for more than one message class.
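
The proxy relationship described in this section can be sketched as follows. This is a toy model in plain Java, not the eBus multi-subject feed implementation: MultiSubjectSketch, SubFeed, and the deliver method (which simulates eBus handing a message to a subordinate feed) are all hypothetical. It shows one message class, many subjects, every subordinate feed kept in the same state, and a single shared callback.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

final class MultiSubjectSketch {
    /** Hypothetical subordinate feed: one subject plus its subscribed flag. */
    static final class SubFeed {
        final String mSubject;
        boolean mSubscribed;

        SubFeed(final String subject, final boolean subscribed) {
            mSubject = subject;
            mSubscribed = subscribed;
        }
    }

    private final Class<?> mMessageClass;                 // Same for all subordinate feeds.
    private final BiConsumer<String, Object> mCallback;   // Shared by all subordinate feeds.
    private final Map<String, SubFeed> mFeeds = new LinkedHashMap<>();
    private boolean mSubscribed = false;

    MultiSubjectSketch(final Class<?> messageClass,
                       final BiConsumer<String, Object> callback) {
        mMessageClass = messageClass;
        mCallback = callback;
    }

    /** New subordinate feed takes on the multi-subject feed's current state. */
    void addSubject(final String subject) {
        mFeeds.putIfAbsent(subject, new SubFeed(subject, mSubscribed));
    }

    void removeSubject(final String subject) {
        mFeeds.remove(subject);
    }

    void subscribe() { setSubscribed(true); }

    void unsubscribe() { setSubscribed(false); }

    // Keeps every subordinate feed in the same state at the same time.
    private void setSubscribed(final boolean flag) {
        mSubscribed = flag;
        for (SubFeed feed : mFeeds.values()) {
            feed.mSubscribed = flag;
        }
    }

    /** Simulates message arrival on the subordinate feed for the given subject. */
    boolean deliver(final String subject, final Object message) {
        final SubFeed feed = mFeeds.get(subject);

        if (feed == null || !feed.mSubscribed || !mMessageClass.isInstance(message)) {
            return false;
        }

        mCallback.accept(subject, message);

        return true;
    }
}
```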

Note that multi-subject feeds are not eBus feeds. This is because multi-subject feeds are proxy feeds, posing as an interface between application objects and eBus feeds.

Pattern Feeds

eBus release 4.6.0 introduced pattern feeds. A pattern feed watches one or more notification feeds and reports a MatchEvent when events arriving on those feeds match the given pattern. The best way to explain how this works is by example.

A stock market algorithm reacts to the following trade pattern in stock ABCD:

  1. The pattern starts with a trade quantity > 1,000 shares.
  2. All subsequent trades must be at a price > the average price of all previous trades. So the price is trending up. The quantity may increase or slowly decrease.
  3. The pattern ends with a trade quantity < 0.8 * the previous trade quantity. So a drop in traded quantity marks the end of the trading spurt.
  4. The time difference between the first and final trade must be ≤ 1 hour.
  5. Events which are used in one match may not be used in another match. In other words, there is no intersection between event sets for any two different matches from the same pattern. Exclusivity does not hold across patterns.
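
Before turning to the eBus pattern builder, rules 1 through 4 can be read as a straightforward scan over a trade list. The sketch below is plain Java and is not how eBus evaluates patterns: Trade, firstMatch, and TIME_LIMIT_MILLIS are hypothetical names, timestamps are assumed to be in milliseconds, and the scan is one simplified interpretation which restarts whenever a trade breaks the pattern. Rule 5 (exclusivity) is not modeled.

```java
import java.util.ArrayList;
import java.util.List;

final class TradeBlipSketch {
    /** Hypothetical trade record. */
    static final class Trade {
        final double price;
        final int size;
        final long timestamp; // Assumed to be milliseconds.

        Trade(final double price, final int size, final long timestamp) {
            this.price = price;
            this.size = size;
            this.timestamp = timestamp;
        }
    }

    static final long TIME_LIMIT_MILLIS = 60L * 60L * 1_000L; // Rule 4: one hour.

    /** Average price across the given (non-empty) trade list. */
    static double avgPx(final List<Trade> trades) {
        double sum = 0d;
        for (Trade t : trades) {
            sum += t.price;
        }
        return (sum / trades.size());
    }

    // Discards the partial match; trade t starts a new one if it satisfies rule 1.
    private static void restart(final List<Trade> matched, final Trade t) {
        matched.clear();
        if (t.size > 1_000) {
            matched.add(t);
        }
    }

    /** Returns the first trade sequence satisfying rules 1 - 4 or an empty list. */
    static List<Trade> firstMatch(final List<Trade> trades) {
        final List<Trade> matched = new ArrayList<>();

        for (Trade t : trades) {
            if (matched.isEmpty() ||
                t.timestamp - matched.get(0).timestamp > TIME_LIMIT_MILLIS) {
                restart(matched, t);
                continue;
            }

            final Trade prev = matched.get(matched.size() - 1);

            // Rule 3 applies only after at least one rule 2 trade.
            if (matched.size() >= 2 && t.size < (int) (0.8 * prev.size)) {
                matched.add(t);
                return matched;
            }

            // Rule 2: price above the average of all previously matched trades.
            if (t.price > avgPx(matched)) {
                matched.add(t);
            } else {
                restart(matched, t);
            }
        }

        return new ArrayList<>();
    }
}
```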

An eBus pattern is created using an EventPattern.Builder. A pattern consists of two parts: 1) notification feed(s) (known as parameters) and 2) the pattern. Parameters can either be built or provided to the builder as a java.util.Map. This example builds the parameter. See EventPattern.Builder for an example of a parameter Map.

Event patterns come in two flavors: ordered and unordered. This example is ordered since events must arrive in a specified order. Again, see EventPattern for an example of an unordered pattern.

An event pattern builder is created as follows:

EventPattern.Builder builder = EventPattern.builder("/stock/pattern/TradeBlip/ABCD", EventPattern.PatternType.ORDERED);

where the first argument "/stock/pattern/TradeBlip/ABCD" is the pattern name. This name is used as the MatchEvent subject. Duplicate pattern names are allowed. The second argument tells the builder to build an ordered pattern. The optional third argument is the parameter map.

The first step is parameter map definition. Since the pattern uses only equity trade notifications, there is one parameter. The parameter definition is the information needed to build a notification subscription feed.

final String param = "t";

builder.beginParameterMap()   // start parameter map definition.
       .beginParameter(param) // define parameter "t"
       .messageKey(new EMessageKey(EquityTradeMessage.class, "/stock/trade/ABCD"))
       .scope(EFeed.FeedScope.REMOTE_ONLY)
       .endParameter()        // parameter "t" defined.
       .endParameterMap();    // all parameters defined.

With the parameter map defined, it is time to define the pattern components. The first component is a trade with a quantity > 1,000. The match condition takes two parameters: the latest event (t) and the current groups map (u).

Like Java regular expressions, matched elements may be stored within a named group. These groups are stored in a Map<String, List<ENotificationMessage>> which maps the group name to its associated list of matched events. A groups map always contains the group EventPattern.ALL_EVENTS. This group contains all matched events.

builder.singleComponent(param)
       .matchCondition((t, u) -> (((EquityTradeMessage) t).trade).size > 1_000)
       .endSingleComponent();

The second component is for the trade price trending up and the trade quantity staying at a high level. These trades are captured in group "g2". Note that multiple trades may satisfy this condition. By default the match count is exactly one event. Method avgPx is a user-defined method calculating the average trade price across all previously matched trade events.

builder.beginGroup("g2")
       .singleComponent(param)
       .matchCount(1, Integer.MAX_VALUE)
       .matchCondition((t, u) -> ((((EquityTradeMessage) t).trade).price).compareTo(avgPx(u.get(EventPattern.ALL_EVENTS))) > 0)
       .endSingleComponent()
       .endGroup("g2");

The final component detects a 20% drop in trade quantity, marking the end of the upward moving trade blip.

builder.singleComponent(param)
       .matchCondition((t, u) ->
                       {
                           final EquityTradeMessage currTrade = (EquityTradeMessage) t;
                           final List<ENotificationMessage> trades = u.get(EventPattern.ALL_EVENTS);
                           final EquityTradeMessage prevTrade = (EquityTradeMessage) trades.get(trades.size() - 1);

                           return ((currTrade.trade).size < (int) (0.8 * (prevTrade.trade).size));
                       })
       .endSingleComponent();

The one hour time limit is enforced by the until condition. Note that the until condition does not have to be time-based at all. It may be based on any event property.

builder.until((t, e) ->
              {
                  // An empty event list means that the time limit is not exceeded.
                  boolean retcode = t.isEmpty();

                  if (!retcode) {
                      final EquityTradeMessage first = (EquityTradeMessage) t.get(0);
                      final long duration = (e.timestamp - first.timestamp);

                      retcode = (duration <= COMPLEX_TIME_LIMIT);
                  }

                  return (retcode);
              });

The following code declares that each pattern match has exclusive use of its matching events. The default setting is false.

builder.isExclusive(true);

With the pattern defined, the EventPattern is constructed with the build method.

final EventPattern pattern = builder.build();

A pattern feed is opened just like any other feed type:

final EPatternFeed patFeed = EPatternFeed.open(subscriber, pattern);

patFeed.subscribe();

where subscriber is an ESubscriber instance, just as in an ESubscribeFeed. The feed status and notification methods can be either the standard ESubscriber.feedStatus and ESubscriber.notify interface methods or lambda expression callbacks set via EPatternFeed.statusCallback and EPatternFeed.notifyCallback methods.

A pattern feed state is up only when all the subordinate parameter feeds are up.

When events match a pattern, a MatchEvent is posted to the subscriber just like any other event. 4 The match event in this example is posted to the message key MatchEvent:/stock/pattern/TradeBlip/ABCD. A match event contains only one field: the group map. A helper method List<ENotificationMessage> group(String name) is provided which returns the notification list associated with the given group name. The group map and the notification lists in that map are read-only.

In this example, the group map contains two entries: EventPattern.ALL_EVENTS and "g2". The "g2" group contains the one or more trade events which occurred between the first and last events.

Historic Feeds

eBus release 6.3.0 introduced historic feeds. An historic feed allows subscribers to retrieve notification messages published in the past and into the future in a seamless manner. Historic publishers use a message store to persist and retrieve notification messages. The net.sf.eBus.feed.historic package introduces the following interfaces:

  • IEHistoricPublisher: implement this interface to both publish notification messages to active subscribers and persist those messages.
  • IEHistoricSubscriber: implement this interface to receive both past and future notification messages in a seamless manner.
  • IEMessageStore: implement this interface to provide notification message persistence and retrieval.

There are two historic feeds: EHistoricPublishFeed and EHistoricSubscribeFeed. These feeds are used to publish/persist and receive notification messages respectively. These feeds do not implement the IEPublishFeed or IESubscribeFeed interfaces (see below). Instead, the historic feeds are implemented as hybrid objects and run on the same dispatcher as their respective historic publisher/subscriber. The historic publish feed not only publishes live notification messages but also handles requests for historic messages. Conversely, the historic subscribe feed subscribes to live notification messages and requests past messages, combining both historic and live messages as a single stream to the historic subscriber.

eBus release 7.2.0 introduces two IEMessageStore implementations: InMemoryMessageStore and SqlMessageStore. See the javadocs for each to learn how to use these message stores with an EHistoricPublishFeed.

See the net.sf.eBus.feed.historic package docs and the programmer's manual section entitled "Don't know much about history" for a detailed explanation on using historic feeds.

Feed Interfaces

Because multi-subject feeds do not extend EFeed, it was not possible to store single- and multi-subject feeds in a common way until eBus release 4.5.2, which introduced the following interfaces: IEPublishFeed, IESubscribeFeed, IERequestFeed, and IEReplyFeed. These interfaces extend IEFeed. The publish, subscribe, request, and reply single- and multi-subject feeds implement the matching feed interface.

These feed interfaces allow applications to store feed references in a common way. If an eBus client opens a mix of single- and multi-subject subscriptions, then the client can store these feeds in a common List<IESubscribeFeed>, not forced to handle single- and multi-subject feeds separately.

Dispatching Messages to Application Objects

eBus wraps application objects in a weak-reference client. This client tracks the application object's extant feeds, undelivered message queue, and the run queue associated with the application class. The run queue ties the application object to the eBus dispatcher.

An application can configure eBus with multiple run queues. Each run queue is associated with one or more application classes where a class may appear in only one run queue class set. In other words, the run queue class sets are non-intersecting. One run queue is designated as the default and has an empty class set. An application class which does not appear in any specific run queue class set is assigned to the default run queue.
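
The class set rules above (non-intersecting class sets plus a default run queue with an empty class set) can be sketched as a lookup table. This is an illustration in plain Java, not eBus configuration code: RunQueueAssignment is a hypothetical class and run queues are identified here by name only.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical class-to-run-queue assignment table; not part of eBus. */
final class RunQueueAssignment {
    private final Map<Class<?>, String> mAssignments = new HashMap<>();
    private final String mDefaultQueue;

    RunQueueAssignment(final String defaultQueue) {
        mDefaultQueue = defaultQueue;
    }

    /**
     * Adds the classes to the named run queue's class set.
     * Throws IllegalArgumentException if a class already belongs to a
     * different run queue since class sets may not intersect.
     */
    void assign(final String queueName, final Class<?>... classes) {
        // Validate first so a failed call leaves the table unchanged.
        for (Class<?> c : classes) {
            final String current = mAssignments.get(c);

            if (current != null && !current.equals(queueName)) {
                throw new IllegalArgumentException(
                    c.getName() + " already assigned to " + current);
            }
        }

        for (Class<?> c : classes) {
            mAssignments.put(c, queueName);
        }
    }

    /** Returns the run queue for the class, falling back to the default. */
    String queueFor(final Class<?> appClass) {
        return mAssignments.getOrDefault(appClass, mDefaultQueue);
    }
}
```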

Each run queue has one or more dispatcher threads associated with it. The threads are given the same, user-configurable priority. Each thread polls for the next ready client on the run queue. When a client is acquired, the dispatcher thread posts queued messages to the application object (more detail on this below). The run queue is application-configured to poll in one of four ways:

  1. Blocking: the run queue uses a lock and condition, blocking on the condition until a client is posted on the run queue and the condition is signaled.
  2. Spinning: the run queue spins in a tight loop until a client is posted.
  3. Spin+Park: the run queue alternates between spinning and parking while waiting for a client posting. The application may configure the spin limit and nanosecond park time.
  4. Spin+Yield: the run queue alternates between spinning and yielding while waiting for a client posting. The application may configure the spin limit.
Note that since a client is associated with a single run queue, the client is either on the run queue or not and, if on the run queue, appears there only once. A dispatcher thread acquiring that client is the only dispatcher thread to hold a reference to that client.
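As an illustration of the spin+park strategy listed above, the following sketch alternates between a bounded spin and a short park until an item appears. It is written against plain java.util.concurrent classes and is not the eBus run queue implementation; the class name SpinParkPoll and the parameter names are assumptions made for this example.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.LockSupport;

// Illustrative sketch only; eBus internals may differ.
final class SpinParkPoll {
    // Spins up to spinLimit times checking the queue, then parks for
    // parkNanos before spinning again. Returns the first item found.
    static <T> T poll(final Queue<T> queue,
                      final int spinLimit,
                      final long parkNanos) {
        while (true) {
            for (int i = 0; i < spinLimit; ++i) {
                final T item = queue.poll();
                if (item != null) {
                    return item;
                }
                Thread.onSpinWait(); // hint to the CPU that this is a busy-wait
            }
            LockSupport.parkNanos(parkNanos);
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        final Queue<String> runQueue = new ConcurrentLinkedQueue<>();
        final Thread producer = new Thread(() -> {
            try {
                Thread.sleep(50L);
            } catch (InterruptedException interrupt) {
                Thread.currentThread().interrupt();
            }
            runQueue.offer("client-1");
        });
        producer.start();
        System.out.println("acquired " + poll(runQueue, 1_000, 1_000L));
        producer.join();
    }
}
```

Replacing the park with Thread.yield() gives the spin+yield variant, and dropping the park entirely gives pure spinning, which trades CPU time for lower latency.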

Putting this all together, it means that application objects are referenced by only one dispatcher thread at a time, making eBus message delivery single-threaded. If an application object is referenced solely by eBus dispatcher threads, then that application object is effectively single-threaded. That application object does not need to use synchronized, java.util.concurrent.locks.Lock, atomics, etc. to protect critical sections because a single-threaded object has no critical sections.

(Note: the above does not apply to application objects which may be referenced by non-dispatcher threads.)

The basic dispatcher thread run() method is:

public void run() {
    while (true) {
        acquire next available client (blocking)

        while (client has pending messages && client remaining quantum > 0) {
            startTime = System.nanoTime();
            deliver next message to client;
            delta = (System.nanoTime() - startTime);

            update client remaining quantum;
        }

        if (client remaining quantum ≤ 0) {
            set client quantum to run queue's max quantum.
        }

        if (client has pending messages) {
            append client to run queue.
        }
    }
}
Each run queue has a configurable maximum run quantum. Each client is assigned this maximum run quantum when instantiated. This quantum prevents a single client from dominating a run queue which the client shares. Note that the client quantum is not used to preempt client message processing. The client quantum is checked after notify(msg, feed) (for example) returns. If notify(msg, feed) goes into an infinite loop, then the dispatcher thread making the call is lost to all other application objects using that run queue.
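The quantum bookkeeping in the pseudocode above can be exercised with a deterministic sketch. Here each pending message is represented only by its simulated processing cost in nanoseconds instead of a timed callback; QuantumDemo, Client, and runOnce are hypothetical names for this illustration and do not appear in eBus.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative simulation of one dispatcher turn; not eBus code.
final class QuantumDemo {
    // Stand-in for an eBus client: pending message costs plus remaining quantum.
    static final class Client {
        final Deque<Long> pending = new ArrayDeque<>();
        long remainingQuantum;

        Client(final long maxQuantum) {
            remainingQuantum = maxQuantum;
        }
    }

    // Delivers messages until the client runs out of messages or quantum.
    // Returns true when the client must be appended to the run queue again.
    static boolean runOnce(final Client client, final long maxQuantum) {
        while (!client.pending.isEmpty() && client.remainingQuantum > 0) {
            // The cost is only known after "delivery" returns, so a single
            // message may overrun the quantum; it is never preempted.
            final long delta = client.pending.poll();
            client.remainingQuantum -= delta;
        }
        if (client.remainingQuantum <= 0) {
            client.remainingQuantum = maxQuantum;
        }
        return !client.pending.isEmpty();
    }

    public static void main(final String[] args) {
        final Client client = new Client(100L);
        client.pending.add(60L);
        client.pending.add(60L);
        client.pending.add(60L);
        System.out.println("requeue after turn 1: " + runOnce(client, 100L));
        System.out.println("requeue after turn 2: " + runOnce(client, 100L));
    }
}
```

With a quantum of 100 and three messages costing 60 each, the first turn delivers two messages (the second one overruns), resets the quantum, and requeues the client; the second turn delivers the last message.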

An eBus client has four run states:

  1. IDLE: the eBus client has no pending messages and is not posted to the run queue.
  2. READY: the eBus client has pending messages and is posted to the run queue.
  3. RUNNING: a dispatcher thread acquired the eBus client and is dispatching the client's pending messages to that client. This will continue until the client has no more pending messages or quantum.
  4. DEFUNCT: the underlying application object is finalized. Any and all extant client feeds are automatically retracted when this state is entered.
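The four run states can be pictured as a small state machine. The transitions below are inferred from the state descriptions above and are an assumption of this sketch, in particular that DEFUNCT is reachable from every other state and is terminal; ClientRunState is a hypothetical enum, not an eBus type.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical enum for illustration; transitions are inferred, not documented.
enum ClientRunState {
    IDLE, READY, RUNNING, DEFUNCT;

    // States this state may move to next.
    Set<ClientRunState> next() {
        switch (this) {
            case IDLE:
                // A message arrives and the client is posted to the run queue.
                return EnumSet.of(READY, DEFUNCT);
            case READY:
                // A dispatcher thread acquires the client.
                return EnumSet.of(RUNNING, DEFUNCT);
            case RUNNING:
                // Out of messages (IDLE) or out of quantum with messages left (READY).
                return EnumSet.of(IDLE, READY, DEFUNCT);
            default:
                // DEFUNCT is terminal.
                return EnumSet.noneOf(ClientRunState.class);
        }
    }

    boolean canMoveTo(final ClientRunState target) {
        return next().contains(target);
    }
}
```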

Active and Hybrid Objects

The reason why hybrid object feeds must target their parent active object should now be apparent: the hybrid object shares its parent active object's Dispatcher. While the hybrid object's methods process the message, that message is posted to the active object's queue. So the active object remains single-threaded but subordinates message processing to its hybrid objects.

(See hybrid object section for more about active and hybrid objects.)

Starting and Stopping Application Objects

When an object opens feeds, there is a race condition because eBus may call the object back while that object is still opening feeds. Consider the following method which often occurs in eBus applications:

public void start() {
    mSubFeed = ESubscribeFeed.open(this, subKey, FeedScope.LOCAL_ONLY, null);
    mSubFeed.subscribe();

    // Race condition between feedStatus callback and opening, advertising publish feed.
    mPubFeed = EPublishFeed.open(this, pubKey, FeedScope.LOCAL_ONLY);
    mPubFeed.advertise();
}

@Override public void feedStatus(EFeedState feedState, IESubscribeFeed feed) {
    // mPubFeed may still be null or unadvertised causing the following line to fail.
    mPubFeed.updateFeedState(feedState);
}
The problem is that the subscription feed status callback may occur before the publish feed is opened and advertised. The simplest solution is to switch the order in which the subscribe and publish feeds are opened. But what if the example opened more feeds with more complex relations between them? It may be difficult (if not impossible) to open the feeds in the correct order.

The next solution is to synchronize the start and feedStatus methods. That would prevent the eBus callback from interrupting the object start. But that would add overhead to the feedStatus callback which is only needed for the one-time object start.

So the true problem is: how to prevent eBus callbacks from interrupting an object start. eBus v. 4.3.1 offers a solution: have the object start occur on an eBus Dispatcher thread.

If the object is starting on an eBus Dispatcher thread, then any new eBus callbacks will be delivered after the object start returns. This is done by changing start to @Override public void startup().

This method is executed on the eBus Dispatcher thread by:

  1. Registering the object with eBus: EFeed.register(object) and then
  2. Having eBus start up the object: EFeed.startup(object)
See the eBus Programmer's Manual section entitled "Gentlemen, start your objects" for detailed information on object registration and start-up.
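Why running the start-up on the dispatcher thread removes the race can be shown with a plain single-thread executor standing in for an eBus Dispatcher. This is an analogy under stated assumptions, not eBus code: StartupOrderingDemo is a hypothetical class, and the executor only models the property that a client's tasks run one at a time on its dispatcher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Analogy only: a single-thread executor plays the role of the Dispatcher.
final class StartupOrderingDemo {
    static List<String> run() {
        final List<String> events = new ArrayList<>();
        final CountDownLatch callbackDone = new CountDownLatch(1);
        final ExecutorService dispatcher = Executors.newSingleThreadExecutor();

        dispatcher.execute(() -> {
            events.add("startup begin");
            // A callback raised while start-up is still running is queued
            // behind it because both share the same dispatcher thread.
            dispatcher.execute(() -> {
                events.add("feedStatus callback");
                callbackDone.countDown();
            });
            events.add("startup end");
        });

        try {
            callbackDone.await();
        } catch (InterruptedException interrupt) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", interrupt);
        } finally {
            dispatcher.shutdown();
        }
        return events;
    }

    public static void main(final String[] args) {
        System.out.println(run());
    }
}
```

The callback is always recorded after "startup end", so fields assigned during start-up are fully set before any callback reads them, without synchronizing the callback itself.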

Configuring Dispatchers

Dispatchers are set up when an eBus application starts and only then. Dispatchers can only be configured using a configuration file and referencing that file in the command line parameter -Dnet.sf.eBus.config.jsonFile=<conf file path>. What follows is a sample configuration file which defines two Dispatchers: mdDispatcher and defaultDispatcher. See the eBus Programmer's Manual section on Dispatchers for a detailed explanation of the properties.

As of eBus release 6.0.0 Java properties are no longer supported with respect to eBus configuration. Only typesafe JSON configuration files may be used.

// Two separate dispatchers are used: a spinning dispatcher for market data
// clients and a spin+park dispatcher for all others.
dispatchers : [
    {
        name : mdDispatcher
        numberThreads : 1
        runQueueType : spinning
        priority : 9
        quantum : 10000ns
        isDefault : false
        classes : [ "com.acme.trading.MDHandler" ]
        threadAffinity {          // optional, selector thread core affinity
            affinityType : CPU_ID // required, core selection type.
            cpuId : 7             // required for CPU_ID affinity type
            bind : true           // optional, defaults to false
            wholeCore : true      // optional, defaults to false
        }
    },
    {
        name : defaultDispatcher
        numberThreads : 8
        runQueueType : "spin+park"
        spinLimit : 2500000
        parkTime : 1000ns
        priority : 4
        quantum : 100000ns
        isDefault : true
    }
]

eBus release 5.8.0 introduced the ability to create an affinity between a thread and a particular CPU. This affinity is created using the OpenHFT Thread Affinity Library. Please see the OpenHFT Thread Affinity documentation and ThreadAffinityConfigure for more information regarding using thread affinity.

Monitoring Dispatchers

eBus release 6.1.0 introduced access to dispatcher run-time statistics by adding the static method EClient.runTimeStats() which returns an EBusObjectInfo list. An EBusObjectInfo instance applies to a single eBus EObject and contains the following (all times are in nanoseconds):

  • eBus object name,
  • eBus-assigned client identifier,
  • minimum run time,
  • maximum run time,
  • total run time,
  • number of times object was executed in Dispatcher,
  • average run time,
  • Dispatcher maximum quantum,
  • number of times this eBus object exceeded Dispatcher maximum quantum, and
  • current thread denial alarm condition.

Sample output of the Dispatcher run-time stats is:

ConnectionPublisher (id 0)
    min run time: 1,364 nanos
    max run time: 32,743,678 nanos
  total run time: 34,189,949 nanos
       run count: 4
    avg run time: 8,547,487 nanos
      dispatcher: general
     max quantum: 100,000 nanos
quantum overruns: 3

MulticastConnectionPublisher (id 1)
    min run time: 613 nanos
    max run time: 751,792 nanos
  total run time: 763,513 nanos
       run count: 3
    avg run time: 254,504 nanos
      dispatcher: general
     max quantum: 100,000 nanos
quantum overruns: 1

PingPong Main (id 2)
    min run time: 10,541 nanos
    max run time: 3,700,790 nanos
  total run time: 3,711,331 nanos
       run count: 2
    avg run time: 1,855,665 nanos
      dispatcher: general
     max quantum: 100,000 nanos
quantum overruns: 1

Ping! Pong! Timer (id 3)
    min run time: 1,260 nanos
    max run time: 9,877,401 nanos
  total run time: 10,195,402 nanos
       run count: 5
    avg run time: 2,039,080 nanos
      dispatcher: general
     max quantum: 100,000 nanos
quantum overruns: 2

Pinger (id 4)
    min run time: 61 nanos
    max run time: 33,913,494 nanos
  total run time: 953,601,532 nanos
       run count: 338,447
    avg run time: 2,817 nanos
      dispatcher: ping
     max quantum: 100,000 nanos
quantum overruns: 179

Ponger (id 5)
    min run time: 164 nanos
    max run time: 4,439,180 nanos
  total run time: 926,228,288 nanos
       run count: 132,905
    avg run time: 6,969 nanos
      dispatcher: pong
     max quantum: 100,000 nanos
quantum overruns: 235
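How the reported figures relate to one another can be seen with a small accumulator. This is a sketch and not the EBusObjectInfo implementation: RunTimeStats is a hypothetical class, and it assumes that the average is the truncated integer quotient of total run time and run count and that an overrun is counted whenever a single run exceeds the maximum quantum. Under those assumptions, two runs of 10,541 and 3,700,790 nanoseconds reproduce the "PingPong Main" figures above.

```java
// Hypothetical accumulator for illustration; not the eBus implementation.
final class RunTimeStats {
    private final long mMaxQuantum;
    private long mMin = Long.MAX_VALUE;
    private long mMax;
    private long mTotal;
    private long mCount;
    private long mOverruns;

    RunTimeStats(final long maxQuantumNanos) {
        mMaxQuantum = maxQuantumNanos;
    }

    // Records one run of the object on a dispatcher thread.
    void record(final long runNanos) {
        mMin = Math.min(mMin, runNanos);
        mMax = Math.max(mMax, runNanos);
        mTotal += runNanos;
        ++mCount;
        if (runNanos > mMaxQuantum) {
            ++mOverruns; // assumption: one overrun per run exceeding the quantum
        }
    }

    long min() { return mMin; }
    long max() { return mMax; }
    long total() { return mTotal; }
    long count() { return mCount; }
    long overruns() { return mOverruns; }

    // Integer average; zero when nothing has run yet.
    long average() {
        return (mCount == 0L ? 0L : mTotal / mCount);
    }

    public static void main(final String[] args) {
        final RunTimeStats stats = new RunTimeStats(100_000L);
        stats.record(10_541L);
        stats.record(3_700_790L);
        System.out.printf("total=%,d avg=%,d overruns=%d%n",
                          stats.total(), stats.average(), stats.overruns());
    }
}
```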

Monitoring Run Queue Threads

eBus release 6.2.0 introduced the singleton class RQMonitor which monitors Dispatcher run queue thread performance. This differs from the EClient.runTimeStats() method, which focuses on eBus object performance. This monitoring includes:

  • Reporting run queue thread statistics at a configurable interval. These reports are RQThreadReport notifications on the RQMonitor.REPORT_SUBJECT.
  • Reporting eBus object statistics at the same configurable interval as the run queue thread report. These reports are EBusObjectReport notifications on the RQMonitor.REPORT_SUBJECT. Note that this report contains the same information as EClient.runTimeStats() method.
  • Reporting when an eBus object is overrunning the dispatcher maximum quantum (plus a configurable limit). An alarm is raised via the ThreadOverrunUpdate notification on the RQMonitor.RQALARM_SUBJECT.
  • Reporting when a run-ready eBus object is denied access to a run queue thread for too long (again how long is configurable). An alarm is raised via the ThreadDenialUpdate notification on the RQMonitor.RQALARM_SUBJECT.

Alarms are raised when a condition transitions from cleared to alarmed or from alarmed to cleared. The run queue report contains the current thread alarm state.
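The "report only on transition" behavior described above is edge-triggered signalling. A minimal sketch follows, with the hypothetical class EdgeTriggeredAlarm standing in for whatever RQMonitor does internally; it returns an update only when the sampled condition differs from the last known state.

```java
import java.util.Optional;

// Illustrative sketch of edge-triggered alarm reporting; not RQMonitor code.
final class EdgeTriggeredAlarm {
    enum Update { ALARMED, CLEARED }

    private boolean mAlarmed;

    // Returns an update only when the condition changes state.
    Optional<Update> sample(final boolean conditionActive) {
        if (conditionActive == mAlarmed) {
            return Optional.empty(); // no transition, nothing to publish
        }
        mAlarmed = conditionActive;
        return Optional.of(conditionActive ? Update.ALARMED : Update.CLEARED);
    }

    // Current alarm state, as would be carried in a periodic report.
    boolean isAlarmed() {
        return mAlarmed;
    }

    public static void main(final String[] args) {
        final EdgeTriggeredAlarm alarm = new EdgeTriggeredAlarm();
        for (boolean overrun : new boolean[] {false, true, true, false}) {
            alarm.sample(overrun).ifPresent(u -> System.out.println(u));
        }
    }
}
```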

RQMonitor also implements the StatusReporter interface and will add run queue thread and eBus object stats to the StatusReport if the RQMonitor is started and registered with StatusReport.

For more information on how to use the run queue thread monitor, see RQMonitor.

Connecting eBus Applications

eBus applications may be connected by having one application open a service and the other applications connect to that service. See EServer and ERemoteApp for a complete explanation of eBus service and connection configuration.

When two eBus applications connect, each eBus sends its local advertisements with remote scope to the other. This allows local subscriptions and requests with remote scope to be satisfied by remote advertisements. eBus does not forward an advertisement from one remote eBus to another remote eBus. Nor does eBus forward notifications and requests from one remote eBus to another remote eBus. eBus only routes messages from:

  1. a local client to a local client,
  2. a local client to a remote client, or
  3. a remote client to a local client.

eBus allows only one connection to exist between two running applications. Once one eBus application connects to another, any further attempts to connect by either application will fail. eBus can also be configured to accept connections only from a predefined set of Internet addresses or even particular Internet address and TCP port pairs. This filtering is positive only. Negative filtering (accept all connections except those specified) is not supported.
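Positive filtering amounts to an allow-list check on the remote address. The following sketch illustrates the idea with plain java.net types; PositiveAddressFilter and its methods are hypothetical and are not the eBus AddressFilter API. Note that in this sketch an empty filter accepts nothing, which is an assumption of the example rather than documented eBus behavior.

```java
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Set;

// Hypothetical allow-list for illustration; not the eBus AddressFilter class.
final class PositiveAddressFilter {
    private final Set<String> mHosts = new HashSet<>();
    private final Set<String> mHostPorts = new HashSet<>();

    // Accept connections from this IP address on any remote port.
    void allowHost(final String ip) {
        mHosts.add(ip);
    }

    // Accept connections only from this exact IP address and port pair.
    void allowHostPort(final String ip, final int port) {
        mHostPorts.add(ip + ":" + port);
    }

    // True if the remote end matches an allowed host or host/port pair.
    boolean passes(final InetSocketAddress remote) {
        final String ip = remote.getAddress().getHostAddress();
        return mHosts.contains(ip)
            || mHostPorts.contains(ip + ":" + remote.getPort());
    }

    public static void main(final String[] args) {
        final PositiveAddressFilter filter = new PositiveAddressFilter();
        filter.allowHost("192.168.1.10");
        filter.allowHostPort("192.168.1.20", 40000);
        System.out.println(filter.passes(new InetSocketAddress("192.168.1.10", 12345)));
        System.out.println(filter.passes(new InetSocketAddress("192.168.1.20", 40001)));
    }
}
```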

eBus applications can programmatically monitor eBus connections by subscribing to the message key net.sf.eBus.client.ConnectionMessage:"/eBus". Connection messages are published when a remote eBus either logs on or logs off. Subscribe to the message key net.sf.eBus.client.ServerMessage:"/eBus" to learn when new connections are accepted by the server. Note: both ConnectionMessage and ServerMessage are published locally only. Remote eBus applications cannot subscribe to these notifications.

eBus allows each remote connection to be configured independently. The first and most important remote connection property/preference key is eBus.connections. This property is a comma-separated list of remote connection names. Each name is used in an eBus.connection.name property/preference key. If an eBus.connection key contains a name which does not appear in eBus.connections, then that property/preference is ignored and will result in the remote eBus connection not being established.

eBus Connection Types

eBus supports four connection types: TCP, SECURE_TCP, UDP, and secure UDP. TCP is the default connection type. UDP was introduced in eBus v. 5.3.0, providing unreliable messaging between eBus applications. Secure TCP was introduced in eBus v. 4.7.0 and provides an SSL/TLS-based secure socket. This secure socket requires that the user provide an SSLContext which is used to create the SSLEngine used to establish the secure SSL/TLS connection over a TCP connection. The good news is that AsyncSecureSocket has the same interface as AsyncSocket, with both classes now extending AbstractAsyncSocket. Because eBus remote connections reference the underlying async socket as AbstractAsyncSocket, users can easily switch between a plain-text, open socket connection and a secure connection.

Secure UDP, introduced in eBus v. 6.0.0, provides a DTLS-based secure datagram socket. As with secure TCP, the user is required to provide an SSLContext used to create the SSLEngine. The major difference between a plain datagram socket and a secure datagram socket is that the user must connect the secure datagram socket to a peer in order to create a secure "connection" between the two sockets.

The bad news is that secure connections cannot be configured using eBus properties. The reason is that such configuration would require storing sensitive information in clear text, making the secure connection insecure. This means secure connections can only be created using the connection builder API.

UDP also functions like a connection-based protocol in eBus rather than in its natural connection-less manner. That is because eBus itself is connection-based and so requires the protocols it uses to behave in the same way. This means it is necessary for UDP "services" to be opened.

eBus allows for multiple services to be opened. Each service is uniquely defined by connection type and port. The service configuration is used to define accepted connection configuration. This includes the same properties such as buffer sizes, heartbeat, and connection pause. Reconnection is not supported since accepted connections are not initiated by the local eBus application.

eBus now supports multicast "connections" for transmitting notification messages between eBus applications. Unlike the connection types mentioned above, multicasting is not used to form a hard connection between applications. Instead, notification messages are posted to the joined multicast group without knowing whether anyone is there to receive the packet. While this is a departure from how eBus works, it is exactly how multicast works. See Multicast Connections for a more detailed explanation.

Sample eBus Connection Configuration

eBus can be configured on start-up by setting the eBus properties in a file and referencing that file in the java command line: -Dnet.sf.eBus.config.jsonFile=<conf file path>. eBus release 5.1.0 supports using typesafe to configure eBus.

services : [
    {
        name : tcp-remote
        connectionType : TCP
        port : 6789
        inputBufferSize : 8192
        outputBufferSize : 8192
        messageQueueSize : 100
        canPause : true
        pause : {
            pauseTime : 10m
            maxBacklogSize : 50
        }
    },
    {
        name : udp-remote
        connectionType : UDP
        port : 6789
        inputBufferSize : 1024
        outputBufferSize : 1024
        messageQueueSize : 10
        canPause : true
    }
]

connections : [
    {
        name : local
        host : localhost
        port : 12345
        heartbeatDelay : 0
        heartbeatReplyDelay : 0
        reconnect : true
        reconnectTime : 500ms
        messageQueueSize : 0
        inputBufferSize : 4096
        outputBufferSize : 8192
        canPause : true
        pause : {
            pauseTime : 5m
            maxBacklogSize : 100
            discardPolicy : YOUNGEST_FIRST
            idleTime : 1m
            maxConnectTime : 2m
            resumeOnBacklogSize : 10
        }
    }
]

Building eBus Connections

eBus service and connection configurations can be programmatically created using the service builder and connection builder, respectively. These builders allow the same parameters to be set as shown in the above configuration file example. This includes input and output buffer sizes, byte order, maximum message queue size, selector thread, heartbeating rates, and other parameters. Once the service or connection configuration is set, the EConfigure.Service or EConfigure.RemoteConnection configuration is built. For more information, see the service and remote connection documentation.

Example service and remote connection builders:

// Service (server) builder.
final AddressFilter filter = ...;
final SSLContext secureContext = ...;
final EConfigure.ServerBuilder builder = EConfigure.serverBuilder();

EServer.openServer(builder.name("AppService")
                          .port(6789)
                          .connectionType(EConfigure.ConnectionType.SECURE_TCP)
                          .sslContext(secureContext)
                          .addressFilter(filter)
                          .inputBufferSize(1_024)
                          .outputBufferSize(1_024)
                          .byteOrder(ByteOrder.BIG_ENDIAN)
                          .messageQueueSize(10_000)
                          .serviceSelector("svcSelector")
                          .connectionSelector("connSelector")
                          .heartbeatDelay(Duration.ofSeconds(60L))
                          .heartbeatReplyDelay(Duration.ofSeconds(30L))
                          .build());

// Remote connection (client) builder.
final InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), 12345);
final SSLContext secureContext = ...;
final EConfigure.ConnectionBuilder builder = EConfigure.connectionBuilder();

ERemoteApp.openConnection(builder.name("Conn0")
                                 .address(address)
                                 .bindPort(0)
                                 .connectionType(EConfigure.ConnectionType.SECURE_TCP)
                                 .sslContext(secureContext)
                                 .inputBufferSize(4_096)
                                 .outputBufferSize(8_192)
                                 .byteOrder(ByteOrder.BIG_ENDIAN)
                                 .messageQueueSize(0)
                                 .selector("connSelector")
                                 .reconnect(true)
                                 .reconnectTime(Duration.ofMillis(500L))
                                 .heartbeatDelay(Duration.ZERO)
                                 .heartbeatReplyDelay(Duration.ZERO)
                                 .build());

Both the server and remote connection are configured to use the SSL/TLS-based secure TCP connection. This connection type requires that the user provides the necessary SSLContext needed to establish the secure connection. How that SSLContext is securely created is a user responsibility.

Pausing eBus Connections

eBus release 5.1.0 introduces connection pause. A client eBus may request that a connection be paused for a given duration and that the far-end queue up undelivered messages to a maximum backlog size. The far-end server eBus may accept or reject the pause request. If accepted, the far-end response contains the pause duration and backlog size allowed. These values will be ≤ the requested values. When the client receives this acceptance pause response, the connection is closed. The client then reconnects after the agreed upon delay and receives the undelivered messages (if any).

The client may resume the connection earlier than the agreed upon pause delay if its local message queue reaches a configured limit. For example, both ends agree to pause for 5 minutes but the client eBus is configured to reconnect when the backlog contains 10 application messages (system messages are not used to calculate queue size). When the backlog has 10 application messages after 3 minutes, the client eBus resumes the paused connection.
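The pause rules above reduce to two small calculations: the far end grants no more than was requested, and the client resumes when either the agreed pause time elapses or its backlog reaches the resume threshold. The sketch below illustrates this; PauseNegotiation, the "server maximum" parameters, and the treatment of a zero threshold as "disabled" are all assumptions of this example, since the source only states that granted values are ≤ the requested values.

```java
import java.time.Duration;

// Hypothetical helper for illustration; not part of the eBus API.
final class PauseNegotiation {
    // Granted pause time: never more than requested, capped by the far end.
    static Duration grantTime(final Duration requested, final Duration serverMax) {
        return (requested.compareTo(serverMax) <= 0 ? requested : serverMax);
    }

    // Granted backlog size: never more than requested, capped by the far end.
    static int grantBacklog(final int requested, final int serverMax) {
        return Math.min(requested, serverMax);
    }

    // Resume when the pause expires or the local application message
    // backlog reaches the threshold (assumed disabled when zero).
    static boolean shouldResume(final Duration elapsed,
                                final Duration agreedPause,
                                final int backlog,
                                final int resumeOnBacklogSize) {
        return elapsed.compareTo(agreedPause) >= 0
            || (resumeOnBacklogSize > 0 && backlog >= resumeOnBacklogSize);
    }

    public static void main(final String[] args) {
        final Duration agreed =
            grantTime(Duration.ofMinutes(10L), Duration.ofMinutes(5L));
        // The example from the text: 10 messages queued after 3 of 5 minutes.
        System.out.println(
            shouldResume(Duration.ofMinutes(3L), agreed, 10, 10));
    }
}
```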

Client connections are automatically paused after a specified idle time during which no messages are sent or received or after a maximum connect time. This is to prevent a busy connection from keeping the connection up permanently.

The pause feature is targeted for mobile devices which cannot maintain a network connection for any length of time without excessive battery drain.

eBus Protocol Stack

The following demonstrates how the eBus protocol stack works and how the configuration impacts the protocol stack.

eBus Binary Protocol Stack

Each level is listed with its description, its handling of inbound messages (Input), and its handling of outbound messages (Output).

ERemoteApp
  Description: Responsible for maintaining a connection to a remote eBus application.
    eBus.connection.name.host: the remote eBus application service is open on this host.
    eBus.connection.name.port: the remote eBus application service is open on this port.
    eBus.connection.name.bindPort: bind the connection local side to this port.
  Input: Forwards messages to the target ESubject or ERequest instances except the logon, logon reply, and logoff messages.
  Output: Passes outbound system and user messages to the associated ERemoteApp. If the EConnection outbound message queue overflows, then the connection is closed and all queued messages are discarded. If the connection is set to reconnect, then the reconnect timer is set.
    eBus.connection.name.reconnect: if true, then a lost connection is re-established. The default value is false.
    eBus.connection.name.reconnectTime: specifies the rate at which reconnect attempts are made. The default value is 5 seconds. This setting is ignored if reconnect is set to false.

ETCPConnection
  Description: Responsible for serializing outbound messages and deserializing inbound messages. Also responsible for queuing up outbound messages when the socket output buffer is full. Sends the enqueued messages when the buffer overflow condition clears.
  Input: De-serializes eBus messages directly from the AsyncSocket input buffer. Posts de-serialized messages to an associated callback thread. There is one callback thread per EConnection instance.
    eBus.connection.name.heartbeatReplyDelay: if > zero, then wait this many milliseconds for a reply to a heartbeat. This timer is reset every time data is received from the far-end. This setting is ignored if heartbeatDelay is not set.
  Output: Serializes outbound messages directly to the socket output buffer using the BufferWriter interface. Throws a BufferOverflowException if the socket output buffer overflows. This exception is caught by EConnection and the message is posted to the message queue. When the socket output buffer is no longer full, forwards enqueued messages until the queue is either empty or the socket output buffer is again full.
    eBus.connection.name.messageQueueSize: if set and this size is exceeded, then the connection is immediately closed and the upstream ERemoteApp notified.

AsyncSocket
  Description: Interface between EConnection and the Selector Thread. Encapsulates the SelectableChannel.
  Input: Inbound bytes are retrieved directly to a socket input buffer. When a read occurs, the newly copied input buffer is directly passed to the ERemoteApp connection for processing via the SocketListener.handleInput callback.
    eBus.connection.name.inputBufferSize: specifies the AsyncSocket input buffer size which is the maximum number of bytes read at any one time. Defaults to 2,048 bytes.
  Output: Outbound messages are serialized to a socket output buffer. If the output buffer size is exceeded, then throws a BufferOverflowException. The message is transmitted immediately if the socket output buffer has no other pending messages.
    eBus.connection.name.outputBufferSize: specifies the socket output buffer fixed size. Defaults to 2,048 bytes.

Selector Thread
  Description: Watches SelectableChannel instances and performs the actual read, write and accept operations.
    Note: there is only one SelectorThread instance in an eBus application. The selector thread properties/preferences settings apply to all remote eBus application connections.
  Input: Reads bytes from a SocketChannel into the AsyncSocket input buffer.
  Output: The Selector Thread passes the AsyncSocket output buffer to the SocketChannel(ByteBuffer) for transmission.

The one weakness of this design is that if the SelectorThread loses its CPU, then all remote eBus messaging stops until the Selector thread regains a CPU. For this reason, the Selector thread priority is set to Thread.MAX_PRIORITY in an attempt to keep the thread on a CPU.
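The output path described for the connection and socket levels (serialize into a fixed output buffer, queue the message on BufferOverflowException, forward queued messages once the buffer drains, close when the queue limit is exceeded) can be sketched with java.nio.ByteBuffer. OutboundBuffer is a hypothetical class written for this illustration and is not eBus code; treating a maximum queue size of zero as "unlimited" is an assumption.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative sketch of overflow queuing; not the eBus implementation.
final class OutboundBuffer {
    private final ByteBuffer mOutput;
    private final Queue<byte[]> mBacklog = new ArrayDeque<>();
    private final int mMaxQueueSize;

    OutboundBuffer(final int bufferSize, final int maxQueueSize) {
        mOutput = ByteBuffer.allocate(bufferSize);
        mMaxQueueSize = maxQueueSize;
    }

    // Returns false when the queue limit is hit; the caller would then
    // close the connection and discard queued messages.
    boolean send(final byte[] message) {
        if (mBacklog.isEmpty()) {
            try {
                // put() transfers nothing if the message does not fit.
                mOutput.put(message);
                return true;
            } catch (BufferOverflowException overflow) {
                // Fall through and queue the message.
            }
        }
        if (mMaxQueueSize > 0 && mBacklog.size() >= mMaxQueueSize) {
            return false;
        }
        mBacklog.add(message);
        return true;
    }

    // Called once the socket has transmitted the buffer contents.
    void drained() {
        mOutput.clear();
        while (!mBacklog.isEmpty()) {
            try {
                mOutput.put(mBacklog.peek());
                mBacklog.remove();
            } catch (BufferOverflowException overflow) {
                break; // buffer full again; wait for the next drain
            }
        }
    }

    int backlogSize() { return mBacklog.size(); }
    int bufferedBytes() { return mOutput.position(); }
}
```

Messages are queued behind any existing backlog even when the buffer has room, which keeps delivery in order.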

Multicast Connections

eBus multicast connections are built around one or more notification multi-feeds. A multicast connection acts as either a publisher or a subscriber. If a publisher, then subscription multi-feeds are used to receive notifications with remote scope and then send those messages to the joined multicast group. If a subscriber, then publish multi-feeds are used to post received multicast notifications to the local eBus. Again, these multi-feeds have a remote feed scope.

Like TCP and UDP, multicast connections may be created using either typesafe configuration or the eBus API. Java Properties configuration is not supported. Also (while this may be obvious) multicast connections cannot be paused.

Sample eBus Multicast Configuration

These two configurations are for a publisher and subscriber multicast connection designed to communicate across the same group. Required properties are in bold.

# Publisher configuration.
multicastGroups : [
  {
    name : MCAST-PUB-0
    multicastRole : PUBLISHER
    multicastGroup : "230.0.0.0"
    targetPort : 5000
    networkIF : en8
    protocolFamily : INET
    bindPort : 5001
    order : LITTLE_ENDIAN
    selector : "mcastSelector"
    inputSize : 512
    outputSize : 512

    # EMultiPublishFeeds defining notification messages
    # posted to the multicast group.
    multicastKeys : [
      {
        multifeedType : LIST
        messageClass : com.acme.personal.TextMessage
        subjectList : [
          "Tom",
          "Dick",
          "Harry"
        ]
      },
      {
        multifeedType : QUERY
        messageClass : com.acme.marketData.Trade
        subjectQuery : "[A-M][A-Z]*"
        isDynamic : true
      }
    ]
  }
]

# Subscriber configuration.
multicastGroups : [
  {
    name : MCAST-SUB-0
    multicastRole : SUBSCRIBER
    multicastGroup : "230.0.0.0"
    targetPort : 5001
    networkIF : en8
    protocolFamily : INET
    bindPort : 5000
    order : LITTLE_ENDIAN
    selector : "mcastSelector"
    inputSize : 512
    outputSize : 512

    # EMultiSubscribeFeeds defining notification messages
    # received from multicast group and published to eBus.
    multicastKeys : [
      {
        multifeedType : QUERY
        messageClass : com.acme.personal.TextMessage
        subjectQuery : "[A-M][A-Z]*"
        isDynamic : false
      },
      {
        multifeedType : LIST
        messageClass : com.acme.marketData.Trade
        subjectList : [
          "ABCD",
          "EFG",
          "STU"
        ]
      }
    ]
  }
]
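To get a feel for which subjects a QUERY multi-feed such as "[A-M][A-Z]*" selects, the sketch below applies the pattern as a full match using java.util.regex. This is an approximation for illustration: eBus matches subjects with its own net.sf.eBus.util.regex package, whose syntax and semantics may differ from java.util.regex, and SubjectQueryDemo is a hypothetical class.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Illustration using java.util.regex; eBus uses its own regex package.
final class SubjectQueryDemo {
    // Returns the subjects which match the query in full.
    static List<String> matching(final String query, final List<String> subjects) {
        final Pattern pattern = Pattern.compile(query);
        return subjects.stream()
                       .filter(subject -> pattern.matcher(subject).matches())
                       .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        System.out.println(
            matching("[A-M][A-Z]*", List.of("ABCD", "EFG", "STU", "Tom")));
    }
}
```

Under java.util.regex semantics only "ABCD" and "EFG" match: "STU" and "Tom" start with a letter outside A through M.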

Building eBus Multicast Connections

Multicast connections are created in two steps: first the multicast configuration is created and then that configuration is used to join the multicast group. This is demonstrated below creating MCAST-PUB-0 shown above.

final EConfigure.MulticastBuilder mcastBuilder = EConfigure.multicastBuilder();
final InetAddress group = InetAddress.getByName("230.0.0.0");
final NetworkInterface netIF = NetworkInterface.getByName("en8");
final List<String> textSubjects = com.google.common.collect.ImmutableList.of("Tom", "Dick", "Harry");

// LIST multi-feed: a fixed set of subjects.
final EConfigure.McastNotifyConfig listFeed =
    (EConfigure.notificationBuilder()).feedType(EConfigure.MultifeedType.LIST)
                                      .messageClass(com.acme.personal.TextMessage.class.getCanonicalName())
                                      .subjectList(textSubjects)
                                      .build();

// QUERY multi-feed: subjects selected by a regular expression.
final EConfigure.McastNotifyConfig queryFeed =
    (EConfigure.notificationBuilder()).feedType(EConfigure.MultifeedType.QUERY)
                                      .messageClass(com.acme.marketData.Trade.class.getCanonicalName())
                                      .subjectQuery("[A-M][A-Z]*")
                                      .build();
final List<EConfigure.McastNotifyConfig> mcastKeys = com.google.common.collect.ImmutableList.of(listFeed, queryFeed);

// Step 1: build the multicast configuration (ports match MCAST-PUB-0 above).
final EConfigure.MulticastConnection mcastConfig =
    mcastBuilder.name("MCAST-PUB-0")
                .role(EConfigure.MulticastRole.PUBLISHER)
                .group(group)
                .targetPort(5000)
                .networkInterface(netIF)
                .protocolFamily(StandardProtocolFamily.INET)
                .bindPort(5001)
                .byteOrder(ByteOrder.LITTLE_ENDIAN)
                .inputBufferSize(512)
                .outputBufferSize(512)
                .notifications(mcastKeys)
                .build();

// Step 2: join the multicast group using that configuration.
EMulticastConnection.openConnection(mcastConfig);

Monitoring eBus Connections

If there is a need to dynamically monitor eBus connection status (client, server, or multicast) then subscribe to the following message keys, one for each eBus connection type:

Subscribe to the messages as follows (note - these messages are published to the local eBus JVM only):


ESubscribeFeed.open(subscriber,
                    ConnectionMessage.MESSAGE_KEY,
                    FeedScope.LOCAL_ONLY,
                    condition);

eBus sends updates as a connection goes through the process of connecting, disconnecting, accepting connections, or joining a multicast group. Because there is currently no way to retrieve the current status, it is best to put the subscription(s) in place before opening connections or services.
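
The ordering advice above can be illustrated with a minimal, self-contained sketch. It does not use the eBus API: StatusFeed is a hypothetical stand-in for a local status feed that, like the connection messages described here, offers no way to query the current status, and the "connected" and "disconnected" strings are illustrative labels only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public final class SubscribeFirstSketch {

    /** Hypothetical stand-in for a status feed with no "current status" query. */
    static final class StatusFeed {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        /** Delivers an update to whoever is subscribed right now; nothing is replayed. */
        void publish(String update) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(update);
            }
        }
    }

    /** Returns the updates a subscriber sees, depending on when it subscribes. */
    static List<String> run(boolean subscribeFirst) {
        StatusFeed feed = new StatusFeed();
        List<String> seen = new ArrayList<>();

        if (subscribeFirst) {
            feed.subscribe(seen::add);
        }
        feed.publish("connected");      // stands in for opening the connection
        if (!subscribeFirst) {
            feed.subscribe(seen::add);  // too late: the first update is already gone
        }
        feed.publish("disconnected");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("subscribed first: " + run(true));
        System.out.println("subscribed late:  " + run(false));
    }
}
```

A subscriber registered after the connection is opened never learns that the connection came up, which is why the subscription should be in place first.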

eBus Extensions

eBus currently has four extensions: the net.sf.eBus.client.IMessageExhaust interface and the packages net.sf.eBusx.io, net.sf.eBusx.monitor, and net.sf.eBusx.util.

Message Exhaust

eBus release 6.2.0 adds the interface IMessageExhaust. An application implements IMessageExhaust and passes an instance of that implementation to EFeed.setExhaust(net.sf.eBus.client.IMessageExhaust). The local eBus then passes all notification, request, and reply messages flowing through eBus to the registered message exhaust.

Note: only one exhaust instance may be registered at a time. If the application does not want to exhaust all messages, then the IMessageExhaust implementation is responsible for filtering out unwanted messages. If the application needs to exhaust messages to different persistent stores, the registered exhaust is likewise responsible for interfacing with those stores.

The application is responsible for opening and closing exhaust persistent stores when appropriate. Message exhaust is "turned off" by passing a null IMessageExhaust value to EFeed.setExhaust, resulting in the default message exhaust, which does nothing with the given eBus message.

The IMessageExhaust.exhaust(EMessage) method is called from an eBus dispatcher thread. The exhaust process is therefore not in-line with message forwarding and does not interfere with it.
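
Because only one exhaust can be registered, filtering and fan-out to several stores must happen inside that one instance. The sketch below shows one possible shape for such an exhaust. It is self-contained and does not use the eBus API: Message and Exhaust are hypothetical stand-ins for EMessage and IMessageExhaust, the stores are plain in-memory lists, and the message class names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public final class FilteringExhaustSketch {

    /** Hypothetical stand-in for EMessage: a message class name and a subject. */
    static final class Message {
        final String messageClass;
        final String subject;

        Message(String messageClass, String subject) {
            this.messageClass = messageClass;
            this.subject = subject;
        }
    }

    /** Hypothetical stand-in mirroring the exhaust(EMessage) callback. */
    interface Exhaust {
        void exhaust(Message message);
    }

    /** Pairs one store (a list here; a database or file in practice) with its filter. */
    static final class Route {
        final Predicate<Message> filter;
        final List<Message> store;

        Route(Predicate<Message> filter, List<Message> store) {
            this.filter = filter;
            this.store = store;
        }
    }

    /** The single registered exhaust: drops unwanted messages, routes the rest. */
    static final class RoutingExhaust implements Exhaust {
        private final List<Route> routes;

        RoutingExhaust(List<Route> routes) {
            this.routes = new ArrayList<>(routes);
        }

        @Override
        public void exhaust(Message message) {
            for (Route route : routes) {
                if (route.filter.test(message)) {
                    route.store.add(message);
                }
            }
            // A message matching no route is silently filtered out.
        }
    }

    public static void main(String[] args) {
        List<Message> tradeStore = new ArrayList<>();
        List<Message> textStore = new ArrayList<>();
        List<Route> routes = new ArrayList<>();
        routes.add(new Route(m -> m.messageClass.equals("Trade"), tradeStore));
        routes.add(new Route(m -> m.messageClass.equals("TextMessage"), textStore));

        Exhaust exhaust = new RoutingExhaust(routes);
        exhaust.exhaust(new Message("Trade", "ABC"));
        exhaust.exhaust(new Message("TextMessage", "Tom"));
        exhaust.exhaust(new Message("Heartbeat", "n/a")); // matches no route

        System.out.println(tradeStore.size() + " trade(s), "
            + textStore.size() + " text message(s) stored");
    }
}
```

In a real application the routing class would implement IMessageExhaust and be passed to EFeed.setExhaust, and the application would open and close the underlying stores itself.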

net.sf.eBusx.io

EFileWatcher is an eBus publisher notifying subscribers when the subscribed file or directory is created, modified, or deleted via the EFileNotification message. Subscribers use the target file or directory name as the subscription subject. Note: the file watcher service supports only subscribers within the same JVM. The file watcher is not advertised to other JVMs, whether on the same host or on a remote host.

net.sf.eBusx.monitor

This package provides eBus applications the ability to monitor other eBus applications and to be monitored. Monitoring is on the object level via the Monitorable interface.

net.sf.eBusx.util

This package provides the eBus timer service. This service provides an eBus request/reply interface to the Timer service.


1 An application object can use EFeed.FeedScope to control whether it communicates with objects local to its JVM, objects in another JVM, or both.

2 As homogeneous as Java allows. For example, it is possible to declare a Number[] array and place Integer and Double instances into the array.

3 A reply message already posted to the requestor's message queue will be delivered. Reply messages not yet posted to that queue will not be delivered.

4 java.util.Map is not a supported eBus field type. eBus is able to use an unsupported type here because MatchEvent is never transported to a remote application, so the unsupported type is never detected.

Author:
Charles Rapp

Copyright © 2001 - 2024. Charles W. Rapp. All rights reserved.