Concurrency


Some languages support multiple threads, thread-safe data structures and cross-thread communication. Some languages also support asynchronous invocation of functions that may be waiting for an external asynchronous operation (e.g. IO, external process).

  • Async/Await
  • External Async Operations
  • Multi-Threading
  • Atomic Operations and Mutexes
  • Semaphores and inter-thread communication
  • OS signal handlers
  • Shared Memory

Go Language



Async/Await

Goroutines: The Equivalent of async Functions

Go does not have async/await keywords like JavaScript or Python. Instead, goroutines and channels provide lightweight concurrency primitives for asynchronous execution and communication.

A goroutine is a function call that runs concurrently with its caller, started with the go keyword. It is similar to calling an async function without awaiting it: the caller continues immediately. The program exits when main returns, whether or not other goroutines are still running, which is why the example below sleeps at the end of main.

    import (
        "fmt"
        "time"
    )

    func asyncTask() {
        fmt.Println("Task started")
        time.Sleep(2 * time.Second) // Simulate work
        fmt.Println("Task finished")
    }

    func main() {
        go asyncTask() // Runs concurrently
        fmt.Println("Main function continues...")
        time.Sleep(3 * time.Second) // Crude wait so the goroutine can finish
    }

Await equivalent - Channels: Communicating Between Goroutines

Go uses channels to pass data safely between goroutines. They act like typed message queues, letting goroutines communicate without explicit locks.

    import "fmt"

    func worker(ch chan string) {
        ch <- "Task completed" // Send value to channel
    }

    func main() {
        ch := make(chan string) // Create a channel
        go worker(ch)           // Start worker as a goroutine
        msg := <-ch             // Like await: block until a value arrives
        fmt.Println(msg)        // Output: Task completed
    }
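The "channel as await" idea can be taken one step further by returning the channel from the function that starts the work, which gives something close to a future or promise. The following is a minimal sketch, not a standard library pattern; result and divideAsync are hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// result pairs a value with an error, much like a settled promise.
type result struct {
	value int
	err   error
}

// divideAsync (hypothetical helper) starts the work in a goroutine and
// immediately returns a channel that will carry exactly one result.
func divideAsync(a, b int) <-chan result {
	ch := make(chan result, 1) // buffer of 1 so the goroutine never blocks
	go func() {
		if b == 0 {
			ch <- result{err: errors.New("division by zero")}
			return
		}
		ch <- result{value: a / b}
	}()
	return ch
}

func main() {
	future := divideAsync(10, 2) // returns at once
	r := <-future                // the "await" point
	fmt.Println(r.value, r.err)

	r = <-divideAsync(1, 0)
	fmt.Println(r.value, r.err)
}
```

Carrying the error inside the channel's element type keeps success and failure on a single receive, which is usually easier to reason about than two separate channels.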

Buffered vs Unbuffered Channels

Unbuffered channels (the default) block until both sender and receiver are ready. A buffered channel, by contrast, accepts sends without a waiting receiver until its capacity is reached.

    import "fmt"

    func main() {
        ch := make(chan int, 2) // Buffered channel with capacity 2
        ch <- 1                 // Non-blocking send
        ch <- 2                 // Non-blocking send
        fmt.Println(<-ch)       // 1
        fmt.Println(<-ch)       // 2
    }

Once the buffer is full, the next send blocks until a receiver takes a value.
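The capacity limit can be observed without blocking by combining a send with a default case in select. This is a small sketch; trySend is a hypothetical helper name, not part of the standard library.

```go
package main

import "fmt"

// trySend (hypothetical helper) attempts a send without blocking and
// reports whether the value was accepted.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default: // buffer full, or no receiver ready on an unbuffered channel
		return false
	}
}

func main() {
	ch := make(chan int, 2)
	fmt.Println(trySend(ch, 1), len(ch), cap(ch)) // true 1 2
	fmt.Println(trySend(ch, 2), len(ch), cap(ch)) // true 2 2
	fmt.Println(trySend(ch, 3), len(ch), cap(ch)) // false 2 2

	<-ch                        // free one slot
	fmt.Println(trySend(ch, 3)) // true
}
```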


External Async Operations

Asynchronous File Processing with Channels.

    import (
        "bufio"
        "fmt"
        "os"
    )

    func readLines(filePath string, ch chan string) {
        defer close(ch) // Close channel when done, on every return path

        file, err := os.Open(filePath)
        if err != nil {
            fmt.Fprintln(os.Stderr, "open failed:", err)
            return
        }
        defer file.Close()

        scanner := bufio.NewScanner(file)
        for scanner.Scan() {
            ch <- scanner.Text() // Send line to channel
        }
        if err := scanner.Err(); err != nil {
            fmt.Fprintln(os.Stderr, "read failed:", err)
        }
    }

    func main() {
        ch := make(chan string)
        go readLines("example.txt", ch)
        for line := range ch {
            fmt.Println(line) // Process each line as it arrives
        }
    }
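A consumer often needs to know whether the producer stopped because the input ended or because reading failed. One way to do that, sketched below under the assumption that a second channel for the error is acceptable, is to report the scanner's final error after the last line. streamLines and collect are hypothetical names, and the input is an in-memory string so the example runs without a file; an *os.File from os.Open could be passed as the io.Reader instead.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// streamLines sends each line from r on lines, then reports the scanner's
// final error (nil on a clean end of input) on errc.
func streamLines(r io.Reader, lines chan<- string, errc chan<- error) {
	defer close(lines) // runs after the error has been queued
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		lines <- scanner.Text()
	}
	errc <- scanner.Err()
}

// collect (hypothetical helper) runs streamLines over an in-memory string.
func collect(text string) ([]string, error) {
	lines := make(chan string)
	errc := make(chan error, 1) // buffered so the producer can always finish
	go streamLines(strings.NewReader(text), lines, errc)

	var got []string
	for line := range lines {
		got = append(got, line)
	}
	return got, <-errc
}

func main() {
	got, err := collect("alpha\nbeta\ngamma\n")
	fmt.Println(got, err)
}
```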

Multi-Threading Support

Go supports multi-threading out of the box, but goroutines are scheduled by the Go runtime rather than directly by the operating system.

  • Goroutines are multiplexed onto multiple OS threads (Go uses an M:N scheduler).
  • The runtime dynamically adjusts the number of threads.
  • The GOMAXPROCS setting limits how many OS threads may execute Go code at the same time; by default it matches the number of available logical CPUs.
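The scheduler settings described above can be inspected at run time. The sketch below only reads values; schedulerInfo is a hypothetical helper name, and the numbers printed depend on the machine.

```go
package main

import (
	"fmt"
	"runtime"
)

// schedulerInfo returns the logical CPU count and the current GOMAXPROCS
// value. Passing 0 to runtime.GOMAXPROCS queries the setting without
// changing it.
func schedulerInfo() (cpus, procs int) {
	return runtime.NumCPU(), runtime.GOMAXPROCS(0)
}

func main() {
	cpus, procs := schedulerInfo()
	fmt.Println("logical CPUs:", cpus)
	fmt.Println("GOMAXPROCS:  ", procs)
	fmt.Println("goroutines:  ", runtime.NumGoroutine())
}
```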

Example: Running Multiple Goroutines

    import (
        "fmt"
        "runtime"
        "sync"
    )

    func worker(id int, wg *sync.WaitGroup) {
        defer wg.Done()
        fmt.Printf("Worker %d started\n", id)
    }

    func main() {
        runtime.GOMAXPROCS(4) // Use up to 4 CPU cores

        var wg sync.WaitGroup
        for i := 1; i <= 5; i++ {
            wg.Add(1)
            go worker(i, &wg)
        }
        wg.Wait() // Wait for all goroutines to complete
        fmt.Println("All workers finished")
    }

Select Statement: Go’s Equivalent of Promise.race

Go provides the select statement to wait on several channel operations at once and proceed with whichever becomes ready first, similar to Promise.race in JavaScript.

Example: Handling Multiple Async Tasks

    import (
        "fmt"
        "time"
    )

    func task1(ch chan string) {
        time.Sleep(2 * time.Second)
        ch <- "Task 1 completed"
    }

    func task2(ch chan string) {
        time.Sleep(1 * time.Second)
        ch <- "Task 2 completed"
    }

    func main() {
        ch1 := make(chan string)
        ch2 := make(chan string)
        go task1(ch1)
        go task2(ch2)

        // Prints whichever task finishes first
        select {
        case msg := <-ch1:
            fmt.Println(msg)
        case msg := <-ch2:
            fmt.Println(msg)
        }
    }
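A single select returns after the first ready case, which matches Promise.race. To collect every result instead (closer to Promise.all), the select can run in a loop, with each channel set to nil once it has delivered, since a nil channel is never ready. This is a sketch with hypothetical helper names (after, collectBoth) and arbitrary delays.

```go
package main

import (
	"fmt"
	"time"
)

// after (hypothetical helper) delivers msg on the returned channel once d
// has elapsed.
func after(d time.Duration, msg string) <-chan string {
	ch := make(chan string, 1)
	go func() {
		time.Sleep(d)
		ch <- msg
	}()
	return ch
}

// collectBoth waits for both tasks and returns their messages in the
// order they completed.
func collectBoth() []string {
	slow := after(100*time.Millisecond, "slow")
	fast := after(10*time.Millisecond, "fast")

	var got []string
	for slow != nil || fast != nil {
		select {
		case m := <-slow:
			got = append(got, m)
			slow = nil // a nil channel is never ready, so this case is disabled
		case m := <-fast:
			got = append(got, m)
			fast = nil
		}
	}
	return got
}

func main() {
	fmt.Println(collectBoth())
}
```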

Timeouts: Avoiding Blocking

To prevent blocking indefinitely, use timeouts with select.

    import (
        "fmt"
        "time"
    )

    func slowTask(ch chan string) {
        time.Sleep(3 * time.Second)
        ch <- "Slow task finished"
    }

    func main() {
        // Capacity 1 lets slowTask complete its send even after a timeout,
        // so the goroutine is not left blocked forever.
        ch := make(chan string, 1)
        go slowTask(ch)

        select {
        case msg := <-ch:
            fmt.Println(msg)
        case <-time.After(2 * time.Second): // Timeout if task takes too long
            fmt.Println("Timeout! Task took too long")
        }
    }
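The standard context package offers another way to express the same timeout, with the added benefit that the deadline can be passed down to the task so it stops early. The sketch below assumes the task is written to watch ctx.Done(); slowTask and runWithTimeout here are hypothetical variants introduced for illustration.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// slowTask (hypothetical) simulates work that takes d, but gives up early
// if the context is cancelled or its deadline passes.
func slowTask(ctx context.Context, d time.Duration) (string, error) {
	select {
	case <-time.After(d):
		return "task finished", nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// runWithTimeout runs slowTask under a deadline.
func runWithTimeout(work, timeout time.Duration) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel() // always release the context's timer
	return slowTask(ctx, work)
}

func main() {
	fmt.Println(runWithTimeout(10*time.Millisecond, time.Second))
	fmt.Println(runWithTimeout(time.Second, 10*time.Millisecond))
}
```

The first call completes normally; the second returns context.DeadlineExceeded as its error.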

Atomic Operations and Mutexes

Go’s atomic operations provide lock-free primitives for modifying shared variables safely.

Interlocked Increment (atomic.AddInt64)

    import (
        "fmt"
        "sync/atomic"
    )

    func main() {
        var counter int64 = 0
        atomic.AddInt64(&counter, 1) // Atomic increment
        fmt.Println("Counter:", counter)
    }

Compare and Swap (atomic.CompareAndSwapInt64)

    import (
        "fmt"
        "sync/atomic"
    )

    func main() {
        var value int64 = 42
        swapped := atomic.CompareAndSwapInt64(&value, 42, 100)
        fmt.Println("Swapped:", swapped, "New Value:", value)
    }
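Compare-and-swap is most useful inside a retry loop, where a value is read, a new value is computed, and the swap is attempted until no other goroutine has interfered. The sketch below uses that loop to maintain a running maximum; storeMax and concurrentMax are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// storeMax atomically raises *addr to v if v is larger, retrying when
// another goroutine changes the value between the load and the swap.
func storeMax(addr *int64, v int64) {
	for {
		old := atomic.LoadInt64(addr)
		if v <= old {
			return // current value is already at least v
		}
		if atomic.CompareAndSwapInt64(addr, old, v) {
			return // swap succeeded
		}
		// CAS failed: another goroutine wrote first, so reload and retry.
	}
}

// concurrentMax (hypothetical helper) has n goroutines each offer their
// index and returns the largest value recorded.
func concurrentMax(n int) int64 {
	var highest int64
	var wg sync.WaitGroup
	for i := 1; i <= n; i++ {
		wg.Add(1)
		go func(v int64) {
			defer wg.Done()
			storeMax(&highest, v)
		}(int64(i))
	}
	wg.Wait()
	return atomic.LoadInt64(&highest)
}

func main() {
	fmt.Println(concurrentMax(100)) // 100
}
```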

Mutexes (sync.Mutex)

Mutexes prevent multiple goroutines from modifying a shared resource simultaneously.

Basic Mutex Example:

    import (
        "fmt"
        "sync"
    )

    var (
        counter int
        mu      sync.Mutex
    )

    func increment(wg *sync.WaitGroup) {
        defer wg.Done()
        mu.Lock()   // Acquire lock
        counter++   // Modify shared variable
        mu.Unlock() // Release lock
    }

    func main() {
        var wg sync.WaitGroup
        for i := 0; i < 5; i++ {
            wg.Add(1)
            go increment(&wg)
        }
        wg.Wait()
        fmt.Println("Final Counter:", counter)
    }
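In larger programs the mutex is usually stored next to the data it protects rather than in package-level variables, and released with defer so that every return path unlocks. A minimal sketch follows; SafeCounter, NewSafeCounter and hammer are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// SafeCounter (hypothetical type) keeps the mutex next to the map it guards.
type SafeCounter struct {
	mu     sync.Mutex
	counts map[string]int
}

func NewSafeCounter() *SafeCounter {
	return &SafeCounter{counts: make(map[string]int)}
}

// Inc increments the counter for key.
func (c *SafeCounter) Inc(key string) {
	c.mu.Lock()
	defer c.mu.Unlock() // released on every return path
	c.counts[key]++
}

// Get returns the current count for key.
func (c *SafeCounter) Get(key string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.counts[key]
}

// hammer (hypothetical helper) increments one key from n goroutines and
// returns the final count.
func hammer(n int) int {
	c := NewSafeCounter()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Inc("hits")
		}()
	}
	wg.Wait()
	return c.Get("hits")
}

func main() {
	fmt.Println(hammer(1000)) // 1000
}
```

Without the lock, concurrent writes to the map would be a data race, and the Go runtime may abort the program when it detects them.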

Semaphores and Inter-Thread communication

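sync.WaitGroup & Channels - Go's standard library has no semaphore type, but a buffered channel works as a counting semaphore (the golang.org/x/sync/semaphore package offers a weighted implementation). sync.WaitGroup serves a different purpose: it waits for a set of goroutines to finish.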

Using Channels as a Semaphore

    import (
        "fmt"
        "time"
    )

    func worker(id int, sem chan struct{}) {
        sem <- struct{}{} // Acquire semaphore
        fmt.Printf("Worker %d started\n", id)
        time.Sleep(time.Second)
        fmt.Printf("Worker %d finished\n", id)
        <-sem // Release semaphore
    }

    func main() {
        sem := make(chan struct{}, 3) // Limit to 3 concurrent workers
        for i := 1; i <= 5; i++ {
            go worker(i, sem)
        }
        time.Sleep(5 * time.Second)
    }
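Combining the channel semaphore with a sync.WaitGroup removes the need to guess how long to sleep in main, and a small amount of bookkeeping makes the concurrency limit observable. The sketch below is one possible arrangement; runLimited is a hypothetical helper and the 10 ms sleep stands in for real work.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runLimited starts n workers but lets at most limit of them run at once.
// It returns the highest number of workers observed running together.
func runLimited(n, limit int) int {
	sem := make(chan struct{}, limit)
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		running int
		peak    int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release it on return

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			time.Sleep(10 * time.Millisecond) // simulated work

			mu.Lock()
			running--
			mu.Unlock()
		}()
	}
	wg.Wait() // returns once every worker has finished
	return peak
}

func main() {
	fmt.Println("peak concurrency:", runLimited(10, 3))
}
```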

OS Signal Handling

os/signal - Go can handle OS signals (e.g., SIGINT, SIGTERM) for graceful shutdown.

Handling SIGINT (Ctrl+C)

    import (
        "fmt"
        "os"
        "os/signal"
        "syscall"
    )

    func main() {
        sigs := make(chan os.Signal, 1)
        signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

        fmt.Println("Waiting for signal...")
        <-sigs // Block until signal received
        fmt.Println("Signal received. Exiting...")
    }
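Since Go 1.16, signal.NotifyContext ties signal delivery to a context.Context, which is convenient when the rest of the program already uses contexts for cancellation. The sketch below assumes a Unix-like system: so that it terminates on its own, it sends SIGTERM to its own process with syscall.Kill, which a real service would not do. shutdownDemo is a hypothetical name.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// shutdownDemo installs handlers for SIGINT and SIGTERM, then reports
// whether the context was cancelled by a signal within two seconds.
func shutdownDemo() bool {
	ctx, stop := signal.NotifyContext(context.Background(),
		syscall.SIGINT, syscall.SIGTERM)
	defer stop() // restore the default signal behaviour

	// Demo only (Unix-like systems): signal ourselves so the example ends.
	// A real program would simply wait for an external signal here.
	if err := syscall.Kill(os.Getpid(), syscall.SIGTERM); err != nil {
		return false
	}

	select {
	case <-ctx.Done(): // cancelled by the signal
		return true
	case <-time.After(2 * time.Second):
		return false
	}
}

func main() {
	fmt.Println("signal received:", shutdownDemo())
}
```

In a server, ctx would typically be passed to the components that need to stop, and cleanup would run after ctx.Done() fires.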

Shared Memory Segments

Go's standard library has no wrapper for System V (shmget) or POSIX (shm_open) shared memory, but on Unix-like systems mmap can map a file into memory that several processes can share.

    import (
        "fmt"
        "os"
        "syscall"
    )

    func main() {
        file, err := os.OpenFile("shared_mem.dat", os.O_RDWR|os.O_CREATE, 0666)
        if err != nil {
            panic(err)
        }
        defer file.Close()

        // Resize file to 1024 bytes so the whole mapping is backed by it
        if err := file.Truncate(1024); err != nil {
            panic(err)
        }

        data, err := syscall.Mmap(int(file.Fd()), 0, 1024,
            syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
        if err != nil {
            panic(err)
        }
        defer syscall.Munmap(data) // Unmap when done

        msg := "Hello from shared memory"
        copy(data, msg)                      // Write to shared memory
        fmt.Println(string(data[:len(msg)])) // Read from shared memory
    }

Using shmipc-go

The shmipc-go library (github.com/cloudwego/shmipc-go) provides high-performance inter-process communication (IPC) built on Linux shared memory. The example below shows how two processes might exchange an array of structs. Treat the shmipc calls in it (NewListener, CreateMemoryRegion, NewDialer, OpenMemoryRegion) as illustrative, and confirm the names and signatures against the library's documentation before use.

First, define the struct that will be shared between processes. For example:

    type MarketData struct {
        Symbol [10]byte // Fixed-size array for symbol
        Price  float64
        Volume int64
    }

Create the Shared Memory Segment:

One process (typically the server) creates the shared memory segment and initializes the data.

    import (
        "bytes"
        "encoding/binary"
        "fmt"
        "os"
        "unsafe"

        "github.com/cloudwego/shmipc-go"
    )

    const (
        segmentSize = 1024 * 1024 // 1MB
        arraySize   = 100         // Number of MarketData structs
    )

    func main() {
        // Create a shared memory listener
        // (shmipc calls as given in the original; verify against the library)
        listener, err := shmipc.NewListener("shm_segment_name", segmentSize)
        if err != nil {
            fmt.Println("Failed to create listener:", err)
            os.Exit(1)
        }
        defer listener.Close()

        // Accept a connection
        conn, err := listener.Accept()
        if err != nil {
            fmt.Println("Failed to accept connection:", err)
            os.Exit(1)
        }
        defer conn.Close()

        // Allocate shared memory for the array of structs
        stride := int(unsafe.Sizeof(MarketData{}))
        shmData, err := conn.CreateMemoryRegion(stride * arraySize)
        if err != nil {
            fmt.Println("Failed to create memory region:", err)
            os.Exit(1)
        }

        // Initialize the shared memory with sample data.
        // Assumes shmData is a []byte; binary.Write needs an io.Writer,
        // so each struct is encoded into a buffer and then copied in.
        var buf bytes.Buffer
        for i := 0; i < arraySize; i++ {
            md := MarketData{
                Price:  float64(i) * 100.0,
                Volume: int64(i) * 10,
            }
            copy(md.Symbol[:], fmt.Sprintf("SYM%04d", i))

            buf.Reset()
            if err := binary.Write(&buf, binary.LittleEndian, &md); err != nil {
                fmt.Println("Failed to encode record:", err)
                os.Exit(1)
            }
            copy(shmData[i*stride:], buf.Bytes())
        }

        fmt.Println("Shared memory initialized with market data.")
        select {} // Keep the server running
    }

The second process (typically the client) accesses the shared memory segment to read the data.

    import (
        "bytes"
        "encoding/binary"
        "fmt"
        "os"
        "unsafe"

        "github.com/cloudwego/shmipc-go"
    )

    const (
        segmentSize = 1024 * 1024 // 1MB
        arraySize   = 100         // Number of MarketData structs
    )

    func main() {
        // Create a shared memory dialer
        // (shmipc calls as given in the original; verify against the library)
        dialer, err := shmipc.NewDialer("shm_segment_name")
        if err != nil {
            fmt.Println("Failed to create dialer:", err)
            os.Exit(1)
        }
        defer dialer.Close()

        // Establish a connection
        conn, err := dialer.Dial()
        if err != nil {
            fmt.Println("Failed to dial:", err)
            os.Exit(1)
        }
        defer conn.Close()

        // Access the shared memory region
        shmData, err := conn.OpenMemoryRegion()
        if err != nil {
            fmt.Println("Failed to open memory region:", err)
            os.Exit(1)
        }

        // Read and print the market data from shared memory.
        // Assumes shmData is a []byte; binary.Read needs an io.Reader.
        stride := int(unsafe.Sizeof(MarketData{}))
        for i := 0; i < arraySize; i++ {
            var md MarketData
            r := bytes.NewReader(shmData[i*stride:])
            if err := binary.Read(r, binary.LittleEndian, &md); err != nil {
                fmt.Println("Failed to decode record:", err)
                os.Exit(1)
            }
            symbol := bytes.TrimRight(md.Symbol[:], "\x00") // Drop NUL padding
            fmt.Printf("Symbol: %s, Price: %.2f, Volume: %d\n",
                symbol, md.Price, md.Volume)
        }
    }

Explanation:

  • Struct Definition: The MarketData struct represents the data to be shared. Fixed-size arrays are used for strings to ensure consistent memory layout.
  • Shared Memory Creation: The server process creates a shared memory segment using shmipc.NewListener and initializes it with an array of MarketData structs.
  • Data Initialization: The server populates the shared memory with sample market data, ensuring that each struct is written at the correct offset.
  • Shared Memory Access: The client process connects to the shared memory segment using shmipc.NewDialer and reads the data, interpreting each segment as a MarketData struct.

Considerations:

  • Synchronization: Implement proper synchronization mechanisms to manage concurrent access to the shared memory segment, preventing data races.
  • Error Handling: Include comprehensive error handling to manage scenarios where the shared memory segment is unavailable or corrupted.
  • Memory Alignment: Ensure that the struct's memory layout is consistent across processes, especially when interfacing with programs written in different languages.
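The memory alignment point can be made concrete by comparing the size of MarketData as laid out by the Go compiler with the number of bytes encoding/binary produces for it. The sketch below repeats the struct so that it is self-contained; encodedSize and inMemorySize are hypothetical helper names, and the in-memory figure depends on the platform (it is typically larger than the encoded size on 64-bit systems because of padding after Symbol).

```go
package main

import (
	"encoding/binary"
	"fmt"
	"unsafe"
)

// MarketData mirrors the struct defined earlier in this section.
type MarketData struct {
	Symbol [10]byte
	Price  float64
	Volume int64
}

// encodedSize is the number of bytes encoding/binary writes for one
// record: fields are packed back to back with no padding (10 + 8 + 8).
func encodedSize() int {
	return binary.Size(MarketData{})
}

// inMemorySize is the size of the struct as the Go compiler lays it out,
// including any alignment padding.
func inMemorySize() int {
	return int(unsafe.Sizeof(MarketData{}))
}

func main() {
	var md MarketData
	fmt.Println("encoded bytes:  ", encodedSize())
	fmt.Println("in-memory bytes:", inMemorySize())
	fmt.Println("Price offset:   ", unsafe.Offsetof(md.Price))
}
```

A reader written in another language has to agree on whichever layout is actually stored: the packed encoding, or the padded in-memory layout if the bytes are copied directly.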

By leveraging shmipc-go, you can efficiently share complex data structures like arrays of structs between processes, facilitating high-performance inter-process communication suitable for scenarios such as market data processing.