NodeJS下实现的频率限制优先队列调度器(RateLimitedPriorityQueueScheduler)

Yet another crazy implementation…

缘由:给某bot协议库做框架时想加入对发送消息的操作限制频率的方法

产出:一个不是过于shitcode的通用实现,能够在指定超时情况下返回一个id用于后续控制

贴代码,能在node v16+运行,大概

使用了稀奇古怪的Promise玩法

"use strict"

class PriorityQueue {
    constructor() {
        this.queue = []
    }

    put(item, prio) {
        if(prio === undefined) prio = 0
        for (let i = this.queue.length - 1; i >=0 ; i--) {
            if (prio <= this.queue[i].prio) {
                this.queue.splice(i + 1, 0, {
                    prio: prio,
                    data: item
                })
                return
            }
        }
        this.queue.unshift({
            prio: prio,
            data: item
        })
    }

    get () {
        return this.queue.shift().data
    }

    view(idx) {
        return this.queue[idx].data
    }

    rm(idx) {
        this.queue.splice(idx + 1, 1)
    }

    get length() {
        return this.queue.length
    }
}

class RLPQSTimeoutError extends Error {
    constructor(id, done) {
        super()
        this.id = id
        this.done = async () => {
            await done
        }
    }
}

class RLPQSCancelledError extends Error {
    constructor() {
        super()
    }
}

class RateLimitedPriorityQueueScheduler{
    constructor(period) {
        this._period = period * 1000
        // 对_current_timeout的读写操作不能包含await或回调异步
        this._current_timeout = null
        this._queue = new PriorityQueue()
    }

    _timeout_worker() {
        // 频率限制延时结束,尝试取队列任务
        try {
            const item = this._queue.get()
            // 通知任务开始
            item.done()
            console.log('发射新的任务,插入延迟')
            // 再度插入一个延迟
            this._current_timeout = setTimeout(this._timeout_worker.bind(this), this._period)
        } catch (e) {
            // 队列空,退出调度
            this._current_timeout = null
            console.log('队列空,退出')
        }
    }

    /*
      阻塞并等待排队成功

      若参数指定的超时发生,则抛出RLPQSTimeoutError异常,里面的done()可被再次await

      若成功执行,则返回void

      若被取消,done()与schedule()都会抛出RLPQSCancelledError,此时应当取消后续执行。注意区分Error的type
     */
    async schedule(timeout, priority) {
        if (timeout === undefined) timeout = 5
        if (priority === undefined) priority = 0
        if (this._current_timeout === null) {
            // 没有别的任务导致排队
            // 插入一个delay
            this._current_timeout = setTimeout(this._timeout_worker.bind(this), this._period)
            // 然后立即启动当前任务
        } else {
            // 当前有任务触发了频率限制
            // 构造一个任务id
            const tId_buf = Buffer.alloc(10)
            tId_buf.writeBigUint64LE(BigInt(Date.now()), 0)
            tId_buf.writeUint16LE(Math.floor(Math.random() * 32768), 8)
            const tId = tId_buf.toString('base64')
            // 构造执行回调
            let timeout_resolve, timeout_reject
            const timeout_future = new Promise((resolve, reject) => {
                timeout_resolve = resolve
                timeout_reject = reject
            })
            let cancel_resolve, cancel_reject
            const cancel_future = new Promise((resolve1, reject1) => {
                cancel_resolve = resolve1
                cancel_reject = reject1
            })
            // 压入队列
            const task_item = {
                id: tId,
                done: timeout_resolve,
                cancel: timeout_reject
            }
            this._queue.put(task_item)
            // 等待执行回调和超时任意触发
            try {
                await new Promise((resolve, reject) => {
                    timeout_future.then(() => {
                        // 排队成功
                        resolve()
                    }).catch(()=>{
                        // 在第一阶段的等待中被取消(队列退出)
                        reject('exit')
                    })
                    setTimeout(() => {
                        // 超时
                        reject('timeout')
                        // 更换resolve
                        task_item.done = cancel_resolve
                        task_item.cancel = cancel_reject
                    }, timeout * 1000)
                })
            } catch (e) {
                if (e === 'timeout')
                    throw new RLPQSTimeoutError(tId, cancel_future)
                else if (e === 'exit')
                    throw new RLPQSCancelledError()
            }
        }
    }

    async cancel(id) {
        for (let i = 0; i < this._queue.length; ++i) {
            const item = this._queue.view(i)
            if (item.id === id) {
                item.cancel()
                this._queue.rm(i)
                return
            }
        }
    }

    /*
      队列会被全部停止,所有未安排的task也会被通知cancel,但是这个函数没法得知何时所有task对cancel响应完成

      所以调用此方法后当场结束进程是不明智的
     */
    async request_stop() {
        if (this._current_timeout !== null) {
            clearTimeout(this._current_timeout)
            this._current_timeout = null
            for (let i = 0; i < this._queue.length; ++i) {
                const item = this._queue.view(i)
                item.cancel()
            }
        }
    }
}

