Poczytaj mi Clojure, cz. 12C: Współbieżność – Referencje

Ref to typ referencyjny, dzięki któremu można wyrażać częste i jednoczesne zmiany stanów wielu współdzielonych tożsamości w synchroniczny sposób. Wykorzystywany jest tam, gdzie kilka wartości wskazywanych przez referencje zależy od siebie i modyfikacja ich wszystkich powinna być atomowa (np. przekazywanie środków między rachunkami bankowymi). Do obsługi Refów intensywnie wykorzystywana jest programowa pamięć transakcyjna (STM).

Współbieżność

Wykonywanie współbieżne (ang. concurrent) to cecha systemów i programów komputerowych, w których te same obliczenia dokonywane są jednocześnie (w tym samym czasie) przez więcej niż jeden komponent, przy opcjonalnej komunikacji między komponentami.

Więcej o współbieżności w Clojure można przeczytać w odcinku 12.

Referencje transakcyjne

Referencja transakcyjna (ang. transactional reference) to mechanizm, dzięki któremu można wyrażać częste, koordynowanewspółdzielone zmiany stanów wielu ustalonych tożsamości. Podobnie do Atomów podmiany wartości bieżących, wskazywanych odpowiednimi obiektami, dokonywane są synchronicznie, to znaczy operacja aktualizacji odniesień blokuje realizowanie wątku programu, w którym została wywołana, dopóki nie zostanie zakończona. W odróżnieniu od Atomów zmiany są najpierw szeregowane, a nie realizowane niezależnie dla każdego obiektu. Do blokowania dochodzi więc w trakcie operacji zgrupowanego aktualizowania stanów wielu obiektów.

Referencje transakcyjne wykorzystywane są do obsługi sytuacji, w których należy zmienić stan więcej niż jednego obiektu referencyjnego w tym samym czasie, a zmiana ta powinna być atomowa: albo wszystkie odniesienia do wartości zostaną podmienione, albo żadne. Przykładem może być tu przelew między rachunkami bankowymi, gdzie z jednego musi zostać odjęta pewna kwota, a na drugim musi się ona pojawić. Niedopuszczalna byłaby sytuacja, w której operacja ta zostałaby przerwana w połowie (po odjęciu kwoty z rachunku źródłowego) lub udałaby się tylko jej druga część (zasilenie rachunku docelowego).

Transakcyjne referencje (skr. Refy, ang. Refs) obsługiwane są przez referencyjny typ danych o nazwie Ref z wykorzystaniem programowej pamięci transakcyjnej (ang. Software Transactional Memory, skr. STM). Dzięki użyciu STM korzystanie z Refów nie doprowadzi nigdy do powstania tzw. zakleszczeń (ang. deadlocks) czy sytuacji wyścigów (ang. race conditions).

Korzystanie z Refów polega na utworzeniu odpowiednich obiektów, a następnie aktualizowaniu ich powiązań przez ustawienie współdzielonej wartości bieżącej lub przekazanie funkcji, która tego dokona.

Wartością współdzieloną (lub wartością bieżącą) Refa nazwiemy taką wartość, z którą powiązano dany obiekt typu Ref i odwołując się do niego w programie, możemy ją uzyskiwać. Proces ten nazywamy dereferencją.

Zazwyczaj obiekt typu Ref będzie dodatkowo w sposób stały utożsamiany z symbolem określonym zmienną globalną lub powiązaniem leksykalnym. W ten sposób można będzie identyfikować go z użyciem nazwy widocznej w odpowiednich obszarach programu.

Transakcje

W przeciwieństwie do Varów, Agentów czy Atomów, modyfikacje powiązań obiektów typu Ref muszą być dokonywane w obrębie tak zwanych transakcji (ang. transactions).

Transakcja jest zbiorem operacji, które powinny być traktowane jako niepodzielna całość: albo uda się wykonać je wszystkie i zaktualizować wiele powiązań referencyjnych, albo żadne zmiany nie zostaną wprowadzone (transakcja zostanie wycofana). Dzięki temu gwarantowane jest, że operacje na Refach będą:

  • atomowe – zmiany są jednoczesne i niepodzielne;
  • spójne – każdą aktualizowaną wartość referencji można poddać testowi poprawności z użyciem tzw. walidatorów lub dodatkowych funkcji umieszczonych w transakcji;
  • izolowane – powiązania obiektów typu Ref z wartościami nie zostaną nigdy w obrębie transakcji zmienione przez inną transakcję – są wersjami z momentu jej rozpoczęcia.

Wywołania funkcji operujących na powiązaniach obiektów typu Ref z wartościami w ramach konkretnej transakcji powinny być podane w S-wyrażeniach przekazywanych jako argument makra dosync:

Przykład transakcji
1
2
3
4
5
6
7
8
9
(let [x (ref 0)      ; referencja x (wartość początkowa 0)
      y (ref 0)]     ; referencja y (wartość początkowa 0)
  (dosync            ; transakcja:
    (ensure x)       ;  · zabezpieczamy x przed równoległymi zmianami
    (alter x inc)    ;  · zwiększamy x
    (alter y + @x))  ;  · zwiększamy y o wartość x
  (println @x @y))   ; wyświetlamy wartości x oraz y

; >> 1 1

Każda transakcja ma trzy etapy:

  1. rozpoczęcie,
  2. wykonanie,
  3. zakończenie (zamknięcie).

Transakcja rozpoczyna się, gdy pojawi się takie żądanie, a jej wykonywanie polega na obliczaniu nowych wartości bieżących pewnego zestawu referencji transakcyjnych. Zamknięcie transakcji oznacza albo jej zatwierdzenie (ang. commit), albo wycofanie (ang. rollback), które w przypadku STM oznacza jej unicestwienie (ang. kill).

Zatwierdzanie to nazwa procesu, w którym dochodzi do zastąpienia wartości bieżących Refów nowymi wartościami, obliczonymi w trakcie jej trwania. Z kolei unicestwienie oznacza, że do takiej aktualizacji nie dochodzi i zmiany są porzucane. Nie wyklucza to jednak ponownego zrealizowania transakcji, nawet automatycznego, co jest w STM częstą reakcją na konflikty.

W implementacji STM z Clojure możemy wyróżnić następujące stany transakcji:

  • działająca (RUNNING),
  • zatwierdzana (COMMITTING),
  • ponawiana (RETRY),
  • unicestwiona (KILLED),
  • zatwierdzona (COMMITTED).

Migawki

Zastosowana w Clojure implementacja programowej pamięci transakcyjnej bazuje na mechanizmie wielowersyjnej kontroli współbieżności (ang. multiversion concurrency control, skr. MCC lub MVCC) z izolowaniem migawek (ang. snapshot isolation) realizowanym przez adaptacyjne kolejkowanie historii zmian.

Migawka (ang. snapshot) jest zapamiętanym stanem wszystkich obiektów STM z danego punktu w czasie. Mechanizmy obsługi pamięci transakcyjnej korzystają z migawek, aby wykrywać zmiany i wytwarzać kopie Refów na użytek realizowanych transakcji. Dzięki temu w obrębie tych ostatnich będziemy mieli do czynienia z niezmieniającymi się, spójnymi zestawami danych.

Migawki są również wykorzystywane do utrzymywania tzw. historii zmian, która pozwala na dostęp do przeszłych wartości wybranych referencji transakcyjnych.

Wartości wewnątrztransakcyjne

Aktualizowanie powiązań Refów z wartościami w obrębie transakcji nie będzie polegało na bezpośrednim operowaniu na ich aktualnych wartościach bieżących, lecz na wersjach zapamiętanych w chwilach rozpoczynania transakcji. Te momenty nazywamy punktami odczytu (ang. read points).

Modyfikacje powiązań Refów z wartościami w obrębie transakcji są widoczne tylko w niej, do momentu zatwierdzenia zmian. Wartości tych powiązań nazywamy wartościami w transakcji lub wartościami wewnątrztransakcyjnymi (ang. in-transaction-values).

Pierwsza wartość wewnątrztransakcyjna powstaje, gdy dokonywana jest aktualizacja odpowiadającego jej Refa. Jeżeli modyfikacji nie dokonano, wartość taka nie jest tworzona, a ewentualne odczyty Refa dotyczą wartości z migawki (bieżących lub pochodzących z tzw. historii zmian).

Propagacja wartości zaktualizowanych Refów wewnątrztransakcyjnych do odpowiadających im obiektów współdzielonych nastąpi, gdy transakcja pomyślnie się zakończy i zostanie zatwierdzona (stan COMMITTED). Wyjątkiem jest użycie funkcji commute, która zmieniając stan Refa, operuje bezpośrednio na jego najbardziej aktualnej wartości współdzielonej i nie korzysta z wartości wewnątrztransakcyjnych, chociaż stwarza je i aktualizuje (w celu kompatybilności z funkcjami, które operują na tych wartościach). Modyfikacje zlecone z użyciem tej funkcji również oczekują na zatwierdzenie w odpowiedniej fazie transakcji.

Refy, które są wyłącznie odczytywane w ramach transakcji (nie dochodzi do aktualizowania ich powiązań), również korzystają z kopii (migawek) całej pamięci transakcyjnej. Opcjonalnie można więc użyć dosync nie tylko do aktualizowania powiązań Refów z nowymi wartościami, lecz także po to, aby zachować spójność relacji między logicznie zależnymi Refami, których wartości bieżących używamy w obliczeniach.

Obsługa błędów

Jeżeli podczas realizowania transakcji wystąpi jakiś krytyczny błąd, transakcja zostanie unicestwiona (stan KILLED), a żaden współdzielony obiekt typu Ref nie zostanie przez nią zaktualizowany.

Jeżeli podczas realizowania transakcji pojawi się wyjątek spowodowany brakiem akceptacji nowej wartości Refa przez przypisany do niego walidator lub funkcja walidująca zwróci wartość false, transakcja zostanie przerwana (stan KILLED).

Obsługa konfliktów

Jeżeli w trakcie wykonywania transakcji nie wystąpił błąd, lecz doszło do równoległej zmiany (w ramach innej, już zatwierdzonej transakcji) powiązania któregoś obiektów typu Ref, możemy mieć do czynienia z tzw. usterką (ang. fault).

