Yet another crazy implementation…
缘由:给某bot协议库做框架时想加入对发送消息的操作限制频率的方法
产出:一个不是过于shitcode的通用实现,能够在指定超时情况下返回一个id用于后续控制
贴代码,能在node v16+运行,大概
使用了稀奇古怪的Promise玩法
"use strict"
class PriorityQueue {
constructor() {
this.queue = []
}
put(item, prio) {
if(prio === undefined) prio = 0
for (let i = this.queue.length - 1; i >=0 ; i--) {
if (prio <= this.queue[i].prio) {
this.queue.splice(i + 1, 0, {
prio: prio,
data: item
})
return
}
}
this.queue.unshift({
prio: prio,
data: item
})
}
get () {
return this.queue.shift().data
}
view(idx) {
return this.queue[idx].data
}
rm(idx) {
this.queue.splice(idx + 1, 1)
}
get length() {
return this.queue.length
}
}
class RLPQSTimeoutError extends Error {
constructor(id, done) {
super()
this.id = id
this.done = async () => {
await done
}
}
}
class RLPQSCancelledError extends Error {
constructor() {
super()
}
}
class RateLimitedPriorityQueueScheduler{
constructor(period) {
this._period = period * 1000
// 对_current_timeout的读写操作不能包含await或回调异步
this._current_timeout = null
this._queue = new PriorityQueue()
}
_timeout_worker() {
// 频率限制延时结束,尝试取队列任务
try {
const item = this._queue.get()
// 通知任务开始
item.done()
console.log('发射新的任务,插入延迟')
// 再度插入一个延迟
this._current_timeout = setTimeout(this._timeout_worker.bind(this), this._period)
} catch (e) {
// 队列空,退出调度
this._current_timeout = null
console.log('队列空,退出')
}
}
/*
阻塞并等待排队成功
若参数指定的超时发生,则抛出RLPQSTimeoutError异常,里面的done()可被再次await
若成功执行,则返回void
若被取消,done()与schedule()都会抛出RLPQSCancelledError,此时应当取消后续执行。注意区分Error的type
*/
async schedule(timeout, priority) {
if (timeout === undefined) timeout = 5
if (priority === undefined) priority = 0
if (this._current_timeout === null) {
// 没有别的任务导致排队
// 插入一个delay
this._current_timeout = setTimeout(this._timeout_worker.bind(this), this._period)
// 然后立即启动当前任务
} else {
// 当前有任务触发了频率限制
// 构造一个任务id
const tId_buf = Buffer.alloc(10)
tId_buf.writeBigUint64LE(BigInt(Date.now()), 0)
tId_buf.writeUint16LE(Math.floor(Math.random() * 32768), 8)
const tId = tId_buf.toString('base64')
// 构造执行回调
let timeout_resolve, timeout_reject
const timeout_future = new Promise((resolve, reject) => {
timeout_resolve = resolve
timeout_reject = reject
})
let cancel_resolve, cancel_reject
const cancel_future = new Promise((resolve1, reject1) => {
cancel_resolve = resolve1
cancel_reject = reject1
})
// 压入队列
const task_item = {
id: tId,
done: timeout_resolve,
cancel: timeout_reject
}
this._queue.put(task_item)
// 等待执行回调和超时任意触发
try {
await new Promise((resolve, reject) => {
timeout_future.then(() => {
// 排队成功
resolve()
}).catch(()=>{
// 在第一阶段的等待中被取消(队列退出)
reject('exit')
})
setTimeout(() => {
// 超时
reject('timeout')
// 更换resolve
task_item.done = cancel_resolve
task_item.cancel = cancel_reject
}, timeout * 1000)
})
} catch (e) {
if (e === 'timeout')
throw new RLPQSTimeoutError(tId, cancel_future)
else if (e === 'exit')
throw new RLPQSCancelledError()
}
}
}
async cancel(id) {
for (let i = 0; i < this._queue.length; ++i) {
const item = this._queue.view(i)
if (item.id === id) {
item.cancel()
this._queue.rm(i)
return
}
}
}
/*
队列会被全部停止,所有未安排的task也会被通知cancel,但是这个函数没法得知何时所有task对cancel响应完成
所以调用此方法后当场结束进程是不明智的
*/
async request_stop() {
if (this._current_timeout !== null) {
clearTimeout(this._current_timeout)
this._current_timeout = null
for (let i = 0; i < this._queue.length; ++i) {
const item = this._queue.view(i)
item.cancel()
}
}
}
}
async function test() {
const queue = new RateLimitedPriorityQueueScheduler(0.5)
let timestamp = Date.now()
console.log('提交第一个任务,开始计时')
await queue.schedule(1)
console.log(`第一个任务未超时排队成功,经过了${(Date.now() - timestamp)/1000}秒`)
timestamp = Date.now()
console.log('提交第二个任务,开始计时')
try{
await queue.schedule(1)
console.log('第二个任务未超时排队成功') // never happen
} catch (e) {
console.log(`第二个任务超时,id:${e.id},经过了${(Date.now() - timestamp)/1000}秒,再度等待第二个任务`)
timestamp = Date.now()
await e.done()
console.log(`第二个任务排队成功,经过了${(Date.now() - timestamp)/1000}秒`)
}
timestamp = Date.now()
console.log('提交第三个任务,开始计时')
try {
await queue.schedule(0) // 立即超时
} catch (e) {
console.log(`第三个任务超时,id:${e.id},经过了${(Date.now() - timestamp)/1000}秒`)
timestamp = Date.now()
console.log('1秒后取消第三个任务')
setTimeout(() => {
console.log('正在取消第三个任务')
queue.cancel(e.id)
}, 1000)
console.log('继续等待第三个任务')
try {
await e.done()
console.log(`第三个任务排队成功,经过了${(Date.now() - timestamp)/1000}秒`) // never happen
} catch (e) {
console.log(`第三个任务被取消,经过了${(Date.now() - timestamp)/1000}秒`)
}
}
timestamp = Date.now()
console.log('同时提交5个任务,开始计时')
const new_task = async () => {
try{
await queue.schedule(1)
console.log('并发任务未超时排队成功') // never happen
} catch (e) {
console.log(`并发任务超时,id:${e.id},经过了${(Date.now() - timestamp)/1000}秒,再度等待`)
try {
await e.done()
console.log(`并发任务排队成功,经过了${(Date.now() - timestamp)/1000}秒`)
} catch (e) {
console.log('并发任务被取消')
}
}
}
new_task().then()
new_task().then()
new_task().then()
new_task().then()
await new_task() // 这里由于队列FIFO所以可以确保前面的任务都被安排调度了
console.log('并发5任务,但是全部取消')
timestamp = Date.now()
new_task().then()
new_task().then()
new_task().then()
new_task().then()
setTimeout(() => {
queue.request_stop()
}, 1000)
await new_task() // 同理
}
test()
里面有个test()
,复制到新js文件后可以node直接跑。
修改const queue = new RateLimitedPriorityQueueScheduler(0.5)
这个数值可以设定队列两次调度之间的时间差,单位秒
运行输出:
提交第一个任务,开始计时
第一个任务未超时排队成功,经过了0.004秒
提交第二个任务,开始计时
发射新的任务,插入延迟
第二个任务未超时排队成功
提交第三个任务,开始计时
第三个任务超时,id:OwSfYIMBAAA/Jw==,经过了0.002秒
1秒后取消第三个任务
继续等待第三个任务
发射新的任务,插入延迟
第三个任务排队成功,经过了0.497秒
同时提交5个任务,开始计时
发射新的任务,插入延迟
并发任务未超时排队成功
正在取消第三个任务
发射新的任务,插入延迟
并发任务未超时排队成功
并发任务超时,id:MAafYIMBAABWAg==,经过了1.002秒,再度等待
并发任务超时,id:MAafYIMBAACgKQ==,经过了1.003秒,再度等待
并发任务超时,id:MAafYIMBAADCGQ==,经过了1.003秒,再度等待
发射新的任务,插入延迟
并发任务排队成功,经过了1.503秒
发射新的任务,插入延迟
并发任务排队成功,经过了2.004秒
发射新的任务,插入延迟
并发任务排队成功,经过了2.505秒
并发5任务,但是全部取消
发射新的任务,插入延迟
并发任务未超时排队成功
并发任务超时,id:+Q+fYIMBAAAlOQ==,经过了1.002秒,再度等待
并发任务超时,id:+Q+fYIMBAABhcg==,经过了1.002秒,再度等待
并发任务超时,id:+Q+fYIMBAACuaQ==,经过了1.002秒,再度等待
并发任务被取消
并发任务被取消
并发任务被取消
并发任务超时,id:undefined,经过了1.004秒,再度等待
并发任务被取消
每当打印出排队成功
时,代表那个时刻允许bot发送一条消息。默认环境代表限制bot只能0.5秒发一次消息,不同的queue.schedule(1)
值代表在同步请求中最多等待的时长,超时则转入异步模式,并支持取消已经进入队列的任务