Tasks

Core.TaskType
Task(func)

주어진 함수 func (인수가 없는 호출 가능해야 함)을 실행하기 위해 Task (즉, 코루틴)를 생성합니다. 이 함수가 반환되면 작업이 종료됩니다. 작업은 scheduled 시 부모의 "세계 나이"에서 실행됩니다.

!!! 경고 기본적으로 작업은 sticky 비트가 true로 설정됩니다 t.sticky. 이는 @async의 역사적 기본값을 모델링합니다. Sticky 작업은 처음 예약된 작업 스레드에서만 실행될 수 있으며, 예약될 때 예약된 작업을 sticky로 만듭니다. Threads.@spawn와 동일한 동작을 얻으려면 sticky 비트를 수동으로 false로 설정하십시오.

예제

julia> a() = sum(i for i in 1:1000);

julia> b = Task(a);

이 예제에서 b는 아직 시작되지 않은 실행 가능한 Task입니다.

source
Base.@taskMacro
@task

표현식을 실행하지 않고 Task로 감싸고, Task를 반환합니다. 이는 작업을 생성할 뿐이며 실행하지는 않습니다.

!!! 경고 기본적으로 작업은 sticky 비트가 true로 설정됩니다 t.sticky. 이는 @async의 역사적 기본값을 모델링합니다. Sticky 작업은 처음 예약된 작업 스레드에서만 실행될 수 있으며, 예약될 때 예약된 작업을 sticky로 만듭니다. Threads.@spawn와 동일한 동작을 얻으려면 sticky 비트를 수동으로 false로 설정하십시오.

예제

julia> a1() = sum(i for i in 1:1000);

julia> b = @task a1();

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
source
Base.@asyncMacro
@async

표현식을 Task로 감싸고 로컬 머신의 스케줄러 큐에 추가합니다.

값은 $를 통해 @async에 주입될 수 있으며, 이는 값을 생성된 기본 클로저에 직접 복사합니다. 이를 통해 변수의 을 삽입할 수 있으며, 현재 작업에서 변수 값의 변경으로부터 비동기 코드를 격리할 수 있습니다.

Warning

항상 @async보다 Threads.@spawn을 사용하는 것이 강력히 권장됩니다 병렬성이 필요하지 않은 경우에도 특히 공개적으로 배포되는 라이브러리에서. 이는 @async의 사용이 현재 Julia 구현에서 부모 작업의 워커 스레드 간 마이그레이션을 비활성화하기 때문입니다. 따라서 라이브러리 함수에서 @async를 사용하는 것은 사용자 애플리케이션의 매우 다른 부분의 성능에 큰 영향을 미칠 수 있습니다.

Julia 1.4

$를 통한 값의 주입은 Julia 1.4부터 가능합니다.

source
Base.asyncmapFunction
asyncmap(f, c...; ntasks=0, batch_size=nothing)

여러 개의 동시 작업을 사용하여 컬렉션(또는 여러 개의 동일 길이 컬렉션)에 대해 f를 매핑합니다. 여러 컬렉션 인수가 있는 경우, f는 요소별로 적용됩니다.

ntasks는 동시에 실행할 작업의 수를 지정합니다. 컬렉션의 길이에 따라 ntasks가 지정되지 않은 경우, 최대 100개의 작업이 동시 매핑에 사용됩니다.

ntasks는 또한 인수가 없는 함수로 지정할 수 있습니다. 이 경우, 각 요소를 처리하기 전에 병렬로 실행할 작업의 수가 확인되며, ntasks_func의 값이 현재 작업 수보다 크면 새로운 작업이 시작됩니다.

batch_size가 지정되면 컬렉션은 배치 모드로 처리됩니다. 이 경우 f는 인수 튜플의 Vector를 받아들이고 결과의 벡터를 반환해야 하는 함수여야 합니다. 입력 벡터는 batch_size 또는 그보다 짧은 길이를 가집니다.

다음 예제는 매핑 함수가 실행되는 작업의 objectid를 반환하여 서로 다른 작업에서의 실행을 강조합니다.

먼저, ntasks가 정의되지 않은 경우 각 요소는 다른 작업에서 처리됩니다.

julia> tskoid() = objectid(current_task());

julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
 0x6e15e66c75c75853
 0x440f8819a1baa682
 0x9fb3eeadd0c83985
 0xebd3e35fe90d4050
 0x29efc93edce2b961

julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5

ntasks=2인 경우 모든 요소는 2개의 작업에서 처리됩니다.

julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94

julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2

batch_size가 정의된 경우, 매핑 함수는 인수 튜플의 배열을 받아들이고 결과의 배열을 반환하도록 변경해야 합니다. 이를 달성하기 위해 수정된 매핑 함수에서 map이 사용됩니다.

julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)

julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
 "args_tuple: (1,), element_val: 1, task: 9118321258196414413"
 "args_tuple: (2,), element_val: 2, task: 4904288162898683522"
 "args_tuple: (3,), element_val: 3, task: 9118321258196414413"
 "args_tuple: (4,), element_val: 4, task: 4904288162898683522"
 "args_tuple: (5,), element_val: 5, task: 9118321258196414413"
source
Base.asyncmap!Function
asyncmap!(f, results, c...; ntasks=0, batch_size=nothing)

asyncmap와 유사하지만, 컬렉션을 반환하는 대신 results에 출력을 저장합니다.

!!! 경고 변형된 인수가 다른 인수와 메모리를 공유할 때 동작이 예기치 않게 될 수 있습니다.

source
Base.istaskdoneFunction
istaskdone(t::Task) -> Bool

작업이 종료되었는지 확인합니다.

예제

julia> a2() = sum(i for i in 1:1000);

julia> b = Task(a2);

