Matching System Part7: 整合撮合系統

https://raw.githubusercontent.com/superj80820/system-design/master/doc/exchange-arch.png

source code:

交易系統分為兩大部分,一個是非同步的TradingUseCase()、一個是同步的syncTradingUseCase()。TradingUseCase()將無序的TradingEvent蒐集後,有序地逐筆送入syncTradingUseCase()。我們會希望各種IO在TradingUseCase()完成,撮合這些需高速計算的部分在純memory的syncTradingUseCase()完成並得到trading result,再交給TradingUseCase()進行persistence(持久化),最後達到eventual consistency。

我們先介紹tradingUseCase,再介紹實作相對簡單的syncTradingUseCase

tradingUseCase需注入相關IO

// tradingUseCase is the asynchronous side of the trading system. It holds the
// IO-facing repositories plus the use cases it delegates to, and tracks the
// sequence ID of the last event it handled.
type tradingUseCase struct {
	// Repositories injected for IO. In the visible code userAssetRepo and
	// candleRepo are used to publish trading results downstream, and
	// tradingRepo is used to consume trading events and to produce/consume
	// trading results; the remaining repos are not referenced in the code shown.
	userAssetRepo domain.UserAssetRepo
	tradingRepo   domain.TradingRepo
	candleRepo    domain.CandleRepo
	quotationRepo domain.QuotationRepo
	matchingRepo  domain.MatchingRepo
	orderRepo     domain.OrderRepo

	// sequenceTradingUseCase checks event ordering and recovers missed events;
	// syncTradingUseCase executes a single event and returns its results.
	sequenceTradingUseCase domain.SequenceTradingUseCase
	syncTradingUseCase     domain.SyncTradingUseCase

	// lastSequenceID is overwritten with each event's SequenceID in
	// processTradingEvent and is the reference value passed to
	// CheckEventSequence in ProcessTradingEvents.
	lastSequenceID int

	// another code...
}

TradingUseCase主要實現以下幾個methods

// TradingUseCase is the asynchronous entry point of the trading system.
type TradingUseCase interface {
	// ConsumeTradingEvents consumes trading events for the given key and
	// feeds each batch to ProcessTradingEvents.
	ConsumeTradingEvents(ctx context.Context, key string)

	// ConsumeTradingResult consumes batches of trading results for the given
	// key and forwards them to the downstream producers.
	ConsumeTradingResult(ctx context.Context, key string)

	// ProcessTradingEvents handles a batch of trading events and reports the
	// first processing failure, if any.
	ProcessTradingEvents(ctx context.Context, tradingEvents []*TradingEvent) error

	// another code...
}

ConsumeTradingEvents(): consume已經過定序模組定序過的trading events,並傳入ProcessTradingEvents(),如果成功則進行explicit commit

// ConsumeTradingEvents registers a batch handler with
// tradingRepo.ConsumeTradingEvents. The key is forwarded to the repository
// unchanged. For every delivered batch the handler runs ProcessTradingEvents
// and, only when that succeeds, calls commitFn to explicitly commit the batch.
func (t *tradingUseCase) ConsumeTradingEvents(ctx context.Context, key string) {
	// another code...

	t.tradingRepo.ConsumeTradingEvents(ctx, key, func(events []*domain.TradingEvent, commitFn func() error) {
		if err := t.ProcessTradingEvents(ctx, events); err != nil {
			// error handle code...

			// Never commit a batch that failed to process: leaving it
			// uncommitted is what allows it to be delivered again.
			return
		}
		if err := commitFn(); err != nil {
			// error handle code...
		}
	})
}

ProcessTradingEvents()處理trading events,需先透過sequence id確認idempotency(冪等性):

  • 如果event丟失: 透過t.sequenceTradingUseCase.RecoverEvents()從db讀取event
  • 如果event重複: 忽略event

確認訊息順序正確後,才processTradingEvent()進行處理

