CometD参考手册 – 3.0.9

人肉翻译,不断更新中

前言

CometD是一个可伸缩的网络事件路由总线,可以用来编写低延迟、基于服务器端、事件驱动的网络程序。典型的例子如股票交易系统,网络聊天室,在线游戏及监控台。

CometD提供了实现以下消息模式的API:发布/订阅,点到点(通过服务器中转)和远程调用。这是通过使用与传输无关的Bayeux协议实现的,Bayeux可以基于HTTP或WebSocket(或其他传输协议),因此你的程序不必使用特定的传输技术。

当时机成熟时CometD将使用WebSocket(因为他是最高效的网络消息协议),而当使用HTTP时,将使用名为Comet的Ajax推送技术。

CometD项目提供了Java和JavaScript库,以便可以用一种个简单和可移植的方式编写低延迟、基于服务器端、事件驱动的网络程序。因此,你可以集中精力处理你程序的业务,而不用担心底层细节,如传输协议(HTTP或WebSocket)、鲁棒性。CometD库提供了这些特性。

如果你是CometD的新人,你可以按照以下路径学习:

1. 阅读安装章节,下载、安装CometD并试用Demo
2. 阅读入门章节,参考手把手教程使用CometD
3. 阅读概念章节理解CometD定义的抽象概念
4. 学习随CometD一起发布的Demo
5. 阅读此手册的剩余部分更进一步了解细节

你可以通过下面的方式为CometD项目贡献或参与CometD的交流:

1. 试用并将问题报告至http://bugs.cometd.org
2. 加入CometD用户和CometD开发者邮件列表
3. 通过邮件列表或上报问题帮助完善文档
4. 在你的圈子里宣传CometD

3.安装

3.1. 需求和依赖

你需要7.0或更高版本的Java开发包(JDK),符合Servlet 3.0或更高版本的Servlet容器,如Jetty。

CometD的实现依赖了几个Jetty库,如`jetty-util-ajax-<version>.jar`等。
这些Jetty依赖项通常打包你程序`.war`文件的`WEB-INF\lib`目录里,并不一定要求你将你程序的`.war`文件发布到Jetty中。你的CometD程序可以很好的在任何符合Servlet 3.0或更高版本的Servlet容器中运行。

当前CometD依赖的Jetty版本是

<jetty-version>9.2.14.v20151106</jetty-version>

3.2. 下载和安装

你可以从http://download.cometd.org/下载CometD发行包

然后将发行包解压到你指定的目录:

$ tar zxvf cometd-<version>-distribution.tgz
$ cd cometd-<version>/

3.3. 运行Demo

CometD Demo包含:

* 两个完整的聊天程序(一个使用Dojo开发,另一个使用jQuery)。
* 扩展范例,如消息通知、重载、时间同步和时间戳。
* 一个如何向指定的客户端发送私有消息的例子。
* Clustered Auction demo (using the Oort clustering).

3.3.1. 使用Maven运行Demo
如果你在设计程序原型或实验时,想要试一下CometD Demo,建议使用这种运行方式,但不推荐在生产环境中使用这种方式部署。下一章节会介绍如何在生产环境中部署你的程序。

Maven需要设置`JAVA_HOME`环境变量,并指向JDK的安装路径。

之后,就可以非常容易得运行CometD Demo了。假设`$COMETD`是CometD的安装目录,并且在此目录下可执行`mvn`:

$ cd $COMETD
$ cd cometd-demo
$ mvn jetty:run

最后一条命令启动了一个内置的Jetty监听8080端口自。现在用浏览器访问http://localhost:8080,查看CometD Demo的主页。

3.4.部署你的CometD程序

当你开发一个Comet程序时,你开发一个标准的Java EE Web程序,并打包到一个`.war`文件。你可以参考{入门章节}或{CometD教程}的例子来构建和打包你的CometD程序。

一旦你将你的CometD程序打包进了一个`.war`程序,你可以将它部署到任何支持Servlet 3.0或更高版本的Servlet容器中。

参考{此章节}了解更多关于部署到Servlet 3.0(或更高版本)具体相关说明。

3.4.1.在Jetty中部署你的CometD程序

5. 入门

5.1.准备

使用CometD API开发项目时,需要进行准备工作,特别是相关工具,可以节省你大量的时间。一个必不可少的工具是{Firebug}(如果你用Firefox做开发),或者Internet Explorer下类似的工具{Developer Tools}。

CometD项目使用{Maven}构建,因此顺理成章,也可用Maven构建你的应用。此入门教程以Maven作为建立、构建和运行你应用的基础,但其他构建工具也可以适用相同的概念。