julia> istaskdone(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
source
Base.istaskstartedFunction
istaskstarted(t::Task) -> Bool

작업이 실행되기 시작했는지 여부를 결정합니다.

예시

julia> a3() = sum(i for i in 1:1000);

julia> b = Task(a3);

julia> istaskstarted(b)
false
source
Base.istaskfailedFunction
istaskfailed(t::Task) -> Bool

작업이 예외가 발생하여 종료되었는지 여부를 결정합니다.

예제

julia> a4() = error("작업 실패");

julia> b = Task(a4);

julia> istaskfailed(b)
false

julia> schedule(b);

julia> yield();

julia> istaskfailed(b)
true
Julia 1.3

이 함수는 최소한 Julia 1.3이 필요합니다.

source
Base.task_local_storageMethod
task_local_storage(body, key, value)

valuekey에 할당된 수정된 작업-로컬 저장소로 body 함수를 호출합니다. key의 이전 값 또는 그 부재는 이후에 복원됩니다. 동적 범위를 에뮬레이션하는 데 유용합니다.

source

Scheduling

Base.yieldFunction
yield()

스케줄러로 전환하여 다른 예약된 작업이 실행될 수 있도록 합니다. 이 함수를 호출하는 작업은 여전히 실행 가능하며, 다른 실행 가능한 작업이 없으면 즉시 재시작됩니다.

source
yield(t::Task, arg = nothing)

schedule(t, arg); yield()의 빠르고 불공정한 스케줄링 버전으로, 스케줄러를 호출하기 전에 즉시 t에 양보합니다.

source
Base.yieldtoFunction
yieldto(t::Task, arg = nothing)

주어진 작업으로 전환합니다. 작업으로 처음 전환할 때, 작업의 함수는 인수 없이 호출됩니다. 이후 전환에서는 arg가 작업의 마지막 yieldto 호출에서 반환됩니다. 이는 상태나 스케줄링을 고려하지 않고 작업만 전환하는 저수준 호출입니다. 사용이 권장되지 않습니다.

source
Base.sleepFunction
sleep(seconds)

현재 작업을 지정된 초 수만큼 차단합니다. 최소 수면 시간은 1밀리초 또는 0.001의 입력입니다.

source
Base.scheduleFunction
schedule(t::Task, [val]; error=false)

스케줄러의 큐에 Task를 추가합니다. 이는 시스템이 다른 작업을 수행하지 않을 때 작업이 지속적으로 실행되도록 하며, 작업이 wait와 같은 차단 작업을 수행하지 않는 한 계속 실행됩니다.

두 번째 인수 val이 제공되면, 작업이 다시 실행될 때 ( yieldto의 반환 값을 통해) 작업에 전달됩니다. errortrue인 경우, 값은 깨어난 작업에서 예외로 발생합니다.

Warning

이미 시작된 임의의 Task에 대해 schedule을 사용하는 것은 잘못된 것입니다. 자세한 내용은 API 참조를 참조하십시오.

Warning

기본적으로 작업은 t.sticky가 true로 설정됩니다. 이는 @async의 역사적 기본값을 모델링합니다. 스티키 작업은 처음 스케줄된 작업 스레드에서만 실행될 수 있으며, 스케줄될 때 스케줄된 작업을 스티키로 만듭니다. Threads.@spawn과 동일한 동작을 얻으려면 스티키 비트를 수동으로 false로 설정하십시오.

예제

julia> a5() = sum(i for i in 1:1000);

julia> b = Task(a5);

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskstarted(b)
true

julia> istaskdone(b)
true
source

Synchronization

Base.errormonitorFunction
errormonitor(t::Task)

작업 t가 실패하면 stderr에 오류 로그를 출력합니다.

예시

julia> Base._wait(errormonitor(Threads.@spawn error("작업 실패")))
Unhandled Task ERROR: 작업 실패
Stacktrace:
[...]
source
Base.@syncMacro
@sync

모든 어휘적으로 포함된 @async, @spawn, Distributed.@spawnatDistributed.@distributed의 사용이 완료될 때까지 기다립니다. 포함된 비동기 작업에서 발생한 모든 예외는 수집되어 CompositeException으로 던져집니다.

예제

julia> Threads.nthreads()
4

julia> @sync begin
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
       end;
Thread-id 3, task 1
Thread-id 1, task 2
source
Base.waitFunction

Threads.Condition에 대한 특별한 주의 사항:

호출자는 이 메서드를 호출하기 전에 Threads.Condition을 소유하는 lock를 보유하고 있어야 합니다. 호출 작업은 다른 작업이 이를 깨울 때까지 차단됩니다. 일반적으로 같은 Threads.Condition 객체에서 notify를 호출하여 깨웁니다. 차단될 때 잠금은 원자적으로 해제되며(재귀적으로 잠금이 되어 있더라도), 반환되기 전에 다시 획득됩니다.

source
wait(r::Future)

지정된 Future에 대해 값이 사용 가능해질 때까지 기다립니다.

source
wait(r::RemoteChannel, args...)

지정된 RemoteChannel에서 값이 사용 가능해질 때까지 대기합니다.

source
wait([x])

현재 작업을 특정 이벤트가 발생할 때까지 차단합니다. 인수의 유형에 따라 다릅니다:

  • Channel: 채널에 값이 추가될 때까지 기다립니다.
  • Condition: 조건에서 notify를 기다리고 notify에 전달된 val 매개변수를 반환합니다. 조건에서 기다릴 때 first=true를 전달할 수 있으며, 이는 대기자가 notify에서 깨어나는 순서에서 첫 번째로 배치되도록 하여 일반적인 선입선출 동작 대신에 작동합니다.
  • Process: 프로세스 또는 프로세스 체인이 종료될 때까지 기다립니다. 프로세스의 exitcode 필드를 사용하여 성공 또는 실패를 판단할 수 있습니다.
  • Task: Task가 완료될 때까지 기다립니다. 작업이 예외로 실패하면 TaskFailedException(실패한 작업을 래핑함)이 발생합니다.
  • RawFD: 파일 설명자에서 변경 사항을 기다립니다(자세한 내용은 FileWatching 패키지를 참조하십시오).

인수가 전달되지 않으면 작업은 정의되지 않은 기간 동안 차단됩니다. 작업은 schedule 또는 yieldto에 대한 명시적 호출로만 다시 시작할 수 있습니다.

종종 wait는 대기 중인 조건이 충족될 때까지 진행하기 위해 while 루프 내에서 호출됩니다.

source
wait(c::Channel)

Channel isready가 될 때까지 블록합니다.

julia> c = Channel(1);

julia> isready(c)
false

julia> task = Task(() -> wait(c));

julia> schedule(task);

julia> istaskdone(task)  # 작업은 채널이 준비되지 않았기 때문에 블록됨
false

julia> put!(c, 1);

julia> istaskdone(task)  # 작업이 이제 언블록됨
true
source
Base.fetchMethod
fetch(t::Task)

Task가 완료될 때까지 기다린 후, 그 결과 값을 반환합니다. 작업이 예외로 실패하면 실패한 작업을 감싸는 TaskFailedException가 발생합니다.

source
Base.timedwaitFunction
timedwait(testcb, timeout::Real; pollint::Real=0.1)

testcb()true를 반환하거나 timeout 초가 경과할 때까지 기다립니다. 두 경우 중 더 이른 경우에 종료됩니다. 테스트 함수는 매 pollint 초마다 폴링됩니다. pollint의 최소값은 0.001초, 즉 1밀리초입니다.

:ok 또는 :timed_out을 반환합니다.

예제

julia> cb() = (sleep(5); return);

julia> t = @async cb();

julia> timedwait(()->istaskdone(t), 1)
:timed_out

julia> timedwait(()->istaskdone(t), 6.5)
:ok
source
Base.ConditionType
Condition()

작업이 대기할 수 있는 엣지 트리거 이벤트 소스를 생성합니다. Condition에서 wait를 호출하는 작업은 중단되고 대기열에 추가됩니다. 나중에 Condition에서 notify가 호출되면 작업이 깨어납니다. 조건에서 대기하는 것은 값을 반환하거나 notify의 선택적 인수가 사용될 경우 오류를 발생시킬 수 있습니다. 엣지 트리거링은 notify가 호출될 때 대기 중인 작업만 깨어날 수 있음을 의미합니다. 레벨 트리거 알림의 경우 알림이 발생했는지 추적하기 위해 추가 상태를 유지해야 합니다. ChannelThreads.Event 유형은 이를 수행하며 레벨 트리거 이벤트에 사용할 수 있습니다.

이 객체는 스레드 안전하지 않습니다. 스레드 안전 버전은 Threads.Condition를 참조하십시오.

source
Base.Threads.ConditionType
Threads.Condition([lock])

스레드 안전한 버전의 Base.Condition입니다.

Threads.Condition에서 wait 또는 notify를 호출하려면 먼저 lock를 호출해야 합니다. wait가 호출되면, 블로킹 동안 잠금이 원자적으로 해제되며, wait가 반환되기 전에 다시 획득됩니다. 따라서 Threads.Condition c의 관용적인 사용은 다음과 같습니다:

lock(c)
try
    while !thing_we_are_waiting_for
        wait(c)
    end
finally
    unlock(c)
end
Julia 1.2

이 기능은 최소한 Julia 1.2가 필요합니다.

source
Base.EventType
Event([autoreset=false])

레벨 트리거 이벤트 소스를 생성합니다. Event에서 wait를 호출하는 작업은 중단되고 대기열에 추가되며, Event에서 notify가 호출될 때까지 대기합니다. notify가 호출된 후, Event는 신호 상태를 유지하며, reset이 호출될 때까지 작업이 대기할 때 더 이상 차단되지 않습니다.

autoreset이 true인 경우, notify 호출당 최대 하나의 작업만 wait에서 해제됩니다.

이는 notify/wait에 대한 획득 및 해제 메모리 순서를 제공합니다.

Julia 1.1

이 기능은 최소한 Julia 1.1이 필요합니다.

Julia 1.8

autoreset 기능 및 메모리 순서 보장은 최소한 Julia 1.8이 필요합니다.

source
Base.notifyFunction
notify(condition, val=nothing; all=true, error=false)

조건을 기다리고 있는 작업을 깨우고, val을 전달합니다. alltrue(기본값)인 경우 모든 대기 작업이 깨워지며, 그렇지 않으면 하나만 깨워집니다. errortrue인 경우, 전달된 값이 깨워진 작업에서 예외로 발생합니다.

깨워진 작업의 수를 반환합니다. condition에서 대기 중인 작업이 없으면 0을 반환합니다.

source
Base.resetMethod
reset(::Event)

Event를 초기화하여 설정되지 않은 상태로 되돌립니다. 그런 다음 이후의 wait 호출은 notify가 다시 호출될 때까지 차단됩니다.

source
Base.SemaphoreType
Semaphore(sem_size)

최대 sem_size 개의 획득이 언제든지 사용될 수 있도록 하는 카운팅 세마포어를 생성합니다. 각 획득은 해제와 일치해야 합니다.

이는 획득/해제 호출에 대한 획득 및 해제 메모리 순서를 제공합니다.

source
Base.acquireFunction
acquire(s::Semaphore)

sem_size 허가 중 하나가 사용 가능해질 때까지 대기하며, 하나를 획득할 수 있을 때까지 차단됩니다.

source
acquire(f, s::Semaphore)

세마포어 s에서 획득한 후 f를 실행하고, 완료 또는 오류 시 release합니다.

예를 들어, 동시에 foo가 2번만 활성화되도록 보장하는 do-block 형태:

s = Base.Semaphore(2)
@sync for _ in 1:100
    Threads.@spawn begin
        Base.acquire(s) do
            foo()
        end
    end
end
Julia 1.8

이 메서드는 최소한 Julia 1.8이 필요합니다.

source
Base.releaseFunction
release(s::Semaphore)

풀에 하나의 허가를 반환하여 다른 작업이 이를 획득하고 실행을 재개할 수 있도록 할 수 있습니다.

source
Base.lockFunction
lock(lock)

lock이 사용 가능해지면 이를 획득합니다. 만약 lock이 다른 작업/스레드에 의해 이미 잠겨 있다면, 사용 가능해질 때까지 기다립니다.

lockunlock으로 매칭되어야 합니다.

source
lock(f::Function, lock)

lock을 획득하고, lock이 유지되는 동안 f를 실행하며, f가 반환되면 lock을 해제합니다. 만약 lock이 다른 작업/스레드에 의해 이미 잠겨 있다면, 사용 가능해질 때까지 기다립니다.

이 함수가 반환되면 lock이 해제되므로, 호출자는 unlock을 시도해서는 안 됩니다.

참고: @lock.

Julia 1.7

두 번째 인수로 Channel를 사용하는 것은 Julia 1.7 이상이 필요합니다.

source

lock(f::Function, l::Lockable)

l과 관련된 잠금을 획득하고, 잠금을 유지한 채로 f를 실행하며, f가 반환될 때 잠금을 해제합니다. fl에 의해 래핑된 값을 하나의 위치 인수로 받습니다. 잠금이 다른 작업/스레드에 의해 이미 잠겨 있는 경우, 사용 가능해질 때까지 기다립니다. 이 함수가 반환되면 lock이 해제되므로 호출자는 이를 unlock하려고 시도해서는 안 됩니다.

Julia 1.11

최소한 Julia 1.11이 필요합니다.

source
Base.unlockFunction
unlock(lock)

lock의 소유권을 해제합니다.

이것이 이전에 획득된 재귀 잠금인 경우, 내부 카운터를 감소시키고 즉시 반환합니다.

source
Base.trylockFunction
trylock(lock) -> 성공 (부울)

잠금이 사용 가능하면 잠금을 획득하고 성공하면 true를 반환합니다. 잠금이 이미 다른 작업/스레드에 의해 잠겨 있는 경우 false를 반환합니다.

각 성공적인 trylockunlock으로 매칭되어야 합니다.

함수 trylockislocked와 결합하여 테스트-테스트-세트 또는 지수 백오프 알고리즘을 작성하는 데 사용할 수 있습니다 typeof(lock)에 의해 지원되는 경우 (문서를 참조하십시오).

source
Base.islockedFunction
islocked(lock) -> 상태 (부울)

lock이 어떤 작업/스레드에 의해 보유되고 있는지 확인합니다. 이 함수는 단독으로 동기화에 사용되어서는 안 됩니다. 그러나 islockedtrylock를 결합하면 typeof(lock)에 의해 지원되는 경우 테스트-테스트-세트 또는 지수 백오프 알고리즘을 작성하는 데 사용할 수 있습니다(문서를 참조하십시오).

확장 도움말

예를 들어, lock 구현이 아래에 문서화된 속성을 만족하는 경우 지수 백오프를 다음과 같이 구현할 수 있습니다.

nspins = 0
while true
    while islocked(lock)
        GC.safepoint()
        nspins += 1
        nspins > LIMIT && error("timeout")
    end
    trylock(lock) && break
    backoff()
end

구현

잠금 구현은 다음 속성으로 islocked를 정의하고 이를 문서 문자열에 명시하는 것이 좋습니다.

  • islocked(lock)은 데이터 경합이 없습니다.
  • islocked(lock)false를 반환하면, 다른 작업의 간섭이 없을 경우 trylock(lock)의 즉각적인 호출이 성공해야 합니다( true를 반환).
source
Base.ReentrantLockType
ReentrantLock()

동기화를 위한 재진입 잠금을 생성합니다 Tasks. 동일한 작업은 필요에 따라 잠금을 여러 번 획득할 수 있습니다(이것이 이름의 "Reentrant" 부분이 의미하는 바입니다). 각 lockunlock과 쌍을 이루어야 합니다.

lock을 호출하면 해당 스레드에서 최종화기의 실행이 해당 unlock이 호출될 때까지 금지됩니다. 아래에 설명된 표준 잠금 패턴의 사용은 자연스럽게 지원되어야 하지만, 시도/잠금 순서를 뒤집거나 시도 블록을 완전히 누락하는 것(예: 잠금을 유지한 채로 반환하려고 시도하는 것)에 주의해야 합니다:

이것은 잠금/해제 호출에 대한 획득/해제 메모리 순서를 제공합니다.

lock(l)
try
    <atomic work>
finally
    unlock(l)
end

!islocked(lck::ReentrantLock) 조건이 참이면, trylock(lck)는 "동시에" 잠금을 보유하려고 시도하는 다른 작업이 없는 한 성공합니다.

source
Base.@lockMacro
@lock l expr

lock(f, l::AbstractLock)의 매크로 버전이지만 f 함수 대신 expr을 사용합니다. 다음과 같이 확장됩니다:

lock(l)
try
    expr
finally
    unlock(l)
end

이는 do 블록과 함께 lock를 사용하는 것과 유사하지만 클로저를 생성하지 않으므로 성능을 향상시킬 수 있습니다.

Compat

@lock은 Julia 1.3에 추가되었으며, Julia 1.10에서 내보내졌습니다.

source
Base.LockableType

Lockable(value, lock = ReentrantLock())

value를 감싸고 제공된 lock과 연결된 Lockable 객체를 생성합니다. 이 객체는 @lock, lock, trylock, unlock를 지원합니다. 값을 접근하려면 잠금을 유지하면서 lockable 객체의 인덱스를 사용해야 합니다.

Julia 1.11

최소 Julia 1.11이 필요합니다.

예제

julia> locked_list = Base.Lockable(Int[]);

julia> @lock(locked_list, push!(locked_list[], 1)) # 값을 접근하려면 잠금을 유지해야 합니다
1-element Vector{Int64}:
 1

julia> lock(summary, locked_list)
"1-element Vector{Int64}"
source

Channels

Base.ChannelType
Channel{T=Any}(size::Int=0)

T 유형의 최대 size 개체를 보유할 수 있는 내부 버퍼가 있는 Channel을 생성합니다. put! 호출은 채널이 가득 차면 take!로 개체가 제거될 때까지 차단됩니다.

Channel(0)은 버퍼가 없는 채널을 생성합니다. put!은 일치하는 take!가 호출될 때까지 차단됩니다. 그리고 그 반대도 마찬가지입니다.

기타 생성자:

  • Channel(): 기본 생성자로, Channel{Any}(0)와 동일합니다.
  • Channel(Inf): Channel{Any}(typemax(Int))와 동일합니다.
  • Channel(sz): Channel{Any}(sz)와 동일합니다.
Julia 1.3

기본 생성자 Channel()과 기본 size=0은 Julia 1.3에 추가되었습니다.

source
Base.ChannelMethod
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)

