aboutsummaryrefslogtreecommitdiffhomepage
path: root/backend/game/hub.go
diff options
context:
space:
mode:
Diffstat (limited to 'backend/game/hub.go')
-rw-r--r--backend/game/hub.go70
1 files changed, 33 insertions, 37 deletions
diff --git a/backend/game/hub.go b/backend/game/hub.go
index d918543..9c193f2 100644
--- a/backend/game/hub.go
+++ b/backend/game/hub.go
@@ -7,25 +7,31 @@ import (
"regexp"
"strings"
- "github.com/jackc/pgx/v5"
- "github.com/jackc/pgx/v5/pgxpool"
-
"albatross-2026-backend/db"
"albatross-2026-backend/taskqueue"
)
+type TaskQueueInterface interface {
+ EnqueueTaskRunTestcase(gameID, userID, submissionID, testcaseID int, language, code, stdin, stdout string) error
+}
+
+type TaskWorkerInterface interface {
+ Run() error
+ Results() chan taskqueue.TaskResult
+}
+
type Hub struct {
- q *db.Queries
- pool *pgxpool.Pool
+ q db.Querier
+ txm db.TxManager
ctx context.Context
- taskQueue *taskqueue.Queue
- taskWorker *taskqueue.WorkerServer
+ taskQueue TaskQueueInterface
+ taskWorker TaskWorkerInterface
}
-func NewGameHub(q *db.Queries, pool *pgxpool.Pool, taskQueue *taskqueue.Queue, taskWorker *taskqueue.WorkerServer) *Hub {
+func NewGameHub(q db.Querier, txm db.TxManager, taskQueue TaskQueueInterface, taskWorker TaskWorkerInterface) *Hub {
return &Hub{
q: q,
- pool: pool,
+ txm: txm,
ctx: context.Background(),
taskQueue: taskQueue,
taskWorker: taskWorker,
@@ -104,40 +110,30 @@ func (hub *Hub) processTaskResults() {
}
func (hub *Hub) updateSubmissionAndGameState(taskResult *taskqueue.TaskResultRunTestcase, aggregatedStatus string) error {
- tx, err := hub.pool.Begin(hub.ctx)
- if err != nil {
- return err
- }
- defer func() {
- if err := tx.Rollback(hub.ctx); err != nil && err != pgx.ErrTxClosed {
- slog.Error("failed to rollback transaction", "error", err)
+ return hub.txm.RunInTx(hub.ctx, func(qtx db.Querier) error {
+ if err := qtx.UpdateSubmissionStatus(hub.ctx, db.UpdateSubmissionStatusParams{
+ SubmissionID: int32(taskResult.TaskPayload.SubmissionID),
+ Status: aggregatedStatus,
+ }); err != nil {
+ return err
}
- }()
-
- qtx := hub.q.WithTx(tx)
- if err := qtx.UpdateSubmissionStatus(hub.ctx, db.UpdateSubmissionStatusParams{
- SubmissionID: int32(taskResult.TaskPayload.SubmissionID),
- Status: aggregatedStatus,
- }); err != nil {
- return err
- }
- if err := qtx.UpdateGameStateStatus(hub.ctx, db.UpdateGameStateStatusParams{
- GameID: int32(taskResult.TaskPayload.GameID),
- UserID: int32(taskResult.TaskPayload.UserID),
- Status: aggregatedStatus,
- }); err != nil {
- return err
- }
- if aggregatedStatus == "success" {
- if err := qtx.SyncGameStateBestScoreSubmission(hub.ctx, db.SyncGameStateBestScoreSubmissionParams{
+ if err := qtx.UpdateGameStateStatus(hub.ctx, db.UpdateGameStateStatusParams{
GameID: int32(taskResult.TaskPayload.GameID),
UserID: int32(taskResult.TaskPayload.UserID),
+ Status: aggregatedStatus,
}); err != nil {
return err
}
- }
-
- return tx.Commit(hub.ctx)
+ if aggregatedStatus == "success" {
+ if err := qtx.SyncGameStateBestScoreSubmission(hub.ctx, db.SyncGameStateBestScoreSubmissionParams{
+ GameID: int32(taskResult.TaskPayload.GameID),
+ UserID: int32(taskResult.TaskPayload.UserID),
+ }); err != nil {
+ return err
+ }
+ }
+ return nil
+ })
}
func (hub *Hub) processTaskResultRunTestcase(