Elasticsearch乐观锁并发控制实战

主要参数及技术点

  • Elasticsearch 乐观锁技术
  • Elasticsearch 参数:
    • version:(6.7.0 之后被弃用,需采用第二种方式)
    • if_seq_no & if_primary_term :

概念解释:

悲观锁控制(悲观并发控制)

1
2
3
4
5
6
7
8
9
10
/**
* TODO
* 总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,
* 所以每次在拿数据的时候都会上锁,这样别人想拿这个数
* 据就会阻塞直到它拿到锁(共享资源每次只给一个线程使用,
* 其它线程阻塞,用完后再把资源转让给其它线程)。
* 传统的关系型数据库里边就用到了很多这种锁机制,
* 比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。
* Java中synchronized和ReentrantLock等独占锁就是悲观锁思想的实现。
*/

乐观锁控制

1
2
3
4
5
6
7
8
9
10
/**
* TODO
* 总是假设最好的情况,每次去拿数据的时候都认为别人不会修改,
* 所以不会上锁,但是在更新的时候会判断一下在此期间别人有
* 没有去更新这个数据,可以使用版本号机制和CAS算法实现。
* 乐观锁适用于多读的应用类型,这样可以提高吞吐量,
* 像数据库提供的类似于write_condition机制,其实都是提供的乐观锁。
* 在Java中java.util.concurrent.atomic包下面的原子变量类就是使用了
* 乐观锁的一种实现方式CAS实现的。
*/

操作

原数据准备

1
2
3
4
5
curl -XPUT 'elasticsearch_host:9200/note_index/_doc/1' -H 'Content-Type:application/json' -d'
{
"tilte" : "init title"
}
'

First: 两客户端获取上述记录 (record._version=1 or _seq_no=1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# request
curl -XGET 'elasticsearch_host:9200/note_index/_doc/1'

# response
# _version: 6.7.0 前的版本控制字段
# _seq_no: 6.7.0 及以后的版本控制字段
# _primary_term: 6.7.0 及以后的版本控制字段
# response
{
"_shards" : {
"total" : 2,
"failed" : 0,
"successful" : 1
},
"_index" : "note_index",
"_type" : "_doc",
"_id" : "1",
"_version" : 1,
"_seq_no" : 1,
"_primary_term" : 2,
"result" : "created"
}

if_seq_no

Second: 两个客户端依次更新记录

  • 6.7.0 前使用version 进行版本控制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# client 1:
# client 1 request
curl -XPUT 'elasticsearch_host:9200/note_index/_doc/1?version=<record.version>' -H 'Content-Type:application/json' -d'
{
"tilte" : "init title"
}'
# client 1 response
success

# client 2:
# client 2 request
curl -XPUT 'elasticsearch_host:9200/note_index/_doc/1?version=<record.version>' -H 'Content-Type:application/json' -d'
{
"tilte" : "init title"
}
'
# client 2 response
error: version conflict
  • 6.7.0 及之后使用 if_seq_no , if_primary_term 进行版本控制。
1
2
3
4
5
6
7
/**
* TODO 原文描述
* 1. The sequence number and the primary term uniquely identify a change. By noting down the sequence number
* and primary term returned, you can make sure to only change the document if no other change was made to it since you retrieved it.
* This is done by setting the if_seq_no and if_primary_term parameters of either the Index API or the Delete API.
*
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# client 1:
# client 1 request
curl -XPUT 'elasticsearch_host:9200/note_index/_doc/1?if_seq_no=<_seq_no>&if_primary_term=<_primary_term>' -H 'Content-Type:application/json' -d'
{
"tilte" : "init title"
}
'
# client 1 response
success


# client 2:
# client 2 request
curl -XPUT 'elasticsearch_host:9200/note_index/_doc/1?if_seq_no=<_seq_no>&if_primary_term=<_primary_term>' -H 'Content-Type:application/json' -d'
{
"tilte" : "init title"
}
'
# client 2 response
error: version conflict

Third: 更新重试

1
2
3
4
5
6
7
/**
* TODO
* 1. 更新报错的客户端,需要重新获取记录的记录版本,
* 2. 重新进行业务逻辑处理。
* 3. 更新重试。(如果仍旧失败,返回 “1. 更新报错。。。”步,再一次进行更新重试直至更新成功)。
*
*/

注意:

若想记录被更新后,能及时检索到,需要设置refresh参数为:wait_for,该参数所有取值如下:

  • false: Don’t refresh after this request. The default.
  • true: Force a refresh as part of this request. This refresh policy does not scale for high indexing or search throughput but is useful
    to present a consistent view to for indices with very low traffic. And it is wonderful for tests!
  • wait_for: Leave this request open until a refresh has made the contents of this request visible to search. This refresh policy is
    compatible with high indexing and search throughput but it causes the request to wait to reply until a refresh occurs.

参考连接

参考连接:
Elasticsearch Reference [7.9] » REST APIs » Document APIs » ?refresh
Elasticsearch Document《Optimistic concurrency control》
elasticsearch学习笔记(十三)——Elasticsearch乐观锁并发控制实战
Elasticsearch系列—并发控制及乐观锁实现原理