Commit 6ff74e49 authored by Yuqi Zhang's avatar Yuqi Zhang
Browse files

Start DMCK for CRDT-Redis

- Connection part.
Test required.

Removed the wrong code for select DB in p2p replicationBroadCast. Need to implement the p2p select command in the future. We assume all the replicas always use the same DB and not implement the p2p select for now.
parent f2a1f4a7
......@@ -11,6 +11,7 @@ cmake-build-debug
compile_commands.json
*.lnk
.clangd
.cache
bench_start
ozlog
......
cmake_minimum_required(VERSION 3.10)
project(dmck)
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread -lhiredis")
aux_source_directory(. SRC_LIST)
add_executable(dmck_run ${SRC_LIST} redis_connect.h)
target_link_libraries(dmck_run hiredis)
CC = g++
CPPFLAGS = -std=c++14 -O3
CPPFILES = $(shell find . -maxdepth 2 -name "*.cpp")
OBJS = $(CPPFILES:.cpp=.o)
INCLUDES = $(CPPFILES:.cpp=.d)
all: dmck
include $(INCLUDES)
%.d: %.cpp
@set -e; rm -f $@; \
$(CC) -MM -MT $*.o $(CPPFLAGS) $< > $@.$$$$; \
sed 's,\($*\)\.o[ :]*,\1.o $@ : ,g' < $@.$$$$ > $@; \
rm -f $@.$$$$
dmck: $(OBJS)
$(CC) -o dmck_run $^ -pthread -lhiredis
.PHONY: stop clean run
stop:
@../redis_test/shutdown.sh 6379 6380 6381 6382 6383 6384 6385 6386 6387 6388 6389 6390 6391 6392 6393 2>/dev/null 1>&2
clean:
rm -rf dmck_run $(OBJS) $(INCLUDES) *.rdb *.log
run: dmck stop
@rm -rf *.rdb *.log
@./dmck_run
# DMCK for CRDT-Redis
Dis-sys Model Checking (DMCK) for CRDT-Redis.
#include <iostream>
int main()
{
std::istream::sync_with_stdio(false);
std::ostream::sync_with_stdio(false);
std::cout<<"hi"<<std::endl;
return 0;
}
//
// Created by yqzhang on 2021/3/10.
//
#ifndef DMCK_REDIS_CONNECT_H
#define DMCK_REDIS_CONNECT_H
#include <cerrno>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#if defined(__linux__)
#include <hiredis/hiredis.h>
#elif defined(_WIN32)
#include "../redis-6.0.5/deps/hiredis/hiredis.h"
#endif
using redisReply_ptr = std::unique_ptr<redisReply, decltype(freeReplyObject) *>;
class redis_connect
{
private:
const char *ip;
const int port;
int size, id;
redisContext *client = nullptr, *server_instruct = nullptr, *server_listen = nullptr;
void connect(redisContext *&c)
{
if (c != nullptr) redisFree(c);
c = redisConnect(ip, port);
if (c == nullptr || c->err)
{
if (c)
{
std::cout << "\nError for redisConnect: " << c->errstr << ", ip:" << ip
<< ", port:" << port << std::endl;
redisFree(c);
c = nullptr;
}
else
std::cout << "\nCan't allocate redis context" << std::endl;
exit(-1);
}
}
void connect_server_instruct()
{
std::ostringstream stream;
connect(server_instruct);
stream << "repltest " << size << " " << id;
exec(stream.str(), server_instruct);
}
void connect_server_listen()
{
std::ostringstream stream;
connect(server_listen);
stream << "repltest";
exec(stream.str(), server_listen);
}
inline void connect_client() { connect(client); }
inline void reconnect(redisContext *&c)
{
if (c == client)
connect_client();
else if (c == server_instruct)
connect_server_instruct();
else if (c == server_listen)
connect_server_listen();
else
{
std::cout << "\nWhere does this redisContext come from?" << std::endl;
exit(-1);
}
}
void reply_error(const std::string &cmd, redisContext *&c)
{
std::cout << "\nSomething wrong for host " << c->tcp.host << ":" << c->tcp.port
<< "to execute " << (c != client ? "server inner message " : "") << cmd << "\n";
if (c->reader->err == REDIS_ERR_IO)
std::cout << "IO error: " << strerror(errno) << std::endl;
else
std::cout << "errno: " << c->reader->err << ", err str: " << c->reader->errstr
<< std::endl;
exit(-1);
}
redisReply_ptr exec(const std::string &cmd, redisContext *&c)
{
bool retried = false;
auto r = static_cast<redisReply *>(redisCommand(c, cmd.c_str()));
while (r == nullptr)
{
if (!retried)
{
reconnect(c);
retried = true;
r = static_cast<redisReply *>(redisCommand(c, cmd.c_str()));
continue;
}
reply_error(cmd, c);
}
return redisReply_ptr(r, freeReplyObject);
}
public:
redis_connect(const char *ip, int port, int size, int id)
: ip(ip), port(port), size(size), id(id)
{
connect_server_instruct();
connect_server_listen();
connect_client();
}
void pass_inner_msg(redisReply_ptr &r)
{
std::ostringstream stream;
for (int i = 0; i < r->elements; ++i)
stream << (i != 0 ? " " : "") << r->element[i]->str;
exec(stream.str(), server_instruct);
}
inline redisReply_ptr exec(const std::string &cmd) { return exec(cmd, client); }
redisReply_ptr get_inner_msg()
{
void *reply;
if (redisGetReply(server_listen, &reply) != REDIS_OK)
reply_error("func: get_inner_msg()", server_listen);
return redisReply_ptr(static_cast<redisReply *>(reply), freeReplyObject);
}
~redis_connect()
{
if (client != nullptr) redisFree(client);
if (server_instruct != nullptr) redisFree(server_instruct);
if (server_listen != nullptr) redisFree(server_listen);
}
};
#endif // DMCK_REDIS_CONNECT_H
......@@ -50,7 +50,6 @@ void repltestCommand(client *c)
}
c->flags |= CLIENT_REPLICA;
c->flags |= CLIENT_REPLICA_MESSAGE;
c->authenticated = 1;
if (c->argc == 1)
{
......@@ -60,6 +59,7 @@ void repltestCommand(client *c)
if (c->argc == 3)
{
c->flags |= CLIENT_REPLICA_MESSAGE;
long id, size;
getLongFromObjectOrReply(c, c->argv[1], &size, "invalid replica size.");
getLongFromObjectOrReply(c, c->argv[2], &id, "invalid replica id.");
......@@ -118,54 +118,59 @@ void replicateCommand(client *c)
addReply(c, shared.ok);
}
void replicationBroadcast(list *replicas, int dictid, robj **argv, int argc)
//void replicationBroadcast(list *replicas, int dictid, robj **argv, int argc)
void replicationBroadcast(list *replicas, robj **argv, int argc)
{
listNode *ln;
listIter li;
int j, selected = 0;
char llstr[LONG_STR_SIZE];
int j;
// int selected = 0;
// char llstr[LONG_STR_SIZE];
if (listLength(replicas) == 0)
return;
/* Send SELECT command to every replica if needed. */
if (server.p2p_seldb != dictid)
{
selected = 1;
robj *selectcmd;
/* For a few DBs we have pre-computed SELECT command. */
if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS)
{
selectcmd = shared.select[dictid];
}
else
{
int dictid_len;
dictid_len = ll2string(llstr, sizeof(llstr), dictid);
selectcmd = createObject(OBJ_STRING,
sdscatprintf(sdsempty(),
"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
dictid_len, llstr));
}
/* Send it to replicas. */
listRewind(replicas, &li);
while ((ln = listNext(&li)))
{
client *replica = ln->value;
// if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
replica->flags |= CLIENT_REPLICA_MESSAGE;
addReply(replica, shared.multi_cmd);
addReply(replica, selectcmd);
replica->flags &= ~CLIENT_REPLICA_MESSAGE;
}
if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
decrRefCount(selectcmd);
}
server.p2p_seldb = dictid;
/* Send SELECT command to every replica if needed.
Need to implement the p2p select command.
We assume all the replicas always use the same DB and
not implement the p2p select for now.*/
// if (server.p2p_seldb != dictid)
// {
// selected = 1;
// robj *selectcmd;
// /* For a few DBs we have pre-computed SELECT command. */
// if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS)
// {
// selectcmd = shared.select[dictid];
// }
// else
// {
// int dictid_len;
// dictid_len = ll2string(llstr, sizeof(llstr), dictid);
// selectcmd = createObject(OBJ_STRING,
// sdscatprintf(sdsempty(),
// "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
// dictid_len, llstr));
// }
// /* Send it to replicas. */
// listRewind(replicas, &li);
// while ((ln = listNext(&li)))
// {
// client *replica = ln->value;
// // if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
// replica->flags |= CLIENT_REPLICA_MESSAGE;
// addReply(replica, shared.multi_cmd);
// addReply(replica, selectcmd);
// replica->flags &= ~CLIENT_REPLICA_MESSAGE;
// }
// if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
// decrRefCount(selectcmd);
// }
// server.p2p_seldb = dictid;
/* Write the command to every replica. */
listRewind(replicas, &li);
......@@ -186,8 +191,8 @@ void replicationBroadcast(list *replicas, int dictid, robj **argv, int argc)
for (j = 0; j < argc; j++)
addReplyBulk(replica, argv[j]);
if (selected)
addReply(replica, shared.exec_cmd);
// if (selected)
// addReply(replica, shared.exec_cmd);
replica->flags &= ~CLIENT_REPLICA_MESSAGE;
}
......
......@@ -2946,7 +2946,7 @@ void initServer(void) {
server.clients_timeout_table = raxNew();
/* Force to emit the first SELECT command. */
server.slaveseldb = -1;
server.p2p_seldb = -1;
//server.p2p_seldb = -1;
server.p2p_count = 0;
server.p2p_id = -1;
server.unblocked_clients = listCreate();
......@@ -3497,7 +3497,8 @@ void call(client *c, int flags) {
// Propagate the command to P2P replicas
if (c->rargv && !(c->flags & CLIENT_REPLICA))
{
replicationBroadcast(server.replicas, c->db->id, c->rargv, c->rargc);
// replicationBroadcast(server.replicas, c->db->id, c->rargv, c->rargc);
replicationBroadcast(server.replicas, c->rargv, c->rargc);
}
if (c->flags & CLIENT_REPLICA)
{
......
......@@ -1320,7 +1320,7 @@ struct redisServer {
long long master_initial_offset; /* Master PSYNC offset. */
int repl_slave_lazy_flush; /* Lazy FLUSHALL before loading DB? */
/* Replication(P2P) */
int p2p_seldb;
//int p2p_seldb; // This should be in client struct, used for p2p select command.
int p2p_count;
int p2p_id;
/* Replication script cache. */
......@@ -1825,7 +1825,9 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
void rdbPipeWriteHandlerConnRemoved(struct connection *conn);
/* P2P replication */
void replicationBroadcast(list *replicas, int dictid, robj **argv, int argc);
//void replicationBroadcast(list *replicas, int dictid, robj **argv, int argc);
void replicationBroadcast(list *replicas, robj **argv, int argc);
/* Generic persistence functions */
void startLoadingFile(FILE* fp, char* filename, int rdbflags);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment