Development Guide

Threading Model

Pillar aims to be highly multi-threaded, treating Plugins like Automata that interact with resources relevant to them to accomplish goals by communicating with different parts of Pillar. To this end, Pillar has a standard abstraction for workers based off of multiprocess.Process. The idea behind this, is that any process or plugin within pillar will inherit a mix-in interface specific to the worker class it needs to interface with.

As an example, the KeyManager class inherits the PillarDBMixIn class to allow for KeyManager to load and save keys to the database.

The most complete, and simplest, example of this can be found in the IPFSWorkerModule:

import aioipfs
from .multiproc import PillarThreadMixIn, \
    PillarThreadMethodsRegister, \
    PillarWorkerThread
import multiprocessing as mp
import logging

ipfs_worker_register = PillarThreadMethodsRegister()


class IPFSClient:

    def __init__(self, aioipfs_config: dict = None):
        self.aioipfs_config = aioipfs_config or {}

    async def get_file(self, cid: str, dstdir='.') -> None:
        client = self.get_client()
        await client.get(cid, dstdir)
        await client.close()

    async def add_file(self, *files: str, return_client=False, **kwargs, ):
        client = self.get_client()
        await client.add(*files, **kwargs)
        await client.close()
        if return_client:
            return client

    async def add_str(self, *args: str, **kwargs):
        client = self.get_client()
        result = await client.add_str(*args, **kwargs)
        await client.close()
        return result

    async def send_pubsub_message(self, queue_id: str, message: str) -> None:
        client = self.get_client()
        await client.pubsub.pub(queue_id, message)
        await client.close()

    async def get_pubsub_message(self, queue_id: str) -> str:
        client = self.get_client()
        async for message in client.pubsub.sub(queue_id):
            await client.close()
            yield message

    async def get_id(self) -> dict:
        client = self.get_client()
        id = await client.core.id()
        await client.close()
        return id

    def get_client(self) -> aioipfs.AsyncIPFS:
        return aioipfs.AsyncIPFS(**self.aioipfs_config)


class IPFSWorker(PillarWorkerThread):
    methods_register = ipfs_worker_register

    def __init__(self,
                 command_queue: mp.Queue,
                 output_queue: mp.Queue,
                 ipfs_client: IPFSClient = None):
        self.command_queue = command_queue
        self.output_queue = output_queue
        super().__init__()
        self.ipfs_client = ipfs_client or IPFSClient()
        self.logger = logging.getLogger(self.__repr__())

    @ipfs_worker_register.register_method
    async def get_file(self, cid: str, dstdir='.') -> None:
        return await self.ipfs_client.get_file(cid, dstdir=dstdir)

    @ipfs_worker_register.register_method
    async def add_str(self, *args: str, **kwargs):
        return await self.ipfs_client.add_str(*args, **kwargs)

    @ipfs_worker_register.register_method
    async def add_file(self, *files: str, **kwargs):
        return await self.ipfs_client.add_file(*files, **kwargs)

    def __repr__(self):
        return "<IPFSWorker>"


class IPFSMixIn(PillarThreadMixIn):
    queue_thread_class = IPFSWorker
    interface_name = "ipfs"

By adding MixIn interfaces that inherit the above classes to Plugins as well as other Pillar internal classes, threading can be accomplished in a consistent and transparent manner.