func에서 새 작업을 생성하고, 이를 크기 size와 유형 T의 새 채널에 바인딩한 후, 단일 호출로 작업을 예약합니다. 작업이 종료되면 채널이 자동으로 닫힙니다.

func는 바인딩된 채널을 유일한 인수로 받아야 합니다.

생성된 작업에 대한 참조가 필요하면 키워드 인수 taskref를 통해 Ref{Task} 객체를 전달하십시오.

spawn=true인 경우, func에 대해 생성된 Task는 다른 스레드에서 병렬로 예약될 수 있으며, 이는 Threads.@spawn를 통해 작업을 생성하는 것과 동등합니다.

spawn=true이고 threadpool 인수가 설정되지 않은 경우 기본값은 :default입니다.

threadpool 인수가 설정된 경우(:default 또는 :interactive), 이는 spawn=true를 의미하며 새 작업이 지정된 스레드 풀에 생성됩니다.

Channel을 반환합니다.

예제

julia> chnl = Channel() do ch
           foreach(i -> put!(ch, i), 1:4)
       end;

julia> typeof(chnl)
Channel{Any}

julia> for i in chnl
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

생성된 작업 참조하기:

julia> taskref = Ref{Task}();

julia> chnl = Channel(taskref=taskref) do ch
           println(take!(ch))
       end;

