ES同步PG服务

models

  • Talents = 1
  • Experiences = 2
  • Organizations = 3
  • Jobs = 4
  • Values = 5
  • Industries = 6

同步执行过程

  • 依赖计算队列(sqs)
    • 在cms的新增、更新、删除接口运行成功后,往SyncJobs表中插入数据同步job
    • 在服务端的cron-job周期内,如果没有待deploy的model,则把SyncJobs中status=pending的任务发送到依赖计算队列中
    • 发送到队列的场景和结构如下,dataList设计成数据的原因是后续可能支持批量操作,以及更新、删除操作的过程中可能涉及到别model数据的变更;假设有talent、exp、org三个index需要更新
      • cms新增接口,新增了talent=5 `
          {
              "syncJobId": 1,
              "dataList": [
                  {
                      "action": "create"
                      "modelId": 1,
                      "dataId": 5
                  }
              ]
          }
        
        `
      • cms更新接口,更新了talent=5,更新时把exp=6从talent上删掉了,需要把这两条数据也一起放到队列里面,因为talent数据已经更新完了,后续执行依赖计算时是无法得知exp=6需要更新 `
          {
              "syncJobId": 2,
              "dataList": [
                  {
                      "action": "update"
                      "modelId": 1,
                      "dataId": 5
                  },
                  {
                      "action": "update"
                      "modelId": 2,
                      "dataId": 6
                  }
              ]
          }
        
        `
      • cms删除接口,删除了talent=5,由于外键约束会把和talent关联的外键直接设置成null,在依赖计算时exp=7无法得知需要更新,因此需要把exp=7一起写入到队列中 `
          {
              "syncJobId": 3,
              "dataList": [
                  {
                      "action": "delete"
                      "modelId": 1,
                      "dataId": 5
                  },
                  {
                      "action": "update"
                      "modelId": 2,
                      "dataId": 7                  
                  }
              ]
          }
        
        `
  • 依赖计算函数(lambda,见code/function-calculateDependencies.js)
    • 计算因新增、更新、删除数据,而需要同时更新的依赖数据(modelId+dataId),计算完成后把涉及需要更新的modelId和dataId都发送到数据同步队列中
      • 例如在上一步中新增了talent=5,新增接口写入时把exp=5和exp=6和talent关联在了一起,而由于exp=5和org=3关联,exp=6和org=4关联,因此最后更新数据时需要把talent=5、exp=5、exp=6、org=3、org=4这些数据都要同步到index,当前lambda的作用就是计算依赖数据,并把这些modelId和dataId发送到数据同步队列 `
          {
              "syncJobId": 1,
              "dataList": [
                  {
                      "modelId": 1,
                      "dataId": 5
                  },
                  {
                      "modelId": 2,
                      "dataId": 5
                  },
                  {
                      "modelId": 2,
                      "dataId": 6
                  },
                  {
                      "modelId": 3,
                      "dataId": 3
                  },
                  {
                      "modelId": 3,
                      "dataId": 4
                  }                    
              ]
          }
        
        `
  • 数据同步队列(sqs)
    • 触发数据同步函数运行
  • 数据同步函数(lambda,见code/function-syncData.js)
    • 计算以输入的modelId和dataId为顶点的树状数据结构,并写入到es中
      • 例如计算talent=5、exp=5、exp=6、org=3、org=4的完整数据,分别写入到talent、exp、org的index中

(later)SyncJobs表

  • fields
    • appId
    • modelId
    • dataId
    • type
      • calculateDependencies,表示job将发送到依赖计算队列
      • syncData,表示job将发送到数据同步队列
    • status
      • pending
      • calculateDependenciesStart
      • calculateDependenciesEnd
      • calculateDependenciesError
      • syncDataStart
      • syncDataStartEnd
      • syncDataStartError
      • pause
    • content (json,存储发送到队列的dataList)
    • result (json,存中间运行结果)