Windows用户
如果使用Windows操作系统,避免使用包含空格的目录作为你的根目录,如“C:\Document And Settings\”。使用如“C:\CometD\”的根目录。

5.2 建立项目

你可以用两种方式建立你的项目:使用{Maven方式}或{非Maven方式}。以上两者,都可以参考{建立章节}了项目中的一些文件是如何建立的。

5.2.1 Maven方式

Setting up a project based on the CometD libraries using Maven uses the Maven archetypes, which create the skeleton of the project, in a style very similar to Rails scaffolding.

Issue the following command from a directory that does not contain a pom.xml file (otherwise you will get a Maven error), for example an empty directory:

$ cd /tmp
$ mvn archetype:generate -DarchetypeCatalog=http://cometd.org

Choose archetype:
1: local -> org.cometd.archetypes:cometd-archetype-dojo-jetty9
2: local -> org.cometd.archetypes:cometd-archetype-spring-dojo-jetty9
3: local -> org.cometd.archetypes:cometd-archetype-jquery-jetty9
4: local -> org.cometd.archetypes:cometd-archetype-spring-jquery-jetty9
Choose a number:
As you can see, there are four archetypes available that build a skeleton application using the Dojo or jQuery JavaScript toolkits, both with the choice of using Jetty 9 and/or Spring. Choose Dojo with Jetty 9, which is archetype number 1. The archetype generation requires that you define several properties and generates the application skeleton for you, for example:

Choose a number: : 1
Define value for property ‘groupId’: : org.cometd.primer
Define value for property ‘artifactId’: : dojo-jetty9-primer
Define value for property ‘version’: 1.0-SNAPSHOT: :
Define value for property ‘package’: org.cometd.primer: :
[INFO] Using property: cometdVersion = 3.0.0
[INFO] Using property: jettyVersion = 9.2.0.v20140526
[INFO] Using property: slf4jVersion = 1.7.7
Confirm properties configuration:
groupId: org.cometd.primer
artifactId: dojo-jetty9-primer
version: 1.0-SNAPSHOT
package: org.cometd.primer
cometdVersion: 3.0.0
jettyVersion: 9.2.0.v20140526
slf4jVersion: 1.7.7
Y: :

[INFO] BUILD SUCCESS
Then:

$ cd dojo-jetty9-primer/
The skeleton project now exists as follows:

$ tree .
.
|– pom.xml
`– src
`– main
|– java
| `– org
| `– cometd
| `– primer
| |– CometDInitializer.java
| `– HelloService.java
`– webapp
|– application.js
|– index.jsp
`– WEB-INF
`– web.xml
The skeleton project is ready for you to run using the following command:

$ mvn install jetty:run
Now point your browser at http://localhost:8080/dojo-jetty9-primer, and you should see this message:

CometD Connection Succeeded
Server Says: Hello, World
That’s it. You have already written your first CometD application 🙂

5.2.2. 非Maven的方式

The first step is to configure your favorite JavaScript toolkit, in the example Dojo, that the web container must serve. Using the Maven Way, this is obtained automatically by overlaying the CometD Dojo bindings WAR file, cometd-javascript-dojo-<version>.war, but here you must do it manually (the cometd-javascript-dojo-<version>.war is located in the cometd-javascript/dojo/target directory of the CometD distribution).

Download Dojo from http://dojotoolkit.org

Unpack the dojo-release-<version>.tar.gz file to a directory, for example /tmp, so that you have the /tmp/dojo-release-<version> directory, called $DOJO below.

Delete the $DOJO/dojox/cometd.js and $DOJO/dojox/cometd.js.uncompressed.js files that Dojo provides (these files are empty and just stubs for the real ones that you will put in place in a moment).

Delete the $DOJO/dojox/cometd directory that Dojo provides.

Copy the dojox/cometd.js file of the cometd-javascript-dojo-<version>.war into $DOJO/.

Copy the dojox/cometd directory of the cometd-javascript-dojo-<version>.war into $DOJO/. The content of the $DOJO/dojox/cometd directory should be the following:

dojox/cometd
|– ack.js
|– main.js
|– reload.js
|– timestamp.js
`– timesync.js
Add the org directory from the cometd-javascript-dojo-<version>.war, and all its content, at the same level of the dojox directory in $DOJO/.

The final content, equivalent to that produced by the Maven way, should be like this:

