一个广播服务器例子
// https://github.com/BurgerGroup/Burger/blob/main/examples/chat/server_threaded.cc
#include "burger/base/Log.h"
#include "burger/net/CoTcpServer.h"
#include "burger/net/Scheduler.h"
#include "burger/net/Buffer.h"
#include "codec.h"
#include <set>
#include <string>
#include <functional>
#include <mutex>
#include <boost/noncopyable.hpp>
using namespace burger;
using namespace burger::net;
using namespace std::placeholders;
class ChatServer : boost::noncopyable {
public:
ChatServer(Scheduler* sched, const InetAddress& listenAddr)
: sched_(sched),
server_(sched, listenAddr, "ChatServer"),
codec_(std::bind(&ChatServer::onStringMsg, this, _1)) {
server_.setConnectionHandler(std::bind(&ChatServer::connHandler, this, _1));
}
void setThreadNum(size_t threadNum) {
server_.setThreadNum(threadNum);
}
void start() {
server_.start();
}
void connHandler(const CoTcpConnection::ptr& conn) {
{
std::lock_guard<std::mutex> lock(mutex_);
connSet_.insert(conn);
}
Buffer::ptr buffer = std::make_shared<Buffer>();
while(conn->recv(buffer) > 0) {
codec_.decode(conn, buffer);
}
{
std::lock_guard<std::mutex> lock(mutex_);
connSet_.erase(conn);
}
}
void onStringMsg(const std::string& msg) {
std::lock_guard<std::mutex> lock(mutex_);
for(auto it = connSet_.begin(); it != connSet_.end(); ++it) {
codec_.wrapAndsend(*it, msg);
}
}
private:
using ConnectionSet = std::set<CoTcpConnection::ptr>;
Scheduler* sched_;
CoTcpServer server_;
LengthHeaderCodec codec_;
ConnectionSet connSet_;
std::mutex mutex_;
};
int main(int argc, char* argv[]) {
if (argc > 1) {
Scheduler sched;
uint16_t port = static_cast<uint16_t>(atoi(argv[1]));
InetAddress serverAddr(port);
ChatServer server(&sched, serverAddr);
if(argc > 2)
server.setThreadNum(atoi(argv[2]));
server.start();
sched.wait();
} else {
printf("Usage: %s port\n", argv[0]);
}
}
server_threaded.cc中,由于mutex的存在,多线程不能并发执行,而是串行的。
copy on write 代码
#include "burger/base/Log.h"
#include "burger/net/CoTcpServer.h"
#include "burger/net/Scheduler.h"
#include "burger/net/Buffer.h"
#include "codec.h"
#include <set>
#include <string>
#include <functional>
#include <mutex>
#include <boost/noncopyable.hpp>
using namespace burger;
using namespace burger::net;
using namespace std::placeholders;
class ChatServer : boost::noncopyable {
public:
ChatServer(Scheduler* sched, const InetAddress& listenAddr)
: sched_(sched),
server_(sched, listenAddr, "ChatServer"),
codec_(std::bind(&ChatServer::onStringMsg, this, _1)),
connSetPtr_(std::make_shared<ConnectionSet>()) { // use_count 为 1
server_.setConnectionHandler(std::bind(&ChatServer::connHandler, this, _1));
}
void setThreadNum(size_t threadNum) {
server_.setThreadNum(threadNum);
}
void start() {
server_.start();
}
void connHandler(const CoTcpConnection::ptr& conn) {
{
// 在复本上修改,不会影响读者,所以读者在遍历列表的时候,不需要用mutex保护
std::lock_guard<std::mutex> lock(mutex_);
if(!connSetPtr_.unique()) { // 引用计数大于1
connSetPtr_.reset(new ConnectionSet(*connSetPtr_)); // 此处是精髓
}
assert(connections_.unique());
connSetPtr_.insert(conn);
}
Buffer::ptr buffer = std::make_shared<Buffer>();
while(conn->recv(buffer) > 0) {
codec_.decode(conn, buffer);
}
{
std::lock_guard<std::mutex> lock(mutex_);
if(!connSetPtr_.unique()) { // 引用计数大于1
connSetPtr_.reset(new ConnectionSet(*connSetPtr_)); // 此处是精髓
}
assert(connections_.unique());
connSetPtr_.erase(conn);
}
}
void onStringMsg(const std::string& msg) {
// 引用计数加1,mutex保护的临界区大大缩短
// 写者是在另一个复本上修改,所以写者无需担心更改了连接的列表
ConnectionSetPtr connSetPtr = getConnSetPtr();
for(auto it = connSetPtr.begin(); it != connSetPtr.end(); ++it) {
codec_.wrapAndsend(*it, msg);
}
// 这个断言不一定成立, 不能确定之前到达reset没有
// assert(!connections.unique());
// 当ConnectionSetPtr这个栈上的变量销毁的时候,引用计数减1
}
ConnectionSetPtr getConnSetPtr() {
std::lock_guard<std::mutex> lock(mutex_);
return connSetPtr_;
}
private:
using ConnectionSet = std::set<CoTcpConnection::ptr>;
using ConnectionSetPtr = std::shared_ptr<ConnectionSet>;
Scheduler* sched_;
CoTcpServer server_;
LengthHeaderCodec codec_;
ConnectionSetPtr connSetPtr_;
std::mutex mutex_;
};
int main(int argc, char* argv[]) {
if (argc > 1) {
Scheduler sched;
uint16_t port = static_cast<uint16_t>(atoi(argv[1]));
InetAddress serverAddr(port);
ChatServer server(&sched, serverAddr);
if(argc > 2)
server.setThreadNum(atoi(argv[2]));
server.start();
sched.wait();
} else {
printf("Usage: %s port\n", argv[0]);
}
}
代码剖析
void onStringMsg(const std::string& msg) {
std::lock_guard<std::mutex> lock(mutex_);
for(auto it = connections_.begin(); it != connections_.end(); ++it) {
codec_.wrapAndsend(*it, msg);
}
}
我们可以借助 shared_ptr实现copy on write 从而降低锁竞争
shared_ptr是引用计数智能指针,如果当前只有一个观察者,那么引用计数为1,可以用shared_ptr::unique()来判断
对于write端,如果发现引用计数为1,这时可以安全地修改对象,不必担心有人在读它。
对于read端,在读之前把引用计数加1,读完之后减1,这样可以保证在读的期间其引用计数大于1,可以阻止并发写。
比较难的是,对于write端,如果发现引用计数大于1,该如何处理?
既然要更新数据,肯定要加锁,如果这时候其他线程正在读,那么不能在原来的数据上修改,得创建一个副本,在副本上修改,修改完了再替换。如果没有用户在读,那么可以直接修改。
核心要点 :
1.如果你是数据的唯一拥有者,那么你可以直接修改数据。
2.如果你不是数据的唯一拥有者,那么你拷贝它之后再修改。
ConnectionSetPtr getConnSetPtr() {
std::lock_guard<std::mutex> lock(mutex_);
return connSetPtr_;
}
void onStringMsg(const std::string& msg) {
ConnectionSetPtr connSetPtr = getConnSetPtr();
for(auto it = connSetPtr.begin(); it != connSetPtr.end(); ++it) {
codec_.wrapAndsend(*it, msg);
}
}
在对象进行读操作,我们getConnSetPtr()将引用计数+1,当离开这个函数,栈上变量离开作用域,引用计数-1
void connHandler(const CoTcpConnection::ptr& conn) {
{
// 在复本上修改,不会影响读者,所以读者在遍历列表的时候,不需要用mutex保护
std::lock_guard<std::mutex> lock(mutex_);
if(!connSetPtr_.unique()) { // 引用计数大于1
connSetPtr_.reset(new ConnectionSet(*connSetPtr_)); // 此处是精髓
}
assert(connections_.unique());
connSetPtr_.insert(conn);
}
在进行读操作的时候,当不是唯一的拥有者,我们就需要去拷贝数据然后去更改,
connSetPtr_.reset(new ConnectionSet(*connSetPtr_)); // 此处是精髓
这里我们new ConnectionSet(*connSetPtr_) 进行复制,这里指向的的是新拷贝的副本,可以进行修改操作,reset,之前的引用计数减1,而读者那边还持有原来那个对象的引用计数继续循环而不用进行mutex保护。
这里我们只需要在连接断开的时候进行加锁,否则我们在onStringMsg()中会一个转发消息给所有客户端之后才进行下一个转发,就无法并发,变成了串行操作
我们此处也只是拷贝了个数据结构,拷贝的损耗也不是很大