diff options
Diffstat (limited to 'backend/game/hub.go')
| -rw-r--r-- | backend/game/hub.go | 70 |
1 files changed, 33 insertions, 37 deletions
diff --git a/backend/game/hub.go b/backend/game/hub.go index d918543..9c193f2 100644 --- a/backend/game/hub.go +++ b/backend/game/hub.go @@ -7,25 +7,31 @@ import ( "regexp" "strings" - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgxpool" - "albatross-2026-backend/db" "albatross-2026-backend/taskqueue" ) +type TaskQueueInterface interface { + EnqueueTaskRunTestcase(gameID, userID, submissionID, testcaseID int, language, code, stdin, stdout string) error +} + +type TaskWorkerInterface interface { + Run() error + Results() chan taskqueue.TaskResult +} + type Hub struct { - q *db.Queries - pool *pgxpool.Pool + q db.Querier + txm db.TxManager ctx context.Context - taskQueue *taskqueue.Queue - taskWorker *taskqueue.WorkerServer + taskQueue TaskQueueInterface + taskWorker TaskWorkerInterface } -func NewGameHub(q *db.Queries, pool *pgxpool.Pool, taskQueue *taskqueue.Queue, taskWorker *taskqueue.WorkerServer) *Hub { +func NewGameHub(q db.Querier, txm db.TxManager, taskQueue TaskQueueInterface, taskWorker TaskWorkerInterface) *Hub { return &Hub{ q: q, - pool: pool, + txm: txm, ctx: context.Background(), taskQueue: taskQueue, taskWorker: taskWorker, @@ -104,40 +110,30 @@ func (hub *Hub) processTaskResults() { } func (hub *Hub) updateSubmissionAndGameState(taskResult *taskqueue.TaskResultRunTestcase, aggregatedStatus string) error { - tx, err := hub.pool.Begin(hub.ctx) - if err != nil { - return err - } - defer func() { - if err := tx.Rollback(hub.ctx); err != nil && err != pgx.ErrTxClosed { - slog.Error("failed to rollback transaction", "error", err) + return hub.txm.RunInTx(hub.ctx, func(qtx db.Querier) error { + if err := qtx.UpdateSubmissionStatus(hub.ctx, db.UpdateSubmissionStatusParams{ + SubmissionID: int32(taskResult.TaskPayload.SubmissionID), + Status: aggregatedStatus, + }); err != nil { + return err } - }() - - qtx := hub.q.WithTx(tx) - if err := qtx.UpdateSubmissionStatus(hub.ctx, db.UpdateSubmissionStatusParams{ - SubmissionID: int32(taskResult.TaskPayload.SubmissionID), - Status: aggregatedStatus, - }); err != nil { - return err - } - if err := qtx.UpdateGameStateStatus(hub.ctx, db.UpdateGameStateStatusParams{ - GameID: int32(taskResult.TaskPayload.GameID), - UserID: int32(taskResult.TaskPayload.UserID), - Status: aggregatedStatus, - }); err != nil { - return err - } - if aggregatedStatus == "success" { - if err := qtx.SyncGameStateBestScoreSubmission(hub.ctx, db.SyncGameStateBestScoreSubmissionParams{ + if err := qtx.UpdateGameStateStatus(hub.ctx, db.UpdateGameStateStatusParams{ GameID: int32(taskResult.TaskPayload.GameID), UserID: int32(taskResult.TaskPayload.UserID), + Status: aggregatedStatus, }); err != nil { return err } - } - - return tx.Commit(hub.ctx) + if aggregatedStatus == "success" { + if err := qtx.SyncGameStateBestScoreSubmission(hub.ctx, db.SyncGameStateBestScoreSubmissionParams{ + GameID: int32(taskResult.TaskPayload.GameID), + UserID: int32(taskResult.TaskPayload.UserID), + }); err != nil { + return err + } + } + return nil + }) } func (hub *Hub) processTaskResultRunTestcase( |
