apeescape2.com
  • メイン
  • 製品ライフサイクル
  • バックエンド
  • 収益性と効率性
  • プロジェクト管理
バックエンド

並行プログラミング入門:ビギナーズガイド

並行プログラミングとは何ですか?簡単に言えば、それはあなたが同時に複数のことをしているときです。並列処理と混同しないように、同時実行とは、複数の操作シーケンスが重複する期間に実行される場合です。プログラミングの分野では、並行性はかなり複雑な問題です。スレッドやロックなどの構造を処理し、競合状態やデッドロックなどの問題を回避することは非常に面倒であり、並行プログラムの作成が困難になる可能性があります。並行性により、プログラムは、特定の構成で連携して動作する独立したプロセスとして設計できます。このような構造は、並列化される場合とされない場合があります。ただし、プログラムでこのような構造を実現すると、多くの利点があります。

並行プログラミング入門

この記事では、並行性のさまざまなモデルと、さまざまなプログラミング言語でそれらを実現する方法について説明します。 並行性のために設計された 。



共有可変状態モデル

カウンターとそれを増やす2つのスレッドを使用した簡単な例を見てみましょう。プログラムはそれほど複雑であってはなりません。メソッドの増加に伴って増加するカウンターを含むオブジェクトがあり、メソッドgetとそれを増加させる2つのスレッドでそれを取得します。

// // Counting.java // public class Counting { public static void main(String[] args) throws InterruptedException { class Counter { int counter = 0; public void increment() { counter++; } public int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int x = 0; x <500000; x++) { counter.increment(); } } } CountingThread t1 = new CountingThread(); CountingThread t2 = new CountingThread(); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(counter.get()); } }

この素朴なプログラムは、一見したほど素朴ではありません。このプログラムを何度も実行すると、異なる結果が得られます。私のラップトップで3回実行した後、3つの値があります。

java Counting 553706 java Counting 547818 java Counting 613014

これの理由は何ですか 予測できない動作 ?プログラムは、コマンドcounter ++を使用するメソッドincreaseで、カウンターを1か所で増やします。コマンドバイトコードを見ると、いくつかの部分で構成されていることがわかります。

  1. メモリからカウンター値を読み取る
  2. ローカルで価値を高める
  3. カウンタ値をメモリに保存する

これで、このシーケンスで何がうまくいかないかを想像できます。独立してカウンターを増やす2つのスレッドがある場合、次のシナリオが考えられます。

  1. カウンターバリューは115
  2. 最初のスレッドは、メモリからカウンタの値を読み取ります(115)
  3. 最初のスレッドはローカルカウンター値を増やします(116)
  4. 2番目のスレッドはメモリからカウンタの値を読み取ります(115)
  5. 2番目のスレッドはローカルカウンター値を増やします(116)
  6. 2番目のスレッドはローカルカウンター値をメモリに保存します(116)
  7. 最初のスレッドはローカルカウンター値をメモリに保存します(116)
  8. カウンターの値は116です

このシナリオでは、2つのスレッドが絡み合ってカウンター値が1増加しますが、各スレッドが1ずつ増加するため、カウンター値を2増やす必要があります。絡み合う異なるスレッドがプログラムの結果に影響します。プログラムの予測不可能性の理由は、プログラムがスレッドの絡み合いを制御できないが、オペレーティングシステムを制御できないためです。プログラムが実行されるたびに、スレッドは異なる方法で絡み合う可能性があります。このようにして、プログラムに偶発的な予測不可能性(非決定論)を導入しました。

この偶発的な予測不可能性(非決定論)を修正するには、プログラムがスレッドの絡み合いを制御する必要があります。 1つのスレッドがメソッドに含まれている場合、最初のスレッドが出てくるまで、別のスレッドが同じメソッドに含まれていてはなりません。このようにして、メソッド増加へのアクセスをシリアル化します。

// // CountingFixed.java // public class CountingFixed { public static main(String[] args) throws InterruptedException { class Counter { int counter = 0; public synchronized void increase() { counter++; } public synchronized int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int i = 0; i <500000; i++) { counter.increment(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CountingThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }

別の解決策は、アトミックに増加できるカウンターを使用することです。つまり、操作を複数の操作に分割することはできません。このように、同期する必要のあるコードのブロックを用意する必要はありません。 Javaのjava.util.concurrent.atomic名前空間にはアトミックデータ型があり、AtomicIntegerを使用します。

// // CountingBetter.java // import java.util.concurrent.atomic.AtomicInteger; class CountingBetter { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class CountingThread extends Thread { public viod run() { for (int i = 0; i <500000; i++) { counter.incrementAndGet(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CoutningThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }

アトミック整数には必要な操作があるため、Counterクラスの代わりに使用できます。アトミック整数のすべてのメソッドがロックを使用しないため、プログラムの設計を容易にするデッドロックの可能性がないことに注意するのは興味深いことです。

使用する 同期 重要なメソッドを同期するためのキーワードは、すべての問題を解決するはずですよね?別の口座への入金、引き出し、送金が可能な2つの口座があると想像してみてください。同時に、ある口座から別の口座に、またはその逆に送金したい場合はどうなりますか?例を見てみましょう。

// // Deadlock.java // public class Deadlock { public static void main(String[] args) throws InterruptedException { class Account { int balance = 100; public Account(int balance) { this.balance = balance; } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public synchronized boolean transfer(Account destination, int amount) { if (balance >= amount) { balance -= amount; synchronized(destination) { destination.balance += amount; }; return true; } return false; } public int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i <100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println('Bob's balance: ' + bob.getBalance()); System.out.println('Joe's balance: ' + joe.getBalance()); } }

このプログラムをラップトップで実行すると、通常はスタックします。なぜこれが起こるのですか?よく見ると、送金するときに、同期された転送メソッドに入り、ソースアカウントのすべての同期されたメソッドへのアクセスがロックされ、次に宛先アカウントがロックされて、同期されたすべてのメソッドへのアクセスがロックされていることがわかります。

請負業者からフルタイムの給与への変換

次のシナリオを想像してみてください。

  1. 最初のスレッド呼び出しは、ボブのアカウントからジョーのアカウントに転送されます
  2. 2番目のスレッド呼び出しは、ジョーのアカウントからボブのアカウントへの転送を呼び出します
  3. 2番目のスレッドはジョーのアカウントからの金額を減らします
  4. 2番目のスレッドはBobのアカウントに金額を入金しますが、最初のスレッドが転送を完了するのを待ちます。
  5. 最初のスレッドはボブのアカウントからの金額を減らします
  6. 最初のスレッドはJoeのアカウントに金額を入金しますが、2番目のスレッドが転送を完了するのを待ちます。

このシナリオでは、あるスレッドが別のスレッドの転送が完了するのを待っています。その逆も同様です。それらは互いにスタックしていて、プログラムを続行できません。これは呼ばれます デッドロック 。デッドロックを回避するには、同じ順序でアカウントをロックする必要があります。プログラムを修正するために、各アカウントに一意の番号を付けて、送金時に同じ順序でアカウントをロックできるようにします。

// // DeadlockFixed.java // import java.util.concurrent.atomic.AtomicInteger; public class DeadlockFixed { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class Account { int balance = 100; int order; public Account(int balance) { this.balance = balance; this.order = counter.getAndIncrement(); } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public boolean transfer(Account destination, int amount) { Account first; Account second; if (this.order = amount) { balance -= amount; destination.balance += amount; return true; } return false; } } } public synchronized int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i <100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println('Bob's balance: ' + bob.getBalance()); System.out.println('Joe's balance: ' + joe.getBalance()); } }

そのような間違いの予測不可能性のために、それらは時々起こりますが、常にではなく、再現するのは困難です。プログラムが予期しない動作をする場合、それは通常、偶発的な非決定論をもたらす並行性によって引き起こされます。偶発的な非決定論を回避するために、すべての絡み合いを考慮に入れるように事前にプログラムを設計する必要があります。

偶発的な非決定論を持つプログラムの例。

