diff options
Diffstat (limited to 'backend/game')
| -rw-r--r-- | backend/game/hub.go | 74 |
1 files changed, 54 insertions, 20 deletions
diff --git a/backend/game/hub.go b/backend/game/hub.go index 9ae725a..c9e9680 100644 --- a/backend/game/hub.go +++ b/backend/game/hub.go @@ -7,20 +7,25 @@ import ( "regexp" "strings" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "albatross-2026-backend/db" "albatross-2026-backend/taskqueue" ) type Hub struct { q *db.Queries + pool *pgxpool.Pool ctx context.Context taskQueue *taskqueue.Queue taskWorker *taskqueue.WorkerServer } -func NewGameHub(q *db.Queries, taskQueue *taskqueue.Queue, taskWorker *taskqueue.WorkerServer) *Hub { +func NewGameHub(q *db.Queries, pool *pgxpool.Pool, taskQueue *taskqueue.Queue, taskWorker *taskqueue.WorkerServer) *Hub { return &Hub{ q: q, + pool: pool, ctx: context.Background(), taskQueue: taskQueue, taskWorker: taskWorker, @@ -74,37 +79,66 @@ func (hub *Hub) processTaskResults() { for taskResult := range hub.taskWorker.Results() { switch taskResult := taskResult.(type) { case *taskqueue.TaskResultRunTestcase: - // TODO: error handling - _ = hub.processTaskResultRunTestcase(taskResult) - aggregatedStatus, _ := hub.q.AggregateTestcaseResults(hub.ctx, int32(taskResult.TaskPayload.SubmissionID)) + if err := hub.processTaskResultRunTestcase(taskResult); err != nil { + slog.Error("failed to process testcase result", "error", err, "submissionID", taskResult.TaskPayload.SubmissionID) + continue + } + aggregatedStatus, err := hub.q.AggregateTestcaseResults(hub.ctx, int32(taskResult.TaskPayload.SubmissionID)) + if err != nil { + slog.Error("failed to aggregate testcase results", "error", err, "submissionID", taskResult.TaskPayload.SubmissionID) + continue + } if aggregatedStatus == "running" { continue } - // TODO: error handling - // TODO: transaction - _ = hub.q.UpdateSubmissionStatus(hub.ctx, db.UpdateSubmissionStatusParams{ - SubmissionID: int32(taskResult.TaskPayload.SubmissionID), - Status: aggregatedStatus, - }) - _ = hub.q.UpdateGameStateStatus(hub.ctx, db.UpdateGameStateStatusParams{ - GameID: int32(taskResult.TaskPayload.GameID), - UserID: int32(taskResult.TaskPayload.UserID), - Status: aggregatedStatus, - }) - if aggregatedStatus != "success" { + if err := hub.updateSubmissionAndGameState(taskResult, aggregatedStatus); err != nil { + slog.Error("failed to update submission and game state", "error", err, "submissionID", taskResult.TaskPayload.SubmissionID) continue } - _ = hub.q.SyncGameStateBestScoreSubmission(hub.ctx, db.SyncGameStateBestScoreSubmissionParams{ - GameID: int32(taskResult.TaskPayload.GameID), - UserID: int32(taskResult.TaskPayload.UserID), - }) default: panic("unexpected task result type") } } } +func (hub *Hub) updateSubmissionAndGameState(taskResult *taskqueue.TaskResultRunTestcase, aggregatedStatus string) error { + tx, err := hub.pool.Begin(hub.ctx) + if err != nil { + return err + } + defer func() { + if err := tx.Rollback(hub.ctx); err != nil && err != pgx.ErrTxClosed { + slog.Error("failed to rollback transaction", "error", err) + } + }() + + qtx := hub.q.WithTx(tx) + if err := qtx.UpdateSubmissionStatus(hub.ctx, db.UpdateSubmissionStatusParams{ + SubmissionID: int32(taskResult.TaskPayload.SubmissionID), + Status: aggregatedStatus, + }); err != nil { + return err + } + if err := qtx.UpdateGameStateStatus(hub.ctx, db.UpdateGameStateStatusParams{ + GameID: int32(taskResult.TaskPayload.GameID), + UserID: int32(taskResult.TaskPayload.UserID), + Status: aggregatedStatus, + }); err != nil { + return err + } + if aggregatedStatus == "success" { + if err := qtx.SyncGameStateBestScoreSubmission(hub.ctx, db.SyncGameStateBestScoreSubmissionParams{ + GameID: int32(taskResult.TaskPayload.GameID), + UserID: int32(taskResult.TaskPayload.UserID), + }); err != nil { + return err + } + } + + return tx.Commit(hub.ctx) +} + func (hub *Hub) processTaskResultRunTestcase( taskResult *taskqueue.TaskResultRunTestcase, ) error { |
