ES同步PG服务
models
- Talents = 1
- Experiences = 2
- Organizations = 3
- Jobs = 4
- Values = 5
- Industries = 6
同步执行过程
- 依赖计算队列(sqs)
- 在cms的新增、更新、删除接口运行成功后,往SyncJobs表中插入数据同步job
- 在服务端的cron-job周期内,如果没有待deploy的model,则把SyncJobs中status=pending的任务发送到依赖计算队列中
- 发送到队列的场景和结构如下,dataList设计成数据的原因是后续可能支持批量操作,以及更新、删除操作的过程中可能涉及到别model数据的变更;假设有talent、exp、org三个index需要更新
- cms新增接口,新增了talent=5
`
`{ "syncJobId": 1, "dataList": [ { "action": "create" "modelId": 1, "dataId": 5 } ] } - cms更新接口,更新了talent=5,更新时把exp=6从talent上删掉了,需要把这两条数据也一起放到队列里面,因为talent数据已经更新完了,后续执行依赖计算时是无法得知exp=6需要更新
`
`{ "syncJobId": 2, "dataList": [ { "action": "update" "modelId": 1, "dataId": 5 }, { "action": "update" "modelId": 2, "dataId": 6 } ] } - cms删除接口,删除了talent=5,由于外键约束会把和talent关联的外键直接设置成null,在依赖计算时exp=7无法得知需要更新,因此需要把exp=7一起写入到队列中
`
`{ "syncJobId": 3, "dataList": [ { "action": "delete" "modelId": 1, "dataId": 5 }, { "action": "update" "modelId": 2, "dataId": 7 } ] }
- cms新增接口,新增了talent=5
`
- 依赖计算函数(lambda,见code/function-calculateDependencies.js)
- 计算因新增、更新、删除数据,而需要同时更新的依赖数据(modelId+dataId),计算完成后把涉及需要更新的modelId和dataId都发送到数据同步队列中
- 例如在上一步中新增了talent=5,新增接口写入时把exp=5和exp=6和talent关联在了一起,而由于exp=5和org=3关联,exp=6和org=4关联,因此最后更新数据时需要把talent=5、exp=5、exp=6、org=3、org=4这些数据都要同步到index,当前lambda的作用就是计算依赖数据,并把这些modelId和dataId发送到数据同步队列
`
`{ "syncJobId": 1, "dataList": [ { "modelId": 1, "dataId": 5 }, { "modelId": 2, "dataId": 5 }, { "modelId": 2, "dataId": 6 }, { "modelId": 3, "dataId": 3 }, { "modelId": 3, "dataId": 4 } ] }
- 例如在上一步中新增了talent=5,新增接口写入时把exp=5和exp=6和talent关联在了一起,而由于exp=5和org=3关联,exp=6和org=4关联,因此最后更新数据时需要把talent=5、exp=5、exp=6、org=3、org=4这些数据都要同步到index,当前lambda的作用就是计算依赖数据,并把这些modelId和dataId发送到数据同步队列
`
- 计算因新增、更新、删除数据,而需要同时更新的依赖数据(modelId+dataId),计算完成后把涉及需要更新的modelId和dataId都发送到数据同步队列中
- 数据同步队列(sqs)
- 触发数据同步函数运行
- 数据同步函数(lambda,见code/function-syncData.js)
- 计算以输入的modelId和dataId为顶点的树状数据结构,并写入到es中
- 例如计算talent=5、exp=5、exp=6、org=3、org=4的完整数据,分别写入到talent、exp、org的index中
- 计算以输入的modelId和dataId为顶点的树状数据结构,并写入到es中
(later)SyncJobs表
- fields
- appId
- modelId
- dataId
- type
- calculateDependencies,表示job将发送到依赖计算队列
- syncData,表示job将发送到数据同步队列
- status
- pending
- calculateDependenciesStart
- calculateDependenciesEnd
- calculateDependenciesError
- syncDataStart
- syncDataStartEnd
- syncDataStartError
- pause
- content (json,存储发送到队列的dataList)
- result (json,存中间运行结果)
队列相关
- sqs standard vs fifo
- 目前没有fifo的强需求
- 目前使用场景,生产者可能是多个,消费者也可能是多个,使用FIFO无法完全保证顺序
- 不同model是否拆成不同队列
- 优点
- 利用并发效率
- 缺点
- 需要维护的it资源更多,闲置资源更多,增加使用费用
- sqs容量高,并且消息触发lambda,似乎没有必要
- 每个lambda都需要进行数据库连接,可能增加其他服务的压力
- 结论
- 暂时不拆
- 优点
- 是否把dataList拆成单条message发到队列中
- 优点
- 利用并发效率
- 缺点
- 难以统计job是否完成
- 没法去重操作
- 写入到es时,请求过多容易把es搞跨
- 结论
- 暂时不拆单条,但是可以指定一个上限(例如50),超过的部分作为一下条message的dataList发送到队列中
- 优点
model deploy
- create table
- 生成deploy request
- request执行
- sql create table
- sql create trigger
- if noIndex=false
- 计算model自身的relationTree,存入db
- index create service
- add new field
- 生成deploy request
- request执行
- sql alter add field
- if noIndex=false
- index updateMappings service
- * 根据model对应的dependents,计算新增字段在mapping中的field(需要遍历relationTree),执行index updateMappings service
- create relation
- 生成deploy request
- request执行
- 关系非多对多
- sql alter add field
- 关系多对多
- sql create table
- sql create trigger
- * 重新计算所有models的relationTree+dependents
- 对于需要新增field的index,执行新增
- 根据重新生成的index,全部执行reindex
- 关系非多对多
index service
- create
- 场景
- model deploy时,model配置上的noIndex=false && model目前没有index,则执行create index
- (later)如果之前noIndex=true,并且model有index,则可先把老index删除,再创建
- model deploy时,model配置上的noIndex=false && model目前没有index,则执行create index
- 流程
- 计算mappings
- 把model、modelFields、modelRelations从db中读出来,构造成json
- ** 参考code/buildIndexMappings.js,计算出index的mappings
- 调用create index sdk
- default settings
- mappings
- alias
- 更新SearchIndexes表
- alias
- index
- 计算mappings
- 场景
- updateMappings
- 场景
- 新增model字段
- 新增model关系
- 流程
- ** 计算新增字段或者新增关系涉及的mappings json(参考model-index-sync.md文件中add new field)
- ** 计算所有需要更新的index(根据relationTree来算出依赖,例如Values表更新,需要同时更新Talents、Orgs、Jobs三个index)
- 调用update mappings sdk
- 场景
- (later)delete
- 场景
- ?model deploy时,model配置上的noIndex=true,需要把现有index删掉
- ?删除model
- 流程
- 调用delete index sdk
- 场景
- (later)reindex
- 场景
- (later)删除model字段
- (later)更新model字段
- (later)删除model关系
- (later)更新model关系
- 流程
- 计算新的mappings,以及reindex时需要运行的script
- 调用reindex sdk
- 修改alias指向新的index
- ?删除旧index
- 更新SearchIndexes表
- index
- 场景
- rebuild
- 场景
- 把整个index重建,并且重新同步数据
- 流程
- (later)把对应model相关的syncJobs状态全部置为pause,使和当前model相关的数据同步任务都中断
- 计算mappings
- 同create index,需要根据relationTree计算生成
- 调用create index sdk
- 同createindex
- 修改alias指向新的index
- ?删除旧index
- 更新SearchIndexes表
- index
- ?把model的数据id全部读出来,生成SyncJobs(type=)
- 场景
- ?thinking
- 是否需要把index相关的都封装成lambda
- 优点
- 短期内方便不会nest的开发者进行开发
- 缺点
- 本身没有什么性能问题,强行分开会导致维护上比较麻烦,架构的依赖上变得更复杂
- 优点
- 是否需要把index相关的都封装成lambda
与数据同步存在冲突的场景
- model deploy导致index结构更新
- 使用update mappings的方式(冲突可忽略)
- 场景
- 新增model字段
- 新增model关系
- 场景
- 使用reindex的方式(存在冲突)
- 场景
- (later)删除model字段
- (later)更新model字段
- (later)删除model关系
- (later)更新model关系
- 解决方案
- 把deploy的执行抽象成一个request,该request只有等到SyncJobs队列中没有正在执行的任务(或者任务启动时间>2h的卡死的任务)才允许执行
- 用户点击deploy,生成deploy request,此时不马上执行pg表和es变更
- 在cron-job运行时,检测到有等待deploy request,则不把等待同步的SyncJobs推送到队列中;如果SyncJobs表中没有正在执行数据同步的任务,则执行deploy request,如果有正在执行数据同步的任务,则不执行deploy request,等到下一个cron-job周期内在进行检测
- 把deploy的执行抽象成一个request,该request只有等到SyncJobs队列中没有正在执行的任务(或者任务启动时间>2h的卡死的任务)才允许执行
- 场景
- 使用update mappings的方式(冲突可忽略)
- rebuild导致需要重建某个model的index、同步所有数据
- 解决方案
- 把对应model相关的syncJobs状态全部置为pause(这样没放到队列中的任务不会放到队列,在队列中的任务执行lambda判断到pause的状态也会跳过)然后执行rebuild相关操作(重建index、同步所有数据)
- 解决方案
是否设计有lock
没有lock
- add mappings
- 冲突可忽略
- (later)reindex
- 极端情况下,会出现同步用的alias和实际的alias不一致,导致写入数据不准确
- rebuild
- 没有冲突,未处理完的任务就当重复了也无所谓
只使用DataChangeQueue作为lock
- 把每条id的status状态精确记录
- lambda目前做了很多合并操作,只能做整体更新,不能非常精确到具体数据
- 不对每条id的status状态精确记录
- cron-job周期
- 没有deployRequest,把DataChangeQueue=pending数据发送到队列中,并且把status设置为sendToQueue
- 有pending的deployRequest,有DataChangeQueue!=初始状态(pending)和终结状态(syncDataEnd),(并且开始时间<5分钟的任务)不把DataChangeQueue=pending数据发送到队列中,跳过等待下一个周期
- 有deployRequest=pending,没有上述任务,执行deployRequest
- rebuild
- 忽略DataChangeQueue中还在运行的数据(暂时无法停止),直接重建index,切换alias
- ?把所有数据id读出来,直接发送到队列,或者写入到DataChangeQueue中
- cron-job周期
使用SyncJobs作为lock
- 原本的设计是每一次触发队列执行,都作为一个整体的job运行,记录job的状态
- 实现和上面的类似,区别是这里可以整体控制job是否接下去运行,通过增加字段的方式,但目前需求不太明确
不依赖于cms create/update/delete接口来触发同步的方式
定期扫描 + 表中的status=pendingSynced | synced + syncId字段
- 数据更新
- status=pendingSynced && syncId={randomStr}
- cron-job中把那些status=pendingSynced的数据加入到SyncJobs表中
- lambda完成同步后,把那些status=pendingSynced && syncId的数据更新成status=synced
- 还是需要依赖于cms接口改造,更新时把status和syncId同时更新
- 数据软删除
- 同上
- 数据硬删除
- 无法检测,除非被删除时手动加入到SyncJobs中
(draft)pg trigger to collect data changes
- DataChangingLogs表
- modelId
- dataId
- status
- pending
- sendToQueue
- ...
- ?tableName
- 操作
- insert trigger
- model数据表
- 把新增modelId+dataId加入DataChangingLogs
- 多对多关系表
- 关系两端modelId+dataId加入DataChangingLogs
- model数据表
- update trigger
- model数据表
- 把更新modelId+dataId加入DataChangingLogs
- 多对多关系表
- 新增关系两端modelId+dataId加入DataChangingLogs
- 删除关系两端modelId+dataId加入DataChangingLogs
- model数据表
- delete trigger
- 硬删除
- model数据表
- 把删除modelId+dataId加入DataChangingLogs
- 有指定set null和delete的外键约束
- ** 和被删除数据有外键关联的数据会产生变动,也会触发其update trigger
- model数据表
- 软删除(isDelete=true)
- model数据表
- ** 被当成普通更新操作,需要在calDependencies阶段的sql做些特殊处理,ON和where的条件需要加上isDelete的判断
- model数据表
- 硬删除
- insert trigger
- 生命周期
- 创建model数据表时,同时创建function和insert/update/delete的trigger
- function = 把modelId+dataId写入DataChangingLogs表
- 创建model关系表时,同时创建function和insert/update/delete的trigger
- function = 把关系中相关的modelId+dataId写入DataChangingLogs表
- ** 删除model、删除关系表时,对应trigger和function需要删除
- * 如果强行修改了modelId,对应function也需要更新
- 创建model数据表时,同时创建function和insert/update/delete的trigger
- cronjob
- 定期遍历DataChangingLogs表,把status=pending的modelId+dataId发送到同步队列中,如果有重复的modelId+dataId,可以过滤后再发送
- 后续如果需要优化,发送到队列前,可以检查对应model的relationTree.dependents数据,如果为空,证明该表的数据变化不会引起任何其他index需要同步
(draft)model deploy request
- model
- model fields
- model relation