Do usterki dochodzi wtedy, gdy:

  • w bieżącej transakcji wystąpi próba odczytu wartości obiektu typu Ref, ale nie można pobrać odpowiadającej jej wartości wewnątrztransakcyjnej (nie powstała ona jeszcze, np. w efekcie próby zmiany), a wartość uzyskana w punkcie odczytu lub w którymś z punktów historii zmian nie jest aktualną wartością współdzieloną, którą inna transakcja zdążyła zatwierdzić, zanim bieżąca się rozpoczęła;

  • w bieżącej transakcji wystąpi próba odczytu wartości obiektu typu Ref, lecz inna transakcja wygrała z nią tzw. spór;

  • w bieżącej transakcji wystąpi próba aktualizacji wartości obiektu typu Ref (z użyciem ref-set lub alter), lecz wystąpi jedna z trzech sytuacji:

    • nie można zablokować możliwości zapisu do odniesienia (bo blokada zapisu lub odczytu należy do innego wątku),
    • w międzyczasie już zmieniono wartość współdzieloną,
    • inna transakcja wytworzyła wartość wewnątrztransakcyjną danego Refa i bieżąca rozpoczęła z nią tzw. spór, lecz przegrała go;
  • bieżąca transakcja jest w fazie zatwierdzania zmian, ale inna uruchomiona transakcja dokonała zmiany wartości wewnątrztransakcyjnej aktualizowanego Refa, a tzw. spór z nią został przegrany;

  • w bieżącej transakcji dokonano próby zmiany wartości (z użyciem ref-set, alter lub commute) lub użyto ensure w stosunku do danego Refa, lecz inna transakcja rozpoczęła i wygrała z nią tzw. spór.

Warto pamiętać, że odczytywanie wartości bieżących Refów to nie tylko ich wyrażone wprost przez programistę dereferencje (z użyciem funkcji deref czy makra @), ale również aktualizacje, z wykorzystaniem alter, które wymagają wcześniejszego poznania aktualnych wartości współdzielonych lub wartości wewnątrztransakcyjnych.

Reakcją na usterkę jest ponowienie transakcji (bieżącej lub pozostającej z nią w konflikcie). Wyrażenia przekazane do makra dosync będą w takim wypadku przeliczone ponownie z przyjęciem nowego punktu odczytu. W ten sposób realizowana jest synchroniczność aktualizowania referencji transakcyjnych.

Realizowanie transakcji będzie ponawiane do czasu, aż uda się zatwierdzić ją bez konfliktów, chyba że przekroczona zostanie maksymalna liczba prób określona stałą wartością równą 10000. Jeżeli tak się stanie, zgłoszony zostanie wyjątek RetryEx.

Spory

Gdy wykryty jest potencjalny konflikt (dwie transakcje operują na tych samych obiektach typu Ref i przy próbie zatwierdzania pojawiłby się problem), jedna z transakcji może rozpocząć tzw. spór (ang. barge), aby udowodnić, że druga powinna zostać przerwana i ponowiona. Aby tak się stało, spełnione muszą być następujące warunki:

  • bieżąca transakcja rozpoczęła się wcześniej, niż druga;
  • bieżąca transakcja musi być realizowana co najmniej przez czas określony stałą BARGE_WAIT_NANOS (domyślnie 1 setna sekundy);
  • druga transakcja jest realizowana (stan RUNNING) i jeszcze nie zdążyła rozpocząć procesu zatwierdzania zmian.

Wygranie sporu przez bieżącą transakcję oznacza, że kontynuuje ona pracę, a druga zostaje przerwana (stan KILLED). Do ponowienia drugiej transakcji (stan RETRY) dojdzie, gdy wejdzie ona w fazę zatwierdzania zmian – pierwszym warunkiem jest wtedy sprawdzenie, czy nie przegrała sporu z inną transakcją.

Przegranie sporu przez bieżącą transakcję sprawia, że jej wykonywanie jest wstrzymywane na przynajmniej 1 setną sekundy (wspomniana stała BARGE_WAIT_NANOS) w oczekiwaniu na zakończenie drugiej transakcji.

Aby wykrywać, która transakcja zdarzyła się wcześniej, STM korzysta z globalnego licznika zwiększanego przy każdym rozpoczęciu transakcji. Wartość tego licznika jest następnie używana przez mechanizm obsługi transakcji, aby pamiętać jej czas rozpoczęcia i czas zatwierdzenia.

Dzięki rozpoczynaniu procesu sporu transakcja, która aktualizuje referencje, może upewnić się, że to ona kontroluje ich globalne, współdzielone stany i przygotowuje się do wejścia w fazę COMMITTING.

W praktyce powyższy sposób reakcji na konflikty zostanie zastosowany z uwzględnieniem wszystkich Refów, w odniesieniu do których w transakcji wywołano funkcję ref-set, alter lub ensure. Ta ostatnia nie aktualizuje co prawda powiązania z wartością, ale zapewnia, że obiekt typu Ref będzie chroniony przed modyfikacjami, które mogą być efektem równoległego realizowania innych transakcji.

Z uwagi na to, że transakcje mogą być automatycznie ponawiane, należy w nich unikać funkcji, które mają efekty uboczne (np. korzystają z podsystemu wejścia/wyjścia).

Komutacja wartości bieżącej

Ciekawy mechanizm aktualizowania referencji wprowadza wspomniana już funkcja commute. Z jej pomocą można zaktualizować powiązanie obiektu typu Ref, jednak w przypadku wykrycia równoległej aktualizacji nie będzie konieczne ponawianie transakcji. Dzieje się tak dlatego, że podczas zatwierdzania Refów zmienianych z użyciem commute nie korzysta się z wartości wewnątrztransakcyjnej, lecz wywołuje funkcję modyfikującą na ostatniej wartości współdzielonej. Jeżeli więc inna transakcja zmieni powiązanie obiektu typu Ref w trakcie realizowania bieżącej transakcji, to i tak dojdzie do aktualizacji odniesienia.

Podczas zatwierdzania operacji zmiany spowodowanej użyciem commute wywołana zostanie funkcja przeliczająca, która została do niej przekazana. Jako argument tej ostatniej przekazana będzie dopiero co zaktualizowana wartość bieżąca Refa (ustawiona przez wcześniej zakończoną transakcję). Przypomina to trochę działanie funkcji swap! w odniesieniu do Atomów. Zmiany będą wciąż atomowe i synchroniczne, chociaż bez zachowania kolejności operacji. Może na przykład zdarzyć się, że jedna z kilku równolegle realizowanych transakcji zostanie wykonana wcześniej, niż przewidywano, więc odniesienia, których zmiana jest efektem użycia commute, zostaną zaktualizowane w różnej kolejności. Należy mieć to na uwadze, korzystając z tej funkcji.

Historia zmian

Obsługa konfliktów i utrzymywanie spójności danych na poziomie STM możliwe jest dzięki użyciu migawek, a dokładniej porównywaniu stanu współdzielonych referencji zapisanych podczas rozpoczęcia transakcji ze stanami w momencie jej zatwierdzania. Przez spójność rozumiemy tu zestaw zależnych od siebie logicznie wartości bieżących, który jest niezmienny w pewnym okresie czasu, więc można bezpiecznie przeprowadzać na nim operacje.

Powyższe podejście zadziała w prostych przypadkach, jednak możemy mieć kłopoty, jeżeli zmiany będą bardzo częste. Wyobraźmy sobie dwie referencje transakcyjne nazwane x oraz y, których wartościami początkowymi są odpowiednio 1 i 2. Następnie załóżmy, że wykonujemy transakcję, wewnątrz której obliczamy sumę wartości wskazywanych przez Refy. Jeżeli będzie to jedna transakcja, rezultatem będzie 3:

transakcja              x   y
------------------------------------------
1.  sumowanie [start]   1   2
1.  sumowanie [stop]    1   2 (x + y = 3)

Komplikacje zaczną się, gdy równolegle rozpoczęta zostanie kolejna transakcja, zmieniająca wartość bieżącą powiązaną z x:

transakcja              x   y
------------------------------------------
1.  sumowanie [start]   1   2
2.  zmiana x  [start]   1   2
2.  zmiana x  [stop]    2   2
1.  sumowanie [stop]    1   2 (konflikt x)
1'. sumowanie [start]   2   2
1'. sumowanie [stop]    2   2

Ponieważ współdzielona wartość bieżąca x zmieniła się w trakcie wykonywania transakcji nr 1 (sumowanie), więc transakcja ta została wycofana i ponowiona (nr 1’), aby skorzystać z wartości zaktualizowanej w transakcji nr 2.

Przykład ponawiania transakcji po wykryciu konfliktu
1
2
3
4
5
6
7
8
9
10
11
12
13
14
(def x (ref 1))
(def y (ref 2))

(future
  (dosync
    (Thread/sleep 350)
    (println "2. zapis  [start] x =" @x "+ 1")
    (alter x inc)
    (println "2. zapis   [stop] x =" @x)))

(dosync
  (println "1. odczyt [start]")
  (Thread/sleep 500)
  (println "1. odczyt  [stop] x =" @x))

Zobaczmy, co się stanie, gdy zmiany x będą częstsze:

transakcja              x   y
------------------------------------------
1.  sumowanie [start]   1   2
2.  zmiana x  [start]   1   2
2.  zmiana x  [stop]    2   2
1.  sumowanie [stop]    1   2 (konflikt x)
1'. sumowanie [start]   2   2
3.  zmiana x  [start]   2   2
3.  zmiana x  [stop]    3   2
1'. sumowanie [stop]    2   2 (konflikt x)

W transakcji nr 1 chcieliśmy sumować xy, ale w trakcie realizowania tej operacji zdążyła rozpocząć się i zakończyć transakcja nr 2, która zmieniła powiązanie referencji x z wartością. Porównanie migawek doprowadziło do wykrycia tego konfliktu i transakcja nr 1 została wycofana, a następnie ponowiona (jako transakcja nr 1’). Niestety, podczas ponawiania znowu doszło do zmiany współdzielonego stanu referencji x i jej poprzednia wartość bieżąca przestała być aktualna. Porównanie migawek znów doprowadziło do wykrycia konfliktu i operacja sumowania kolejny raz została wycofana…

Widzimy więc, że bardzo częste zmiany x (w transakcjach nr 2 i 3) prowadzą do sytuacji teoretycznie nieskończonego ponawiania transakcji nr 1. Jak można sobie z tym poradzić?

Do przeprowadzenia sensownych obliczeń potrzebujemy spójnych wartości bieżących referencji transakcyjnych, jednak w tym przypadku pojawia się problem z uzyskaniem aktualnej wersji jednej z nich, ponieważ w praktyce przy tak częstych operacjach nigdy nie uda się trafić z odczytem dokładnie w moment następujący po ostatniej aktualizacji – ulega ona zbyt szybkim zmianom.

