Estendendo o OutputStream do Apache Spark Structured Streaming

Apache Spark Structure Streaming
13 minutos para ler

Recentemente escrevi uma série de artigos explicando o funcionamento da API de streams do Apache Spark, o Structured Streaming (veja parte1). Por meio desta API é possível processar alta volumetria de dados de forma contínua, modelando sistemas que processam dados em ‘tempo real’. Esse foi o tema da minha palestra no QConSP desse ano (assista palestra na íntegra aqui!)

Algumas pessoas pediram para mostrar mais detalhes sobre como estender a funcionalidade do OutputStream do Structured Streaming, esse será o foco desse artigo, entretanto, vou dar uma breve introdução sobre a tecnologia antes de passarmos para uma análise mais aprofundada dos OutputStreams.

Vou passar de forma introdutória sobre os principais pontos da arquitetura do Structured Streaming, para um guia detalhado do funcionamento com exemplos, confira a parte2 da série de artigos sobre o tema.

Apresentando: Apache Spark Structured Streaming

Para o processamento de streams o Apache Spark disponibiliza um conjunto de bibliotecas chamado Structured Streaming. Como mencionado anteriormente trata-se de uma API de processamento, o que significa que o Structured Streaming não faz o papel de um Stream Server, como Kafka ou Knesis. A API Structured Streaming está disponível a partir da versão 2.X do Apache Spark, construída sobre as bibliotecas do Spark SQL, o que permite que sejam utilizadas as APIs dos Dataframe e Datasets (estruturas de dados distribuídas do Spark) e também executar instruções SQL sobre os dados do streaming.

Uma das grandes vantagens da API de processamento de streams, que faz parte do ecossistema do Apache Spark, é facilitar o desenvolvimento de aplicações que sejam tolerante a falhas e altamente escaláveis.

Diferentemente de uma tarefa tradicional do Apache Spark que aloca os recursos necessários, executa seu processamento e encerra, uma tarefa utilizando a API do Structured Streaming fica em execução continuamente, por tanto os recursos também estarão alocados enquanto a tarefa estiver em execução, proporcionando assim a possibilidade de processar os dados de forma contínua, à medida que os dados forem chegando nas plataformas de stream.

Internamente as consultas realizadas pelo Structured Streaming são executadas por uma engine de processamento de pequenas parcelas de dados, chamados de micro-batches. Essa engine de processamento executa pequenas tarefas continuamente e de acordo com a documentação do Spark com latência inferior a 100 milissegundos de ponta-a-ponta na arquitetura do cluster. A Figura 1 ilustra os dados chegando para uma aplicação de processamento de streams:


