Exceptions When Batch-Initializing ES Data with Multiple Threads in a Java Program, and How They Were Resolved

Author: admin  Category: ELK  Published: 2019-04-15 21:12

After connecting to Elasticsearch through the RestHighLevelClient, I needed to initialize a batch of data.

The code is as follows:

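// Each task handles 100 records, and the number of pages comes from the total record count.
// With the 780 test records this gives 8 pages.
public void refresh() {
    String constKey = "const_es_all_index_operation";
    // Guard against rapid repeated triggering: at most one refresh every 60 s
    if (!cacheService.lock(constKey, 60)) {
        logger.error("The index refresh can only be run once every 60 s");
        return;
    }
    int count = DubboService.refreshCount();
    final int pageSize = 100;
    final String indexName = getIndexName();
    // Round up so that a partial last page is still processed (780 records -> 8 pages)
    int pageNum = count % pageSize == 0 ? (count / pageSize) : (count / pageSize + 1);
    // i <= pageNum: with i < pageNum the last page would never be submitted
    for (int i = 1; i <= pageNum; i++) {
        final int startRow = (i - 1) * pageSize;
        logger.info("Starting page {}, start row {}", i, startRow);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // Assumed signature: refresh(offset, limit)
                    List<BBSPostVo> list = DubboService.refresh(startRow, pageSize);
                    for (BBSPostVo bbsPost : list) {
                        updateIndex(indexName, "_doc", String.valueOf(bbsPost.getId()), JSON.toJSONString(bbsPost));
                    }
                } catch (Exception e) {
                    // Passing the exception as the last argument makes SLF4J log the full stack trace
                    logger.error("Batch insert of ES data failed", e);
                }
            }
        });
    }
}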
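To check the paging arithmetic, the small model below computes the row range each task would read, without any Elasticsearch or Dubbo calls. It assumes the second argument of `DubboService.refresh` is a row limit, i.e. `refresh(offset, limit)`; `PagePlan` and its methods are hypothetical names used only for illustration. Under that assumption, the loop condition `i < pageNum` skips the last page, and a call like `refresh(startpage, startpage)` reads an empty first page and overlapping later pages, so several threads could end up updating the same document id at the same time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the paging in refresh(); it only does arithmetic.
// Assumption: DubboService.refresh(offset, limit) returns rows [offset, offset + limit).
final class PagePlan {
    // Ceiling division: a partial last page still counts as a page
    static int pageCount(int total, int pageSize) {
        return (total + pageSize - 1) / pageSize;
    }

    // Returns the [start, end) row range each submitted task would read.
    // limitIsPageSize = true models refresh(start, pageSize); false models refresh(start, start).
    // inclusiveLoop = true models i <= pageNum; false models i < pageNum.
    static List<int[]> ranges(int total, int pageSize, boolean limitIsPageSize, boolean inclusiveLoop) {
        List<int[]> out = new ArrayList<>();
        int pages = pageCount(total, pageSize);
        int lastPage = inclusiveLoop ? pages : pages - 1;
        for (int i = 1; i <= lastPage; i++) {
            int start = (i - 1) * pageSize;
            int limit = limitIsPageSize ? pageSize : start;
            int end = Math.min(start + limit, total); // the data source cannot return rows past the end
            out.add(new int[] {start, end});
        }
        return out;
    }

    // Total rows read, counting repeats, which exposes both gaps and overlaps
    static int rowsRead(List<int[]> ranges) {
        int n = 0;
        for (int[] r : ranges) {
            n += Math.max(0, r[1] - r[0]);
        }
        return n;
    }
}
```

With 780 records and a page size of 100, `ranges(780, 100, true, true)` yields 8 non-overlapping ranges covering exactly 780 rows, while `ranges(780, 100, false, false)` models the loop and call as they were first written.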
1. Concurrency error
[testbbspost/-OWEPdDpRIO-1jLTwAb5AA][[testbbspost][0]] ElasticsearchStatusException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][27730]: version conflict, current version [2] is different than the one provided [1]]]
	at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:177)
	at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1406)
	at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1382)
	at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1269)
	at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1231)
	at org.elasticsearch.client.RestHighLevelClient.update(RestHighLevelClient.java:634)
	at com.kaishiba.zeus.controller.BBSSearchController.updateIndex(BBSSearchController.java:200)
	at com.kaishiba.zeus.controller.BBSSearchController$1.run(BBSSearchController.java:168)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
	Suppressed: org.elasticsearch.client.ResponseException: method [POST], host [http://127.0.0.1:9200], URI [/testbbspost/_doc/27730/_update?timeout=1m], status line [HTTP/1.1 409 Conflict]
{"error":{"root_cause":[{"type":"version_conflict_engine_exception","reason":"[_doc][27730]: version conflict, current version [2] is different than the one provided [1]","index_uuid":"-OWEPdDpRIO-1jLTwAb5AA","shard":"0","index":"testbbspost"}],"type":"version_conflict_engine_exception","reason":"[_doc][27730]: version conflict, current version [2] is different than the one provided [1]","index_uuid":"-OWEPdDpRIO-1jLTwAb5AA","shard":"0","index":"testbbspost"},"status":409}
		at org.elasticsearch.client.RestClient$SyncResponseListener.get(RestClient.java:920)
		at org.elasticsearch.client.RestClient.performRequest(RestClient.java:227)
		at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1256)
		... 7 more
	Caused by: org.elasticsearch.client.ResponseException: method [POST], host [http://127.0.0.1:9200], URI [/testbbspost/_doc/27730/_update?timeout=1m], status line [HTTP/1.1 409 Conflict]
{"error":{"root_cause":[{"type":"version_conflict_engine_exception","reason":"[_doc][27730]: version conflict, current version [2] is different than the one provided [1]","index_uuid":"-OWEPdDpRIO-1jLTwAb5AA","shard":"0","index":"testbbspost"}],"type":"version_conflict_engine_exception","reason":"[_doc][27730]: version conflict, current version [2] is different than the one provided [1]","index_uuid":"-OWEPdDpRIO-1jLTwAb5AA","shard":"0","index":"testbbspost"},"status":409}
		at org.elasticsearch.client.RestClient$1.completed(RestClient.java:540)
		at org.elasticsearch.client.RestClient$1.completed(RestClient.java:529)
		at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
		at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
		at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
		at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
		at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
		at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
		at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
		at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
		at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
		at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
		at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
		at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
		at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
		at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
		... 1 more

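Cause: concurrent operations on indexed documents with the same id.

While the batch job was running, someone else was modifying the data, which caused a concurrent-write conflict. Elasticsearch does not lock documents; it uses document version numbers for optimistic concurrency control (worth reading up on if you are interested). Once the other changes had finished, I reran the batch job and the error above did not reappear.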
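The version check behind `version_conflict_engine_exception` can be illustrated with a small in-memory model: every write carries the version the writer last read, and it is rejected if the stored version has since moved on. This is a sketch of the idea, not the Elasticsearch implementation; `VersionedStore` and its methods are hypothetical. Its `update` method retries the read-modify-write a bounded number of times, similar in spirit to the update API's `retry_on_conflict` parameter.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory model of version-based optimistic concurrency control.
// It mimics the behaviour behind version_conflict_engine_exception; it is NOT the Elasticsearch API.
final class VersionedStore {
    // Immutable snapshot of a document: its source plus the version it was written at
    static final class Doc {
        final String source;
        final long version;

        Doc(String source, long version) {
            this.source = source;
            this.version = version;
        }
    }

    static final class VersionConflictException extends RuntimeException {
        VersionConflictException(String message) {
            super(message);
        }
    }

    private final ConcurrentMap<String, Doc> docs = new ConcurrentHashMap<>();

    Doc get(String id) {
        return docs.get(id);
    }

    // Create-only write: the first version of a document is 1
    void create(String id, String source) {
        if (docs.putIfAbsent(id, new Doc(source, 1)) != null) {
            throw new VersionConflictException("[" + id + "]: version conflict, document already exists");
        }
    }

    // Compare-and-set write: succeeds only if the stored snapshot still has expectedVersion.
    // Doc does not override equals, so replace() compares snapshots by identity.
    void write(String id, String source, long expectedVersion) {
        Doc current = docs.get(id);
        long actual = current == null ? 0 : current.version;
        if (current == null || actual != expectedVersion
                || !docs.replace(id, current, new Doc(source, expectedVersion + 1))) {
            throw new VersionConflictException("[" + id + "]: version conflict, current version ["
                    + actual + "] is different than the one provided [" + expectedVersion + "]");
        }
    }

    // Read-modify-write that re-reads and retries up to retryOnConflict times after a conflict
    void update(String id, String newSource, int retryOnConflict) {
        for (int attempt = 0; ; attempt++) {
            Doc current = docs.get(id);
            if (current == null) {
                throw new IllegalStateException("document missing: " + id);
            }
            try {
                write(id, newSource, current.version);
                return;
            } catch (VersionConflictException e) {
                if (attempt >= retryOnConflict) {
                    throw e;
                }
            }
        }
    }
}
```

In the Java high-level REST client, the corresponding setting is `UpdateRequest.retryOnConflict(int)`. Retrying is only appropriate when the last writer is allowed to win; if a concurrent change must not be overwritten, the conflict should be surfaced instead.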

 

2. The following exception occurred:

Exception in thread "pool-2-thread-2" java.lang.OutOfMemoryError: unable to create new native thread
	at java.lang.Thread.start0(Native Method)
	at java.lang.Thread.start(Thread.java:717)
	at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase.start(CloseableHttpAsyncClientBase.java:83)
	at org.elasticsearch.client.RestClientBuilder.build(RestClientBuilder.java:203)
	at org.elasticsearch.client.RestHighLevelClient.<init>(RestHighLevelClient.java:221)
	at org.elasticsearch.client.RestHighLevelClient.<init>(RestHighLevelClient.java:213)
	at com.kaishiba.zeus.controller.BBSSearchController.updateIndex(BBSSearchController.java:195)
	at com.kaishiba.zeus.controller.BBSSearchController$1.run(BBSSearchController.java:168)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

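Cause: insufficient memory. I was working on a Mac with 8 GB of RAM: Elasticsearch used 1 GB, Eclipse and related programs 2-3 GB, and everything else together brought the total to over 7 GB, while all of the test data loaded into memory came to only a little over 10 MB. Reallocating memory on my local machine was too much trouble, so I deployed to the development environment, where the job ran successfully without this error.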
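One more detail in this stack trace is worth noting: the error is thrown from `RestHighLevelClient.<init>` inside `updateIndex`, which suggests that a new client is built for every document. `unable to create new native thread` means the JVM could not start another OS thread, and each client built this way starts its own I/O threads (by default the underlying Apache async HTTP client uses one I/O dispatcher thread per available processor), which keep running until `close()` is called. Building one client per update without closing it could therefore exhaust the thread limit regardless of heap size. The sketch below models this with a hypothetical `IoClient` that starts threads in its constructor, and shares a single instance through a lazy holder; neither class is part of the Elasticsearch library.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a client that, like RestHighLevelClient, starts its own
// I/O threads when it is built and only stops them on close(). Not a real library class.
final class IoClient implements AutoCloseable {
    static final AtomicInteger THREADS_STARTED = new AtomicInteger();
    private final CountDownLatch stop = new CountDownLatch(1);

    IoClient(int ioThreads) {
        for (int i = 0; i < ioThreads; i++) {
            // Each thread just waits until close() is called, like an idle I/O dispatcher
            Thread t = new Thread(() -> {
                try {
                    stop.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.setDaemon(true);
            t.start();
            THREADS_STARTED.incrementAndGet();
        }
    }

    void update(String id, String json) {
        // A real client would send the request here; omitted in this model
    }

    @Override
    public void close() {
        stop.countDown();
    }
}

// One shared client for the whole application (lazy holder idiom): the first call to get()
// builds it, and every later call, from any thread, returns the same instance.
final class SharedClient {
    private SharedClient() {}

    private static final class Holder {
        static final IoClient INSTANCE = new IoClient(Runtime.getRuntime().availableProcessors());
    }

    static IoClient get() {
        return Holder.INSTANCE;
    }
}
```

`RestHighLevelClient` is thread-safe, so in practice a single instance can be created at startup, shared by all tasks in the thread pool, and closed on shutdown.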

 

3. The following exception occurred:

[testbbspost/G_4pQ5c_QuKkmHW7w5gnDw][[testbbspost][2]] ElasticsearchStatusException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][26891]: version conflict, document already exists (current version [1])]]
	at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:177)
	at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1406)
	at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1382)
	at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1269)
	at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1231)
	at org.elasticsearch.client.RestHighLevelClient.update(RestHighLevelClient.java:634)
	at com.kaishiba.elasticsearch.service.impl.ElasticsearchServiceImpl.updateIndexRequest(ElasticsearchServiceImpl.java:133)
	at com.kaishiba.zeus.controller.BBSSearchController$1.run(BBSSearchController.java:145)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
	Suppressed: org.elasticsearch.client.ResponseException: method [POST], host [http://127.0.0.1:9200], URI [/testbbspost/_doc/26891/_update?timeout=1m], status line [HTTP/1.1 409 Conflict]
{"error":{"root_cause":[{"type":"version_conflict_engine_exception","reason":"[_doc][26891]: version conflict, document already exists (current version [1])","index_uuid":"G_4pQ5c_QuKkmHW7w5gnDw","shard":"2","index":"testbbspost"}],"type":"version_conflict_engine_exception","reason":"[_doc][26891]: version conflict, document already exists (current version [1])","index_uuid":"G_4pQ5c_QuKkmHW7w5gnDw","shard":"2","index":"testbbspost"},"status":409}
		at org.elasticsearch.client.RestClient$SyncResponseListener.get(RestClient.java:920)
		at org.elasticsearch.client.RestClient.performRequest(RestClient.java:227)
		at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1256)
		... 7 more
	Caused by: org.elasticsearch.client.ResponseException: method [POST], host [http://127.0.0.1:9200], URI [/testbbspost/_doc/26891/_update?timeout=1m], status line [HTTP/1.1 409 Conflict]
{"error":{"root_cause":[{"type":"version_conflict_engine_exception","reason":"[_doc][26891]: version conflict, document already exists (current version [1])","index_uuid":"G_4pQ5c_QuKkmHW7w5gnDw","shard":"2","index":"testbbspost"}],"type":"version_conflict_engine_exception","reason":"[_doc][26891]: version conflict, document already exists (current version [1])","index_uuid":"G_4pQ5c_QuKkmHW7w5gnDw","shard":"2","index":"testbbspost"},"status":409}
		at org.elasticsearch.client.RestClient$1.completed(RestClient.java:540)
		at org.elasticsearch.client.RestClient$1.completed(RestClient.java:529)
		at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
		at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
		at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
		at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
		at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
		at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
		at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
		at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
		at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
		at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
		at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
		at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
		at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
		at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
		... 1 more

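Cause: the data already existed. During earlier one-off insert tests, some documents had already been saved to ES. When the batch job ran, the document ids were unchanged, which produced the error above. The fix is to add a check to the write logic: update the document if it already exists, and create it otherwise.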
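This "update if it exists, create it otherwise" logic is an upsert. The in-memory model below contrasts a create-only write, which fails for an existing id just as the batch job did, with an upsert that merges fields into an existing document or creates it if it is absent. `UpsertStore` is a hypothetical class for illustration, not an Elasticsearch API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory model contrasting create-only writes with upserts.
// Not the Elasticsearch API; documents are plain field maps keyed by id.
final class UpsertStore {
    static final class DocumentExistsException extends RuntimeException {
        DocumentExistsException(String id) {
            super("[" + id + "]: version conflict, document already exists");
        }
    }

    private final ConcurrentMap<String, Map<String, Object>> docs = new ConcurrentHashMap<>();

    // Create-only write: fails when the id is already present
    void create(String id, Map<String, Object> fields) {
        if (docs.putIfAbsent(id, new HashMap<>(fields)) != null) {
            throw new DocumentExistsException(id);
        }
    }

    // Upsert, in the spirit of doc_as_upsert: merge the fields into an existing document,
    // or store them as a new document if the id is absent. ConcurrentHashMap.merge is atomic per key.
    void upsert(String id, Map<String, Object> fields) {
        docs.merge(id, new HashMap<>(fields), (existing, incoming) -> {
            Map<String, Object> merged = new HashMap<>(existing); // copy, so stored maps are never mutated
            merged.putAll(incoming);
            return merged;
        });
    }

    Map<String, Object> get(String id) {
        return docs.get(id);
    }
}
```

In the Java high-level REST client, the equivalent is `UpdateRequest.docAsUpsert(true)`, or an `IndexRequest`, which creates or fully replaces the document. On the `_update` endpoint, this particular error can also appear when an upsert tries to create a document that another request created in the meantime, so combining upserts with `retryOnConflict` is worth considering when several threads may write the same id.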


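Original article. If you repost it, please include a link to this article: Exceptions When Batch-Initializing ES Data with Multiple Threads in a Java Program, and How They Were Resolved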
