Distributed Computing
Distributed
— ModuleDağıtık paralel işleme için araçlar.
Distributed.addprocs
— Functionaddprocs(manager::ClusterManager; kwargs...) -> Süreç tanımlayıcılarının listesi
Belirtilen küme yöneticisi aracılığıyla işçi süreçlerini başlatır.
Örneğin, Beowulf kümeleri, ClusterManagers.jl
paketinde uygulanan özel bir küme yöneticisi aracılığıyla desteklenmektedir.
Yeni başlatılan bir işçinin ana makineden bağlantı kurulmasını beklediği saniye sayısı, işçi sürecinin ortamında JULIA_WORKER_TIMEOUT
değişkeni aracılığıyla belirtilebilir. Sadece TCP/IP taşıma kullanıldığında geçerlidir.
İşçileri REPL'yi veya işçileri programlı olarak başlatıyorsanız içindeki işlevi engellemeden başlatmak için, addprocs
'ı kendi görevinde çalıştırın.
Örnekler
# Meşgul kümelerde, `addprocs`'ı asenkron olarak çağırın
t = @async addprocs(...)
# İşçileri çevrimiçi olduklarında kullanın
if nprocs() > 1 # En az bir yeni işçi mevcut olduğundan emin olun
.... # dağıtılmış yürütme gerçekleştirin
end
# Yeni başlatılan işçi kimliklerini veya herhangi bir hata mesajını alın
if istaskdone(t) # `addprocs`'ın tamamlanıp tamamlanmadığını kontrol edin, böylece `fetch` engellenmez
if nworkers() == N
new_pids = fetch(t)
else
fetch(t)
end
end
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> Süreç tanımlayıcılarının listesi
Uzak makinelerde SSH üzerinden işçi süreçleri ekleyin. Yapılandırma, anahtar argümanlarla yapılır (aşağıya bakınız). Özellikle, exename
anahtar kelimesi, uzak makine(ler)deki julia
ikili dosyasının yolunu belirtmek için kullanılabilir.
machines
, [user@]host[:port] [bind_addr[:port]]
biçiminde verilen "makine spesifikasyonları" dizisidir. user
, geçerli kullanıcıya ve port
standart SSH portuna varsayılan olarak ayarlanır. Eğer [bind_addr[:port]]
belirtilirse, diğer işçiler bu işçiye belirtilen bind_addr
ve port
üzerinden bağlanacaktır.
machines
dizisinde bir demet kullanarak veya (machine_spec, count)
biçiminde birden fazla süreci uzak bir ana bilgisayarda başlatmak mümkündür; burada count
, belirtilen ana bilgisayarda başlatılacak işçi sayısını belirtir. İşçi sayısı olarak :auto
geçmek, uzak ana bilgisayardaki CPU iş parçacığı sayısı kadar işçi başlatır.
Örnekler:
addprocs([
"remote1", # 'remote1' üzerinde geçerli kullanıcı adıyla bir işçi
"user@remote2", # 'user' kullanıcı adıyla 'remote2' üzerinde bir işçi
"user@remote3:2222", # 'remote3' için SSH portunu '2222' olarak belirtme
("user@remote4", 4), # 'remote4' üzerinde 4 işçi başlat
("user@remote5", :auto), # 'remote5' üzerinde CPU iş parçacığı sayısı kadar işçi başlat
])
Anahtar argümanlar:
tunnel
:true
ise, ana süreçten işçiye bağlanmak için SSH tünellemesi kullanılacaktır. Varsayılanfalse
'dır.multiplex
:true
ise, SSH tünellemesi için SSH çoklama kullanılır. Varsayılanfalse
'dır.ssh
: işçileri başlatmak için kullanılan SSH istemci yürütülebilir dosyasının adı veya yolu. Varsayılan"ssh"
'dir.sshflags
: ek ssh seçeneklerini belirtir, örneğinsshflags=`-i /home/foo/bar.pem`
max_parallel
: bir ana bilgisayara paralel olarak bağlanan maksimum işçi sayısını belirtir. Varsayılan 10'dur.shell
: işçilerin ssh ile bağlandığı kabuk türünü belirtir.shell=:posix
: POSIX uyumlu bir Unix/Linux kabuğu (sh, ksh, bash, dash, zsh, vb.). Varsayılan.shell=:csh
: Unix C kabuğu (csh, tcsh).shell=:wincmd
: Microsoft Windowscmd.exe
.
dir
: işçilerin çalışma dizinini belirtir. Varsayılan, ana bilgisayarın geçerli dizinidir (pwd()
ile bulunur).enable_threaded_blas
:true
ise, eklenen süreçlerde BLAS çoklu iş parçacıklarında çalışacaktır. Varsayılanfalse
'dır.exename
:julia
yürütülebilir dosyasının adı. Varsayılan"$(Sys.BINDIR)/julia"
veya"$(Sys.BINDIR)/julia-debug"
'dır. Tüm uzak makinelerde ortak bir Julia sürümünün kullanılması önerilir, aksi takdirde serileştirme ve kod dağıtımı başarısız olabilir.exeflags
: işçi süreçlerine geçirilen ek bayraklar.topology
: İşçilerin birbirine nasıl bağlandığını belirtir. Bağlı olmayan işçiler arasında bir mesaj göndermek bir hataya neden olur.topology=:all_to_all
: Tüm süreçler birbirine bağlıdır. Varsayılan.topology=:master_worker
: Sadece sürücü süreci, yanipid
1 işçilere bağlanır. İşçiler birbirine bağlanmaz.topology=:custom
: Küme yöneticisininlaunch
yöntemi,WorkerConfig
içindekiident
veconnect_idents
alanları aracılığıyla bağlantı topolojisini belirtir. Bir küme yöneticisi kimliğiident
olan bir işçi,connect_idents
içinde belirtilen tüm işçilere bağlanacaktır.
lazy
: Sadecetopology=:all_to_all
ile uygulanabilir.true
ise, işçi-işçi bağlantıları tembel bir şekilde kurulur, yani işçiler arasında bir uzak çağrının ilk örneğinde kurulur. Varsayılantrue
'dur.env
:env=["JULIA_DEPOT_PATH"=>"/depot"]
gibi dize çiftleri içeren bir dizi sağlayarak, çevre değişkenlerinin uzak makinede ayarlanmasını talep eder. Varsayılan olarak yalnızcaJULIA_WORKER_TIMEOUT
çevre değişkeni otomatik olarak yerel ortamdan uzak ortama geçirilir.cmdline_cookie
: kimlik doğrulama çerezini--worker
komut satırı seçeneği aracılığıyla geçin. Çerezi ssh stdio aracılığıyla geçmenin (daha güvenli) varsayılan davranışı, daha eski (pre-ConPTY) Julia veya Windows sürümleri kullanan Windows işçileriyle takılabilir; bu durumdacmdline_cookie=true
bir geçici çözüm sunar.
ssh
, shell
, env
ve cmdline_cookie
anahtar argümanları Julia 1.6'da eklendi.
Çevre değişkenleri:
Ana süreç, yeni başlatılan bir işçi ile 60.0 saniye içinde bağlantı kurmayı başaramazsa, işçi bunu ölümcül bir durum olarak değerlendirir ve sonlandırır. Bu zaman aşımı, JULIA_WORKER_TIMEOUT
çevre değişkeni aracılığıyla kontrol edilebilir. Ana süreçteki JULIA_WORKER_TIMEOUT
değeri, yeni başlatılan bir işçinin bağlantı kurulmasını beklediği saniye sayısını belirtir.
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> Süreç tanımlayıcılarının listesi
Yerel ana bilgisayarda np
işçilerini yerleşik LocalManager
kullanarak başlatın.
Yerel işçiler, ana süreçten mevcut paket ortamını (yani, aktif proje, LOAD_PATH
ve DEPOT_PATH
) miras alır.
!!! uyarı İşçilerin ~/.julia/config/startup.jl
başlangıç betiğini çalıştırmadığını ve diğer çalışan süreçlerle küresel durumlarını (komut satırı anahtarları, küresel değişkenler, yeni yöntem tanımları ve yüklenen modüller gibi) senkronize etmediğini unutmayın.
Anahtar argümanlar:
restrict::Bool
:true
(varsayılan) ise, bağlama127.0.0.1
ile sınırlıdır.dir
,exename
,exeflags
,env
,topology
,lazy
,enable_threaded_blas
:SSHManager
için olduğu gibi aynı etki,addprocs(machines::AbstractVector)
belgelerine bakın.
Paket ortamının miras alınması ve env
anahtar argümanı Julia 1.9'da eklendi.
Distributed.nprocs
— Functionnprocs()
Mevcut işlem sayısını alır.
Örnekler
julia> nprocs()
3
julia> workers()
2-element Array{Int64,1}:
2
3
Distributed.nworkers
— Functionnworkers()
Mevcut işçi süreçlerinin sayısını alır. Bu, nprocs()
değerinden bir eksiktir. nprocs() == 1
ise nprocs()
ile eşittir.
Örnekler
$ julia -p 2
julia> nprocs()
3
julia> nworkers()
2
Distributed.procs
— Methodprocs()
Tüm işlem tanımlayıcılarının bir listesini döndürür, pid 1 dahil (bu workers()
tarafından dahil edilmez).
Örnekler
$ julia -p 2
julia> procs()
3-element Array{Int64,1}:
1
2
3
Distributed.procs
— Methodprocs(pid::Integer)
Aynı fiziksel düğümdeki tüm işlem tanımlayıcılarının bir listesini döndürür. Özellikle, pid
ile aynı ip adresine bağlı olan tüm işçiler döndürülür.
Distributed.workers
— Functionworkers()
Tüm işçi işlem tanımlayıcılarının bir listesini döndürür.
Örnekler
$ julia -p 2
julia> workers()
2-element Array{Int64,1}:
2
3
Distributed.rmprocs
— Functionrmprocs(pids...; waitfor=typemax(Int))
Belirtilen işçileri kaldırın. Sadece işlem 1'in işçi ekleyip kaldırabileceğini unutmayın.
waitfor
argümanı, işçilerin kapanması için ne kadar süre bekleyeceğinizi belirtir:
- Belirtilmemişse,
rmprocs
istenen tümpids
kaldırılana kadar bekleyecektir. - İstenilen
waitfor
saniyesinden önce tüm işçiler sonlandırılamazsa birErrorException
hatası oluşur. waitfor
değeri 0 olduğunda, çağrı hemen döner ve işçilerin kaldırılması farklı bir görevde planlanır. PlanlananTask
nesnesi döner. Kullanıcı, başka herhangi bir paralel çağrı yapmadan önce görev üzerindewait
çağrısı yapmalıdır.
Örnekler
$ julia -p 5
julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0
julia> wait(t)
julia> workers()
3-element Array{Int64,1}:
4
5
6
Distributed.interrupt
— Functioninterrupt(pids::Integer...)
Belirtilen işçilerde mevcut çalışan görevi kes. Bu, yerel makinede Ctrl-C tuşuna basmaya eşdeğerdir. Hiçbir argüman verilmezse, tüm işçiler kesilir.
interrupt(pids::AbstractVector=workers())
Belirtilen işçilerde mevcut çalışan görevi kes. Bu, yerel makinede Ctrl-C tuşlamaya eşdeğerdir. Hiçbir argüman verilmezse, tüm işçiler kesilir.
Distributed.myid
— Functionmyid()
Mevcut işlemin kimliğini alır.
Örnekler
julia> myid()
1
julia> remotecall_fetch(() -> myid(), 4)
4
Distributed.pmap
— Functionpmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection
Koleksiyonu c
'yi, mevcut işçiler ve görevler kullanarak her bir elemana f
uygulayarak dönüştürün.
Birden fazla koleksiyon argümanı için, f
'yi eleman bazında uygulayın.
f
'nin tüm işçi süreçlerine erişilebilir olması gerektiğini unutmayın; detaylar için Kod Erişilebilirliği ve Paket Yükleme bölümüne bakın.
Bir işçi havuzu belirtilmezse, tüm mevcut işçiler CachingPool
aracılığıyla kullanılacaktır.
Varsayılan olarak, pmap
hesaplamayı belirtilen tüm işçilere dağıtır. Sadece yerel süreci kullanmak ve görevler üzerinden dağıtmak için distributed=false
belirtin. Bu, asyncmap
kullanmakla eşdeğerdir. Örneğin, pmap(f, c; distributed=false)
ifadesi asyncmap(f,c; ntasks=()->nworkers())
ile eşdeğerdir.
pmap
, batch_size
argümanı aracılığıyla süreçler ve görevlerin bir karışımını da kullanabilir. 1'den büyük grup boyutları için, koleksiyon birden fazla grup halinde işlenir; her grup batch_size
veya daha az uzunluktadır. Bir grup, serbest bir işçiye tek bir istek olarak gönderilir; burada yerel bir asyncmap
, gruptan elemanları birden fazla eşzamanlı görev kullanarak işler.
Herhangi bir hata, pmap
'ın koleksiyonun geri kalanını işlemesini durdurur. Bu davranışı geçersiz kılmak için, tek bir argüman alan bir hata işleme fonksiyonu on_error
argümanı aracılığıyla belirtilebilir; yani, istisna. Fonksiyon, hatayı yeniden fırlatarak işlemi durdurabilir veya devam etmek için, sonuçlarla birlikte çağırana döndürülen herhangi bir değeri döndürebilir.
Aşağıdaki iki örneği göz önünde bulundurun. İlk örnek, istisna nesnesini satır içi olarak döndürür, ikincisi ise herhangi bir istisna yerine 0 döndürür:
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
1
ErrorException("foo")
3
ErrorException("foo")
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
1
0
3
0
Hatalar, başarısız hesaplamaları yeniden deneyerek de işlenebilir. Anahtar kelime argümanları retry_delays
ve retry_check
, retry
fonksiyonuna delays
ve check
anahtar kelime argümanları olarak geçirilir. Gruplama belirtilirse ve bir grup tamamen başarısız olursa, gruptaki tüm öğeler yeniden denenir.
on_error
ve retry_delays
belirtilirse, on_error
kancası yeniden denemeden önce çağrılır. Eğer on_error
bir istisna fırlatmazsa (veya yeniden fırlatmazsa), eleman yeniden denenmeyecektir.
Örnek: Hatalarda, bir eleman üzerinde f
'yi maksimum 3 kez, yeniden denemeler arasında herhangi bir gecikme olmadan yeniden deneyin.
pmap(f, c; retry_delays = zeros(3))
Örnek: İstisna InexactError
türünde değilse f
'yi yeniden deneyin, 3 kez kadar artan gecikmelerle. Tüm InexactError
oluşumları için bir NaN
döndürün.
pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))
Distributed.RemoteException
— TypeRemoteException(captured)
Uzak hesaplamalarda meydana gelen istisnalar yakalanır ve yerel olarak yeniden fırlatılır. Bir RemoteException
, işçi pid
'sini ve yakalanan bir istisnayı sarar. Bir CapturedException
, uzak istisnayı ve istisnanın meydana geldiği anda çağrı yığınının seri hale getirilebilir bir biçimini yakalar.
Distributed.ProcessExitedException
— TypeProcessExitedException(worker_id::Int)
Bir istemci Julia süreci kapandıktan sonra, ölü çocuğa atıfta bulunma girişimleri bu istisnayı fırlatacaktır.
Distributed.Future
— TypeFuture(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)
Bir Future
, bilinmeyen bir sonlanma durumu ve zamanı olan tek bir hesaplama için bir yer tutucudur. Birden fazla potansiyel hesaplama için RemoteChannel
'a bakın. Bir AbstractRemoteRef
'i tanımlamak için remoteref_id
'ye bakın.
Distributed.RemoteChannel
— TypeRemoteChannel(pid::Integer=myid())
Bir Channel{Any}(1)
referansı oluşturur, pid
sürecinde. Varsayılan pid
, mevcut süreçtir.
RemoteChannel(f::Function, pid::Integer=myid())
Belirli bir boyut ve türdeki uzak kanallara referanslar oluşturur. f
, pid
üzerinde çalıştırıldığında bir AbstractChannel
uygulaması döndürmelidir.
Örneğin, RemoteChannel(()->Channel{Int}(10), pid)
, pid
üzerinde Int
türünde ve boyutu 10 olan bir kanala referans döndürecektir.
Varsayılan pid
, mevcut süreçtir.
Base.fetch
— Methodfetch(x::Future)
Bir Future
değerini bekleyin ve alın. Alınan değer yerel olarak önbelleğe alınır. Aynı referans üzerindeki fetch
çağrıları önbelleğe alınan değeri döndürür. Uzak değer bir istisna ise, uzak istisnayı ve geri izlemeyi yakalayan bir RemoteException
fırlatır.
Base.fetch
— Methodfetch(c::RemoteChannel)
Bir RemoteChannel
üzerinden bir değer almak için bekleyin. Yükseltilen istisnalar, bir Future
için olanlarla aynıdır. Alınan öğeyi kaldırmaz.
fetch(x::Any)
x
değerini döndür.
Distributed.remotecall
— Methodremotecall(f, id::Integer, args...; kwargs...) -> Future
Verilen argümanlar ile belirtilen süreçte bir f
fonksiyonunu asenkron olarak çağırır. Bir Future
döner. Varsa anahtar kelime argümanları f
'ye iletilir.
Distributed.remotecall_wait
— Methodremotecall_wait(f, id::Integer, args...; kwargs...)
Belirtilen Worker
üzerinde id
işçi kimliği ile tek bir mesajda daha hızlı bir wait(remotecall(...))
gerçekleştirin. Anahtar kelime argümanları, varsa, f
'ye iletilir.
Ayrıca bkz. wait
ve remotecall
.
Distributed.remotecall_fetch
— Methodremotecall_fetch(f, id::Integer, args...; kwargs...)
fetch(remotecall(...))
işlemini tek bir mesajda gerçekleştirin. Anahtar kelime argümanları, varsa, f
'ye iletilir. Herhangi bir uzak istisna, RemoteException
içinde yakalanır ve fırlatılır.
Ayrıca fetch
ve remotecall
ile de bakabilirsiniz.
Örnekler
$ julia -p 2
julia> remotecall_fetch(sqrt, 2, 4)
2.0
julia> remotecall_fetch(sqrt, 2, -4)
HATA: İşçi 2'de:
DomainError with -4.0:
sqrt, negatif bir reel argüman ile çağrıldığında yalnızca karmaşık bir argüman ile çağrıldığında karmaşık bir sonuç döndürecektir. sqrt(Complex(x)) denemeyi deneyin.
...
Distributed.remote_do
— Methodremote_do(f, id::Integer, args...; kwargs...) -> nothing
f
'yi işçi id
üzerinde asenkron olarak çalıştırır. remotecall
ile karşılaştırıldığında, hesaplamanın sonucunu saklamaz ve tamamlanmasını beklemenin bir yolu yoktur.
Başarılı bir çağrı, isteğin uzak düğümde yürütülmek üzere kabul edildiğini gösterir.
Aynı işçiye yapılan ardışık remotecall
'lar, çağrıldıkları sırayla sıralanırken, uzak işçi üzerindeki yürütme sırası belirsizdir. Örneğin, remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2)
çağrısı f1
'e yapılan çağrıyı sıralar, ardından f2
ve f3
'ü bu sırayla sıralar. Ancak, f1
'in işçi 2 üzerinde f3
'ten önce yürütüleceği garanti edilmez.
f
tarafından fırlatılan herhangi bir istisna, uzak işçi üzerinde stderr
üzerine yazdırılır.
Anahtar argümanlar, varsa, f
'ye iletilir.
Base.put!
— Methodput!(rr::RemoteChannel, args...)
Bir dizi değeri RemoteChannel
içine kaydedin. Kanal doluysa, alan mevcut olana kadar bekler. İlk argümanı döndürür.
Base.put!
— Methodput!(rr::Future, v)
Bir değeri Future
rr
'ye kaydedin. Future
'lar bir kez yazılabilen uzak referanslardır. Zaten ayarlanmış bir Future
üzerinde put!
çağrısı bir Exception
fırlatır. Tüm asenkron uzak çağrılar Future
döndürür ve tamamlandığında değeri çağrının dönüş değeri olarak ayarlar.
Base.take!
— Methodtake!(rr::RemoteChannel, args...)
Bir RemoteChannel
rr
'den değer(ler) alır ve bu süreçte değer(ler)i kaldırır.
Base.isready
— Methodisready(rr::RemoteChannel, args...)
Bir RemoteChannel
içinde bir değer olup olmadığını belirler. Bu işlevin yarış koşullarına neden olabileceğini unutmayın, çünkü sonucu aldığınızda artık doğru olmayabilir. Ancak, yalnızca bir kez atandıkları için Future
üzerinde güvenle kullanılabilir.
Base.isready
— Methodisready(rr::Future)
Bir Future
değerinin depolandığını belirleyin.
Eğer Future
argümanı farklı bir düğüm tarafından sahipleniliyorsa, bu çağrı cevabı beklemek için engellenecektir. Bunun yerine rr
'yi ayrı bir görevde beklemek veya yerel bir Channel
kullanarak bir vekil olarak kullanmak önerilir:
p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f) # engellemeyecek
Distributed.AbstractWorkerPool
— TypeAbstractWorkerPool
WorkerPool
ve CachingPool
gibi işçi havuzları için süper tip. Bir AbstractWorkerPool
şunları uygulamalıdır:
push!
- genel havuza (mevcut + meşgul) yeni bir işçi ekleput!
- bir işçiyi mevcut havuza geri koytake!
- mevcut havuzdan bir işçi al (uzaktan fonksiyon yürütmek için kullanılacak)length
- genel havuzda mevcut olan işçi sayısıisready
- havuzda birtake!
işleminin engelleneceği durumda false, aksi takdirde true döndür
Yukarıdakilerin varsayılan uygulamaları (bir AbstractWorkerPool
üzerinde) aşağıdaki alanları gerektirir:
channel::Channel{Int}
workers::Set{Int}
burada channel
serbest işçi pid'lerini ve workers
bu havuzla ilişkili tüm işçilerin kümesini içerir.
Distributed.WorkerPool
— TypeWorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})
Bir işçi kimlikleri vektörü veya aralığından WorkerPool
oluşturun.
Örnekler
$ julia -p 3
julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))
julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))
Distributed.CachingPool
— TypeCachingPool(workers::Vector{Int})
AbstractWorkerPool
'un bir uygulaması. remote
, remotecall_fetch
, pmap
(ve uzaktan işlevleri uzaktan çalıştıran diğer çağrılar) işçi düğümlerinde serileştirilmiş/serileştirilmemiş işlevlerin önbelleğe alınmasından faydalanır, özellikle büyük miktarda veri yakalayabilen kapanışlar için.
Uzaktan önbellek, döndürülen CachingPool
nesnesinin ömrü boyunca korunur. Önbelleği daha erken temizlemek için clear!(pool)
kullanın.
Küresel değişkenler için, yalnızca bağlamalar bir kapanışta yakalanır, veri değil. Küresel verileri yakalamak için let
blokları kullanılabilir.
Örnekler
const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
pmap(i -> sum(foo) + i, wp, 1:100);
end
Yukarıdaki kod, foo
'yu her işçiye yalnızca bir kez aktarır.
Distributed.default_worker_pool
— Functiondefault_worker_pool()
AbstractWorkerPool
boşta olan workers
içeren - remote(f)
ve pmap
(varsayılan olarak) tarafından kullanılır. default_worker_pool!(pool)
ile açıkça ayarlanmamışsa, varsayılan işçi havuzu bir WorkerPool
olarak başlatılır.
Örnekler
$ julia -p 3
julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
Distributed.clear!
— Functionclear!(syms, pids=workers(); mod=Main)
Modüllerdeki global bağlamaları nothing
ile başlatarak temizler. syms
Symbol
türünde veya Symbol
koleksiyonu olmalıdır. pids
ve mod
, global değişkenlerin yeniden başlatılacağı süreçleri ve modülü tanımlar. Sadece mod
altında tanımlı olan isimler temizlenir.
Bir global sabitin temizlenmesi istendiğinde bir istisna oluşur.
clear!(pool::CachingPool) -> pool
Tüm katılımcı işçilerden tüm önbelleğe alınmış fonksiyonları kaldırır.
Distributed.remote
— Functionremote([p::AbstractWorkerPool], f) -> Function
Mevcut bir işçi üzerinde (varsa WorkerPool
p
'den alınan) f
fonksiyonunu çalıştıran anonim bir fonksiyon döndürür. remotecall_fetch
kullanarak.
Distributed.remotecall
— Methodremotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
WorkerPool
varyantı remotecall(f, pid, ....)
. pool
'dan boş bir işçi bekleyin ve üzerinde remotecall
gerçekleştirin.
Örnekler
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)
Bu örnekte, görev pid 2 üzerinde çalıştı, pid 1'den çağrıldı. ```
Distributed.remotecall_wait
— Methodremotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
WorkerPool
varyantı remotecall_wait(f, pid, ....)
. pool
'dan boş bir işçi bekleyin ve ona remotecall_wait
gerçekleştirin.
Örnekler
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)
julia> fetch(f)
0.9995177101692958
Distributed.remotecall_fetch
— Methodremotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> sonuç
WorkerPool
varyantı remotecall_fetch(f, pid, ....)
. pool
'dan boş bir işçi bekler ve alır ve üzerinde remotecall_fetch
gerçekleştirir.
Örnekler
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
Distributed.remote_do
— Methodremote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothing
WorkerPool
varyantı remote_do(f, pid, ....)
'dir. pool
'dan boş bir işçi bekler ve alır ve üzerinde bir remote_do
gerçekleştirir.
Distributed.@spawn
— Macro@spawn expr
Bir ifadeyi saran bir closure oluşturur ve bunu otomatik olarak seçilen bir süreçte çalıştırarak sonuca bir Future
döner. Bu makro kullanımdan kaldırılmıştır; bunun yerine @spawnat :any expr
kullanılmalıdır.
Örnekler
julia> addprocs(3);
julia> f = @spawn myid()
Future(2, 1, 5, nothing)
julia> fetch(f)
2
julia> f = @spawn myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3
Julia 1.3 itibarıyla bu makro kullanımdan kaldırılmıştır. Bunun yerine @spawnat :any
kullanın.
Distributed.@spawnat
— Macro@spawnat p expr
Bir ifadeyi sarmalayan bir closure oluşturun ve closure'ı işlem p
üzerinde asenkron olarak çalıştırın. Sonuç için bir Future
döndürün. Eğer p
alıntılanmış sabit sembol :any
ise, sistem otomatik olarak kullanılacak bir işlemci seçecektir.
Örnekler
julia> addprocs(3);
julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)
julia> fetch(f)
2
julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3
:any
argümanı Julia 1.3 itibarıyla mevcuttur.
Distributed.@fetch
— Macro@fetch expr
fetch(@spawnat :any expr)
ile eşdeğerdir. fetch
ve @spawnat
bakınız.
Örnekler
julia> addprocs(3);
julia> @fetch myid()
2
julia> @fetch myid()
3
julia> @fetch myid()
4
julia> @fetch myid()
2
Distributed.@fetchfrom
— Macro@fetchfrom
fetch(@spawnat p expr)
ile eşdeğerdir. fetch
ve @spawnat
bakınız.
Örnekler
julia> addprocs(3);
julia> @fetchfrom 2 myid()
2
julia> @fetchfrom 4 myid()
4
Distributed.@distributed
— Macro@distributed
Dağıtık bellek, aşağıdaki biçimde paralel bir for döngüsü:
@distributed [reducer] for var = range
body
end
Belirtilen aralık, tüm işçiler arasında bölünür ve yerel olarak yürütülür. Opsiyonel bir reducer fonksiyonu belirtilmişse, @distributed
her işçi üzerinde yerel azaltmalar yapar ve çağıran süreçte son bir azaltma gerçekleştirir.
Bir azaltıcı fonksiyonu olmadan, @distributed
asenkron olarak çalışır, yani tüm mevcut işçilerde bağımsız görevler başlatır ve tamamlanmayı beklemeden hemen döner. Tamamlanmayı beklemek için, çağrıyı @sync
ile ön ekleyin, şöyle:
@sync @distributed for var = range
body
end
Distributed.@everywhere
— Macro@everywhere [procs()] expr
Tüm procs
üzerinde Main
altında bir ifadeyi çalıştırın. Herhangi bir süreçteki hatalar, bir CompositeException
içinde toplanır ve fırlatılır. Örneğin:
@everywhere bar = 1
tüm mevcut süreçlerde Main.bar
tanımlayacaktır. Daha sonra eklenen süreçler (örneğin addprocs()
ile) ifadenin tanımlı olmayacaktır.
@spawnat
ile karşılaştırıldığında, @everywhere
herhangi bir yerel değişkeni yakalamaz. Bunun yerine, yerel değişkenler interpolasyon kullanılarak yayılabilir:
foo = 1
@everywhere bar = $foo
İsteğe bağlı procs
argümanı, ifadenin çalıştırılacağı tüm süreçlerin bir alt kümesini belirtmeye olanak tanır.
remotecall_eval(Main, procs, expr)
çağrısına benzer, ancak iki ek özelliği vardır:
- `using` ve `import` ifadeleri önce çağıran süreçte çalıştırılır, böylece
paketlerin önceden derlenmesi sağlanır.
- `include` tarafından kullanılan mevcut kaynak dosya yolu diğer süreçlere iletilir.
Distributed.remoteref_id
— Functionremoteref_id(r::AbstractRemoteRef) -> RRID
Future
lar ve RemoteChannel
lar alanlarla tanımlanır:
where
- referansla belirtilen temel nesnenin/depolamanın gerçekten bulunduğu düğümü ifade eder.whence
- uzak referansın oluşturulduğu düğümü ifade eder. Bu, referansla belirtilen temel nesnenin gerçekten bulunduğu düğümden farklıdır. Örneğin, ana süreçtenRemoteChannel(2)
çağrıldığındawhere
değeri 2 vewhence
değeri 1 olur.id
,whence
ile belirtilen işçi tarafından oluşturulan tüm referanslar arasında benzersizdir.
Bir araya getirildiğinde, whence
ve id
tüm işçiler arasında bir referansı benzersiz şekilde tanımlar.
remoteref_id
, bir uzak referansın whence
ve id
değerlerini saran bir RRID
nesnesi döndüren düşük seviyeli bir API'dir.
Distributed.channel_from_id
— Functionchannel_from_id(id) -> c
Bir id
için arka planda bulunan AbstractChannel
'ı döndüren düşük seviyeli bir API. Çağrı, yalnızca arka planda bulunan kanalın bulunduğu düğümde geçerlidir.
Distributed.worker_id_from_socket
— Functionworker_id_from_socket(s) -> pid
Bir IO
bağlantısı veya bir Worker
verildiğinde, bağlı olduğu işçinin pid
'sini döndüren düşük seviyeli bir API. Bu, bir tür için özel serialize
yöntemleri yazarken faydalıdır; bu, yazılan verinin alıcı işlem kimliğine bağlı olarak optimize edilmesini sağlar.
Distributed.cluster_cookie
— Methodcluster_cookie() -> cookie
Küme çerezini döndür.
Distributed.cluster_cookie
— Methodcluster_cookie(cookie) -> cookie
Geçerli çerezi küme çerezi olarak ayarlar, ardından onu döndürür.
Cluster Manager Interface
Bu arayüz, farklı küme ortamlarında Julia işçilerini başlatmak ve yönetmek için bir mekanizma sağlar. Temel içinde iki tür yönetici bulunmaktadır: LocalManager
, aynı ana makinede ek işçiler başlatmak için ve SSHManager
, uzaktaki ana makinelerde ssh
aracılığıyla başlatmak için. İşlemler arasında bağlantı kurmak ve mesajları taşımak için TCP/IP soketleri kullanılır. Küme Yöneticilerinin farklı bir taşıma sağlaması mümkündür.
Distributed.ClusterManager
— TypeClusterManager
Küme yöneticileri için süper tip, işçi süreçlerini bir küme olarak kontrol eder. Küme yöneticileri, işçilerin nasıl ekleneceğini, çıkarılacağını ve iletişim kurulacağını uygular. SSHManager
ve LocalManager
bunun alt türleridir.
Distributed.WorkerConfig
— TypeWorkerConfig
ClusterManager
tarafından kümelerine eklenen işçileri kontrol etmek için kullanılan tür. Tüm küme yöneticileri tarafından bir ana makineye erişmek için kullanılan bazı alanlar:
io
– işçiye erişmek için kullanılan bağlantı (birIO
alt türü veyaNothing
)host
– ana makine adresi (ya birString
ya daNothing
)port
– işçiye bağlanmak için ana makinedeki port (ya birInt
ya daNothing
)
Bazıları, zaten başlatılmış bir ana makineye işçi eklemek için küme yöneticisi tarafından kullanılır:
count
– ana makinede başlatılacak işçi sayısıexename
– ana makinedeki Julia yürütülebilir dosyasının yolu, varsayılan olarak"$(Sys.BINDIR)/julia"
veya"$(Sys.BINDIR)/julia-debug"
exeflags
– Julia'yı uzaktan başlatırken kullanılacak bayraklar
userdata
alanı, dış yöneticiler tarafından her işçi için bilgi depolamak için kullanılır.
Bazı alanlar SSHManager
ve benzeri yöneticiler tarafından kullanılır:
tunnel
–true
(tünelleme kullan),false
(tünelleme kullanma) veyanothing
(yönetici için varsayılanı kullan)multiplex
–true
(tünelleme için SSH çoklama kullan) veyafalse
forward
– ssh'nin-L
seçeneği için kullanılan yönlendirme seçeneğibind_addr
– uzaktaki ana makinede bağlanacak adressshflags
– SSH bağlantısını kurarken kullanılacak bayraklarmax_parallel
– ana makinede paralel olarak bağlanacak maksimum işçi sayısı
Bazı alanlar hem LocalManager
hem de SSHManager
tarafından kullanılır:
connect_at
– bunun işçi-işçi veya sürücü-işçi kurulum çağrısı olup olmadığını belirlerprocess
– bağlanacak işlem (genellikle yönetici bunuaddprocs
sırasında atar)ospid
– ana makine işletim sistemine göre işlem kimliği, işçi süreçlerini kesmek için kullanılırenviron
– Local/SSH yöneticileri tarafından geçici bilgileri depolamak için kullanılan özel sözlükident
–ClusterManager
tarafından tanımlanan işçiconnect_idents
– işçinin özel bir topoloji kullanıyorsa bağlanması gereken işçi kimlikleri listesienable_threaded_blas
– işçilerde iş parçacıklı BLAS kullanılıp kullanılmayacağı,true
,false
veyanothing
Distributed.launch
— Functionlaunch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)
Küme yöneticileri tarafından uygulanır. Bu işlev tarafından başlatılan her Julia işçisi için, launched
dizisine bir WorkerConfig
girişi eklemeli ve launch_ntfy
'yi bildirmelidir. İşlev, manager
tarafından talep edilen tüm işçilerin başlatılmasıyla birlikte çıkmalıdır. params
, addprocs
ile çağrılan tüm anahtar kelime argümanlarının bir sözlüğüdür.
Distributed.manage
— Functionmanage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)
Küme yöneticileri tarafından uygulanır. Bir işçi süresi boyunca ana süreçte, uygun op
değerleri ile çağrılır:
- Bir işçi Julia işçi havuzuna eklendiğinde / kaldırıldığında
:register
/:deregister
ile. interrupt(workers)
çağrıldığında:interrupt
ile.ClusterManager
, uygun işçiye bir kesme sinyali göndermelidir.- Temizlik amaçları için
:finalize
ile.
Base.kill
— Methodkill(manager::ClusterManager, pid::Int, config::WorkerConfig)
Küme yöneticileri tarafından uygulanır. Ana süreçte, rmprocs
tarafından çağrılır. pid
ile belirtilen uzak işçinin çıkmasını sağlamalıdır. kill(manager::ClusterManager.....)
pid
üzerinde uzak bir exit()
çalıştırır.
Sockets.connect
— Methodconnect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)
Özel taşımalar kullanan küme yöneticileri tarafından uygulanmıştır. config
tarafından belirtilen pid
kimliğine sahip işçiye mantıksal bir bağlantı kurmalıdır ve bir çift IO
nesnesi döndürmelidir. pid
'den mevcut işleme gelen mesajlar instrm
üzerinden okunacakken, pid
'ye gönderilecek mesajlar outstrm
'ye yazılacaktır. Özel taşımacılık uygulaması, mesajların tamamen ve sıralı bir şekilde teslim edilmesini ve alınmasını sağlamalıdır. connect(manager::ClusterManager.....)
işçiler arasında TCP/IP soket bağlantıları kurar.
Distributed.init_worker
— Functioninit_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
Özel taşımaları uygulayan küme yöneticileri tarafından çağrılır. Yeni başlatılan bir süreci işçi olarak başlatır. Komut satırı argümanı --worker[=<cookie>]
, bir süreci işçi olarak başlatma etkisine sahiptir ve taşımada TCP/IP soketlerini kullanır. cookie
, bir cluster_cookie
dir.
Distributed.start_worker
— Functionstart_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)
start_worker
, TCP/IP üzerinden bağlanan işçi süreçleri için varsayılan giriş noktası olan bir iç işlevdir. Süreci bir Julia küme işçisi olarak ayarlar.
host:port bilgisi out
akışına (varsayılan olarak stdout) yazılır.
Fonksiyon, gerekirse stdin'den çerezi okur ve boş bir portta (veya belirtilmişse, --bind-to
komut satırı seçeneğindeki portta) dinler ve gelen TCP bağlantılarını ve isteklerini işlemek için görevleri planlar. Ayrıca (isteğe bağlı olarak) stdin'i kapatır ve stderr'yi stdout'a yönlendirir.
Hiçbir şey döndürmez.
Distributed.process_messages
— Functionprocess_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
Özel taşıma yöntemleri kullanan küme yöneticileri tarafından çağrılır. Uzak bir işçiden ilk mesaj alındığında çağrılmalıdır. Özel taşıma yöntemi, uzak işçiye mantıksal bir bağlantıyı yönetmeli ve biri gelen mesajlar için diğeri uzak işçiye adreslenmiş mesajlar için olmak üzere iki IO
nesnesi sağlamalıdır. Eğer incoming
true
ise, uzak eş bağlantıyı başlatmıştır. Bağlantıyı başlatan çiftin hangisi olursa olsun, küme çerezini ve Julia sürüm numarasını kimlik doğrulama el sıkışması gerçekleştirmek için gönderir.
Ayrıca bkz. cluster_cookie
.
Distributed.default_addprocs_params
— Functiondefault_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}
Küme yöneticileri tarafından uygulanmıştır. addprocs(mgr)
çağrıldığında geçirilen varsayılan anahtar kelime parametreleri. Minimum seçenek seti default_addprocs_params()
çağrılarak mevcuttur.