.
|– dijit
|– dojo
|– dojox
| |– cometd
| | |– ack.js
| | |– main.js
| | |– reload.js
| | |– timestamp.js
| | `– timesync.js
| `– cometd.js
|– org
| |– cometd
| | |– AckExtension.js
| | |– ReloadExtension.js
| | |– TimeStampExtension.js
| | `– TimeSyncExtension.js
| `– cometd.js
|– WEB-INF
| |– classes
| | `– org
| | `– cometd
| | `– primer
| | |– CometDInitializer.class
| | `– HelloService.class
| |– lib
| | |– bayeux-api-<version>.jar
| | |– cometd-java-common-<version>.jar
| | |– cometd-java-server-<version>.jar
| | |– cometd-java-websocket-common-server-<version>.jar
| | |– cometd-java-websocket-javax-server-<version>.jar
| | |– jetty-continuation-<version>.jar
| | |– jetty-http-<version>.jar
| | |– jetty-io-<version>.jar
| | |– jetty-jmx-<version>.jar
| | |– jetty-servlets-<version>.jar
| | |– jetty-util-<version>.jar
| | |– jetty-util-ajax-<version>.jar
| | |– slf4j-api-<version>.jar
| | `– slf4j-simple-<version>.jar
| `– web.xml
|– application.js
`– index.jsp
The org directory contains the CometD implementation and extensions, while the correspondent files in the dojox directory are the Dojo bindings. Other bindings exist for the jQuery toolkit, but the CometD implementation is the same.

The second step is to configure the server side. If you use Java, this means that you have to set up the CometD servlet that responds to messages from clients. The details of the server side configuration and service development are explained in the Java server library section.

The last step is to write a JSP (or HTML) file that downloads the JavaScript dependencies and the JavaScript application, as explained in the following section.

5.2.3. Setup Details
The JSP file, index.jsp, contains the reference to the JavaScript toolkit dependencies and to the JavaScript application file:

<!DOCTYPE html>
<html>
<head>
<script data-dojo-config=”async: true, deps: [‘application.js’], tlmSiblingOfDojo: true”
src=”${symbol_dollar}{pageContext.request.contextPath}/dojo/dojo.js.uncompressed.js”></script>
<script type=”text/javascript”>
var config = {
contextPath: ‘${pageContext.request.contextPath}’
};
</script>
</head>
<body>

</body>
</html>
It also configures a JavaScript configuration object, config, with variables that the JavaScript application might need. This is totally optional.

The JavaScript application, contained in the application.js file, configures the cometd object and starts the application. The archetypes provide:

require([‘dojo/dom’, ‘dojo/_base/unload’, ‘dojox/cometd’, ‘dojo/domReady!’],
function(dom, unloader, cometd) {
function _connectionEstablished() {
dom.byId(‘body’).innerHTML += ‘<div>CometD Connection Established</div>’;
}

function _connectionBroken() {
dom.byId(‘body’).innerHTML += ‘<div>CometD Connection Broken</div>’;
}

function _connectionClosed() {
dom.byId(‘body’).innerHTML += ‘<div>CometD Connection Closed</div>’;
}

// Function that manages the connection status with the Bayeux server
var _connected = false;

function _metaConnect(message) {
if (cometd.isDisconnected()) {
_connected = false;
_connectionClosed();
return;
}

var wasConnected = _connected;
_connected = message.successful === true;
if (!wasConnected && _connected) {
_connectionEstablished();
} else if (wasConnected && !_connected) {
_connectionBroken();
}
}

// Function invoked when first contacting the server and
// when the server has lost the state of this client
function _metaHandshake(handshake) {
if (handshake.successful === true) {
cometd.batch(function() {
cometd.subscribe(‘/hello’, function(message) {
dom.byId(‘body’).innerHTML += ‘<div>Server Says: ‘ + message.data.greeting + ‘</div>’;
});
// Publish on a service channel since the message is for the server only
cometd.publish(‘/service/hello’, {name: ‘World’});
});
}
}

// Disconnect when the page unloads
unloader.addOnUnload(function() {
cometd.disconnect(true);
});

var cometURL = location.protocol + “//” + location.host + config.contextPath + “/cometd”;
cometd.configure({
url: cometURL,
logLevel: ‘debug’
});

cometd.addListener(‘/meta/handshake’, _metaHandshake);
cometd.addListener(‘/meta/connect’, _metaConnect);

cometd.handshake();
});
Notice the following:

The use of the dojo/domReady! dependency to wait for the document to load up before executing the cometd object initialization.

The use of dojo.addOnUnload() to disconnect when the page is refreshed or closed.

The use of the function _metaHandshake() to set up the initial configuration on first contact with the server (or when the server has lost client information, for example because of a server restart). This is totally optional, but highly recommended and it is the recommended way to perform subscriptions.

The use of the function _metaConnect() to detect when the communication has been successfully established (or re-established). This is totally optional, but highly recommended.
Be warned that the use of the _metaConnect() along with the _connected status variable can result in your code (that in this simple example sets the innerHTML property) to be called more than once if, for example, you experience temporary network failures or if the server restarts.
Therefore the code that you put in the _connectionEstablished() function must be idempotent. In other words, make sure that if the _connectionEstablished() function is called more than one time, it will behave exactly as if it is called only once.

6.概念和结构

为了提供一个可伸缩的网络消息系统,CometD项目实现了多种{Comet技术},可以运行在HTTP或其他新兴的网络协议,如{WebSocket}。

6.1.定义

*客户端*是发起连接的实体,*服务器*是接受连接的实体。连接的建立是持久的——直到除非有一方决定关闭它。
典型的客户端是浏览器(毕竟这是一个网络环境),但也可以是其他的程序,如Java应用,浏览器插件或任何脚本语言的脚本。
根据使用的Comet技术不同,一个客户端可以打开多条到服务器的物理连接,但你可以认为在一个客户端和服务器之间只有一条逻辑*通道*。

CometD项目使用Bayeux协议(参考Bayeux协议章节)在客户端和服务器之间交换信息。信息交换的单元是{JSON}格式的Bayeux*消息*。一条消息包含多个*字段*,一些是Bayeux协议要求的,另一些是程序添加的。一个字段是一个键/值对;也就是说,如果一条消息有*foo*字段,就意味着它有一个键为字符串*foo*的字段。

所有的客户端和服务器交换的消息都有一个*channel*字段。channel字段提供了类中消息的特征。通道是CometD中的核心概念:发布者发布消息到通道中,订阅者订阅通道来接收消息。在CometD API中可以明显体会到这点。

6.1.1.通道定义

通道是一个看起来像URL路径的字符串,如`/foo/bar`,`/meta/connect`或`/service/chat`。

Bayeux规格定义了三种通道类型:*元通道*,*服务通道*和*广播通道*。

以`/meta/`开头的通道是元通道,以`/service/`开头的是服务通道,所有其他的通道是广播通道。

一条channel字段为元通道的消息看作是一条元消息,同样的还有服务消息和广播消息。

服务通道和广播通道由程序创建;程序可以根据需要在任意时间创建任意多个通道。

6.1.1.1.元通道

CometD实现创建了元通道;程序不能创建新的元通道。元通道用于给程序提供Bayeux协议(参见{此章})相关的信息;例如,握手是否成功,或是否与服务器断链或重新连接。

6.1.1.2.服务通道

程序可创建服务通道,用于客户端和程序之间请求/回应类型的通信(和广播通道的发布/订阅通信类型相对)。

6.1.1.3.广播通道

程序还可创建广播通道,具有消息主题的语义,并在发布/订阅通信方式的情况下使用,以便于一个发送者将信息广播给多个接收者。

6.1.1.4.在通道中使用通配符

你可以使用通配符来匹配多个通道:通道`/foo/*`匹配`/foo/bar`,但不匹配`/foo/bar/baz`,后者使用`/foo/**`来匹配。你可以在任何类型的通道中使用通配符:`/meta/*`匹配所有元通道,`/service/**`匹配`/service/bar`和`/service/bar/baz`,通道`/**`匹配所有通道。

你仅能在通道的最后一段中使用通配符,所以`/**/foo`和`/foo/*/bar`是无效的。

6.1.1.5.在通道中使用参数

你可以在通道中使用段参数:`/foo/{id}`。含有段参数的通道也成为*模板*通道,因为它们定义了一个匹配真实通道的模板,通过将模板通道的参数绑定到具体的值。模板通道用在注解服务中,他们的用法参见{注解监听者}和{注解订阅者}。比如,当`/news/{category}`绑定到通道`/news/sprots`时,参数`category`将被绑定到`”sport”`字符串。

模板通道仅绑定那些拥有相同段数的通道。例如,对于`/news/{category}`来说,`/news`不会绑定(参数太少),`/news/sport/athletics`不会绑定(参数太多),`/other/channel`不会绑定(非参数端不同),而`/news/football`将把`category`绑定为`”football”`.

一个模板通道不能同时是一个通配符通道,所以`/foo/{id}/*`或`/foo/{var}/**`是无效的。

6.2.上层视图

CometD实现了一个网络消息系统,尤其是基于{发布/订阅}模型的网络消息系统。

在发布/订阅模型的消息系统中,发布者发送具有分类特征的消息。订阅者关心一类或多类消息,并只接受他们订阅的所关心的消息。发送者通常不知道有哪些和多少接收者接收了它们所发的消息。

CometD实现了`轴辐`拓扑。在默认的配置中,这意味着一个中心服务器(轴),所有的客户端(辐)通过管道链接到服务器。

在CometD中,服务器从发布者接收消息,如果消息通道是广播通道,将消息转发给关注它的订阅者。对于元消息和服务消息,CometD服务器有不同的处理方法;这些消息不会被转发给任何订阅者(默认情况下,禁止订阅元通道,而订阅服务通道是一个无效操作)。

例如,假设`客户端AB`订阅了通道`/A`和`/B`,`客户端B`订阅了通道`/B`。如果一个发布者在通道`/A`上发布了一条消息,只有`客户端AB`会接收到它。另一方面,如果发布者在通道`/B`上发布了一条消息,`客户端AB`和`客户端B`都可以收到这条消息。此外,如果一个发布者在通道`/C`上发布了一条消息,`客户端AB`和`客户端B`都无法收到这条消息,这条消息将在服务器端结束旅程。转发广播消息是服务器的默认行为,无需编写任何程序代码来实现转发。

从上层看,你可以看到消息在客户端和服务器端之间通过管道来回流动。一条广播消息到达服务器后,可以能被转发给所有的客户端;你可以推想,当一条消息到达服务器后,它将被复制并发送到每个客户端(尽管,处于效率原因,实际消息并没有被复制)。如果发送者也订阅了它发送消息的那个通道,它也将收到自己发送的消息。

6.3.低层视图

下面的章节深入介绍CometD是如何实现的。

此时,我们应该清楚的知道,CometD的核心,是一个通过Bayeux协议通信的客户端/服务器系统。

在CometD的实现中,以{half-object plus protocol}设计模式捕获客户端/服务器端的通信:当客户端上的半对象与服务器建立了一个通信管道时,服务器端也会创建一个响应的半对象,这两者逻辑上可以通信。CometD使用了这种设计模式的一个变种,因为需要把携带消息进出服务器的传输层抽象出来。传输层可以基于HTTP协议,但最近的CometD版本也支持WebSocket协议(你也可以加入更多的传输层)。

广义上来说,*客户端*是由客户端半对象和客户端传输层组成的,而服务器端是由多组服务器半对象和服务器传输层组成的更复杂的实体。

6.3.1.会话

会话是CometD中的核心概念。它们是协议传输中半对象的表现形式。

有三种会话:
* 客户端会话——远程客户端侧的客户端半对象。客户端会话在JavaScript中表现为org.cometd.CometD对象,在Java中表现为org.cometd.bayeux.client.ClientSession类(更常见的是它的子类org.cometd.bayeux.client.BayeuxClient)。客户端创建一个客户端会话以建立与服务器端的Bayeux通信,以便发送和接收消息。

* 服务器会话——服务器侧的服务器半对象。服务器会话在服务器上,表现为org.cometd.bayeux.server.ServerSession类;它们与客户端会话相对应。当一个客户端创建了一个半对象时,它一开始并没有和相应的服务器会话关联。仅当客户端会话和服务器建立了Bayeux通信后,服务器才创建相应的服务器会话和两个半对象之间的关联。每个服务器会话都有一个消息队列。发布到通道中的消息必需分发给订阅了此通道的远程客户端会话。这些消息首先被加入了服务器会话的消息队列,然后分发给相应的客户端会话。

* 本地会话——服务器侧的客户端半对象,表现为org.cometd.bayeux.server.LocalSession类。本地会话可以看做是服务器上的客户端。他们并不表现为一个远程客户端,而是一个服务器侧客户端。本地会话可以像和客户端会话一样,订阅通道和发布消息,但它们存在于服务器上。服务器只知道服务器会话,创建服务器会话的唯一方法就是先创建相应的客户端会话,然后与服务器建立Bayeux通信。因此,在服务器端,本地会话有一个额外的概念。本地会话是是一个恰好在服务器上的客户端会话,因此是服务器的一部分。

例如,假设一个远程客户端每当它的状态改变时就发布一条消息。其他远程客户端可以订阅这个通道,并接受这些状态更新消息。但如果你需要在服务器上根据远程客户端的状态更新做一些操作怎么办?因此,你需要一个相当于远程客户端的东西,但是运行在服务器上,这就是本地会话。
服务器侧服务与本地会话相关联。根据服务器端创建的服务,本地会话与服务器握手并创建相应的服务器会话的半对象,因此服务器可以用相同的方式对待客户端会话和本地会话(因为他们都被看作是服务器会话)。服务器将发送到的通道的消息分发给所有订阅了这个通道的服务器会话,无论它们是远程客户端会话或本地会话。
查看{服务章节}了解更多信息。

6.3.2.服务器

服务器表现为`org.cometd.bayeux.server.BayeuxServer`的一个实例。`BayeuxServer`对象可以作为:
* 服务器会话的仓库,参考{会话概念章节}。
* 服务器传输层的仓库——表现为`org.cometd.bayeux.server.ServerTransport`类。服务器传输层是一个处理与客户端通信细节的服务器侧组件。有HTTP服务器传输层,也有WebSocket服务器传输层,你也可以加入其他类型。服务器传输层抽象了传输细节,这样程序就可以只感知Bayeux消息,而无需知道他们是如何到达服务器的。
* 服务器通道的仓库——表现为`org.cometd.bayeux.server.ServerChannel`类。一个服务器通道是通道在服务器侧的表现形式;它可以接收和发送Bayeux消息。
* 扩展仓库——表现为`org.cometd.bayeux.server.BayeuxServer.Extension`类。扩展允许程序通过修改、删除或重发传入/传出的Bayeux消息来与Bayeux协议交互。
关于扩展的更多信息,请查看{扩展章节}。
* 中央授权机关,通过一个*安全策略*的实例——表现为`org.cometd.bayeux.server.SecurityPolicy`类。CometD会查询安全策略来授权任何服务器进行的敏感操作,如握手,通道创建,通道订阅和通道发布。应用可以提供它们自己的安全策略来实现他们自己的授权逻辑。
关于安全策略的更多信息,请查看{授权章节}。
* 授权者——表现为`org.cometd.bayeux.server.Authorizer`类,允许你应用更惊喜的授权策略。
关于授权者的更多信息,请查看{授权者章节}。
* 消息处理器,协调服务器传输层、扩展和安全策略,实现了一个消息流算法(参见{消息处理章节}),以便于应用程序通过消息和通道交互来实现自己程序逻辑的。

6.3.3.监听者

程序使用*监听者*与会话通道和服务器交互。Java和JavaScript API允许应用注册不同的监听者,并且接收相应事件的通知。你可以认为扩展、安全策略和授权者是一种特殊的监听者。接下来的章节就是这样处理它们的。

6.3.3.1.客户端会话和监听者

客户端会话监听者的范例如下:
* 你可以通过`ClientSession.addExtension(ClientSession.Extension)`为客户端会话添加一个扩展来处理服务器端接收到和发送的消息。
* 客户端会话是通道的仓库;你可以通过`ClientSession.getChannel(String).addListener(ClientSessionChannel.MessageListener)`为通道添加一个消息监听者,当消息到达此通道的时候,你将会收到通知。

6.3.3.2.服务器和监听者

在服务器端,模型类似,但更加丰富。
* 你可以通过`BayeuxServer.addExtension(BayeuxServer.Extension)`为`BayeuxServer`示例添加扩展来处理所有流经服务器的消息。
* `BayeuxServer`允许你通过`BayeuxServer.addListener(BayeuxServer.ChannelListener)`添加监听者来监听通道创建或销毁时的通知,通过`BayeuxServer.addListener(BayeuxServer.SessionListener)`添加监听者来监听服务器会话创建或销毁时的通知。
* `ServerChannel`允许你通过`ServerChannel.addAuthorizer(Authjorizer)`添加授权者,当消息通过`ServerChannel.addListener(ServerChannel.MessageListener)`到达通道或客户端通过`ServerChannel.addListener(ServerChannel.SubscriptionListener)`订阅或取消订阅时,监听者会收到通知。
* `ServerSession`允许你通过`ServerSession.addExtension(ServerSession.Extension)`添加扩展来获取流经服务器的消息。
* `ServerSession`允许你通过`ServerSession.addListener(ServerSession.RemoveListener)`添加监听者,当会话被移除(如由于客户端断链,或因为客户端消失而导致相应的服务器会话过期)时将会收到通知。
* `ServerSession`允许你通过`ServerSession.addListener(ServerSession.QueueListener)`添加监听者来与服务器会话的消息队列交互,如检测何时消息被加入了队列,或通过`ServerSession.addListener(ServerSession.MaxQueueListener)`来检测合适消息队列已满,或通过`ServerSession.addListener(ServerSession.DeQueueListener)`来检测合适队列准备好了发送。
* `ServerSession`允许你通过`ServerSession.addListener(ServerSession.MessageListener)`添加监听者来接收服务器会话收到消息(无论是哪个通道)时的通知。

6.3.4.消息处理

此章节详述了客户端和服务器的消息处理过程。使用下图来理解客户端和服务器的详细构造视图。

当客户端发送消息时,它使用客户端侧的通道发送他们。客户端通过`ClientSession.getChannel(String)`从客户端会话检索客户端通道。消息首先被传递到一条一条处理消息的扩展;如果某个扩展拒绝处理消息,消息将被删除,不会发送给服务器。在扩展处理结束后,消息传递给客户端传输层。

客户端传输层把消息转换为JSON(对于Java客户端,这是由`JSONContext.Client`实例实现的,参见{JSON章节}),建立与服务器传输层的管道,然后把JSON字符串作为传输层封包载荷(如HTTP请求或WebSocket消息)通过管道发送给服务器。
封包传输到了服务器,服务器的传输层接收它。服务器传输层把消息从JSON格式转回消息对象(通过`JSONContext.Server`实例,参见{JSON章节}),然后把它们传递给`BayeuxServer`实例处理。

`BayeuxServer`按如下步骤处理每条消息:

1. 调用`BayeuxServer`扩展(`rcv()`或`rcvMeta()`方法);如果某个扩展拒绝处理消息,将会回复客户端消息已经被删除了,不会再进行处理。
2. 调用`ServerSession`扩展(`rcv()`或`rcvMeta()`方法,仅当此客户端相应的`ServerSession`存在时);如果某个扩展拒绝处理消息,将会回复客户端消息已经被删除了,不会再进行处理。
3. 调用授权检查安全策略和授权者;如果授权被拒绝,将会回复客户端消息已经被删除了,不会再进行处理。
4. 如果是服务消息或广播消息了,消息将会通过`BayeuxServer`扩展(send()或sendMeta()方法)。
5. 调用服务器通道监听者;程序在服务器上添加服务器通道监听者,并在消息最终发送给所有订阅者之前(如果是广播消息),提供修改消息的机会。所有的订阅者将会看到服务器监听者修改过的消息,因为发布者发送的是修改后的消息。在服务器通道监听者处理后,消息被*冻结*,且不应再被修改。程序不需要关心冻结这个步骤,因为API会说新消息是否可以被修改:API有一个参数标示消息对象是否可以被修改。这也是处理非广播消息的最后一步,因此它在服务器上的旅程到此为止。一条回复将会发送给发布者确认消息已经传输到了服务器(见后文),但消息还没有被广播到其他服务器会话。
6. 如果消息是一条广播消息,对每个订阅了这个通道的服务器会话来说,消息会传递给`ServerSession`扩展(send()或sendMeta()方法),然后服务器会话消息队列监听者会被调用,最后,消息被添加到服务器会话队列来发送。
7. 如果是懒汉消息(参见{懒汉消息章节}),它将被第一时间发送。否则消息会被立刻分发。如果消息所在队列的服务器会对应远程客户端会话,它将指定一个线程通过服务器传输层分发消息队列里的消息。服务器传输层清空服务器会话消息队列,把消息转换为JSON,并通过管道以传输层特定的封包(如,HTTP请求或WebSocket消息)负载发送它们。否则,如果消息所在队列的服务器会话对应本地会话,队列中得消息将直接分发给本地会话。
8. 对于广播和非广播消息,都会创建回复消息,通过`BayeuxServer`扩展和`ServerSession`扩展(send()或sendMeta()方法)。然后传递到服务器传输层,并通过`JSONContext.Server`实例转换为JSON(参见{JSON章节}),并通过管道以传输层特定的封包(如,HTTP请求或WebSocket消息)负载发送它们。
9. 封包传回到客户端,客户端传输层接收它,客户端传输层将消息从JSON转换回消息对象,对于Java客户端,用的是`JSONContext.Client`实例(参见{JSON章节})。
10. 每条消息然后传递给扩展(send()或sendMeta()方法),通道监听者和订阅者将收到消息的通知。

6.3.5.线程

当服务器接收到Bayeux消息时,将会分配一个线程处理消息,线程里会调用服务器侧的监听者。CometD的实现不产生新线程来调用服务器的监听者;通过这个方法,保持了线程模型的简洁,也和Servlet的线程模型很相似。

这个简单的线程模型暗示了如果一个服务器侧监听者花了很长时间来处理消息,再把控制权返回给实现,那么实现可能就无法处理下条消息,通常会阻塞整个服务器的处理。

这建立在Bayeux客户端使用有限的连接与服务器交互的事实上。如果服务器花了很长时间处理发送给一个连接的一条消息,客户端可能在这个连接上发送其他消息。但它们不会被处理,直到处理完前一条消息。

因此程序是否直到一条消息将会引发耗时的任务(如数据库查询)很重要,他应该在单独的线程中进行。

服务(参见{java服务器服务器章节})是配置服务器侧监听者的一种简单的方法,然将会和普通的服务器侧监听者共享相同的线程模型:如果他们需要处理耗时任务,需要在单独的线程里处理,如:

@Service
public class MyService
{
@Inject
private BayeuxServer bayeuxServer;
@Session
private LocalSession localSession;

@Listener(“/service/query”)
public void processQuery(final ServerSession remoteSession, final ServerMessage message)
{
new Thread()
{
public void run()
{
Map<String, Object> data = performTimeConsumingTask(message);

// Send data to client once the time consuming task is finished
remoteSession.deliver(localSession, message.getChannel(), responseData);
}
}.start();
}
}

6.3.6.程序交互

现在你知道程序是如何通过监听者与CometD交互的,以及客户端和服务器是如何处理消息的,你需要知道程序在处理他的业务逻辑时,需要与消息做些什么交互。

6.3.6.1.服务器侧授权

程序想要与授权交互,它必须注册一个自定义`SecurityPolicy`的实例,并重写`SecurityPolicy.canHandshake(…)`方法。`SecurityPolicy`可以通过从握手请求中取出握手回复来定制握手回复(如,给出授权失败的详细信息):

public class MySecurityPolicy extends DefaultSecurityPolicy
{
public boolean canHandshake(BayeuxServer server, ServerSession session, ServerMessage message)
{
boolean authenticated = authenticate(session, message);

if (!authenticated)
{
ServerMessage.Mutable reply = message.getAssociated();
// Here you can customize the reply
}

return authenticated;
}
}

6.3.6.2.元消息和服务器消息交互

元消息和服务器消息的旅程到服务器就结束了。程序只能通过服务器通道监听者来与这类消息交互,因此必须使用这些监听者来实现业务逻辑。

你可以通过如下方式添加服务器监听者:
* 在初始化的时候直接通过API(见{服务集成章节})。
* 间接通过继承服务(见{继承服务章节})。你可以通过调用`AbstractService.addService(…)`或通过使用`@Listener`注解来注解服务(见{注解服务章节})实现这一点。

程序应当为需要在服务器侧监听者处理的耗时任务分配单独的线程,避免阻塞其他传入消息的处理(参考{线程章节})。

6.3.6.3.广播消息交互

广播消息到达服务器并且非分发给所有订阅了此消息通道的`ServerSessions`。程序课以通过服务器通道监听者或`LocalSession`订阅消息通道与广播消息交互(与非广播消息一样,见上文)。

你在初始化的时候直接通过API(见{继承服务章节})或间接通过使用`@Subscription`注解来注解服务(见{注解服务章节})实现第二个方案。
程序应当为需要在服务器侧监听者处理的耗时任务分配单独的线程,避免阻塞其他传入消息的处理(参考{线程章节})。

6.3.6.4.与特定的远程客户端通信

想要给特定客户端分发消息的程序可以通过查找与之对应的服务器会话并使用`ServerSession.deliver()`分发消息。

例如,远程客户端`client1`想要发送消息给另外一个远程客户端`client2`。两个客户端都已经连接并且已经和服务器进行了握手。它们的握手包含了附加的`userId`信息,`client1`称为”Bob”,`client2`称为`Alice`。程序可以使用`SecurityPolicy`或`BayeuxServer.SessionListener`来映射`userId`与服务器会话id,如{授权章节}所示。

现在Bob想要给Alice发送一条私有消息。

`client1`可以使用服务通道发送私有消息(如/service/private),这样消息就不会被广播,程序设置成由一个服务器监听者将到达`/service/private`的消息路由到其他的远程客户端。

@Service
public class PrivateMessageService
{
@Session
private ServerSession session;

@Listener("/service/private")
public void handlePrivateMessage(ServerSession sender, ServerMessage message)
{
// Retrieve the userId from the message
String userId = message.get("targetUserId");

// Use the mapping established during handshake to
// retrieve the ServerSession for a given userId
ServerSession recipient = findServerSessionFromUserId(userId);

// Deliver the message to the other peer
recipient.deliver(session, message.getChannel(), message.getData());
}
}

 

1 comment — post a comment

你大爷

dabian

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注