julia> istaskdone(taskref[])
false

julia> put!(chnl, "Hello");
Hello

julia> istaskdone(taskref[])
true
Julia 1.3

spawn= 매개변수는 Julia 1.3에서 추가되었습니다. 이 생성자는 Julia 1.3에서 추가되었습니다. 이전 버전의 Julia에서는 채널이 키워드 인수를 사용하여 sizeT를 설정했지만, 이러한 생성자는 더 이상 사용되지 않습니다.

Julia 1.9

threadpool= 인수는 Julia 1.9에서 추가되었습니다.

julia> chnl = Channel{Char}(1, spawn=true) do ch
           for c in "hello world"
               put!(ch, c)
           end
       end
Channel{Char}(1) (2 items available)

julia> String(collect(chnl))
"hello world"
source
Base.put!Method
put!(c::Channel, v)

채널 c에 항목 v를 추가합니다. 채널이 가득 차면 블록됩니다.

버퍼가 없는 채널의 경우, 다른 작업에서 take!가 수행될 때까지 블록됩니다.

Julia 1.1

v는 이제 put!가 호출될 때 채널의 유형으로 convert를 통해 변환됩니다.

source
Base.take!Method
take!(c::Channel)

채널(Channel)에서 값을 제거하고 반환합니다. 데이터가 사용 가능할 때까지 블록됩니다. 버퍼가 없는 채널의 경우, 다른 작업이 put!를 수행할 때까지 블록됩니다.

