基于Burger框架探讨shared_ptr实现copy on write技术

一个广播服务器例子

// https://github.com/BurgerGroup/Burger/blob/main/examples/chat/server_threaded.cc
#include "burger/base/Log.h"
#include "burger/net/CoTcpServer.h"
#include "burger/net/Scheduler.h"
#include "burger/net/Buffer.h"
#include "codec.h"
#include <set>
#include <string>
#include <functional>
#include <mutex>
#include <boost/noncopyable.hpp>

using namespace burger;
using namespace burger::net;
using namespace std::placeholders;

class ChatServer : boost::noncopyable {
public:
    ChatServer(Scheduler* sched, const InetAddress& listenAddr)
        : sched_(sched),
        server_(sched, listenAddr, "ChatServer"),
        codec_(std::bind(&ChatServer::onStringMsg, this, _1)) {
        server_.setConnectionHandler(std::bind(&ChatServer::connHandler, this, _1));
    }

    void setThreadNum(size_t threadNum) {
        server_.setThreadNum(threadNum);
    }
    void start() {
        server_.start();
    }

    void connHandler(const CoTcpConnection::ptr& conn) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            connSet_.insert(conn);
        }
        
        Buffer::ptr buffer = std::make_shared<Buffer>();
        while(conn->recv(buffer) > 0) {
            codec_.decode(conn, buffer);
        }
        {
            std::lock_guard<std::mutex> lock(mutex_);
            connSet_.erase(conn);
        }
        
    } 

    void onStringMsg(const std::string& msg) {
        std::lock_guard<std::mutex> lock(mutex_);
        for(auto it = connSet_.begin(); it != connSet_.end(); ++it) {
            codec_.wrapAndsend(*it, msg);
        }
    }
private: 
    using ConnectionSet = std::set<CoTcpConnection::ptr>;
    Scheduler* sched_;
    CoTcpServer server_;
    LengthHeaderCodec codec_;
    ConnectionSet connSet_;  
    std::mutex mutex_;
};

int main(int argc, char* argv[]) {
    if (argc > 1) {
        Scheduler sched;
        uint16_t port = static_cast<uint16_t>(atoi(argv[1]));
        InetAddress serverAddr(port);
        ChatServer server(&sched, serverAddr);
        if(argc > 2) 
            server.setThreadNum(atoi(argv[2]));
        server.start();
        sched.wait();
    } else {
        printf("Usage: %s port\n", argv[0]);
    }
}

server_threaded.cc中,由于mutex的存在,多线程不能并发执行,而是串行的。

copy on write 代码

#include "burger/base/Log.h"
#include "burger/net/CoTcpServer.h"
#include "burger/net/Scheduler.h"
#include "burger/net/Buffer.h"
#include "codec.h"
#include <set>
#include <string>
#include <functional>
#include <mutex>
#include <boost/noncopyable.hpp>

using namespace burger;
using namespace burger::net;
using namespace std::placeholders;

class ChatServer : boost::noncopyable {
public:
    ChatServer(Scheduler* sched, const InetAddress& listenAddr)
        : sched_(sched),
        server_(sched, listenAddr, "ChatServer"),
        codec_(std::bind(&ChatServer::onStringMsg, this, _1)),
        connSetPtr_(std::make_shared<ConnectionSet>()) {   // use_count 为 1 
        server_.setConnectionHandler(std::bind(&ChatServer::connHandler, this, _1));
    }

    void setThreadNum(size_t threadNum) {
        server_.setThreadNum(threadNum);
    }

    void start() {
        server_.start();
    }

    void connHandler(const CoTcpConnection::ptr& conn) {
        {
            // 在复本上修改,不会影响读者,所以读者在遍历列表的时候,不需要用mutex保护
            std::lock_guard<std::mutex> lock(mutex_);
            if(!connSetPtr_.unique()) {  // 引用计数大于1
                connSetPtr_.reset(new ConnectionSet(*connSetPtr_)); // 此处是精髓
            }
            assert(connections_.unique());
            connSetPtr_.insert(conn);
        }
        
        Buffer::ptr buffer = std::make_shared<Buffer>();
        while(conn->recv(buffer) > 0) {
            codec_.decode(conn, buffer);
        }

        {
            std::lock_guard<std::mutex> lock(mutex_);
            if(!connSetPtr_.unique()) {  // 引用计数大于1
                connSetPtr_.reset(new ConnectionSet(*connSetPtr_)); // 此处是精髓
            }
            assert(connections_.unique());
            connSetPtr_.erase(conn);
        }
        
    } 