// // NonDeteminism.java // public class NonDeterminism { public static void main(String[] args) throws InterruptedException { class Container { public String value = 'Empty'; } final Container container = new Container(); class FastThread extends Thread { public void run() { container.value = 'Fast'; } } class SlowThread extends Thread { public void run() { try { Thread.sleep(50); } catch(Exception e) {} container.value = 'Slow'; } } FastThread fast = new FastThread(); SlowThread slow = new SlowThread(); fast.start(); slow.start(); fast.join(); slow.join(); System.out.println(container.value); } }

このプログラムには、偶発的な非決定論が含まれています。コンテナに最後に入力された値が表示されます。

java NonDeterminism Slow

遅いスレッドは後で値を入力し、この値が出力されます(遅い)。しかし、そうである必要はありません。コンピュータが大量のCPUリソースを必要とする別のプログラムを同時に実行した場合はどうなりますか?プログラムではなくオペレーティングシステムによって制御されるため、最後に値を入力するのが遅いスレッドになるという保証はありません。プログラムが一方のコンピューターで動作し、もう一方のコンピューターで動作が異なる状況が発生する可能性があります。このようなエラーは見つけるのが難しく、開発者にとって頭痛の種になります。これらすべての理由から、この並行性モデルを正しく実行することは非常に困難です。

機能的な方法

並列処理

関数型言語が使用している別のモデルを見てみましょう。たとえば、 Clojure 、ツールを使用して解釈できます ライニンゲン 。 Clojureは、並行性を適切にサポートする非常に興味深い言語です。以前の同時実行モデルは、共有された可変状態でした。私たちが使用するクラスは、APIからは明らかではないため、私たちが知らない変異する非表示の状態を持つこともあります。これまで見てきたように、このモデルは、注意しないと偶発的な非決定論やデッドロックを引き起こす可能性があります。関数型言語のデータ型は変化しないため、変更されるリスクなしに安全に共有できます。関数には、プロパティと他のデータ型があります。関数は、プログラムの実行中に作成し、パラメーターとして別の関数に渡すか、関数呼び出しの結果として返すことができます。

C +を学ぶ方法

並行プログラミングの基本的なプリミティブは、将来と将来性です。 Futureは、別のスレッドでコードのブロックを実行し、ブロックが実行されたときに入力されるfuture値のオブジェクトを返します。

; ; future.clj ; (let [a (future (println 'Started A') (Thread/sleep 1000) (println 'Finished A') (+ 1 2)) b (future (println 'Started B') (Thread/sleep 2000) (println 'Finished B') (+ 3 4))] (println 'Waiting for futures') (+ @a @b))

このスクリプトを実行すると、出力は次のようになります。

Started A Started B Waiting for futures Finished A Finished B 10

この例では、独立して実行される2つのfutureブロックがあります。プログラムは、まだ利用できないfutureオブジェクトから値を読み取るときにのみブロックします。私たちの場合、将来のブロックの両方の結果が合計されるのを待っています。動作は予測可能(決定論的)であり、共有された可変状態がないため、常に同じ結果が得られます。

並行性に使用されるもう1つのプリミティブは、promiseです。 Promiseは、値を1回入力できるコンテナーです。 promiseを読み取るとき、スレッドはpromiseの値が満たされるまで待機します。

; ; promise.clj ; (def result (promise)) (future (println 'The result is: ' @result)) (Thread/sleep 2000) (deliver result 42)

この例では、 未来 値が保存されないことが約束されている限り、結果の印刷を待機します。 2秒後、promiseには、将来のスレッドで出力される値42が格納されます。使用する 約束 将来とは対照的にデッドロックにつながる可能性があるため、promiseを使用するときは注意してください。

; ; promise-deadlock.clj ; (def promise-result (promise)) (def future-result (future (println 'The result is: ' + @promise-result) 13)) (println 'Future result is: ' @future-result) (deliver result 42)

この例では、futureの結果とpromiseの結果を使用しています。値の設定と読み取りの順序は、メインスレッドが将来のスレッドからの値を待機し、将来のスレッドがメインスレッドからの値を待機するというものです。この動作は予測可能(決定論的)であり、プログラムが実行されるたびに再生されるため、エラーの検出と削除が容易になります。