// ProcessTradingEvents handles one batch of trading events in slice order.
//
// The first event's sequence ID is checked against t.lastSequenceID. If that
// check reports domain.ErrMissEvent, the incoming batch is not processed
// directly; instead sequenceTradingUseCase.RecoverEvents is called with
// t.lastSequenceID and every event it hands back goes through
// processTradingEvent. Otherwise each event of the batch is checked on its
// own: events reported as domain.ErrGetDuplicateEvent are logged and skipped,
// all others go through processTradingEvent.
//
// An empty batch is a no-op. The first processTradingEvent failure is wrapped
// and returned, on the recovery path as well as on the normal path.
func (t *tradingUseCase) ProcessTradingEvents(ctx context.Context, tes []*domain.TradingEvent) error {
	// Guard the tes[0] accesses below.
	if len(tes) == 0 {
		return nil
	}

	err := t.sequenceTradingUseCase.CheckEventSequence(tes[0].SequenceID, t.lastSequenceID)
	if errors.Is(err, domain.ErrMissEvent) {
		t.logger.Warn("miss events. first event id", loggerKit.Int("first-event-id", tes[0].SequenceID), loggerKit.Int("last-sequence-id", t.lastSequenceID))

		// Capture the callback's failure so it is not silently dropped and the
		// caller does not treat a failed recovery as success.
		// NOTE(review): assumes RecoverEvents invokes the callback synchronously;
		// whether RecoverEvents itself returns an error is not visible here — confirm.
		var recoverErr error
		t.sequenceTradingUseCase.RecoverEvents(t.lastSequenceID, func(tradingEvents []*domain.TradingEvent) error {
			for _, te := range tradingEvents {
				if err := t.processTradingEvent(ctx, te); err != nil {
					recoverErr = errors.Wrap(err, "process trading event failed")
					return recoverErr
				}
			}
			return nil
		})
		return recoverErr
	}
	for _, te := range tes {
		// t.lastSequenceID advances inside processTradingEvent, so every event
		// is compared against the one handled just before it.
		err := t.sequenceTradingUseCase.CheckEventSequence(te.SequenceID, t.lastSequenceID)
		if errors.Is(err, domain.ErrGetDuplicateEvent) {
			t.logger.Warn("get duplicate events. first event id", loggerKit.Int("first-event-id", tes[0].SequenceID), loggerKit.Int("last-sequence-id", t.lastSequenceID))
			continue
		}
		if err := t.processTradingEvent(ctx, te); err != nil {
			return errors.Wrap(err, "process trading event failed")
		}
	}
	return nil
}

processTradingEvent()會先將sequence id更新至t.lastSequenceID,並依照event type呼叫syncTradingUseCase對應的method以獲取對應的result。以新增訂單來說,syncTradingUseCase.CreateOrder()會獲得MatchResult與TransferResult,需將它們包裝成TradingResult,透過tradingRepo.ProduceTradingResult()傳至trading result MQ

這裡的trading result MQ,是用memory的MQ,實際上是當做一個ring buffer,把trading result蒐集起來,並以batch的方式傳入下游系統MQ,以增加throughput,不然一筆一筆傳入下游,這裡會成為bottleneck。

// processTradingEvent executes a single trading event.
//
// It records the event's sequence ID in t.lastSequenceID, dispatches on the
// event type, and hands the resulting TradingResult to
// tradingRepo.ProduceTradingResult. A create-order event rejected with
// domain.LessAmountErr or domain.InvalidAmountErr is logged and treated as
// handled (nil is returned and no result is produced). An unrecognised event
// type yields an error. A failure to produce the result panics.
func (t *tradingUseCase) processTradingEvent(ctx context.Context, te *domain.TradingEvent) error {
	// The sequence ID is recorded before the event is dispatched.
	t.lastSequenceID = te.SequenceID

	var tradingResult domain.TradingResult

	switch te.EventType {
	case domain.TradingEventCreateOrderType:
		matched, transferred, err := t.syncTradingUseCase.CreateOrder(ctx, te)
		switch {
		case errors.Is(err, domain.LessAmountErr), errors.Is(err, domain.InvalidAmountErr):
			// These rejections are logged at info level and the event counts as handled.
			t.logger.Info(fmt.Sprintf("%+v", err))
			return nil
		case err != nil:
			return errors.Wrap(err, "process message get failed")
		}

		tradingResult = domain.TradingResult{
			SequenceID:          te.SequenceID,
			TradingResultStatus: domain.TradingResultStatusCreate,
			TradingEvent:        te,
			MatchResult:         matched,
			TransferResult:      transferred,
		}
	case domain.TradingEventCancelOrderType:
		// another code...
	case domain.TradingEventTransferType:
		// another code...
	case domain.TradingEventDepositType:
		// another code...
	default:
		return errors.New("unknown event type")
	}

	produceErr := t.tradingRepo.ProduceTradingResult(ctx, &tradingResult)
	if produceErr != nil {
		panic(errors.Wrap(produceErr, "produce trading result failed"))
	}

	return nil
}

ConsumeTradingResult()會batch consume trading result MQ,批次將trading results傳入candle MQ、asset MQ等下游系統。

