[JAVA] Try using Axon Framework

I recently had the opportunity to implement an in-house business workflow: an application is submitted, approved by the person concerned, and then approved by the final manager.

Normally you would build this as a CRUD application with Spring MVC, but I have long been interested in a CQRS + Event Sourcing architecture, so I decided to evaluate Axon Framework, which implements it.

Axon Framework

As of version 4, Axon Server, a server for microservices environments, has been added, and using Axon Framework together with Axon Server appears to be the recommended setup. I will not use Axon Server this time, because I first want to understand Axon Framework itself.

Architecture

Below is a diagram from the version 3 documentation, which I find easier to understand than the latest one.

image.png

Processing sequence

The following is my understanding of the flow, based on the Axon documentation and samples.

Command

Command sequence

  1. A command is sent to the Aggregate via the Command Bus provided by Axon.
  2. An instance of the Aggregate targeted by the command is created. If events have already been recorded for that Aggregate, they are applied (replayed) to the instance.
  3. The Aggregate receives the command and, depending on its current state, sends an event to the Event Bus.
  4. The Event Bus records the event and delivers it to the Aggregate.
  5. The Aggregate changes its state according to the content of the event.
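
To make the five steps above concrete, here is a minimal sketch in plain Kotlin without Axon. Everything in it (InMemoryEventStore, Invoice, rename, and the event classes) is a hypothetical name invented for this illustration and is not Axon's API; it only shows the idea of replaying events before a command is handled.

```kotlin
import java.util.UUID

// Hypothetical event types for this sketch; they are not Axon classes.
sealed class InvoiceEvent { abstract val invoiceId: UUID }
data class Created(override val invoiceId: UUID, val title: String) : InvoiceEvent()
data class Updated(override val invoiceId: UUID, val title: String) : InvoiceEvent()

// Hand-rolled stand-in for the Event Store: an append-only list of events.
class InMemoryEventStore {
    private val events = mutableListOf<InvoiceEvent>()
    fun append(event: InvoiceEvent) { events.add(event) }
    fun load(id: UUID): List<InvoiceEvent> = events.filter { it.invoiceId == id }
}

// Stand-in for the Aggregate: its state changes only in response to events.
class Invoice {
    var title: String? = null
        private set

    fun on(event: InvoiceEvent) {
        when (event) {
            is Created -> title = event.title
            is Updated -> title = event.title
        }
    }
}

// Handles a hypothetical "rename" command following steps 2 to 5.
fun rename(store: InMemoryEventStore, id: UUID, newTitle: String): Invoice {
    val invoice = Invoice()
    store.load(id).forEach { invoice.on(it) }   // step 2: new instance, replay past events
    check(invoice.title != null) { "invoice $id does not exist" } // step 3: decide from state
    val event = Updated(id, newTitle)
    store.append(event)                         // step 4: record the event
    invoice.on(event)                           // step 5: apply it to the state
    return invoice
}

fun main() {
    val store = InMemoryEventStore()
    val id = UUID.randomUUID()
    store.append(Created(id, "Draft"))

    val invoice = rename(store, id, "Final")
    println(invoice.title)        // Final
    println(store.load(id).size)  // 2
}
```

Note that the instance is rebuilt from its events on every command, which is why nothing other than the events needs to be stored on the command side.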

Query

Query sequence

  1. The Event Processor detects that an event has been recorded in the Event Store and sends the event to the Thin Data Layer.
  2. The Thin Data Layer stores the data in a format that is easy to query.
  3. A query is sent to the Thin Data Layer via the Query Bus.
  4. The Thin Data Layer retrieves the data from the Data Store and returns it.
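
The query side can be sketched the same way in plain Kotlin. The names below (InvoiceProjection, InvoiceRow, and the two event classes) are assumptions made up for this example, not Axon types; the point is only that events are folded into a query-friendly structure and queries never touch the events themselves.

```kotlin
import java.util.UUID

// Hypothetical events and read-model row for this sketch; not Axon classes.
data class InvoiceCreated(val invoiceId: UUID, val title: String)
data class InvoiceRemoved(val invoiceId: UUID)
data class InvoiceRow(val invoiceId: UUID, val title: String)

// Stand-in for the Thin Data Layer.
class InvoiceProjection {
    // Stand-in for the Data Store; LinkedHashMap keeps insertion order.
    private val rows = LinkedHashMap<UUID, InvoiceRow>()

    // Steps 1 and 2: receive an event and store it in an easy-to-query form.
    fun on(event: Any) {
        when (event) {
            is InvoiceCreated -> rows[event.invoiceId] = InvoiceRow(event.invoiceId, event.title)
            is InvoiceRemoved -> rows.remove(event.invoiceId)
        }
    }

    // Steps 3 and 4: answer a query from the stored rows only.
    fun findAll(): List<InvoiceRow> = rows.values.toList()
}

fun main() {
    val projection = InvoiceProjection()
    val a = UUID.randomUUID()
    val b = UUID.randomUUID()

    projection.on(InvoiceCreated(a, "A"))
    projection.on(InvoiceCreated(b, "B"))
    projection.on(InvoiceRemoved(a))

    println(projection.findAll().map { it.title })  // [B]
}
```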

Sample

First, for evaluation purposes, let's implement only simple CRUD operations on an application.

1. Dependencies

Add axon-spring-boot-starter (①) to the Spring Boot application. Since Axon Server is not used this time, axon-server-connector is excluded; otherwise, errors or warnings about failed connections to Axon Server appear while the application is running.

Axon Framework provides JPA and JDBC engines for accessing the Event Store. Because Axon Framework hides the Event Store processing, the choice between them mainly depends on how you implement the Thin Data Layer. This time I chose the JPA engine, which requires minimal setup, and added spring-boot-starter-data-jpa (②).

pom.xml(Excerpt)


  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.6.RELEASE</version>
    <relativePath/>
  </parent>

  <properties>
    <java.version>11</java.version>
    <kotlin.version>1.3.71</kotlin.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.axonframework</groupId>
      <artifactId>axon-spring-boot-starter</artifactId> <!-- ① -->
      <version>4.3.2</version>
      <exclusions>
        <exclusion>
          <groupId>org.axonframework</groupId>
          <artifactId>axon-server-connector</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- Web -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-thymeleaf</artifactId>
    </dependency>

    <!-- DB -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-jpa</artifactId> <!-- ② -->
    </dependency>
    <dependency>
      <groupId>com.h2database</groupId>
      <artifactId>h2</artifactId>
      <scope>runtime</scope>
    </dependency>
  </dependencies>

2. Aggregate

This is the central part of the business logic in Axon Framework. Since an Aggregate is regarded as a transaction boundary in DDD, events are presumably managed per Aggregate.

2.1. Commands and events

In a command, annotate the property that holds the identifier of the destination Aggregate with @TargetAggregateIdentifier (①). Events need no such annotation, because an event is delivered either to the same Aggregate instance that received the command or to the Thin Data Layer, which does not depend on any instance.

message.kt


import java.util.UUID
import org.axonframework.modelling.command.TargetAggregateIdentifier

interface IInvoiceBody {
  val invoiceTitle: String
}

interface IInvoiceHeader {
  val invoiceId: UUID
}

interface IInvoice: IInvoiceHeader, IInvoiceBody

data class CreateInvoiceCommand(
  @field:TargetAggregateIdentifier // ①
  override val invoiceId: UUID,
  val body: IInvoiceBody): IInvoiceBody by body, IInvoice

data class UpdateInvoiceCommand(
  @field:TargetAggregateIdentifier // ①
  override val invoiceId: UUID,
  val body: IInvoiceBody): IInvoiceBody by body, IInvoice

data class RemoveInvoiceCommand(
  @field:TargetAggregateIdentifier // ①
  override val invoiceId: UUID
): IInvoiceHeader

data class InvoiceCreatedEvent(private val invoice: IInvoice): IInvoice by invoice

data class InvoiceUpdatedEvent(private val invoice: IInvoice): IInvoice by invoice

data class InvoiceRemovedEvent(
  override val invoiceId: UUID
): IInvoiceHeader

2.2. Aggregate

Assign @AggregateIdentifier (①) to the property that corresponds to the command's @TargetAggregateIdentifier. @Aggregate (②) is an annotation for Spring integration and also acts as a @Component annotation.

Assign @CommandHandler (③) to each method that processes a command. The command that creates a new Aggregate is received by a constructor. A no-argument constructor is also required, however, because when a subsequent command is processed the Aggregate is first instantiated and its events are then replayed.

Events are received by the methods annotated with @EventSourcingHandler (④), which update the state of the Aggregate. In the handler for the event that ends the Aggregate's life, call AggregateLifecycle.markDeleted() (⑤).

InvoiceAggregate.kt


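import java.util.UUID
import org.axonframework.commandhandling.CommandHandler
import org.axonframework.eventsourcing.EventSourcingHandler
import org.axonframework.modelling.command.AggregateIdentifier
import org.axonframework.modelling.command.AggregateLifecycle
import org.axonframework.spring.stereotype.Aggregate

@Aggregate // ②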
class InvoiceAggregate(): IInvoice {
  @AggregateIdentifier // ①
  override lateinit var invoiceId: UUID
  override lateinit var invoiceTitle: String

  @CommandHandler // ③
  constructor(command: CreateInvoiceCommand): this() {
    AggregateLifecycle.apply(InvoiceCreatedEvent(command))
  }

  @EventSourcingHandler // ④
  fun on(event: InvoiceCreatedEvent) {
    invoiceId = event.invoiceId
    invoiceTitle = event.invoiceTitle
  }

  @CommandHandler // ③
  fun handle(command: UpdateInvoiceCommand) {
    AggregateLifecycle.apply(InvoiceUpdatedEvent(command))
  }

  @EventSourcingHandler // ④
  fun on(event: InvoiceUpdatedEvent) {
    invoiceTitle = event.invoiceTitle
  }

  @CommandHandler // ③
  fun handle(command: RemoveInvoiceCommand) {
    AggregateLifecycle.apply(InvoiceRemovedEvent(command.invoiceId))
  }

  @EventSourcingHandler // ④
  fun on(event: InvoiceRemovedEvent) {
    AggregateLifecycle.markDeleted() //⑤
  }
}

3. Thin Data Layer

Events recorded in the Event Store are delivered to the methods annotated with @EventHandler (①). Note that this is a different annotation from @EventSourcingHandler. Queries are received by the method annotated with @QueryHandler (②), which fetches the data matching the query and returns it.

InvoiceService.kt


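import org.axonframework.eventhandling.EventHandler
import org.axonframework.queryhandling.QueryHandler
import org.springframework.stereotype.Service

@Service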
class InvoiceService(val invoiceRepo: InvoiceRepository) {
  class AllInvoicesQuery

  @QueryHandler // ②
  fun handle(query: AllInvoicesQuery): List<InvoiceEntity> {
    return invoiceRepo.findAll()
  }
  @EventHandler // ①
  fun on(event: InvoiceCreatedEvent) {
    invoiceRepo.save(InvoiceEntity(event.invoiceId, event.invoiceTitle))
  }
  @EventHandler // ①
  fun on(event: InvoiceUpdatedEvent) {
    invoiceRepo.save(InvoiceEntity(event.invoiceId, event.invoiceTitle))
  }
  @EventHandler // ①
  fun on(event: InvoiceRemovedEvent) {
    invoiceRepo.deleteById(event.invoiceId)
  }
}

4. Controller

This is a Spring MVC Controller that translates an HTTP request into a command or a query and sends it to the Command Bus or Query Bus through the CommandGateway or QueryGateway. CommandGateway offers an asynchronous send method and a synchronous sendAndWait method. I use the PRG (Post/Redirect/Get) pattern here, and because the query issued after the redirect could otherwise overtake the command, I use sendAndWait.

InvoiceController.kt


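import java.util.UUID
import org.axonframework.commandhandling.gateway.CommandGateway
import org.axonframework.messaging.responsetypes.MultipleInstancesResponseType
import org.axonframework.queryhandling.QueryGateway
import org.springframework.stereotype.Controller
import org.springframework.ui.Model
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestParam
import org.springframework.web.servlet.view.UrlBasedViewResolver

@Controller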
class InvoiceController(val commandGateway: CommandGateway, val queryGateway: QueryGateway) {

  companion object {
    const val REDIRECT_URL = "${UrlBasedViewResolver.REDIRECT_URL_PREFIX}/"
  }

  data class InvoiceBody(override val invoiceTitle: String): IInvoiceBody
  data class InvoiceRequest(
    override val invoiceId: UUID,
    override val invoiceTitle: String
  ): IInvoiceHeader, IInvoiceBody

  @GetMapping("/")
  fun topPage(model: Model): String {
    val invoices = queryGateway.query(InvoiceService.AllInvoicesQuery(), MultipleInstancesResponseType(InvoiceEntity::class.java)).get()
    model.addAttribute("invoices", invoices)
    return "index"
  }

  @PostMapping("/invoices")
  fun createInvoice(invoice: InvoiceBody): String {
    commandGateway.sendAndWait<Any>(CreateInvoiceCommand(UUID.randomUUID(), invoice))
    return REDIRECT_URL
  }

  @PostMapping("/invoices/update")
  fun updateInvoice(invoice: InvoiceRequest): String {
    commandGateway.sendAndWait<Any>(UpdateInvoiceCommand(invoice.invoiceId, invoice))
    return REDIRECT_URL
  }

  @PostMapping("/invoices/delete")
  fun deleteInvoice(@RequestParam invoiceId: UUID): String {
    commandGateway.sendAndWait<Any>(RemoveInvoiceCommand(invoiceId))
    return REDIRECT_URL
  }
}

Trouble

I can't see the result of the command in the query

For example, in the sample, even when I created an application (/invoices) and was redirected to the page that displays the list (/), the new data was not there. When I debugged it, the @CommandHandler method sent an event and the @EventSourcingHandler method was called immediately, but the @EventHandler method was called a little later. Apparently, Axon's default Event Processor, TrackingEventProcessor, runs in a separate thread and monitors for newly added events at a default interval of 5 seconds.

For large systems it is probably a good idea to tune the monitoring interval, but for smaller ones it seems better to switch to the other processor type, SubscribingEventProcessor.

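import org.axonframework.config.EventProcessingConfigurer
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.SpringBootApplication

@SpringBootApplication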
class SampleAxonApplication {
  @Autowired
  fun configure(configurer: EventProcessingConfigurer) {
    configurer.usingSubscribingEventProcessors()
  }
}

With SubscribingEventProcessor, the @EventHandler methods are called on the same thread as the Controller, so the query can no longer overtake the update.
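
The difference between the two processor styles can be illustrated with a small plain-Kotlin sketch. TinyEventBus and TrackingStyleProcessor are hypothetical classes written for this example and do not reflect Axon's actual implementation. In particular, the real tracking processor polls on its own thread, while here poll() is called by hand so that the behavior is deterministic.

```kotlin
// Hypothetical minimal event bus for this sketch; not Axon's API.
class TinyEventBus {
    val log = mutableListOf<String>()   // stand-in for the Event Store
    private val subscribers = mutableListOf<(String) -> Unit>()

    fun subscribe(handler: (String) -> Unit) { subscribers.add(handler) }

    fun publish(event: String) {
        log.add(event)
        // Subscribing style: handlers run inside publish, on the caller's thread.
        subscribers.forEach { it(event) }
    }
}

// Tracking style: remembers its position in the log and catches up when polled.
class TrackingStyleProcessor(
    private val log: List<String>,
    private val handler: (String) -> Unit
) {
    private var position = 0

    fun poll() {
        while (position < log.size) {
            handler(log[position])
            position++
        }
    }
}

fun main() {
    val bus = TinyEventBus()
    val subscribingView = mutableListOf<String>()
    val trackingView = mutableListOf<String>()

    bus.subscribe { subscribingView.add(it) }
    val tracking = TrackingStyleProcessor(bus.log) { trackingView.add(it) }

    bus.publish("InvoiceCreated")

    // A query issued right after the command returns sees different results:
    println(subscribingView)  // [InvoiceCreated]
    println(trackingView)     // []

    tracking.poll()           // the next polling cycle catches up
    println(trackingView)     // [InvoiceCreated]
}
```

The empty list printed in the middle corresponds to the redirected page that showed no data in the sample.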

Impressions

Since my investigation is not yet complete, it is still unclear whether Axon Framework can withstand real development. I would like to look into the remaining open points in the future.
