Concurrency 101

07.11.2018

Но преди това...

Въпрос за мъфин #1

Какво е интерфейс?

Въпрос за мъфин #2

Как се създават нови типове?

Въпрос за мъфин #3

type A struct {
    a int
}
type B struct {
    a float64
}
type C struct {
    A
    B
}
var c C

Какво е c.a?

Отговор:
- Компилатора ще ни каже грешка защото не знае кое a имаме предвид.
- c.A.a е int
- c.B.a е float64

Въпрос за мъфин #4

По какъв начин показваме, че тип имплементира интерфейс?

Какво ще говорим днес?

Що е то конкурентност?

Магически паралелизъм?

Конкурентност

Паралелизъм

Конкурентност с/у Паралелизъм

Обяснение с малко повече gophers

CPU скорост vs. производителност

IO-bound vs. CPU-bound

Подходи за конкурентност

А как синхронизираме различните задачи?

В C ползват вилици

#include <stdio.h>

int main()
{
    printf("before\n");
    if (fork())
        printf("parent\n");
    else
        printf("child\n");
    printf("both\n");
}

Синхронизация на вилици

#include <stdio.h>
#include <unistd.h>

int main()
{
    pid_t pid = fork();
    if (pid == 0) {
        printf("child sleeping...\n");
        execl("/bin/sleep", "/bin/sleep", "2", (char *) 0);
    } else {
        waitpid(pid, NULL, 0);
    }
    printf("done!\n");
    return 0;
}

Предимства и недостатъци на fork

Против:

За:

В Go се правим на модерни

Нишки

Нишки в C

void *ticker(void *x_void_ptr) {
    while (42) {
        sleep(1);
        printf("tick\n");
    }

    return NULL;
}

int main() {
    pthread_t ticker_thread;

    if(pthread_create(&ticker_thread, NULL, ticker, NULL)) {
        fprintf(stderr, "Error creating thread\n");
        return 1;
    }

    if(pthread_join(ticker_thread, NULL)) {
        fprintf(stderr, "Error joining thread\n");
        return 2;
    }

    return 0;
}

Нишки в Python

def ticker():
    while 42:
        print("Tick!")
        time.sleep(1)

thread = threading.Thread(target=ticker)
thread.daemon = True
thread.start()

или

class TickerThread(threading.Thread):
    def run(self):
        while 42:
            print("Tick!")
            time.sleep(1)

thread = TickerThread()
thread.start()
# ...
thread.join()

Goroutines

Скучно

За да се съсредоточим върху това, което се опитваме да кажем ще дадем скучен пример.

func main() {
    boring("boring!")
}

func boring(msg string) {
    for i := 0; ; i++ {
        fmt.Println(msg, i)
        time.Sleep(1 * time.Second)
    }
}

За конкурентноста тайминга е важен. Нека е малко по - непредвидим.

Малко по - малко скучно

Ще сложим случайно време за сън.

func main() {
    boring("boring!")
}

func boring(msg string) {
    for i := 0; ; i++ {
        fmt.Println(msg, i)
        time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
}

Скучната ни програма ще продължи да работи така до безкрайност. Като много скучна лекция, от която ви е неудобно да си тръгнете.

Да я игнорираме

Скучната програма не заслужава вниманието ни, нека не я чакаме.

С go пускаме функция нормално, но пускащия няма нужда чака приключването й.

Пускаме goroutine.

func main() {
    go boring("boring!")
}

func boring(msg string) {
    for i := 0; ; i++ {
        fmt.Println(msg, i)
        time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
}

Когато main приключи програмата спира.

Да я игнорираме малко по - малко

func main() {
    go boring("boring!")
    fmt.Println("Listening.")
    time.Sleep(2 * time.Second)
    fmt.Println("You are way too boring. I am leaving.")
}

func boring(msg string) {
    for i := 0; ; i++ {
        fmt.Println(msg, i)
        time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
}

Изпълнявахме main и скучната функция едновременно.

С края на main дойде и края на скучната функция.

Какво е Goroutine

Вдъхновено от

Проблеми, свързани с нишки

От това, че имат една и съща памет, следва, че могат да достъпват едни и същи променливи

int i = 0

thread1 { i++ }
thread2 { i++ }

wait { thread1 } { thread2 }
print i

Тук i накрая може да бъде 1 или 2.

Критични секции

В Go имаме Semaphores и Message passing

Communicate by sharing vs. Share by communicating

Communicate by sharing vs. Share by communicating

Channels

Употреба на канали

intChannel := make(chan int)
ch := make(chan []string, 100)
ch <- 64
read := <-ch

IO в канал

Операциите по изпращане и получаване се изпълняват с оператора <-

Simple demo:

func main() {
    ch := make(chan string)
    go func() {
        fmt.Printf("Goroutine received: %s\n", <-ch)
        ch <- "Hello from the other side"
    }()

    ch <- "Hello, can you hear me?"
    fmt.Printf("Main received: %s", <-ch)
}

Затваряне

Канал може да бъде затворен:

close(ch)

Каналите са първокласни обекти в Go

c := make(chan chan int)
func doSomething(input chan string) {
  // do something
}
func doSomethingElse() chan string {
  result := make(chan string)
  return result
}

range

Помните ли как ви казахме, че range е нещо супер яко?

for val := range ch {
    fmt.Printf("Recieved: %#v\n", val)
}

Ограничени канали

func randomFeed(count, max int) <-chan int {
    c := make(chan int)

    go func() {
        for i := 0; i < count; i++ {
            c <- rand.Intn(max)
        }
        close(c)
    }()

    return c
}

func main() {
    feed := randomFeed(10, 100)
    for v := range feed {
        fmt.Println(v)
    }
}

Deadlock

func main() {
    c := make(chan int)
    c <- 42
    val := <-c
    println(val)
}

nil channel

Никога не използвайте неинициализиран канал!

package main

func main() {
    var c chan string
    c <- "ping" // deadlock
}
package main

import "fmt"

func main() {
    var c chan string
    fmt.Println(<-c) // deadlock
}

Пример за синхронизация

func main() {
    c := make(chan int)

    go func() {
        fmt.Println("SCV: Reportin' for duty")
        time.Sleep(2 * time.Second)
        fmt.Println("SCV: Job's finished!")
        c <- 1
    }()

    fmt.Println("Main does other time-consuming work...")
    time.Sleep(1 * time.Second)
    fmt.Println("Main is done")
    <-c

    fmt.Println("Everyone is done")
}

По-сложен пример

var sem = make(chan struct{}, MaxOutstanding)

func init() {
    for i := 0; i < MaxOutstanding; i++ {
        sem <- struct{}{}
    }
}

func handle(r *Request) {
    <-sem
    process(r)
    sem <- struct{}{}
}

func Serve(queue chan *Request) {
    for {
        req := <-queue
        go handle(req)
    }
}

Затваряне на канали

package main

import "fmt"

func main() {
    ch := make(chan string)

    go func(output chan string) {
        for i := 0; i < 5; i++ {
            output <- fmt.Sprintf("sending N=%d", i)
        }
        close(output)
    }(ch)

    for i := 0; i < 7; i++ {
        val, ok := <-ch
        fmt.Printf("Recieved: %#v, %#v\n", val, ok)
    }

    ch <- fmt.Sprintf("where is your towel?")
}

Домашно

Въпроси?