@@ -420,9 +420,10 @@ def func(scheduler):
420
420
await comm .close ()
421
421
422
422
423
- def test_scheduler_init_pulls_blocked_handlers_from_config ():
424
- with dask .config .set ({"distributed.scheduler.blocked-handlers" : ["test-handler" ]}):
425
- s = Scheduler ()
423
+ @gen_cluster (
424
+ nthreads = [], config = {"distributed.scheduler.blocked-handlers" : ["test-handler" ]}
425
+ )
426
+ def test_scheduler_init_pulls_blocked_handlers_from_config (s ):
426
427
assert s .blocked_handlers == ["test-handler" ]
427
428
428
429
@@ -673,9 +674,8 @@ async def test_broadcast_nanny(s, a, b):
673
674
assert result1 == result3
674
675
675
676
676
- @gen_test ()
677
- async def test_worker_name ():
678
- s = await Scheduler (validate = True , port = 0 )
677
+ @gen_cluster (nthreads = [])
678
+ async def test_worker_name (s ):
679
679
w = await Worker (s .address , name = "alice" )
680
680
assert s .workers [w .address ].name == "alice"
681
681
assert s .aliases ["alice" ] == w .address
@@ -685,60 +685,53 @@ async def test_worker_name():
685
685
await w2 .close ()
686
686
687
687
await w .close ()
688
- await s .close ()
689
688
690
689
691
- @gen_test ()
692
- async def test_coerce_address ():
693
- with dask .config .set ({"distributed.comm.timeouts.connect" : "100ms" }):
694
- s = await Scheduler (validate = True , port = 0 )
695
- print ("scheduler:" , s .address , s .listen_address )
696
- a = Worker (s .address , name = "alice" )
697
- b = Worker (s .address , name = 123 )
698
- c = Worker ("127.0.0.1" , s .port , name = "charlie" )
699
- await asyncio .gather (a , b , c )
700
-
701
- assert s .coerce_address ("127.0.0.1:8000" ) == "tcp://127.0.0.1:8000"
702
- assert s .coerce_address ("[::1]:8000" ) == "tcp://[::1]:8000"
703
- assert s .coerce_address ("tcp://127.0.0.1:8000" ) == "tcp://127.0.0.1:8000"
704
- assert s .coerce_address ("tcp://[::1]:8000" ) == "tcp://[::1]:8000"
705
- assert s .coerce_address ("localhost:8000" ) in (
706
- "tcp://127.0.0.1:8000" ,
707
- "tcp://[::1]:8000" ,
708
- )
709
- assert s .coerce_address ("localhost:8000" ) in (
710
- "tcp://127.0.0.1:8000" ,
711
- "tcp://[::1]:8000" ,
712
- )
713
- assert s .coerce_address (a .address ) == a .address
714
- # Aliases
715
- assert s .coerce_address ("alice" ) == a .address
716
- assert s .coerce_address (123 ) == b .address
717
- assert s .coerce_address ("charlie" ) == c .address
690
+ @gen_cluster (nthreads = [])
691
+ async def test_coerce_address (s ):
692
+ print ("scheduler:" , s .address , s .listen_address )
693
+ a = Worker (s .address , name = "alice" )
694
+ b = Worker (s .address , name = 123 )
695
+ c = Worker ("127.0.0.1" , s .port , name = "charlie" )
696
+ await asyncio .gather (a , b , c )
697
+
698
+ assert s .coerce_address ("127.0.0.1:8000" ) == "tcp://127.0.0.1:8000"
699
+ assert s .coerce_address ("[::1]:8000" ) == "tcp://[::1]:8000"
700
+ assert s .coerce_address ("tcp://127.0.0.1:8000" ) == "tcp://127.0.0.1:8000"
701
+ assert s .coerce_address ("tcp://[::1]:8000" ) == "tcp://[::1]:8000"
702
+ assert s .coerce_address ("localhost:8000" ) in (
703
+ "tcp://127.0.0.1:8000" ,
704
+ "tcp://[::1]:8000" ,
705
+ )
706
+ assert s .coerce_address ("localhost:8000" ) in (
707
+ "tcp://127.0.0.1:8000" ,
708
+ "tcp://[::1]:8000" ,
709
+ )
710
+ assert s .coerce_address (a .address ) == a .address
711
+ # Aliases
712
+ assert s .coerce_address ("alice" ) == a .address
713
+ assert s .coerce_address (123 ) == b .address
714
+ assert s .coerce_address ("charlie" ) == c .address
718
715
719
- assert s .coerce_hostname ("127.0.0.1" ) == "127.0.0.1"
720
- assert s .coerce_hostname ("alice" ) == a .ip
721
- assert s .coerce_hostname (123 ) == b .ip
722
- assert s .coerce_hostname ("charlie" ) == c .ip
723
- assert s .coerce_hostname ("jimmy" ) == "jimmy"
716
+ assert s .coerce_hostname ("127.0.0.1" ) == "127.0.0.1"
717
+ assert s .coerce_hostname ("alice" ) == a .ip
718
+ assert s .coerce_hostname (123 ) == b .ip
719
+ assert s .coerce_hostname ("charlie" ) == c .ip
720
+ assert s .coerce_hostname ("jimmy" ) == "jimmy"
724
721
725
- assert s .coerce_address ("zzzt:8000" , resolve = False ) == "tcp://zzzt:8000"
722
+ assert s .coerce_address ("zzzt:8000" , resolve = False ) == "tcp://zzzt:8000"
723
+ await asyncio .gather (a .close (), b .close (), c .close ())
726
724
727
- await s .close ()
728
- await asyncio .gather (a .close (), b .close (), c .close ())
729
725
726
+ @gen_cluster (nthreads = [], config = {"distributed.scheduler.work-stealing" : True })
727
+ async def test_config_stealing (s ):
728
+ """Regression test for https://github.com/dask/distributed/issues/3409"""
729
+ assert "stealing" in s .extensions
730
730
731
- @pytest .mark .asyncio
732
- async def test_config_stealing (cleanup ):
733
- # Regression test for https://github.com/dask/distributed/issues/3409
734
-
735
- with dask .config .set ({"distributed.scheduler.work-stealing" : True }):
736
- async with Scheduler (port = 0 ) as s :
737
- assert "stealing" in s .extensions
738
731
739
- with dask . config . set ( {"distributed.scheduler.work-stealing" : False }):
740
- async with Scheduler ( port = 0 ) as s :
741
- assert "stealing" not in s .extensions
732
+ @ gen_cluster ( nthreads = [], config = {"distributed.scheduler.work-stealing" : False })
733
+ async def test_config_no_stealing ( s ) :
734
+ assert "stealing" not in s .extensions
742
735
743
736
744
737
@pytest .mark .skipif (WINDOWS , reason = "num_fds not supported on windows" )
@@ -771,7 +764,7 @@ async def test_update_graph_culls(s, a, b):
771
764
772
765
773
766
def test_io_loop (loop ):
774
- s = Scheduler (loop = loop , validate = True )
767
+ s = Scheduler (loop = loop , port = 0 , dashboard_address = ":0" , validate = True )
775
768
assert s .io_loop is loop
776
769
777
770
@@ -1284,7 +1277,7 @@ async def test_fifo_submission(c, s, w):
1284
1277
@gen_test ()
1285
1278
async def test_scheduler_file ():
1286
1279
with tmpfile () as fn :
1287
- s = await Scheduler (scheduler_file = fn , port = 0 )
1280
+ s = await Scheduler (scheduler_file = fn , port = 0 , dashboard_address = ":0" )
1288
1281
with open (fn ) as f :
1289
1282
data = json .load (f )
1290
1283
assert data ["address" ] == s .address
@@ -1332,7 +1325,7 @@ async def test_dashboard_host(host, dashboard_address, expect):
1332
1325
"""Dashboard is accessible from any host by default, but it can be also bound to
1333
1326
localhost.
1334
1327
"""
1335
- async with Scheduler (host = host , dashboard_address = dashboard_address ) as s :
1328
+ async with Scheduler (host = host , port = 0 , dashboard_address = dashboard_address ) as s :
1336
1329
sock = first (s .http_server ._sockets .values ())
1337
1330
assert sock .getsockname ()[0 ] in expect
1338
1331
@@ -1484,10 +1477,10 @@ async def test_get_task_status(c, s, a, b):
1484
1477
assert result == {future .key : "memory" }
1485
1478
1486
1479
1487
- def test_deque_handler ():
1480
+ @gen_cluster (nthreads = [])
1481
+ async def test_deque_handler (s ):
1488
1482
from distributed .scheduler import logger
1489
1483
1490
- s = Scheduler ()
1491
1484
deque_handler = s ._deque_handler
1492
1485
logger .info ("foo123" )
1493
1486
assert len (deque_handler .deque ) >= 1
@@ -1801,33 +1794,30 @@ async def test_close_workers(s, a, b):
1801
1794
@pytest .mark .skipif (not LINUX , reason = "Need 127.0.0.2 to mean localhost" )
1802
1795
@gen_test ()
1803
1796
async def test_host_address ():
1804
- s = await Scheduler (host = "127.0.0.2" , port = 0 )
1797
+ s = await Scheduler (host = "127.0.0.2" , port = 0 , dashboard_address = ":0" )
1805
1798
assert "127.0.0.2" in s .address
1806
1799
await s .close ()
1807
1800
1808
1801
1809
1802
@gen_test ()
1810
1803
async def test_dashboard_address ():
1811
1804
pytest .importorskip ("bokeh" )
1812
- s = await Scheduler (dashboard_address = "127.0.0.1:8901" , port = 0 )
1813
- assert s .services ["dashboard" ].port == 8901
1814
- await s .close ()
1805
+ async with Scheduler (dashboard_address = "127.0.0.1:8901" , port = 0 ) as s :
1806
+ assert s .services ["dashboard" ].port == 8901
1815
1807
1816
- s = await Scheduler (dashboard_address = "127.0.0.1" , port = 0 )
1817
- assert s .services ["dashboard" ].port
1818
- await s .close ()
1808
+ async with Scheduler (dashboard_address = "127.0.0.1" , port = 0 ) as s :
1809
+ assert s .services ["dashboard" ].port
1819
1810
1820
- s = await Scheduler (dashboard_address = "127.0.0.1:8901,127.0.0.1:8902" , port = 0 )
1821
- assert s .services ["dashboard" ].port == 8901
1822
- await s .close ()
1811
+ async with Scheduler (
1812
+ dashboard_address = "127.0.0.1:8901,127.0.0.1:8902" , port = 0
1813
+ ) as s :
1814
+ assert s .services ["dashboard" ].port == 8901
1823
1815
1824
- s = await Scheduler (dashboard_address = ":8901,:8902" , port = 0 )
1825
- assert s .services ["dashboard" ].port == 8901
1826
- await s .close ()
1816
+ async with Scheduler (dashboard_address = ":8901,:8902" , port = 0 ) as s :
1817
+ assert s .services ["dashboard" ].port == 8901
1827
1818
1828
- s = await Scheduler (dashboard_address = [8901 , 8902 ], port = 0 )
1829
- assert s .services ["dashboard" ].port == 8901
1830
- await s .close ()
1819
+ async with Scheduler (dashboard_address = [8901 , 8902 ], port = 0 ) as s :
1820
+ assert s .services ["dashboard" ].port == 8901
1831
1821
1832
1822
1833
1823
@gen_cluster (client = True )
@@ -1856,82 +1846,79 @@ async def test_adaptive_target(c, s, a, b):
1856
1846
assert s .adaptive_target (target_duration = ".1s" ) == 0
1857
1847
1858
1848
1859
- @pytest . mark . asyncio
1860
- async def test_async_context_manager (cleanup ):
1861
- async with Scheduler (port = 0 ) as s :
1849
+ @gen_test ()
1850
+ async def test_async_context_manager ():
1851
+ async with Scheduler (port = 0 , dashboard_address = ":0" ) as s :
1862
1852
assert s .status == Status .running
1863
1853
async with Worker (s .address ) as w :
1864
1854
assert w .status == Status .running
1865
1855
assert s .workers
1866
1856
assert not s .workers
1867
1857
1868
1858
1869
- @pytest . mark . asyncio
1870
- async def test_allowed_failures_config (cleanup ):
1871
- async with Scheduler (port = 0 , allowed_failures = 10 ) as s :
1859
+ @gen_test ()
1860
+ async def test_allowed_failures_config ():
1861
+ async with Scheduler (port = 0 , dashboard_address = ":0" , allowed_failures = 10 ) as s :
1872
1862
assert s .allowed_failures == 10
1873
1863
1874
1864
with dask .config .set ({"distributed.scheduler.allowed_failures" : 100 }):
1875
- async with Scheduler (port = 0 ) as s :
1865
+ async with Scheduler (port = 0 , dashboard_address = ":0" ) as s :
1876
1866
assert s .allowed_failures == 100
1877
1867
1878
1868
with dask .config .set ({"distributed.scheduler.allowed_failures" : 0 }):
1879
- async with Scheduler (port = 0 ) as s :
1869
+ async with Scheduler (port = 0 , dashboard_address = ":0" ) as s :
1880
1870
assert s .allowed_failures == 0
1881
1871
1882
1872
1883
- @pytest . mark . asyncio
1873
+ @gen_test ()
1884
1874
async def test_finished ():
1885
- async with Scheduler (port = 0 ) as s :
1875
+ async with Scheduler (port = 0 , dashboard_address = ":0" ) as s :
1886
1876
async with Worker (s .address ) as w :
1887
1877
pass
1888
1878
1889
1879
await s .finished ()
1890
1880
await w .finished ()
1891
1881
1892
1882
1893
- @pytest .mark .asyncio
1894
- async def test_retire_names_str (cleanup ):
1895
- async with Scheduler (port = 0 ) as s :
1896
- async with Worker (s .address , name = "0" ) as a :
1897
- async with Worker (s .address , name = "1" ) as b :
1898
- async with Client (s .address , asynchronous = True ) as c :
1899
- futures = c .map (inc , range (10 ))
1900
- await wait (futures )
1901
- assert a .data and b .data
1902
- await s .retire_workers (names = [0 ])
1903
- assert all (f .done () for f in futures )
1904
- assert len (b .data ) == 10
1883
+ @gen_cluster (nthreads = [], client = True )
1884
+ async def test_retire_names_str (c , s ):
1885
+ async with Worker (s .address , name = "0" ) as a , Worker (s .address , name = "1" ) as b :
1886
+ futures = c .map (inc , range (10 ))
1887
+ await wait (futures )
1888
+ assert a .data and b .data
1889
+ await s .retire_workers (names = [0 ])
1890
+ assert all (f .done () for f in futures )
1891
+ assert len (b .data ) == 10
1905
1892
1906
1893
1907
- @gen_cluster (client = True )
1894
+ @gen_cluster (
1895
+ client = True , config = {"distributed.scheduler.default-task-durations" : {"inc" : 100 }}
1896
+ )
1908
1897
async def test_get_task_duration (c , s , a , b ):
1909
- with dask .config .set (
1910
- {"distributed.scheduler.default-task-durations" : {"inc" : 100 }}
1911
- ):
1912
- future = c .submit (inc , 1 )
1913
- await future
1914
- assert 10 < s .task_prefixes ["inc" ].duration_average < 100
1898
+ future = c .submit (inc , 1 )
1899
+ await future
1900
+ assert 10 < s .task_prefixes ["inc" ].duration_average < 100
1915
1901
1916
- ts_pref1 = s .new_task ("inc-abcdefab" , None , "released" )
1917
- assert 10 < s .get_task_duration (ts_pref1 ) < 100
1902
+ ts_pref1 = s .new_task ("inc-abcdefab" , None , "released" )
1903
+ assert 10 < s .get_task_duration (ts_pref1 ) < 100
1918
1904
1919
- # make sure get_task_duration adds TaskStates to unknown dict
1920
- assert len (s .unknown_durations ) == 0
1921
- x = c .submit (slowinc , 1 , delay = 0.5 )
1922
- while len (s .tasks ) < 3 :
1923
- await asyncio .sleep (0.01 )
1905
+ # make sure get_task_duration adds TaskStates to unknown dict
1906
+ assert len (s .unknown_durations ) == 0
1907
+ x = c .submit (slowinc , 1 , delay = 0.5 )
1908
+ while len (s .tasks ) < 3 :
1909
+ await asyncio .sleep (0.01 )
1924
1910
1925
- ts = s .tasks [x .key ]
1926
- assert s .get_task_duration (ts ) == 0.5 # default
1927
- assert len (s .unknown_durations ) == 1
1928
- assert len (s .unknown_durations ["slowinc" ]) == 1
1911
+ ts = s .tasks [x .key ]
1912
+ assert s .get_task_duration (ts ) == 0.5 # default
1913
+ assert len (s .unknown_durations ) == 1
1914
+ assert len (s .unknown_durations ["slowinc" ]) == 1
1929
1915
1930
1916
1931
1917
@gen_cluster (client = True )
1932
1918
async def test_default_task_duration_splits (c , s , a , b ):
1933
- """This test ensures that the default task durations for shuffle split tasks are, by default, aligned with the task names of dask.dask"""
1934
-
1919
+ """Ensure that the default task durations for shuffle split tasks are, by default,
1920
+ aligned with the task names of dask.dask
1921
+ """
1935
1922
pd = pytest .importorskip ("pandas" )
1936
1923
dd = pytest .importorskip ("dask.dataframe" )
1937
1924
@@ -1956,13 +1943,13 @@ async def test_default_task_duration_splits(c, s, a, b):
1956
1943
assert default_time <= 1e-6
1957
1944
1958
1945
1959
- @pytest . mark . asyncio
1960
- async def test_no_danglng_asyncio_tasks (cleanup ):
1946
+ @gen_test ()
1947
+ async def test_no_danglng_asyncio_tasks ():
1961
1948
start = asyncio .all_tasks ()
1962
- async with Scheduler (port = 0 ) as s :
1963
- async with Worker (s .address , name = "0" ) as a :
1949
+ async with Scheduler (port = 0 , dashboard_address = ":0" ) as s :
1950
+ async with Worker (s .address , name = "0" ):
1964
1951
async with Client (s .address , asynchronous = True ) as c :
1965
- await asyncio . sleep ( 0.01 )
1952
+ await c . submit ( lambda : 1 )
1966
1953
1967
1954
tasks = asyncio .all_tasks ()
1968
1955
assert tasks == start
@@ -2278,10 +2265,12 @@ async def test_too_many_groups(c, s, a, b):
2278
2265
assert len (s .task_groups ) < 3
2279
2266
2280
2267
2281
- @pytest . mark . asyncio
2282
- async def test_multiple_listeners (cleanup ):
2268
+ @gen_test ()
2269
+ async def test_multiple_listeners ():
2283
2270
with captured_logger (logging .getLogger ("distributed.scheduler" )) as log :
2284
- async with Scheduler (port = 0 , protocol = ["inproc" , "tcp" ]) as s :
2271
+ async with Scheduler (
2272
+ port = 0 , dashboard_address = ":0" , protocol = ["inproc" , "tcp" ]
2273
+ ) as s :
2285
2274
async with Worker (s .listeners [0 ].contact_address ) as a :
2286
2275
async with Worker (s .listeners [1 ].contact_address ) as b :
2287
2276
assert a .address .startswith ("inproc" )
0 commit comments