    void onStringMsg(const std::string& msg) {
        // 引用计数加1,mutex保护的临界区大大缩短
        // 写者是在另一个复本上修改,所以写者无需担心更改了连接的列表
        ConnectionSetPtr connSetPtr = getConnSetPtr();
        for(auto it = connSetPtr.begin(); it != connSetPtr.end(); ++it) {
            codec_.wrapAndsend(*it, msg);
        }
        // 这个断言不一定成立, 不能确定之前到达reset没有
        // assert(!connections.unique());
        // 当ConnectionSetPtr这个栈上的变量销毁的时候,引用计数减1
    }

    ConnectionSetPtr getConnSetPtr() {
        std::lock_guard<std::mutex> lock(mutex_);
        return connSetPtr_;
    }
private: 
    using ConnectionSet = std::set<CoTcpConnection::ptr>;
    using ConnectionSetPtr = std::shared_ptr<ConnectionSet>;
    Scheduler* sched_;
    CoTcpServer server_;
    LengthHeaderCodec codec_;
    ConnectionSetPtr connSetPtr_;
    std::mutex mutex_;
};

int main(int argc, char* argv[]) {
    if (argc > 1) {
        Scheduler sched;
        uint16_t port = static_cast<uint16_t>(atoi(argv[1]));
        InetAddress serverAddr(port);
        ChatServer server(&sched, serverAddr);
        if(argc > 2) 
            server.setThreadNum(atoi(argv[2]));
        server.start();
        sched.wait();
    } else {
        printf("Usage: %s port\n", argv[0]);
    }
}

代码剖析

void onStringMsg(const std::string& msg) {
    std::lock_guard<std::mutex> lock(mutex_);
    for(auto it = connections_.begin(); it != connections_.end(); ++it) {
        codec_.wrapAndsend(*it, msg);
    }
}

我们可以借助 shared_ptr实现copy on write 从而降低锁竞争

shared_ptr是引用计数智能指针,如果当前只有一个观察者,那么引用计数为1,可以用shared_ptr::unique()来判断

对于write端,如果发现引用计数为1,这时可以安全地修改对象,不必担心有人在读它。

对于read端,在读之前把引用计数加1,读完之后减1,这样可以保证在读的期间其引用计数大于1,可以阻止并发写。

比较难的是,对于write端,如果发现引用计数大于1,该如何处理?

既然要更新数据,肯定要加锁,如果这时候其他线程正在读,那么不能在原来的数据上修改,得创建一个副本,在副本上修改,修改完了再替换。如果没有用户在读,那么可以直接修改。

核心要点 :

1.如果你是数据的唯一拥有者,那么你可以直接修改数据。

2.如果你不是数据的唯一拥有者,那么你拷贝它之后再修改。

ConnectionSetPtr getConnSetPtr() {
    std::lock_guard<std::mutex> lock(mutex_);
    return connSetPtr_;
}

void onStringMsg(const std::string& msg) {
    ConnectionSetPtr connSetPtr = getConnSetPtr();
    for(auto it = connSetPtr.begin(); it != connSetPtr.end(); ++it) {
        codec_.wrapAndsend(*it, msg);
    }
}

在对象进行读操作,我们getConnSetPtr()将引用计数+1,当离开这个函数,栈上变量离开作用域,引用计数-1

void connHandler(const CoTcpConnection::ptr& conn) {
    {
        // 在复本上修改,不会影响读者,所以读者在遍历列表的时候,不需要用mutex保护
        std::lock_guard<std::mutex> lock(mutex_);
        if(!connSetPtr_.unique()) {  // 引用计数大于1
            connSetPtr_.reset(new ConnectionSet(*connSetPtr_)); // 此处是精髓
        }
        assert(connections_.unique());
        connSetPtr_.insert(conn);
    }

在进行读操作的时候,当不是唯一的拥有者,我们就需要去拷贝数据然后去更改,

connSetPtr_.reset(new ConnectionSet(*connSetPtr_)); // 此处是精髓

这里我们new ConnectionSet(*connSetPtr_) 进行复制,这里指向的的是新拷贝的副本,可以进行修改操作,reset,之前的引用计数减1,而读者那边还持有原来那个对象的引用计数继续循环而不用进行mutex保护。

这里我们只需要在连接断开的时候进行加锁,否则我们在onStringMsg()中会一个转发消息给所有客户端之后才进行下一个转发,就无法并发,变成了串行操作

我们此处也只是拷贝了个数据结构,拷贝的损耗也不是很大