Worker & Server of COJ

大概想了一下,感觉是这样的。

Server

  • 连接数据库,实际上管理数据
  • 响应前端请求,鉴权,返回应用内容
  • 响应Worker请求,提供评测相关信息

本文主要设计 Server 和 Worker 的交互方式

首先由 Worker 向 Server 发起一个长连接,发送消息包含:worker 的 group,id,连接口令。Server 将 Worker 添加到评测机 pool (链表)中,并返回分配的 WorkerSeqNumber。

Server 收到前端的 submission

-> 将 submission 添加到队尾(rejudge,那就是从数据里取出来放到队尾)

-> 查询对应的 problem 是否有指定的评测机 group

-> 取对应 group 链表头(无指定则随机选择)发送 submission 数据

-> 等待 Worker 返回,放回到对应队尾。持久化评测数据,返回数据给前端

Worker 本地不存在数据库,不与 Server 共享文件系统(分布式)。这意味着:

  • 不能从 db 查询提交详情。由 Server 直接发送。
  • 需要向 Server 实时请求评测数据、题目信息(包括special judge、sample case、题目的分数衰减等)。
    • 为了节约一点,每一个 Problem 有属性 SpjEdited 和 TestEdited。只有在本地无数据最新版本数据的时候,worker 才会重新发请求。
  • 文件和时间戳可以保存在本地文件系统,故单机的 Worker 之间是共享这些数据的。重启后,如果版本没有更新,则不需要重新请求。

Server 监听 Client 和 Worker 消息,使用两个不同的 Socket。

worker api:

/connect?group={}&password={}

返回:>=1 的未占用最小序列号

/tests?problemid={}

返回:problemid 对应问题的评测数据 tarball

/task 请求评测任务(可能很久后才返回):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{
"submissionId": Int,
"problem": {
"id": Int,
"hasSpj": Bool,
"Spj": String, (base64)
"samplecase": String, Json of base64 encoded input/output
"testcase": String, Json array of input/output file path and testcase weight
"validLanguage": Int,
"skipSample": Bool,
"memory": Int,
"time": Int,
"SpjEdited": String,
"TestEdited": String
},
"Language": Int,
"Src": string, (base64)
"HasInput": Bool,
"UserInput": string, (base64)
}

/submission

参数:id, score, message, failedOn, verdict, judgedAt, memory, time…

Worker

  • 读取配置文件。包含本地文件的存储路径,server 的 ip:port,worker 的分组及 cpuset
  • fork 出对应数量个进程,并设置进程绑定 cpuset
  • 连接 Server,得到自身的 workerSeqNo(用作本地 isolate 容器编号。这样就不会被多个 worker 混用了)
  • 请求任务-处理-返回

重点说一下处理:

请求到评测任务后,先检查评测数据版本。如果最新版本不存在,则向 Server 请求,然后丢到本地目录,并加上时间戳。

再检查如果有 spj,是否有最新版本二进制。如果没有,就重新编译。

然后就和 dotoj 的流程差不多了…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Compile(code):
switch code.Language:
case C:
Exec(gcc, -o, main, ...)
case Cpp:
Exec(g++, -o, main, ...)
default:
return COMPILE_NO_SUCH_LANGUAGE

RunCase(cases):
for case in cases:
Exec(main, case.input)
if (hasSpj)
spj(case.answer, case.output)
else
diff(case.answer, case.output)

RunSubmission():
if language == Lab:
return RunLab()
if hasSpj:
if stat(checker + date) == ENOENT:
Compile(spj)
Compile(src);
if hasInput:
Exec(main, CustomInput)
return Output
if !skipSample:
result = runCase(Samples)
if result != AC:
return WA with X
result = runCase(Tests)
return Result, ...