Skip to content

BUG: Result awaited for endless #493

@kai-nashi

Description

@kai-nashi

Reproduce:

  1. taskiq scheduler code:scheduler --update-interval 1
  2. taskiq worker code:broker --workers 1
  3. run code

Expected:

code awaited result and then exit

Real

code will executing forever

Code

import asyncio
import math

from taskiq import Context
from taskiq import SmartRetryMiddleware
from taskiq import TaskiqDepends
from taskiq import InMemoryBroker
from taskiq import TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_pipelines import Pipeline
from taskiq_pipelines import PipelineMiddleware
from taskiq_redis import RedisScheduleSource
from taskiq_redis import RedisAsyncResultBackend
from taskiq_redis import ListQueueBroker
from taskiq_redis import RedisScheduleSource

result_backend = RedisAsyncResultBackend(
    redis_url="redis://localhost:6379/2",
)

redis_schedule_source = RedisScheduleSource("redis://localhost/2")

broker = (
    ListQueueBroker(
        url="redis://localhost:6379/2",
    )
    .with_middlewares(
        SmartRetryMiddleware(schedule_source=redis_schedule_source),
    )
    .with_result_backend(result_backend)
)

scheduler = TaskiqScheduler(broker, [redis_schedule_source])

check_interval = 0.2


@broker.task("power", retry_on_error=True)
async def power(x: int, y: int = 2, context: "Context" = TaskiqDepends()) -> int:
    if int(context.message.labels.get("_retries", 0)) == 0:
        raise Exception("test")
    await asyncio.sleep(check_interval * 10)
    result = x**y
    print(f"{x} ** {y} = {result}")
    return result


@broker.task("sqrt", retry_on_error=True)
async def sqrt(value: int, context: "Context" = TaskiqDepends()) -> float:
    if context.message.labels.get("_retries", 0) == 0:
        raise Exception("test")
    await asyncio.sleep(check_interval * 10)
    result = math.sqrt(value)
    print(f"sqrt({value}) = {result}")
    return result


pipeline = Pipeline(broker).call_next(power).call_next(sqrt)


async def main():
    task = await power.kiq(2)
    result = await task.wait_result(check_interval=check_interval)
    print("result:", result)

    # task = await pipeline.kiq(2)
    # result = await task.wait_result(check_interval=check_interval)
    # print("result:", result)


if __name__ == "__main__":
    asyncio.run(main())

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions