Python Threads: event.set does not activate the thread in Task/Job management

July 16, 2019, at 11:50 AM

I have a Task Broker that uses an internal message queue (Redis) from which Workers poll tasks. Each task has a timeout and should be finished by a Worker before it expires. So I implemented a queue in the Broker to which tasks are added when given, and a thread that checks this queue based on the minimal remaining wait time of the queued tasks. In theory, a Worker that is done with a task should remove it from this queue, so that when the minimal wait time has expired the task is no longer present in the queue. If it is still present, some kind of rollback should be done.

The code below does not work. Specifically, I can see in the logs that a task is successfully given and even processed by a Worker. However, there are two issues: (1) when the Worker calls remove_task_from_queue, the task is not found; (2) when add_task_to_queue is called, task_signal.set() is also called, but the task_queue_monitor function does not progress.

I am unsure what is wrong. Can you help?

Below the code, shortened versions of the logs are attached, both from the Worker (running in a different container) and from the main app with the Task Broker.

import threading
from logger import Logger
from task_broker import TaskBroker
from message_queue.message_queue_factory import MessageQueueFactory
from utils.general_utils import get_absolute_time_seconds

class CustomTaskBroker(TaskBroker):
    task_queue = []
    task_signal = None
    def __init__(self):
        self.logger = Logger("CustomTaskBroker")
        self.message_queue = MessageQueueFactory().get_message_queue()
        self.task_signal = threading.Event()
        self.task_monitor_thread = threading.Thread(name = "Task Monitor Thread", target = self.task_queue_monitor)
        self.task_monitor_thread.start()

    def start(self, task_handler, subscribe_channels=[]):
        self.logger.log("[CustomTaskBroker] Starting Broker: starting MQ")
        return self.message_queue.start(task_handler, subscribe_channels)
    def give_task(self, task_obj):
        try:
            self.add_task_to_queue(task_obj)
            self.logger.log("[CustomTaskBroker] Task given. Task is '{0}'. Added to queue: '{1}'".format(task_obj.cast_type, self.task_queue))
            if task_obj.cast_type is "anycast":
                self.message_queue.queue.lpush('changes_' + task_obj.tenant + "_" + "common" +':process', task_obj.as_json_string())
            else:
                self.message_queue.publish(task_obj)
        except Exception as e:
            self.logger.error("[CustomTaskBroker] invalid task received. Error message '{0}'", e)
    def add_task_to_queue(self, task_obj):
        self.task_queue.append({"task": task_obj, "id": task_obj.id, "start_time": get_absolute_time_seconds(), "wait_time": task_obj.wait_time})
        self.task_signal.set()
    def remove_task_from_queue(self, task):
        self.logger.error("[CustomTaskBroker] Trying to remove task with id '{0}' from task_queue '{1}'".format(task["id"], self.task_queue))
        for i in range(len(self.task_queue)):
            if self.task_queue[i]["id"] == task["id"]:
                self.task_queue.pop(i)
                return
        self.logger.error("[CustomTaskBroker] Tried to remove task with id '{0}' but not found!".format(task["id"]))
    def task_queue_monitor(self):
        try:
            while True:
                earliest_task_timer = self.determine_wait_time()
                self.logger.log("[CustomTaskBroker] Monitor_threads, len task_queue is '{0}'. Waiting until task added or wait_time '{1}'".format(len(self.task_queue), earliest_task_timer["wait_time"]))
                self.task_signal.wait(earliest_task_timer["wait_time"])
                self.task_signal.clear()
                self.logger.log("[CustomTaskBroker] Wait time expired or new task added. Checking if tasks expired")
                earliest_task_timer = self.determine_wait_time()
                if earliest_task_timer["wait_time"] < 0:
                    self.logger.log("[CustomTaskBroker] Wait time expired. Checking if the waiting task is still in queue")
                    if earliest_task_timer["task_id"] in [ task["id"] for task in self.task_queue]:
                        raise Exception("We have an expired Task! Rollback")
                        #TODO implement rollback
        except Exception as e:
            self.logger.error("[CustomTaskBroker] Monitor threads encountered an issue. Exception: '{0}'".format(e))
    def determine_wait_time(self):
        if len(self.task_queue) == 0:
            earliest_task_timer = { "id" : "None", "wait_time" : None} #block indefinitely
        else:
            list_of_wait_time_tasks = [(task["id"], task["wait_time"] - (get_absolute_time_seconds() - task["start_time"])) for task in self.task_queue]
            self.logger.log("[CustomTaskBroker] list_of_wait_time_tasks: '{0}'".format(list_of_wait_time_tasks))
            earliest_task_tuple = min(list_of_wait_time_tasks, key= lambda n: n[1])
            earliest_task_timer =  { "id" : earliest_task_tuple[0], "wait_time" : earliest_task_tuple[1]}
        self.logger.log("[CustomTaskBroker] determine_wait_time determined earliest_task_timer: '{0}'".format(earliest_task_timer))
        return earliest_task_timer
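
Three details in task_queue_monitor can end the monitor thread regardless of event.set(): the lookup earliest_task_timer["task_id"] uses a key that determine_wait_time never sets (it returns "id"), comparing a wait_time of None with 0 raises a TypeError in Python 3, and the try block wraps the whole while loop, so the first exception stops monitoring for good. The following is only a minimal sketch of the same wait/clear pattern with those points handled. It assumes time.monotonic() in place of get_absolute_time_seconds(), and the names ExpiryMonitor, add, remove, stop and on_expired are illustrative, not part of the original code.

```python
import threading
import time


class ExpiryMonitor:
    """Tracks task deadlines and reports tasks that are not removed in time."""

    def __init__(self, on_expired):
        self._deadlines = {}            # task id -> absolute deadline (monotonic seconds)
        self._lock = threading.Lock()   # guards _deadlines across threads
        self._signal = threading.Event()
        self._on_expired = on_expired   # hypothetical rollback hook
        self._stopped = False
        self._thread = threading.Thread(
            target=self._run, name="Task Monitor Thread", daemon=True
        )
        self._thread.start()

    def add(self, task_id, wait_time):
        with self._lock:
            self._deadlines[task_id] = time.monotonic() + wait_time
        self._signal.set()  # wake the monitor so it recomputes its timeout

    def remove(self, task_id):
        with self._lock:
            found = self._deadlines.pop(task_id, None) is not None
        self._signal.set()
        return found

    def stop(self):
        self._stopped = True
        self._signal.set()
        self._thread.join()

    def _run(self):
        while not self._stopped:
            with self._lock:
                now = time.monotonic()
                expired = [tid for tid, d in self._deadlines.items() if d <= now]
                for tid in expired:
                    del self._deadlines[tid]
                # None means "block until signalled"; it is never compared with 0.
                timeout = min(self._deadlines.values()) - now if self._deadlines else None
            for tid in expired:
                try:
                    self._on_expired(tid)
                except Exception as exc:  # keep the loop alive if the hook fails
                    print("rollback hook failed for {0}: {1}".format(tid, exc))
            self._signal.wait(timeout)
            self._signal.clear()
```

The error handling sits inside the loop, so a failing rollback hook is reported without ending the thread, and the lock keeps add/remove from racing with the expiry check.
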

Logs Worker

[[[16-07-19 07:05:09] Worker [info] # [Worker] Waiting for items to appear in queue
[[[16-07-19 07:05:09] RedisWrapper [debug] # [RedisWrapper] Doing block_rpop(). Name 'changes_nubera_common:process'
[[[16-07-19 07:05:16] Worker [info] # [Worker] Popped item. Type '<class 'bytes'>'. Item: 'b'{"trigger_invocation": false, "type": "document_task", "action": "unknown", "cast_type": "anycast", "tenant": "nubera", "return_channel": null, "wait_time": 2, "tasks": ["put_document"], "channel": "worker", "id": "sroxkaqqle", "source": "a628cedb41b5", "doc": {"payload": {"name": "tst_sv_zone", "description": "Testing tst_sv_zone", "type": "zone"}, "category": "types", "subcategory": "zone", "name": "tst_sv_zone", "description": "Testing tst_sv_zone"}}''
[[[16-07-19 07:05:16] Document_Manager [debug] # [Document_manager] Succesfully handled document task. Notifying task broker.
[[[16-07-19 07:05:16] CustomTaskBroker [error] # [CustomTaskBroker] Tried to remove task with id 'sroxkaqqle' but not found!
[[[16-07-19 07:05:16] CustomTaskBroker NoneType: None
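
A possible reading of the "not found" line above, given that the Worker runs in a different container: the class attribute task_queue is a plain Python list, which is shared only between instances inside one process. A CustomTaskBroker created in the Worker's process would then have its own empty list, which would explain issue (1). The sketch below shows one way to keep the pending-task bookkeeping in a store both sides can reach. It is an assumption, not the original design: SharedPendingTasks and the hash name "pending_tasks" are hypothetical, and InMemoryHashClient is a stand-in that only mimics the hset, hdel and hgetall calls of a redis-py client so the example runs without a server.

```python
import json
import time


class InMemoryHashClient:
    """Stand-in exposing the three hash calls used below (same names as redis-py)."""

    def __init__(self):
        self._hashes = {}

    def hset(self, name, key, value):
        bucket = self._hashes.setdefault(name, {})
        is_new = key not in bucket
        bucket[key] = value
        return 1 if is_new else 0

    def hdel(self, name, *keys):
        bucket = self._hashes.get(name, {})
        return sum(1 for key in keys if bucket.pop(key, None) is not None)

    def hgetall(self, name):
        return dict(self._hashes.get(name, {}))


class SharedPendingTasks:
    """Pending-task bookkeeping kept in a store that every process can reach."""

    def __init__(self, client, name="pending_tasks"):  # hash name is hypothetical
        self._client = client
        self._name = name

    def add(self, task_id, wait_time):
        record = json.dumps({"start_time": time.time(), "wait_time": wait_time})
        self._client.hset(self._name, task_id, record)

    def remove(self, task_id):
        # True when the task was still pending, False when it was not found.
        return self._client.hdel(self._name, task_id) == 1

    def expired_ids(self, now=None):
        now = time.time() if now is None else now
        expired = []
        for task_id, raw in self._client.hgetall(self._name).items():
            record = json.loads(raw)
            if now - record["start_time"] > record["wait_time"]:
                expired.append(task_id)
        return expired
```

With a real redis-py client, keys and values come back as bytes unless the client was created with decode_responses=True, so the task ids returned by expired_ids would need decoding in that case.
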

Logs Main App/Task_broker

[[[16-07-19 07:04:54] TaskBrokerFactory [info] # [TaskBrokerFactory] Creating Broker with type: Custom
[[[16-07-19 07:04:54] MessageQueueFactory [info] # [MessageQueueFactory] Creating MQ with type: Redis
[[[16-07-19 07:04:54] CustomTaskBroker [debug] # [CustomTaskBroker] determine_wait_time determined earliest_task_timer: '{'id': 'None', 'wait_time': None}'
[[[16-07-19 07:04:54] CustomTaskBroker [debug] # [CustomTaskBroker] Monitor_threads, len task_queue is '0'. Waiting until task added or wait_time 'None'
[[[16-07-19 07:04:54] DefaultTaskHandler [info] # [DefaultTaskHandler] Created with call_object 'None', tenants 'None', and stackl_type 'rest'
[[[16-07-19 07:04:54] CustomTaskBroker [debug] # [CustomTaskBroker] Starting Broker: starting MQ
[[[16-07-19 07:04:54] RedisQueue [info] # [RedisQueue] Broker connecting to redis
[[[16-07-19 07:05:16] CustomTaskBroker [debug] # [CustomTaskBroker] Task given. Task is 'anycast'. Added to queue: '[{'task': <task.document_task.DocumentTask object at 0x7ffa1c5a7e80>, 'id': 'sroxkaqqle', 'start_time': 1563260716.1085796, 'wait_time': 2}]'
[[[16-07-19 07:05:16] RedisWrapper [debug] # [RedisWrapper] Doing lpush(). Name changes_nubera_common:process and values {"trigger_invocation": false, "type": "document_task", "action": "unknown", "cast_type": "anycast", "tenant": "nubera", "return_channel": null, "wait_time": 2, "tasks": ["put_document"], "channel": "worker", "id": "sroxkaqqle", "source": "a628cedb41b5", "doc": {"payload": {"name": "tst_sv_zone", "description": "Testing tst_sv_zone", "type": "zone"}, "category": "types", "subcategory": "zone", "name": "tst_sv_zone", "description": "Testing tst_sv_zone"}}