前言

Socket.io 实际上有点过于底层了,大量的消息交互都需要人工手写,复用性也不强。

为什么 ShareDB 这种做腾讯文档的能够用来做消息传输呢,其原理和 Socket.io 一样,都是用了 WebSocket 连接来进行双工通信。

我们的需求是建立一个共享区域,多个设备使用同一账号登入,此时协作修改会引发操作冲突。

如果仅使用纯消息传输方式,如 手机-发送修改->Socket Server=广播修改=》其他设备,会产生一些问题:

  1. 如果手机和电脑操作了同一块上下文,使用谁的修改?
  2. 如果多人同时操作了同一块上下文,操作如何合并?
  3. 如果操作之间有冲突,怎么解决冲突?

ShareDB 使用 OT 解决了这一问题,OTOperational transformation 的简称,是一种在高级协作软件系统中支持一系列协作功能的技术。OT 最初是为了在纯文本文档的协作编辑中保持一致性和并发控制而发明的。

这种保证最终数据一致性和并发控制的协作控制机制能够应用到更广的范围。

设计

最初使用 Socket.io 的场景就是 WebRTC 直播服务的流媒体更新通信和房间管理,这些存储在 Socket.io 中的数据完全可以抽象到 ShareDB 中的共享文档中。

现在需要介绍 ShareDB 中共享文档的两种类型,一种是可持久化的 doc,一种是没人订阅就清空的 state

直播场景下,对于媒体流管理和房间管理,应当是没人订阅就删除的 state 模式。

那么我们就可以初步定义数据类型了:

实现

服务端配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
export const defaultConfig: SynodeConfig = {
server: {
port: parseInt(process.env.SYNODE_PORT || "8888"),
path: process.env.SYNODE_PATH || "/server",
host: process.env.SYNODE_HOST || "localhost",
},
collections: {
share_live: { type: "state", description: "直播" }, // 这就是我们定义的共享文档
},
memoryDb: {
maxDocsPerCollection: parseInt(
process.env.SYNODE_MAX_DOCS_PER_COLLECTION || "3000"
),
maxOpsPerDoc: parseInt(process.env.SYNODE_MAX_OPS_PER_DOC || "200"),
cleanupInterval: parseInt(process.env.SYNODE_CLEANUP_INTERVAL || "30000"),
autoCleanup: process.env.SYNODE_AUTO_CLEANUP !== "false",
},
logging: {
enableDebug: process.env.SYNODE_DEBUG === "true",
logUserConnections: process.env.SYNODE_LOG_CONNECTIONS !== "false",
logOperations: process.env.SYNODE_LOG_OPERATIONS === "true",
},
};

客户端数据类型定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
export interface LiveSharedData extends SharedData {
streams: { streamId: string; userId: string; deviceId: string }[];
connectedUsers: { userId: string }[];
}

// 从 shardDB 获取的直播数据
const sharedLive = useShareData<LiveSharedData>(
"share_live",
liveId,
{
streams: [],
connectedUsers: [],
},
{ otType: "json0", localSubmitUpdate: true }
);

export function useShareData<T extends SharedData>(
collection: string,
docId: string,
initData: T,
options?: ShareDataOptions
) {
/** ShareDB 文档实例 */
const doc = shareDB.connection.get(collection, docId);
}

修改文档

数据定义好了,该修改文档了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 创建摄像头流
const createCameraStream = async (deviceId: string) => {
console.log("createCameraStream: ", deviceId);
const streamId = await liveMedia.createCameraStream(deviceId);
const op = [
{
p: ["streams", sharedLive.data.streams?.length || 0],
li: {
streamId: streamId,
userId: user.state.user.id,
deviceId: deviceId,
},
},
];
console.log("sharedLive.submitOp: ", op);
sharedLive.submitOp(op, { source: streamId });
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 销毁流
const destroyStream = async (streamId: string) => {
// 更新本地流列表
const publisher = liveMedia.liveConnections.value.publishers.find(
(p) => p.streamId === streamId
);
if (publisher) {
liveMedia.destroyStream(true, streamId);
}

// 更新 shareDB
const index = sharedLive.data.streams.findIndex(
(s) => s.streamId === streamId
);
if (index !== -1) {
const stream = sharedLive.data.streams?.[index];
const op = [{ p: ["streams", index], ld: stream }];
sharedLive.submitOp(op, { source: streamId });
}
};

关于 submitOp

这里的 op 是变更操作,遵循 ShareDB 的 json0 操作协议,有一套自己的语法:

https://github.com/ottypes/json0?tab=readme-ov-file#summary-of-operations

监听变更

变更的格式也很有意思,就是 submitOp 传入的 op:

1
2
3
4
5
6
7
8
9
10
[
{
"p": ["streams", 0],
"li": {
"streamId": 1,
"userId": 2,
"deviceId": 3
}
}
]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 监听数据变化
sharedLive.events.on("change", (delta: any, source: any) => {
console.log("数据变化", delta, source);
if (delta[0].p[0] === "streams" && !source) {
// 是其他人修改的流数据
if (delta[0].li) {
// 新增流
const newStream = delta[0].li["streamId"];
console.log("newStream: ", newStream);
liveMedia.addPlayer(newStream, {
host: URL_CONF.host,
room: URL_CONF.room,
});
} else if (delta[0].ld) {
// 删除流
console.log("删除流: ", delta[0].ld["streamId"]);
liveMedia.removePlayer(delta[0].ld["streamId"]);
}
}
});

拓展

ShareDB 还支持多种操作格式,如 rich-text:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Insert a bolded "Text"
{ insert: "Text", attributes: { bold: true } }

// Insert a link
{ insert: "Google", attributes: { href: 'https://www.google.com' } }

// Insert an embed
{
insert: { image: 'https://octodex.github.com/images/labtocat.png' },
attributes: { alt: "Lab Octocat" }
}

// Insert another embed
{
insert: { video: 'https://www.youtube.com/watch?v=dMH0bHeiRNg' },
attributes: {
width: 420,
height: 315
}
}

想要使用这种格式,需要到服务端中注册:

1
2
const richText = require("rich-text");
ShareDB.types.register(richText.type);

这里有官方的更多操作格式:

https://github.com/ottypes/docs

总结

与其说 ShareDB 可用的范围很多,不如说 OT 协同操作给予了消息管理更多的可能性。对于这种存在冲突与并发的场景,消息队列固然是好东西,但对于如果还需共享、操作上下文,还是需要协同操作解决冲突。