
Backend Engineering Log

Market Backend Engine 12: Elasticsearch, Part 1

We are introducing Elasticsearch into the backend step by step.

 

Adding ES to Docker Compose

 

 

Spring Boot dependency setup

 

build.gradle 

 

application.yml

 

common/config/ElasticsearchStartupVerifier.java

package com.marketengine.backend.common.config;

import java.io.IOException;

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.InfoResponse;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
 * Step 1: confirms the app can reach Elasticsearch at startup.
 * Search features are added in later steps.
 */
@Slf4j
@Component
@RequiredArgsConstructor
@ConditionalOnProperty(name = "marketengine.elasticsearch.verify-on-startup", havingValue = "true", matchIfMissing = true)
public class ElasticsearchStartupVerifier implements ApplicationListener<ApplicationReadyEvent> {

    private final ElasticsearchClient elasticsearchClient;

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        try {
            InfoResponse info = elasticsearchClient.info();
            log.info(
                    "Elasticsearch connected: cluster={}, version={}",
                    info.clusterName(),
                    info.version().number()
            );
        } catch (IOException exception) {
            throw new IllegalStateException(
                    "Elasticsearch is not reachable at spring.elasticsearch.uris. "
                            + "Start the cluster (e.g. docker compose up -d elasticsearch) "
                            + "or set marketengine.elasticsearch.verify-on-startup=false.",
                    exception
            );
        }
    }
}

 

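At this step the verifier only checks that the app can reach the cluster.

Started the backend and confirmed the Elasticsearch connection.

Next, we implement the Document model.

A document is the JSON data that is actually stored in Elasticsearch.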

 

Directory: product/infrastructure/search

 

ProductDocument.java

package com.marketengine.backend.product.infrastructure.search;

import java.math.BigDecimal;
import java.time.OffsetDateTime;

import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import org.springframework.data.elasticsearch.annotations.Setting;

import com.marketengine.backend.product.domain.Product;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;

/**
 * Elasticsearch read model for product list/search (step 2: mapping only; no queries yet).
 * Document id equals PostgreSQL {@link Product#getId()}.
 */
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Document(indexName = ProductDocument.INDEX_NAME)
@Setting(replicas = 0)
public class ProductDocument {

    public static final String INDEX_NAME = "products";

    @Id
    private Long id;

    @Field(type = FieldType.Text)
    private String name;

    @Field(type = FieldType.Double)
    private BigDecimal priceAmount;

    @Field(type = FieldType.Integer)
    private int stockQuantity;

    @Field(type = FieldType.Keyword)
    private String category;

    @Field(type = FieldType.Keyword)
    private String brand;

    @Field(type = FieldType.Keyword)
    private String color;

    @Field(type = FieldType.Keyword)
    private String gender;

    @Field(type = FieldType.Integer)
    private int popularityScore;

    @Field(type = FieldType.Date)
    private OffsetDateTime createdAt;

    public ProductDocument(
            Long id,
            String name,
            BigDecimal priceAmount,
            int stockQuantity,
            String category,
            String brand,
            String color,
            String gender,
            int popularityScore,
            OffsetDateTime createdAt
    ) {
        this.id = id;
        this.name = name;
        this.priceAmount = priceAmount;
        this.stockQuantity = stockQuantity;
        this.category = category;
        this.brand = brand;
        this.color = color;
        this.gender = gender;
        this.popularityScore = popularityScore;
        this.createdAt = createdAt;
    }

    public static ProductDocument from(Product product) {
        return new ProductDocument(
                product.getId(),
                product.getName(),
                product.getPriceAmount(),
                product.getStockQuantity(),
                product.getCategory().name(),
                product.getBrand(),
                product.getColor(),
                product.getGender(),
                product.getPopularityScore(),
                product.getCreatedAt()
        );
    }
}

 

Defines the document.

Index name: products
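The mapping separates `Text` (name) from `Keyword` (category, brand, color, gender). A text field is analyzed at index time and stored as individual tokens, so a query for "air" can match "Nike Air Max". A keyword field is stored as a single exact value, which is what filters, sorting and aggregations need.

The plain-Java sketch below imitates that difference. Its tokenizer lowercases the text and splits on anything that is not a letter or digit. That is a simplified assumption, not Elasticsearch's standard analyzer. `AnalyzerSketch`, the product name and the category value are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

class AnalyzerSketch {

    // Simplified stand-in for an analyzer: lowercase, then split on anything
    // that is not a letter or digit. Elasticsearch's standard analyzer is more
    // elaborate; this only illustrates the text-vs-keyword idea.
    static List<String> analyze(String text) {
        return Arrays.stream(text.toLowerCase(Locale.ROOT).split("[^\\p{L}\\p{N}]+"))
                .filter(token -> !token.isEmpty())
                .toList();
    }

    // Text field: the query is analyzed the same way, and the document matches
    // if it shares at least one token (roughly a default match query).
    static boolean textMatches(String fieldValue, String query) {
        List<String> docTokens = analyze(fieldValue);
        return analyze(query).stream().anyMatch(docTokens::contains);
    }

    // Keyword field: the whole value is one term, so only an exact match counts.
    static boolean keywordMatches(String fieldValue, String term) {
        return fieldValue.equals(term);
    }

    public static void main(String[] args) {
        System.out.println(analyze("Nike Air Max 90"));          // [nike, air, max, 90]
        System.out.println(textMatches("Nike Air Max 90", "air")); // true
        System.out.println(keywordMatches("SHOES", "shoes"));      // false
    }
}
```

One practical consequence: keyword values are case-sensitive. A category filter therefore has to use the stored form, which here is the enum `name()`.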

 

ProductSearchConfig.java

package com.marketengine.backend.product.infrastructure.search;

import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;

@Configuration
@EnableElasticsearchRepositories(basePackages = "com.marketengine.backend.product.infrastructure.search")
public class ProductSearchConfig {
}

Config that tells Spring Data which package holds the ES repositories.

 

ProductSearchRepository.java

package com.marketengine.backend.product.infrastructure.search;

import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

/**
 * Step 2: repository for index CRUD; search queries are added in later steps.
 */
public interface ProductSearchRepository extends ElasticsearchRepository<ProductDocument, Long> {
}

 

 

ProductIndexInitializer.java

package com.marketengine.backend.product.infrastructure.search;

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.IndexOperations;
import org.springframework.stereotype.Component;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
 * Step 2: create the {@value ProductDocument#INDEX_NAME} index and mapping if missing.
 */
@Slf4j
@Component
@RequiredArgsConstructor
@ConditionalOnProperty(name = "marketengine.elasticsearch.index.ensure-on-startup", havingValue = "true", matchIfMissing = true)
public class ProductIndexInitializer implements ApplicationListener<ApplicationReadyEvent> {

    private final ElasticsearchOperations elasticsearchOperations;

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        IndexOperations indexOperations = elasticsearchOperations.indexOps(ProductDocument.class);
        if (indexOperations.exists()) {
            log.info("Elasticsearch index already exists: {}", ProductDocument.INDEX_NAME);
            return;
        }
        indexOperations.createWithMapping();
        log.info("Elasticsearch index created: {}", ProductDocument.INDEX_NAME);
    }
}

 

Creates the index and its mapping from ProductDocument, but only if the index does not exist yet.

 

Mapped to RDBMS concepts:

Index - Table

Document - Row

Mapping - Schema definition

 

Confirmed that the products index was created in Elasticsearch.
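ProductIndexInitializer returns early when the index already exists. Later changes to the `@Field` declarations are therefore not applied to that index. Elasticsearch can add new fields to an existing mapping but cannot change the type of a field that is already mapped, and this initializer does neither. The straightforward way to pick up a mapping change is to recreate the index and reload every document, which is what the `--recreate-index` option of the reindex job does.

Below is a hypothetical in-memory sketch of the two paths. `IndexLifecycleSketch` and its map-of-field-types model stand in for the cluster.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

class IndexLifecycleSketch {

    // Hypothetical stand-in for the cluster: index name -> (field name -> field type).
    private final Map<String, Map<String, String>> indices = new HashMap<>();

    // Mirrors ProductIndexInitializer: create with the given mapping only if missing.
    boolean ensure(String index, Map<String, String> mapping) {
        if (indices.containsKey(index)) {
            return false; // existing mapping is left untouched
        }
        indices.put(index, new LinkedHashMap<>(mapping));
        return true;
    }

    // Mirrors the --recreate-index path: drop the index (and its documents),
    // then create it with the current mapping.
    void recreate(String index, Map<String, String> mapping) {
        indices.remove(index);
        indices.put(index, new LinkedHashMap<>(mapping));
    }

    Map<String, String> mappingOf(String index) {
        return indices.get(index);
    }

    public static void main(String[] args) {
        IndexLifecycleSketch cluster = new IndexLifecycleSketch();
        cluster.ensure("products", Map.of("name", "text"));
        // Hypothetical code change: "name" is now declared as a keyword.
        cluster.ensure("products", Map.of("name", "keyword"));
        System.out.println(cluster.mappingOf("products")); // {name=text}, unchanged
        cluster.recreate("products", Map.of("name", "keyword"));
        System.out.println(cluster.mappingOf("products")); // {name=keyword}
    }
}
```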

 

 

Now we load the data into ES.

We add a reindex profile so that Spring Boot starts without a web server, runs the reindex once, and exits.

Depending on the environment, a reindex can take a long time.

 

application-reindex.yml

# Bulk reindex profile: no HTTP server, run once and exit.
spring:
  main:
    web-application-type: none

marketengine:
  elasticsearch:
    reindex:
      batch-size: 5000
      log-every-batches: 20

 

ProductBulkReindexer.java

package com.marketengine.backend.product.infrastructure.search;

import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.List;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.IndexOperations;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
 * Step 3: stream products from PostgreSQL and bulk-index into Elasticsearch.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class ProductBulkReindexer {

    private static final String COUNT_SQL = "SELECT COUNT(*) FROM products";

    private static final String BATCH_SQL = """
            SELECT id, name, price_amount, stock_quantity, category, brand, color, gender,
                   popularity_score, created_at
            FROM products
            WHERE id > ?
            ORDER BY id
            LIMIT ?
            """;

    private static final RowMapper<ProductDocument> ROW_MAPPER = (resultSet, rowNum) -> new ProductDocument(
            resultSet.getLong("id"),
            resultSet.getString("name"),
            resultSet.getBigDecimal("price_amount"),
            resultSet.getInt("stock_quantity"),
            resultSet.getString("category"),
            resultSet.getString("brand"),
            resultSet.getString("color"),
            resultSet.getString("gender"),
            resultSet.getInt("popularity_score"),
            resultSet.getObject("created_at", OffsetDateTime.class)
    );

    private final JdbcTemplate jdbcTemplate;
    private final ElasticsearchOperations elasticsearchOperations;

    @Value("${marketengine.elasticsearch.reindex.batch-size:5000}")
    private int batchSize;

    @Value("${marketengine.elasticsearch.reindex.log-every-batches:20}")
    private int logEveryBatches;

    public ReindexResult reindex(boolean recreateIndex) {
        Instant startedAt = Instant.now();

        if (recreateIndex) {
            recreateIndex();
        }

        long totalRows = jdbcTemplate.queryForObject(COUNT_SQL, Long.class);
        log.info("Starting product reindex: totalRows={}, batchSize={}", totalRows, batchSize);

        IndexCoordinates index = IndexCoordinates.of(ProductDocument.INDEX_NAME);
        long lastId = 0L;
        long indexed = 0L;
        int batchNumber = 0;

        while (true) {
            List<ProductDocument> batch = jdbcTemplate.query(BATCH_SQL, ROW_MAPPER, lastId, batchSize);
            if (batch.isEmpty()) {
                break;
            }

            elasticsearchOperations.save(batch, index);
            batchNumber++;
            indexed += batch.size();
            lastId = batch.get(batch.size() - 1).getId();

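            // Guard against log-every-batches=0, which would otherwise divide by zero
            if ((logEveryBatches > 0 && batchNumber % logEveryBatches == 0) || indexed >= totalRows) {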
                log.info("Reindex progress: indexed={}/{} ({}%)", indexed, totalRows, percent(indexed, totalRows));
            }
        }

        elasticsearchOperations.indexOps(ProductDocument.class).refresh();

        Duration duration = Duration.between(startedAt, Instant.now());
        log.info("Product reindex finished: indexed={}, duration={}s", indexed, duration.toSeconds());
        return new ReindexResult(indexed, duration);
    }

    private void recreateIndex() {
        IndexOperations indexOperations = elasticsearchOperations.indexOps(ProductDocument.class);
        if (indexOperations.exists()) {
            indexOperations.delete();
            log.info("Deleted Elasticsearch index: {}", ProductDocument.INDEX_NAME);
        }
        indexOperations.createWithMapping();
        log.info("Created Elasticsearch index: {}", ProductDocument.INDEX_NAME);
    }

    private static int percent(long done, long total) {
        if (total == 0L) {
            return 100;
        }
        return (int) Math.min(100L, (done * 100L) / total);
    }

    public record ReindexResult(long indexedCount, Duration duration) {
    }
}

 

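A JDBC RowMapper converts each ResultSet row into a ProductDocument.

(Reading in batches with plain JDBC is much more efficient than going through JPA, which would load every row as a managed entity into the persistence context.)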
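The batch query pages by id (`WHERE id > ? ORDER BY id LIMIT ?`), often called keyset pagination, rather than by `OFFSET`. With `OFFSET`, the database still steps over every skipped row, so later batches get slower as the offset grows. With a keyset, it can start from `lastId` using the primary-key index, and gaps in the id sequence do not matter.

The loop can be simulated in plain Java. `KeysetPagingSketch` and the sorted `long[]` standing in for the table are hypothetical. The linear scan in `fetchBatch` exists only for the simulation.

```java
import java.util.ArrayList;
import java.util.List;

class KeysetPagingSketch {

    // Equivalent of: SELECT id FROM products WHERE id > ? ORDER BY id LIMIT ?
    // over ids given in ascending order (gaps allowed). PostgreSQL would seek
    // via the primary-key index; the linear scan here is only for the simulation.
    static List<Long> fetchBatch(long[] sortedIds, long lastId, int limit) {
        List<Long> batch = new ArrayList<>();
        for (long id : sortedIds) {
            if (id > lastId) {
                batch.add(id);
                if (batch.size() == limit) {
                    break;
                }
            }
        }
        return batch;
    }

    // Same loop shape as ProductBulkReindexer.reindex: stop on an empty batch,
    // otherwise advance lastId to the last id of the batch.
    static List<List<Long>> readAll(long[] sortedIds, int batchSize) {
        List<List<Long>> batches = new ArrayList<>();
        long lastId = 0L;
        while (true) {
            List<Long> batch = fetchBatch(sortedIds, lastId, batchSize);
            if (batch.isEmpty()) {
                break;
            }
            batches.add(batch);
            lastId = batch.get(batch.size() - 1);
        }
        return batches;
    }

    public static void main(String[] args) {
        long[] ids = {1, 2, 3, 7, 8, 20, 21};
        System.out.println(readAll(ids, 3)); // [[1, 2, 3], [7, 8, 20], [21]]
    }
}
```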

 

Batch size: 5,000 rows

Each batch is sent to Elasticsearch through ElasticsearchOperations, which issues it as a single bulk request.
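On the wire, a batch ends up as one call to Elasticsearch's `_bulk` API. Its body is NDJSON: an action line followed by the document source, one pair per document, with a final newline. The `index` action creates the document or replaces an existing one with the same `_id`. That is why running the reindex again without `--recreate-index` overwrites documents instead of duplicating them.

The sketch below builds such a body by hand only to show its shape. Spring Data Elasticsearch builds the request through the Elasticsearch Java client, not like this. `BulkBodySketch` and its two-field `Doc` are hypothetical.

```java
import java.util.List;

class BulkBodySketch {

    // Hypothetical two-field document; the real ProductDocument has more fields.
    record Doc(long id, String name) {}

    // Minimal JSON string quoting for quotes, backslashes and newlines only
    // (not a complete JSON escaper).
    static String quote(String value) {
        StringBuilder out = new StringBuilder("\"");
        for (char c : value.toCharArray()) {
            switch (c) {
                case '"' -> out.append("\\\"");
                case '\\' -> out.append("\\\\");
                case '\n' -> out.append("\\n");
                default -> out.append(c);
            }
        }
        return out.append('"').toString();
    }

    // One "index" action line plus one source line per document.
    static String bulkBody(String index, List<Doc> docs) {
        StringBuilder body = new StringBuilder();
        for (Doc doc : docs) {
            body.append("{\"index\":{\"_index\":").append(quote(index))
                .append(",\"_id\":").append(quote(Long.toString(doc.id()))).append("}}\n");
            body.append("{\"id\":").append(doc.id())
                .append(",\"name\":").append(quote(doc.name())).append("}\n");
        }
        return body.toString(); // the bulk API requires the trailing newline
    }

    public static void main(String[] args) {
        System.out.print(bulkBody("products", List.of(new Doc(1, "Air Max"), new Doc(2, "Air Force 1"))));
    }
}
```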

 

 

ProductReindexApplicationRunner.java

package com.marketengine.backend.product.infrastructure.search;

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
 * Step 3: run with {@code --spring.profiles.active=reindex} (see scripts/elasticsearch/run-reindex-products.ps1).
 * Optional flag: {@code --recreate-index} deletes and recreates the products index before bulk load.
 */
@Slf4j
@Profile("reindex")
@Component
@RequiredArgsConstructor
public class ProductReindexApplicationRunner implements ApplicationRunner {

    private final ProductBulkReindexer bulkReindexer;
    private final ConfigurableApplicationContext applicationContext;

    @Override
    public void run(ApplicationArguments args) {
        boolean recreateIndex = args.containsOption("recreate-index");
        int exitCode = 0;
        try {
            ProductBulkReindexer.ReindexResult result = bulkReindexer.reindex(recreateIndex);
            log.info(
                    "Reindex job complete: indexed={}, duration={}s",
                    result.indexedCount(),
                    result.duration().toSeconds()
            );
        } catch (RuntimeException exception) {
            log.error("Reindex job failed", exception);
            exitCode = 1;
        }
        // Exit exactly once: close the Spring context, then end the JVM with the status code
        exit(exitCode);
    }

    private void exit(int code) {
        int exitCode = SpringApplication.exit(applicationContext, () -> code);
        System.exit(exitCode);
    }
}

 

This is the ApplicationRunner that runs the reindex.

It reads from the command-line arguments whether to recreate the index, then runs ProductBulkReindexer.
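The runner ends the JVM with a status code so that a script or CI job running it can tell success from failure: 0 when the reindex finishes, 1 when it throws. Below is a stdlib-only sketch of that mapping.

`ExitCodeSketch` and `runJob` are hypothetical, and the `Supplier` stands in for `bulkReindexer.reindex(...)`. The real runner passes the resulting code to `SpringApplication.exit`, which closes the context, and then to `System.exit`.

```java
import java.util.function.Supplier;

class ExitCodeSketch {

    // Runs the job and maps the outcome to a process exit code:
    // 0 on success, 1 if the job throws a RuntimeException.
    static int runJob(Supplier<Long> job) {
        try {
            long indexed = job.get();
            System.out.println("Reindex job complete: indexed=" + indexed);
            return 0;
        } catch (RuntimeException exception) {
            System.err.println("Reindex job failed: " + exception.getMessage());
            return 1;
        }
    }

    public static void main(String[] args) {
        int code = runJob(() -> 10L);
        // The real runner would call SpringApplication.exit(...) and
        // System.exit(...) here, once, with this code.
        System.out.println("exit code = " + code);
    }
}
```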

 

cd backend

# Load into the existing index (existing documents are upserted by id)
.\gradlew.bat bootRun --no-daemon --args="--spring.profiles.active=reindex"

# Recreate the index + full load (first bulk load, or after changing the mapping)
.\gradlew.bat bootRun --no-daemon --args="--spring.profiles.active=reindex --recreate-index"

 

Loading 10 million rows took 28 min 21 s.
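For a rough sense of scale, here are the same numbers as throughput. 28 min 21 s is 1,701 s, and 10,000,000 rows in 5,000-row batches is 2,000 batches. The run therefore averaged about 5,879 documents per second, or about 0.85 s per batch, covering the PostgreSQL reads and the bulk indexing combined. A small helper for the arithmetic follows; `ReindexThroughput` is a hypothetical name.

```java
import java.time.Duration;

class ReindexThroughput {

    // Documents indexed per second over the whole run.
    static double docsPerSecond(long rows, Duration elapsed) {
        return (double) rows / elapsed.toSeconds();
    }

    // Number of non-empty batches the keyset loop reads (ceiling division).
    static long batchCount(long rows, int batchSize) {
        return (rows + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        long rows = 10_000_000L;
        int batchSize = 5_000;
        Duration elapsed = Duration.ofMinutes(28).plusSeconds(21); // 1,701 s

        long batches = batchCount(rows, batchSize);                // 2,000
        double perSecond = docsPerSecond(rows, elapsed);           // about 5,879 docs/s
        double perBatch = (double) elapsed.toSeconds() / batches;  // about 0.85 s per batch
        System.out.println(batches + " batches, " + Math.round(perSecond) + " docs/s, "
                + Math.round(perBatch * 100) / 100.0 + " s/batch");
    }
}
```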

 

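Data created by Flyway is not added to ES. If you reset PostgreSQL, you also need to run the reindex once.

After that, we add code that automatically keeps ES in sync with create, update, and delete (CUD) operations on individual products while the server is running.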

 

ProductSearchIndexer.java

package com.marketengine.backend.product.infrastructure.search;

import org.springframework.stereotype.Component;

import com.marketengine.backend.product.domain.Product;
import com.marketengine.backend.product.domain.ProductRepository;

import lombok.RequiredArgsConstructor;

/**
 * Step 4: keep Elasticsearch {@link ProductDocument} in sync with PostgreSQL on CUD.
 * List/search API still uses PostgreSQL until step 5.
 */
@Component
@RequiredArgsConstructor
public class ProductSearchIndexer {

    private final ProductSearchRepository productSearchRepository;
    private final ProductRepository productRepository;

    public void index(Product product) {
        Product source = resolveProductForIndex(product);
        productSearchRepository.save(ProductDocument.from(source));
    }

    public void delete(Long productId) {
        productSearchRepository.deleteById(productId);
    }

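    /**
     * {@code created_at} is DB-generated ({@code insertable = false}); reload once after insert if needed.
     * Caveat: inside the same transaction, findById may return the already-managed instance from the
     * persistence context, in which case created_at is still null; EntityManager#refresh would reload it.
     */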
    private Product resolveProductForIndex(Product product) {
        if (product.getCreatedAt() != null || product.getId() == null) {
            return product;
        }
        return productRepository.findById(product.getId()).orElse(product);
    }
}

 

When needed, it reloads the product first, then sends the request through the ES repository.

 

In the service, the save, update, and delete methods now flush and then call the indexer.
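The service changes are only described, so here is a hypothetical in-memory sketch of the call order. The service writes to the database, flushes so the pending SQL is executed, then updates the search index. Deletes remove the document by id.

`ProductStore`, `SearchIndex` and `ProductService` are made-up stand-ins for the JPA repository, ProductSearchIndexer and the product service. Nothing here talks to PostgreSQL or Elasticsearch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

class CudSyncSketch {

    // Hypothetical, minimal product: just an id and a name.
    record Product(Long id, String name) {}

    // Stand-in for PostgreSQL + JPA: save() assigns an id on insert.
    static class ProductStore {
        private final Map<Long, Product> rows = new HashMap<>();
        private final AtomicLong sequence = new AtomicLong();

        Product save(Product product) {
            Long id = product.id() != null ? product.id() : sequence.incrementAndGet();
            Product saved = new Product(id, product.name());
            rows.put(id, saved);
            return saved;
        }

        void deleteById(Long id) {
            rows.remove(id);
        }

        void flush() {
            // In JPA, flush() executes the pending SQL; nothing is pending in memory.
        }
    }

    // Stand-in for ProductSearchIndexer: documents keyed by product id.
    static class SearchIndex {
        final Map<Long, String> documents = new HashMap<>();

        void index(Product product) {
            documents.put(product.id(), product.name()); // same id -> document replaced
        }

        void delete(Long id) {
            documents.remove(id);
        }
    }

    // Stand-in for the service: database write -> flush -> search index update.
    static class ProductService {
        private final ProductStore store;
        private final SearchIndex searchIndex;

        ProductService(ProductStore store, SearchIndex searchIndex) {
            this.store = store;
            this.searchIndex = searchIndex;
        }

        Product create(String name) {
            Product saved = store.save(new Product(null, name));
            store.flush();
            searchIndex.index(saved);
            return saved;
        }

        Product update(Long id, String name) {
            Product saved = store.save(new Product(id, name));
            store.flush();
            searchIndex.index(saved);
            return saved;
        }

        void delete(Long id) {
            store.deleteById(id);
            store.flush();
            searchIndex.delete(id);
        }
    }

    public static void main(String[] args) {
        SearchIndex index = new SearchIndex();
        ProductService service = new ProductService(new ProductStore(), index);
        Product created = service.create("Air Max");
        service.update(created.id(), "Air Max 90");
        System.out.println(index.documents); // {1=Air Max 90}
        service.delete(created.id());
        System.out.println(index.documents); // {}
    }
}
```

One design caveat with this order: if the indexer runs inside the database transaction and that transaction later rolls back, Elasticsearch keeps a change PostgreSQL never committed. If that matters, a common option is to index after commit, for example with Spring's `@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)`.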

 

Data loading and CUD sync are now in place.

Next, we switch search from PostgreSQL to Elasticsearch.