转载

基于Spring Batch向Elasticsearch批量导入数据

1.介绍

当系统有大量数据需要从数据库导入Elasticsearch时,使用Spring Batch可以提高导入的效率。Spring Batch使用ItemReader分页读取数据,ItemWriter批量写数据。由于Spring Batch没有提供Elastisearch的ItemWriter和ItemReader,本示例中自定义一个ElasticsearchItemWriter(ElasticsearchItemReader),用于批量导入。

2.示例

2.1 pom.xml

本文使用spring data jest连接ES(也可以使用spring data elasticsearch连接ES),ES版本为5.5.3

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hfcsbc.estl</groupId>
    <artifactId>es-etl</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>es-etl</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.M7</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>

        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>

        <dependency>
            <groupId>com.github.vanroy</groupId>
            <artifactId>spring-boot-starter-data-jest</artifactId>
            <version>3.0.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>5.3.2</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>

    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>


</project>

2.2 实体类及repository

package com.hfcsbc.esetl.domain;

import lombok.Data;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.OneToOne;

/**
 * Create by pengchao on 2018/2/23
 */
@Document(indexName = "person", type = "person", shards = 1, replicas = 0, refreshInterval = "-1")
@Entity
@Data
public class Person {
    @Id
    private Long id;
    private String name;
    @OneToOne
    @Field(type = FieldType.Nested)
    private Address address;
}
package com.hfcsbc.esetl.domain;

import lombok.Data;

import javax.persistence.Entity;
import javax.persistence.Id;

/**
 * Create by pengchao on 2018/2/23
 */
@Entity
@Data
public class Address {
    @Id
    private Long id;
    private String name;
}
package com.hfcsbc.esetl.repository.jpa;

import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.jpa.repository.JpaRepository;

/**
 * Create by pengchao on 2018/2/23
 */
public interface PersonRepository extends JpaRepository<Person, Long> {
}
package com.hfcsbc.esetl.repository.es;

import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

/**
 * Create by pengchao on 2018/2/23
 */
public interface EsPersonRepository extends ElasticsearchRepository<Person, Long> {
}

2.3 配置elasticsearchItemWriter

package com.hfcsbc.esetl.itemWriter;

import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemWriter;

import java.util.List;

/**
 * Create by pengchao on 2018/2/23
 */
public class ElasticsearchItemWriter implements ItemWriter<Person>, ItemWriteListener<Person>, StepExecutionListener {

    private EsPersonRepository personRepository;

    public ElasticsearchItemWriter(EsPersonRepository personRepository) {
        this.personRepository = personRepository;
    }

    @Override
    public void beforeWrite(List<? extends Person> items) {

    }

    @Override
    public void afterWrite(List<? extends Person> items) {

    }

    @Override
    public void onWriteError(Exception exception, List<? extends Person> items) {

    }

    @Override
    public void beforeStep(StepExecution stepExecution) {

    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return null;
    }

    @Override
    public void write(List<? extends Person> items) throws Exception {
        //实现类AbstractElasticsearchRepository的saveAll方法调用的是elasticsearchOperations.bulkIndex(queries),为批量索引
        personRepository.saveAll(items);
    }
}

2.4 配置ElasticsearchItemReader(本示例未使用,仅供参考)

package com.hfcsbc.esetl.itemReader;

import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.SearchQuery;

import java.util.Iterator;

/**
 * Create by pengchao on 2018/2/24
 */
public class ElasticsearchItemReader<Person> extends AbstractPaginatedDataItemReader<Person> implements InitializingBean {

    private final ElasticsearchOperations elasticsearchOperations;

    private final SearchQuery query;

    private final Class<? extends Person> targetType;

    public ElasticsearchItemReader(ElasticsearchOperations elasticsearchOperations, SearchQuery query, Class<? extends Person> targetType) {
        this.elasticsearchOperations = elasticsearchOperations;
        this.query = query;
        this.targetType = targetType;
    }

    @Override
    protected Iterator<Person> doPageRead() {
        return (Iterator<Person>)elasticsearchOperations.queryForList(query, targetType).iterator();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
    }
}

2.5 配置spring batch需要的配置

package com.hfcsbc.esetl.config;

import com.hfcsbc.esetl.itemWriter.ElasticsearchItemWriter;
import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;

/**
 * Create by pengchao on 2018/2/23
 */
@Configuration
@EnableBatchProcessing
public class BatchConfig {
    @Autowired
    private EsPersonRepository personRepository;

    @Bean
    public ItemReader<Person> orderItemReader(EntityManagerFactory entityManagerFactory){
        JpaPagingItemReader<Person> reader = new JpaPagingItemReader<Person>();
        String sqlQuery = "select * from person";
        try {
            JpaNativeQueryProvider<Person> queryProvider = new JpaNativeQueryProvider<Person>();
            queryProvider.setSqlQuery(sqlQuery);
            queryProvider.setEntityClass(Person.class);
            queryProvider.afterPropertiesSet();
            reader.setEntityManagerFactory(entityManagerFactory);
            reader.setPageSize(10000);
            reader.setQueryProvider(queryProvider);
            reader.afterPropertiesSet();
            reader.setSaveState(true);
        } catch (Exception e) {
            e.printStackTrace();
        }

        return reader;
    }

    @Bean
    public ElasticsearchItemWriter itemWriter(){
        return new ElasticsearchItemWriter(personRepository);
    }

    @Bean
    public Step step(StepBuilderFactory stepBuilderFactory,
                     ItemReader itemReader,
                     ItemWriter itemWriter){
        return stepBuilderFactory
                .get("step1")
                .chunk(10000)
                .reader(itemReader)
                .writer(itemWriter)
                .build();
    }

    @Bean
    public Job job(JobBuilderFactory jobBuilderFactory, Step step){
        return jobBuilderFactory
                .get("importJob")
                .incrementer(new RunIdIncrementer())
                .flow(step)
                .end()
                .build();
    }

    /**
     * spring batch执行时会创建一些自身需要的表,这里指定表创建的位置:dataSource
     * @param dataSource
     * @param manager
     * @return
     */
    @Bean
    public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager manager){
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(manager);
        jobRepositoryFactoryBean.setDatabaseType("postgres");
        try {
            return jobRepositoryFactoryBean.getObject();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

}

2.6配置数据库及es的连接地址

spring:
  redis:
    host: 192.168.1.222
  data:
    jest:
      uri: http://192.168.1.222:9200
      username: elastic
      password: changeme

  jpa:
    database: POSTGRESQL
    show-sql: true
    hibernate:
      ddl-auto: update

  datasource:
    platform: postgres
    url: jdbc:postgresql://192.168.1.222:5433/person
    username: hfcb
    password: hfcb
    driver-class-name: org.postgresql.Driver
    max-active: 2

spring.batch.initialize-schema: always

2.7 配置入口类

package com.hfcsbc.esetl;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchAutoConfiguration;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;

@SpringBootApplication(exclude = {ElasticsearchAutoConfiguration.class, ElasticsearchDataAutoConfiguration.class})
@EnableElasticsearchRepositories(basePackages = "com.hfcsbc.esetl.repository")
@EnableJpaRepositories(basePackages = "com.hfcsbc.esetl.repository.jpa")
public class EsEtlApplication {

    public static void main(String[] args) {
        SpringApplication.run(EsEtlApplication.class, args);
    }
}
原文  http://www.wisely.top/2018/02/24/spring-batch-elasticsearch/
正文到此结束
Loading...