Problemem nie jest jednak to, że aktualnych danych nie ma, lecz to, że przyjmujemy optymistyczne założenie o odczycie i użyciu wartości pochodzących z bieżącego punktu w czasie. Warto sobie w tym miejscu przypomnieć, że priorytetem STM nie jest wcale dostarczanie wartości teraźniejszych, lecz przede wszystkim spójnych zestawów wartości, na których kopiach można przeprowadzać sensowne obliczenia.

Rozwiązaniem problemu jest użycie historii zmian. Dzięki niej można będzie skorzystać z wartości minionej, pochodzącej z momentu, w którym również była ona spójna z innymi wartościami. Aby to osiągnąć, implementacja STM w Clojure tworzy dodatkowe migawki stanów dla tych obiektów typu Ref, które są aktualizowane. Są one obsługiwane z użyciem specjalnych kolejek, do których trafiają poprzednie wartości Refów podczas ich aktualizowania. W efekcie porównywanie migawek wykonywane jest nie tylko dla początku i końca transakcji, ale też dla momentów, gdy z powodu następujących po sobie konfliktów nie można użyć najnowszych wartości bieżących.

transakcja              x   y
-------------------------------------------------------
1.  sumowanie [start]   1   2
2.  zmiana x  [start]   1   2
2.  zmiana x  [stop]    2   2 (historia: x0 = 1)
3.  zmiana x  [start]   2   2
3.  zmiana x  [stop]    3   2 (historia: x1 = 2)
1'. sumowanie [stop]    1   2 (x0 + y = 3)

Odczyt wartości bieżącej Refa polega więc na sprawdzeniu czasu aktualizacji jego bieżącej, współdzielonej wartości, a jeżeli jest on późniejszy niż czas rozpoczęcia bieżącej transakcji, sprawdzany jest łańcuch historii zmian. Dla każdej umieszczonej w nim wartości historycznej (od najnowszej do najstarszej) sprawdzony będzie w taki sam sposób zapisany tam czas aktualizacji. Gdy wartość spełniająca warunek zostanie odnaleziona, to będzie wykorzystana, a blokada odczytu zwolniona.

Może zdarzyć się tak, że łańcuch historii nie będzie zawierał odpowiedniego wpisu. W takim przypadku transakcja jest ponawiana, a w obiekcie typu Ref zwiększany jest wewnętrzny licznik usterek. Licznik ten używany jest do dynamicznego zwiększania długości łańcucha historii zmian w fazie zatwierdzania transakcji (po rozszerzeniu długości łańcucha licznik będzie zerowany).

Przykład wykorzystania historii zmian po wykryciu konfliktu transakcji
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
(def x (ref 1))
(ref-min-history x 5)

(future
  (Thread/sleep 350)
  (dotimes [n 5]
    (dosync
      (println (str (+ 2 n) ".") "zapis  [start] x =" @x "+ 1")
      (alter x inc)
      (println (str (+ 2 n) ".") "zapis   [stop] x =" @x))))

(dosync
  (println "1. odczyt [start]")
  (Thread/sleep 500)
  (println "1. odczyt  [stop] x =" @x))

W naszym przykładzie pierwsza transakcja nie zostanie przerwana z powodu zmiany współdzielonej wartości referencji x w trakcie jej trwania, ponieważ wykorzystana zostanie wartość pochodząca z historii zmian.

Warto zauważyć, że użyliśmy funkcji ref-min-history. Bez niej transakcja odczytująca x zostałaby po prostu ponowiona, bo historia zaczęłaby się tworzyć dopiero po wykryciu konfliktu. Dopiero każda kolejna próba odczytania nieaktualnej wartości bieżącej zwiększałaby rozmiar łańcucha historii o jeden. W ten sposób STM dba o to, aby zasoby systemowe nie były trwonione na zapamiętywanie i przeszukiwanie dodatkowych struktur, gdy nie ma takiej potrzeby. W przykładzie zależało nam jednak na tym, aby historia była używana od razu i stąd ustawienie jej minimalnej długości.

Realizowanie transakcji

Transakcja rozpoczyna się wywołaniem makra dosync, które przyjmuje zestaw wyrażeń stanowiących jej ciało. Zostaną one przekształcone do obiektu typu Callable i skojarzone z obiektem reprezentującym transakcję. W obiekcie transakcji znajdziemy też odwołania do dwóch tworzonych list: pierwsza zawierała będzie Refy, na które nałożono blokadę, a druga obiekty typu Notify (zawierające obiekty typu Ref, oraz powiązane z nimi poprzednie i bieżące wartości), które będą wykorzystane podczas wywoływania funkcji obserwujących.

Kolejnym krokiem jest uruchomienie pętli ponowień, która wykonuje 10000 powtórzeń, chyba że transakcja zakończyła się (z powodu błędu lub zatwierdzenia). Gdy wszystkie powtórzenia są zrealizowane, a transakcja nadal nie została zamknięta, zgłaszany jest wyjątek RetryEx.

Na początku pętli aktualizowana jest wartość tzw. punktu początkowego. W przypadku, gdy pętla wykonuje pierwszy przebieg, punkt ten będzie tożsamy z punktem odczytu. Dodatkowo stan transakcji (wyrażony instancją klasy Info) jest ustawiany na wartość RUNNING.

Kolejnym krokiem jest uruchomienie podprogramu obiektu typu Callable, czyli rozpoczęcie przeliczania wyrażeń przekazanych do makra dosync. Po udanym wykonaniu wyrażeń sprawdzany jest status transakcji, aby wykluczyć ewentualny spór rozpoczęty przez inną transakcję. Jeżeli takowy wystąpił, a bieżąca transakcja go przegrała, jej stan będzie ustawiony na KILLED. Jeżeli nie wykryto zmiany stanu, realizowanie jest kontynuowane, a stan zmieniany jest na COMMITTING i rozpoczyna się faza zatwierdzania.

Zatwierdzanie transakcji

Jeżeli operacje aktualizowania Refów przebiegną pomyślnie, podejmowana jest próba zatwierdzenia transakcji (stan COMMITTING).

Funkcje używane do wprowadzania zmian, które zostaną obsłużone podczas tego etapu to ref-set, altercommute. Dwie pierwsze sprawiają, że dochodzi do aktualizacji wartości wewnątrztransakcyjnych, a w momencie zatwierdzenia zmian do atomowej podmiany powiązań w odpowiednich, współdzielonych obiektach typu Ref. Odniesieniami nowych powiązań są wtedy pochodzące z transakcji, obliczone wcześniej wartości wewnątrztransakcyjne.

Inaczej działa ostatnia z wymienionych wyżej funkcji (commute). Dla każdego Refa, na którym ją zastosowano, jego wartość wewnątrztransakcyjna również będzie na bieżąco aktualizowana, ale podczas zatwierdzania transakcji nie zostanie ona użyta do podmiany powiązania współdzielonego Refa. Zamiast tego dojdzie do ponownego wywołania funkcji przeliczającej przekazanej do commute. Przekazywanym jej argumentem będzie aktualna wartość współdzielona, do której odnosi się obiekt typu Ref, a nie izolowana w transakcji wartość pochodząca z punktu odczytu. W tym czasie żaden inny wątek nie będzie mógł dokonywać aktualizacji współdzielonej wartości Refa. W praktyce oznacza to, że zachowana jest synchroniczność operacji, ale może dojść do zamiany kolejności ich realizowania w czasie.

Jeżeli przed wywołaniem commute stworzono wartość wewnątrztransakcyjną Refa z użyciem alter lub ref-set, to powyższy krok jest pomijany, a aktualizacja współdzielonych obiektów będzie polegała na powiązaniu ich z wartościami wewnątrztransakcyjnymi (które zostały zaktualizowane również z użyciem commute i przekazanej jej funkcji).

Dalsza obsługa referencji aktualizowanych z użyciem commute polega na zarządzaniu blokadami dostępu. Jeżeli w transakcji użyto ensure w odniesieniu do jakichś referencji, to zwalniane są ich blokady odczytu. Zaraz po tym na obiekty zakładane są blokady zapisu, a każdy z nich dodawany jest do listy zablokowanych Refów utrzymywanej w obiekcie reprezentującym transakcję. Wykonywane jest to w takiej kolejności, w jakiej tworzone były obiekty typu Ref, aby uniknąć sytuacji wyścigów.

Poza tym, jeżeli w stosunku do którejś referencji użyto funkcji ensure, a w trakcie realizowania transakcji inna transakcja jednak zmieniła jej wartość współdzieloną (co nie powinno się wydarzyć), bieżąca transakcja jest ponawiana. Dzieje się tak również wtedy, gdy równolegle realizowana transakcja zaktualizowała referencję zaraz przed tym, gdy w bieżącej transakcji doszło do wywołania ensure.

Jeżeli inna transakcja dokonała w międzyczasie zmiany wewnątrztransakcyjnej wartości Refa, który był modyfikowany z użyciem commute, bieżąca transakcja rozpoczyna z nią spór. Jeżeli spór zostanie przegrany, bieżąca transakcja jest ponawiana, a jeżeli wygrany, odczytana zostanie najbardziej aktualna, współdzielona wartość powiązana z referencją i będzie ona użyta jako wartość wewnątrztransakcyjna.

Po tych operacjach wywoływane są funkcje przekazane wcześniej do commute użytych względem Refów i przekazywane są im (poza innymi argumentami) wartości wewnątrztransakcyjne obiektów. Wartości wewnątrztransakcyjne są aktualizowane wartościami zwracanymi przez wywołania funkcji przeliczających.

Kolejnym etapem zatwierdzania transakcji jest założenie blokady zapisu na każdego Refa, które ma być zaktualizowany. Dotyczy to zarówno obiektów, których wartości wewnątrztransakcyjne zostały zmienione z użyciem alterref-set, jak również tych, dla których wcześniej wyliczono odpowiadające im wartości wewnątrztransakcyjne opracowywane przy okazji obsługi wywołań commute. Jeżeli dany obiekt typu Ref już został zablokowany przez inną transakcję, bieżąca transakcja jest ponawiana (następuje ustawienie stanu RETRY, a potem powrót do początku pętli).