futureを使用すると、プログラムは、futureの実行結果が必要になるまで演習を続行できます。これにより、プログラムの実行が高速化されます。将来的に複数のプロセッサがある場合は、予測可能な(決定論的な)動作を持つプログラムを並列実行できます(毎回同じ結果が得られます)。そうすれば、コンピューターの能力をより有効に活用できます。

; ; fibonacci.clj ; (defn fibonacci[a] (if (<= a 2) 1 (+ (fibonacci (- a 1)) (fibonacci (- a 2))))) (println 'Start serial calculation') (time (println 'The result is: ' (+ (fibonacci 36) (fibonacci 36)))) (println 'Start parallel calculation') (defn parallel-fibonacci[] (def result-1 (future (fibonacci 36))) (def result-2 (future (fibonacci 36))) (+ @result-1 @result-2)) (time (println 'The result is: ' (parallel-fibonacci)))

この例では、futureを使用すると、コンピューターの速度をより有効に活用できることがわかります。合計すると2つのフィボナッチ数があります。プログラムが結果を2回計算していることがわかります。1回目は1つのスレッドで順次、2回目は2つのスレッドで並列に計算されます。私のラップトップにはマルチコアプロセッサが搭載されているため、並列実行は順次計算の2倍の速度で動作します。

私のラップトップでこのスクリプトを実行した結果:

Start serial calculation The result is: 29860704 'Elapsed time: 2568.816524 msecs' Start parallel calculation The result is: 29860704 'Elapsed time: 1216.991448 msecs'

並行性

Clojureプログラミング言語で同時実行性と予測不可能性をサポートするには、他のスレッドが変更を確認できるように可変のデータ型を使用する必要があります。最も単純な変数データ型はアトムです。 原子 別の値に置き換えることができる値を常に持つコンテナです。値を置き換えるには、新しい値を入力するか、古い値を取得して、より頻繁に使用される新しい値を返す関数を呼び出します。アトムがロックなしで実装され、スレッドで安全に使用できることは興味深いことです。つまり、デッドロックに達することは不可能です。内部的には、atomはjava.util.concurrent.AtomicReferenceライブラリを使用します。アトムで実装された反例を見てみましょう。

