Hello all!
I’ve got a question about concurrent graph modifications with RedisGraph and Redis’ transaction support. Transaction support is made possible by using Redis’ “multi” keyword, followed by a set of commands, followed by an “exec” directive. I was using this the other day for bulk insert support, specifically because it offers the ability to receive back individual query results / responses. The implantation for bulk loading prior only worked for “create” statements (that are unbounded, or matched) and would chain entities together: ex 'graph.query asdf “create (:thing {age: 33}), (:thing {age: 21})”.
I’ve found that if this process runs concurrently Redis quickly falls into a state where it doesn’t crash but it locks up and stops responding to any command. So, I thought, perhaps this is where I’m supposed to use watch / unwatch. My understanding of this function is that it allows the clients who are committing transactions to know whether a key has been updated since the watch began in an effort to sort of circumvent the need for rollback – i.e. the watch fails or denotes the key was updated, cancel your transaction and try again.
Anyway, JRedisGraph implements this functionality atop the Jedis library. Other RedisGraph clients likely do the same (I haven’t looked).
My question is: what purpose do transactions support in / for RedisGraph where an app/user/function is operating almost entirely on one Redis key?
@Grapes([
@Grab(group='com.redislabs', module='jredisgraph', version='2.0.2'),
@Grab(group='redis.clients', module='jedis', version='3.3.0')
])
import com.redislabs.redisgraph.impl.api.RedisGraph
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool
def config = new GenericObjectPoolConfig()
config.setMaxTotal(6)
def jedisPool = new JedisPool(config)
def graph = new RedisGraph(jedisPool)
def createQueries = [] // this contains the queries
graph.getContext().withCloseable { context ->
context.watch(db)
context.multi().withCloseable { transaction ->
createQueries.each { query ->
transaction.query(db, query)
}
def result = transaction.exec()
if (result) {
// yay, store them
} else {
// a null result means the watch failed, the key was altered by another process since the watch was opened
}
}
context.unwatch()
}