IM 即时通讯系统 架构设计想法 #196
Replies: 11 comments
-
broker之间是集群,相互之间是对等的,网络通就行呗 |
Beta Was this translation helpful? Give feedback.
-
@sdack-cloud 接入层的broker为Gateway,就是普通的应用,然后连接到broker上,就属于app->broker这个模式,所以broker下的服务 RSocket Gateway都是可以调用的。 |
Beta Was this translation helpful? Give feedback.
-
@linux-china 就可以理解 rsocket-broker 就像 nacos 差不多的角色,只不过broker 负责转发数据。但是 IM 系统第一步是大量稳定的长连接。 在 spring-rsocket 示例项目中,会把RSocket请求者放在map中,通过保存的请求者进行双向通信,这样做不会增加内存的嘛??? 这样也承载不了多少连接啊??? private final List<RSocketRequester> CLIENTS = new ArrayList<>();
private final List<RSocketRequester> CLIENTS2 = new ArrayList<>();
@PreDestroy
void shutdown() {
log.info("Detaching all remaining clients...");
CLIENTS.stream().forEach(requester -> requester.rsocket().dispose());
log.info("Shutting down.");
}
@ConnectMapping("shell-client")
void connectShellClientAndAskForTelemetry(RSocketRequester requester,
@Payload String client) {
requester.rsocket()
.onClose()
.doFirst(() -> {
// Add all new clients to a client list
log.info("Client: {} CONNECTED.", client);
CLIENTS.add(requester);
})
.doOnError(error -> {
// Warn when channels are closed by clients
log.warn("Channel to client {} CLOSED", client);
})
.doFinally(consumer -> {
// Remove disconnected clients from the client list
CLIENTS.remove(requester);
log.info("Client {} DISCONNECTED", client);
})
.subscribe();
// Callback to client, confirming connection
requester.route("client-status")
.data("OPEN")
.retrieveFlux(String.class)
.doOnNext(s -> log.info("Client: {} Free Memory: {}.", client, s))
.subscribe();
}
/**
* This @MessageMapping is intended to be used "stream <--> stream" style.
* The incoming stream contains the interval settings (in seconds) for the outgoing stream of messages.
*
* @param settings
* @return
*/
@PreAuthorize("hasRole('USER')")
@MessageMapping("channel")
Flux<Message> channel(final Flux<Duration> settings,RSocketRequester requester, @AuthenticationPrincipal UserDetails user) {
log.info("Received channel request...");
log.info("Channel initiated by '{}' in the role '{}'", user.getUsername(), user.getAuthorities());
return settings
.doFirst(() -> {
if (requester != null) {
CLIENTS2.add(requester);
}
log.warn("客户端 total {}", CLIENTS2);
})
.doOnNext(setting -> log.info("Channel frequency setting is {} second(s).", setting.getSeconds()))
.doOnCancel(() -> log.warn("The client cancelled the channel."))
.switchMap(setting -> Flux.interval(setting)
.map(index -> new Message(SERVER, CHANNEL, index)));
} 基本架构调整 |
Beta Was this translation helpful? Give feedback.
-
如果是IM通讯的场景,RSocket协议接入层使用普通的Spring Boot应用就可以,对外暴露RSocket + WebSocket,调用后端服务走Broker。 这样Gateway对应的Spring Boot RSocket可以进行横向扩展,如10,20,或者100个实例,这样就可以承受非常大的在线连接数。 至于每一个连接的信息,如用户信息,接入的Gateway服务列表,这些你可以考虑使用一个外部的存储来保存,如Redis等。 至于会员之间相互通讯,你可以接入一个轻量级的Message Bus来做,如NATS等,Gateway会订阅对应的topic,然后再将消息转发给连接该实例的客户端。 RSocket Broker主要是做服务调用,服务的实例是有限的,而且路由相对固定,属于服务调用范畴,而IM的场景,RSocket主要负责协议接入,而用户/群组之间的通讯,可能采用一个Message Bus更合适一些,而且还涉及到消息的持久化等,还涉及到存储等,这个已经超出服务调用的范畴。 个人建议你可以使用RSocket做IM消息通讯协议,Broker做服务调用,如创建和管理用户,群组等,而IM涉及到的消息转发,存储等,这个架构已经非常成熟,参考一下网上的架构就可以。 |
Beta Was this translation helpful? Give feedback.
-
这个 Gateway 是spring-cloud-gateway 嘛??? 或是Nginx 直接对应N个 Spring Boot RSocket 接入端 ??? Spring Boot RSocket 作为接入端 可以理解, 得到 RSocketRequester (请求者)然后 RSocketRequester 用 Redis 序列化。 现在先考虑 接入问题 ,接入的稳定性,接入的并发量 @linux-china |
Beta Was this translation helpful? Give feedback.
-
不用Spring Cloud Gateway,那个主要是面向HTTP REST的,当然spring cloud gateway也是基于spring boot的,你只要基于Spring RSocket稍微扩展一下就可以。 不用担心并发的问题,这个设计主要是基于Netty + Reactive的,普通的服务器就能支持到50万并发连接,好点的服务器100万没有问题。 后续还有Java的Loom,请参考 https://github.com/ebarlas/project-loom-c5m 这个是能支持500万个并发连接的。 参考项目: https://www.illumy.com/ 就是使用RSocket的。 |
Beta Was this translation helpful? Give feedback.
-
这里有好几个RSocket 接入端 都接入 rsocket-broker ,有消息要通过 RSocket的 RSocketRequester 发送出去。 |
Beta Was this translation helpful? Give feedback.
-
RSocket Broker单个节点硬件稍微好点能支持100W个并发连接,而RSocket接入端不会有这么多节点的,不用担心。 RSocket Broker主要负责服务调用,而不是承担所有IM消息的转发,消息转发这个是由IM的架构设计,这个网上有很多的,如XMPP等都有对应的架构。 而诸如消息保存,读取消息历史记录等,这个属于API范畴,这个走RSocket Broker就可以啦。 当然你也可以让RSocket Broker继续消息路由,这个也不麻烦,只要设置好对应的路由规则就可以,如用户对应的接入点ID,然后RSocket Broker就可以转发。则个都是走Zero Copy的,RSocket Broker只是看一下路由头,然后进行消息转发,没有多少资源开销,速度非常快的。 如果你的消息量非常大,只要再添加一些Broker就可以,RSocket Broker本来就是支持负载均衡的。 关于路由的规范,你可以参考Spring Cloud Spencer编写的RSocket 路由规范进行设计 https://github.com/rsocket-broker/rsocket-broker-spec/blob/master/RSocketBrokerSpecification.md 对应IM来说,应该添加 |
Beta Was this translation helpful? Give feedback.
-
RSocket接入端 单机有100 W连接,那我想要承载更多的连接,肯定是多个RSocket接入端。RSocket Broker主要负责服务调用,承载客户端的大量连接不是Broker 业务范畴。我是想 RSocket接入端 建立起与客户端 消息接收 的信道,只接收消息。那消息是从服务端 保存后 交与 接入端 投送。Broker 服务调用时会不会投送到没有 此客户端接入的接入端上。 就对于RSocket接入端 集群 有没有什么框架,或者好的文章呢? @linux-china |
Beta Was this translation helpful? Give feedback.
-
不需要框架的的,你参考RSocket Java标准实现就可以做一个IM的, RSocket浏览器端demo基本都是基于chat这个场景的。 |
Beta Was this translation helpful? Give feedback.
-
请不要关闭,可以让大家参与一下,交流想法
这是我的想法 基本架构图
在接入端的 broker 能直接调用 服务端broker 下的服务嘛????
Beta Was this translation helpful? Give feedback.
All reactions