ElasticSearch Deep Pagination Solutions
The common deep-pagination approach: from + size
By default es pages with from + size, which becomes very inefficient for deep pages. With from = 5000 and size = 10, every shard has to match, sort, and keep its top from + size = 5010 documents, and the coordinating node then merges and re-sorts all of these shard results just to return the final 10 (e.g. with 5 shards that is 5 * 5010 = 25050 candidates to merge). The pattern is similar to mongo's skip + limit.
Besides the efficiency problem, there is a hard limit that cannot be worked around in the query itself: the largest window es supports is max_result_window, which defaults to 10000. Whenever from + size > max_result_window, es returns an error.
A simple test demonstrates the error.
import java.io.IOException;

import org.apache.commons.codec.binary.Base64;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.junit.jupiter.api.Test;

@Test
public void testEsSearch() throws IOException {
    // Basic-Auth header for the cluster; the request body asks for from + size = 11000,
    // which exceeds the default max_result_window of 10000
    String authorization = "Basic " + Base64.encodeBase64String("root:111111".getBytes());
    String url = "http://192.190.20.5:9200/flint_ip2location_v6/_search";
    Document document = Jsoup.connect(url)
            .header("Authorization", authorization)
            .header("Accept", "application/json; charset=UTF-8")
            .header("Content-Type", "application/json")
            .requestBody("{\"from\": 10000, \"size\": 1000}")
            .ignoreContentType(true)
            .post();
    System.out.println(document.body().text());
}
The client request fails with an HTTP 500 status:
org.jsoup.HttpStatusException: HTTP error fetching URL. Status=500, URL=http://192.190.20.5:9200/flint_ip2location_v6/_search
at org.jsoup.helper.HttpConnection$Response.execute(HttpConnection.java:776)
at org.jsoup.helper.HttpConnection$Response.execute(HttpConnection.java:722)
at org.jsoup.helper.HttpConnection.execute(HttpConnection.java:306)
at org.jsoup.helper.HttpConnection.post(HttpConnection.java:301)
at com.spinfo.soup.JsoupTest.testEsSearch(JsoupTest.java:204)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:389)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:167)
at org.junit.jupiter.engine.execution.ThrowableCollector.execute(ThrowableCollector.java:40)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:163)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:110)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.lambda$execute$3(HierarchicalTestExecutor.java:83)
at org.junit.platform.engine.support.hierarchical.SingleTestExecutor.executeSafely(SingleTestExecutor.java:66)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:77)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.lambda$null$2(HierarchicalTestExecutor.java:92)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.lambda$execute$3(HierarchicalTestExecutor.java:92)
at org.junit.platform.engine.support.hierarchical.SingleTestExecutor.executeSafely(SingleTestExecutor.java:66)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:77)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.lambda$null$2(HierarchicalTestExecutor.java:92)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.lambda$execute$3(HierarchicalTestExecutor.java:92)
at org.junit.platform.engine.support.hierarchical.SingleTestExecutor.executeSafely(SingleTestExecutor.java:66)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:77)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:51)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:43)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:170)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:154)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:90)
at org.eclipse.jdt.internal.junit5.runner.JUnit5TestReference.run(JUnit5TestReference.java:86)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:538)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:760)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:460)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:206)
The es server log shows the error:
[2018-11-12T08:15:37,328][DEBUG][o.e.a.s.TransportSearchAction] [esnode-1] [flint_ip2location_v6][0], node[e4kCsoe0R8-Bq3ZmOFyEGQ], [P], s[STARTED], a[id=SfhVgfmJTzGlyUhWfgV5_g]: Failed to execute [SearchRequest{searchType=QUERY_THEN_FETCH, indices=[flint_ip2location_v6], indicesOptions=IndicesOptions[id=38, ignore_unavailable=false, allow_no_indices=true, expand_wildcards_open=true, expand_wildcards_closed=false, allow_aliases_to_multiple_indices=true, forbid_closed_indices=true, ignore_aliases=false], types=[], routing='null', preference='null', requestCache=null, scroll=null, maxConcurrentShardRequests=5, batchedReduceSize=512, preFilterShardSize=128, allowPartialSearchResults=true, source={"from":10000,"size":1000}}]
org.elasticsearch.transport.RemoteTransportException: [esnode-1][192.190.20.5:9300][indices:data/read/search[phase/query]]
Caused by: org.elasticsearch.search.query.QueryPhaseExecutionException: Result window is too large, from + size must be less than or equal to: [10000] but was [11000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting.
at org.elasticsearch.search.DefaultSearchContext.preProcess(DefaultSearchContext.java:206) ~[elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.search.query.QueryPhase.preProcess(QueryPhase.java:89) ~[elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.search.SearchService.createContext(SearchService.java:587) ~[elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.search.SearchService.createAndPutContext(SearchService.java:551) ~[elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.search.SearchService.executeQueryPhase(SearchService.java:347) ~[elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.search.SearchService$2.onResponse(SearchService.java:333) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.search.SearchService$2.onResponse(SearchService.java:329) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.search.SearchService$3.doRun(SearchService.java:1019) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:724) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:41) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.3.0.jar:6.3.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_171]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_171]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
[2018-11-12T08:15:37,329][DEBUG][o.e.a.s.TransportSearchAction] [esnode-1] All shards failed for phase: [query]
org.elasticsearch.search.query.QueryPhaseExecutionException: Result window is too large, from + size must be less than or equal to: [10000] but was [11000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting.
at org.elasticsearch.search.DefaultSearchContext.preProcess(DefaultSearchContext.java:206) ~[elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.search.query.QueryPhase.preProcess(QueryPhase.java:89) ~[elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.search.SearchService.createContext(SearchService.java:587) ~[elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.search.SearchService.createAndPutContext(SearchService.java:551) ~[elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.search.SearchService.executeQueryPhase(SearchService.java:347) ~[elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.search.SearchService$2.onResponse(SearchService.java:333) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.search.SearchService$2.onResponse(SearchService.java:329) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.search.SearchService$3.doRun(SearchService.java:1019) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:724) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:41) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.3.0.jar:6.3.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_171]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_171]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
[2018-11-12T08:15:37,329][WARN ][r.suppressed ] path: /flint_ip2location_v6/_search, params: {index=flint_ip2location_v6}
org.elasticsearch.action.search.SearchPhaseExecutionException: all shards failed
at org.elasticsearch.action.search.AbstractSearchAsyncAction.onPhaseFailure(AbstractSearchAsyncAction.java:288) ~[elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.action.search.AbstractSearchAsyncAction.executeNextPhase(AbstractSearchAsyncAction.java:128) ~[elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.action.search.AbstractSearchAsyncAction.onPhaseDone(AbstractSearchAsyncAction.java:249) ~[elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.action.search.InitialSearchPhase.onShardFailure(InitialSearchPhase.java:101) ~[elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.action.search.InitialSearchPhase.access$100(InitialSearchPhase.java:48) ~[elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.action.search.InitialSearchPhase$2.lambda$onFailure$1(InitialSearchPhase.java:222) ~[elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.action.search.InitialSearchPhase.maybeFork(InitialSearchPhase.java:176) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.action.search.InitialSearchPhase.access$000(InitialSearchPhase.java:48) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.action.search.InitialSearchPhase$2.onFailure(InitialSearchPhase.java:222) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.action.search.SearchExecutionStatsCollector.onFailure(SearchExecutionStatsCollector.java:73) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.action.ActionListenerResponseHandler.handleException(ActionListenerResponseHandler.java:51) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.action.search.SearchTransportService$ConnectionCountingHandler.handleException(SearchTransportService.java:527) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler.handleException(TransportService.java:1095) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.transport.TransportService$DirectResponseChannel.processException(TransportService.java:1188) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.transport.TransportService$DirectResponseChannel.sendResponse(TransportService.java:1172) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.transport.TaskTransportChannel.sendResponse(TaskTransportChannel.java:66) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.action.search.SearchTransportService$6$1.onFailure(SearchTransportService.java:385) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.search.SearchService$2.onFailure(SearchService.java:341) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.search.SearchService$2.onResponse(SearchService.java:335) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.search.SearchService$2.onResponse(SearchService.java:329) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.search.SearchService$3.doRun(SearchService.java:1019) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:724) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:41) [elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.3.0.jar:6.3.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_171]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_171]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
Caused by: org.elasticsearch.search.query.QueryPhaseExecutionException: Result window is too large, from + size must be less than or equal to: [10000] but was [11000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting.
at org.elasticsearch.search.DefaultSearchContext.preProcess(DefaultSearchContext.java:206) ~[elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.search.query.QueryPhase.preProcess(QueryPhase.java:89) ~[elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.search.SearchService.createContext(SearchService.java:587) ~[elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.search.SearchService.createAndPutContext(SearchService.java:551) ~[elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.search.SearchService.executeQueryPhase(SearchService.java:347) ~[elasticsearch-6.3.0.jar:6.3.0]
at org.elasticsearch.search.SearchService$2.onResponse(SearchService.java:333) ~[elasticsearch-6.3.0.jar:6.3.0]
... 9 more
The cause of the error and the suggested remedies are spelled out in the log message:
Result window is too large, from + size must be less than or equal to: [10000] but was [11000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting.
Check the index settings:
[Hadoop@Hadoop0 ~]$ curl -XGET 192.190.20.5:9200/flint_ip2location_v6/_settings?pretty
{
"flint_ip2location_v6" : {
"settings" : {
"index" : {
"number_of_shards" : "1",
"provided_name" : "flint_ip2location_v6",
"max_result_window" : "10000",
"creation_date" : "1535653837486",
"number_of_replicas" : "1",
"uuid" : "iq4bBu2jQ2KIzB5B4Wqk9w",
"version" : {
"created" : "6030099"
}
}
}
}
}
[Hadoop@Hadoop0 ~]$
When this happens to a live es cluster, users page a few hundred pages deep and es simply stops returning data. To restore normal service quickly, the only option is an emergency workaround: raise max_result_window to a much larger value, as in the request below.
[Hadoop@Hadoop0 ~]$ curl -XPUT "192.190.20.5:9200/flint_ip2location_v6/_settings" -H 'Content-Type: application/json' -d
'{
"index" : {
"max_result_window" : 50000
}
}'
However, this only buys time: the deeper the page, the larger the priority queue each shard has to build and sort, so memory use and latency keep growing with the window. As es usage expands, data volumes grow, and deep-pagination scenarios become more complex, how do we solve the problem properly?
Another pagination approach: scroll
For deep-pagination scenarios, es provides the scroll API. Conceptually, a query creates a cursor, the scroll_id; subsequent requests just pass this cursor to fetch the next batch of data, until the hits array in the response is empty, which means the traversal is finished. Creating a scroll_id can be thought of as taking a temporary snapshot of the index at that moment: inserts, updates, and deletes made afterwards do not affect the snapshot's results.
Paging through an index with curl works as follows (a Java sketch of the same loop appears after the walkthrough):
- First obtain an initial scroll_id. The URL contains the /index/_type/ path plus a scroll parameter; scroll sets how long the scroll_id stays alive (e.g. 2m for two minutes), after which es cleans it up automatically. If the documents do not need a particular order, sorting by _doc (index order) makes the iteration more efficient.
[root@dnsserver ~]# curl -XGET '200.200.107.232:9200/product/info/_search?pretty&scroll=2m' -H 'Content-Type: application/json' -d
'{"query":{"match_all":{}}, "sort": ["_doc"]}'
# Response
{
"_scroll_id": "cXVlcnlBbmRGZXRjaDsxOzg3OTA4NDpTQzRmWWkwQ1Q1bUlwMjc0WmdIX2ZnOzA7",
"took": 1,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"failed": 0
},
"hits":{...}
}
- Subsequent reads just pass the scroll_id returned by the previous request to fetch the next page. If the scroll_id's lifetime is long enough, every response carries the same scroll_id; only after it expires is a new scroll_id returned. When querying with a scroll_id, the /index/_type path is no longer needed. Each page read resets the scroll_id's time-to-live, so the timeout only has to cover reading the current page, not the entire data set; 1 minute is plenty.
[root@dnsserver ~]# curl -XGET '200.200.107.232:9200/_search/scroll?scroll=1m&scroll_id=cXVlcnlBbmRGZXRjaDsxOzg4NDg2OTpTQzRmWWkwQ1Q1bUlwMjc0WmdIX2ZnOzA7'
# Response
{
"_scroll_id": "cXVlcnlBbmRGZXRjaDsxOzk1ODg3NDpTQzRmWWkwQ1Q1bUlwMjc0WmdIX2ZnOzA7",
"took": 106,
"_shards": {
"total": 1,
"successful": 1,
"failed": 0
},
"hits": {
"total": 22424,
"max_score": 1.0,
"hits": [{
"_index": "product",
"_type": "info",
"_id": "did-519392_pdid-2010",
"_score": 1.0,
"_routing": "519392",
"_source": {
....
}
}
]
}
}
- Once all documents have been fetched, clean up the scroll_id manually. Although es expires scroll contexts automatically, a live scroll_id consumes substantial resources to preserve a snapshot of the query's result set and holds open file descriptors, so release it as soon as you are done, using the clear scroll API to delete specific scroll_ids.
## Delete specific scroll_ids
[root@dnsserver ~]# curl -XDELETE 127.0.0.1:9200/_search/scroll -H 'Content-Type: application/json' -d
'{"scroll_id" : ["cXVlcnlBbmRGZXRjaDsxOzg3OTA4NDpTQzRmWWkwQ1Q1bUlwMjc0WmdIX2ZnOzA7"]}'
## Delete all scroll_ids across all indices
[root@dnsserver ~]# curl -XDELETE 127.0.0.1:9200/_search/scroll/_all
## Check the current scroll status on the nodes
[root@dnsserver ~]# curl -XGET 127.0.0.1:9200/_nodes/stats/indices/search?pretty
{
"cluster_name" : "200.200.107.232",
"nodes" : {
"SC4fYi0CT5mIp274ZgH_fg" : {
"timestamp" : 1514346295736,
"name" : "200.200.107.232",
"transport_address" : "200.200.107.232:9300",
"host" : "200.200.107.232",
"ip" : [ "200.200.107.232:9300", "NONE" ],
"indices" : {
"search" : {
"open_contexts" : 0,
"query_total" : 975758,
"query_time_in_millis" : 329850,
"query_current" : 0,
"fetch_total" : 217069,
"fetch_time_in_millis" : 84699,
"fetch_current" : 0,
"scroll_total" : 5348,
"scroll_time_in_millis" : 92712468,
"scroll_current" : 0
}
}
}
}
}
In a real test against a single index with 10,000,000 documents (about 3 GB of stored data), extracting everything with from + size processed only about 4,000,000 documents in an hour, while the scroll approach finished the whole index in 15 minutes. The performance gap is enormous.
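To tie the three curl steps together, here is a minimal Java sketch of the same loop using the 6.x high-level REST client. It is only an illustration under assumptions: the client dependency, the host 192.190.20.5, the page size of 1000, and the method name are not part of the original test.

import java.io.IOException;

import org.apache.http.HttpHost;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public void scrollThroughIndex() throws IOException {
    RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("192.190.20.5", 9200, "http")));

    // Initial request: match_all, sorted by _doc, scroll context kept alive for 2 minutes
    SearchRequest searchRequest = new SearchRequest("flint_ip2location_v6")
            .scroll(TimeValue.timeValueMinutes(2))
            .source(new SearchSourceBuilder()
                    .query(QueryBuilders.matchAllQuery())
                    .sort("_doc")
                    .size(1000));
    SearchResponse response = client.search(searchRequest);

    String scrollId = response.getScrollId();
    SearchHit[] hits = response.getHits().getHits();
    while (hits != null && hits.length > 0) {
        // ... process the current page of hits ...
        SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId)
                .scroll(TimeValue.timeValueMinutes(1)); // the TTL only needs to cover one page
        response = client.searchScroll(scrollRequest);
        scrollId = response.getScrollId();
        hits = response.getHits().getHits();
    }

    // Release the scroll context as soon as the traversal is done
    ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
    clearScrollRequest.addScrollId(scrollId);
    client.clearScroll(clearScrollRequest);
    client.close();
}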
scroll + scan
When the scrolled documents do not need to be sorted, older versions of es offered a scroll + scan mode to make retrieval more efficient; it is selected by specifying search_type=scan. The scan type was deprecated in 2.1.0 and the optimization was folded directly into scroll (by sorting on _doc). I have not used it in practice, so it is only mentioned briefly here.
# In 2.0-beta, disable sorting for the scroll so the traversal is more efficient
[root@dnsserver ~]# curl '127.0.0.1:9200/order/info/_search?scroll=1m&search_type=scan' -d '{"query":{"match_all":{}}}'
The search_after approach
The scroll search described above is not what the official docs recommend for real-time requests: every scroll_id consumes a lot of resources (especially for sorted requests), and it is a historical snapshot, so changes to the data are never reflected in it. Scroll is therefore suited to processing large volumes of data offline, e.g. data migration or reindexing. So how do we handle deep pagination for real-time traffic? es offers search_after, a feature available only in versions >= 5.0.
search_after differs from scroll in a few important ways. It locates the next page from the sort values of the last document on the previous page, and if documents are added, deleted, or updated while paging, those changes are reflected in the results in real time.
To pin down the last document of each page, every document needs a globally unique value in the sort as a tie-breaker; the official docs recommend _uid, but a business-level id works just as well.
- The request for the first page looks like any normal search request
curl -XGET 127.0.0.1:9200/order/info/_search -H 'Content-Type: application/json' -d '
{
"size": 10,
"query": {
"term" : {
"did" : 519390
}
},
"sort": [
{"date": "asc"},
{"_uid": "desc"}
]
}'
- The request for the second page takes the sort values of the last document of the first page and passes them in a search_after field to fetch the next page. Note that when using search_after, from must be set to 0 or -1. (A Java sketch of the full loop appears after these examples.)
curl -XGET 127.0.0.1:9200/order/info/_search -H 'Content-Type: application/json' -d '
{
"size": 10,
"query": {
"term" : {
"did" : 519390
}
},
"search_after": [1463538857, "tweet#654323"],
"sort": [
{"date": "asc"},
{"_uid": "desc"}
]
}'
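A compact Java sketch may make the search_after loop clearer. It assumes the same 6.x high-level REST client setup as the scroll sketch above, plus a hypothetical order index with date and did fields; the method name and page size are illustrative, not part of the original examples.

// Assumes the same client setup and imports as the scroll sketch above, plus:
// import org.elasticsearch.search.sort.SortOrder;
public void pageWithSearchAfter(RestHighLevelClient client) throws IOException {
    SearchSourceBuilder source = new SearchSourceBuilder()
            .size(10)
            .query(QueryBuilders.termQuery("did", 519390))
            .sort("date", SortOrder.ASC)
            .sort("_uid", SortOrder.DESC);           // globally unique tie-breaker

    Object[] lastSortValues = null;
    while (true) {
        if (lastSortValues != null) {
            // the cursor is simply the sort values of the last hit on the previous page
            source.searchAfter(lastSortValues);
        }
        SearchResponse resp = client.search(new SearchRequest("order").types("info").source(source));
        SearchHit[] hits = resp.getHits().getHits();
        if (hits.length == 0) {
            break;                                   // no more pages
        }
        // ... process the current page of hits ...
        lastSortValues = hits[hits.length - 1].getSortValues();
    }
}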
Summary: search_after suits deep pagination with sorting. Because each page depends on the last document of the previous page, you cannot jump to an arbitrary page, and since it always returns the latest data, a document's position may shift while you page through.
References
1. blog.csdn.net/u011228889/article/details/79760167