队列相关

  • sqs standard vs fifo
    • 目前没有fifo的强需求
    • 目前使用场景,生产者可能是多个,消费者也可能是多个,使用FIFO无法完全保证顺序
  • 不同model是否拆成不同队列
    • 优点
      • 利用并发效率
    • 缺点
      • 需要维护的it资源更多,闲置资源更多,增加使用费用
      • sqs容量高,并且消息触发lambda,似乎没有必要
      • 每个lambda都需要进行数据库连接,可能增加其他服务的压力
    • 结论
      • 暂时不拆
  • 是否把dataList拆成单条message发到队列中
    • 优点
      • 利用并发效率
    • 缺点
      • 难以统计job是否完成
      • 没法去重操作
      • 写入到es时,请求过多容易把es搞跨
    • 结论
      • 暂时不拆单条,但是可以指定一个上限(例如50),超过的部分作为一下条message的dataList发送到队列中

model deploy

  • create table
    • 生成deploy request
    • request执行
      • sql create table
      • sql create trigger
      • if noIndex=false
        • 计算model自身的relationTree,存入db
        • index create service
  • add new field
    • 生成deploy request
    • request执行
      • sql alter add field
      • if noIndex=false
        • index updateMappings service
      • * 根据model对应的dependents,计算新增字段在mapping中的field(需要遍历relationTree),执行index updateMappings service
  • create relation
    • 生成deploy request
    • request执行
      • 关系非多对多
        • sql alter add field
      • 关系多对多
        • sql create table
        • sql create trigger
      • * 重新计算所有models的relationTree+dependents
        • 对于需要新增field的index,执行新增
        • 根据重新生成的index,全部执行reindex

index service

  • create
    • 场景
      • model deploy时,model配置上的noIndex=false && model目前没有index,则执行create index
        • (later)如果之前noIndex=true,并且model有index,则可先把老index删除,再创建
    • 流程
      • 计算mappings
        • 把model、modelFields、modelRelations从db中读出来,构造成json
        • ** 参考code/buildIndexMappings.js,计算出index的mappings
      • 调用create index sdk
        • default settings
        • mappings
        • alias
      • 更新SearchIndexes表
        • alias
        • index
  • updateMappings
    • 场景
      • 新增model字段
      • 新增model关系
    • 流程
      • ** 计算新增字段或者新增关系涉及的mappings json(参考model-index-sync.md文件中add new field)
      • ** 计算所有需要更新的index(根据relationTree来算出依赖,例如Values表更新,需要同时更新Talents、Orgs、Jobs三个index)
      • 调用update mappings sdk
  • (later)delete
    • 场景
      • ?model deploy时,model配置上的noIndex=true,需要把现有index删掉
      • ?删除model
    • 流程
      • 调用delete index sdk
  • (later)reindex
    • 场景
      • (later)删除model字段
      • (later)更新model字段
      • (later)删除model关系
      • (later)更新model关系
    • 流程
      • 计算新的mappings,以及reindex时需要运行的script
      • 调用reindex sdk
      • 修改alias指向新的index
      • ?删除旧index
      • 更新SearchIndexes表
        • index
  • rebuild
    • 场景
      • 把整个index重建,并且重新同步数据
    • 流程
      • (later)把对应model相关的syncJobs状态全部置为pause,使和当前model相关的数据同步任务都中断
      • 计算mappings
        • 同create index,需要根据relationTree计算生成
      • 调用create index sdk
        • 同createindex
      • 修改alias指向新的index
      • ?删除旧index
      • 更新SearchIndexes表
        • index
      • ?把model的数据id全部读出来,生成SyncJobs(type=)
  • ?thinking
    • 是否需要把index相关的都封装成lambda
      • 优点
        • 短期内方便不会nest的开发者进行开发
      • 缺点
        • 本身没有什么性能问题,强行分开会导致维护上比较麻烦,架构的依赖上变得更复杂

与数据同步存在冲突的场景

  • model deploy导致index结构更新
    • 使用update mappings的方式(冲突可忽略)
      • 场景
        • 新增model字段
        • 新增model关系
    • 使用reindex的方式(存在冲突)
      • 场景
        • (later)删除model字段
        • (later)更新model字段
        • (later)删除model关系
        • (later)更新model关系
      • 解决方案
        • 把deploy的执行抽象成一个request,该request只有等到SyncJobs队列中没有正在执行的任务(或者任务启动时间>2h的卡死的任务)才允许执行
          • 用户点击deploy,生成deploy request,此时不马上执行pg表和es变更
          • 在cron-job运行时,检测到有等待deploy request,则不把等待同步的SyncJobs推送到队列中;如果SyncJobs表中没有正在执行数据同步的任务,则执行deploy request,如果有正在执行数据同步的任务,则不执行deploy request,等到下一个cron-job周期内在进行检测
  • rebuild导致需要重建某个model的index、同步所有数据
    • 解决方案
      • 把对应model相关的syncJobs状态全部置为pause(这样没放到队列中的任务不会放到队列,在队列中的任务执行lambda判断到pause的状态也会跳过)然后执行rebuild相关操作(重建index、同步所有数据)

