前言
Socket.io
实际上有点过于底层了,大量的消息交互都需要人工手写,复用性也不强。
为什么 ShareDB
这种做腾讯文档的能够用来做消息传输呢,其原理和 Socket.io
一样,都是用了 WebSocket
连接来进行双工通信。
我们的需求是建立一个共享区域,多个设备使用同一账号登入,此时协作修改会引发操作冲突。
如果仅使用纯消息传输方式,如 手机-发送修改->Socket Server=广播修改=》其他设备
,会产生一些问题:
- 如果手机和电脑操作了同一块上下文,使用谁的修改?
- 如果多人同时操作了同一块上下文,操作如何合并?
- 如果操作之间有冲突,怎么解决冲突?
ShareDB
使用 OT
解决了这一问题,OT
是 Operational transformation
的简称,是一种在高级协作软件系统中支持一系列协作功能的技术。OT
最初是为了在纯文本文档的协作编辑中保持一致性和并发控制而发明的。
这种保证最终数据一致性和并发控制的协作控制机制能够应用到更广的范围。
设计
最初使用 Socket.io
的场景就是 WebRTC
直播服务的流媒体更新通信和房间管理,这些存储在 Socket.io
中的数据完全可以抽象到 ShareDB
中的共享文档中。
现在需要介绍 ShareDB
中共享文档的两种类型,一种是可持久化的 doc
,一种是没人订阅就清空的 state
。
直播场景下,对于媒体流管理和房间管理,应当是没人订阅就删除的 state
模式。
那么我们就可以初步定义数据类型了:
实现
服务端配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| export const defaultConfig: SynodeConfig = { server: { port: parseInt(process.env.SYNODE_PORT || "8888"), path: process.env.SYNODE_PATH || "/server", host: process.env.SYNODE_HOST || "localhost", }, collections: { share_live: { type: "state", description: "直播" }, }, memoryDb: { maxDocsPerCollection: parseInt( process.env.SYNODE_MAX_DOCS_PER_COLLECTION || "3000" ), maxOpsPerDoc: parseInt(process.env.SYNODE_MAX_OPS_PER_DOC || "200"), cleanupInterval: parseInt(process.env.SYNODE_CLEANUP_INTERVAL || "30000"), autoCleanup: process.env.SYNODE_AUTO_CLEANUP !== "false", }, logging: { enableDebug: process.env.SYNODE_DEBUG === "true", logUserConnections: process.env.SYNODE_LOG_CONNECTIONS !== "false", logOperations: process.env.SYNODE_LOG_OPERATIONS === "true", }, };
|
客户端数据类型定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| export interface LiveSharedData extends SharedData { streams: { streamId: string; userId: string; deviceId: string }[]; connectedUsers: { userId: string }[]; }
const sharedLive = useShareData<LiveSharedData>( "share_live", liveId, { streams: [], connectedUsers: [], }, { otType: "json0", localSubmitUpdate: true } );
export function useShareData<T extends SharedData>( collection: string, docId: string, initData: T, options?: ShareDataOptions ) { const doc = shareDB.connection.get(collection, docId); }
|
修改文档
数据定义好了,该修改文档了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| const createCameraStream = async (deviceId: string) => { console.log("createCameraStream: ", deviceId); const streamId = await liveMedia.createCameraStream(deviceId); const op = [ { p: ["streams", sharedLive.data.streams?.length || 0], li: { streamId: streamId, userId: user.state.user.id, deviceId: deviceId, }, }, ]; console.log("sharedLive.submitOp: ", op); sharedLive.submitOp(op, { source: streamId }); };
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| const destroyStream = async (streamId: string) => { const publisher = liveMedia.liveConnections.value.publishers.find( (p) => p.streamId === streamId ); if (publisher) { liveMedia.destroyStream(true, streamId); }
const index = sharedLive.data.streams.findIndex( (s) => s.streamId === streamId ); if (index !== -1) { const stream = sharedLive.data.streams?.[index]; const op = [{ p: ["streams", index], ld: stream }]; sharedLive.submitOp(op, { source: streamId }); } };
|
关于 submitOp
这里的 op 是变更操作,遵循 ShareDB 的 json0 操作协议,有一套自己的语法:
https://github.com/ottypes/json0?tab=readme-ov-file#summary-of-operations
监听变更
变更的格式也很有意思,就是 submitOp
传入的 op
:
1 2 3 4 5 6 7 8 9 10
| [ { "p": ["streams", 0], "li": { "streamId": 1, "userId": 2, "deviceId": 3 } } ]
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| sharedLive.events.on("change", (delta: any, source: any) => { console.log("数据变化", delta, source); if (delta[0].p[0] === "streams" && !source) { if (delta[0].li) { const newStream = delta[0].li["streamId"]; console.log("newStream: ", newStream); liveMedia.addPlayer(newStream, { host: URL_CONF.host, room: URL_CONF.room, }); } else if (delta[0].ld) { console.log("删除流: ", delta[0].ld["streamId"]); liveMedia.removePlayer(delta[0].ld["streamId"]); } } });
|
拓展
ShareDB
还支持多种操作格式,如 rich-text
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| { insert: "Text", attributes: { bold: true } }
{ insert: "Google", attributes: { href: 'https://www.google.com' } }
{ insert: { image: 'https://octodex.github.com/images/labtocat.png' }, attributes: { alt: "Lab Octocat" } }
{ insert: { video: 'https://www.youtube.com/watch?v=dMH0bHeiRNg' }, attributes: { width: 420, height: 315 } }
|
想要使用这种格式,需要到服务端中注册:
1 2
| const richText = require("rich-text"); ShareDB.types.register(richText.type);
|
这里有官方的更多操作格式:
https://github.com/ottypes/docs
总结
与其说 ShareDB
可用的范围很多,不如说 OT
协同操作给予了消息管理更多的可能性。对于这种存在冲突与并发的场景,消息队列固然是好东西,但对于如果还需共享、操作上下文,还是需要协同操作解决冲突。