Jeżeli uda się zablokować możliwość zapisu wszystkich aktualizowanych Refów, dochodzi do wywołania przypisanych do nich walidatorów, gdy takowe ustawiono. Jeśli któraś funkcja walidująca zwróci false lub zgłosi wyjątek, transakcja jest przerywana.

Jeżeli podczas walidacji, ale przed zablokowaniem możliwości zapisu do aktualizowanych Refów, wykryta zostanie zmiana jednego z nich (dokonana przez inną transakcję), bieżąca transakcja jest ponawiana.

Ostatni etap zatwierdzania polega na umieszczeniu zaktualizowanych wartości Refów w łańcuchach historii zmian. Łańcuch historii jest rozszerzany o nowy element, jeżeli jego długość jest zerowa lub mniejsza niż wartość określająca minimalną długość (można ją ustawić podczas tworzenia obiektu typu Ref lub korzystając z funkcji ref-min-history). Dodanie nowego elementu nastąpi również wtedy, gdy dany Ref uległ wcześniej usterce (konfliktowi z inną transakcją) i przypisany mu licznik usterek jest niezerowy, a długość łańcucha historii jest mniejsza, niż wartość określająca maksymalną długość (można ją ustawić, korzystając z funkcji ref-max-history, lub podczas tworzenia obiektu typu Ref).

Jeżeli żaden z powyższych warunków nie zostanie spełniony, łańcuch historii nie będzie rozszerzony o nowy element, lecz ostatni element stanie się początkowym, a jego wartość zastąpiona bieżącą wartością referencyjną danego Refa. Dojdzie więc do rotacji historii zmian (przesunięcia wszystkich elementów o jeden w dół z nadpisaniem pierwszego).

Po aktualizacji historii zmian przygotowywana jest lista obiektów typu Notify. Zawierają one zmienione Refy wraz z poprzednimi i bieżącymi wartościami, a użyte zostaną później, aby wywołać [funkcje obserwujące], które zostały z nimi skojarzone.

Kolejny etap to przejście do ostatniej fazy zatwierdzania. Zwalniane są założone wcześniej i nie zdjęte jeszcze blokady odczytu (np. z powodu użycia ensure). Jeżeli transakcja zakończyła się pomyślnie (jest w stanie COMMITTING), zostaje oznaczona jako zatwierdzona (stan COMMITTED), a w przeciwnym przypadku jako ponawiana (stan RETRY). Poza tym uruchamiane są funkcje obserwujące zgromadzone w obiektach typu Notify utworzonej wcześniej listy.

Gdy zatwierdzanie się powiedzie, wartości bieżące współdzielonych Refów, których wersje wewnątrztransakcyjne zmieniły się, zostaną zastąpione tymi ostatnimi. Moment, w którym dochodzi do podmiany współdzielonych powiązań podczas zatwierdzania transakcji nazywamy punktem zapisu (ang. write point).

Blokowanie równoległych operacji

Operacje transakcyjne można rozpatrywać pod kątem tego, czy powodują blokowanie realizacji równolegle wykonywanych operacji na obiektach referencyjnych. Do blokowania dochodzi wtedy, gdy mechanizmy programowej pamięci transakcyjnej muszą zapewnić synchroniczność i jednolitość wprowadzania zmian.

Zasady są następujące:

  • funkcje używane do odczytywania wartości (dereferencji) obiektów typu Ref nie blokują równoległego wykonywania funkcji używanych do: odczytywania, aktualizowaniaaktualizowania przemiennego, chyba że dereferencja następuje w transakcji i dotyczy wartości współdzielonej (bieżącej lub historycznej) a nie wewnątrztransakcyjnej – w takim przypadku na czas odczytu blokowana jest możliwość aktualizowania danego Refa;
  • funkcje używane do aktualizowania przemiennego obiektów typu Ref nie blokują równoległego wykonywania funkcji używanych do: odczytywania, aktualizowaniaaktualizowania przemiennego;
  • funkcje używane do aktualizowania obiektów typu Ref nie blokują równoległego wykonywania funkcji używanych do: odczytywania, i aktualizowania przemiennego.

Kiedy dojdzie do blokowania (oczekiwania jednej operacji na drugą)? Gdy w stosunku do obiektu typu Ref pojawi się operacja aktualizowania jego współdzielonego powiązania. Z blokowaniem będziemy też mieli do czynienia, gdy w jednej transakcji dochodzi do aktualizowania wartości bieżącej, a w równolegle realizowanej wywołano na tym samym Refie funkcję ensure.

Struktury danych

Obiekty typu Ref powinny zawierać odniesienia do niemutowalnych typów danych i struktur trwałych. Powiązanie referencji transakcyjnej z pamięciowym obiektem, który jest mutowalny, sprawi, że jej obsługa z użyciem mechanizmów języka Clojure przestanie być bezpieczne pod względem przetwarzania współbieżnego.

Zaleca się ponadto, aby tam, gdzie to możliwe, jako wartości wskazywanych referencjami transakcyjnymi używać kolekcji, ponieważ wyposażone są one w wydajne i oszczędne pod względem wykorzystania pamięci mechanizmy tworzenia wersji. STM może więc uczynić z nich użytek, tworząc wartości wewnątrztransakcyjne, czy aktualizując współdzielone stany bieżące obiektów typu Ref.

Tworzenie

Tworzenie obiektów typu Ref, ref

Do tworzenia obiektów typu Ref służy funkcja ref. Przyjmuje ona jeden obowiązkowy argument – wartość początkową (z którą powiązana zostanie referencja) – a także argumenty nieobowiązkowe opcji wyrażane w sposób nazwany, czyli w postaci par klucz–wartość.

Funkcja zwraca obiekt typu Ref.

Użycie:

  • (ref wartość-początkowa & opcje).

Możliwe opcje to:

  • :meta mapa-metadanowa,
  • :validator funkcja,
  • :min-history liczba (domyślnie 0),
  • :max-history liczba (domyślnie 10).

Mapa metadanowa po słowie kluczowym :meta powinna być mapą wyrażoną z użyciem powiązanego z nią symbolu lub osadzoną w kodzie jako tzw. mapowe S-wyrażenie. Podane asocjacje staną się metadanymi tworzonego obiektu referencyjnego.

Funkcja podawana po słowie kluczowym :validator powinna być jednoargumentowym obiektem funkcyjnym, który nie generuje efektów ubocznych, albo wartością nil. Funkcja ta będzie wywoływana za każdym razem, gdy zażądano zmiany stanu obiektu typu Ref i przekazywana jej będzie proponowana, nowa wartość. Powinna ona zwracać false lub zgłaszać wyjątek, gdy wartość ta jest nieakceptowalna. Operację taką nazywamy walidacją, a wyrażającą ją funkcję walidatorem.

Parametry :min-history:max-history pozwalają ustawiać minimalną i maksymalną liczbę elementów łańcucha historii zmian, które będą używane do zapamiętywania poprzednich wartości współdzielonych (z powodu aktualizacji lub w efekcie zastosowania funkcji ensure).