; ; atom-counter.clj ; (def counter (atom 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (swap! counter (fn [counter] (swap! attempts inc) ; side effect DO NOT DO THIS (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; Wait for futures to complete @first-future @second-future ; Print value of the counter (println 'The counter is: ' @counter) (println 'Number of attempts: ' @attempts)

私のラップトップでのスクリプト実行の結果:

The counter is: 1000000 Number of attempts: 1680212

この例では、カウンターの値を含むアトムを使用します。カウンターは(swap!counter inc)とともに増加します。スワップ関数は次のように機能します。1。カウンター値を取得して保存します。2。この値に対して、新しい値を計算する指定された関数を呼び出します。3。新しい値を保存するために、古い値が変更されたかどうかをチェックするアトミック操作を使用します。値が変更されていない場合は、新しい値3bを入力します。その間に値が変更された場合は、手順1に進みます。その間に値が変更された場合は、関数を再度呼び出すことができることがわかります。値は別のスレッドからのみ変更できます。したがって、新しい値を計算する関数には副作用がなく、何度呼び出されても問題がないことが重要です。アトムの1つの制限は、変更を1つの値に同期することです。

; ; atom-acocunts.clj ; (def bob (atom 200000)) (def joe (atom 300000)) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (if (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) (swap! source - amount) (swap! destination + amount)) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println 'Bob has in account: ' @bob) (println 'Joe has in account: ' @joe) (println 'Inconsistencies while transfer: ' @inconsistencies)

このスクリプトを実行すると、次のようになります。

Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 36525

この例では、より多くの原子を変更する方法を確認できます。ある時点で、不整合が発生する可能性があります。ある時点での2つのアカウントの合計は同じではありません。複数の値の変更を調整する必要がある場合、2つの解決策があります。

  1. 1つの原子により多くの値を配置する
  2. 後で説明するように、参照とソフトウェアトランザクショナルメモリを使用します
; ; atom-accounts-fixed.clj ; (def accounts (atom {:bob 200000, :joe 300000})) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (let [deref-accounts @accounts] (if (not= (+ (get deref-accounts :bob) (get deref-accounts :joe)) 500000) (swap! inconsistencies inc)) (swap! accounts (fn [accs] (update (update accs source - amount) destination + amount))))) (defn first-transfer [] (dotimes [cnt 100000] (transfer :bob :joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer :joe :bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println 'Bob has in account: ' (get @accounts :bob)) (println 'Joe has in account: ' (get @accounts :joe)) (println 'Inconsistencies while transfer: ' @inconsistencies)

このスクリプトをコンピューターで実行すると、次のようになります。

Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0

この例では、地図を使用してより多くの価値を置くように調整が解決されています。アカウントから送金する場合は、その時点ですべてのアカウントを変更して、金額が同じにならないようにします。

次の変数データ型はエージェントです。エージェントは、値を変更する関数が別のスレッドで実行されるという点でのみアトムのように動作するため、変更が表示されるまでに時間がかかります。したがって、エージェントの値を読み取るときは、エージェントの値を変更するすべての関数が実行されるまで待機する関数を呼び出す必要があります。値を変更するアトム関数とは異なり、値は1回だけ呼び出されるため、副作用が発生する可能性があります。このタイプは、1つの値を同期することもでき、デッドロックすることはできません。

; ; agent-counter.clj ; (def counter (agent 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (send counter (fn [counter] (swap! attempts inc) (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; wait for futures to complete @first-future @second-future ; wait for counter to be finished with updating (await counter) ; print the value of the counter (println 'The counter is: ' @counter) (println 'Number of attempts: ' @attempts)

このスクリプトをラップトップで実行すると、次のようになります。

The counter is: 1000000 Number of attempts: 1000000

この例は、アトムを使用したカウンターの実装と同じです。唯一の違いは、ここでは、awaitを使用して最終値を読み取る前に、すべてのエージェントの変更が完了するのを待っていることです。

最後の変数データ型は参照です。アトムとは異なり、参照は変更を複数の値に同期させることができます。参照の各操作は、dosyncを使用するトランザクションで行う必要があります。データを変更するこの方法は、ソフトウェアトランザクショナルメモリまたは略してSTMと呼ばれます。アカウントでの送金の例を見てみましょう。

; ; stm-accounts.clj ; (def bob (ref 200000)) (def joe (ref 300000)) (def inconsistencies (atom 0)) (def attempts (atom 0)) (def transfers (agent 0)) (defn transfer [source destination amount] (dosync (swap! attempts inc) ; side effect DO NOT DO THIS (send transfers inc) (when (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) ; side effect DO NOT DO THIS (alter source - amount) (alter destination + amount))) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (await transfers) (println 'Bob has in account: ' @bob) (println 'Joe has in account: ' @joe) (println 'Inconsistencies while transfer: ' @inconsistencies) (println 'Attempts: ' @attempts) (println 'Transfers: ' @transfers)

このスクリプトを実行すると、次のようになります。

Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0 Attempts: 330841 Transfers: 200000

興味深いことに、行われたトランザクションの数よりも多くの試行がありました。これは、STMがロックを使用しないためです。そのため、競合が発生した場合(2つのスレッドが同じ値を変更しようとするなど)、トランザクションが再実行されます。このため、トランザクションに副作用が発生することはありません。トランザクション内で値が変化するエージェントが予測どおりに動作することがわかります。エージェントの値を変更する関数は、トランザクションの回数だけ評価されます。その理由は、エージェントがトランザクションを認識しているためです。トランザクションに副作用が必要な場合は、エージェント内で機能させる必要があります。このようにして、プログラムは予測可能な動作をします。おそらく常にSTMを使用する必要があると思うかもしれませんが、アトムはSTMよりも単純で高速であるため、経験豊富なプログラマーはアトムを使用することがよくあります。もちろん、それはそのようにプログラムを作ることが可能である場合です。副作用がある場合は、STMとエージェントを使用する以外に選択肢はありません。

アクターモデル

次の並行性のモデルは アクターモデル 。このモデルの原理は、現実の世界と似ています。建物など、たくさんの人と何かを作るという取引をする場合、建設現場の人それぞれにそれぞれの役割があります。群衆は監督者によって監督されています。労働者が職場で負傷した場合、監督者は負傷した男性の仕事を利用可能な他の人に割り当てます。必要に応じて、彼はその場所に新しい男を導くかもしれません。このサイトには、同時に(同時に)作業を行うだけでなく、同期するために互いに話し合う人が増えています。建設現場での作業をプログラムに入れると、すべての人が状態を持ち、独自のプロセスで実行される俳優になり、会話はメッセージに置き換えられます。このモデルに基づく人気のあるプログラミング言語はErlangです。この興味深い言語には、他のデータ型と同じプロパティを持つ不変のデータ型と関数があります。関数は、プログラムの実行中に作成し、引数として別の関数に渡すか、関数呼び出しの結果として返すことができます。例を挙げます エリクサー Erlang仮想マシンを使用する言語なので、Erlangと同じプログラミングモデルを使用しますが、構文が異なります。 Elixirで最も重要な3つのプリミティブは、スポーン、送信、受信です。 spawnは新しいプロセスで機能を実行し、sendはメッセージをプロセスに送信し、receiveは現在のプロセスに送信されるメッセージを受信します。

アクターモデルの最初の例は、同時にカウンターが増加します。このモデルでプログラムを作成するには、アクターにカウンターの値を持たせ、カウンターの値を設定および取得するメッセージを受信し、同時にカウンターの値を増やす2つのアクターを用意する必要があります。

# # Counting.exs # defmodule Counting do def counter(value) do receive do {:get, sender} -> send sender, {:counter, value} counter value {:set, new_value} -> counter(new_value) end end def counting(sender, counter, times) do if times > 0 do send counter, {:get, self} receive do {:counter, value} -> send counter, {:set, value + 1} end counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts 'Starting counting processes' this = self counting1 = spawn fn -> IO.puts 'Counting A started' Counting.counting this, counter, 500_000 IO.puts 'Counting A finished' end counting2 = spawn fn -> IO.puts 'Counting B started' Counting.counting this, counter, 500_000 IO.puts 'Counting B finished' end IO.puts 'Waiting for counting to be done' receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts 'Counter is: #{value}' end

この例を実行すると、次のようになります。

Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 516827

最終的に、カウンターは516827であり、予想どおり1000000ではないことがわかります。次回スクリプトを実行したときに、511010を受け取りました。この動作の理由は、カウンターが2つのメッセージを受信するためです。現在の値を取得することと新しい値を設定することです。カウンタを増やすには、プログラムは現在の値を取得し、それを1増やして、増やした値を設定する必要があります。 2つのプロセスは、カウンタープロセスに送信されるメッセージを使用して、カウンターの値を同時に読み取りおよび書き込みます。カウンタが受信するメッセージの順序は予測できず、プログラムはそれを制御できません。このシナリオを想像することができます:

  1. カウンターバリューは115
  2. プロセスAはカウンターの値を読み取ります(115)
  3. プロセスBはカウンターの値を読み取ります(115)
  4. プロセスBは値をローカルに増加させます(116)
  5. プロセスBは、増加した値をカウンターに設定します(116)
  6. プロセスAは、カウンターの値を増やします(116)
  7. プロセスAは増加した値をカウンターに設定します(116)
  8. カウンターバリューは116

シナリオを見ると、2つのプロセスでカウンターが1増加し、最終的にカウンターは2ではなく1増加します。このような絡み合いは予測できない回数発生する可能性があるため、カウンターの値は予測できません。この動作を防ぐには、増加操作を1つのメッセージで実行する必要があります。

# # CountingFixed.exs # defmodule Counting do def counter(value) do receive do :increase -> counter(value + 1) {:get, sender} -> send sender, {:counter, value} counter value end end def counting(sender, counter, times) do if times > 0 do send counter, :increase counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts 'Starting counting processes' this = self counting1 = spawn fn -> IO.puts 'Counting A started' Counting.counting this, counter, 500_000 IO.puts 'Counting A finished' end counting2 = spawn fn -> IO.puts 'Counting B started' Counting.counting this, counter, 500_000 IO.puts 'Counting B finished' end IO.puts 'Waiting for counting to be done' receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts 'Counter is: #{value}' end

このスクリプトを実行すると、次のようになります。

Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 1000000

カウンターの値が正しいことがわかります。予測可能な(決定論的な)動作の理由は、カウンターの値が1メッセージずつ増加するため、カウンターを増やすメッセージのシーケンスが最終的な値に影響を与えないためです。アクターモデルを使用する場合、偶発的な予測不可能性(非決定論)を回避するために、メッセージがどのように絡み合い、メッセージとメッセージに対するアクションを注意深く設計できるかに注意を払う必要があります。

このモデルで2つの口座間で送金するにはどうすればよいですか?

# # Accounts.exs # defmodule Accounts do def accounts(state) do receive do {:transfer, source, destination, amount} -> accounts %state {:amounts, accounts, sender } -> send sender, {:amounts, for account 0 do send accounts, {:amounts, [source, destination], self} receive do {:amounts, amounts} -> if amounts[source] + amounts[destination] != 500_000 do Agent.update(inconsistencies, fn value -> value + 1 end) end end send accounts, {:transfer, source, destination, amount} transfer(sender, accounts, source, destination, amount, times - 1, inconsistencies) else send sender, {:done, self} end end end accounts = spawn fn -> Accounts.accounts(%{bob: 200_000, joe: 300_000 }) end {:ok, inconsistencies} = Agent.start(fn -> 0 end) this = self transfer1 = spawn fn -> IO.puts 'Transfer A started' Accounts.transfer(this, accounts, :bob, :joe, 2, 100_000, inconsistencies) IO.puts 'Transfer A finished' end transfer2 = spawn fn -> IO.puts 'Transfer B started' Accounts.transfer(this, accounts, :joe, :bob, 1, 100_000, inconsistencies) IO.puts 'Transfer B finished' end IO.puts 'Waiting for transfers to be done' receive do {:done, ^transfer1} -> nil end receive do {:done, ^transfer2} -> nil end send accounts, {:amounts, [:bob, :joe], self} receive do {:amounts, amounts} -> IO.puts 'Bob has in account: #{amounts[:bob]}' IO.puts 'Joe has in account: #{amounts[:joe]}' IO.puts 'Inconsistencies while transfer: #{Agent.get(inconsistencies, fn x -> x end)}' end

このスクリプトを実行すると、次のようになります。

Waiting for transfers to be done Transfer A started Transfer B started Transfer B finished Transfer A finished Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0

プログラムの予測可能な動作を提供するアカウントの値を取得するために送金とメッセージ金額を転送するメッセージ転送を選択したため、送金は矛盾なく機能することがわかります。送金を行うときはいつでも、合計金額は同じでなければなりません。

アクターモデルはロックを引き起こし、デッドロックを引き起こす可能性があるため、プログラムを設計する際には注意が必要です。次のスクリプトは、ロックとデッドロックのシナリオをシミュレートする方法を示しています。

Web上のすべてのアニメーションフラッシュアニメーションです
# # Deadlock.exs # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts 'Locking A, B started' spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts 'Locking A, B finished' send this, :done end IO.puts 'Locking B, A started' spawn fn -> Lock.locking(b_lock, a_lock, 1_000) IO.puts 'Locking B, A finished' send this, :done end IO.puts 'Waiting for locking to be done' receive do :done -> nil end receive do :done -> nil End

このスクリプトをラップトップで実行すると、次のようになります。

Locking A, B started Locking B, A started Waiting for locking to be done

出力から、AとBをロックするプロセスがスタックしていることがわかります。これは、最初のプロセスが2番目のプロセスがBを解放するのを待っているのに対し、2番目のプロセスが最初のプロセスがAを解放するのを待っているために発生します。このロックを回避するには、順序を常に同じにするか、ロックを使用しないようにプログラムを設計する必要があります(つまり、特定のメッセージを待機しません)。次のリストは常に最初にA、次にBをロックします。

# # Deadlock fixed # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts 'Locking A, B started' spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts 'Locking A, B finished' send this, :done end IO.puts 'Locking A, B started' spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts 'Locking A, B finished' send this, :done end IO.puts 'Waiting for locking to be done' receive do :done -> nil end receive do :done -> nil End

このスクリプトをラップトップで実行すると、次のようになります。

Locking A, B started Locking A, B started Waiting for locking to be done Locking A, B finished Locking A, B finished

そして今、デッドロックはもうありません。

要約

並行プログラミングの概要として、いくつかの並行性モデルについて説明しました。この記事は大きすぎるため、すべてのモデルを網羅しているわけではありません。ほんの数例を挙げると、チャネルとリアクティブストリームは、他の一般的に使用されている同時実行モデルの一部です。チャネルとリアクティブストリームには、アクターモデルと多くの類似点があります。それらはすべてメッセージを送信しますが、多くのスレッドは1つのチャネルからメッセージを受信でき、リアクティブストリームはメッセージを一方向に送信して、処理の結果として一方の端からメッセージを受信し、もう一方の端からメッセージを送信する有向グラフを形成します。

共有された可変状態モデルは、先を考えないと簡単に失敗する可能性があります。競合状態とデッドロックの問題があります。異なる並行プログラミングモデルから選択できる場合は、実装と保守が簡単になりますが、そうでない場合は、何をするかについて非常に注意する必要があります。

機能的な方法は、推論と実装がはるかに簡単です。デッドロックは発生しません。このモデルは、共有可変状態モデルよりもパフォーマンスが低下する可能性がありますが、機能するプログラムは、機能しないプログラムよりも常に高速です。

アクターモデルは、並行プログラミングに適しています。競合状態とデッドロックの問題がありますが、プロセスが通信する唯一の方法はメッセージを介するため、共有可変状態モデルよりも発生する可能性は低くなります。プロセス間の優れたメッセージ設計により、それを回避できます。問題が発生した場合、それはプロセス間の通信におけるメッセージの順序または意味にあり、どこを見ればよいかがわかります。

この記事が、並行プログラミングとは何か、そしてそれがあなたが書くプログラムにどのように構造を与えるかについての洞察をあなたに与えてくれることを願っています。

関連: Rubyの並行性と並列性:実用的なチュートリアル

自動車産業におけるバーチャルリアリティ

技術

自動車産業におけるバーチャルリアリティ
インサイドセールスエグゼクティブ-東部地域

インサイドセールスエグゼクティブ-東部地域

その他

人気の投稿
Django開発者が犯す間違いトップ10
Django開発者が犯す間違いトップ10
iOSで無限ランナーを構築する方法:Cocos2D、自動化など
iOSで無限ランナーを構築する方法:Cocos2D、自動化など
iOSでT9検索を実装する方法
iOSでT9検索を実装する方法
ASP.NETCoreを使用したASP.NETWebAPIの構築
ASP.NETCoreを使用したASP.NETWebAPIの構築
より良いUXのためのUIスタイルガイドの作成
より良いUXのためのUIスタイルガイドの作成
 
なぜ自社株買いは失敗するのですか?いくつかの提案された救済策
なぜ自社株買いは失敗するのですか?いくつかの提案された救済策
10最も一般的なWebセキュリティの脆弱性
10最も一般的なWebセキュリティの脆弱性
iOSアニメーションと効率のためのチューニング
iOSアニメーションと効率のためのチューニング
ElmでWebフロントエンドの信頼性を高める
ElmでWebフロントエンドの信頼性を高める
感情分析の精度の4つの落とし穴
感情分析の精度の4つの落とし穴
人気の投稿
  • リフトvsユーバービジネスモデル
  • ビッグデータの視覚化ツール
  • awsソリューションアーキテクト合格点
  • 企業はIPO前の段階でこれを行います
  • HTMLでブートストラップを実装する方法
カテゴリー
人とチーム Uiデザイン 収益性と効率性 仕事の未来 投資家と資金調達 技術 製品ライフサイクル 収益と成長 ヒントとツール モバイルデザイン

© 2021 | 全著作権所有

apeescape2.com