Figura 1. Dados de entrada InputStream sendo continuamente agregados a uma table para processamento (fonte: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#basic-concepts).

Os dados são recebidos por meio da definição de um InputStream que vai ler continuamente as informações de entrada e alimentar uma tabela temporária, chamada de Unbounded Table, na qual as operações de manipulação de dados são executadas. O resultado do processamento é direcionado para um OuputStream que vai persistir os dados em algum dispositivo para armazenamento (storage). Essa é a arquitetura básica de uma aplicação Structured Streaming.

Os tipos de InputStream que são suportados pela plataforma são: File Source e Kafka Source. Já os tipos de OuputStreams suportados, conhecidos como OutputSinks, são: File Sink, Kafka Sink, Foreach Sink (para estender), Console Sink (para debug). Os nomes das entradas e saídas de dados são auto-explicativos nesse caso. 

Como pôde ser observado, não temos muitas opções de Output Sinks implementados por padrão na plataforma. O que fazer se for necessário despejar o resultado do processamento em outro dispositivo que não seja um sistema de arquivos distribuídos HDFS ou um outro stream?

Estendendo o OutputStream e criando um novo OutputSink

Dependendo dos requisitos de sua aplicação, as opções de Output padrão do Apache Spark podem não ser suficientes. Nem todos desejam armazenar o resultado do processamento dos streams em um HDFS ou mesmo despejar em outro stream. E se sua necessidade for diferente? Por exemplo: armazenar a saída de dados em banco relacional, um cache distribuído ou em algum NoSQL?

Podemos estender as funcionalidades do OutputStream por meio da implementação de um ForEach Sink. Nesse ponto podemos fazer a integração com a tecnologia que necessitarmos. Na minha opinião a possibilidade de especializar a solução por meio da extensão foi uma decisão de design muito inteligente na hora da construção da plataforma, pois permite que seja bastante flexível para atender necessidades específicas dos usuários.

Para estender o OutputSink precisamos implementar a classe abstrata ForeachWriter. Essa classe é responsável por consumir os dados gerados por uma StreamingQuery. Tipicamente utilizada para enviar os dados gerados para sistemas externos, essa classe deve ser responsável por abrir conexões, transações e outros recursos relacionados com os sistemas externos ao Apache Spark.

Para ilustrar o uso do ForeachWriter, vou usar o exemplo do case de processamento e agregação de dados, no qual a saída é persistida em banco de dados relacional. Para mais detalhes sobre o case em questão confira a parte 3 da série de artigos sobre Apache Spark Strucutured Streaming. 

Métodos importantes que devem ser implementados da classe ForeachWriter:

  • open: chamado quando inicia o processamento dos dados de uma partição. Uma observação importante é que na recuperação de falhas, é possível que os registros sejam gerados multiplas vezes, porém com a mesma versão;
  • process: chamado para processar os dados efetivamente, só executa depois do método open ser executado corretamente e voltar true;
  • close: chamado deseja interromper o processamento dos dados. Em caso de falha no processo da JVM esse método pode não ser chamado. 

Nesse exemplo, será apresentada a classe ForeachWriter que vai abrir uma conexão com o banco relacional (PostgreSQL) e gravar o resultado do OutputStream de processamento. Todo código a seguir está em linguagem Scala.

O método a seguir apresenta a lógica de persistência de informação, nesse caso, assume-se que os dados serão atualizados no banco relacional, caso nenhum registro seja atualizado, então os dados são inseridos, de forma a garantir a persistência da informação: 

/**
    * Persiste uma linha na base de dados.
    * @param row
    */
  def persist(row : Row) : Unit = {
    val id = createKey(row)
    var affectedRows:Int = updateRecord(statement, id, row ,version)
    if (affectedRows <= 0) {
      try {
        printlnStr(stamp + " Falha no update, executando insert...")
        insertRecord(statement, id, row, version)
      } catch {
        case e: Exception => {
          printlnStr(stamp + " [CONFLICT] conflito no insertt: " + e.getCause)
          affectedRows = updateRecord(statement, id, row, version)
          printlnStr(stamp + " Linhas afetadas após update: " + affectedRows)
        }
      }
    }
  }

O objeto row passado como argumento do método persist é o registro resultante do processamento da StreamingQuery. Para esse caso, dados sendo processados em “tempo real”.

Para que os dados possam ser posteriormente atualizados, é importante conhecer a chave primária dos registros no banco relacional para saber o que será atualizado. Mas uma vez que se realiza processamento distribuído e não há leitura prévia da informação, como saber as chaves dos registros? Para resolver esse problema, a chave dos registros é um função de hash (SHA1), gerada com base nos dados de cada registro em tempo de execução por meio da seguintes funções: 

 def hash(s: String) = {
    MessageDigest.getInstance("SHA1").digest(s.getBytes("UTF-8")).map("%02x".format(_)).mkString
  }

  def createKey(row: Row) = {
    // carrier_id + start_date + end_date
    hash(row(0).toString + "-" + row(3)+ "-" + row(4))
  }

A função createKey(Row) retorna uma chave de hash para o registro em questão, deve-se utilizar os campos que realmente caracterizam o registro, dessa forma toda vez que o mesmo registro for processado vai gerar a mesma chave, utilizada como chave primária no banco relacional.

Os métodos de implementação obrigatória do ForeachStream podem ser vistos no fragmento de código a seguir:

/*** the interface methods ***/
  override def open(partitionId: Long, version: Long): Boolean = {
    stamp = "[partition: " + partitionId + "- v" + version + "]"
    printlnStr(stamp + "[BEGIN] creating jdbc connection")
    Class.forName(driver)
    connection = DriverManager.getConnection(url, user, pwd)
    statement = connection.createStatement()
    this.version = version
    true
  }

  override def process(row: Row): Unit = {
    val recordInstant = row.getTimestamp(3).toInstant
    if (validInstant(recordInstant)){
      printlnStr(stamp + "[PERSIST] row: " + row)
      persist(row)
    } else {
      printlnStr(stamp + "[NOT PERSISTED] record is older than 2h to be persisted: " + recordInstant)
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (!connection.isClosed) {
      printlnStr(stamp + " [CLOSE] Closing JDBC connection, partition: " + version)
      connection.close()
    } else {
      printlnStr(stamp + " [CLOSE] Connection already closed")
    }
  }

Nesse exemplo, durante a execução do método open é estabelecida a conexão com o banco de dados relacional, no process é realizada a persistência dos dados, os registros antigos são descartados usando o método validInstant. Por fim, o método close que encerra a conexão com o banco relacional.

Colocando tudo junto, temos a classe completa em detalhes, chamada JDBCSink. Essa classe possui os métodos geradores da chave, os outros responsáveis pela persistência e os métodos da interface, confira todo código:

package com.movile.stream

import java.security.MessageDigest
import java.sql.{Connection, DriverManager, Statement}
import java.time.{Duration, Instant}

import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
import org.apache.spark.sql.{ForeachWriter, Row}

class JDBCSink(url: String, user: String, pwd: String) extends ForeachWriter[Row]{

  val driver = "org.postgresql.Driver"
  val table = "transaction_log_consolidated"
  var connection:Connection = _
  var statement:Statement = _
  var version:Long = 0
  var stamp:String = _

  def hash(s: String) = {
    MessageDigest.getInstance("SHA1").digest(s.getBytes("UTF-8")).map("%02x".format(_)).mkString
  }

  def createKey(row: Row) = {
    // carrier_id + start_date + end_date
    hash(row(0).toString + "-" + row(3)+ "-" + row(4))
  }

  def printlnStr(str: String) = {
      // println(str) -- disable print logs for while
  }

  def updateRecord(statement: Statement, id:String, row: Row, version:Long) : Int =  {
    val sql = "UPDATE " + table + " SET " +
      "carrier_id=" + row(0) +
      ", hour_of_day=" + row(2) +
      ", start_date='" + row(3) + "'" +
      ", end_date='" + row(4) + "'" +
      ", avg_response_time=" + row(5) +
      ", successful_charges=" + row(6) +
      ", no_credit=" + row(7) +
      ", error=" + row(8) +
      ", total_attempts=" + row(9) +
      ", batch_version=" + version +
      ", control_date = DEFAULT" +
      " WHERE id='" + id + "';"

    printlnStr(stamp + "[UPDATE] " + sql)
    statement.executeUpdate(sql)
  }

  def insertRecord(statement: Statement, id:String, row: Row, version:Long) : Int =  {
     val sql = "INSERT INTO " + table + " (id, carrier_id, hour_of_day, start_date, end_date, avg_response_time, successful_charges, no_credit, error, total_attempts, batch_version) " +
       "VALUES('" + id + "', " +
       row(0) + ", " +
       row(2) + ", '" +
       row(3) + "', '" +
       row(4) + "', " +
       row(5) + ", " +
       row(6) + ", " +
       row(7) + ", " +
       row(8) + " , " +
       row(9) + ", " +
       version + ");"

     //println(stamp + "[INSERT] " + sql)
     statement.executeUpdate(sql)
  }

  /**
    *
    * @param recordInstant the time instant of the record creation
    * @return true if record was created up to 2 hours ago or a fresh new record
    * */
  def validInstant(recordInstant: Instant): Boolean = {
    val threshold = -120; // 2 hours
    val today = Instant.now()
    val duration = Duration.between(today, recordInstant)
    printlnStr(today + " - " + recordInstant + " = " + duration.toMinutes + "minutes")

    if (duration.toMinutes >= threshold)
      true //valid record
    else
      false // old record, it is not valid for update
  }

  /**
    * persist a row to the database
    * @param row
    */
  def persist(row : Row) : Unit = {
    val id = createKey(row)
    var affectedRows:Int = updateRecord(statement, id, row ,version)
    if (affectedRows <= 0) {
      try {
        printlnStr(stamp + " Not updated, executing insert instead...")
        insertRecord(statement, id, row ,version)
      } catch {
        case e: Exception => {
          printlnStr(stamp + " [CONFLICT] insert conflict: " + e.getCause)
          affectedRows = updateRecord(statement, id, row ,version)
          printlnStr(stamp + " Rows affected after update: " + affectedRows)
        }
      }
    }
  }

  /*** the interface methods ***/
  override def open(partitionId: Long, version: Long): Boolean = {
    stamp = "[partition: " + partitionId + "- v" + version + "]"
    printlnStr(stamp + "[BEGIN] creating jdbc connection")
    Class.forName(driver)
    connection = DriverManager.getConnection(url, user, pwd)
    statement = connection.createStatement()
    this.version = version
    true
  }

  override def process(row: Row): Unit = {
    val recordInstant = row.getTimestamp(3).toInstant
    if (validInstant(recordInstant)){
      printlnStr(stamp + "[PERSIST] row: " + row)
      persist(row)
    } else {
      printlnStr(stamp + "[NOT PERSISTED] record is older than 2h to be persisted: " + recordInstant)
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (!connection.isClosed) {
      printlnStr(stamp + " [CLOSE] Closing JDBC connection, partition: " + version)
      connection.close()
    } else {
      printlnStr(stamp + " [CLOSE] Connection already closed")
    }
  }
}

Para utilizar a classe, vamos especificar no OutputStream da seguinte forma, inicialmente fazendo a leitura dos parâmetros de conexão com o banco relacional, depois instanciando a classe JDBCSink e informando no WriteStream, parâmetro foreach a classe:

val username = properties.getString("postgres.username")
    val password = properties.getString("postgres.password")
    val resource = properties.getString("postgres.url")

    import org.apache.spark.sql.streaming.Trigger
    val jdbcWriter = new JDBCSink(resource, username, password)

    val foreachStream = query
      .writeStream
      .foreach(jdbcWriter)
      .outputMode(OutputMode.Update())
      .trigger(Trigger.ProcessingTime("2 minute"))
      .option("checkpointLocation", '<YOUR_LOCATION>')
      .start

Considerações finais

Foi mostrado nesse artigo como estender um OutputStream do Apache Spark Structured Streaming. 

A criação do ponto de extensão na plataforma Apache Spark por meio da classe ForeachWriter permite ao desenvolvedor conectar o resultado da saída do processamento da ferramenta de stream a qualquer outro sistema externo, como: bancos de dados relacionais, não relacionais, caches e qualquer outra necessidade do desenvolvedor.

Por meio do exemplo apresentado, qualquer desenvolvedor que necessite salvar os dados de saída de um stream em banco de dados relacional pode se beneficiar da classe apresentada, bastando apenas implementar os comandos SQL para salvar e inserir dados em sua tabela específica. 

Uma observação importante quanto a implementação é que o foco principal foi mostrar um exemplo funcional, o que não significa que o código está pronto para produção. Por exemplo, há necessidade de sanitizar os campos, como usar um PreparedStatement para melhorar a segurança e desempenho dos comandos SQL de forma a evitar SQL Injection, e assim por diante.

Dica: É importante salientar que o driver de acesso ao banco de dados (arquivo jar) deve ser adicionado no classpath junto à instalação do Apache Spark no cluster para acessar o recurso externo.

Sobre o Autor

Eiti Kimura é Coordenador de TI e Arquiteto de Sistemas Distribuídos de alto desempenho na Wavy. Eiti tem 17 anos de experiência em desenvolvimento de software. É entusiasta de tecnologias open-source, MVP do Apache Cassandra desde 2014 e tem vasta experiência com sistemas de back-end, em especial plataformas de tarifação e mensageria para as principais operadoras de telefonia do Brasil. 


Você também pode gostar

Deixe uma resposta

-