예제

버퍼가 있는 채널:

julia> c = Channel(1);

julia> put!(c, 1);

julia> take!(c)
1

버퍼가 없는 채널:

julia> c = Channel(0);

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);

julia> take!(c)
1
source
Base.isreadyMethod
isready(c::Channel)

채널이 Channel에 값이 저장되어 있는지 여부를 결정합니다. 즉시 반환되며, 블록하지 않습니다.

버퍼가 없는 채널의 경우, put!를 기다리고 있는 작업이 있는 경우 true를 반환합니다.

예제

버퍼가 있는 채널:

julia> c = Channel(1);

julia> isready(c)
false

julia> put!(c, 1);

julia> isready(c)
true

버퍼가 없는 채널:

julia> c = Channel();

julia> isready(c)  # 값을 넣으려고 대기 중인 작업이 없음!
false

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);  # put! 작업을 스케줄링합니다.

julia> isready(c)
true
source
Base.fetchMethod
fetch(c::Channel)

Channel에서 첫 번째로 사용 가능한 항목을 기다리고 반환합니다(제거하지 않음). 참고: fetch는 버퍼가 없는(0 크기) Channel에서는 지원되지 않습니다.

예제

버퍼가 있는 채널:

julia> c = Channel(3) do ch
           foreach(i -> put!(ch, i), 1:3)
       end;

