批量操作

批量查询

1、批量查询的好处
就是一条一条的查询,比如说要查询100条数据,那么就要发送100次网络请求,这个开销还是很大的 如果进行批量查询的话,查询100条数据,就只要发送1次网络请求,网络请求的性能开销缩减100倍
2、mget的语法
(1)一条一条的查询
GET /test_index/test_type/1 GET /test_index/test_type/2
(2)mget批量查询
GET /_mget { "docs" : [ { "_index" : "test_index", "_type" : "test_type", "_id" : 1 }, { "_index" : "test_index", "_type" : "test_type", "_id" : 2 } ] }
{ "docs": [ { "_index": "test_index", "_type": "test_type", "_id": "1", "_version": 2, "found": true, "_source": { "test_field1": "test field1", "test_field2": "test field2" } }, { "_index": "test_index", "_type": "test_type", "_id": "2", "_version": 1, "found": true, "_source": { "test_content": "my test" } } ] }
(3)如果查询的document是一个index下的不同type种的话
GET /test_index/_mget { "docs" : [ { "_type" : "test_type", "_id" : 1 }, { "_type" : "test_type", "_id" : 2 } ] }
(4)如果查询的数据都在同一个index下的同一个type下,最简单了
GET /test_index/test_type/_mget { "ids": [1, 2] }
3、mget的重要性
可以说mget是很重要的,一般来说,在进行查询的时候,如果一次性要查询多条数据的话,那么一定要用batch批量操作的api 尽可能减少网络开销次数,可能可以将性能提升数倍,甚至数十倍,非常非常之重要

批量增删改

课程大纲
1、bulk语法
POST /_bulk //删除 { "delete": { "_index": "test_index", "_type": "test_type", "_id": "3" }} //创建 { "create": { "_index": "test_index", "_type": "test_type", "_id": "12" }} { "test_field": "test12" } // { "index": { "_index": "test_index", "_type": "test_type", "_id": "2" }} { "test_field": "replaced test2" } { "update": { "_index": "test_index", "_type": "test_type", "_id": "1", "_retry_on_conflict" : 3} } { "doc" : {"test_field2" : "bulk test1"} }
每一个操作要两个json串,语法如下:
{"action": {"metadata"}} {"data"}
举例,比如你现在要创建一个文档,放bulk里面,看起来会是这样子的:
{"index": {"_index": "test_index", "_type", "test_type", "_id": "1"}} {"test_field1": "test1", "test_field2": "test2"}
有哪些类型的操作可以执行呢? (1)delete:删除一个文档,只要1个json串就可以了 (2)create:PUT /index/type/id/_create,强制创建 (3)index:普通的put操作,可以是创建文档,也可以是全量替换文档 (4)update:执行的partial update操作
bulk api对json的语法,有严格的要求,每个json串不能换行,只能放一行,同时一个json串和一个json串之间,必须有一个换行
{ "error": { "root_cause": [ { "type": "json_e_o_f_exception", "reason": "Unexpected end-of-input: expected close marker for Object (start marker at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@5a5932cd; line: 1, column: 1])\n at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@5a5932cd; line: 1, column: 3]" } ], "type": "json_e_o_f_exception", "reason": "Unexpected end-of-input: expected close marker for Object (start marker at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@5a5932cd; line: 1, column: 1])\n at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@5a5932cd; line: 1, column: 3]" }, "status": 500 }
{ "took": 41, "errors": true, "items": [ { "delete": { "found": true, "_index": "test_index", "_type": "test_type", "_id": "10", "_version": 3, "result": "deleted", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "status": 200 } }, { "create": { "_index": "test_index", "_type": "test_type", "_id": "3", "_version": 1, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "created": true, "status": 201 } }, { "create": { "_index": "test_index", "_type": "test_type", "_id": "2", "status": 409, "error": { "type": "version_conflict_engine_exception", "reason": "[test_type][2]: version conflict, document already exists (current version [1])", "index_uuid": "6m0G7yx7R1KECWWGnfH1sw", "shard": "2", "index": "test_index" } } }, { "index": { "_index": "test_index", "_type": "test_type", "_id": "4", "_version": 1, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "created": true, "status": 201 } }, { "index": { "_index": "test_index", "_type": "test_type", "_id": "2", "_version": 2, "result": "updated", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "created": false, "status": 200 } }, { "update": { "_index": "test_index", "_type": "test_type", "_id": "1", "_version": 3, "result": "updated", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "status": 200 } } ] }
bulk操作中,任意一个操作失败,是不会影响其他的操作的,但是在返回结果里,会告诉你异常日志
POST /test_index/_bulk { "delete": { "_type": "test_type", "_id": "3" }} { "create": { "_type": "test_type", "_id": "12" }} { "test_field": "test12" } { "index": { "_type": "test_type" }} { "test_field": "auto-generate id test" } { "index": { "_type": "test_type", "_id": "2" }} { "test_field": "replaced test2" } { "update": { "_type": "test_type", "_id": "1", "_retry_on_conflict" : 3} } { "doc" : {"test_field2" : "bulk test1"} }
POST /test_index/test_type/_bulk { "delete": { "_id": "3" }} { "create": { "_id": "12" }} { "test_field": "test12" } { "index": { }} { "test_field": "auto-generate id test" } { "index": { "_id": "2" }} { "test_field": "replaced test2" } { "update": { "_id": "1", "_retry_on_conflict" : 3} } { "doc" : {"test_field2" : "bulk test1"} }
2、bulk size最佳大小
bulk request会加载到内存里,如果太大的话,性能反而会下降,因此需要反复尝试一个最佳的bulk size。一般从1000~5000条数据开始,尝试逐渐增加。另外,如果看大小的话,最好是在5~15MB之间。

