@AlexMikhalev so it seems like there are a couple of issues here. Some of them are in the gear function itself, but one is a real RedisGears issue (which is a funny thing, because I was just working on fixing it). It's going to be a long reply, but I hope I can explain it well enough. And yes, basically you are right, it's a cluster issue and it will not happen with one shard. So here is the explanation:
Let's call the shard on which you executed RG.PYEXECUTE the initiator. The initiator gets the RG.PYEXECUTE command with the requirement list, then downloads and installs the requirements (before running the function). Then it runs the function and links the requirements to everything the function creates (registrations, runs, ...). Now it needs to send the registrations to all the other shards. To do that, it serializes the registration together with the requirements and all the steps (the python functions) and sends it to all the shards. The serialized registration here is huge, not just because the requirements are very big, but because it also serializes the 'nlp' object (because it is used by the 'parse_paragraphs' function), which is by itself huge. Up to here everything is OK; it should only happen once, so it's not a big deal. The issue is that it does not happen only once... it happens each time this registration fires. Why? Because the registration mode is async, which means each execution is distributed across all the shards. So each time you write a key, Gears serializes and deserializes the registration into a buffer, and memory quickly explodes.
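To make this concrete, here is a minimal sketch of the pattern that triggers it (illustrative only, not your exact code): 'nlp' is a module-level object referenced by the registered function, so it gets pickled together with the registration, and with the async mode that payload travels between shards on every event:

import spacy

# The model is loaded at module level, so 'nlp' ends up in the function's globals.
nlp = spacy.load('en_core_web_md', disable=['ner', 'tagger'])

def parse_paragraphs(x):
    # This function references the global 'nlp', so the (huge) model is pickled
    # together with the registration and shipped to the shards along with it.
    doc = nlp(x['value'])
    return len(list(doc.sents))

GB().\
    map(parse_paragraphs).\
    register('en:paragraphs:*', mode="async")  # async: every event becomes a distributed execution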
Now, I agree that the requirements should not be serialized and deserialized each time, and neither should the execution itself; only the data it works on (the key, for example) should be sent. I am currently working on fixing this for the next version. But until then I am going to suggest a 'workaround' (which I also believe is the preferred way to do it, regardless of the issue I described). The 'workaround' I will suggest triggers a local execution on the shard that got the event (without creating a global execution, which requires initialization time in order to send it to all the shards).
The first thing I would suggest is to initialize 'nlp' on each shard separately. It can be done with the 'onRegistered' callback when registering the function; this callback is called on each shard upon registration (https://oss.redislabs.com/redisgears/functions.html#register). The second thing I would suggest is that each shard writes to local keys by using '{}' and the 'hashtag' function. And the last thing I am going to suggest is to change the execution mode to 'async_local'. It will look something like this:
nlp = None

def OnRegistered():
    # Called on every shard when the registration arrives, so each shard
    # loads its own copy of the spacy model instead of shipping it around.
    global nlp
    import spacy
    nlp = spacy.load('en_core_web_md', disable=['ner', 'tagger'])
    nlp.max_length = 2000000

def remove_prefix(text, prefix):
    return text[text.startswith(prefix) and len(prefix):]

def parse_paragraphs(x):
    global nlp
    key_prefix = "en:paragraphs:"
    key = x['key']
    # make sure we only process english articles
    if key.startswith(key_prefix):
        paragraphs = x['value']
        doc = nlp(paragraphs)
        idx = 1
        article_id = remove_prefix(key, key_prefix)
        for each_sent in doc.sents:
            # '{key}' keeps every sentence key on the same hash slot as the original key
            sentence_key = "sentences{%s}:%s:%s" % (key, article_id, idx)
            execute('SET', sentence_key, str(each_sent))
            idx += 1
        execute('SADD', 'processed_docs_stage2_sentence{%s}' % hashtag(), article_id)
        log("Successfully processed paragraphs " + str(article_id), level='notice')
    else:
        execute('SADD', 'screw_ups{%s}' % hashtag(), x['key'])

GB().\
    foreach(parse_paragraphs).\
    count().\
    register('en:paragraphs:*', keyTypes=['string'], onRegistered=OnRegistered, mode="async_local")
Now each time a key is written to a shard it will be processed locally on that shard. On failure the key is added to a shard-local key 'screw_ups{<string that maps to the shard's hash slot>}' and on success to 'processed_docs_stage2_sentence{<same string that maps to the shard's hash slot>}'. Also, for each key, more keys will be created with the prefix 'sentences{<the original key>}', and we know they sit correctly in the shard's hash-slot mapping, again thanks to the '{}'.
Now you can use a batch gear execution to collect all the 'processed_docs_stage2_sentence{...}' keys and analyze them, or maybe get all the 'screw_ups'. And you can easily get all the 'sentences' created for a given key.
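For example, a minimal sketch of such a batch run (assuming the key names used in the registration above; what you hang after the flatmap for the analysis is up to you):

# One-off batch execution: gather all article ids from the per-shard
# 'processed_docs_stage2_sentence{...}' sets into a single reply.
GB().\
    flatmap(lambda x: execute('SMEMBERS', x['key'])).\
    run('processed_docs_stage2_sentence*')

# Same idea for the failures:
# GB().flatmap(lambda x: execute('SMEMBERS', x['key'])).run('screw_ups*')

# And the sentences of one article can be collected with the 'sentences{<original key>}'
# pattern, e.g. (hypothetical article key):
# GB().map(lambda x: x['value']).run('sentences{en:paragraphs:42}*')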
One last thing to notice: Gears runs your function in the background, one execution at a time per event. If you write your data faster than it can be processed, this backlog will grow (you can watch it with RG.DUMPREGISTRATIONS by comparing the number of triggered executions with the number of completed executions; the diff is the backlog). If you only have peaks then it's fine, because Gears will catch up after the peak ends. But if you always write faster than you process, this backlog will keep growing. One way to avoid it is setting the execution mode to 'sync', which means the function runs on the same thread as the write itself and you only get the write reply after the processing has finished. If you choose this approach you know for sure the backlog is not growing, but you also increase the reply latency. This is a tradeoff you will have to choose depending on the full use case.
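If you decide the 'sync' tradeoff fits your use case, only the mode in the registration above changes; a sketch with the same function:

GB().\
    foreach(parse_paragraphs).\
    count().\
    register('en:paragraphs:*', keyTypes=['string'], onRegistered=OnRegistered, mode="sync")
# 'sync' runs the function on the write path itself, so the client gets the
# write reply only after the processing has finished.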
Hope it's clear. Please update us about the progress and if you have any other questions/issues.