
数据库连接池负责分配、管理和释放数据库连接,它允许应用程序重复使用一个现有的数据库连接,而不是再重新建立一个;类似的还有线程池。

一个数据库连接对象均对应一个物理数据库连接,每次操作都打开一个物理连接,使用完都关闭连接,这样造成系统的性能低下。各种池化技术的使用原因都是类似的,也就是单独操作比较浪费系统资源,利用池提前准备一些资源,在需要时可以重复使用这些预先准备的资源,从而减少系统开销,实现资源重复利用。
下面以访问MySQL为例,执行一个SQL命令,如果不使用连接池,需要经过哪些流程:
如果使用了连接池第一次访问的时候,需要建立连接。 但是之后的访问,均会复用之前创建的连接,直接执行SQL语句。
//// Created by Cmf on 2022/8/24.//#ifndef CLUSTERCHATSERVER_CONNECTION_H#define CLUSTERCHATSERVER_CONNECTION_H#include <mysql/mysql.h>#include <chrono>#include <string>#include "Log.hpp"class Connection {public: Connection(); ~Connection(); bool Connect(const std::string &ip, const uint16_t port, const std::string &user, const std::string &pwd, const std::string &db); bool Update(const std::string &sql); MYSQL_RES *Query(const std::string &sql); void RefreshAliveTime(); long long GetAliveTime() const;private: MYSQL *_conn; std::chrono::time_point<std::chrono::steady_clock> _aliveTime;};#endif //CLUSTERCHATSERVER_CONNECTION_H//// Created by Cmf on 2022/8/24.//#include "Connection.h"Connection::Connection() { _conn = mysql_init(nullptr); mysql_set_character_set(_conn, "utf8");//设置编码格式维utf8}Connection::~Connection() { if (_conn != nullptr) { mysql_close(_conn); }}bool Connection::Connect(const std::string &ip, const uint16_t port, const std::string &user, const std::string &pwd, const std::string &db) { _conn = mysql_real_connect(_conn, ip.c_str(), user.c_str(), pwd.c_str(), db.c_str(), port, nullptr, 0); if (_conn == nullptr) { LOG_ERROR("MySQL Connect Error") return false; } return true;}bool Connection::Update(const std::string &sql) { if (mysql_query(_conn, sql.c_str()) != 0) { LOG_INFO("SQL %s 更新失败:%d", sql.c_str(), mysql_error(_conn)); return false; } return true;}MYSQL_RES *Connection::Query(const std::string &sql) { if (mysql_query(_conn, sql.c_str()) != 0) { LOG_INFO("SQL %s 查询失败:%d", sql.c_str(), mysql_error(_conn)); return nullptr; } return mysql_use_result(_conn);}void Connection::RefreshAliveTime() { _aliveTime = std::chrono::steady_clock::now();}long long Connection::GetAliveTime() const { return std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - _aliveTime).count();}//// Created by Cmf on 2022/8/24.//#ifndef CLUSTERCHATSERVER_COMMOMCONNECTIONPOOL_H#define CLUSTERCHATSERVER_COMMOMCONNECTIONPOOL_H#include "noncopyable.hpp"#include <memory>#include <queue>#include <mutex>#include <atomic>#include <thread>#include <condition_variable>#include "Connection.h"class ConnectionPool : private noncopyable {public: static ConnectionPool& GetConnectionPool(); //获取连接池对象实例 //给外部提供接口,从连接池中获取一个可用的空闲连接 std::shared_ptr<Connection> GetConnection();//智能指针自动管理连接的释放 ~ConnectionPool();private: ConnectionPool(); bool LoadConfigFile(); //运行在独立的线程中,专门负责生产新连接 void ProduceConnectionTask(); //扫描超过maxIdleTime时间的空闲连接,进行对于的连接回收 void ScannerConnectionTask(); void AddConnection();private: std::string _ip; uint16_t _port; std::string _user; std::string _pwd; std::string _db; size_t _minSize; //初始链接数量 size_t _maxSize; //最大连接数量 size_t _maxIdleTime;//最大空闲时间 size_t _connectionTimeout;//超时时间 std::queue<Connection *> _connectionQueue;//存储连接队列 std::mutex _mtx; //维护连接队列的线程安全互斥锁 std::atomic_int _connectionCount;//记录连接所创建的connection连接的总数量 std::condition_variable _cv;//设置条件变量,用于连接生产线程和连接消费线程的通信};#endif //CLUSTERCHATSERVER_COMMOMCONNECTIONPOOL_H//// Created by Cmf on 2022/8/24.//#include <fstream>#include "ConnectionPool.h"#include "json.hpp"using json = nlohmann::json;ConnectionPool &ConnectionPool::GetConnectionPool() { static ConnectionPool pool; return pool;}std::shared_ptr<Connection> ConnectionPool::GetConnection() { std::unique_lock<std::mutex> lock(_mtx); while (_connectionQueue.empty()) { //连接为空,就阻塞等待_connectionTimeout时间,如果时间过了,还没唤醒 if (std::cv_status::timeout == _cv.wait_for(lock, std::chrono::microseconds(_connectionTimeout))) { if (_connectionQueue.empty()) { //就可能还是为空 continue; } } } //对于使用完成的连接,不能直接销毁该连接,而是需要将该连接归还给连接池的队列,供之后的其他消费者使用,于是我们使用智能指针,自定义其析构函数,完成放回的操作: std::shared_ptr<Connection> res(_connectionQueue.front(), [&](Connection *conn) { std::unique_lock<std::mutex> locker(_mtx); conn->RefreshAliveTime(); _connectionQueue.push(conn); }); _connectionQueue.pop(); _cv.notify_all(); return res;}ConnectionPool::ConnectionPool() { if (!LoadConfigFile()) { LOG_ERROR("JSON Config Error"); return; } //创建初始数量的连接 for (int i = 0; i < _minSize; ++i) { AddConnection(); } //启动一个新的线程,作为连接的生产者 linux thread => pthread_create std::thread produce(std::bind(&ConnectionPool::ProduceConnectionTask, this)); produce.detach();//守护线程,主线程结束了,这个线程就结束了 //启动一个新的定时线程,扫描超过maxIdleTime时间的空闲连接,进行对于的连接回收 std::thread scanner(std::bind(&ConnectionPool::ScannerConnectionTask, this)); scanner.detach();}ConnectionPool::~ConnectionPool() { while (!_connectionQueue.empty()) { Connection *ptr = _connectionQueue.front(); _connectionQueue.pop(); delete ptr; }}bool ConnectionPool::LoadConfigFile() { std::ifstream ifs("../../config/dbconf.json"); json js; ifs >> js; std::cout << js << std::endl; if (!js.is_object()) { LOG_ERROR("JSON is NOT Object"); return false; } if (!js["ip"].is_string() || !js["port"].is_number() || !js["user"].is_string() || !js["pwd"].is_string() || !js["db"].is_string() || !js["minSize"].is_number() || !js["maxSize"].is_number() || !js["maxIdleTime"].is_number() || !js["timeout"].is_number()) { LOG_ERROR("JSON The data type does not match"); return false; } _ip = js["ip"].get<std::string>(); _port = js["port"].get<uint16_t>(); _user = js["user"].get<std::string>(); _pwd = js["pwd"].get<std::string>(); _db = js["db"].get<std::string>(); _minSize = js["minSize"].get<size_t>(); _maxSize = js["maxSize"].get<size_t>(); _maxIdleTime = js["maxIdleTime"].get<size_t>(); _connectionTimeout = js["timeout"].get<size_t>(); return true;}void ConnectionPool::ProduceConnectionTask() { while (true) { std::unique_lock<std::mutex> lock(_mtx); while (_connectionQueue.size() >= _minSize) { _cv.wait(lock); } if (_connectionCount < _maxSize) { AddConnection(); } _cv.notify_all(); }}void ConnectionPool::ScannerConnectionTask() { while (true) { std::this_thread::sleep_for(std::chrono::seconds(_maxIdleTime)); std::unique_lock<std::mutex> lock(_mtx); while (_connectionCount > _minSize) { Connection *ptr = _connectionQueue.front();//队头的时间没超过,那后面的时间就都没超过 if (ptr->GetAliveTime() >= _maxIdleTime * 1000) { _connectionQueue.pop(); --_connectionCount; delete ptr; } else { break; } } }}void ConnectionPool::AddConnection() { Connection *conn = new Connection(); conn->Connect(_ip, _port, _user, _pwd, _db); conn->RefreshAliveTime(); _connectionQueue.push(conn); ++_connectionCount;}