Indexando dados sem indisponibilidade

Indexando dados sem indisponibilidade

No artigo anterior, Indexando dados no Elasticsearch, vimos como indexar dados no Elasticsearch através de uma aplicação Ruby on Rails. Porém como o foco era apenas apresentar o processo de indexação, a performance envolvida no processo não foi abordada, e, com isso, a forma como foi implementada não é escalável para bases com centenas de milhares de registros.

Neste artigo será apresentado uma abordagem que se assemelha a um deploy blue-green, criando um novo índice que receberá os registros. E assim que o processo estiver concluído, este novo índice irá substituir o índice anterior, o que pode vir a zerar o impacto gerado pela necessidade da reindexação completa de uma base.

Antes de começarmos, algumas observações deve ser notadas. Em nosso projeto de exemplo estamos indexando um candidato por vez, ou seja, a cada requisição HTTP apenas um candidato é indexado. Isso pode ser melhorado indexando batches de registros a cada requisição HTTP. Dito isso, apenas a indexação em batches não é o suficiente para bases com centenas de milhares ou até milhões de registros, fazendo com que a indexação demore horas para ser concluída.

Entendendo o problema

Inicialmente, iremos fazer download e configurar nosso projeto executando os comandos a seguir. Este projeto é uma aplicação Ruby on Rails que possui um modelo chamado Candidate, esse é persistido em nossa base de dados local (PostgreSQL) e indexado no Elasticsearch. O projeto está descrito detalhadamente no artigo anterior.

# Faz o download do projeto
git clone git@github.com:joaofelipesus/reindex-blue-green.git
# Navega até o diretório do projeto
cd reindex-blue-green
# Faz o build do containers
docker-compose build
# Inicia os containers
docker-compose up

Você pode acessar o container em que nossa aplicação `web` está rodando e, com isso, criar nosso banco de dados e tabela candidates executando os seguintes comandos:

# Conecta ao container em que a aplicação web está rodando
docker compose exec web bash
# Cria o banco de dados
bundle exec rails db:create
# Executa as migrações pendentes
bundle exec rails db:migrate

Caso tudo corra bem ao acessar localhost:3000, será exibida a página de boas vindas do Rails. Com nosso ambiente configurado, inicialmente iremos gerar e indexar apenas cem candidatos executando as rake tasks a seguir:

# Cria cem candidatos
bundle exec rails candidates:seed
# Indexa todos os candidatos da base
bundle exec rails candidates:mass_indexation

Agora acessando http://localhost:5601/app/dev_tools#/console e executando a query GET candidates/_count, vemos que temos cem candidatos indexados em um índice chamado candidates.

Para visualizar o problema de performance, iremos aumentar o tamanho da nossa base de candidatos. Altere o número de candidatos gerados pela rake task candidates:seed de 100 para 5.000 e execute novamente as tasks candidates:seed e candidates:mass_indexation.

Note que o tempo de execução do processo de indexação subiu consideravelmente. Imagine um cenário em que temos uma feature que faz buscas em um índice Elasticsearch e os apresenta para nossos usuários. Neste contexto, caso tenhamos a necessidade de reindexar nossa base completamente devido a inserção de um novo atributo, esta funcionalidade sofrerá instabilidade, uma vez que os resultados de buscas feitas por nossos usuários irá retornar resultados apenas entre os candidatos que já foram indexados.

Processo de reindexação blue-green

Iniciaremos adicionando um novo atributo chamado favorite_language ao modelo Candidate. Devemos mapeá-lo no índice candidates que está definido no arquivo app/repositories/candidate_repository.rb. Por fim, adicionaremos alguns valores para serem salvos durante a rake task candidates:seed nesse novo atributo.

# Gerar a migration que adiciona a nova coluna
bundle exec rails g migration add-favorite-lnguage-to-candidate

# Código da migration
class AddFavoriteLnguageToCandidate < ActiveRecord::Migration[7.0]
  def change
    add_column :candidates, :favorite_language, :string, default: nil

	# Set an value to :favorite_language attribute
	Candidate.all.find_each { |c| c.update(favorite_language: %w[elixir ruby javascript].sample) }
  end
end

# Adicionar novo atributo ao mapeamento utilizado no Elasticsearch
class CandidateRepository
  # NOTE: defines the host of elasticsearch, we use elasticsearch instead of localhost due to docker network.
  ELASTICSEARCH_HOST = 'http://elasticsearch:9200/'.freeze
  # NOTE: defines the mapping with candidate attributes and its types.
  INDEX_MAPPING = {
    mappings: {
      properties: {
        name: { type: "text" },
        email: { type: "text" },
        experience_time: { type: "integer" },
        focus: { type: "text" },
        favorite_language: { type: "text" }
      }
    }
  }.freeze
  ...
