Development Guide
Threading Model
Pillar aims to be highly multi-threaded, treating Plugins like Automata
that interact with the resources relevant to them and accomplish their goals
by communicating with different parts of Pillar. To this end, Pillar has a
standard abstraction for workers based on multiprocessing.Process. The
idea is that any process or plugin within Pillar inherits a mix-in
interface specific to the worker class it needs to interface with.
As an example, the KeyManager class inherits the PillarDBMixIn class,
which allows KeyManager to load and save keys to the database.
The most complete, and simplest, example of this can be found in the IPFSWorkerModule:
import logging
import multiprocessing as mp
from typing import AsyncIterator

import aioipfs

from .multiproc import PillarThreadMixIn, \
    PillarThreadMethodsRegister, \
    PillarWorkerThread
# Register whose ``register_method`` decorator is applied to the IPFSWorker
# methods below; IPFSWorker also exposes it as its ``methods_register``.
ipfs_worker_register = PillarThreadMethodsRegister()
class IPFSClient:
    """Async convenience wrapper around ``aioipfs.AsyncIPFS``.

    Every operation builds a fresh client with :meth:`get_client` and closes
    it once the operation has finished, including when the operation raises.
    """

    def __init__(self, aioipfs_config: dict = None):
        """Remember the keyword arguments used to build each client.

        Args:
            aioipfs_config: Keyword arguments forwarded to
                ``aioipfs.AsyncIPFS``; ``None`` (or any falsy value) means
                no arguments.
        """
        self.aioipfs_config = aioipfs_config or {}

    async def get_file(self, cid: str, dstdir='.') -> None:
        """Call ``client.get(cid, dstdir)`` on a fresh client."""
        client = self.get_client()
        try:
            await client.get(cid, dstdir)
        finally:
            # Close the client even when the request fails.
            await client.close()

    async def add_file(self, *files: str, return_client=False, **kwargs):
        """Call ``client.add(*files, **kwargs)`` on a fresh client.

        Args:
            *files: Positional arguments forwarded to ``client.add``.
            return_client: When true, return the client that was used.
            **kwargs: Keyword arguments forwarded to ``client.add``.

        Returns:
            The client when ``return_client`` is true, otherwise ``None``.
        """
        client = self.get_client()
        try:
            # NOTE(review): some aioipfs releases expose ``add`` as an async
            # generator rather than a coroutine; confirm that awaiting it is
            # valid for the version this project pins.
            await client.add(*files, **kwargs)
        finally:
            await client.close()
        # NOTE(review): the client returned here has already been closed;
        # confirm whether callers rely on that.
        if return_client:
            return client

    async def add_str(self, *args: str, **kwargs):
        """Return the result of ``client.add_str(*args, **kwargs)``."""
        client = self.get_client()
        try:
            return await client.add_str(*args, **kwargs)
        finally:
            await client.close()

    async def send_pubsub_message(self, queue_id: str, message: str) -> None:
        """Call ``client.pubsub.pub(queue_id, message)`` on a fresh client."""
        client = self.get_client()
        try:
            await client.pubsub.pub(queue_id, message)
        finally:
            await client.close()

    async def get_pubsub_message(self, queue_id: str) -> AsyncIterator[str]:
        """Yield every message produced by ``client.pubsub.sub(queue_id)``.

        This is an async generator. The client stays open for the whole
        subscription and is closed once, when iteration ends, fails, or the
        generator is closed by the consumer.
        """
        client = self.get_client()
        try:
            async for message in client.pubsub.sub(queue_id):
                yield message
        finally:
            # Closing inside the loop would shut the client down before the
            # first message is delivered, so it is closed here instead.
            await client.close()

    async def get_id(self) -> dict:
        """Return the result of ``client.core.id()``."""
        client = self.get_client()
        try:
            node_id = await client.core.id()
        finally:
            await client.close()
        return node_id

    def get_client(self) -> "aioipfs.AsyncIPFS":
        """Build a new ``aioipfs.AsyncIPFS`` from the stored configuration."""
        return aioipfs.AsyncIPFS(**self.aioipfs_config)
class IPFSWorker(PillarWorkerThread):
    """Worker whose registered methods delegate to an ``IPFSClient``."""

    methods_register = ipfs_worker_register

    def __init__(self,
                 command_queue: mp.Queue,
                 output_queue: mp.Queue,
                 ipfs_client: IPFSClient = None):
        """Store the queues, run the base initialiser, then set up helpers.

        Args:
            command_queue: Queue stored as ``self.command_queue``.
            output_queue: Queue stored as ``self.output_queue``.
            ipfs_client: Client to delegate to; a default ``IPFSClient`` is
                created when this is ``None`` (or otherwise falsy).
        """
        # The queues are assigned before the base initialiser runs; keep
        # this order.
        self.command_queue = command_queue
        self.output_queue = output_queue
        super().__init__()
        if not ipfs_client:
            ipfs_client = IPFSClient()
        self.ipfs_client = ipfs_client
        self.logger = logging.getLogger(repr(self))

    @ipfs_worker_register.register_method
    async def get_file(self, cid: str, dstdir='.') -> None:
        """Forward to ``self.ipfs_client.get_file``."""
        outcome = await self.ipfs_client.get_file(cid, dstdir=dstdir)
        return outcome

    @ipfs_worker_register.register_method
    async def add_str(self, *args: str, **kwargs):
        """Forward to ``self.ipfs_client.add_str``."""
        outcome = await self.ipfs_client.add_str(*args, **kwargs)
        return outcome

    @ipfs_worker_register.register_method
    async def add_file(self, *files: str, **kwargs):
        """Forward to ``self.ipfs_client.add_file``."""
        outcome = await self.ipfs_client.add_file(*files, **kwargs)
        return outcome

    def __repr__(self):
        """Return the fixed label also used as this worker's logger name."""
        return "<IPFSWorker>"
class IPFSMixIn(PillarThreadMixIn):
    """Mix-in that pairs the ``"ipfs"`` interface name with ``IPFSWorker``.

    NOTE(review): how ``PillarThreadMixIn`` consumes these two attributes is
    not visible here; confirm against the ``.multiproc`` module.
    """
    # Worker class associated with this mix-in.
    queue_thread_class = IPFSWorker
    # Interface name associated with this mix-in.
    interface_name = "ipfs"
By adding mix-in interfaces that inherit from the classes above to Plugins, as well as to other Pillar-internal classes, threading can be accomplished in a consistent and transparent manner.