If it won't be simple, it simply won't be. [Hire me, source code] by Miki Tebeka, CEO, 353Solutions

Friday, September 16, 2016

Simple Object Pools

Sometimes we need object pools to limit the number of resources consumed. The most common example is database connections.

In Go, we sometimes use a buffered channel as a simple object pool.

// Pool in Go using a buffered channel
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// worker simulates work of a goroutine
func worker(id int, pool chan int, start chan bool, wg *sync.WaitGroup) {
	<-start                        // Wait for all goroutines
	rsc := <-pool                  // Get item from the pool
	defer func() { pool <- rsc }() // Return item at end
	defer wg.Done()                // Signal we're done
	time.Sleep(time.Duration(rand.Int()%100) * time.Millisecond)
	fmt.Printf("worker %d got resource %d\n", id, rsc)
}

func main() {
	var wg sync.WaitGroup
	start := make(chan bool)
	// Create and fill pool
	pool := make(chan int, 3)
	for i := 0; i < 3; i++ {
		pool <- i
	}
	// Run workers
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go worker(i, pool, start, &wg)
	}
	close(start) // Signal to start
	wg.Wait()    // Wait for goroutines to finish before exiting
}
In Python, we can do something similar with a Queue. Python's context manager makes the resource handling automatic, so clients don't need to remember to return the object.


"""Simple Pool object"""
from queue import Queue


class Proxy:
    """Wraps original object with context manager that returns the object to
    the pool."""
    def __init__(self, obj, pool):
        self._obj = obj
        self._pool = pool

    def __enter__(self):
        return self._obj

    def __exit__(self, typ, val, tb):
        self._pool._put(self._obj)


class Pool:
    """Pool of objects"""
    def __init__(self, objects):
        self._queue = Queue()
        for obj in objects:
            self._queue.put(obj)

    def lease(self):
        """Lease an object from the pool, should be used as context manager. e.g.:

            with pool.lease() as conn:
                cur = conn.cursor()
                cur.execute('SELECT ...')
        """
        return Proxy(self._queue.get(), self)

    def _put(self, obj):
        self._queue.put(obj)


if __name__ == '__main__':
    from threading import Thread, Barrier
    from time import sleep
    from random import random

    n = 10
    b = Barrier(n)
    p = Pool([1, 2, 3])

    def worker(n, barrier, pool):
        barrier.wait()  # Wait for all threads to be ready
        sleep(random() / 10)
        with pool.lease() as val:
            print('worker %d got resource %d' % (n, val))

    for i in range(n):
        Thread(target=worker, args=(i, b, p)).start()
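As a variation on the Proxy class, here is a minimal sketch that lets contextlib.contextmanager do the enter/exit bookkeeping. It is one possible alternative, not the code from pool.py. The timeout parameter is a hypothetical addition for illustration. One difference in behavior: here the object is taken from the queue when the with block is entered, not when lease() is called.

```python
"""Sketch: pool lease implemented with contextlib.contextmanager"""
from contextlib import contextmanager
from queue import Queue


class Pool:
    """Pool of objects backed by a queue"""

    def __init__(self, objects):
        self._queue = Queue()
        for obj in objects:
            self._queue.put(obj)

    @contextmanager
    def lease(self, timeout=None):
        """Lease an object; it goes back to the pool when the block exits.

        timeout is a hypothetical addition: queue.Empty is raised if no
        object becomes available within that many seconds.
        """
        obj = self._queue.get(timeout=timeout)
        try:
            yield obj
        finally:
            self._queue.put(obj)  # Runs even if the block raises
```

Clients use it the same way as before (with pool.lease() as conn: ...), and the try/finally means the object is returned even when the body of the with block raises an exception.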
Here's the output of both programs:


$ go run pool.go
worker 7 got resource 0
worker 0 got resource 2
worker 3 got resource 1
worker 8 got resource 2
worker 1 got resource 0
worker 9 got resource 1
worker 5 got resource 1
worker 4 got resource 0
worker 2 got resource 2
worker 6 got resource 1

$ python pool.py
worker 5 got resource 1
worker 8 got resource 2
worker 1 got resource 3
worker 4 got resource 1
worker 0 got resource 2
worker 7 got resource 3
worker 6 got resource 1
worker 3 got resource 2
worker 9 got resource 3
worker 2 got resource 1
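In both runs, ten workers share only three resources. Here is a minimal sketch of how one might check that a queue-backed pool really caps the number of concurrent holders. It uses a bare Queue as the pool, and the names max_concurrent and state, as well as the 10 ms sleep, are made up for this illustration.

```python
"""Sketch: check that a queue-backed pool caps concurrent holders"""
from queue import Queue
from threading import Lock, Thread
from time import sleep


def max_concurrent(pool_size, n_workers):
    """Return the highest number of workers holding a resource at once."""
    pool = Queue()
    for i in range(pool_size):
        pool.put(i)

    lock = Lock()
    state = {'active': 0, 'peak': 0}

    def worker():
        rsc = pool.get()  # Blocks until a resource is free
        try:
            with lock:
                state['active'] += 1
                state['peak'] = max(state['peak'], state['active'])
            sleep(0.01)  # Simulate work while holding the resource
            with lock:
                state['active'] -= 1
        finally:
            pool.put(rsc)  # Always return the resource

    threads = [Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state['peak']


if __name__ == '__main__':
    # The printed peak never exceeds the pool size (3 here)
    print(max_concurrent(pool_size=3, n_workers=10))
```

Since a worker must take an item from the queue before it is counted as active, the peak can never be larger than the number of objects the pool was filled with.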