end

# Adicionar valor à favorite_language durante a geração de candidatos
namespace :candidates do
  desc 'Create candidates'
  task seed: :environment do
    5_000.times do |index|
      Candidate.create!(
        name: Faker::Name.name,
        email: Faker::Internet.email,
        experience_time: rand(1..10),
        focus: %w[backend frontend fullstack].sample,
        favorite_language: %w[elixir ruby javascript].sample
      )
    puts "#{index + 1} of 5000"
  end
end

# Executar a nova migração
docker-compose exec web bash
bundle exec rails db:migrate

Agora com o novo mapeamento do índice, poderíamos deletar o índice anterior e na sequência criaríamos um novo índice com o novo atributo sendo indexado.  Entretanto, esta abordagem iria impactar nossos usuários fictícios, uma vez que as buscas retornariam resultados inconsistentes até o momento em que o índice seja indexado completamente.

Para lidar com este problema iremos utilizar uma abordagem baseada em Alias, uma vez que um alias é um ponteiro para um índice. Com isso, nossa aplicação pensa que está fazendo inserções ou buscas em um índice chamado candidates_index, porém este é o nome do alias que aponta para algum outro índice, como ilustrado na imagem abaixo.

Para que seja possível criar um novo índice de forma paralela e indexar candidatos a este novo índice, precisamos adicionar um método que crie um novo alias que aponte para um índice. Dito isso, adicionaremos uma rake task que, ao criar um novo índice, também irá criar um alias que aponte para este novo índice. Também devemos adaptar o código da task candidates:mass_indexation para que trabalhe com Alias, de forma que, ao ser executada crie o Alias de produção que aponte para o novo índice.

class CandidateRepository
  ...

  # Create an alias with received name and index.
  #
  # @param alias_name [String] alias name.
  # @param index_name [String] index name.
  def create_alias(alias_name, index_name)
    response = conn.post(
      '_aliases',
      {
        actions: [ { add: { index: index_name, alias: alias_name } } ]
      }.to_json
    )
  
    return 'Alias created' if response.status == 200
  
    response.body
  end
end

#######################################################################

# Rake tasks
desc 'Index candidates'
task mass_indexation: :environment do
  splitted_date_time = DateTime.current.to_s.split('T')
  date_values = splitted_date_time.first.split('-')
  time_values = splitted_date_time.last.split(':')
  index_name = "candidates_#{date_values[0]}_#{date_values[1]}_#{date_values[2]}_#{time_values[0]}_#{time_values[1]}"
  
  puts 'Creating index ...'
  CandidateRepository.new.create_index(index_name)
  puts 'Index created'
  
  puts 'Creating alias ...'
  CandidateRepository.new.create_alias(
    CandidateRepository::PRODUCTION_ALIAS, 
    index_name
  )
  puts 'Alias created'
  
  Candidate.all.find_each.with_index do |candidate, index|
    CandidateRepository.new.index_candidate(candidate, index_name) 
    puts "#{index + 1} of #{Candidate.count}"
  end
end

Agora iremos executar a task candidates:mass_indexation que irá criar um novo índice nomeado candidates_ seguido de um timestamp, e um alias chamado candidates_index que aponta para o novo índice. A partir do novo Alias serão feitas as interações com o índice, de forma que o índice apontado pelo Alias candidates_index pode mudar sem que haja necessidade de alteração em nossa aplicação.

Com a conclusão de tais comandos, iremos adicionar uma nova task chamada candidates:blue_green_indexation. Essa task irá criar um novo índice, também nomeado candidates_ seguido de um timestamp. Com o fim desta etapa, será criado um Alias de nome candidates_processing, que apontará para o novo índice e, ao final desta etapa, será iniciada a indexação de todos os candidatos da base.

desc 'Background indexation'
task blue_green_indexation: :environment do
  splitted_date_time = DateTime.current.to_s.split('T')
  date_values = splitted_date_time.first.split('-')
  time_values = splitted_date_time.last.split(':')
  index_name = "candidates_#{date_values[0]}_#{date_values[1]}_#{date_values[2]}_#{time_values[0]}_#{time_values[1]}"

  puts 'Creating index ...'
  CandidateRepository.new.create_index(index_name)
  puts 'Index created'

  CandidateRepository.new.create_alias(
    CandidateRepository::PROCESSING_ALIAS, 
    index_name
  )

  Candidate.all.find_each.with_index do |candidate, index|
    CandidateRepository.new.index_candidate(
      candidate, 
      CandidateRepository::PROCESSING_ALIAS
    )
    puts "#{index + 1} of #{Candidate.count}"
  end 