Przykład użycia funkcji ref
1
2
3
4
5
(ref  0                     )  ; => #<[email protected]e019b8e: 0>
(ref  0         :meta {:a 1})  ; => #<[email protected]: 0>
(ref                 [1 2 3])  ; => #<[email protected]: [1 2 3]>
(ref 10 :validator #(> %1 1))  ; => #<[email protected]: 10>
(ref  0 :validator #(> %1 1))  ; >> java.lang.IllegalStateException

Wyrażenie z linii nr 1 tworzy obiekt referencyjny typu Ref, który odnosi się do wartości 0.

Wyrażenie drugie (z linii nr 2) działa tak samo, lecz dodatkowo ustawia metadaną.

Kolejne wyrażenie przykładu (z linii nr 3) ustawia wartość początkową na wektor stworzony z użyciem wyrażenia wektorowego.

Dwa ostatnie wyrażenia dotyczą sytuacji, w których przekazywana jest funkcja walidująca. Ostatnie wywołanie powoduje zgłoszenie wyjątku, ponieważ podana wartość początkowa nie jest poprawna.

Identyfikowanie Refów

Refy są obiektami, z którymi, podobnie jak z innymi pamięciowymi wartościami, można powiązać symboliczne identyfikatory i w ten sposób określać ich tożsamości. Możemy więc stwarzać odnoszące się do obiektów typu Ref globalne zmienne, jak również korzystać z innych rodzajów powiązań (np. leksykalnych).

Przykłady tworzenia powiązań z obiektami typu Ref
1
2
(def referencja (ref 0))     ; zmienna globalna
(let [refik (ref 0)] refik)  ; powiązanie leksykalne

Zarządzanie transakcją

Tworzenie transakcji, dosync

Makro dosync służy do tworzenia transakcji. Przyjmuje ono zero lub więcej wyrażeń, które staną się ciałem transakcji, tzn. będą przeliczane podczas jej realizowania.

Transakcja rozpoczyna się, jeżeli inna transakcja jeszcze nie działa w bieżącym wątku. Jeżeli działa, to ciało drugiej transakcji (syntaktycznie zagnieżdżonej) zostanie dołączone do ciała pierwszej.

Jeżeli podczas obliczania wartości podanych wyrażeń zgłoszony zostanie nieprzechwycony wyjątek, transakcja zostanie przerwana i nastąpi wyjście z dosync.

S-wyrażenia podane w dosync mogą być uruchomione więcej niż raz, ponieważ transakcja może zostać ponowiona. Należy więc unikać obsługi wejścia/wyjścia i stosowania innych funkcji mających efekty uboczne.

Wartością zwracaną przez makro dosync jest wartość ostatnio przeliczanego wyrażenia z jej ciała.

Użycie:

  • (dosync & wyrażenie…).
Przykłady użycia makra dosync
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
(let [x (ref 0)
      y (ref 1)]
  (dosync
    (ref-set x 10)
    (alter   x inc)
    (alter   y + (ensure x))))

; => 12

;; transakcje złączone – jeden wątek
(dosync
  (println "[start] pierwsza")
  (dosync
    (println "[start] druga")
    (Thread/sleep 500)
    (println "[stop]  druga"))
  (println "[stop]  pierwsza"))

; >> [start] pierwsza
; >> [start] druga
; >> [stop]  druga
; >> [stop]  pierwsza

;; transakcje równoległe – dwa wątki
(dosync
  (println "[start] pierwsza")
  (future (dosync
            (println "[start] druga")
            (Thread/sleep 500)
            (println "[stop]  druga")))
  (println "[stop]  pierwsza"))

; >> [start] pierwsza
; >> [start] druga
; >> [stop]  pierwsza
; >> [stop]  druga

Blokowanie wejścia/wyjścia, io!

W transakcjach powinniśmy unikać stosowania operacji, które są blokujące. Należą do nich m.in. funkcje obsługi wejścia/wyjścia. Dzięki makru io! możemy oznaczyć te fragmenty programów, które są blokujące (związane z obsługą wejścia lub wyjścia).

Jeżeli kod zawierający wywołanie io! zostanie uruchomiony z wnętrza transakcji, dojdzie do zgłoszenia wyjątku IllegalStateException, a jej wykonywanie zostanie przerwane.

Argumentami io! powinny być wyrażenia, które mają być przeliczone. Jeżeli pierwszym przekazywanym argumentem będzie literał łańcucha znakowego, to wyrażony nim tekst zostanie użyty jako komunikat towarzyszący wyjątkowi.

Użycie:

  • (io! komunikat & wyrażenie…),
  • (io! & wyrażenie…).

Pobieranie wartości

Odczytywanie wartości Refów, deref

Żeby odczytać wartość referencji transakcyjnej, możemy użyć funkcji deref. Przyjmuje ona jeden argument, którym powinien być obiekt typu Ref, a zwraca wartość, do której odniesienie jest przechowywane w tym obiekcie.

Dereferencji można użyć w dowolnym miejscu programu, ale jeżeli zależy nam na spójności danych w obrębie pewnego zestawu Refów, powinniśmy rozważyć wywołanie tej funkcji w transakcji. Zostaną wtedy założone odpowiednie blokady, a pobierana wartość będzie wartością spójną z innymi. Jeżeli którejś z używanych wartości nie modyfikujemy, zaleca się w odniesieniu do niej użyć funkcji ensure.

Wywołanie deref w transakcji zwróci najbardziej aktualną wartość wewnątrztransakcyjną, natomiast poza transakcją wartość współdzieloną (bieżącą) obiektu typu Ref.

Użycie:

  • (deref ref).
Przykład użycia funkcji deref
1
2
3
(def referencja (ref 0))
(deref referencja)
; => 0

Zobacz także:

Dereferencja Refów, makro @

Makro czytnika @ (znak małpki) umieszczone przed wyrażeniem sprawia, że jeśli zwracany przez nie obiekt jest typem referencyjnym, to wywołana zostanie na nim funkcja odpowiedzialna za odczyt wskazywanej wartości. W przypadku Refów będzie to omówiona wyżej funkcja deref.

Dereferencji można użyć w dowolnym miejscu programu, ale jeżeli zależy nam na spójności danych, powinniśmy rozważyć wywołanie tej funkcji w transakcji. Gdy w obliczeniach korzystamy z wartości, których nie modyfikujemy, zaleca się użycie w odniesieniu do nich funkcji ensure, aby przyjęty zestaw Refów był spójny.

Wywołanie makra @ w transakcji zwróci najbardziej aktualną wartość wewnątrztransakcyjną, natomiast poza transakcją wartość współdzieloną (bieżącą) obiektu typu Ref.

Użycie:

  • @ref.
Przykład dereferencji Refa z użyciem makra czytnika @
1
2
3
(def referencja (ref 0))
@referencja
; => 0

Zmiany wartości

Aktualizowanie wartości, alter

Zmiana stanu obiektu typu Ref, czyli aktualizacja współdzielonego odniesienia do wartości, możliwa jest z zastosowaniem funkcji alter.

Pierwszy przyjmowany argument powinien być symbolem identyfikującym obiekt typu Ref lub inne wyrażenie, które przeliczone zwróci ten obiekt. Drugim argumentem powinna być funkcja, która jako pierwszy argument przyjmie wartość bieżącą (do której odnosi się Ref) i na jej podstawie obliczy nową wartość. Zostanie ona użyta do zastąpienia poprzedniego, referencyjnego odniesienia w Refie. Opcjonalnie możemy podać dodatkowe argumenty, które zostaną przekazane jako kolejne argumenty wywoływanej funkcji.

Wywołanie funkcji alter musi odbywać się w transakcji. Efektem jej działania jest natychmiastowa zmiana lub utworzenie wartości wewnątrztransakcyjnej, natomiast do ewentualnej aktualizacji wartości współdzielonej dochodzi dopiero wtedy, gdy cała transakcja z powodzeniem się zakończy.

Przekazana funkcja obliczająca nową wartość bieżącą Refa powinna być nieblokująca i bez efektów ubocznych, ponieważ będzie realizowana w wątku należącym do transakcji, która może być ponawiana, zakładać blokady na obiekty referencji transakcyjnych i sprawiać, że inne transakcje będą działały mniej wydajnie.

Wartość bieżąca Refa przekazywana funkcji przeliczającej będzie wartością wewnątrztransakcyjną (jeśli taką już utworzono poprzednią modyfikacją) lub wartością współdzieloną, uzyskaną w punkcie odczytu. Jeżeli podczas odczytywania wartości okaże się, że wartość współdzielona uległa zmianie (z powodu działania innej transakcji) i wartość z punktu odczytu przestała być aktualna, przeszukany zostanie łańcuch historii zmian. Jeżeli nawet tam nie zostanie znaleziona aktualna wartość współdzielona Refa, transakcja będzie ponowiona.

Gdyby któryś z kroków pośrednich (między odczytem wartości bieżącej a aktualizacją referencji) nie powiódł się, będziemy mieli do czynienia z usterką Refa, czyli sytuacją wyjątkową. Może ona wystąpić z powodu błędu lub niespełnienia wymogów skojarzonej z Refem funkcji walidującej. Innym powodem usterki może być spór z równolegle realizowaną transakcją podczas próby odczytania aktualnej wartości bieżącej. W przypadku nieudanej walidacji, transakcja jest przerywana, a w przypadku przegranego sporu ponawiana zaraz po wejściu w fazę zatwierdzania.

Wartością zwracaną przez funkcję alter jest ustawiona wartość obiektu typu Ref, która jest wartością wewnątrztransakcyjną.

Użycie:

  • (alter ref funkcja & argument…).
Przykład użycia funkcji alter
1
2
3
4
5
(def referencja (ref 0))

@referencja                      ; => 0
(dosync (alter referencja inc))  ; => 1
@referencja                      ; => 1

Ustawianie wartości, ref-set

Ustawienie stanu obiektu typu Ref, czyli bezwarunkowa zmiana współdzielonego odniesienia do wartości, możliwa jest z zastosowaniem funkcji ref-set.

Pierwszy przyjmowany argument powinien być symbolem identyfikującym obiekt typu Ref lub inne wyrażenie, które przeliczone zwróci ten obiekt. Drugim argumentem powinna być wartość, z którą chcemy powiązać Refa, zastępując obecną wartość współdzieloną.

Wywołanie funkcji ref-set musi odbywać się w transakcji. Efektem jej działania jest natychmiastowa zmiana lub utworzenie wartości wewnątrztransakcyjnej, natomiast do ewentualnej aktualizacji wartości współdzielonej dochodzi dopiero wtedy, gdy cała transakcja z powodzeniem się zakończy.

Tuż przed zatwierdzeniem zmian dokonanych w transakcji z użyciem ref-set wywołane będą przypisane do Refów walidatory (jeśli je ustawiono). W wypadku niepowodzenia (walidator zwróci false lub zgłosi wyjątek) cała transakcja zostanie ponowiona.

Istotną różnicą względem alter – poza tym, że nie mamy do czynienia z funkcją przeliczającą – jest brak fazy odczytywania wartości bieżącej referencji. Oznacza to mniej wewnętrznych blokad i znacznie mniejszą szansę na wystąpienie konfliktów.

Wartością zwracaną przez funkcję ref-set jest ustawiona wartość obiektu typu Ref, która jest wartością wewnątrztransakcyjną.

Użycie:

  • (ref-set ref wartość).
Przykład użycia funkcji ref-set
1
2
3
4
5
(def referencja (ref 0))

@referencja                      ; => 0
(dosync (ref-set referencja 1))  ; => 1
@referencja                      ; => 1

Podmiana wartości, commute

Komutacja stanu obiektu typu Ref, czyli podmiana współdzielonego odniesienia do wartości bieżącej, możliwa jest z zastosowaniem funkcji commute. Działa ona podobnie do alter, jednak przekazana funkcja przeliczająca będzie wywołana na samym końcu transakcji, w momencie jej zatwierdzania, a zwracana przez nią wartość zastąpi wartość współdzieloną Refa. Możemy zaobserwować, że analogiczna w działaniu jest funkcja swap! związana z typem Atom, chociaż nie korzysta ona z dobrodziejstw STM.

Podczas użycia pozostałych funkcji aktualizujących najpierw dochodzi do podmiany wartości wewnątrztransakcyjnej, a dopiero ona podczas finalizowania transakcji staje się wartością współdzieloną. W przypadku commute to nie wartość wewnątrztransakcyjna stanie się wartością współdzieloną, lecz w fazie zatwierdzania transakcji po raz kolejny wywołana zostanie podana jako argument commute funkcja przeliczająca, a zwrócony przez nią wynik będzie użyty do zmiany stanu.

W tym samym czasie tylko jedna funkcja przeliczająca będzie zmieniała stan danego obiektu typu Ref, a przekazywaną jako jej argument wartością będzie aktualna wartość bieżąca Refa, a nie wartość wewnątrztransakcyjna. Wyjątek stanowią sytuacje, gdy w transakcji poza commute skorzystano z innych funkcji aktualizujących w odniesieniu do tego samego obiektu.

Używając commute, pomijamy fazę porównywania współdzielonej wartości bieżącej Refa z migawką wykonaną w punkcie odczytu, ponieważ nie korzystamy z wartości wewnątrztransakcyjnej, która w czasie realizowania transakcji mogłaby się zdezaktualizować. Zmiana dokonywana jest w sposób atomowy i polega na zablokowaniu zapisu do danego Refa na czas uruchomienia podanej funkcji. W efekcie zyskujemy na prędkości realizacji w przypadku wielu wątków, ponieważ dochodzi do mniejszej liczby ponowień transakcji spowodowanej brakiem konfliktów. Tracimy na tym, że może nie być zachowana kolejność operacji. Następcze wywoływanie commute w jednym wątku może zajmować tyle samo lub nawet więcej czasu, co korzystanie alter. Zysk wydajnościowy osiągniemy wówczas, gdy wykorzystamy tę funkcję w wywołaniach współbieżnych.

Pierwszy przyjmowany przez funkcję commute argument powinien być symbolem identyfikującym obiekt typu Ref lub inne wyrażenie, które przeliczone zwróci ten obiekt. Drugim argumentem powinna być funkcja przeliczająca, która jako pierwszy argument przyjmie wartość bieżącą Refa i na jej podstawie obliczy nową wartość. Ta ostatnia zostanie wskazana odniesieniem w Refie i zastąpi tym samym zastaną wartość bieżącą. Opcjonalnie możemy podać dodatkowe argumenty, które zostaną przekazane jako kolejne argumenty funkcji przeliczającej.

Wywołanie funkcji commute musi odbywać się w transakcji. Efektem jej działania jest natychmiastowa zmiana lub utworzenie wartości wewnątrztransakcyjnej, a dodatkowo zmiana wartości współdzielonej.

Przekazana funkcja obliczająca nową wartość bieżącą Refa powinna być nieblokująca i bez efektów ubocznych, ponieważ będzie realizowana w wątku należącym do transakcji, która może być ponawiana, zakładać blokady na obiekty referencji transakcyjnych i sprawiać, że inne transakcje będą działały mniej wydajnie.

Wartość bieżąca Refa przekazywana funkcji przeliczającej będzie ostatnią wartością współdzieloną, a nie wartością wewnątrztransakcyjną, chyba że przed wywołaniem commute utworzono wartość wewnątrztransakcyjną danego Refa z użyciem alter lub ref-set. W takim przypadku będzie to wartość wewnątrztransakcyjna.

Jeżeli w transakcji używamy commute (bez wcześniejszego wywołania na danym obiekcie typu Ref alter lub ref-set), to w przypadku wykrycia równoległej aktualizacji, która miała miejsce w trakcie realizowania bieżącej transakcji, nie będzie konieczne jej ponawianie. Jeżeli więc inna transakcja zmieni powiązanie obiektu typu Ref, to i tak dojdzie do pomyślnej aktualizacji odniesienia.

Zmiany wprowadzane z użyciem commute są atomowe i synchroniczne, ale wartość bieżąca Refa może być niespójna z innymi wartościami migawki utworzonej na potrzeby realizowania transakcji. Oznacza to, że funkcja przekazana do commute, jak i projektowana w transakcji operacja na danym Refie, powinny być przemienne (komutatywne), czyli zakładać, że kolejność aktualizowania wartości nie jest istotna. Przykładem takiej operacji jest dokonywanie wpłat na rachunek bankowy (ale już nie wykonywanie przelewu!). Nie jest ważne w jakiej kolejności wpłaty się pojawią – po jakimś czasie stan rachunku będzie się zgadzał.

Powyższy proces można obsłużyć, używając alter, jednak zajmie on wtedy więcej czasu, bo w przypadku transakcji równoległych dojdzie do sporów i ponowień. Stanie się tak, ponieważ korzystając z alter przyjmujemy założenie, że różne wartości Refów obsługiwanych w ramach tej samej transakcji muszą tworzyć spójne zestawy, a więc jedne transakcje powinny oczekiwać na zakończenia innych, jeśli dotyczą obiektów, których wartości były aktualizowane. Funkcja commute jest wolna od tego założenia.

Tuż przed zatwierdzeniem zmian dokonanych w transakcji z użyciem commute wywołane będą przypisane do Refów walidatory (jeśli je ustawiono). W wypadku niepowodzenia (walidator zwróci false lub zgłosi wyjątek) transakcja zostanie wycofana.

Wartością zwracaną przez funkcję commute jest ustawiona wartość obiektu typu Ref, która jest wartością wewnątrztransakcyjną. Warto jednak pamiętać, że rzeczywiście ustawiona wartość współdzielona może być od niej różna, gdy dojdzie do ostatecznego wywołania przekazanej do commute funkcji. Dzieje się tak, ponieważ do jej wyliczenia użyta może zostać wartość współdzielona Refa ustawiona przez równolegle realizowaną transakcję, która zakończyła się wcześniej.

Użycie:

  • (commute ref funkcja & argument…).
Przykład użycia funkcji commute i porównanie czasu z alter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
(def x (ref 0))
(time
  (dorun
    (apply pcalls (repeat 50
                          #(dosync
                             (Thread/sleep 50)
                             (commute x inc))))))

; >> "Elapsed time: 511.852803 msecs"

(def x (ref 0))
(time
  (dorun
    (apply pcalls (repeat 50
                          #(dosync
                             (Thread/sleep 50)
                             (alter x inc))))))