bulk api奇特的json格式

 
{"action": {"meta"}}\n {"data"}\n {"action": {"meta"}}\n {"data"}\n
 
[{ "action": { }, "data": { } }]
1、bulk中的每个操作都可能要转发到不同的node的shard去执行
2、如果采用比较良好的json数组格式
允许任意的换行,整个可读性非常棒,读起来很爽,es拿到那种标准格式的json串以后,要按照下述流程去进行处理
(1)将json数组解析为JSONArray对象,这个时候,整个数据,就会在内存中出现一份一模一样的拷贝,一份数据是json文本,一份数据是JSONArray对象 (2)解析json数组里的每个json,对每个请求中的document进行路由 (3)为路由到同一个shard上的多个请求,创建一个请求数组 (4)将这个请求数组序列化 (5)将序列化后的请求数组发送到对应的节点上去
3、耗费更多内存,更多的jvm gc开销
我们之前提到过bulk size最佳大小的那个问题,一般建议说在几千条那样,然后大小在10MB左右,所以说,可怕的事情来了。假设说现在100个bulk请求发送到了一个节点上去,然后每个请求是10MB,100个请求,就是1000MB = 1GB,然后每个请求的json都copy一份为jsonarray对象,此时内存中的占用就会翻倍,就会占用2GB的内存,甚至还不止。因为弄成jsonarray之后,还可能会多搞一些其他的数据结构,2GB+的内存占用。
占用更多的内存可能就会积压其他请求的内存使用量,比如说最重要的搜索请求,分析请求,等等,此时就可能会导致其他请求的性能急速下降 另外的话,占用内存更多,就会导致java虚拟机的垃圾回收次数更多,跟频繁,每次要回收的垃圾对象更多,耗费的时间更多,导致es的java虚拟机停止工作线程的时间更多
4、现在的奇特格式
{"action": {"meta"}}\n {"data"}\n {"action": {"meta"}}\n {"data"}\n
(1)不用将其转换为json对象,不会出现内存中的相同数据的拷贝,直接按照换行符切割json (2)对每两个一组的json,读取meta,进行document路由 (3)直接将对应的json发送到node上去
5、最大的优势在于,不需要将json数组解析为一个JSONArray对象,形成一份大数据的拷贝,浪费内存空间,尽可能地保证性能