eBus
A middleware API
Updated: October 29, 2017
Version: 4.5.0


Packages 
Package Description
net.sf.eBus.client
This package contains the top-level eBus API which applications use to interact with eBus.
net.sf.eBus.client.sysmessages
This package defines the eBus system messages.
net.sf.eBus.database
Provides a simplifying interface on top of the java.sql package.
net.sf.eBus.messages
Messages are the heart of eBus.
net.sf.eBus.messages.type
Provides classes for translating Java built-in types to and from binary encoding.
net.sf.eBus.net
Adds a layer between the application and the java.nio.channels package.
net.sf.eBus.text
Lexer takes a given input and returns a series of analyzed tokens.
net.sf.eBus.util
Contains supporting utility Java classes for net.sf.eBus.
net.sf.eBus.util.logging
Supplements java.util.logging package with a rolling log file handler, a pattern formatter and a logged status report.
net.sf.eBus.util.regex
This regular expression package is designed to perform efficient pattern matching over a TernarySearchTree.
net.sf.eBus.xml
Provides the XML document and XML tag classes used to define an XML document in computer memory.
net.sf.eBus.xml.parser
Converts an XML document from text to a XmlDocument.
net.sf.eBusx.io
This package provides the ability to monitor file/directory creation, modification, and deletion via the file notification.
net.sf.eBusx.monitor
This package provides messages to monitor both the on-going state and transient events of an eBus application at the object-level.
net.sf.eBusx.test
Provides classes supporting JUnit testing.
net.sf.eBusx.util
This package provides an eBus interface for accessing Timer API.

Introduction

eBus provides two services: message routing and object task execution. Messages are routed between objects that reside in the same process, in different processes on the same host, or in processes on different hosts, and the objects are unaware of where the other object resides. [1] There are two types of message routing: Notification and Request/Reply. eBus uses a broker-less, type+topic-based message routing architecture. Broker-less means that a separate message routing server is not used. Rather, messages are transmitted directly between objects via the eBus API. Type+topic message routing means that the message class together with the message subject are used to route the message to subscribers. This technique provides the type safety missing from topic-only routing. eBus refers to the type+topic combination as a message key.

Object execution is when eBus passes inbound messages to an application object method asynchronously using a dispatcher thread. These threads may be grouped together and associated with one or more application classes. If an application class is not assigned to a dispatcher group, then it is assigned to the default dispatcher group. An eBus client is run by only one dispatcher thread at any given time. While the client may be run by different dispatchers over time, only one dispatcher will run the client at any given moment ("run a client" means executing the client callback tasks). See EConfigure.Dispatcher for more information on configuring eBus dispatcher threads.

Messages are delivered to eBus clients asynchronously using dispatcher threads.

eBus maintains a weak reference to application objects which act as eBus clients. When eBus detects that a client is finalized, eBus automatically cleans up all the client's extant eBus feeds. Even so, it is better for an application to explicitly close its feeds when an application object is no longer needed rather than depend on automatic clean up.

The eBus Programmer's Manual covers the following information in greater detail.

eBus Messages

An eBus message is defined by extending one of the following classes and annotating the subclass with the @EFieldInfo annotation:

  1. ENotificationMessage,
  2. ERequestMessage,
  3. EReplyMessage.

The annotation provides the field ordering used for both message serialization (i.e., marshalling, encoding) and the message constructor. [2] The field list must consist of public final data members whose types come from the following list:

eBus supported types.
boolean Boolean BigInteger BigDecimal byte Byte char Character Class Date
double Double Duration EField EFieldList EMessage EMessageList EMessageKey enum File
float Float InetAddress InetSocketAddress Instant int Integer LocalDate LocalDateTime LocalTime
long Long MonthDay OffsetTime OffsetDateTime Period short Short String URI
YearMonth ZonedDateTime ZoneId ZoneOffset <field type>[]

Any eBus field type can be turned into a homogeneous [3] array by appending [] to the field type.

An eBus message class must provide a public constructor with arguments matching EFieldInfo.fields() order starting with the root class down to the leaf class. Consider the following: a user message extends EReplyMessage with class OrderReply with the annotation @EFieldInfo(fields={"parts", "shippingCost", "totalPrice"}) where parts is an OrderPart array and shippingCost and totalPrice are BigDecimal instances. EReplyMessage has two fields of its own: replyStatus and replyReason. EReplyMessage extends EMessage which has another two fields: subject and timestamp. The example OrderReply message class and OrderPart message field class are:

import java.io.Serializable;
import java.math.BigDecimal;
import net.sf.eBus.messages.EFieldInfo;
import net.sf.eBus.messages.EReplyMessage;

@EFieldInfo (fields={"parts", "shippingCost", "totalPrice"})
public final class OrderReply
    extends EReplyMessage
    implements Serializable
{
    // Required "de-serialization" constructor.
    // The parameters match the field types and the
    // @EFieldInfo fields order.
    public OrderReply(final String subject,
                      final long timestamp,
                      final EReplyMessage.ReplyStatus status,
                      final String reason,
                      final OrderPart[] parts,
                      final BigDecimal shipping,
                      final BigDecimal totalPx)
    {
        super (subject, timestamp, status, reason);

        this.parts = parts;
        this.shippingCost = shipping;
        this.totalPrice = totalPx;
    }

    // Convenience constructor: computes the total price
    // from the shipping cost and the ordered parts.
    public OrderReply(final String subject,
                      final OrderPart[] parts,
                      final BigDecimal shipping)
    {
        super (subject,
               System.currentTimeMillis(),
               EReplyMessage.ReplyStatus.OK,
               null);

        this.parts = parts;
        this.shippingCost = shipping;

        BigDecimal total = shipping;
        int i;

        for (i = 0; i < parts.length; ++i)
        {
            total = total.add(parts[i].totalPrice());
        }

        this.totalPrice = total;
    }

    public final OrderPart[] parts;
    public final BigDecimal shippingCost;
    public final BigDecimal totalPrice;
}

import java.io.Serializable;
import java.math.BigDecimal;
import net.sf.eBus.messages.EField;
import net.sf.eBus.messages.EFieldInfo;

// The annotation field names match the public final data member names.
@EFieldInfo (fields={"partId", "quantity", "pricePerPart"})
public final class OrderPart
    extends EField
    implements Serializable
{
    // Required "de-serialization" constructor.
    // The parameters match the field types and the
    // @EFieldInfo fields order.
    public OrderPart(final String partId,
                     final int quantity,
                     final BigDecimal price)
    {
        this.partId = partId;
        this.quantity = quantity;
        this.pricePerPart = price;
    }

    public BigDecimal totalPrice()
    {
        return (pricePerPart.multiply(BigDecimal.valueOf((long) quantity)));
    }

    public final String partId;
    public final int quantity;
    public final BigDecimal pricePerPart;
}
For this example, the message fields are serialized in the following order, starting at the EMessage root class and working down to the OrderReply leaf class:
  1. subject,
  2. timestamp,
  3. replyStatus,
  4. replyReason,
  5. parts (for each part, the serialize order is partId, quantity, and pricePerPart),
  6. shippingCost, and
  7. totalPrice.
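
The root-to-leaf ordering above can be sketched with plain Java reflection. This is only an illustration: the @FieldOrder annotation and the BaseMessage, BaseReply, and SampleOrderReply classes are hypothetical stand-ins for @EFieldInfo and the eBus message classes, and the code makes no claim about how eBus itself collects the field list.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for @EFieldInfo; not the eBus annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface FieldOrder {
    String[] fields();
}

// Hypothetical class hierarchy mirroring EMessage -> EReplyMessage -> OrderReply.
@FieldOrder(fields = {"subject", "timestamp"})
class BaseMessage {}

@FieldOrder(fields = {"replyStatus", "replyReason"})
class BaseReply extends BaseMessage {}

@FieldOrder(fields = {"parts", "shippingCost", "totalPrice"})
final class SampleOrderReply extends BaseReply {}

final class FieldOrderSketch {
    // Collects the declared field names from the root class down to the leaf class.
    static List<String> serializationOrder(Class<?> leaf) {
        Deque<Class<?>> chain = new ArrayDeque<>();
        for (Class<?> c = leaf; c != null; c = c.getSuperclass()) {
            chain.push(c); // pushing leaf first leaves the root at the head
        }
        List<String> order = new ArrayList<>();
        for (Class<?> c : chain) {
            // Only the annotation declared directly on this class is read.
            FieldOrder info = c.getDeclaredAnnotation(FieldOrder.class);
            if (info != null) {
                order.addAll(Arrays.asList(info.fields()));
            }
        }
        return order;
    }
}
```

Calling FieldOrderSketch.serializationOrder(SampleOrderReply.class) yields the seven names in the order listed above, since each class contributes only the fields it declares itself.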
Request messages extend ERequestMessage and include an EReplyInfo annotation which declares which messages may be sent in reply to the request. Like EFieldInfo, EReplyInfo is cumulative. The allowed reply messages include those listed in the current request message and its super classes. Because ERequestMessage has an EReplyInfo which includes EReplyMessage, and because all request messages ultimately extend ERequestMessage, EReplyMessage may be sent in reply to all request messages.
import java.io.Serializable;
import net.sf.eBus.messages.EFieldInfo;
import net.sf.eBus.messages.EReplyInfo;
import net.sf.eBus.messages.EReplyMessage;
import net.sf.eBus.messages.ERequestMessage;

@EFieldInfo (fields={"parts"})
@EReplyInfo (replyMessageClasses = {OrderReply.class})
public final class OrderRequest
    extends ERequestMessage
    implements Serializable
{
    // Required "de-serialization" constructor.
    public OrderRequest(final String subject,
                        final long timestamp,
                        final OrderPart[] parts)
    {
        super (subject, timestamp);

        this.parts = parts;
    }

    // Convenience constructor: timestamps the request with the current time.
    public OrderRequest(final String subject,
                        final OrderPart[] parts)
    {
        this (subject, System.currentTimeMillis(), parts);
    }

    public final OrderPart[] parts;
}

The messages which may be sent in reply to an OrderRequest are EReplyMessage and OrderReply. If EReplyFeed.ERequest.reply(EReplyMessage) is passed a message that is not one of the two supported reply message types, then an IllegalArgumentException is thrown.

Message Keys

eBus subjects are identified by a message key. This key consists of a message class and String subject. This is also known as type+topic referencing.

An example message key is: {com.acme.ProductUpdate.class, "JetSkates"}
where com.acme.ProductUpdate is a class extending EMessage.

There is no eBus-defined subject format. Message subject format (if any) is left entirely up to the application developer. The above example could be:

  • {com.acme.ProductUpdate.class, "/retail/sporting-goods/JetSkates"}
  • {com.acme.ProductUpdate.class, ".sporting-goods.retail.JetSkates"}
  • {com.acme.ProductUpdate.class, "Wile E. Coyote special: JetSkates"}
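
To make type+topic referencing concrete, here is a minimal stand-alone sketch of routing by a (class, subject) pair. TypeTopicKey and TypeTopicRouter are hypothetical names invented for this illustration; they are not eBus's EMessageKey or its routing implementation, and the exact-match lookup is an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical key type for illustration; not eBus's EMessageKey.
final class TypeTopicKey {
    final Class<?> messageClass;
    final String subject;

    TypeTopicKey(Class<?> messageClass, String subject) {
        this.messageClass = Objects.requireNonNull(messageClass);
        this.subject = Objects.requireNonNull(subject);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TypeTopicKey)) {
            return false;
        }
        TypeTopicKey k = (TypeTopicKey) o;
        return messageClass == k.messageClass && subject.equals(k.subject);
    }

    @Override
    public int hashCode() {
        return Objects.hash(messageClass, subject);
    }
}

// Hypothetical router: handlers are looked up by message class AND subject.
final class TypeTopicRouter {
    private final Map<TypeTopicKey, List<Consumer<Object>>> subscribers = new HashMap<>();

    void subscribe(Class<?> type, String subject, Consumer<Object> handler) {
        subscribers.computeIfAbsent(new TypeTopicKey(type, subject), k -> new ArrayList<>())
                   .add(handler);
    }

    // Delivers the message to handlers registered under exactly this class and
    // subject, and returns how many handlers received it.
    int publish(String subject, Object message) {
        List<Consumer<Object>> handlers = subscribers.getOrDefault(
            new TypeTopicKey(message.getClass(), subject), new ArrayList<>());
        handlers.forEach(h -> h.accept(message));
        return handlers.size();
    }
}
```

In this sketch a message published with the right subject but a different class reaches no handler, which is the type safety that topic-only routing lacks.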

Message Key Dictionary

eBus release 4.5.0 allows applications to add message keys to and retrieve keys from the message key dictionary. Prior to release 4.5.0, message keys were added to the dictionary indirectly by opening publisher, subscriber, and reply feeds. Even then, this dictionary was not accessible to applications.

Notification and request message keys are added using the methods EFeed.addKey(EMessageKey) and EFeed.addAllKeys(Collection<EMessageKey>). EFeed.addKey puts a single message key into the dictionary while EFeed.addAllKeys adds multiple keys to the dictionary. Neither call replaces or overwrites keys that are already in the dictionary: if EFeed.addKey is called with a message key that is already in the dictionary, the existing message key entry is left unchanged. Attempting to add reply or system message keys results in an IllegalArgumentException because eBus does not store such keys in the message key dictionary.

Message keys are retrieved from the dictionary using the methods EFeed.findKeys(Class<? extends EMessage>) and EFeed.findKeys(Class<? extends EMessage>, Pattern). The first call returns all message keys associated with the given message class. The second call restricts that result to the keys for the message class whose subject matches the pattern. Both methods return a non-null list. If the message class is a system message or reply message subclass, then an empty list is returned.

eBus provides the ability to store and load message key dictionaries with the methods EFeed.storeKeys(ObjectOutputStream), EFeed.storeKeys(Class<? extends EMessage>, ObjectOutputStream), and EFeed.storeKeys(Class<? extends EMessage>, Pattern, ObjectOutputStream). The first method stores the entire message key dictionary to the object output stream. The next two, like EFeed.findKeys, store only those keys which match the message class and (optional) subject pattern. The stored message key stream is re-loaded with EFeed.loadKeys(ObjectInputStream). Like EFeed.addAllKeys, this method does not replace or overwrite existing message keys.
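
The add-without-overwrite and find-by-class-and-pattern behavior described above might be modeled as follows. KeyDictionarySketch is a hypothetical in-memory structure written for this illustration, not eBus's dictionary; in particular, requiring the pattern to match the whole subject is an assumption, since the text does not say whether eBus uses a full or partial match.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical in-memory dictionary; an assumption, not eBus's implementation.
final class KeyDictionarySketch {
    // Subjects are grouped by message class and kept sorted for stable output.
    private final Map<Class<?>, TreeSet<String>> subjects = new HashMap<>();

    // Returns true only if the key was new; an existing entry is left untouched.
    boolean addKey(Class<?> messageClass, String subject) {
        return subjects.computeIfAbsent(messageClass, c -> new TreeSet<>()).add(subject);
    }

    // All subjects stored for the class; never null.
    List<String> findKeys(Class<?> messageClass) {
        return new ArrayList<>(subjects.getOrDefault(messageClass, new TreeSet<>()));
    }

    // Subjects for the class that fully match the pattern (assumed semantics); never null.
    List<String> findKeys(Class<?> messageClass, Pattern pattern) {
        List<String> matches = new ArrayList<>();
        for (String subject : findKeys(messageClass)) {
            if (pattern.matcher(subject).matches()) {
                matches.add(subject);
            }
        }
        return matches;
    }
}
```

For example, after adding "/retail/JetSkates" and "/wholesale/Anvils" for one message class, a lookup with the pattern "/retail/.*" returns only the first subject, and a lookup for a class with no keys returns an empty list rather than null.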

The storeKeys and loadKeys methods let applications re-create the message key dictionary quickly upon start up. This ability matters because it supports multi-key feeds, also introduced in eBus release 4.5.0. Multi-key feeds allow an eBus client to open multiple notification or reply feeds with a single feed.

Notification

Notification messages follow the advertise/subscribe paradigm together with type+topic-based routing. A notification feed is referenced by message key. A publisher opens a publish feed for a given message key and then advertises the capability of sending a notification message. When a publisher is capable of publishing the advertised notification message, it calls updateFeedState with EFeedState.UP.

A subscriber informs eBus of interest in a notification message feed by opening a subscribe feed for the same message key and then subscribes to the notification feed.

The eBus subject sits in the middle, tracking advertisements and subscriptions for each unique message key and deciding when the publishers should start and stop publishing based on subscriber presence. The following table defines the states between publishers and subscribers:

                  0 Publishers                                 > 0 Publishers
0 Subscribers     No feed                                      Stop publisher feed
> 0 Subscribers   Inform subscribers there are no publishers   Start publisher feed

eBus uses advertisements and subscriptions to track when to:

  1. Start a feed because there is a subscriber wanting to receive the notification.
  2. Stop a feed because there are no more subscribers.
  3. Inform subscribers when a notification feed is up and to expect notifications or the feed is down and so there will be no notifications forthcoming.
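
The publisher/subscriber state table can be read as a small decision function. The sketch below encodes the four cells directly; FeedDecisionSketch and its Action names are hypothetical and exist only to illustrate the table, not to describe eBus internals.

```java
// Hypothetical decision helper mirroring the publisher/subscriber state table.
final class FeedDecisionSketch {
    enum Action {
        NO_FEED,               // 0 publishers, 0 subscribers
        STOP_PUBLISHER_FEED,   // > 0 publishers, 0 subscribers
        INFORM_NO_PUBLISHERS,  // 0 publishers, > 0 subscribers
        START_PUBLISHER_FEED   // > 0 publishers, > 0 subscribers
    }

    // Maps the current publisher and subscriber counts to the table cell.
    static Action decide(int publishers, int subscribers) {
        if (publishers < 0 || subscribers < 0) {
            throw new IllegalArgumentException("counts must be non-negative");
        }
        if (subscribers == 0) {
            return publishers == 0 ? Action.NO_FEED : Action.STOP_PUBLISHER_FEED;
        }
        return publishers == 0 ? Action.INFORM_NO_PUBLISHERS : Action.START_PUBLISHER_FEED;
    }
}
```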

Publishers and subscribers may retract their advertisements and subscriptions, respectively, at any time by calling EPublishFeed.unadvertise() and ESubscribeFeed.unsubscribe() for a particular advertisement or subscription. Both publish and subscribe feeds are still alive and may be re-advertised and re-subscribed. When an application no longer needs a feed, it should call EFeed.close() which permanently disposes of the feed. The application should no longer reference a feed after closing it.

If a publisher experiences a problem preventing updates to a specific message key feed, the publisher can inform subscribers that the feed is down by calling feed.updateFeedState(EFeedState.DOWN). When the problem clears, the publisher calls feed.updateFeedState(EFeedState.UP), informing the subscribers that the feed is back up.

Note: a subscriber will see an EFeedState.DOWN feed state until a publisher both advertises the feed and calls feed.updateFeedState(EFeedState.UP). eBus does not assume that a publisher can publish notifications simply because it advertised the feed.

Request, Reply

eBus uses the advertise/request/reply paradigm to forward request messages to advertised repliers. Replies are sent directly back to the requestor. A replier advertises the ability to reply to a request by:

  1. opening a reply feed,
  2. advertising the feed, and
  3. calling updateFeedState(EFeedState.UP) to mark the feed as available.

Like publish/subscribe, requests are routed to repliers using the advertised type+topic message key. A replier can unadvertise and re-advertise any number of times. But once the feed is closed, it may not be used again. Instead, a new feed must be opened.

A requestor sends a request message by:

  1. opening a request feed and
  2. posting the request message. The request(msg) method returns the resulting request state. If there are no matching repliers for the request message, then an IllegalStateException is thrown. Otherwise, it returns an ERequestFeed.ERequest instance which the requestor may use to cancel the request by calling EFeed.close().

The replier receives the request via the method EReplier.request(request). The replier sends a reply message for that request by calling EReplyFeed.ERequest.reply(msg). The replier may send multiple replies for the same request by setting the reply status to EReplyMessage.ReplyStatus.OK_CONTINUING for all but the final reply. The last reply has the status EReplyMessage.ReplyStatus.OK_FINAL. If the request cannot be successfully handled, then the EReplyMessage.ReplyStatus.ERROR reply status should be sent. An error reply is also a final reply and no further replies may be sent afterwards. An error reply may be sent even after ReplyStatus.OK_CONTINUING replies but not after ReplyStatus.OK_FINAL.
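
The reply status rules above amount to a small state machine: any number of continuing replies, ended by exactly one final or error reply. The following sketch checks a sequence against those rules. ReplySequenceSketch and its Status enum are hypothetical and only mirror the names of the eBus reply statuses; how eBus itself reacts to a reply sent after the final one is not stated in the text, so the exception here is an assumption.

```java
// Hypothetical validator for the order in which reply statuses may be sent.
final class ReplySequenceSketch {
    enum Status { OK_CONTINUING, OK_FINAL, ERROR }

    private boolean finished = false;

    // Accepts the next reply status, rejecting anything sent after a final reply.
    void accept(Status status) {
        if (finished) {
            // Assumed reaction; the text only says no further replies may be sent.
            throw new IllegalStateException("request already finished");
        }
        // Both OK_FINAL and ERROR end the request; OK_CONTINUING keeps it open.
        if (status != Status.OK_CONTINUING) {
            finished = true;
        }
    }

    boolean isFinished() {
        return finished;
    }
}
```

With this model, the sequence OK_CONTINUING, OK_CONTINUING, ERROR is accepted, while ERROR following OK_FINAL is rejected.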

Replies are sent back to the requestor via the method ERequestor.reply(remaining, message, request) where the first argument, remaining, specifies the number of repliers which have not yet finished replying to the request. When remaining is zero, then this is the final reply and the request is finished. The request feed state is retrieved by calling ERequestFeed.ERequest.requestState().

The requestor may cancel a request any time prior to request completion by calling EFeed.close(). This puts the request feed in the canceled state, meaning the request feed is now closed and may not be used again by the requestor. Note that the requestor may still receive replies that were posted prior to the cancellation. [4]

Request message definitions have an additional annotation: @EReplyInfo. This class-level annotation lists one or more reply message classes which may be sent in response to the request message. @EReplyInfo annotations are cumulative like @EFieldInfo. This means that the list of allowed reply message classes includes those in this request message @EReplyInfo class list and those in the super class @EReplyInfo class list. Since EReplyMessage is in ERequestMessage @EReplyInfo annotation, and because ERequestMessage is the base class for all request message classes, EReplyMessage is a valid reply message for all requests.

In the following example, OrderReply and EReplyMessage messages may be sent in reply to an OrderRequest but not an Invoice reply message.

import net.sf.eBus.messages.EFieldInfo;
import net.sf.eBus.messages.EReplyInfo;
import net.sf.eBus.messages.ERequestMessage;

@EFieldInfo (fields={"parts"})
@EReplyInfo (replyMessageClasses = {OrderReply.class})
public final class OrderRequest
    extends ERequestMessage

eBus Roles

Objects need to implement one or more of the client interfaces in order to interact with eBus:

Role         Message
Subscriber   Receive Notification
Publisher    Send Notification
Requestor    Send Request, Receive Reply
Replier      Receive Request, Send Reply

Which interface(s) should be implemented depends on which eBus message types the object wants to send or receive. An application class may implement any combination of the four eBus interfaces, from one to all four.

Subscriber

The subscriber interface is implemented so a client may receive notification messages. The subscriber life cycle consists of:

  1. Opening a feed: call ESubscribeFeed.open(ESubscriber, EMessageKey, EFeed.FeedScope, ECondition). The subscription condition is optional and may be null. If provided, then only those notifications matching the condition are forwarded to the subscriber.
  2. Subscribing to a subject: call ESubscribeFeed.subscribe() to start receiving the messages from the notification feed.
  3. Receive feed state updates: eBus informs the subscriber of the subject's feed status via the feedStatus(feedState, feed) callback. If there are no publishers, then feedState is down. Otherwise, feedState is up and the subscriber can expect notifications from the feed.
  4. Receive notification messages: eBus passes the latest subject notification to a subscriber via the notify(message, feed) callback.
  5. Unsubscribe from subject: call ESubscribeFeed.unsubscribe(). Once unsubscribed, the subscriber can re-subscribe by calling ESubscribeFeed.subscribe() again.
  6. Closing a feed: call EFeed.close(). If the subscription is in place when the subscribe feed is closed, the subscription is automatically retracted. Once a feed is closed, it may not be used again and should be dropped by the application.

Publisher

The publisher interface is implemented so a client may publish notification messages. The publisher life cycle is:

  1. Opening a feed: call EPublishFeed.open(EPublisher, EMessageKey, EFeed.FeedScope).
  2. Advertising a subject: call EPublishFeed.advertise() to register the ability to publish the given message key.
  3. Update the feed state: call EPublishFeed.updateFeedState(EFeedState) to inform eBus of the publisher's ability to publish notifications on this feed. If the publisher is capable of publishing this notification, pass in up; otherwise pass in down.
  4. Waiting for a publish status update: when the first subscriber for the notification feed arrives, eBus calls the publishStatus(feedState, feed) method with EFeedState.UP. The publisher is now clear to start sending notification messages on the feed. When there are no more subscribers for the feed, publishStatus is called with EFeedState.DOWN. The publisher should now stop posting notification messages to the feed. This mechanism results in notifications being posted to the feed only when there are subscribers registered to receive the message.
  5. Publishing notifications: notification messages are sent via EPublishFeed.publish(notification).
  6. Unadvertise when done: call EPublishFeed.unadvertise() to retract the advertisement. Once unadvertised, the publisher may re-advertise the notification feed.
  7. Closing a feed: call EFeed.close(). If the advertisement is in place when the publish feed is closed, the advertisement is automatically retracted. Once a feed is closed, it may not be used again and should be dropped by the application.

Requestor

The requestor interface is implemented so a client may send requests to matching repliers. The requestor life cycle is:

  1. Opening a feed: call ERequestFeed.open(ERequestor, EMessageKey, EFeed.FeedScope).
  2. Subscribing to a feed: call subscribe(). Once subscribed, wait until feedStatus(EFeedState.UP, feed) is called, signifying that there is a replier for the request feed.
  3. Post a request: call request(message). This method returns the resulting ERequestFeed.RequestState. If ACTIVE is returned, then the request has repliers and the requestor will receive replies to the request. If DONE is returned, then there are no repliers to this request and the requestor will receive no replies.
  4. Receive replies: eBus sends replies back to the requestor via the reply(remaining, reply, request) callback where reply is the latest reply message and remaining parameter specifies the number of repliers still sending replies. When remaining is zero, then this represents the final reply.
  5. Cancel a request: if a requestor wants to cancel a request before all replies are back, then call ERequestFeed.ERequest.close(). It is still possible to receive a reply after making this call since eBus may be in the process of delivering the reply when the request was canceled.
    Note that calling EFeed.close() has the same effect as ERequestFeed.cancelRequest.
  6. Unsubscribe: call unsubscribe() when finished posting requests to this feed.
Note: an ERequestFeed may be used only once. A new ERequestFeed instance must be opened for each new request.

Replier

The replier interface is implemented so a client may respond to requests. The replier life cycle is:

  1. Opening a feed: call EReplyFeed.open(EReplier, EMessageKey, EFeed.FeedScope, ECondition). The ECondition argument is optional and may be null.
  2. Advertise a subject: call advertise() to register the ability to reply to the given request message key.
  3. Setting feed status: call updateFeedState(EFeedState.UP) to inform requestors that this replier is ready to handle requests. If a replier is temporarily prevented from responding to requests, then set the feed state to EFeedState.DOWN. When the issue is resolved, then mark the feed state UP.
  4. Wait for requests: requests are passed to this replier via the EReplier.request(request) callback. The replier does not have to reply immediately but, if replying asynchronously, request must be stored away for later use since request is used to send replies. The EReplyFeed may receive multiple requests over time but an individual request is contained in an ERequest instance.
  5. Respond to requests: a replier sends a reply message via reply(msg).
  6. Handle canceled requests: when a request is canceled before a replier has responded, then eBus calls EReplier.cancelRequest(request). The replier should not post any reply message to request after this callback.
  7. Unadvertise when done: call EReplyFeed.unadvertise(). Once un-advertised, a reply feed may be advertised again.
  8. Closing a feed: call EFeed.close(). If the advertisement is in place when the reply feed is closed, the advertisement is automatically retracted. Once a feed is closed, it may not be used again and should be dropped by the application.

Lambda Expression Callbacks

By default, eBus passes messages to application objects by calling the appropriate role interface method, like ESubscriber.notify(ENotificationMessage, ESubscribeFeed). This is fine if a subscriber opens only a single feed. But if the subscriber opens multiple feeds, each feed's messages are posted to ESubscriber.notify. The notify method then becomes a switch, untangling the inbound notifications and routing each message to its destination method where the real work is performed. In this case, ESubscriber.notify is pure overhead.

eBus v. 4.2.0 fixes this problem by allowing applications to associate callback code with a feed. Application objects are still required to implement the role interface but do not have to override the role interface methods. Instead, the application registers callback code using Java lambda expressions. The steps for putting a notification subscription in place are now:

import net.sf.eBus.client.ECondition;
import static net.sf.eBus.client.EFeed.FeedScope;
import net.sf.eBus.client.EFeedState;
import net.sf.eBus.client.ESubscribeFeed;
import net.sf.eBus.client.ESubscriber;
import net.sf.eBus.client.FeedStatusCallback;
import net.sf.eBus.client.NotifyCallback;
import net.sf.eBus.messages.EMessageKey;

public class MyAppClass implements ESubscriber {

    // ESubscriber interface methods are not overridden.
    private ESubscribeFeed mFeed = null;

    public void startup() {
        EMessageKey messageKey = new EMessageKey(CatalogUpdate.class, "Spring"); // Receive updates to Spring catalog.

        // The subscription condition may now be defined using a lambda expression.
        // Only updates to the catalog camping section are accepted.
        mFeed = ESubscribeFeed.open(this,
                                    messageKey,
                                    FeedScope.REMOTE_ONLY,
                                    m -> ((CatalogUpdate) m).section == CatalogSection.CAMPING);
        // Callbacks are registered on the opened feed before subscribing.
        mFeed.statusCallback((feedState, feed) ->
        {
            if (feedState == EFeedState.DOWN) {
                // Handle lost feed.
            }
            else {
                // Handle gained feed.
            }
        });
        mFeed.notifyCallback((msg, feed) ->
        {
            // Process latest message.
        });
        mFeed.subscribe();
    }

    public void shutdown() {
        if (mFeed != null) {
            mFeed.close();
            mFeed = null;
        }
    }
}
Role callbacks must be put in place after the feed is opened and before the feed is advertised or subscribed. An attempt to set the callback when the feed is closed or advertised/subscribed results in a thrown IllegalStateException.

If the application neither overrides the role interface methods nor puts the role callbacks in place, eBus throws an IllegalStateException when advertising or subscribing the feed.
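
The callback timing rules can be summarized as a small state model: callbacks may be set only while the feed is open and not yet in place, and putting the feed in place requires a callback. The sketch below is a hypothetical simplification written for this illustration. CallbackFeedSketch is not an eBus class, it models a single callback with a plain Runnable, and it ignores the alternative of overriding the role interface method.

```java
// Hypothetical feed model for illustration; not an eBus class.
final class CallbackFeedSketch {
    private Runnable notifyCallback;      // stands in for a lambda callback
    private boolean subscribed = false;   // true while the subscription is in place
    private boolean closed = false;

    // Callbacks may be set only on an open feed that is not yet subscribed.
    void notifyCallback(Runnable callback) {
        if (closed || subscribed) {
            throw new IllegalStateException("feed is closed or already subscribed");
        }
        notifyCallback = callback;
    }

    // Subscribing requires an open feed with a callback in place.
    void subscribe() {
        if (closed) {
            throw new IllegalStateException("feed is closed");
        }
        if (notifyCallback == null) {
            throw new IllegalStateException("no callback in place");
        }
        subscribed = true;
    }

    // Retracts the subscription; the feed stays open and may be re-subscribed.
    void unsubscribe() {
        if (closed) {
            throw new IllegalStateException("feed is closed");
        }
        subscribed = false;
    }

    // Closing retracts any subscription and permanently retires the feed.
    void close() {
        subscribed = false;
        closed = true;
    }

    boolean isSubscribed() {
        return subscribed;
    }
}
```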

eBus Feeds

Simple Feeds

Simple feeds 1) contain a single message key and 2) connect subscribers/requestors to all the simple publisher/replier feeds that exist for the same message key. What a publisher posts to a simple publish feed is forwarded to all existing and subscribed simple subscription feeds. Similarly, requests posted to a simple request feed are forwarded to all existing and advertised simple reply feeds. Messages are exchanged between simple feeds only when both sides are open and in place (advertised or subscribed).

The above roles section showed application classes interacting with simple eBus feeds: EPublishFeed, ESubscribeFeed, ERequestFeed, and EReplyFeed.

Multi-Key Feeds

Multi-key feeds act as a proxy between the application object and multiple, subordinate simple feeds. The multi-key feed opens, configures, advertises/subscribes, and closes these subordinate feeds in unison. The multi-key feed makes sure the subordinate feeds are in the same state at the same time.

But it is the subordinate simple feeds which interact with the application object. The multi-key feed opens a subordinate simple feed, passing in the application object as the feed client. That means the subordinate feed calls back to the application object. If the application object creates a multi-key feed with 1,000 subordinate feeds, then the application object receives callbacks from 1,000 subordinate feeds.

Multi-key feeds behave in a similar manner to simple feeds. They are opened, may have callbacks configured, advertised/subscribed, un-advertised/un-subscribed, and closed. Note the multi-key feed configures the callback methods for the subordinate feeds based on how the multi-key feed is configured. This means that each subordinate feed calls back to the same method. It is not possible for subordinate feeds belonging to the same multi-key feed to call back to different methods.

Multi-key feeds use the same role interfaces as simple feeds:

Multi-Key Feed             Role
Multi-Key Publish Feed     EPublisher
Multi-Key Subscribe Feed   ESubscriber
Multi-Key Reply Feed       EReplier
Multi-Key Request Feed     ERequestor

New subordinate feeds may be added to or removed from an open multi-key feed by specifying the message subject to be added or removed. The newly added subordinate feed references the same message class as the other feeds.

Multi-key feeds are homogeneous with respect to message class but heterogeneous with respect to message subject. When a subscriber opens and subscribes a multi-key subscribe feed, all messages received from the subordinate feeds will be of the same message class. It is not possible to open a multi-key feed for more than one message class.

Note that multi-key feeds are not eBus feeds. This is because multi-key feeds are proxy feeds, posing as an interface between application objects and eBus feeds.

Dispatching Messages to Application Objects

eBus wraps application objects in a weak-reference client. This client tracks the application object's extant feeds, undelivered message queue, and the run queue associated with the application class. The run queue ties the application object to the eBus dispatcher.

An application can configure eBus with multiple run queues. Each run queue is associated with one or more application classes where a class may appear in only one run queue class set. In other words, the run queue class sets are non-intersecting. One run queue is designated as the default and has an empty class set. An application class which does not appear in any specific run queue class set is assigned to the default run queue.
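
The class-set rule above can be modeled as a simple lookup. The following is a sketch under stated assumptions, not the eBus configuration API: RunQueueAssignmentSketch, addRunQueue, and runQueueFor are hypothetical names, and run queues are represented by their names only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model of non-intersecting run queue class sets.
final class RunQueueAssignmentSketch {
    static final String DEFAULT_QUEUE = "default";

    private final Map<Class<?>, String> queueByClass = new HashMap<>();

    // Registers a named run queue and rejects any class already claimed by another queue.
    void addRunQueue(String name, Set<Class<?>> classes) {
        for (Class<?> c : classes) {
            String owner = queueByClass.get(c);
            if (owner != null && !owner.equals(name)) {
                throw new IllegalArgumentException(
                    c.getName() + " already belongs to run queue " + owner);
            }
        }
        for (Class<?> c : classes) {
            queueByClass.put(c, name);
        }
    }

    // A class that appears in no class set falls through to the default run queue.
    String runQueueFor(Class<?> applicationClass) {
        return queueByClass.getOrDefault(applicationClass, DEFAULT_QUEUE);
    }
}
```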

Each run queue has one or more dispatcher threads associated with it. The threads are given the same, user-configurable priority. Each thread polls for the next ready client on the run queue. When a client is acquired, the dispatcher thread posts queued messages to the application object (more detail on this below). The application configures the run queue to poll in one of four ways:

  1. Blocking: the run queue uses a lock and condition, blocking on the condition until a client is posted on the run queue and the condition is signaled.
  2. Spinning: the run queue spins in a tight loop until a client is posted.
  3. Spin+Park: the run queue alternates between spinning and parking while waiting for a client posting. The application may configure the spin limit and nanosecond park time.
  4. Spin+Yield: the run queue alternates between spinning and yielding while waiting for a client posting. The application may configure the spin limit.
Note that because a client is associated with a single run queue, the client is either on that run queue or not and, if it is on the run queue, it appears there only once. A dispatcher thread acquiring that client is therefore the only dispatcher thread to hold a reference to that client.
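
As an illustration of the Spin+Park strategy, the sketch below polls a lock-free queue, spinning up to a configurable limit and then parking for a configurable number of nanoseconds. It is a minimal model, not the eBus run queue: SpinParkQueueSketch and its constructor parameters are hypothetical, and only java.util.concurrent classes are used.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.LockSupport;

// Hypothetical spin+park polling loop; not the eBus run queue implementation.
final class SpinParkQueueSketch<T> {
    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    private final int spinLimit;
    private final long parkNanos;

    SpinParkQueueSketch(int spinLimit, long parkNanos) {
        this.spinLimit = spinLimit;
        this.parkNanos = parkNanos;
    }

    // Posts a ready client to the run queue.
    void post(T client) {
        queue.offer(client);
    }

    // Spins until spinLimit polls have failed, parks briefly, then spins again.
    T take() {
        int spins = 0;
        T client;
        while ((client = queue.poll()) == null) {
            if (++spins >= spinLimit) {
                LockSupport.parkNanos(parkNanos);
                spins = 0;
            }
        }
        return client;
    }
}
```

Replacing the parkNanos call with Thread.yield() gives the Spin+Yield variant, and removing it entirely gives pure spinning. The trade-off is latency against CPU consumption while the queue is empty.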

Putting this all together, it means that application objects are referenced by only one dispatcher thread at a time, making eBus message delivery single-threaded. If an application object is referenced solely by eBus dispatcher threads, then that application object is effectively single-threaded. That application object does not need to use synchronized, java.util.concurrent.locks.Lock, atomics, etc. to protect critical sections because a single-threaded object has no critical sections.

(Note: the above does not apply to application objects which may be referenced by non-dispatcher threads.)

The basic dispatcher thread run() method is:

public void run() {

    while (true) {
        acquire next available client (blocking)

        while (client has pending messages && client remaining quantum > 0) {
            startTime = System.nanoTime();
            deliver next message to client;
            delta = (System.nanoTime() - startTime);

            update client remaining quantum (subtract delta);
        }

        if (client remaining quantum ≤ 0) {
            set client quantum to run queue's max quantum.
        }

        if (client has pending messages) {
            append client to run queue.
        }
    }
}
Each run queue has a configurable maximum run quantum. Each client is assigned this maximum run quantum when instantiated. This quantum prevents a single client from dominating a run queue which the client shares. Note that the client quantum is not used to preempt client message processing. The client quantum is checked after notify(msg, feed) (for example) returns. If notify(msg, feed) goes into an infinite loop, then the dispatcher thread making the call is lost to all other application objects using that run queue.
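
The quantum accounting in the pseudocode above can be made concrete with a small single-threaded simulation. This is a sketch under stated assumptions: QuantumDispatchSketch and Client are hypothetical types, each pending message is represented only by its simulated processing time in nanoseconds (in place of measuring with System.nanoTime()), and the loop ends when the run queue is empty instead of blocking for the next client.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical simulation of the dispatcher loop; not eBus code.
final class QuantumDispatchSketch {

    static final class Client {
        final String name;
        // Each entry is the simulated time (nanoseconds) needed to process one message.
        final Deque<Long> pendingCosts = new ArrayDeque<>();
        long remainingQuantum;

        Client(String name, long maxQuantum, long... costs) {
            this.name = name;
            this.remainingQuantum = maxQuantum;
            for (long cost : costs) {
                pendingCosts.addLast(cost);
            }
        }
    }

    // Returns the client name for each delivered message, in delivery order.
    static List<String> dispatch(Deque<Client> runQueue, long maxQuantum) {
        List<String> deliveries = new ArrayList<>();
        while (!runQueue.isEmpty()) {
            Client client = runQueue.pollFirst();
            // The quantum is checked only between messages, never during one.
            while (!client.pendingCosts.isEmpty() && client.remainingQuantum > 0) {
                long delta = client.pendingCosts.pollFirst();
                deliveries.add(client.name);
                client.remainingQuantum -= delta;
            }
            if (client.remainingQuantum <= 0) {
                client.remainingQuantum = maxQuantum;
            }
            if (!client.pendingCosts.isEmpty()) {
                runQueue.addLast(client);
            }
        }
        return deliveries;
    }
}
```

With a maximum quantum of 10, a client holding three messages costing 6 each delivers two of them, overruns its quantum, and goes to the back of the run queue behind any other ready client before delivering the third.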

An eBus client has four run states:

  1. IDLE: the eBus client has no pending messages and is not posted to the run queue.
  2. READY: the eBus client has pending messages and is posted to the run queue.
  3. RUNNING: a dispatcher thread acquired the eBus client and is dispatching the client's pending messages to that client. This will continue until the client has no more pending messages or quantum.
  4. DEFUNCT: the underlying application object is finalized. Any and all extant client feeds are automatically retracted when this state is entered.
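
The four run states can be written as an enum with an explicit transition table. The transitions below are an assumption inferred from the state descriptions and the dispatcher loop, not taken from the eBus source: in particular, the sketch assumes DEFUNCT is reachable from every other state and is terminal. RunStateSketch is a hypothetical name.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical transition table inferred from the state descriptions above.
enum RunStateSketch {
    IDLE, READY, RUNNING, DEFUNCT;

    // States this state may move to (assumed, not confirmed by the eBus source).
    Set<RunStateSketch> next() {
        switch (this) {
            case IDLE:
                // A message is queued and the client is posted to the run queue.
                return EnumSet.of(READY, DEFUNCT);
            case READY:
                // A dispatcher thread acquires the client.
                return EnumSet.of(RUNNING, DEFUNCT);
            case RUNNING:
                // Out of messages (IDLE) or out of quantum with messages pending (READY).
                return EnumSet.of(IDLE, READY, DEFUNCT);
            default:
                // DEFUNCT is terminal.
                return EnumSet.noneOf(RunStateSketch.class);
        }
    }

    boolean canMoveTo(RunStateSketch target) {
        return next().contains(target);
    }
}
```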

Starting and Stopping Application Objects

When an object opens feeds, there is a race condition because eBus may call the object back while that object is still opening feeds. Consider the following method which often occurs in eBus applications:

public void start() {

    mSubFeed = ESubscribeFeed.open(this, subKey, FeedScope.LOCAL, null);
    mSubFeed.subscribe();

    // Race condition between feedStatus callback and opening, advertising publish feed.
    mPubFeed = EPublishFeed.open(this, pubKey, FeedScope.LOCAL);
    mPubFeed.advertise();
}

@Override public void feedStatus(EFeedState feedState, ESubscribeFeed feed) {
    // mPubFeed may still be null or unadvertised causing the following line to fail.
    mPubFeed.updateFeedState(feedState);
}
The problem is that the subscription feed status callback may occur before the publish feed is opened and advertised. The simplest solution is to switch the order in which the subscribe and publish feeds are opened. But what if the example opened more feeds with a more complex relationship between them? It may be difficult (if not impossible) to open the feeds in the correct order.

The next solution is to synchronize the start and feedStatus methods. That would prevent the eBus callback from interrupting the object start. But that would add overhead to the feedStatus callback which is only needed for the one-time object start.

So the true problem is: how to prevent eBus callbacks from interrupting an object start. eBus v. 4.3.1 offers a solution: have the object start occur on an eBus Dispatcher thread.

If the object is starting on an eBus Dispatcher thread, then any new eBus callbacks will be delivered after the object start returns. This is done by changing start to @Override public void startup()

This method is executed on the eBus Dispatcher thread by:

  1. Registering the object with eBus: EFeed.register(object) and then
  2. Having eBus start up the object: EFeed.startup(object)
See the eBus Programmer's Manual section entitled "Gentlemen, start your objects" for detailed information on object registration and start-up.
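
Why starting on a dispatcher thread removes the race can be shown with a plain single-thread executor standing in for the eBus Dispatcher. This is an analogy and does not call EFeed.register or EFeed.startup; StartupOrderingSketch and the event strings are hypothetical. A callback raised while the start-up task is running is queued behind that task and so cannot run until start-up returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical analogy: a single-thread executor plays the dispatcher thread.
final class StartupOrderingSketch {

    static List<String> run() {
        List<String> events = new ArrayList<>();
        CountDownLatch callbackDone = new CountDownLatch(1);
        ExecutorService dispatcher = Executors.newSingleThreadExecutor();
        try {
            dispatcher.execute(() -> {
                events.add("startup begins");
                // Subscribing triggers a callback; it is queued, not run immediately.
                dispatcher.execute(() -> {
                    events.add("feedStatus callback");
                    callbackDone.countDown();
                });
                events.add("publish feed advertised");
                events.add("startup returns");
            });
            if (!callbackDone.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callback was never delivered");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            dispatcher.shutdown();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Unlike synchronizing start and feedStatus, this ordering costs nothing on later callbacks, since it relies only on the single-threaded delivery the dispatcher already provides.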

Connecting eBus Applications

eBus applications may be connected by having one application open a service and the other applications connect to that service. See EServer and ERemoteApp for complete explanation of eBus service and connection configuration.

When two eBus applications connect, each eBus sends its local advertisements to the other. This allows local subscriptions and requests to be satisfied by remote advertisements. eBus does not forward an advertisement from one remote eBus to another remote eBus. Nor does eBus forward notifications and requests from one remote eBus to another remote eBus. eBus only routes messages from:

  1. a local client to a local client,
  2. a local client to a remote client, or
  3. a remote client to a local client.

eBus allows only one connection to exist between two running applications. Once one eBus application connects to another, any further attempts to connect by either application will fail. eBus can also be configured to accept connections only from a predefined set of Internet addresses or even particular Internet address and TCP port pairs. This filtering is positive only: negative filtering (accept all connections except those specified) is not supported.
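
The routing rule and the positive-only filter reduce to two small predicates. The sketch below is illustrative only: RoutingRuleSketch, Location, and the "host" / "host:port" filter entry format are hypothetical and do not reflect how eBus represents its address filter. The behavior of an unconfigured filter is not modeled.

```java
import java.util.Set;

// Hypothetical predicates for the routing and filtering rules described above.
final class RoutingRuleSketch {
    enum Location { LOCAL, REMOTE }

    // Messages are routed unless both endpoints are remote (no remote-to-remote forwarding).
    static boolean isRouted(Location sender, Location receiver) {
        return sender == Location.LOCAL || receiver == Location.LOCAL;
    }

    // Positive filter: accept only a listed host or a listed host:port pair.
    static boolean accepts(Set<String> filter, String host, int port) {
        return filter.contains(host) || filter.contains(host + ":" + port);
    }
}
```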

eBus applications can programmatically monitor eBus connections by subscribing to the message key net.sf.eBus.client.ConnectionMessage:"/eBus". Connection messages are published when a remote eBus either logs on or logs off. Subscribe to the message key net.sf.eBus.client.ServerMessage:"/eBus" to learn when new connections are accepted by the server. Note: both ConnectionMessage and ServerMessage are published locally only. Remote eBus applications cannot subscribe to these notifications.

eBus allows each remote connection to be configured independently. The first and most important remote connection property/preference key is eBus.connections. This property is a comma-separated list of remote connection names. Each name is used in the eBus.connection.name property/preference keys. If an eBus.connection key contains a name which does not appear in eBus.connections, then that property/preference is ignored and the corresponding remote eBus connection is not established.

Note: There is only one configuration used for all remote connections accepted by an eBus service socket. The configuration keys are prefixed by eBus.service rather than eBus.connection.name

Sample eBus properties

#
# eBus settings.
#
#
# Connections to other eBus applications.
#
eBus.connections=local

# One connection to a service on port 12345 on the local host.
# Bind the local side to any port.
eBus.connection.local.host=localhost
eBus.connection.local.port=12345

# Heartbeating is turned off.
eBus.connection.local.heartbeatDelay=0
eBus.connection.local.heartbeatReplyDelay=0

# If the connection is lost, attempt reconnecting every
# 500 milliseconds.
eBus.connection.local.reconnect=true
eBus.connection.local.reconnectTime=500

# Read up to 4,096 bytes at a time.
eBus.connection.local.inputBufferSize=4096

# Write up to 8,192 bytes at a time.
eBus.connection.local.outputBufferSize=8192

# There is no limit to the outbound message queue.
eBus.connection.local.messageQueueSize=0

#
# eBus service port.
#
# Open a TCP service on port 6789.
eBus.service.port=6789

# Send a heartbeat after 60 seconds of no input.
eBus.service.heartbeatDelay=60000

# Wait 30 seconds for a heartbeat reply.
eBus.service.heartbeatReplyDelay=30000

# Read and write up to 8,192 bytes at a time.
eBus.service.inputBufferSize=8192
eBus.service.outputBufferSize=8192

# Queue up to 10,000 outbound messages.
eBus.service.messageQueueSize=10000
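
The relationship between eBus.connections and the per-connection keys can be sketched with java.util.Properties. This is not how eBus itself loads its configuration: ConnectionConfigSketch, parse, and load are hypothetical helpers that only group eBus.connection.name.* settings by connection name and drop keys whose name is not listed in eBus.connections.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper; eBus performs its own configuration loading.
final class ConnectionConfigSketch {
    private static final String LIST_KEY = "eBus.connections";
    private static final String PREFIX = "eBus.connection.";

    // Groups "eBus.connection.<name>.<setting>" values by connection name.
    static Map<String, Map<String, String>> parse(Properties props) {
        Map<String, Map<String, String>> result = new LinkedHashMap<>();
        for (String entry : props.getProperty(LIST_KEY, "").split(",")) {
            String name = entry.trim();
            if (!name.isEmpty()) {
                result.put(name, new TreeMap<>());
            }
        }
        for (String key : props.stringPropertyNames()) {
            if (!key.startsWith(PREFIX)) {
                continue;
            }
            String rest = key.substring(PREFIX.length());
            int dot = rest.indexOf('.');
            if (dot < 0) {
                continue;
            }
            // Settings for names missing from eBus.connections are ignored.
            Map<String, String> settings = result.get(rest.substring(0, dot));
            if (settings != null) {
                settings.put(rest.substring(dot + 1), props.getProperty(key));
            }
        }
        return result;
    }

    // Convenience loader for properties held in a string.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }
}
```

Applied to the sample above, parse returns a single entry named local whose settings include host, port, reconnect, and reconnectTime; the eBus.service keys are untouched because they use a different prefix.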

eBus Protocol Stack

The following demonstrates how the eBus protocol stack works and how the configuration impacts the protocol stack.

eBus Binary Protocol Stack
Level Description Input Output
ERemoteApp

Responsible for maintaining a connection to a remote eBus application.

eBus.connection.name.host:
the remote eBus application service is open on this host.

eBus.connection.name.port:
the remote eBus application service is open on this port.

eBus.connection.name.bindPort:
bind the connection local side to this port.

Forwards messages to the target ESubject or ERequest instances except the logon, logon reply, and logoff messages.

Passes outbound system and user messages to the associated EConnection. If the EConnection outbound message queue overflows, then the connection is closed and all queued messages are discarded. If the connection is set to reconnect, then the reconnect timer is set.

eBus.connection.name.reconnect: if true, then a lost connection is re-established. The default value is false.

eBus.connection.name.reconnectTime: specifies the millisecond rate at which reconnect attempts are made. The default value is 5 seconds. This setting is ignored if the reconnect is set to false.

ETCPConnection Responsible for serializing outbound messages and deserializing inbound messages. Also responsible for queuing up outbound messages when the socket output buffer is full. Sends the enqueued messages when the buffer overflow condition clears.

De-serializes eBus messages directly from the AsyncSocket input buffer. Posts de-serialized messages to an associated callback thread. There is one callback thread per EConnection instance.

eBus.connection.name.heartbeatReplyDelay: if > zero, then wait this many milliseconds for a reply to a heartbeat. This timer is reset every time data is received from the far-end. This setting is ignored if heartbeatDelay is not set.

Serializes outbound message directly to the socket output buffer using the BufferWriter interface. Throws a BufferOverflowException if the socket output buffer overflows. This exception is caught by EConnection and the message is posted to the message queue. When the socket output buffer is no longer full, forwards enqueued messages until the queue is either empty or the socket output buffer is again full.

eBus.connection.name.messageQueueSize: if set and this size is exceeded, then the connection is immediately closed and the upstream ERemoteApp notified.

AsyncSocket Interface between EConnection and the Selector Thread. Encapsulates the SelectableChannel.

Inbound bytes are retrieved directly to a socket input buffer. When a read occurs, the newly updated input buffer is directly passed to net.sf.eBus.comm.EConnection for processing via the SocketListener.handleInput callback.

eBus.connection.name.inputBufferSize: specifies the AsyncSocket input buffer size which is the maximum number of bytes read at any one time. Defaults to 2,048 bytes.

Outbound messages are serialized to a socket output buffer. If the output buffer size is exceeded, then throws a BufferOverflowException.

The message is transmitted immediately if the socket output buffer has no other pending messages.

eBus.connection.name.outputBufferSize: specifies the socket output buffer fixed size. Defaults to 2,048 bytes.

Selector Thread

Watches SelectableChannel instances and performs the actual read, write and accept operations.

Note: there is only one SelectorThread instance in an eBus application. The selector thread properties/preferences settings apply to all remote eBus application connections.

Reads bytes from a SocketChannel into the AsyncSocket input buffer.

The Selector Thread passes the AsyncSocket output buffer to the SocketChannel(ByteBuffer) for transmission.

The one weakness of this design is that if the Selector thread loses its CPU, then all remote eBus messaging stops until the Selector thread regains a CPU. For this reason, the Selector thread priority is set to Thread.MAX_PRIORITY in an attempt to keep the thread on a CPU.
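
The outbound path described in the stack above (serialize to a fixed-size output buffer, queue the message on overflow, close the connection when the queue limit is exceeded) can be sketched with java.nio.ByteBuffer. This is a simplified model, not the ETCPConnection or AsyncSocket code: OutboundQueueSketch and its methods are hypothetical, messages are plain strings, and drained() stands in for the socket having transmitted the buffer contents.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of outbound queuing; not the eBus connection classes.
final class OutboundQueueSketch {
    private final ByteBuffer output;
    private final Deque<byte[]> queue = new ArrayDeque<>();
    private final int maxQueueSize; // 0 means no limit, as in the sample properties
    private boolean closed;

    OutboundQueueSketch(int outputBufferSize, int maxQueueSize) {
        this.output = ByteBuffer.allocate(outputBufferSize);
        this.maxQueueSize = maxQueueSize;
    }

    void send(String message) {
        if (closed) {
            throw new IllegalStateException("connection closed");
        }
        byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
        // Write directly only when nothing is queued, so message order is preserved.
        if (queue.isEmpty() && tryWrite(bytes)) {
            return;
        }
        if (maxQueueSize > 0 && queue.size() >= maxQueueSize) {
            // Queue overflow: close the connection and discard queued messages.
            closed = true;
            queue.clear();
            return;
        }
        queue.addLast(bytes);
    }

    // ByteBuffer.put(byte[]) transfers nothing when it throws BufferOverflowException.
    private boolean tryWrite(byte[] bytes) {
        try {
            output.put(bytes);
            return true;
        } catch (BufferOverflowException e) {
            return false;
        }
    }

    // Simulates the socket emptying the buffer, then forwards queued messages
    // until the queue is empty or the buffer is full again.
    void drained() {
        output.clear();
        while (!queue.isEmpty() && tryWrite(queue.peekFirst())) {
            queue.pollFirst();
        }
    }

    int queued() { return queue.size(); }

    boolean isClosed() { return closed; }
}
```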

eBus Extensions

eBus currently has three extension packages: net.sf.eBusx.io, net.sf.eBusx.monitor, and net.sf.eBusx.util.

net.sf.eBusx.io

EFileWatcher is an eBus publisher notifying subscribers when the subscribed file or directory is created, modified, or deleted via the EFileNotification message. Subscribers use the target file or directory name as the subscription subject. Note: the file watcher service supports only subscribers within the same JVM. The file watcher is not advertised to other JVMs, whether on the same host or on a remote host.

net.sf.eBusx.monitor

This package provides eBus applications the ability to monitor other eBus applications and to be monitored. Monitoring is on the object level via the Monitorable interface.

net.sf.eBusx.util

This package provides the eBus timer service. This service provides an eBus request/reply interface to the Timer service.


1 An application object can control whether it communicates with another object that is either local to its JVM and/or in another JVM using EFeed.FeedScope.

2 The reason for @EFieldInfo is because the reflection call Class.getFields() does not return the class fields in any particular order. @EFieldInfo has the developer define the message field ordering for serialization purposes.

3 As homogeneous as Java allows. For example, it is possible to declare a Number[] array and place Integer and Double instances into the array.

4 A reply message already posted to the requestor's message queue will be delivered. Reply messages not yet posted to that queue will not be delivered.

Author:
Charles Rapp