; >> "Elapsed time: 2512.555332 msecs"

Zmiany stanów struktur złożonych

Nierzadko będziemy mieli do czynienia z sytuacjami, w których Ref został powiązany z wartością, która jest złożoną strukturą danych, na przykład wektorem czy mapą. Podmiany wartości można wtedy dokonać, przekazując odpowiednią funkcję przeliczającą, na przykład w taki sposób:

Przykład niepoprawnego aktualizowania struktury złożonej
1
2
3
4
(def x (ref {:imię "Paweł" :wynik 107}))

(dosync (commute x assoc :wynik (inc (:wynik @x))))
; => {:imię "Paweł" :wynik 108}

W powyższym przykładzie założyliśmy, że klucz :wynik identyfikuje wartość, w przypadku której kolejność prowadzenia obliczeń nie ma znaczenia i do zmiany stanu użyliśmy commute. Problematyczne jest jednak to, że do funkcji assoc odpowiadającej za aktualizację mapy, przekazujemy przez wartość rezultat uprzedniego wywołania funkcji inc na odczytanym elemencie mapy, którą uzyskaliśmy dzięki dereferencji. Operacja ta nie jest więc atomowa i istnieje ryzyko, że funkcja przeliczająca otrzyma rezultat bazujący na nieaktualnej już strukturze pochodzącej z migawki, a nie na wartości bieżącej. Bezpiecznym wariantem byłby więc kod korzystający z alterensure:

Przykład aktualizowania struktury złożonej
1
2
3
4
5
6
(def x (ref {:imię "Paweł" :wynik 107}))

(dosync
  (ensure x)
  (alter x assoc :wynik (inc (:wynik @x))))
; => {:imię "Paweł" :wynik 108}

W powyższym przykładzie tracimy jednak na ewentualnej prędkości operacji wykonywanych współbieżnie, ponieważ dokonujemy wyłącznej blokady zapisu do Refa x na czas trwania transakcji.

Można zaradzić opisywanemu tu problemowi w nieco inny sposób, korzystając z jednej struktury, która jest przekazywana jako argument do funkcji przeliczającej. Chcemy pozbyć się przekazywania wartości pochodzącej z migawki, więc powinniśmy utworzyć funkcję aktualizującą samodzielnie (np. z użyciem fn lub lambda-wyrażenia):

Przykład aktualizowania struktury złożonej z użyciem własnej funkcji
1
2
3
4
5
6
(def x (ref {:imię "Paweł" :wynik 107}))

(dosync
  (commute x (fn [mapa]
               (assoc mapa :wynik (inc (:wynik mapa))))))
; => {:imię "Paweł" :wynik 108}

Nasz przykład można jednak zoptymalizować, ponieważ język Clojure wyposażono w odpowiednie funkcje służące do funkcyjnego aktualizowania wbudowanych struktur złożonych:

Przykład aktualizowania struktury złożonej z użyciem wbudowanej funkcji
1
2
3
4
(def x (ref {:imię "Paweł" :wynik 107}))

(dosync (commute x update :wynik inc))
; => {:imię "Paweł" :wynik 108}

Prawda, że bardziej elegancko? Skorzystaliśmy z funkcji commute, której przekazaliśmy funkcję update w celu zaktualizowania elementu mapy wskazywanej przez x o podanym kluczu (:wynik). Sama operacja aktualizacji również jest obiektem funkcyjnym przekazywanym jako argument wywołania update – w tym przypadku używamy wbudowanej funkcji inc, zwiększającej wartość o 1.

Powyższa czynność zostanie wykonana w momencie zatwierdzania transakcji, a w trakcie realizowania obliczeń w obrębie transakcji nie będziemy dokonywali dereferencji, ani tworzyli własnych funkcji.

W odniesieniu do wbudowanych, złożonych struktur możemy używać następujących operacji:

  • mapy:
    • update – aktualizacja wartości elementu o podanym kluczu,
    • update-in – jak update, ale dla struktur zagnieżdżonych;
  • wektory:
    • update – aktualizacja wartości elementu o podanym numerze indeksu,
    • update-in – jak update, ale dla struktur zagnieżdżonych.

Oczywiście w przypadkach, gdzie element złożonej struktury musi zostać zmieniony na bazie wartości stanowiącej element innej struktury ten sposób nie wystarczy. Warto wtedy korzystać z funkcji ensure, a zmian dokonywać z użyciem alter, aby mieć pewność, że w obrębie transakcji będziemy mieli do czynienia ze spójnym zestawem wartości, które nie zostaną zmodyfikowane w trakcie jej trwania.

Sterowanie odczytem

Ochrona przed zmianami, ensure

Funkcja ensure sprawia, że podany obiekt typu Ref będzie chroniony przed modyfikacjami, które mogłyby być efektem zatwierdzenia zmian przez równolegle realizowane transakcje. Uzyskana wartość będzie zawsze aktualną wartością współdzieloną. Jeżeli nie uda się takiej uzyskać od razu, transakcja będzie ponowiona, chociaż najczęściej dojdzie do sytuacji, w których równoległe transakcje (dokonujące aktualizacji danego Refa) będą ponawiane, dopóki transakcja z wywołaniem ensure się nie zakończy.

Funkcja ensure musi być wywołana w transakcji.

Jeżeli w transakcji użyto ensure w odniesieniu do obiektu typu Ref, a w trakcie jej realizowania inna transakcja jednak zmieniła wartość współdzieloną tego Refa (co nie powinno się wydarzyć), to bieżąca transakcja zostanie ponowiona. Stanie się tak również wtedy, gdy równolegle realizowana transakcja zaktualizowała referencję zaraz przed tym, gdy w bieżącej transakcji doszło do wywołania ensure.

Wartością zwracaną jest wartość wewnątrztransakcyjna obiektu typu Ref.

Użycie:

  • (ensure ref).
Przykład użycie funkcji ensure
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
(def x (ref 1))
(def y (ref 2))

(future
  (dosync
    (Thread/sleep 500)
    (println "2. zapis  [start] x =" @x "+ 1")
    (alter x inc)
    (println "2. zapis   [stop] x =" @x)))

