source code:
交易系統分為兩大部分,一個是非同步的TradingUseCase()
、一個是同步的syncTradingUseCase()
,TradingUseCase()
將無序的TradingEvent
蒐集後,有序的逐筆送入syncTradingUseCase()
,我們會希望各種IO在TradingUseCase()
完成,撮合這些需高速計算的部分在純memory的syncTradingUseCase()
完成並得到trading result,再交給TradingUseCase()
進行persistent,最後達到eventual consistency。
我們先介紹tradingUseCase
,再介紹實作相對簡單的syncTradingUseCase
。
tradingUseCase
需注入相關IO
type tradingUseCase struct {
userAssetRepo domain.UserAssetRepo
tradingRepo domain.TradingRepo
candleRepo domain.CandleRepo
quotationRepo domain.QuotationRepo
matchingRepo domain.MatchingRepo
orderRepo domain.OrderRepo
sequenceTradingUseCase domain.SequenceTradingUseCase
syncTradingUseCase domain.SyncTradingUseCase
lastSequenceID int
// another code...
}
TradingUseCase
主要實現以下幾個methods
type TradingUseCase interface {
ConsumeTradingEvents(ctx context.Context, key string)
ConsumeTradingResult(ctx context.Context, key string)
ProcessTradingEvents(ctx context.Context, tradingEvents []*TradingEvent) error
// another code...
}
ConsumeTradingEvents()
: consume已經過定序模組定序過的trading events,並傳入ProcessTradingEvents()
,如果成功則進行explicit commit
func (t *tradingUseCase) ConsumeTradingEvents(ctx context.Context, key string) {
// another code...
t.tradingRepo.ConsumeTradingEvents(ctx, key, func(events []*domain.TradingEvent, commitFn func() error) {
if err := t.ProcessTradingEvents(ctx, events); err != nil {
// error handle code...
}
if err := commitFn(); err != nil {
// error handle code...
}
})
}
ProcessTradingEvents()
處理trading events,需先透過sequence id是確認idempotency冪等性:
- 如果event丟失: 透過
t.sequenceTradingUseCase.RecoverEvents()
從db讀取event - 如果event重複: 忽略event
確認訊息順序正確後,才processTradingEvent()
進行處理
func (t *tradingUseCase) ProcessTradingEvents(ctx context.Context, tes []*domain.TradingEvent) error {
err := t.sequenceTradingUseCase.CheckEventSequence(tes[0].SequenceID, t.lastSequenceID)
if errors.Is(err, domain.ErrMissEvent) {
t.logger.Warn("miss events. first event id", loggerKit.Int("first-event-id", tes[0].SequenceID), loggerKit.Int("last-sequence-id", t.lastSequenceID))
t.sequenceTradingUseCase.RecoverEvents(t.lastSequenceID, func(tradingEvents []*domain.TradingEvent) error {
for _, te := range tradingEvents {
if err := t.processTradingEvent(ctx, te); err != nil {
return errors.Wrap(err, "process trading event failed")
}
}
return nil
})
return nil
}
for _, te := range tes {
err := t.sequenceTradingUseCase.CheckEventSequence(te.SequenceID, t.lastSequenceID)
if errors.Is(err, domain.ErrGetDuplicateEvent) {
t.logger.Warn("get duplicate events. first event id", loggerKit.Int("first-event-id", tes[0].SequenceID), loggerKit.Int("last-sequence-id", t.lastSequenceID))
continue
}
if err := t.processTradingEvent(ctx, te); err != nil {
return errors.Wrap(err, "process trading event failed")
}
}
return nil
}
sequence id更新至t.lastSequenceID
,並依照event type呼叫對應的method給syncTradingUseCase
獲取對應result,以新增訂單來說,syncTradingUseCase.CreateOrder()
會獲得MatchResult
、TransferResult
,需將他們包裝成TradingResult
,透過tradingRepo.ProduceTradingResult()
傳至trading result MQ
。
這裡的trading result MQ
,是用memory的MQ,實際上是當做一個ring buffer
,把trading result蒐集起來,並以batch的方式傳入下游系統MQ,以增加throughput,不然一筆一筆傳入下游,這裡會成為bottleneck。
func (t *tradingUseCase) processTradingEvent(ctx context.Context, te *domain.TradingEvent) error {
var tradingResult domain.TradingResult
t.lastSequenceID = te.SequenceID
switch te.EventType {
case domain.TradingEventCreateOrderType:
matchResult, transferResult, err := t.syncTradingUseCase.CreateOrder(ctx, te)
if errors.Is(err, domain.LessAmountErr) || errors.Is(err, domain.InvalidAmountErr) {
t.logger.Info(fmt.Sprintf("%+v", err))
return nil
} else if err != nil {
return errors.Wrap(err, "process message get failed")
}
tradingResult = domain.TradingResult{
SequenceID: te.SequenceID,
TradingResultStatus: domain.TradingResultStatusCreate,
TradingEvent: te,
MatchResult: matchResult,
TransferResult: transferResult,
}
case domain.TradingEventCancelOrderType:
// another code...
case domain.TradingEventTransferType:
// another code...
case domain.TradingEventDepositType:
// another code...
default:
return errors.New("unknown event type")
}
if err := t.tradingRepo.ProduceTradingResult(ctx, &tradingResult); err != nil {
panic(errors.Wrap(err, "produce trading result failed"))
}
return nil
}
ConsumeTradingResult()
batch consume trading result MQ
,批次將trading results傳入下游candle MQ
、asset MQ
等下游系統。
func (t *tradingUseCase) ConsumeTradingResult(ctx context.Context, key string) {
t.tradingRepo.ConsumeTradingResult(ctx, key, func(tradingResults []*domain.TradingResult) error {
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
if err := t.userAssetRepo.ProduceUserAssetByTradingResults(ctx, tradingResults); err != nil {
return errors.Wrap(err, "produce order failed")
}
return nil
})
eg.Go(func() error {
if err := t.candleRepo.ProduceCandleMQByTradingResults(ctx, tradingResults); err != nil {
return errors.Wrap(err, "produce candle failed")
}
return nil
})
// another code...
if err := eg.Wait(); err != nil {
panic(errors.Wrap(err, "produce failed"))
}
return nil
})
}
syncTradingUseCase
相對簡單,需注入資產模組、訂單模組、撮合模組、清算模組來實作CreateOrder()
等等methods
type syncTradingUseCase struct {
userAssetUseCase domain.UserAssetUseCase
orderUseCase domain.OrderUseCase
matchingUseCase domain.MatchingUseCase
clearingUseCase domain.ClearingUseCase
}
type SyncTradingUseCase interface {
CreateOrder(ctx context.Context, tradingEvent *TradingEvent) (*MatchResult, *TransferResult, error)
// another code...
}
CreateOrder()
即帶入TradingEvent
後,依序呼叫資產模組、訂單模組、撮合模組、清算模組,並獲得各個results後回傳。
func (t *syncTradingUseCase) CreateOrder(ctx context.Context, tradingEvent *domain.TradingEvent) (*domain.MatchResult, *domain.TransferResult, error) {
order, transferResult, err := t.orderUseCase.CreateOrder(
ctx,
tradingEvent.SequenceID,
tradingEvent.OrderRequestEvent.OrderID,
tradingEvent.OrderRequestEvent.UserID,
tradingEvent.OrderRequestEvent.Direction,
tradingEvent.OrderRequestEvent.Price,
tradingEvent.OrderRequestEvent.Quantity,
tradingEvent.CreatedAt,
)
if err != nil {
return nil, nil, errors.Wrap(err, "create order failed")
}
matchResult, err := t.matchingUseCase.NewOrder(ctx, order)
if err != nil {
return nil, nil, errors.Wrap(err, "matching order failed")
}
clearTransferResult, err := t.clearingUseCase.ClearMatchResult(ctx, matchResult)
if err != nil {
return nil, nil, errors.Wrap(err, "clear match order failed")
}
transferResult.UserAssets = append(transferResult.UserAssets, clearTransferResult.UserAssets...)
return matchResult, transferResult, nil
}
這樣就完成了一個交易系統,需要注意的是:
- 下游系統是可能收到重複的消息的,需忽略這些消息,可以將last sequence id存至redis,也可將db table設有sequence id欄位來檢查
- 交易系統是有可能崩潰的,雖然我們可以靠儲存在db的sequence trading events來recover,但每次都從頭讀取events會消耗大量時間,我們可以將交易系統memory的狀態每隔一段時間snapshot備份起來,在下次recover的時候先從snapshot讀取,再從db讀取
- 此實現為單一交易對。如需多個交易對可以架設多個
app/exchange-gitbitex