I have added an “amrange” function (the name stands for “query multiple ranges all at once”) to the module in order to illustrate my idea:
/*
 * TSDB_amrange - command handler that queries every series matched by a label
 * FILTER and replies with all of their samples in one flat array.
 *
 * Argument handling (delegated to helpers defined elsewhere):
 *   - parseRangeArguments()  fills start_ts / end_ts,
 *   - parseAggregationArgs() optionally fills time_delta / aggObject,
 *   - parseCountArgument()   optionally fills count (-1 means "no limit"),
 *   - everything after the mandatory FILTER keyword is parsed as label
 *     predicates; at least one EQ predicate is required.
 *
 * Reply layout:
 *   [ [ "Dummy", [ [ "Dummy", "Dummy" ] ], [ <sample>, ... ] ] ]
 * where each <sample> is either a raw [timestamp, value] pair or an
 * aggregated bucket emitted through ReplyWithAggValue().
 *
 * Note: a single aggregation context and a single last_agg_timestamp are
 * shared by all matched series, so with aggregation enabled samples from
 * different series are folded into the same running buckets.
 *
 * Returns REDISMODULE_OK after a reply has been produced, REDISMODULE_ERR when
 * one of the parse helpers fails, or the result of the RedisModule_WrongArity /
 * RedisModule_ReplyWithError call used to report the problem.
 */
int TSDB_amrange(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    /* Placeholder text that keeps the reply shape identical to the one the
     * Python redistimeseries client expects. */
    const char *description = "Dummy";

    RedisModule_AutoMemory(ctx);

    if (argc < 4) {
        return RedisModule_WrongArity(ctx);
    }

    api_timestamp_t start_ts, end_ts;
    api_timestamp_t time_delta = 0;
    /* The range parser needs a series to work with; hand it a stand-in whose
     * last timestamp is the maximum representable value. */
    Series fake_series = {0};
    fake_series.lastTimestamp = LLONG_MAX;
    if (parseRangeArguments(ctx, &fake_series, 1, argv, &start_ts, &end_ts) != REDISMODULE_OK) {
        return REDISMODULE_ERR;
    }

    AggregationClass *aggObject = NULL;
    int aggregationResult = parseAggregationArgs(ctx, argv, argc, &time_delta, &aggObject);
    if (aggregationResult == TSDB_ERROR) {
        return REDISMODULE_ERR;
    }

    int filter_location = RMUtil_ArgIndex("FILTER", argv, argc);
    if (filter_location == -1) {
        return RedisModule_WrongArity(ctx);
    }

    long long count = -1;
    if (parseCountArgument(ctx, argv, argc, &count) != REDISMODULE_OK) {
        return REDISMODULE_ERR;
    }

    /* Every argument after FILTER is treated as one label predicate. */
    size_t query_count = argc - 1 - filter_location;
    QueryPredicate *queries = RedisModule_PoolAlloc(ctx, sizeof(QueryPredicate) * query_count);
    if (parseLabelListFromArgs(ctx, argv, filter_location + 1, query_count, queries) == TSDB_ERROR) {
        return RedisModule_ReplyWithError(ctx, "TSDB: failed parsing labels");
    }
    if (CountPredicateType(queries, query_count, EQ) == 0) {
        return RedisModule_ReplyWithError(ctx, "TSDB: please provide at least one matcher");
    }

    RedisModuleDict *result = QueryIndex(ctx, queries, query_count);

    long long replylen = 0;
    timestamp_t last_agg_timestamp = 0;

    /* Fixed envelope around the samples; see the reply layout above. */
    RedisModule_ReplyWithArray(ctx, 1);
    RedisModule_ReplyWithArray(ctx, 3);
    RedisModule_ReplyWithStringBuffer(ctx, description, strlen(description));
    RedisModule_ReplyWithArray(ctx, 1);
    RedisModule_ReplyWithArray(ctx, 2);
    RedisModule_ReplyWithStringBuffer(ctx, description, strlen(description));
    RedisModule_ReplyWithStringBuffer(ctx, description, strlen(description));
    /* The number of samples is only known once all series were scanned. */
    RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);

    RedisModuleDictIter *iter = RedisModule_DictIteratorStartC(result, "^", NULL, 0);

    void *context = NULL;
    if (aggObject != NULL) {
        context = aggObject->createContext();
    }

    char *currentKey;
    size_t currentKeyLen;
    while ((currentKey = RedisModule_DictNextC(iter, &currentKeyLen, NULL)) != NULL) {
        /* Once the requested number of entries has been emitted no further
         * series can contribute to the reply, so stop scanning. */
        if (count != -1 && replylen >= count) {
            break;
        }

        RedisModuleKey *key = RedisModule_OpenKey(
            ctx, RedisModule_CreateString(ctx, currentKey, currentKeyLen), REDISMODULE_READ);
        if (key == NULL || RedisModule_ModuleTypeGetType(key) != SeriesType) {
            /* The dictionary key is a length-delimited buffer, so print it
             * with an explicit length instead of relying on a terminator. */
            RedisModule_Log(ctx, "warning",
                            "couldn't open key or key is not a Timeseries. key=%.*s",
                            (int) currentKeyLen, currentKey);
            continue;
        }
        Series *series = RedisModule_ModuleTypeGetValue(key);

        /* In case a retention is set we shouldn't return samples older than
         * the retention window. The clamp is kept in a per-series variable so
         * that one series' retention does not narrow the range used for the
         * series that follow it. */
        api_timestamp_t series_start_ts = start_ts;
        if (series->retentionTime && series->lastTimestamp > series->retentionTime) {
            series_start_ts = max(start_ts, series->lastTimestamp - series->retentionTime);
        }

        Sample sample;
        SeriesIterator iterator = SeriesQuery(series, series_start_ts, end_ts);
        while (SeriesIteratorGetNext(&iterator, &sample) != 0 &&
               (count == -1 || replylen < count)) {
            if (aggObject == NULL) {
                /* No aggregation: emit the raw [timestamp, value] pair. */
                RedisModule_ReplyWithArray(ctx, 2);
                RedisModule_ReplyWithLongLong(ctx, sample.timestamp);
                RedisModule_ReplyWithDouble(ctx, sample.data);
                replylen++;
            } else {
                /* Align the sample to the start of its time_delta bucket. */
                timestamp_t current_timestamp =
                    sample.timestamp - (sample.timestamp % time_delta);
                if (current_timestamp > last_agg_timestamp) {
                    /* A new bucket started: flush the previous one, unless no
                     * bucket has been opened yet. */
                    if (last_agg_timestamp != 0) {
                        ReplyWithAggValue(ctx, last_agg_timestamp, aggObject, context);
                        replylen++;
                    }
                    last_agg_timestamp = current_timestamp;
                }
                aggObject->appendValue(context, sample.data);
            }
        }
        SeriesIteratorClose(&iterator);
    }

    if (aggObject != AGG_NONE && replylen != count) {
        /* Reply with the last, still open, bucket of data. */
        ReplyWithAggValue(ctx, last_agg_timestamp, aggObject, context);
        replylen++;
    }

    RedisModule_DictIteratorStop(iter);
    RedisModule_ReplySetArrayLength(ctx, replylen);
    return REDISMODULE_OK;
}