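// [Web-page notice captured together with this file; not part of the program]
// Code fetch complete; the page will refresh automatically.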
#include "redix.h"

#include <signal.h>  // signal(), SIGPIPE, SIG_IGN (replaces the non-existent <signal> header)

#include <algorithm>
#include <chrono>
#include <mutex>
#include <thread>
Redix::Redix()
: m_evLoop(nullptr) {
}
/// Stops the event loop if it is still reported as running, joins the loop
/// thread, and destroys the libev loop if one was created.
Redix::~Redix() {
    // Only signal the loop when it is actually running.
    const bool loopRunning = getRunning();
    if (loopRunning) {
        stop();
    }

    // The thread must be joined before its loop is destroyed below.
    if (m_evloopThread.joinable()) {
        m_evloopThread.join();
    }

    if (m_evLoop != nullptr) {
        ev_loop_destroy(m_evLoop);
    }
}
bool Redix::connnect(const std::string& host, const int port) {
m_host = host;
m_port = port;
if (!initEv())
{
return false;
}
m_ctx = redisAsyncConnect(host.c_str(), port);
if (!initHiredis())
{
return false;
}
m_evloopThread = thread([this]() { runEventLoop(); });
{
std::unique_lock<std::mutex> lock(m_runMutex);
m_runCond.wait(lock, [this]() {
std::lock_guard<std::mutex> lok(m_connMutex);
return m_running || m_connState == CONNECT_ERROR;
});
}
return getConnectState() == CONNECTED;
}
void Redix::disconnect() {
stop();
wait();
}
/// Flags the event loop for exit and wakes it through the stop watcher.
void Redix::stop() {
    // Set the flag first so the loop thread sees it when it wakes up.
    m_toExit = true;

    // m_evLoop stays nullptr until initLibev() succeeds; there is nothing to
    // wake in that case and ev_async_send() must not receive a null loop.
    if (nullptr == m_evLoop) {
        return;
    }
    ev_async_send(m_evLoop, &m_stopWatcher);
}
/// Blocks the caller until m_exited becomes true (signalled via m_exitCond).
void Redix::wait() {
    std::unique_lock<std::mutex> exitLock(m_exitMutex);
    // Loop to absorb spurious wake-ups.
    while (!m_exited) {
        m_exitCond.wait(exitLock);
    }
}
// Currently a no-op stub: the body is empty, so `cmd` and `callback` are
// ignored and nothing is sent to the server.
// TODO(review): presumably this should register and queue the command for the
// event loop — confirm the intended design.
void Redix::command(const std::string& cmd, const Callback& callback) {
}
// Currently a no-op stub: the body is empty.
void Redix::runInLoop() {
}
/// Creates the libev loop and stores `this` as its user data.
/// @return true on success; on failure the connection state is set to
///         INIT_ERROR and false is returned.
bool Redix::initLibev() {
    // Set the SIGPIPE disposition to "ignore" for the process.
    signal(SIGPIPE, SIG_IGN);

    m_evLoop = ev_loop_new(EVFLAG_AUTO);
    if (m_evLoop != nullptr) {
        // The static watcher callbacks recover the instance via ev_userdata().
        ev_set_userdata(m_evLoop, static_cast<void*>(this));
        return true;
    }

    setConnectState(INIT_ERROR);
    return false;
}
/// Binds the hiredis async context to this instance and to the libev loop,
/// and installs the connect/disconnect callbacks.
/// @return true when every step succeeded; otherwise the connection state is
///         set to INIT_ERROR and false is returned.
bool Redix::initHiredis() {
    // Guard against a missing context before touching any of its fields.
    if (nullptr == m_ctx) {
        setConnectState(INIT_ERROR);
        return false;
    }

    // The static callbacks recover the instance from ctx->data.
    m_ctx->data = static_cast<void*>(this);

    // The steps are evaluated in order and stop at the first failure.
    const bool ok =
        !m_ctx->err &&
        redisLibevAttach(m_evLoop, m_ctx) == REDIS_OK &&
        redisAsyncSetConnectCallback(m_ctx, Redix::connectCallback) == REDIS_OK &&
        redisAsyncSetDisconnectCallback(m_ctx, Redix::disconnectCallback) == REDIS_OK;

    if (!ok) {
        setConnectState(INIT_ERROR);
    }
    return ok;
}
void Redix::connectCallback(const redisAsyncContext* ctx, int status) {
Redix *rdx = (Redix*)ctx->data;
if (REDIS_OK != status) {
rdx->setConnectState(CONNECT_ERROR);
}
else {
ctx->c.reader->fn->freeObject = [](void* reply){};
rdx->setConnectState(CONNECTED);
}
}
void Redix::disconnectCallback(const redisAsyncContext* ctx, int status) {
Redix* rdx = (Redix*)ctx->data;
if (REDIS_OK != status) {
rdx->setConnectState(DISCONNECT_ERROR);
}
else {
rdx->setConnectState(DISCONNECTED);
}
rdx->stop();
}
/// Body of the event-loop thread.
///
/// Runs the loop briefly, waits for the connection state to leave
/// NOT_CONNECTED_YET, then (if connected) starts the async watchers and runs
/// the loop until m_toExit is set. On the way out it frees all registered
/// commands, disconnects if still connected, and marks the loop as exited.
void Redix::runEventLoop() {
    // Two initial passes over the loop before the connection state is inspected.
    ev_run(m_evLoop, EVRUN_ONCE);
    ev_run(m_evLoop, EVRUN_NOWAIT);

    // NOTE(review): this block reads m_connState while get/setConnectState()
    // use m_connectState — confirm against the class declaration which name
    // is correct.
    bool connected = false;
    {
        std::unique_lock<std::mutex> lock(m_connMutex);
        m_connCond.wait(lock, [this]() {
            return NOT_CONNECTED_YET != m_connState;
        });
        connected = (CONNECTED == m_connState);
    }

    if (!connected) {
        // Done outside m_connMutex: connnect() evaluates its wait predicate
        // while holding m_runMutex and then takes m_connMutex, so calling
        // setRunning() (which takes m_runMutex) with m_connMutex held would
        // invert that order and could deadlock.
        setExited(true);
        setRunning(false);
        return;
    }

    // Async watchers let other threads wake the loop.
    ev_async_init(&m_cmdWatcher, processQueuedCommands);
    ev_async_start(m_evLoop, &m_cmdWatcher);
    ev_async_init(&m_stopWatcher, breakEventLoop);
    ev_async_start(m_evLoop, &m_stopWatcher);
    ev_async_init(&m_freeWatcher, freeQueuedCommands);
    ev_async_start(m_evLoop, &m_freeWatcher);

    setRunning(true);

    // Re-enter the loop until stop() has set the exit flag.
    while (!m_toExit) {
        ev_run(m_evLoop);
    }

    freeAllCommands();

    // Short pause followed by a non-blocking pass over the loop.
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    ev_run(m_evLoop, EVRUN_NOWAIT);

    if (getConnectState() == CONNECTED) {
        redisAsyncDisconnect(m_ctx);
    }

    // One more non-blocking pass after the disconnect request.
    ev_run(m_evLoop, EVRUN_NOWAIT);

    setExited(true);
    setRunning(false);
}
/// Looks up a registered command by id under m_commandMutex.
/// @return the command pointer, or nullptr when the id is not registered.
Command* Redix::findCommand(long id) {
    std::lock_guard<std::mutex> guard(m_commandMutex);
    const auto entry = m_commands.find(id);
    return (entry != m_commands.end()) ? entry->second : nullptr;
}
/// Sends a command to the server through the owning client's async context.
/// The command id travels as the callback's privdata so commandCallback()
/// can find the Command again.
/// @param c  Command to submit; its m_rdx, m_id and cmd() are used.
/// @return true if redisAsyncCommand() returned REDIS_OK.
bool Redix::submitToServer(Command* c) {
    Redix* rdx = c->m_rdx;

    // Binds to either a returned value or a returned reference.
    const auto& cmdText = c->cmd();
    void* privdata = reinterpret_cast<void*>(c->m_id);

    // NOTE(review): the command text is passed as the format string, so any
    // '%' in it is interpreted by hiredis as a format specifier.
    const int rc = redisAsyncCommand(rdx->m_ctx, commandCallback, privdata, cmdText.c_str());
    return rc == REDIS_OK;
}
/// hiredis reply callback: routes the reply to the Command whose id was
/// passed as privdata, or frees the reply when that command is not registered.
void Redix::commandCallback(redisAsyncContext* ctx, void* r, void* privdata) {
    Redix* self = static_cast<Redix*>(ctx->data);
    const long id = reinterpret_cast<long>(privdata);
    redisReply* reply = static_cast<redisReply*>(r);

    if (Command* cmd = self->findCommand(id)) {
        cmd->processReply(reply);
        return;
    }

    // No registered command for this id: release the reply here.
    freeReplyObject(reply);
}
void Redix::processQueuedCommands(struct ev_loop* loop, ev_async* async, int revents) {
Redix *rdx = (Redix*)ev_userdata(loop);
std::lock_guard<std::mutex> lock(rdx->m_preMutex);
while (!rdx->m_preQueue.empty()) {
long id = rdx->m_preQueue.front();
rdx->m_preQueue.pop();
if (rdx->processQueuedCommand(id)) {
}
else {
}
}
}
bool Redix::processQueuedCommand(long id) {
Command* c = findCommand(id);
if (nullptr == c) {
return false;
}
submitToServer(c);
return true;
}
void Redix::processCompletedCommands(struct ev_loop* loop, ev_async* async, int revents) {
Redix* rdx = (Redix*)ev_userdata(loop);
std::lock_guard<std::mutex> lock(rdx->m_completedMutex);
while (!rdx->m_completedQueue.empty()) {
long id = rdx->m_completedQueue.front();
rdx->m_completedQueue.pop();
if (!rdx->processCompletedCommand(id)) {
}
}
}
bool Redix::processCompletedCommand(long id) {
Command* c = findCommand(id);
if (nullptr == c) {
return false;
}
c->invoke();
return true;
}
void Redix::freeQueuedCommands(struct ev_loop* loop, ev_async* async, int revents) {
Redix* rdx = (Redix*)ev_userdata(loop);
std::lock_guard<std::mutex> lock(rdx->m_freeMutex);
while (!rdx->m_freeQueue.empty()) {
long id = rdx->m_freeQueue.front();
rdx->m_freeQueue.pop();
if (!rdx->freeQueueCommand(id)) {
}
}
}
bool Redix::freeQueuedCommand(long id) {
Command* c = findCommand(id);
if (nullptr == c) {
return false;
}
c->freeReply();
deregisterCommand(c->m_id);
delete c;
return true;
}
/// Removes the entry for `id` from m_commands under m_commandMutex.
/// The Command object itself is not deleted here.
void Redix::deregisterCommand(const long id) {
    std::lock_guard<std::mutex> guard(m_commandMutex);
    m_commands.erase(id);
}
long Redix::freeAllCommands() {
std::lock_guard<std::mutex> lock1(m_freeMutex);
std::lock_guard<std::mutex> lock2(m_completedMutex);
std::lock_guard<std::mutex> lock3(m_preMutex);
std::lock_guard<std::mutex> lock4(m_commandMutex);
long len = m_commands.size();
for (auto &pair : m_commands) {
Command* c = pair.second;
c->freeReply();
delete c;
}
m_commands.clear();
return len;
}
/// Returns the current connection state, read under m_connMutex.
int Redix::getConnectState() {
    std::lock_guard<std::mutex> guard(m_connMutex);
    const int state = m_connectState;
    return state;
}
/// Stores a new connection state under m_connMutex and wakes every waiter on
/// m_connCond.
void Redix::setConnectState(int state) {
    std::unique_lock<std::mutex> guard(m_connMutex);
    m_connectState = state;
    // Release the mutex before notifying, as the original scoped block did.
    guard.unlock();

    m_connCond.notify_all();
}
int Redix::getRunning() {
std::lock_guard<std::mutex> lock(m_runMutex);
return m_running;
}
void Redix::setRunning(bool running) {
{
std::lock_guard<std::mutex> lock(m_runMuext);
m_running = running;
}
m_runCond.notify_one();
}
int Redix::getExited() {
std::lock_guard<std::mutex> lock(m_exitMutex);
return m_exited;
}
/// Stores the exited flag under m_exitMutex and wakes one waiter on m_exitCond.
void Redix::setExited(bool exited) {
    std::unique_lock<std::mutex> guard(m_exitMutex);
    m_exited = exited;
    // Release the mutex before notifying, as the original scoped block did.
    guard.unlock();

    m_exitCond.notify_one();
}
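// [Web-page notice captured together with this file; not part of the program]
// The content here may be unsuitable for display, so the page does not show it. You can review and modify it yourself through the relevant editing features.
// If you confirm that the content involves no inappropriate language / pure advertising / violence / vulgar or pornographic material / infringement / piracy / false or worthless content, and nothing that violates applicable national laws and regulations, you can click submit to appeal, and we will handle it as soon as possible.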