async function test() {
    const queue = new RateLimitedPriorityQueueScheduler(0.5)
    let timestamp = Date.now()
    console.log('提交第一个任务,开始计时')
    await queue.schedule(1)
    console.log(`第一个任务未超时排队成功,经过了${(Date.now() - timestamp)/1000}秒`)
    timestamp = Date.now()
    console.log('提交第二个任务,开始计时')
    try{
        await queue.schedule(1)
        console.log('第二个任务未超时排队成功')     // never happen
    } catch (e) {
        console.log(`第二个任务超时,id:${e.id},经过了${(Date.now() - timestamp)/1000}秒,再度等待第二个任务`)
        timestamp = Date.now()
        await e.done()
        console.log(`第二个任务排队成功,经过了${(Date.now() - timestamp)/1000}秒`)
    }
    timestamp = Date.now()
    console.log('提交第三个任务,开始计时')
    try {
        await queue.schedule(0) // 立即超时
    } catch (e) {
        console.log(`第三个任务超时,id:${e.id},经过了${(Date.now() - timestamp)/1000}秒`)
        timestamp = Date.now()
        console.log('1秒后取消第三个任务')
        setTimeout(() => {
            console.log('正在取消第三个任务')
            queue.cancel(e.id)
        }, 1000)
        console.log('继续等待第三个任务')
        try {
            await e.done()
            console.log(`第三个任务排队成功,经过了${(Date.now() - timestamp)/1000}秒`)       // never happen
        } catch (e) {
            console.log(`第三个任务被取消,经过了${(Date.now() - timestamp)/1000}秒`)
        }
    }
    timestamp = Date.now()
    console.log('同时提交5个任务,开始计时')
    const new_task = async () => {
        try{
            await queue.schedule(1)
            console.log('并发任务未超时排队成功')     // never happen
        } catch (e) {
            console.log(`并发任务超时,id:${e.id},经过了${(Date.now() - timestamp)/1000}秒,再度等待`)
            try {
                await e.done()
                console.log(`并发任务排队成功,经过了${(Date.now() - timestamp)/1000}秒`)
            } catch (e) {
                console.log('并发任务被取消')
            }
        }
    }
    new_task().then()
    new_task().then()
    new_task().then()
    new_task().then()
    await new_task()        // 这里由于队列FIFO所以可以确保前面的任务都被安排调度了
    console.log('并发5任务,但是全部取消')
    timestamp = Date.now()
    new_task().then()
    new_task().then()
    new_task().then()
    new_task().then()
    setTimeout(() => {
        queue.request_stop()
    }, 1000)
    await new_task()        // 同理
}

test()

里面有个test(),复制到新js文件后可以node直接跑。

修改const queue = new RateLimitedPriorityQueueScheduler(0.5)这个数值可以设定队列两次调度之间的时间差,单位秒

运行输出:

提交第一个任务,开始计时
第一个任务未超时排队成功,经过了0.004秒
提交第二个任务,开始计时
发射新的任务,插入延迟
第二个任务未超时排队成功
提交第三个任务,开始计时
第三个任务超时,id:OwSfYIMBAAA/Jw==,经过了0.002秒
1秒后取消第三个任务
继续等待第三个任务
发射新的任务,插入延迟
第三个任务排队成功,经过了0.497秒
同时提交5个任务,开始计时
发射新的任务,插入延迟
并发任务未超时排队成功
正在取消第三个任务
发射新的任务,插入延迟
并发任务未超时排队成功
并发任务超时,id:MAafYIMBAABWAg==,经过了1.002秒,再度等待
并发任务超时,id:MAafYIMBAACgKQ==,经过了1.003秒,再度等待
并发任务超时,id:MAafYIMBAADCGQ==,经过了1.003秒,再度等待
发射新的任务,插入延迟
并发任务排队成功,经过了1.503秒
发射新的任务,插入延迟
并发任务排队成功,经过了2.004秒
发射新的任务,插入延迟
并发任务排队成功,经过了2.505秒
并发5任务,但是全部取消
发射新的任务,插入延迟
并发任务未超时排队成功
并发任务超时,id:+Q+fYIMBAAAlOQ==,经过了1.002秒,再度等待
并发任务超时,id:+Q+fYIMBAABhcg==,经过了1.002秒,再度等待
并发任务超时,id:+Q+fYIMBAACuaQ==,经过了1.002秒,再度等待
并发任务被取消
并发任务被取消
并发任务被取消
并发任务超时,id:undefined,经过了1.004秒,再度等待
并发任务被取消

每当打印出排队成功时,代表那个时刻允许bot发送一条消息。默认环境代表限制bot只能0.5秒发一次消息,不同的queue.schedule(1)值代表在同步请求中最多等待的时长,超时则转入异步模式,并支持取消已经进入队列的任务