Tasks
Core.Task
— TypeTask(func)
주어진 함수 func
(인수가 없는 호출 가능해야 함)을 실행하기 위해 Task
(즉, 코루틴)를 생성합니다. 이 함수가 반환되면 작업이 종료됩니다. 작업은 schedule
d 시 부모의 "세계 나이"에서 실행됩니다.
!!! 경고 기본적으로 작업은 sticky 비트가 true로 설정됩니다 t.sticky
. 이는 @async
의 역사적 기본값을 모델링합니다. Sticky 작업은 처음 예약된 작업 스레드에서만 실행될 수 있으며, 예약될 때 예약된 작업을 sticky로 만듭니다. Threads.@spawn
와 동일한 동작을 얻으려면 sticky 비트를 수동으로 false
로 설정하십시오.
예제
julia> a() = sum(i for i in 1:1000);
julia> b = Task(a);
이 예제에서 b
는 아직 시작되지 않은 실행 가능한 Task
입니다.
Base.@task
— Macro@task
표현식을 실행하지 않고 Task
로 감싸고, Task
를 반환합니다. 이는 작업을 생성할 뿐이며 실행하지는 않습니다.
!!! 경고 기본적으로 작업은 sticky 비트가 true로 설정됩니다 t.sticky
. 이는 @async
의 역사적 기본값을 모델링합니다. Sticky 작업은 처음 예약된 작업 스레드에서만 실행될 수 있으며, 예약될 때 예약된 작업을 sticky로 만듭니다. Threads.@spawn
와 동일한 동작을 얻으려면 sticky 비트를 수동으로 false로 설정하십시오.
예제
julia> a1() = sum(i for i in 1:1000);
julia> b = @task a1();
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
Base.@async
— Macro@async
표현식을 Task
로 감싸고 로컬 머신의 스케줄러 큐에 추가합니다.
값은 $
를 통해 @async
에 주입될 수 있으며, 이는 값을 생성된 기본 클로저에 직접 복사합니다. 이를 통해 변수의 값을 삽입할 수 있으며, 현재 작업에서 변수 값의 변경으로부터 비동기 코드를 격리할 수 있습니다.
항상 @async
보다 Threads.@spawn
을 사용하는 것이 강력히 권장됩니다 병렬성이 필요하지 않은 경우에도 특히 공개적으로 배포되는 라이브러리에서. 이는 @async
의 사용이 현재 Julia 구현에서 부모 작업의 워커 스레드 간 마이그레이션을 비활성화하기 때문입니다. 따라서 라이브러리 함수에서 @async
를 사용하는 것은 사용자 애플리케이션의 매우 다른 부분의 성능에 큰 영향을 미칠 수 있습니다.
$
를 통한 값의 주입은 Julia 1.4부터 가능합니다.
Base.asyncmap
— Functionasyncmap(f, c...; ntasks=0, batch_size=nothing)
여러 개의 동시 작업을 사용하여 컬렉션(또는 여러 개의 동일 길이 컬렉션)에 대해 f
를 매핑합니다. 여러 컬렉션 인수가 있는 경우, f
는 요소별로 적용됩니다.
ntasks
는 동시에 실행할 작업의 수를 지정합니다. 컬렉션의 길이에 따라 ntasks
가 지정되지 않은 경우, 최대 100개의 작업이 동시 매핑에 사용됩니다.
ntasks
는 또한 인수가 없는 함수로 지정할 수 있습니다. 이 경우, 각 요소를 처리하기 전에 병렬로 실행할 작업의 수가 확인되며, ntasks_func
의 값이 현재 작업 수보다 크면 새로운 작업이 시작됩니다.
batch_size
가 지정되면 컬렉션은 배치 모드로 처리됩니다. 이 경우 f
는 인수 튜플의 Vector
를 받아들이고 결과의 벡터를 반환해야 하는 함수여야 합니다. 입력 벡터는 batch_size
또는 그보다 짧은 길이를 가집니다.
다음 예제는 매핑 함수가 실행되는 작업의 objectid
를 반환하여 서로 다른 작업에서의 실행을 강조합니다.
먼저, ntasks
가 정의되지 않은 경우 각 요소는 다른 작업에서 처리됩니다.
julia> tskoid() = objectid(current_task());
julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
0x6e15e66c75c75853
0x440f8819a1baa682
0x9fb3eeadd0c83985
0xebd3e35fe90d4050
0x29efc93edce2b961
julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5
ntasks=2
인 경우 모든 요소는 2개의 작업에서 처리됩니다.
julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2
batch_size
가 정의된 경우, 매핑 함수는 인수 튜플의 배열을 받아들이고 결과의 배열을 반환하도록 변경해야 합니다. 이를 달성하기 위해 수정된 매핑 함수에서 map
이 사용됩니다.
julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)
julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
"args_tuple: (1,), element_val: 1, task: 9118321258196414413"
"args_tuple: (2,), element_val: 2, task: 4904288162898683522"
"args_tuple: (3,), element_val: 3, task: 9118321258196414413"
"args_tuple: (4,), element_val: 4, task: 4904288162898683522"
"args_tuple: (5,), element_val: 5, task: 9118321258196414413"
Base.asyncmap!
— Functionasyncmap!(f, results, c...; ntasks=0, batch_size=nothing)
asyncmap
와 유사하지만, 컬렉션을 반환하는 대신 results
에 출력을 저장합니다.
!!! 경고 변형된 인수가 다른 인수와 메모리를 공유할 때 동작이 예기치 않게 될 수 있습니다.
Base.current_task
— Functioncurrent_task()
현재 실행 중인 Task
를 가져옵니다.
Base.istaskdone
— Functionistaskdone(t::Task) -> Bool
작업이 종료되었는지 확인합니다.
예제
julia> a2() = sum(i for i in 1:1000);
julia> b = Task(a2);
julia> istaskdone(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
Base.istaskstarted
— Functionistaskstarted(t::Task) -> Bool
작업이 실행되기 시작했는지 여부를 결정합니다.
예시
julia> a3() = sum(i for i in 1:1000);
julia> b = Task(a3);
julia> istaskstarted(b)
false
Base.istaskfailed
— Functionistaskfailed(t::Task) -> Bool
작업이 예외가 발생하여 종료되었는지 여부를 결정합니다.
예제
julia> a4() = error("작업 실패");
julia> b = Task(a4);
julia> istaskfailed(b)
false
julia> schedule(b);
julia> yield();
julia> istaskfailed(b)
true
이 함수는 최소한 Julia 1.3이 필요합니다.
Base.task_local_storage
— Methodtask_local_storage(key)
현재 작업의 작업 로컬 저장소에서 키의 값을 조회합니다.
Base.task_local_storage
— Methodtask_local_storage(key, value)
현재 작업의 작업 로컬 저장소에 키에 값을 할당합니다.
Base.task_local_storage
— Methodtask_local_storage(body, key, value)
value
가 key
에 할당된 수정된 작업-로컬 저장소로 body
함수를 호출합니다. key
의 이전 값 또는 그 부재는 이후에 복원됩니다. 동적 범위를 에뮬레이션하는 데 유용합니다.
Scheduling
Base.yield
— Functionyield()
스케줄러로 전환하여 다른 예약된 작업이 실행될 수 있도록 합니다. 이 함수를 호출하는 작업은 여전히 실행 가능하며, 다른 실행 가능한 작업이 없으면 즉시 재시작됩니다.
yield(t::Task, arg = nothing)
schedule(t, arg); yield()
의 빠르고 불공정한 스케줄링 버전으로, 스케줄러를 호출하기 전에 즉시 t
에 양보합니다.
Base.yieldto
— Functionyieldto(t::Task, arg = nothing)
주어진 작업으로 전환합니다. 작업으로 처음 전환할 때, 작업의 함수는 인수 없이 호출됩니다. 이후 전환에서는 arg
가 작업의 마지막 yieldto
호출에서 반환됩니다. 이는 상태나 스케줄링을 고려하지 않고 작업만 전환하는 저수준 호출입니다. 사용이 권장되지 않습니다.
Base.sleep
— Functionsleep(seconds)
현재 작업을 지정된 초 수만큼 차단합니다. 최소 수면 시간은 1밀리초 또는 0.001
의 입력입니다.
Base.schedule
— Functionschedule(t::Task, [val]; error=false)
스케줄러의 큐에 Task
를 추가합니다. 이는 시스템이 다른 작업을 수행하지 않을 때 작업이 지속적으로 실행되도록 하며, 작업이 wait
와 같은 차단 작업을 수행하지 않는 한 계속 실행됩니다.
두 번째 인수 val
이 제공되면, 작업이 다시 실행될 때 ( yieldto
의 반환 값을 통해) 작업에 전달됩니다. error
가 true
인 경우, 값은 깨어난 작업에서 예외로 발생합니다.
이미 시작된 임의의 Task
에 대해 schedule
을 사용하는 것은 잘못된 것입니다. 자세한 내용은 API 참조를 참조하십시오.
기본적으로 작업은 t.sticky
가 true로 설정됩니다. 이는 @async
의 역사적 기본값을 모델링합니다. 스티키 작업은 처음 스케줄된 작업 스레드에서만 실행될 수 있으며, 스케줄될 때 스케줄된 작업을 스티키로 만듭니다. Threads.@spawn
과 동일한 동작을 얻으려면 스티키 비트를 수동으로 false
로 설정하십시오.
예제
julia> a5() = sum(i for i in 1:1000);
julia> b = Task(a5);
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskstarted(b)
true
julia> istaskdone(b)
true
Synchronization
Base.errormonitor
— Functionerrormonitor(t::Task)
작업 t
가 실패하면 stderr
에 오류 로그를 출력합니다.
예시
julia> Base._wait(errormonitor(Threads.@spawn error("작업 실패")))
Unhandled Task ERROR: 작업 실패
Stacktrace:
[...]
Base.@sync
— Macro@sync
모든 어휘적으로 포함된 @async
, @spawn
, Distributed.@spawnat
및 Distributed.@distributed
의 사용이 완료될 때까지 기다립니다. 포함된 비동기 작업에서 발생한 모든 예외는 수집되어 CompositeException
으로 던져집니다.
예제
julia> Threads.nthreads()
4
julia> @sync begin
Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
end;
Thread-id 3, task 1
Thread-id 1, task 2
Base.wait
— FunctionThreads.Condition
에 대한 특별한 주의 사항:
호출자는 이 메서드를 호출하기 전에 Threads.Condition
을 소유하는 lock
를 보유하고 있어야 합니다. 호출 작업은 다른 작업이 이를 깨울 때까지 차단됩니다. 일반적으로 같은 Threads.Condition
객체에서 notify
를 호출하여 깨웁니다. 차단될 때 잠금은 원자적으로 해제되며(재귀적으로 잠금이 되어 있더라도), 반환되기 전에 다시 획득됩니다.
wait(r::Future)
지정된 Future
에 대해 값이 사용 가능해질 때까지 기다립니다.
wait(r::RemoteChannel, args...)
지정된 RemoteChannel
에서 값이 사용 가능해질 때까지 대기합니다.
wait([x])
현재 작업을 특정 이벤트가 발생할 때까지 차단합니다. 인수의 유형에 따라 다릅니다:
Channel
: 채널에 값이 추가될 때까지 기다립니다.Condition
: 조건에서notify
를 기다리고notify
에 전달된val
매개변수를 반환합니다. 조건에서 기다릴 때first=true
를 전달할 수 있으며, 이는 대기자가notify
에서 깨어나는 순서에서 첫 번째로 배치되도록 하여 일반적인 선입선출 동작 대신에 작동합니다.Process
: 프로세스 또는 프로세스 체인이 종료될 때까지 기다립니다. 프로세스의exitcode
필드를 사용하여 성공 또는 실패를 판단할 수 있습니다.Task
:Task
가 완료될 때까지 기다립니다. 작업이 예외로 실패하면TaskFailedException
(실패한 작업을 래핑함)이 발생합니다.RawFD
: 파일 설명자에서 변경 사항을 기다립니다(자세한 내용은FileWatching
패키지를 참조하십시오).
인수가 전달되지 않으면 작업은 정의되지 않은 기간 동안 차단됩니다. 작업은 schedule
또는 yieldto
에 대한 명시적 호출로만 다시 시작할 수 있습니다.
종종 wait
는 대기 중인 조건이 충족될 때까지 진행하기 위해 while
루프 내에서 호출됩니다.
wait(c::Channel)
Channel
isready
가 될 때까지 블록합니다.
julia> c = Channel(1);
julia> isready(c)
false
julia> task = Task(() -> wait(c));
julia> schedule(task);
julia> istaskdone(task) # 작업은 채널이 준비되지 않았기 때문에 블록됨
false
julia> put!(c, 1);
julia> istaskdone(task) # 작업이 이제 언블록됨
true
Base.fetch
— Methodfetch(t::Task)
Task
가 완료될 때까지 기다린 후, 그 결과 값을 반환합니다. 작업이 예외로 실패하면 실패한 작업을 감싸는 TaskFailedException
가 발생합니다.
Base.fetch
— Methodfetch(x::Any)
x
를 반환합니다.
Base.timedwait
— Functiontimedwait(testcb, timeout::Real; pollint::Real=0.1)
testcb()
가 true
를 반환하거나 timeout
초가 경과할 때까지 기다립니다. 두 경우 중 더 이른 경우에 종료됩니다. 테스트 함수는 매 pollint
초마다 폴링됩니다. pollint
의 최소값은 0.001초, 즉 1밀리초입니다.
:ok
또는 :timed_out
을 반환합니다.
예제
julia> cb() = (sleep(5); return);
julia> t = @async cb();
julia> timedwait(()->istaskdone(t), 1)
:timed_out
julia> timedwait(()->istaskdone(t), 6.5)
:ok
Base.Condition
— TypeCondition()
작업이 대기할 수 있는 엣지 트리거 이벤트 소스를 생성합니다. Condition
에서 wait
를 호출하는 작업은 중단되고 대기열에 추가됩니다. 나중에 Condition
에서 notify
가 호출되면 작업이 깨어납니다. 조건에서 대기하는 것은 값을 반환하거나 notify
의 선택적 인수가 사용될 경우 오류를 발생시킬 수 있습니다. 엣지 트리거링은 notify
가 호출될 때 대기 중인 작업만 깨어날 수 있음을 의미합니다. 레벨 트리거 알림의 경우 알림이 발생했는지 추적하기 위해 추가 상태를 유지해야 합니다. Channel
및 Threads.Event
유형은 이를 수행하며 레벨 트리거 이벤트에 사용할 수 있습니다.
이 객체는 스레드 안전하지 않습니다. 스레드 안전 버전은 Threads.Condition
를 참조하십시오.
Base.Threads.Condition
— TypeThreads.Condition([lock])
스레드 안전한 버전의 Base.Condition
입니다.
Threads.Condition
에서 wait
또는 notify
를 호출하려면 먼저 lock
를 호출해야 합니다. wait
가 호출되면, 블로킹 동안 잠금이 원자적으로 해제되며, wait
가 반환되기 전에 다시 획득됩니다. 따라서 Threads.Condition
c
의 관용적인 사용은 다음과 같습니다:
lock(c)
try
while !thing_we_are_waiting_for
wait(c)
end
finally
unlock(c)
end
이 기능은 최소한 Julia 1.2가 필요합니다.
Base.Event
— TypeEvent([autoreset=false])
레벨 트리거 이벤트 소스를 생성합니다. Event
에서 wait
를 호출하는 작업은 중단되고 대기열에 추가되며, Event
에서 notify
가 호출될 때까지 대기합니다. notify
가 호출된 후, Event
는 신호 상태를 유지하며, reset
이 호출될 때까지 작업이 대기할 때 더 이상 차단되지 않습니다.
autoreset
이 true인 경우, notify
호출당 최대 하나의 작업만 wait
에서 해제됩니다.
이는 notify/wait에 대한 획득 및 해제 메모리 순서를 제공합니다.
이 기능은 최소한 Julia 1.1이 필요합니다.
autoreset
기능 및 메모리 순서 보장은 최소한 Julia 1.8이 필요합니다.
Base.notify
— Functionnotify(condition, val=nothing; all=true, error=false)
조건을 기다리고 있는 작업을 깨우고, val
을 전달합니다. all
이 true
(기본값)인 경우 모든 대기 작업이 깨워지며, 그렇지 않으면 하나만 깨워집니다. error
가 true
인 경우, 전달된 값이 깨워진 작업에서 예외로 발생합니다.
깨워진 작업의 수를 반환합니다. condition
에서 대기 중인 작업이 없으면 0을 반환합니다.
Base.reset
— MethodBase.Semaphore
— TypeSemaphore(sem_size)
최대 sem_size
개의 획득이 언제든지 사용될 수 있도록 하는 카운팅 세마포어를 생성합니다. 각 획득은 해제와 일치해야 합니다.
이는 획득/해제 호출에 대한 획득 및 해제 메모리 순서를 제공합니다.
Base.acquire
— Functionacquire(s::Semaphore)
sem_size
허가 중 하나가 사용 가능해질 때까지 대기하며, 하나를 획득할 수 있을 때까지 차단됩니다.
acquire(f, s::Semaphore)
세마포어 s
에서 획득한 후 f
를 실행하고, 완료 또는 오류 시 release
합니다.
예를 들어, 동시에 foo
가 2번만 활성화되도록 보장하는 do-block 형태:
s = Base.Semaphore(2)
@sync for _ in 1:100
Threads.@spawn begin
Base.acquire(s) do
foo()
end
end
end
이 메서드는 최소한 Julia 1.8이 필요합니다.
Base.release
— Functionrelease(s::Semaphore)
풀에 하나의 허가를 반환하여 다른 작업이 이를 획득하고 실행을 재개할 수 있도록 할 수 있습니다.
Base.AbstractLock
— TypeBase.lock
— Functionlock(lock)
lock
이 사용 가능해지면 이를 획득합니다. 만약 lock
이 다른 작업/스레드에 의해 이미 잠겨 있다면, 사용 가능해질 때까지 기다립니다.
각 lock
은 unlock
으로 매칭되어야 합니다.
lock(f::Function, lock)
lock
을 획득하고, lock
이 유지되는 동안 f
를 실행하며, f
가 반환되면 lock
을 해제합니다. 만약 lock
이 다른 작업/스레드에 의해 이미 잠겨 있다면, 사용 가능해질 때까지 기다립니다.
이 함수가 반환되면 lock
이 해제되므로, 호출자는 unlock
을 시도해서는 안 됩니다.
참고: @lock
.
두 번째 인수로 Channel
를 사용하는 것은 Julia 1.7 이상이 필요합니다.
lock(f::Function, l::Lockable)
l
과 관련된 잠금을 획득하고, 잠금을 유지한 채로 f
를 실행하며, f
가 반환될 때 잠금을 해제합니다. f
는 l
에 의해 래핑된 값을 하나의 위치 인수로 받습니다. 잠금이 다른 작업/스레드에 의해 이미 잠겨 있는 경우, 사용 가능해질 때까지 기다립니다. 이 함수가 반환되면 lock
이 해제되므로 호출자는 이를 unlock
하려고 시도해서는 안 됩니다.
최소한 Julia 1.11이 필요합니다.
Base.unlock
— Functionunlock(lock)
lock
의 소유권을 해제합니다.
이것이 이전에 획득된 재귀 잠금인 경우, 내부 카운터를 감소시키고 즉시 반환합니다.
Base.trylock
— Functiontrylock(lock) -> 성공 (부울)
잠금이 사용 가능하면 잠금을 획득하고 성공하면 true
를 반환합니다. 잠금이 이미 다른 작업/스레드에 의해 잠겨 있는 경우 false
를 반환합니다.
각 성공적인 trylock
은 unlock
으로 매칭되어야 합니다.
함수 trylock
은 islocked
와 결합하여 테스트-테스트-세트 또는 지수 백오프 알고리즘을 작성하는 데 사용할 수 있습니다 typeof(lock)
에 의해 지원되는 경우 (문서를 참조하십시오).
Base.islocked
— Functionislocked(lock) -> 상태 (부울)
lock
이 어떤 작업/스레드에 의해 보유되고 있는지 확인합니다. 이 함수는 단독으로 동기화에 사용되어서는 안 됩니다. 그러나 islocked
와 trylock
를 결합하면 typeof(lock)
에 의해 지원되는 경우 테스트-테스트-세트 또는 지수 백오프 알고리즘을 작성하는 데 사용할 수 있습니다(문서를 참조하십시오).
확장 도움말
예를 들어, lock
구현이 아래에 문서화된 속성을 만족하는 경우 지수 백오프를 다음과 같이 구현할 수 있습니다.
nspins = 0
while true
while islocked(lock)
GC.safepoint()
nspins += 1
nspins > LIMIT && error("timeout")
end
trylock(lock) && break
backoff()
end
구현
잠금 구현은 다음 속성으로 islocked
를 정의하고 이를 문서 문자열에 명시하는 것이 좋습니다.
islocked(lock)
은 데이터 경합이 없습니다.islocked(lock)
이false
를 반환하면, 다른 작업의 간섭이 없을 경우trylock(lock)
의 즉각적인 호출이 성공해야 합니다(true
를 반환).
Base.ReentrantLock
— TypeReentrantLock()
동기화를 위한 재진입 잠금을 생성합니다 Task
s. 동일한 작업은 필요에 따라 잠금을 여러 번 획득할 수 있습니다(이것이 이름의 "Reentrant" 부분이 의미하는 바입니다). 각 lock
는 unlock
과 쌍을 이루어야 합니다.
lock
을 호출하면 해당 스레드에서 최종화기의 실행이 해당 unlock
이 호출될 때까지 금지됩니다. 아래에 설명된 표준 잠금 패턴의 사용은 자연스럽게 지원되어야 하지만, 시도/잠금 순서를 뒤집거나 시도 블록을 완전히 누락하는 것(예: 잠금을 유지한 채로 반환하려고 시도하는 것)에 주의해야 합니다:
이것은 잠금/해제 호출에 대한 획득/해제 메모리 순서를 제공합니다.
lock(l)
try
<atomic work>
finally
unlock(l)
end
!islocked(lck::ReentrantLock)
조건이 참이면, trylock(lck)
는 "동시에" 잠금을 보유하려고 시도하는 다른 작업이 없는 한 성공합니다.
Base.@lock
— Macro@lock l expr
lock(f, l::AbstractLock)
의 매크로 버전이지만 f
함수 대신 expr
을 사용합니다. 다음과 같이 확장됩니다:
lock(l)
try
expr
finally
unlock(l)
end
이는 do
블록과 함께 lock
를 사용하는 것과 유사하지만 클로저를 생성하지 않으므로 성능을 향상시킬 수 있습니다.
@lock
은 Julia 1.3에 추가되었으며, Julia 1.10에서 내보내졌습니다.
Base.Lockable
— TypeLockable(value, lock = ReentrantLock())
value
를 감싸고 제공된 lock
과 연결된 Lockable
객체를 생성합니다. 이 객체는 @lock
, lock
, trylock
, unlock
를 지원합니다. 값을 접근하려면 잠금을 유지하면서 lockable 객체의 인덱스를 사용해야 합니다.
최소 Julia 1.11이 필요합니다.
예제
julia> locked_list = Base.Lockable(Int[]);
julia> @lock(locked_list, push!(locked_list[], 1)) # 값을 접근하려면 잠금을 유지해야 합니다
1-element Vector{Int64}:
1
julia> lock(summary, locked_list)
"1-element Vector{Int64}"
Channels
Base.AbstractChannel
— TypeAbstractChannel{T}
타입 T
의 객체를 전달하는 채널의 표현입니다.
Base.Channel
— TypeChannel{T=Any}(size::Int=0)
T
유형의 최대 size
개체를 보유할 수 있는 내부 버퍼가 있는 Channel
을 생성합니다. put!
호출은 채널이 가득 차면 take!
로 개체가 제거될 때까지 차단됩니다.
Channel(0)
은 버퍼가 없는 채널을 생성합니다. put!
은 일치하는 take!
가 호출될 때까지 차단됩니다. 그리고 그 반대도 마찬가지입니다.
기타 생성자:
Channel()
: 기본 생성자로,Channel{Any}(0)
와 동일합니다.Channel(Inf)
:Channel{Any}(typemax(Int))
와 동일합니다.Channel(sz)
:Channel{Any}(sz)
와 동일합니다.
기본 생성자 Channel()
과 기본 size=0
은 Julia 1.3에 추가되었습니다.
Base.Channel
— MethodChannel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)
func
에서 새 작업을 생성하고, 이를 크기 size
와 유형 T
의 새 채널에 바인딩한 후, 단일 호출로 작업을 예약합니다. 작업이 종료되면 채널이 자동으로 닫힙니다.
func
는 바인딩된 채널을 유일한 인수로 받아야 합니다.
생성된 작업에 대한 참조가 필요하면 키워드 인수 taskref
를 통해 Ref{Task}
객체를 전달하십시오.
spawn=true
인 경우, func
에 대해 생성된 Task
는 다른 스레드에서 병렬로 예약될 수 있으며, 이는 Threads.@spawn
를 통해 작업을 생성하는 것과 동등합니다.
spawn=true
이고 threadpool
인수가 설정되지 않은 경우 기본값은 :default
입니다.
threadpool
인수가 설정된 경우(:default
또는 :interactive
), 이는 spawn=true
를 의미하며 새 작업이 지정된 스레드 풀에 생성됩니다.
Channel
을 반환합니다.
예제
julia> chnl = Channel() do ch
foreach(i -> put!(ch, i), 1:4)
end;
julia> typeof(chnl)
Channel{Any}
julia> for i in chnl
@show i
end;
i = 1
i = 2
i = 3
i = 4
생성된 작업 참조하기:
julia> taskref = Ref{Task}();
julia> chnl = Channel(taskref=taskref) do ch
println(take!(ch))
end;
julia> istaskdone(taskref[])
false
julia> put!(chnl, "Hello");
Hello
julia> istaskdone(taskref[])
true
spawn=
매개변수는 Julia 1.3에서 추가되었습니다. 이 생성자는 Julia 1.3에서 추가되었습니다. 이전 버전의 Julia에서는 채널이 키워드 인수를 사용하여 size
와 T
를 설정했지만, 이러한 생성자는 더 이상 사용되지 않습니다.
threadpool=
인수는 Julia 1.9에서 추가되었습니다.
julia> chnl = Channel{Char}(1, spawn=true) do ch
for c in "hello world"
put!(ch, c)
end
end
Channel{Char}(1) (2 items available)
julia> String(collect(chnl))
"hello world"
Base.put!
— Methodput!(c::Channel, v)
채널 c
에 항목 v
를 추가합니다. 채널이 가득 차면 블록됩니다.
버퍼가 없는 채널의 경우, 다른 작업에서 take!
가 수행될 때까지 블록됩니다.
v
는 이제 put!
가 호출될 때 채널의 유형으로 convert
를 통해 변환됩니다.
Base.take!
— Methodtake!(c::Channel)
채널(Channel
)에서 값을 제거하고 반환합니다. 데이터가 사용 가능할 때까지 블록됩니다. 버퍼가 없는 채널의 경우, 다른 작업이 put!
를 수행할 때까지 블록됩니다.
예제
버퍼가 있는 채널:
julia> c = Channel(1);
julia> put!(c, 1);
julia> take!(c)
1
버퍼가 없는 채널:
julia> c = Channel(0);
julia> task = Task(() -> put!(c, 1));
julia> schedule(task);
julia> take!(c)
1
Base.isready
— Methodisready(c::Channel)
채널이 Channel
에 값이 저장되어 있는지 여부를 결정합니다. 즉시 반환되며, 블록하지 않습니다.
버퍼가 없는 채널의 경우, put!
를 기다리고 있는 작업이 있는 경우 true
를 반환합니다.
예제
버퍼가 있는 채널:
julia> c = Channel(1);
julia> isready(c)
false
julia> put!(c, 1);
julia> isready(c)
true
버퍼가 없는 채널:
julia> c = Channel();
julia> isready(c) # 값을 넣으려고 대기 중인 작업이 없음!
false
julia> task = Task(() -> put!(c, 1));
julia> schedule(task); # put! 작업을 스케줄링합니다.
julia> isready(c)
true
Base.fetch
— Methodfetch(c::Channel)
Channel
에서 첫 번째로 사용 가능한 항목을 기다리고 반환합니다(제거하지 않음). 참고: fetch
는 버퍼가 없는(0 크기) Channel
에서는 지원되지 않습니다.
예제
버퍼가 있는 채널:
julia> c = Channel(3) do ch
foreach(i -> put!(ch, i), 1:3)
end;
julia> fetch(c)
1
julia> collect(c) # 항목이 제거되지 않음
3-element Vector{Any}:
1
2
3
Base.close
— Methodclose(c::Channel[, excp::Exception])
채널을 닫습니다. 예외(excp
로 선택적으로 제공됨)는 다음에 의해 발생합니다:
Base.bind
— Methodbind(chnl::Channel, task::Task)
chnl
의 생명주기를 작업에 연결합니다. Channel
chnl
은 작업이 종료될 때 자동으로 닫힙니다. 작업에서 발생한 모든 uncaught 예외는 chnl
의 모든 대기자에게 전파됩니다.
chnl
객체는 작업 종료와 관계없이 명시적으로 닫을 수 있습니다. 종료된 작업은 이미 닫힌 Channel
객체에 영향을 미치지 않습니다.
하나의 채널이 여러 작업에 바인딩될 때, 가장 먼저 종료된 작업이 채널을 닫습니다. 여러 채널이 동일한 작업에 바인딩될 때, 작업의 종료는 모든 바인딩된 채널을 닫습니다.
예제
julia> c = Channel(0);
julia> task = @async foreach(i->put!(c, i), 1:4);
julia> bind(c,task);
julia> for i in c
@show i
end;
i = 1
i = 2
i = 3
i = 4
julia> isopen(c)
false
julia> c = Channel(0);
julia> task = @async (put!(c, 1); error("foo"));
julia> bind(c, task);
julia> take!(c)
1
julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
nested task error: foo
[...]
Low-level synchronization using schedule
and wait
schedule
의 가장 쉬운 올바른 사용법은 아직 시작되지 않은 (예정된) Task
에서 사용하는 것입니다. 그러나 4d61726b646f776e2e436f64652822222c20227363686564756c652229_40726566
와 wait
를 동기화 인터페이스를 구성하는 매우 저수준의 빌딩 블록으로 사용할 수 있습니다. schedule(task)
를 호출하기 위한 중요한 전제 조건은 호출자가 task
를 "소유"해야 한다는 것입니다. 즉, 주어진 task
에서 wait
호출이 schedule(task)
를 호출하는 코드에서 알고 있는 위치에서 발생하고 있어야 합니다. 이러한 전제 조건을 보장하기 위한 한 가지 전략은 다음 예제에서 보여준 것처럼 원자성을 사용하는 것입니다.
@enum OWEState begin
OWE_EMPTY
OWE_WAITING
OWE_NOTIFYING
end
mutable struct OneWayEvent
@atomic state::OWEState
task::Task
OneWayEvent() = new(OWE_EMPTY)
end
function Base.notify(ev::OneWayEvent)
state = @atomic ev.state
while state !== OWE_NOTIFYING
# Spin until we successfully update the state to OWE_NOTIFYING:
state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
if ok
if state == OWE_WAITING
# OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
# already waiting or about to call `wait`. The notifier task must wake up
# the waiter task.
schedule(ev.task)
else
@assert state == OWE_EMPTY
# Since we are assuming that there is only one notifier task (for
# simplicity), we know that the other possible case here is OWE_EMPTY.
# We do not need to do anything because we know that the waiter task has
# not called `wait(ev::OneWayEvent)` yet.
end
break
end
end
return
end
function Base.wait(ev::OneWayEvent)
ev.task = current_task()
state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
if ok
# OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
# invoke OWE_WAITING -> OWE_NOTIFYING transition. The waiter task must call
# `wait()` immediately. In particular, it MUST NOT invoke any function that may
# yield to the scheduler at this point in code.
wait()
else
@assert state == OWE_NOTIFYING
# Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
# notifier task.
end
return
end
ev = OneWayEvent()
@sync begin
@async begin
wait(ev)
println("done")
end
println("notifying...")
notify(ev)
end
# output
notifying...
done
OneWayEvent
는 한 작업이 다른 작업의 notify
를 wait
할 수 있게 해줍니다. 이는 wait
가 단일 작업에서 한 번만 사용될 수 있기 때문에 제한된 통신 인터페이스입니다 (비원자적 할당인 ev.task
에 유의하세요).
이 예제에서 notify(ev::OneWayEvent)
는 그것이 상태를 OWE_WAITING
에서 OWE_NOTIFYING
으로 수정하는 경우에만 schedule(ev.task)
를 호출할 수 있습니다. 이는 wait(ev::OneWayEvent)
를 실행하는 작업이 이제 ok
분기에 있으며, 다른 작업이 schedule(ev.task)
를 시도할 수 없음을 알려줍니다. 왜냐하면 그들의 @atomicreplace(ev.state, state => OWE_NOTIFYING)
가 실패할 것이기 때문입니다.