// ConsumeTradingResult registers a batch handler with
// tradingRepo.ConsumeTradingResult. The key is forwarded to the repository
// unchanged. Each batch of trading results is published concurrently to the
// downstream producers through an errgroup; if any producer fails the handler
// panics with the wrapped error.
func (t *tradingUseCase) ConsumeTradingResult(ctx context.Context, key string) {
	t.tradingRepo.ConsumeTradingResult(ctx, key, func(tradingResults []*domain.TradingResult) error {
		// The derived ctx is cancelled as soon as one producer returns an error.
		eg, ctx := errgroup.WithContext(ctx)

		eg.Go(func() error {
			if err := t.userAssetRepo.ProduceUserAssetByTradingResults(ctx, tradingResults); err != nil {
				return errors.Wrap(err, "produce user asset failed")
			}
			return nil
		})

		eg.Go(func() error {
			if err := t.candleRepo.ProduceCandleMQByTradingResults(ctx, tradingResults); err != nil {
				return errors.Wrap(err, "produce candle failed")
			}
			return nil
		})

		// another code...

		// Wait reports the first non-nil error returned by any producer.
		if err := eg.Wait(); err != nil {
			panic(errors.Wrap(err, "produce failed"))
		}

		return nil
	})
}

syncTradingUseCase相對簡單,需注入資產模組、訂單模組、撮合模組、清算模組來實作CreateOrder()等等methods

// syncTradingUseCase is the synchronous side of the trading system. It holds
// the asset, order, matching and clearing use cases it delegates to.
type syncTradingUseCase struct {
	// userAssetUseCase is injected but not referenced in the code shown.
	userAssetUseCase domain.UserAssetUseCase
	// orderUseCase creates the order for an incoming create-order event.
	orderUseCase domain.OrderUseCase
	// matchingUseCase matches a newly created order and returns the match result.
	matchingUseCase domain.MatchingUseCase
	// clearingUseCase clears a match result and returns the resulting transfers.
	clearingUseCase domain.ClearingUseCase
}

// SyncTradingUseCase is the synchronous interface that executes one trading
// event at a time and returns its results.
type SyncTradingUseCase interface {
	// CreateOrder handles a create-order trading event and returns the match
	// result and the transfer result, or an error.
	CreateOrder(ctx context.Context, tradingEvent *TradingEvent) (*MatchResult, *TransferResult, error)

	// another code...
}

CreateOrder()即帶入TradingEvent後,依序呼叫資產模組、訂單模組、撮合模組、清算模組,並獲得各個results後回傳。

// CreateOrder runs a create-order trading event through three stages, in
// order: order creation, matching, and clearing. Any stage failure is wrapped
// and returned with nil results. On success it returns the match result and
// the order's transfer result, whose UserAssets have the clearing stage's
// UserAssets appended.
func (t *syncTradingUseCase) CreateOrder(ctx context.Context, tradingEvent *domain.TradingEvent) (*domain.MatchResult, *domain.TransferResult, error) {
	req := tradingEvent.OrderRequestEvent

	// Stage 1: create the order.
	newOrder, orderTransfer, err := t.orderUseCase.CreateOrder(
		ctx,
		tradingEvent.SequenceID,
		req.OrderID,
		req.UserID,
		req.Direction,
		req.Price,
		req.Quantity,
		tradingEvent.CreatedAt,
	)
	if err != nil {
		return nil, nil, errors.Wrap(err, "create order failed")
	}

	// Stage 2: match the new order.
	matched, err := t.matchingUseCase.NewOrder(ctx, newOrder)
	if err != nil {
		return nil, nil, errors.Wrap(err, "matching order failed")
	}

	// Stage 3: clear the match result.
	cleared, err := t.clearingUseCase.ClearMatchResult(ctx, matched)
	if err != nil {
		return nil, nil, errors.Wrap(err, "clear match order failed")
	}

	// Fold the clearing stage's user assets into the order's transfer result.
	orderTransfer.UserAssets = append(orderTransfer.UserAssets, cleared.UserAssets...)

	return matched, orderTransfer, nil
}

這樣就完成了一個交易系統,需要注意的是:

  • 下游系統是可能收到重複的消息的,需忽略這些消息,可以將last sequence id存至redis,也可將db table設有sequence id欄位來檢查
  • 交易系統是有可能崩潰的,雖然我們可以靠儲存在db的sequence trading events來recover,但每次都從頭讀取events會消耗大量時間,我們可以將交易系統memory的狀態每隔一段時間snapshot備份起來,在下次recover的時候先從snapshot讀取,再從db讀取
  • 此實現為單一交易對。如需多個交易對可以架設多個app/exchange-gitbitex
comments powered by Disqus