julia> fetch(c)
1

julia> collect(c)  # 항목이 제거되지 않음
3-element Vector{Any}:
 1
 2
 3
source
Base.closeMethod
close(c::Channel[, excp::Exception])

채널을 닫습니다. 예외(excp로 선택적으로 제공됨)는 다음에 의해 발생합니다:

  • 닫힌 채널에서 put!.
  • 비어 있고 닫힌 채널에서 take!fetch.
source
Base.bindMethod
bind(chnl::Channel, task::Task)

chnl의 생명주기를 작업에 연결합니다. Channel chnl은 작업이 종료될 때 자동으로 닫힙니다. 작업에서 발생한 모든 uncaught 예외는 chnl의 모든 대기자에게 전파됩니다.

chnl 객체는 작업 종료와 관계없이 명시적으로 닫을 수 있습니다. 종료된 작업은 이미 닫힌 Channel 객체에 영향을 미치지 않습니다.

하나의 채널이 여러 작업에 바인딩될 때, 가장 먼저 종료된 작업이 채널을 닫습니다. 여러 채널이 동일한 작업에 바인딩될 때, 작업의 종료는 모든 바인딩된 채널을 닫습니다.

예제

julia> c = Channel(0);

julia> task = @async foreach(i->put!(c, i), 1:4);