end

Após a execução da nova task, temos dois índices em que candidates_index emula um índice consumido por nossa aplicação para fazer buscas e candidates_processing que é um novo índice de novo mapeamento e que teve seus dados inseridos em paralelo. Desta forma, o ambiente de produção será pouco afetado.

O fato da aplicação estar interagindo com um alias, e não diretamente com o índice, nos dá a possibilidade de alterar para qual índice o alias aponta. Uma vez que os candidatos estejam indexados faremos com que o alias candidates_index passe a apontar para o novo índice e, na sequência, delete o índice anterior, como ilustrado na figura abaixo.

Iremos iniciar adicionando à classe CandidateRepository um método chamado switch_production_alias, que irá implementar o fluxo descrito acima.

class CandidateRepository
  ...
  
  # Switch indexes pointed by production and processing alias, and at the end removes old production index.
  def switch_production_alias
    production_index_name = get_index_name(PRODUCTION_ALIAS)
    processing_index_name = get_index_name(PROCESSING_ALIAS)

    point_alias_to_index(
      alias_name: PRODUCTION_ALIAS, 
      index_name: processing_index_name
    )

    remove_index_pointer(
      alias_name: PRODUCTION_ALIAS, 
      index_name: production_index_name
    )

    remove_index_pointer(
      alias_name: PROCESSING_ALIAS, 
      index_name: processing_index_name
    )
    drop_index(production_index_name)
  end

  private

  ...

  # Return the index name that received alias points.
  #
  # @param alias_name [String] alias name.
  def get_index_name(alias_name)
    response_body = conn.get("#{alias_name}/_alias").body
    JSON.parse(response_body).keys.first
  end

  # Points received alias to received index.
  #
  # @param alias_name [String] alias name.
  # @param index_name [String] index name.
  def point_alias_to_index(alias_name:, index_name:)
    conn.post(
      '_aliases',
      {
        actions: [
          {
            add: {
              index: index_name,
              alias: alias_name
            }
          }
        ]
      }.to_json
    )
  end

  # Remove index pointer from received alias.
  #
  # @param alias_name [String] alias name.
  # @param index_name [String] index name.
  def remove_index_pointer(alias_name:, index_name:)
    conn.post(
      '_aliases',
      {
        actions: [
          {
            remove: {
              index: index_name,
              alias: alias_name
            }
          }
        ]
      }
    )
  end

  # Drop received index.
  #
  # @param index_name [String] index name.
  def drop_index(index_name)
    conn.delete(index_name)
  end
end

Agora com a lógica que faz a substituição dos índices implementada, iremos criar uma nova task de nome candidates:switch_production_alias que faça esta chamada.

desc 'Switch indexes pointed by alias candidates_index'
task switch_indexes: :environment do
  puts 'Starting switch indexes'
  CandidateRepository.new.switch_production_alias
  puts 'Switch indexes finished'
end

Antes de executarmos a nova task, rode o comando GET candidates_index e guarde o nome do índice que está sendo apontado por este alias. Ao executarmos a task, note a rapidez para fazer o chaveamento entre os índices. Ao executar a chamada GET candidates_index podemos confirmar que o índice apontado pelo alias candidates_index de fato mudou.

Conclusão

A abordagem proposta neste artigo nos permite criar e preencher um novo índice enquanto a aplicação continua a utilizar o índice anterior. Uma vez concluído o processamento dos dados, podemos executar a lógica que troca o índice apontado pelo alias de produção, removendo o índice anterior, sem qualquer impacto na produção durante essa operação. Essa transição ocorre em uma janela de execução muito curta, consistindo em apenas algumas chamadas HTTP ao Elasticsearch.

Alguns pontos que valem ser mencionados são: o fato de não ter sido utilizado Sidekiq para processar as chamadas em background e a não utilização da indexação em batches que é suportada pelo Elasticsearch. Vale ressaltar que em aplicações reais o uso destas ferramentas é válido em conjunto com o processo de indexação em paralelo sugerido neste artigo.

Todos os códigos utilizados durante o artigo estão presentes neste repositório.

Sucesso!

💡
As opiniões e comentários expressos neste artigo são de propriedade exclusiva de seu autor e não representam necessariamente o ponto de vista da Revelo.

A Revelo Content Network acolhe todas as raças, etnias, nacionalidades, credos, gêneros, orientações, pontos de vista e ideologias, desde que promovam diversidade, equidade, inclusão e crescimento na carreira dos profissionais de tecnologia.