Commit b07310b6 authored by Yuqi Zhang's avatar Yuqi Zhang
Browse files

fix missing clean old add for the add func for remove-win crdts

parent 12ee4099
......@@ -222,6 +222,21 @@ static void insertFunc(redisDb *db, robj *ht, robj **argv, vc *t)
listNode *ln;
listIter li;
int ase_removed = 0;
listRewind(e->aset, &li);
while ((ln = listNext(&li)))
{
rl_ase *ae = ln->value;
if (vc_cmp(ae->t, t) == VC_LESS)
{
listDelNode(e->aset, ln);
if (e->value == ae) e->value = NULL;
rl_aseDelete(ae);
ase_removed = 1;
}
}
listRewind(e->rset, &li);
while ((ln = listNext(&li)))
{
......@@ -235,6 +250,16 @@ static void insertFunc(redisDb *db, robj *ht, robj **argv, vc *t)
#endif
}
}
if(ase_removed && e->value == NULL)
{
listRewind(e->aset, &li);
while ((ln = listNext(&li)))
{
rl_ase *ae = ln->value;
if (e->value == NULL || e->value->t->id < ae->t->id) e->value = ae;
}
}
if (e->value == NULL || e->value->t->id < t->id) e->value = a;
if (pre_insert == 0 && LOOKUP(e))
......
......@@ -106,7 +106,7 @@ void rz_cmdDelete(rz_cmd *cmd)
}
// doesn't free t, doesn't own t
static void insertFunc(rze *e, redisDb *db, robj *tname, robj *key, double value, vc *t)
static void addFunc(rze *e, redisDb *db, robj *tname, robj *key, double value, vc *t)
{
rz_ase *a = zmalloc(sizeof(rz_ase));
a->t = vc_dup(t);
......@@ -119,6 +119,25 @@ static void insertFunc(rze *e, redisDb *db, robj *tname, robj *key, double value
listNode *ln;
listIter li;
int ase_removed = 0;
listRewind(e->aset, &li);
while ((ln = listNext(&li)))
{
rz_ase *ae = ln->value;
if (vc_cmp(ae->t, t) == VC_LESS)
{
#ifdef CRDT_OVERHEAD
ovhd_inc(-RZE_ASE_SIZE);
#endif
listDelNode(e->aset, ln);
if (e->value == ae) e->value = NULL;
vc_delete(ae->t);
zfree(ae);
ase_removed = 1;
}
}
listRewind(e->rset, &li);
while ((ln = listNext(&li)))
{
......@@ -133,7 +152,17 @@ static void insertFunc(rze *e, redisDb *db, robj *tname, robj *key, double value
}
}
if(ase_removed && e->value == NULL)
{
listRewind(e->aset, &li);
while ((ln = listNext(&li)))
{
rz_ase *ae = ln->value;
if (e->value == NULL || e->value->t->id < ae->t->id) e->value = ae;
}
}
if (e->value == NULL || e->value->t->id < t->id) e->value = a;
if (LOOKUP(e))
{
robj *zset = getZsetOrCreate(db, tname, key);
......@@ -228,7 +257,7 @@ static void notifyLoop(rze *e, redisDb *db, robj *tname, robj *key)
switch (cmd->type)
{
case RZADD:
insertFunc(e, db, tname, key, cmd->value, cmd->t);
addFunc(e, db, tname, key, cmd->value, cmd->t);
break;
case RZINCBY:
increaseFunc(e, db, tname, key, cmd->value, cmd->t);
......@@ -268,7 +297,7 @@ int readyCheck(rze *e, vc *t)
// no memory free
void insertFunc(rze *e, redisDb *db, robj *tname, robj *element, double value, vc *t)
void addFunc(rze *e, redisDb *db, robj *tname, robj *element, double value, vc *t)
{
if (!addCheck(e, t))return;
e->aid = t->id;
......@@ -303,7 +332,7 @@ void notifyLoop(rze *e, redisDb *db)
switch (cmd->type)
{
case RZADD:
insertFunc(e, db, cmd->tname, cmd->element, cmd->value, cmd->t);
addFunc(e, db, cmd->tname, cmd->element, cmd->value, cmd->t);
break;
case RZINCBY:
increaseFunc(e, db, cmd->tname, cmd->element, cmd->value, cmd->t);
......@@ -321,7 +350,7 @@ void notifyLoop(rze *e, redisDb *db)
// in effect of addCommand function:
if (readyCheck(e, t))
{
insertFunc(e, c->db, c->rargv[1], c->rargv[2], v, t);
addFunc(e, c->db, c->rargv[1], c->rargv[2], v, t);
vc_delete(t);
}
else
......@@ -379,7 +408,7 @@ void rzaddCommand(client *c)
rze *e = GET_RZE(rargv, 1);
if (vc_causally_ready(e->current, t))
{
insertFunc(e, c->db, c->rargv[1], c->rargv[2], v, t);
addFunc(e, c->db, c->rargv[1], c->rargv[2], v, t);
vc_delete(t);
notifyLoop(e, c->db, c->rargv[1], c->rargv[2]);
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment