Big Data - Elasticsearch - Efficient Import of CSV Files


This section presents a solution for efficiently importing large CSV files (100 MB and above) into Elasticsearch.

Data in Elasticsearch can come from many different sources; when the source is a CSV file, reading a large file efficiently places particular demands on the import code.


FastCSV

FastCSV is an ultra-fast, lightweight CSV library for Java. It is RFC 4180 compliant and released under the Apache 2.0 license.
The project's own benchmarks against opencsv, Super CSV and similar libraries report the following results (taken from FastCSV's self-published tests; I have not verified them independently):

[Figure: benchmark chart from the FastCSV project's own performance tests]

To use the library, add the following dependency to your Maven project:

<dependency>
    <groupId>de.siegmar</groupId>
    <artifactId>fastcsv</artifactId>
    <version>1.0.3</version>
</dependency>

Elasticsearch client

Sending the data to Elasticsearch also requires the Elasticsearch Java client dependency:

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.6.2</version>
</dependency>

Key code

  1. Reading the CSV file
import java.io.File;
import java.nio.charset.StandardCharsets;
import de.siegmar.fastcsv.reader.CsvContainer;
import de.siegmar.fastcsv.reader.CsvReader;
import de.siegmar.fastcsv.reader.CsvRow;

File file = new File("foo.csv");
CsvReader csvReader = new CsvReader();
csvReader.setContainsHeader(true); // treat the first row as the header

CsvContainer csv = csvReader.read(file, StandardCharsets.UTF_8);
for (CsvRow row : csv.getRows()) {
    System.out.println("First column of line: " + row.getField("name"));
}
  2. Bulk indexing with the Elasticsearch BulkProcessor
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;

public class ESProcessor {

    private RestHighLevelClient client;
    private BulkProcessor bulkProcessor;

    /**
     * @param args
     */
    public static void main(String[] args) {
        ESProcessor esp = new ESProcessor();
        esp.createProcessor();
        // add index requests to esp.bulkProcessor here
        esp.closeProcessor();
    }

    public ESProcessor() {
        client = new RestHighLevelClient(RestClient.builder(new HttpHost(
                "127.0.0.1", 9200, "http"))); // initialize the REST client
    }

    /**
     * Create the target index
     * 
     * @param indexName
     * @throws IOException
     */
    private static void createIndex(String indexName) throws IOException {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("es01", 9200, "http")));
        // Elasticsearch index names must be lowercase, so convert the name here
        CreateIndexRequest requestIndex = new CreateIndexRequest(
                indexName.toLowerCase());
        // Note: setting the replica count to 0 and the refresh interval to -1 gives a noticeable speedup when bulk-indexing large volumes of data
        requestIndex.settings(Settings.builder()
                .put("index.number_of_shards", 5)
                .put("index.number_of_replicas", 0)
                .put("index.refresh_interval", "-1"));

        // actually create the index
        client.indices().create(requestIndex, RequestOptions.DEFAULT);
        client.close();
    }

    /**
     * Create and initialize the BulkProcessor
     * 
     * @return the configured BulkProcessor
     */
    public BulkProcessor createProcessor() {
        try {

            BulkProcessor.Listener listener = new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    System.out.println("About to bulk insert "
                            + request.numberOfActions() + " documents");
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request,
                        BulkResponse response) {
                    // the bulk call returned, but individual items may still have failed
                    if (response.hasFailures()) {
                        System.out.println("Bulk " + executionId
                                + " completed with failures: "
                                + response.buildFailureMessage());
                    } else {
                        System.out.println("Successfully inserted "
                                + request.numberOfActions()
                                + " documents, executionId: " + executionId);
                    }
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request,
                        Throwable failure) {
                    System.out.println("Bulk request failed: " + failure
                            + ", executionId: " + executionId);
                }
            };

            BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (
                    request, bulkListener) -> client.bulkAsync(request,
                    RequestOptions.DEFAULT, bulkListener);

            // bulkProcessor = BulkProcessor.builder(bulkConsumer,
            // listener).build();


            BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer,
                    listener);
            builder.setBulkActions(5000);   // flush a bulk request after every 5,000 actions
            //builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)); // flush whenever the buffered requests reach 5 MB
            builder.setConcurrentRequests(10); // default is 1; number of concurrent bulk requests allowed while new actions accumulate (0 makes flushing synchronous)
            builder.setBackoffPolicy(BackoffPolicy.exponentialBackoff(
                    TimeValue.timeValueSeconds(1L), 3));  // retry policy when ES rejects requests with EsRejectedExecutionException due to resource exhaustion; the default is (50 ms, 8 retries)
        
            bulkProcessor = builder.build();

        } catch (Exception e) {
            e.printStackTrace();
        }

        return bulkProcessor;
    }

    public void closeProcessor() {
        try {
            bulkProcessor.awaitClose(70L, TimeUnit.SECONDS);
            client.close();
        } catch (Exception e1) {
            e1.printStackTrace();
        }
    }
}
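
The two snippets above still have to be wired together: every CSV row must be turned into an IndexRequest and handed to the BulkProcessor. The original code does not show this step, so the following is a minimal sketch of a helper method that could be added to ESProcessor. The index name, the document type _doc (the 6.x default) and the column name "name" are placeholders for illustration.

// additional imports needed in ESProcessor for this helper
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequest;
import de.siegmar.fastcsv.reader.CsvContainer;
import de.siegmar.fastcsv.reader.CsvReader;
import de.siegmar.fastcsv.reader.CsvRow;

    // Hypothetical helper inside ESProcessor: read the CSV file and feed every
    // row to the BulkProcessor as an IndexRequest.
    public void importCsv(String path, String indexName) throws IOException {
        BulkProcessor processor = createProcessor();

        CsvReader csvReader = new CsvReader();
        csvReader.setContainsHeader(true); // header row supplies the field names
        CsvContainer csv = csvReader.read(new File(path), StandardCharsets.UTF_8);

        for (CsvRow row : csv.getRows()) {
            Map<String, Object> doc = new HashMap<>();
            // replace "name" with the real column names of your CSV file
            doc.put("name", row.getField("name"));
            processor.add(new IndexRequest(indexName, "_doc").source(doc));
        }

        // flush any pending requests and release the client
        closeProcessor();
    }

Remember that the index was created with index.refresh_interval set to -1, so after the import finishes the refresh interval should be switched back (for example to "1s"); otherwise the newly indexed documents will not become visible to searches.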

Fixing slow reads of data files larger than 100 MB

If the import becomes slow, first watch the memory and CPU usage of the process. If memory usage has stopped climbing while the CPU stays above 90%, the JVM is most likely spending its time in garbage collection; try adjusting the heap allocation when launching the Java application:

java -Xms128m -Xmx3g ReaderApp

This sets the initial heap to 128 MB and the maximum heap to 3 GB. The JVM then resizes the heap after each GC as needed, so simply raising the -Xmx ceiling is usually enough to remove the bottleneck.

Note: for even larger files, consider FastCSV's streaming read API instead of loading the whole file into memory; a sketch follows below.
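
The streaming API pulls one row at a time from the file, so heap usage stays flat regardless of file size. A minimal sketch, assuming the FastCSV 1.x CsvParser API and a placeholder file name rows.csv:

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import de.siegmar.fastcsv.reader.CsvParser;
import de.siegmar.fastcsv.reader.CsvReader;
import de.siegmar.fastcsv.reader.CsvRow;

public class StreamingReadExample {
    public static void main(String[] args) throws IOException {
        CsvReader csvReader = new CsvReader();
        csvReader.setContainsHeader(true);

        // parse() returns a CsvParser that reads rows lazily from the file,
        // so only one row is held in memory at a time
        try (CsvParser parser = csvReader.parse(new File("rows.csv"),
                StandardCharsets.UTF_8)) {
            CsvRow row;
            while ((row = parser.nextRow()) != null) {
                // hand each row to the BulkProcessor here instead of printing it
                System.out.println(row.getField("name"));
            }
        }
    }
}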