(dosync
  (ensure x)
  (println "1. odczyt [start]")
  (Thread/sleep 2000)
  (println "1. odczyt  [stop] x =" @x))

; >> 1. odczyt [start]
; >> 2. zapis  [start] x = 1 + 1
; >> 2. zapis  [start] x = 1 + 1
; >> 2. zapis  [start] x = 1 + 1
; >> 1. odczyt  [stop] x = 1
; >> 2. zapis  [start] x = 1 + 1
; >> 2. zapis   [stop] x = 2

Sterowanie historią zmian

Maksymalna długość historii, ref-max-history

Funkcja ref-max-history służy do pobierania bądź ustawiania wartości określającej maksymalną długość łańcucha historii zmian podanego obiektu typu Ref.

Pierwszym, obowiązkowym argumentem powinien być obiekt referencji transakcyjnej. Jeżeli nie podano więcej argumentów, funkcja odczyta bieżącą wartość określającą maksymalną długość łańcucha. Jeżeli podano kolejny argument, oczekuje się, że będzie to liczba całkowita stanowiąca nową wartość maksymalnej długości łańcucha.

Mechanizmy STM odpowiedzialne za utrzymywanie łańcucha historii zmian dla Refów aktualizowanych w transakcjach korzystają z ustalonej z użyciem ref-max-history wartości podczas automatycznego rozszerzania historii. Domyślną wartością jest 10, ale możemy ją zwiększyć przez co historia zmian będzie dłuższa, co może być konieczne w pewnych sytuacjach.

Funkcja zwraca maksymalną długość historii zmian dla podanego Refa.

Użycie:

  • (ref-max-history ref),
  • (ref-max-history ref długość).
Przykład użycia funkcji ref-max-history
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
(def x (ref 1))
(ref-max-history x 20)

(future
  (Thread/sleep 350)
  (dotimes [n 5]
    (dosync
      (Thread/sleep 100)
      (alter x inc))))

(dotimes [_ 3]
  (dosync
    (Thread/sleep 100)
    (println "x =" @x)))

; >> x = 1
; >> x = 1
; >> x = 2

(def x (ref 1))
(ref-max-history x 0)

(future
  (Thread/sleep 350)
  (dotimes [n 5]
    (dosync
      (Thread/sleep 100)
      (alter x inc))))

(dotimes [_ 3]
  (dosync
    (Thread/sleep 100)
    (println "x =" @x)))

; >> x = 1
; >> x = 1
; >> x = 6

Minimalna długość historii, ref-min-history

Funkcja ref-min-history służy do pobierania bądź ustawiania wartości określającej minimalną długość łańcucha historii zmian podanego obiektu typu Ref.

Pierwszym, obowiązkowym argumentem powinien być obiekt referencji transakcyjnej. Jeżeli nie podano więcej argumentów, funkcja odczyta bieżącą wartość określającą minimalną długość łańcucha. Jeżeli podano kolejny argument, oczekuje się, że będzie to liczba całkowita stanowiąca nową wartość minimalnej długości łańcucha.

Mechanizmy STM odpowiedzialne za utrzymywanie łańcucha historii zmian dla Refów aktualizowanych w transakcjach korzystają z ustalonej z użyciem ref-min-history wartości podczas automatycznego rozszerzania historii. Domyślną wartością jest 0, ale możemy ją zwiększyć przez co historia będzie tworzona już przy pierwszej aktualizacji wartości danego Refa, a nie dopiero w reakcji na wcześniejsze konflikty i niepowodzenia.

Funkcja zwraca minimalną długość historii zmian dla podanego Refa.

Użycie:

  • (ref-min-history ref),
  • (ref-min-history ref długość).
Przykład użycia funkcji ref-min-history
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
(def x (ref 1))
(ref-min-history x 0)

(future
  (Thread/sleep 350)
  (dotimes [n 5]
    (dosync
      (alter x inc))))

(dosync
  (Thread/sleep 500)
  (println "x =" @x))

; >> x = 6

(def x (ref 1))
(ref-min-history x 20)

(future
  (Thread/sleep 350)
  (dotimes [n 5]
    (dosync
      (alter x inc))))

(dosync
  (Thread/sleep 500)
  (println "x =" @x))

; >> x = 1

Długość historii, ref-history-count

Funkcja ref-history-count pozwala poznać aktualną długość historii zmian, czyli liczbę zapisanych, poprzednich wartości współdzielonych danego Refa.

Funkcja przyjmuje jeden obowiązkowy argument, którym powinien być obiekt typu Ref, a zwraca liczbę całkowitą wyrażającą liczbę elementów łańcucha historii zmian.

Użycie:

  • (ref-history-count ref).
Przykład użycia funkcji ref-history-count
1
2
3
4
5
6
7
8
9
10
11
12
13
14
(def x (ref 1))
(ref-min-history x 20)

(future
  (Thread/sleep 350)
  (dotimes [n 5]
    (dosync
      (alter x inc))))

(dosync
  (Thread/sleep 500)
  (ref-history-count x))

; => 5

Walidatory

Referencje transakcyjne można opcjonalnie wyposażyć w mechanizmy testujące poprawność ustawianych wartości bieżących. Służą do tego funkcje set-validator!get-validator.

Walidator (ang. validator) to w tym przypadku czysta (wolna od efektów ubocznych), jednoargumentowa funkcja, która przyjmuje wartość poddawaną sprawdzaniu. Jeśli jest ona niedopuszczalna, funkcja powinna zwrócić wartość false lub wygenerować wyjątek.

Zarządzanie walidatorami, set-validator!

Funkcja set-validator! umożliwia ustawianie lub usuwanie walidatora dla podanego jako pierwszy argument obiektu typu Ref.

Jako drugi argument należy podać funkcję, która powinna przyjmować jeden argument i nie generować efektów ubocznych. Będzie ona wywoływana za każdym razem, gdy dojdzie do zmiany stanu referencji, a jako argument przekazywana będzie jej wartość, z którą zażądano powiązania. Funkcja ta powinna zwracać wartość false lub zgłaszać wyjątek, gdy przyjmowana wartość jest nieakceptowalna.

Pierwsze sprawdzenie stanu jest dokonywane już w momencie ustawiania walidatora i w wątku, w którym doszło do wywołania funkcji. Bieżąca wartość współdzielona wskazywana referencją musi więc już wtedy być odpowiednia. Kolejne wywołania walidatora odbywają się w transakcjach, podczas ich zatwierdzania (faza COMMITTING).

Jeżeli sprawdzanie poprawności nie powiedzie się, transakcja, w której ustawiono Refa zostanie ponowiona.

Jeżeli zamiast obiektu funkcyjnego podamy wartość nil, to walidator zostanie odłączony od podanego obiektu typu Ref.

Funkcja set-validator! zwraca wartość nil, jeżeli udało się ustawić walidator.

Przykład użycia funkcji set-validator!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
(def referencja (ref 1))
(def druga      (ref 1))

(set-validator! referencja pos?)  ;; akceptujemy tylko liczby dodatnie

(dosync (ref-set referencja 2))   ; => 2
(dosync (ref-set referencja 1))   ; => 1
(dosync (ref-set referencja -1))  ; >> java.lang.IllegalStateException:
                                  ; >> Invalid reference state

(dosync
  (println "start")
  (alter referencja dec)
  (alter druga inc)
  (println "stop"))

; >> start
; >> stop
; >> java.lang.IllegalStateException: Invalid reference state

@referencja                       ; => 1
@druga                            ; => 1

Pobieranie walidatora, get-validator

Mając obiekt typu Ref, możemy pobrać funkcyjny obiekt jego walidatora, posługując się funkcją get-validator. Przyjmuje ona jeden argument, który powinien być obiektem typu Ref, a zwraca obiekt funkcji lub nil, jeśli walidatora nie ustawiono.

Funkcja zwraca wartość nil.

Użycie:

  • (get-validator ref).
