微服务架构三大难点之一:数据同步查询

微服务的架构虽然看起来很美,但是里面有一些是需要hold住的内容,其中一项是数据同步查询。当我们把系统拆分成多个业务域后,涉及到跨域的查询就捉襟见肘了,这个时候需要另外的指导思路(CQRS)架构解决这个问题。

微服务架构三大难点之一:数据同步查询

上图查询的数据通过数据同步+搜索引擎进行解决,通过canal订阅mysql的binlog的变化,同步到elasticsearch。这个方案的优点是不会跟业务进行耦合,不需要在业务数据有变化的时候发送事件进行触发。缺点提高了系统的复杂度,需要引入canal的同步服务。

下面选择一个稍微复杂的场景讲解一下,订单数据一对多的场景。由于elasticsearch不像关系数据库一样可以跨索引查询,只能通过嵌套Object的字段类型解决这个问题。

安装的版本

  1. mysql 5.7
  2. canal-server v1.1.4
  3. canal-clent 1.1.4
  4. eleasticsearch 6.7.0
  5. kibana 6.7.0
  6. spring boot 2.2.0 RELEASE
  7. spring data eleasticseach 3.2.0 RELEASE

安装步骤

mysql

docker运行命令:MYSQL_ROOT_PASSWORD 设置mysql的root密码

docker run   --restart=always  -p 3306:3306  -v /Users/liyiye/docker/mysql/data:/var/lib/mysql -v /Users/liyiye/docker/mysql/config/my.cnf:/etc/mysql/my.cnf:ro  --name mysql -e MYSQL_ROOT_PASSWORD=123456  -d   mysql:5.7 

my.cnf文件添加下面的内容,开启binlog,并设置为row模式

[mysqld]  log-bin=mysql-bin # 开启 binlog  binlog-format=ROW # 选择 ROW 模式  server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复  

canal-server

启动地址(https://github.com/alibaba/canal/wiki/Docker-QuickStart)下载run.sh脚本后,在最后启动docker的地方把docker的版本号加上:

cmd="docker run -d -it -h $LOCALHOST $CONFIG --name=canal-server $VOLUMNS $NET_MODE $PORTS $MEMORY canal/canal-server:v1.1.4" 

通过启动脚本在命令行启动:其中的 canal.instance.master.address 变量的ip要设置为你本机的ip。

sh run.sh -e canal.auto.scan=false \            -e canal.destinations=test \            -e canal.instance.master.address=10.11.41.191:3306  \            -e canal.instance.dbUsername=root  \            -e canal.instance.dbPassword=123456  \            -e canal.instance.connectionCharset=UTF-8 \            -e canal.instance.tsdb.enable=true \            -e canal.instance.gtidon=false  \ 

canal-client

地址(https://github.com/alibaba/canal/wiki/Sync-ES)下载后修改一下application.yml文件:

canal.conf:   mode: tcp # kafka rocketMQ   canalServerHost: 127.0.0.1:11111   #  zookeeperHosts: slave1:2181   #  mqServers: 127.0.0.1:9092 #or rocketmq   #  flatMessage: true   batchSize: 500   syncBatchSize: 1000   retries: 0   timeout:   accessKey:   secretKey:   srcDataSources:     defaultDS:       url: jdbc:mysql://localhost:3306/test?useUnicode=true       username: root       password: 123456   canalAdapters:     - instance: test # canal instance Name or mq topic name       groups:         - groupId: g1           outerAdapters:             - name: es               hosts: 127.0.0.1:9200               properties:                 mode: rest                 cluster.name: docker-cluster 

在es/conf目录添加一个yml文件,添加内容如下

dataSourceKey: defaultDS destination: test groupId: g1 esMapping:   _index: order   _type: _doc   _id: _id   upsert: true   #pk: orderId   sql: "select concat(d.id,'') as orderId,d.id as _id,d.shop_code as shopCode, d.extend_props as extendProds, c.confirmPackage as confirmPackage         from order d                  left join (select p.order_id as _id,concat(p.order_id,'') as order_id, JSON_ARRAYAGG(JSON_OBJECT('waybillNo', p.waybill_no, 'packageId', p.id)) as package from order_package p group by p.order_id) c         on c.order_id=d.id"   objFields:     extendProds: object     #confirmPackage: array:;     confirmPackage: object   #etlCondition: "where a.c_time>={}"   commitBatch: 3000 

eleasticsearch 和 kibana

通过docker-compose进行安装

version: '2.2' services:   elasticsearch:     image: docker.elastic.co/elasticsearch/elasticsearch:6.7.0     container_name: elasticsearch     environment:       - cluster.name=docker-cluster       - bootstrap.memory_lock=true       - "ES_JAVA_OPTS=-Xms512m -Xmx512m"     ulimits:       memlock:         soft: -1         hard: -1     volumes:       - esdata1:/usr/share/elasticsearch/data     ports:       - 9200:9200     networks:       - esnet   elasticsearch2:     image: docker.elastic.co/elasticsearch/elasticsearch:6.7.0     container_name: elasticsearch2     environment:       - cluster.name=docker-cluster       - bootstrap.memory_lock=true       - "ES_JAVA_OPTS=-Xms512m -Xmx512m"       - "discovery.zen.ping.unicast.hosts=elasticsearch"     ulimits:       memlock:         soft: -1         hard: -1     volumes:       - esdata2:/usr/share/elasticsearch/data     networks:       - esnet   kibana:     image: kibana:6.7.0     container_name: kibana     environment:       - SERVER_NAME=kibana       - ELASTICSEARCH_URL=http://elasticsearch:9200       ports:       - 5601:5601     networks:       - esnet volumes:   esdata1:     driver: local     driver_opts:       type: none       device: /Users/liyiye/docker/es/esdata01       o: bind   esdata2:     driver: local     driver_opts:       type: none       device: /Users/liyiye/docker/es/esdata02       o: bind networks:   esnet:     driver: bridge 

安装后通过http://localhost:5601/ 进行访问kibana。同步数据前需要新建一下索引

PUT /order {   "mappings": {       "_doc": {         "properties": {           "confirmPackage": {             "type": "nested",             "properties": {               "packageId": {                 "type": "long"               },               "waybillNo": {                 "type": "text",                 "fields": {                   "keyword": {                     "type": "keyword",                     "ignore_above": 256                   }                 }               }             }           },           "extendProds": {             "properties": {               "shopName": {                 "type": "text",                 "fields": {                   "keyword": {                     "type": "keyword",                     "ignore_above": 256                   }                 }               },               "test": {                 "type": "text",                 "fields": {                   "keyword": {                     "type": "keyword",                     "ignore_above": 256                   }                 }               }             }           },           "orderId": {             "type": "text"           },           "shopCode": {             "type": "text",             "fields": {               "keyword": {                 "type": "keyword",                 "ignore_above": 256               }             }           }         }       }     }   } 

elasticsearch的客户端查询

通过生成代码选择添加spring-data-elasticsearch根据前面提到的版本进行修改。通过在application.properties文件添加

spring.elasticsearch.rest.uris=localhost:9200 

在单元测试内添加下面的属性,通过该属性就可以查询

@Autowired ElasticsearchRestTemplate elasticsearchTemplate; 

验证步骤

在数据库创建两个表,字段根据前面配置的sql就可以的,对订单表,包裹表进行数据的新增更新删除,可以在kibana查看数据是否有相应更新。es本身是非实时的,就是数据插入后不能里面就能查询出来,有1s的延迟,这个方案不能用于实现性要求比较高的场景。

您可能还会对下面的文章感兴趣: