aboutsummaryrefslogtreecommitdiffhomepage
path: root/backend/game/hub.go
diff options
context:
space:
mode:
Diffstat (limited to 'backend/game/hub.go')
-rw-r--r--backend/game/hub.go74
1 files changed, 54 insertions, 20 deletions
diff --git a/backend/game/hub.go b/backend/game/hub.go
index 9ae725a..c9e9680 100644
--- a/backend/game/hub.go
+++ b/backend/game/hub.go
@@ -7,20 +7,25 @@ import (
"regexp"
"strings"
+ "github.com/jackc/pgx/v5"
+ "github.com/jackc/pgx/v5/pgxpool"
+
"albatross-2026-backend/db"
"albatross-2026-backend/taskqueue"
)
type Hub struct {
q *db.Queries
+ pool *pgxpool.Pool
ctx context.Context
taskQueue *taskqueue.Queue
taskWorker *taskqueue.WorkerServer
}
-func NewGameHub(q *db.Queries, taskQueue *taskqueue.Queue, taskWorker *taskqueue.WorkerServer) *Hub {
+func NewGameHub(q *db.Queries, pool *pgxpool.Pool, taskQueue *taskqueue.Queue, taskWorker *taskqueue.WorkerServer) *Hub {
return &Hub{
q: q,
+ pool: pool,
ctx: context.Background(),
taskQueue: taskQueue,
taskWorker: taskWorker,
@@ -74,37 +79,66 @@ func (hub *Hub) processTaskResults() {
for taskResult := range hub.taskWorker.Results() {
switch taskResult := taskResult.(type) {
case *taskqueue.TaskResultRunTestcase:
- // TODO: error handling
- _ = hub.processTaskResultRunTestcase(taskResult)
- aggregatedStatus, _ := hub.q.AggregateTestcaseResults(hub.ctx, int32(taskResult.TaskPayload.SubmissionID))
+ if err := hub.processTaskResultRunTestcase(taskResult); err != nil {
+ slog.Error("failed to process testcase result", "error", err, "submissionID", taskResult.TaskPayload.SubmissionID)
+ continue
+ }
+ aggregatedStatus, err := hub.q.AggregateTestcaseResults(hub.ctx, int32(taskResult.TaskPayload.SubmissionID))
+ if err != nil {
+ slog.Error("failed to aggregate testcase results", "error", err, "submissionID", taskResult.TaskPayload.SubmissionID)
+ continue
+ }
if aggregatedStatus == "running" {
continue
}
- // TODO: error handling
- // TODO: transaction
- _ = hub.q.UpdateSubmissionStatus(hub.ctx, db.UpdateSubmissionStatusParams{
- SubmissionID: int32(taskResult.TaskPayload.SubmissionID),
- Status: aggregatedStatus,
- })
- _ = hub.q.UpdateGameStateStatus(hub.ctx, db.UpdateGameStateStatusParams{
- GameID: int32(taskResult.TaskPayload.GameID),
- UserID: int32(taskResult.TaskPayload.UserID),
- Status: aggregatedStatus,
- })
- if aggregatedStatus != "success" {
+ if err := hub.updateSubmissionAndGameState(taskResult, aggregatedStatus); err != nil {
+ slog.Error("failed to update submission and game state", "error", err, "submissionID", taskResult.TaskPayload.SubmissionID)
continue
}
- _ = hub.q.SyncGameStateBestScoreSubmission(hub.ctx, db.SyncGameStateBestScoreSubmissionParams{
- GameID: int32(taskResult.TaskPayload.GameID),
- UserID: int32(taskResult.TaskPayload.UserID),
- })
default:
panic("unexpected task result type")
}
}
}
+func (hub *Hub) updateSubmissionAndGameState(taskResult *taskqueue.TaskResultRunTestcase, aggregatedStatus string) error {
+ tx, err := hub.pool.Begin(hub.ctx)
+ if err != nil {
+ return err
+ }
+ defer func() {
+ if err := tx.Rollback(hub.ctx); err != nil && err != pgx.ErrTxClosed {
+ slog.Error("failed to rollback transaction", "error", err)
+ }
+ }()
+
+ qtx := hub.q.WithTx(tx)
+ if err := qtx.UpdateSubmissionStatus(hub.ctx, db.UpdateSubmissionStatusParams{
+ SubmissionID: int32(taskResult.TaskPayload.SubmissionID),
+ Status: aggregatedStatus,
+ }); err != nil {
+ return err
+ }
+ if err := qtx.UpdateGameStateStatus(hub.ctx, db.UpdateGameStateStatusParams{
+ GameID: int32(taskResult.TaskPayload.GameID),
+ UserID: int32(taskResult.TaskPayload.UserID),
+ Status: aggregatedStatus,
+ }); err != nil {
+ return err
+ }
+ if aggregatedStatus == "success" {
+ if err := qtx.SyncGameStateBestScoreSubmission(hub.ctx, db.SyncGameStateBestScoreSubmissionParams{
+ GameID: int32(taskResult.TaskPayload.GameID),
+ UserID: int32(taskResult.TaskPayload.UserID),
+ }); err != nil {
+ return err
+ }
+ }
+
+ return tx.Commit(hub.ctx)
+}
+
func (hub *Hub) processTaskResultRunTestcase(
taskResult *taskqueue.TaskResultRunTestcase,
) error {