Przykład użycia funkcji get-validator
1
2
3
4
5
6
7
8
(def referencja (ref 0))
(def nasz-walidator #(< % 5))

(set-validator! referencja nasz-walidator)
; => nil

(get-validator referencja)
; => #<user$nasz_walidator [email protected]>

Przykłady zastosowań

Globalne tożsamości

Korzystając ze zmiennych globalnych, jesteśmy w stanie tworzyć stałe tożsamości dla wyrażających zmienne stany (przez odniesienia do różnych wartości) obiektów typu Ref. W ten sposób możemy w całym programie odwoływać się do konkretnego Refa z użyciem nadanej mu symbolicznej nazwy o globalnym zasięgu.

Spróbujmy zaimplementować przykładowy licznik w postaci funkcji, która za każdym wywołaniem będzie zwiększała jego wartość o jeden, a jeżeli podamy argument, dokonane zostanie przypisanie konkretnej wartości początkowej:

Przykład globalnego licznika i wieloczłonowej funkcji obsługującej
1
2
3
4
5
6
7
8
9
10
11
12
(defonce ref-licznika (ref -1))

(defn licznik
  ([x] (dosync (ref-set ref-licznika (int x))))
  ([]  (dosync (commute ref-licznika inc))))

(licznik)        ; => 0
(licznik)        ; => 1
(licznik)        ; => 2

(licznik 100)    ; => 100
(licznik)        ; => 101

Przelew bankowy

Spójrzmy na przykładową implementację mechanizmu przelewu między rachunkami bankowymi z wykorzystaniem referencji transakcyjnych. Załóżmy, że mamy do dyspozycji dwa obiekty referencyjne reprezentujące rachunki bankowe: rachunek-arachunek-b. Wartościami bieżącymi tych rachunków będą mapy, zawierające odpowiednie, opisujące je elementy, włączając w to salda. Saldo wyrażane będzie liczbą zdawkowych jednostek waluty (np. groszy), aby uniknąć konieczności osobnego przeprowadzania operacji na jednostkach podstawowych i mniejszych oraz wykorzystania typów danych, które przechowują części ułamkowe.

Dodatkowo wprowadzimy też współdzieloną strukturę danych zawierającą historię operacji wykonywanych na rachunkach: wektor, do którego dopisywane będą kolejne operacje jako mapy. Każdy rachunek również będzie wyposażony w historię operacji. Jej elementami będą te same obiekty, co historii globalnej, jednak będą one przechowywane w postaci sortowanej mapy, gdzie kluczem sortowania jest znacznik czasowy raportowanej operacji.

Przykład użycia Refów do obsługi przelewów między rachunkami bankowymi
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
;; 
;; dziennik operacji

(def dziennik-operacji (ref []))

;; 
;; pobieranie aktualnego czasu w milisekundach

(def aktualny-czas #(System/currentTimeMillis))

;; 
;; przeliczanie kwot na grosze i odwrotnie

(defn PLN
  ([podstawa] (* 100 podstawa))
  ([podstawa reszta] (+ (* 100 podstawa) reszta)))

(defn PLN-podziel
  [reszta]
  (let [podstawa (long (quot reszta 100))
        reszta   (long (mod  reszta 100))]
    [podstawa reszta]))

(defn w-PLN
  [reszta]
  (/ reszta 100))

;; 
;; odczytywanie numerów rachunków i sald

(defn nr-rachunku
  [rachunek]
  (:numer @rachunek))

(defn nr-rachunku-lub-nil
  [rachunek]
  (when-not (nil? rachunek) (:numer @rachunek)))

(defn rachunek-nr?
  [rachunek]
  (and
   (= clojure.lang.Ref (type rachunek))
   (integer? (:numer @rachunek))))

(defn rachunek-nr-lub-nil?
  [rachunek]
  (or (nil? rachunek) (rachunek-nr? rachunek)))

(defn saldo
  [rachunek]
  (:stan @rachunek))

(defn saldo-lub-nil
  [rachunek]
  (when-not (nil? rachunek) (:stan @rachunek)))

;; 
;; obsługa raportowania

(defn log-operacji
  [{:keys [czas rodzaj kwota tytułem komunikat]}]
  {:pre [(keyword? rodzaj)]}
  (let [kwota     (or kwota 0)
        komunikat (or komunikat tytułem)]
    {:rodzaj    rodzaj,
     :kwota     kwota,
     :komunikat komunikat}))

(defn log-konto-kasowa
  [args]
  (dosync
    (let [log (log-operacji args)
          s   (saldo-lub-nil (:rachunek args))]
      (assoc log :saldo s))))

(defn log-konto-przelew
  [kierunek args]
  (dosync
    (let [log (log-operacji args)
          r-z (:na-rachunek args)
          r-n (:z-rachunku  args)
          [r2 r op] (if (= :z kierunek)
                        [r-z r-n :przelew-wychodzący]
                        [r-n r-z :przelew-przychodzący])
          s      (saldo-lub-nil r)
          nr-2   (nr-rachunku-lub-nil r2)]
      (assoc log :rachunek nr-2 :saldo s :rodzaj op))))

(defn log-historia-kasowa
  [args]
  (dosync
    (let [nr    (nr-rachunku-lub-nil (:rachunek args))
          czas  (or (:czas args) (aktualny-czas))]
      [(assoc (log-operacji args)
              :rachunek nr
              :czas czas)])))

(defn log-historia-przelew
  [args]
  (dosync
    (let [nr-z  (nr-rachunku-lub-nil (:z-rachunku  args))
          nr-na (nr-rachunku-lub-nil (:na-rachunek args))
          czas  (or (:czas args) (aktualny-czas))]
      [(assoc (log-operacji args)
              :z-rachunku  nr-z
              :na-rachunek nr-na
              :czas czas)])))

;; 
;; obsługa operacji kasowych

(defn op-kasowa
  [op rodzaj rachunek {:keys [kwota] :as fargs}]
  {:pre [(integer?      kwota)
         (pos?          kwota)
         (keyword?      rodzaj)
         (fn?          op)
         (rachunek-nr? rachunek)]}
  (dosync
    (let [czas (aktualny-czas)
          args (assoc fargs :rodzaj rodzaj :czas czas :rachunek rachunek)]
      (alter   rachunek update-in [:stan] #(op % kwota))
      (commute rachunek assoc-in  [:historia czas] (log-konto-kasowa args))
      (commute dziennik-operacji into (log-historia-kasowa args)))
    (saldo-lub-nil rachunek)))

(defn wpłata
  [& {:keys [na-rachunek] :as args}]
  (op-kasowa + :wpłata na-rachunek (assoc args :komunikat "wpłata własna")))

(defn wypłata
  [& {:keys [z-rachunku] :as args}]
  (op-kasowa - :wypłata z-rachunku (assoc args :komunikat "wypłata własna")))

;;
;; obsługa przelewów

(defn przelew
  [& {:keys [na-rachunek z-rachunku kwota tytułem] :as fargs}]
  {:pre [(integer? kwota)
         (pos?     kwota)]}
  (dosync
    (let [czas (aktualny-czas)
          args (assoc fargs :rodzaj :przelew :czas czas)]
      (alter   z-rachunku   update-in [:stan] #(- % kwota))
      (alter   na-rachunek  update-in [:stan] #(+ % kwota))
      (commute z-rachunku   assoc-in  [:historia czas] (log-konto-przelew :z  args))
      (commute na-rachunek  assoc-in  [:historia czas] (log-konto-przelew :na args))
      (commute dziennik-operacji into (log-historia-przelew args)))
    [(saldo-lub-nil z-rachunku) (saldo-lub-nil na-rachunek)]))
;; 
;; dane rachunków

(def rachunek-a (ref {:numer 123456,
                      :stan 0,
                      :historia (sorted-map)}))

(def rachunek-b (ref {:numer 543454,
                      :stan 0,
                      :historia (sorted-map)}))

;; 
;; operacje na rachunkach

(w-PLN (wpłata  :kwota (PLN 400) :na-rachunek rachunek-a))  ; => 400
(w-PLN (wpłata  :kwota (PLN 200) :na-rachunek rachunek-b))  ; => 200
(w-PLN (wypłata :kwota (PLN 100) :z-rachunku  rachunek-b))  ; => 100 

(map w-PLN (przelew :z-rachunku  rachunek-a
                    :na-rachunek rachunek-b
                    :kwota       (PLN 200)
                    :tytułem     "za jagody"))
; => (200 300)

;;
;; stan po operacjach

@rachunek-a

; => {:numer 123456
; =>  :stan 20000
; =>  :historia {1440861812143 {:komunikat "wpłata własna"
; =>                            :kwota 40000
; =>                            :rodzaj :wpłata
; =>                            :saldo 40000}
; =>             1440861823220 {:komunikat "za jagody"
; =>                            :kwota 20000
; =>                            :rachunek 543454
; =>                            :rodzaj :przelew-wychodzący
; =>                            :saldo 20000}}}

@rachunek-b

; =>{:numer 543454
; => :stan 30000
; => :historia {1440861812223 {:komunikat "wpłata własna"
; =>                           :kwota 20000
; =>                           :rodzaj :wpłata
; =>                           :saldo 20000}
; =>            1440861812299 {:komunikat "wypłata własna"
; =>                           :kwota 10000
; =>                           :rodzaj :wypłata
; =>                           :saldo 10000}
; =>            1440861823220 {:komunikat "za jagody"
; =>                           :kwota 20000
; =>                           :rachunek 123456
; =>                           :rodzaj :przelew-przychodzący
; =>                           :saldo 30000}}

@dziennik-operacji

; => [{:czas 1440861812143
; =>   :komunikat "wpłata własna"
; =>   :kwota 40000
; =>   :rachunek 123456
; =>   :rodzaj :wpłata}
; =>  {:czas 1440861812223
; =>   :komunikat "wpłata własna"
; =>   :kwota 20000
; =>   :rachunek 543454
; =>   :rodzaj :wpłata}
; =>  {:czas 1440861812299
; =>   :komunikat "wypłata własna"
; =>   :kwota 10000
; =>   :rachunek 543454
; =>   :rodzaj :wypłata}
; =>  {:czas 1440861823220
; =>   :komunikat "za jagody"
; =>   :kwota 20000
; =>   :na-rachunek 543454
; =>   :rodzaj :przelew
; =>   :z-rachunku 123456}]

;;
;; sprawdzanie współbieżnego realizowania operacji

(w-PLN (saldo rachunek-a))  ; => 200
(w-PLN (saldo rachunek-b))  ; => 300

(future
  (Thread/sleep 500)
  (dotimes [_ 500]
    (Thread/sleep 10)
    (przelew :z-rachunku  rachunek-a
             :na-rachunek rachunek-b
             :kwota       (PLN 100 50)
             :tytułem     "za maliny")))

(future
  (Thread/sleep 500)
  (dotimes [_ 500]
    (Thread/sleep 20)
    (przelew :z-rachunku  rachunek-b
             :na-rachunek rachunek-a
             :kwota       (PLN 100 50)
             :tytułem     "za jabłka")))

(future
  (Thread/sleep 300)
  (dotimes [_ 100]
    (Thread/sleep 100)
    (przelew :z-rachunku  rachunek-a
             :na-rachunek rachunek-b
             :kwota       (PLN 10 50)
             :tytułem     "za ojczyznę")))

(future
  (Thread/sleep 200)
  (dotimes [_ 100]
    (Thread/sleep 40)
    (przelew :z-rachunku  rachunek-b
             :na-rachunek rachunek-a
             :kwota       (PLN 10 50)
             :tytułem     "za tych z Radomia")))

(future
  (dotimes [_ 200]
    (Thread/sleep 100)
    (wpłata :kwota (PLN 5 10) :na-rachunek rachunek-a)
    (wpłata :kwota (PLN 15 1) :na-rachunek rachunek-b)))

(future
  (dotimes [_ 200]
    (Thread/sleep 110)
    (wypłata :kwota (PLN 5 10) :z-rachunku rachunek-a)
    (wypłata :kwota (PLN 15 1) :z-rachunku rachunek-b)))

(Thread/sleep 22000)

(w-PLN (saldo rachunek-a))  ; => 200
(w-PLN (saldo rachunek-b))  ; => 300

;;
;; obliczamy prędkość wykonywania przelewów

(time (dotimes [n 6000]
  (przelew :z-rachunku  rachunek-b
           :na-rachunek rachunek-a
           :kwota       (PLN n 1)
           :tytułem     "za tych z Radomia")
  (przelew :z-rachunku  rachunek-a
           :na-rachunek rachunek-b
           :kwota       (PLN n 1)
           :tytułem     "za tych z Radomia")))

; >> "Elapsed time: 818.965037 msecs"

;; (ok. 12000 koordynowanych operacji między rachunkami na sekundę
;;  przy 1 rdzeniu)

Ten materiał jest częścią większej całości, którą można zobaczyć odwiedzając poniższy odnośnik:

Komentarze