Client Quickstart

Most common tasks you’ll need to perform in asyncio against a hpfeeds broker can be accomplished with an instance of ClientSession.

Publishing an event

Usage example:

import asyncio
from hpfeeds.asyncio import ClientSession


async def main():
    async with ClientSession('localhost', 20000, 'ident', 'secret') as client:
        client.publish('channel', b'{"data": "fefefefefefef"}')


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Listening to events

You can just async for over your client to read from the broker forever:

import asyncio
from hpfeeds.asyncio import ClientSession


async def main():
    async with ClientSession('localhost', 20000, 'ident', 'secret') as client:
        client.subscribe('channel')

        async for ident, channel, payload in client:
            print(payload)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Publishing events from an asynchronous iterator

You can now construct asynchronous generators in Python 3, and then have hpfeeds publish directly from the iterator:

import asyncio

async def test_iterator():
    while True:
        wait asyncio.sleep(1)
        yield b'payload'

async def main():
    async with ClientSession('localhost', 20000, 'ident', 'secret') as client:
        client.publish_async_iterable('channel', test_iterator())


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Using hpfeeds with aiostream

aiostream is like an asynchronous version of itertools.

For example, you could merge together the output from multiple brokers, perform transformations on it and send it into another:

import asyncio

from asyncio.stream import iterable, merge
from hpfeeds.asyncio import ClientSession

async def main():
    brokers = []
    for port in (10000, 10001, 10002):
        session = ClientSession('localhost', 10000, 'ident', 'secret')
        session.subscribe('in-channel')
        brokers.append(session)

    pipeline = (
        # Merge feed from multiple brokers
        merge(*brokers) |

        # Decode JSON payload
        map(lambda ident, channel, payload: json.loads(payload.decode('utf-8'))) |

        # Only interested in events that have hashes associated with them
        filter(lambda payload: len(payload['hashes']) > 0) |

        # Reencode payload for transmission
        map(lambda payload: json.dumps(payload).encode('utf-8'))
    )

    output = ClientSession('localhost', 10004, 'ident', 'secret')
    await output.publish_async_iterable('out-channel', combined)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())