julia> bind(c,task);

julia> for i in c
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

julia> isopen(c)
false
julia> c = Channel(0);

julia> task = @async (put!(c, 1); error("foo"));

julia> bind(c, task);

julia> take!(c)
1

julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
    nested task error: foo
[...]
source

Low-level synchronization using schedule and wait

schedule의 가장 쉬운 올바른 사용법은 아직 시작되지 않은 (예정된) Task에서 사용하는 것입니다. 그러나 4d61726b646f776e2e436f64652822222c20227363686564756c652229_40726566wait를 동기화 인터페이스를 구성하는 매우 저수준의 빌딩 블록으로 사용할 수 있습니다. schedule(task)를 호출하기 위한 중요한 전제 조건은 호출자가 task를 "소유"해야 한다는 것입니다. 즉, 주어진 task에서 wait 호출이 schedule(task)를 호출하는 코드에서 알고 있는 위치에서 발생하고 있어야 합니다. 이러한 전제 조건을 보장하기 위한 한 가지 전략은 다음 예제에서 보여준 것처럼 원자성을 사용하는 것입니다.

@enum OWEState begin
    OWE_EMPTY
    OWE_WAITING
    OWE_NOTIFYING
end

mutable struct OneWayEvent
    @atomic state::OWEState
    task::Task
    OneWayEvent() = new(OWE_EMPTY)
end

function Base.notify(ev::OneWayEvent)
    state = @atomic ev.state
    while state !== OWE_NOTIFYING
        # Spin until we successfully update the state to OWE_NOTIFYING:
        state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
        if ok
            if state == OWE_WAITING
                # OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
                # already waiting or about to call `wait`. The notifier task must wake up
                # the waiter task.
                schedule(ev.task)
            else
                @assert state == OWE_EMPTY
                # Since we are assuming that there is only one notifier task (for
                # simplicity), we know that the other possible case here is OWE_EMPTY.
                # We do not need to do anything because we know that the waiter task has
                # not called `wait(ev::OneWayEvent)` yet.
            end
            break
        end
    end
    return
end

function Base.wait(ev::OneWayEvent)
    ev.task = current_task()
    state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
    if ok
        # OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
        # invoke OWE_WAITING -> OWE_NOTIFYING transition.  The waiter task must call
        # `wait()` immediately.  In particular, it MUST NOT invoke any function that may
        # yield to the scheduler at this point in code.
        wait()
    else
        @assert state == OWE_NOTIFYING
        # Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
        # notifier task.
    end
    return
end

ev = OneWayEvent()
@sync begin
    @async begin
        wait(ev)
        println("done")
    end
    println("notifying...")
    notify(ev)
end

# output
notifying...
done

OneWayEvent는 한 작업이 다른 작업의 notifywait할 수 있게 해줍니다. 이는 wait가 단일 작업에서 한 번만 사용될 수 있기 때문에 제한된 통신 인터페이스입니다 (비원자적 할당인 ev.task에 유의하세요).

이 예제에서 notify(ev::OneWayEvent)그것이 상태를 OWE_WAITING에서 OWE_NOTIFYING으로 수정하는 경우에만 schedule(ev.task)를 호출할 수 있습니다. 이는 wait(ev::OneWayEvent)를 실행하는 작업이 이제 ok 분기에 있으며, 다른 작업이 schedule(ev.task)를 시도할 수 없음을 알려줍니다. 왜냐하면 그들의 @atomicreplace(ev.state, state => OWE_NOTIFYING)가 실패할 것이기 때문입니다.