In Go we sometimes use a buffered channel as a simple object pool.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Pool in Go using buffered channels
package main | |
import ( | |
"fmt" | |
"math/rand" | |
"sync" | |
"time" | |
) | |
// worker simulates the work of one goroutine that borrows a resource from pool.
//
// It blocks until start is closed (or a value is sent on it), takes one item
// from pool, sleeps for a random duration below 100ms, prints which resource
// it got, and puts the item back into pool before signalling wg.
func worker(id int, pool chan int, start chan bool, wg *sync.WaitGroup) {
	// Deferred calls run in LIFO order: registering Done first makes it run
	// last, so the resource is already back in the pool when wg.Wait returns.
	defer wg.Done()

	<-start // Wait for all goroutines to be launched

	rsc := <-pool                  // Get item from the pool
	defer func() { pool <- rsc }() // Return item at end

	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	fmt.Printf("worker %d got resource %d\n", id, rsc)
}
func main() { | |
var wg sync.WaitGroup | |
start := make(chan bool) | |
// Create and fill pool | |
pool := make(chan int, 3) | |
for i := 0; i < 3; i++ { | |
pool <- i | |
} | |
// Run workers | |
for i := 0; i < 10; i++ { | |
wg.Add(1) | |
go worker(i, pool, start, &wg) | |
} | |
close(start) // Signal to start | |
wg.Wait() // Wait for goroutines to finish before exiting | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Simple Pool object""" | |
from queue import Queue | |
class Proxy:
    """Context manager that wraps a leased object and returns it to the pool.

    Entering the ``with`` block yields the wrapped object; leaving the block
    (normally or through an exception) hands the object back via
    ``pool._put``.
    """

    def __init__(self, obj, pool):
        """Remember the leased object and the pool it must be returned to."""
        self._obj = obj
        self._pool = pool

    def __enter__(self):
        """Return the wrapped object for use inside the ``with`` block."""
        return self._obj

    def __exit__(self, typ, val, tb):
        """Give the object back to the pool.

        Nothing is returned, so an exception raised inside the ``with`` block
        keeps propagating after the object has been returned.
        """
        self._pool._put(self._obj)
class Pool: | |
"""Pool of objects""" | |
def __init__(self, objects): | |
self._queue = Queue() | |
for obj in objects: | |
self._queue.put(obj) | |
def lease(self): | |
"""Lease an object from the pool, should be used as contect manger. e.g.: | |
with pool.lease() as conn: | |
cur = conn.cursor() | |
cur.execute('SELECT ...') | |
""" | |
return Proxy(self._queue.get(), self) | |
def _put(self, obj): | |
self._queue.put(obj) | |
if __name__ == '__main__':
    # Demo: ten threads compete for three pooled resources.
    from threading import Thread, Barrier
    from time import sleep
    from random import random

    n = 10
    b = Barrier(n)
    p = Pool([1, 2, 3])

    def worker(worker_id, barrier, pool):
        """Wait at the barrier, sleep briefly, then lease and report a resource."""
        barrier.wait()  # Wait for all threads to be ready
        sleep(random() / 10)
        with pool.lease() as val:
            print('worker %d got resource %d' % (worker_id, val))

    threads = [Thread(target=worker, args=(i, b, p)) for i in range(n)]
    for t in threads:
        t.start()
    # Join explicitly so the demo's lifetime does not rely on interpreter
    # shutdown waiting for non-daemon threads.
    for t in threads:
        t.join()
$ go run pool.go
worker 7 got resource 0
worker 0 got resource 2
worker 3 got resource 1
worker 8 got resource 2
worker 1 got resource 0
worker 9 got resource 1
worker 5 got resource 1
worker 4 got resource 0
worker 2 got resource 2
worker 6 got resource 1
$ python pool.py
worker 5 got resource 1
worker 8 got resource 2
worker 1 got resource 3
worker 4 got resource 1
worker 0 got resource 2
worker 7 got resource 3
worker 6 got resource 1
worker 3 got resource 2
worker 9 got resource 3
worker 2 got resource 1