是否设计有lock

没有lock

  • add mappings
    • 冲突可忽略
  • (later)reindex
    • 极端情况下,会出现同步用的alias和实际的alias不一致,导致写入数据不准确
  • rebuild
    • 没有冲突,未处理完的任务就当重复了也无所谓

只使用DataChangeQueue作为lock

  • 把每条id的status状态精确记录
    • lambda目前做了很多合并操作,只能做整体更新,不能非常精确到具体数据
  • 不对每条id的status状态精确记录
    • cron-job周期
      • 没有deployRequest,把DataChangeQueue=pending数据发送到队列中,并且把status设置为sendToQueue
      • 有pending的deployRequest,有DataChangeQueue!=初始状态(pending)和终结状态(syncDataEnd),(并且开始时间<5分钟的任务)不把DataChangeQueue=pending数据发送到队列中,跳过等待下一个周期
      • 有deployRequest=pending,没有上述任务,执行deployRequest
    • rebuild
      • 忽略DataChangeQueue中还在运行的数据(暂时无法停止),直接重建index,切换alias
      • ?把所有数据id读出来,直接发送到队列,或者写入到DataChangeQueue中

使用SyncJobs作为lock

  • 原本的设计是每一次触发队列执行,都作为一个整体的job运行,记录job的状态
  • 实现和上面的类似,区别是这里可以整体控制job是否接下去运行,通过增加字段的方式,但目前需求不太明确

不依赖于cms create/update/delete接口来触发同步的方式

定期扫描 + 表中的status=pendingSynced | synced + syncId字段

  • 数据更新
    • status=pendingSynced && syncId={randomStr}
    • cron-job中把那些status=pendingSynced的数据加入到SyncJobs表中
    • lambda完成同步后,把那些status=pendingSynced && syncId的数据更新成status=synced
    • 还是需要依赖于cms接口改造,更新时把status和syncId同时更新
  • 数据软删除
    • 同上
  • 数据硬删除
    • 无法检测,除非被删除时手动加入到SyncJobs中

(draft)pg trigger to collect data changes

  • DataChangingLogs表
    • modelId
    • dataId
    • status
      • pending
      • sendToQueue
      • ...
    • ?tableName
  • 操作
    • insert trigger
      • model数据表
        • 把新增modelId+dataId加入DataChangingLogs
      • 多对多关系表
        • 关系两端modelId+dataId加入DataChangingLogs
    • update trigger
      • model数据表
        • 把更新modelId+dataId加入DataChangingLogs
      • 多对多关系表
        • 新增关系两端modelId+dataId加入DataChangingLogs
        • 删除关系两端modelId+dataId加入DataChangingLogs
    • delete trigger
      • 硬删除
        • model数据表
          • 把删除modelId+dataId加入DataChangingLogs
          • 有指定set null和delete的外键约束
            • ** 和被删除数据有外键关联的数据会产生变动,也会触发其update trigger
      • 软删除(isDelete=true)
        • model数据表
          • ** 被当成普通更新操作,需要在calDependencies阶段的sql做些特殊处理,ON和where的条件需要加上isDelete的判断
  • 生命周期
    • 创建model数据表时,同时创建function和insert/update/delete的trigger
      • function = 把modelId+dataId写入DataChangingLogs表
    • 创建model关系表时,同时创建function和insert/update/delete的trigger
      • function = 把关系中相关的modelId+dataId写入DataChangingLogs表
    • ** 删除model、删除关系表时,对应trigger和function需要删除
    • * 如果强行修改了modelId,对应function也需要更新
  • cronjob
    • 定期遍历DataChangingLogs表,把status=pending的modelId+dataId发送到同步队列中,如果有重复的modelId+dataId,可以过滤后再发送
    • 后续如果需要优化,发送到队列前,可以检查对应model的relationTree.dependents数据,如果为空,证明该表的数据变化不会引起任何其他index需要同步

(draft)model deploy request

  • model
  • model fields
  • model relation

results matching ""

    No results matching ""