aiohttp-rpc

A library for easily integrating the JSON-RPC 2.0 protocol into a Python application using aiohttp. The goal is to provide a simple, fast, and reliable way to add JSON-RPC 2.0 to your application on the server side, the client side, or both.

The library has only one dependency:

  • aiohttp - Async http client/server framework

Installation

pip

pip install aiohttp-rpc

Usage

HTTP Server Example

from aiohttp import web
import aiohttp_rpc


def echo(*args, **kwargs):
    return {
        'args': args,
        'kwargs': kwargs,
    }

# If a function declares an rpc_request argument, the request is passed in automatically.
async def ping(rpc_request):
    return 'pong'


if __name__ == '__main__':
    aiohttp_rpc.rpc_server.add_methods([
        ping,
        echo,
    ])

    app = web.Application()
    app.router.add_routes([
        web.post('/rpc', aiohttp_rpc.rpc_server.handle_http_request),
    ])
    web.run_app(app, host='0.0.0.0', port=8080)

HTTP Client Example

import aiohttp_rpc
import asyncio

async def run():
    async with aiohttp_rpc.JsonRpcClient('http://0.0.0.0:8080/rpc') as rpc:
        print('#1', await rpc.ping())
        print('#2', await rpc.echo('one', 'two'))
        print('#3', await rpc.call('echo', three='3'))
        print('#4', await rpc.notify('echo', 123))
        print('#5', await rpc.get_methods())
        print('#6', await rpc.batch([
            ['echo', 2], 
            'echo2',
            'ping',
        ]))

# asyncio.run() creates and closes the event loop (Python 3.7+).
asyncio.run(run())

This prints:

#1 pong
#2 {'args': ['one', 'two'], 'kwargs': {}}
#3 {'args': [], 'kwargs': {'three': '3'}}
#4 None
#5 {'get_method': {'doc': None, 'args': ['name'], 'kwargs': []}, 'get_methods': {'doc': None, 'args': [], 'kwargs': []}, 'ping': {'doc': None, 'args': ['rpc_request'], 'kwargs': []}, 'echo': {'doc': None, 'args': [], 'kwargs': []}}
#6 ({'args': [2], 'kwargs': {}}, JsonRpcError(-32601, 'The method does not exist / is not available.'), 'pong')
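
For orientation, the client calls above correspond to JSON-RPC 2.0 request objects on the wire. The following is a minimal standard-library sketch of that format as the specification defines it, not the library's internals. The helper names `make_request` and `make_notification` are hypothetical, and the integer ids are an assumption (the specification allows strings or numbers; how the client generates ids is not shown in this document).

```python
import itertools
import json

_ids = itertools.count(1)  # hypothetical id source; any unique string/number works


def _base(method, args, kwargs):
    # JSON-RPC 2.0 params are either an array (positional) or an object (named).
    # This sketch simply rejects a mix of both.
    if args and kwargs:
        raise ValueError('params must be positional or named, not both')
    message = {'jsonrpc': '2.0', 'method': method}
    if args:
        message['params'] = list(args)
    elif kwargs:
        message['params'] = dict(kwargs)
    return message


def make_request(method, *args, **kwargs):
    """A request carries an id, so the server must answer it."""
    return {**_base(method, args, kwargs), 'id': next(_ids)}


def make_notification(method, *args, **kwargs):
    """A notification is a request without an id; no response is sent."""
    return _base(method, args, kwargs)


# A batch is a JSON array of request objects.
batch = [make_request('echo', 2), make_request('echo2'), make_request('ping')]

print(json.dumps(make_request('echo', 'one', 'two')))
print(json.dumps(make_notification('echo', 123)))
print(json.dumps(batch))
```

Because a notification has no id, the server sends nothing back, which is consistent with `#4 None` in the output above.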


↑ This is enough to start 😎 ↑


Integration

The purpose of this library is to simplify life, not to complicate it. Still, when you start registering existing functions as RPC methods, some problems may arise.

Existing functions can return objects that are not JSON-serializable, but this is easy to fix. You can write your own json_serialize:

from aiohttp import web
import aiohttp_rpc
import uuid
import json
from dataclasses import dataclass
from functools import partial

@dataclass
class User:  # The object that is not serializable.
    uuid: uuid.UUID
    username: str = 'mike'
    email: str = '[email protected]'

async def get_user_by_uuid(user_uuid) -> User:
    # Some function that returns a non-serializable object.
    # For example, data may be taken from a database.
    return User(uuid=uuid.UUID(user_uuid))


def json_serialize_unknown_value(value):
    if isinstance(value, User):
        return {
            'uuid': str(value.uuid),
            'username': value.username,
            'email': value.email,
        }

    return repr(value)

if __name__ == '__main__':
    rpc_server = aiohttp_rpc.JsonRpcServer(
        json_serialize=partial(json.dumps, default=json_serialize_unknown_value),
    )
    rpc_server.add_method(get_user_by_uuid)
    
    app = web.Application()
    app.router.add_routes([
        web.post('/rpc', rpc_server.handle_http_request),
    ])
    web.run_app(app, host='0.0.0.0', port=8080)
...

"""
Example of response:
{
    "id": 1,
    "jsonrpc": "2.0",
    "result": {
        "uuid": "600d57b3-dda8-43d0-af79-3e81dbb344fa",
        "username": "mike",
        "email": "[email protected]"
    }
}
"""

But you can go further. If you want to use functions that accept custom types, then you can do something like this:

# The function (RPC method) that takes a custom type.
def generate_user_token(user: User):
    return f'token-{str(user.uuid).split("-")[0]}'

async def replace_type(data):
    if not isinstance(data, dict) or '__type__' not in data:
        return data

    if data['__type__'] == 'user':
        return await get_user_by_uuid(data['uuid'])

    raise aiohttp_rpc.errors.InvalidParams

# The middleware that converts types
async def type_conversion_middleware(request, handler):
    request.set_args_and_kwargs(
        args=[await replace_type(arg) for arg in request.args],
        kwargs={key: await replace_type(value) for key, value in request.kwargs.items()},
    )
    return await handler(request)


rpc_server = aiohttp_rpc.JsonRpcServer(middlewares=[
    aiohttp_rpc.middlewares.exception_middleware,
    aiohttp_rpc.middlewares.extra_args_middleware,
    type_conversion_middleware,
])

"""
Request:
{
    "id": 1234,
    "jsonrpc": "2.0",
    "method": "generate_user_token",
    "params": [{"__type__": "user", "uuid": "600d57b3-dda8-43d0-af79-3e81dbb344fa"}]
}

Response:
{
    "id": 1234,
    "jsonrpc": "2.0",
    "result": "token-600d57b3"
}
"""

Middleware allows you to replace arguments, responses, and more.

If you want to add permission checking for each method, then you can override the class JsonRpcMethod or use middleware.
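
As a rough illustration of the middleware route, here is a self-contained sketch of a permission check that follows the `(request, handler)` middleware signature. It runs against stand-in objects rather than the library: the `method` and `context` attribute names follow the API reference in this document, while the `role` key in `context`, the `REQUIRED_ROLES` table, and `PermissionDenied` are assumptions made up for the example.

```python
import asyncio
from types import SimpleNamespace

# Hypothetical policy table: method name -> roles allowed to call it.
REQUIRED_ROLES = {'delete_user': {'admin'}}


class PermissionDenied(Exception):
    """Stand-in error; a real server would raise one of its JSON-RPC error classes."""


async def permission_middleware(request, handler):
    allowed = REQUIRED_ROLES.get(request.method)
    # Assumption: earlier code stored the caller's role in request.context.
    role = request.context.get('role')
    if allowed is not None and role not in allowed:
        raise PermissionDenied(f'{role!r} may not call {request.method}')
    return await handler(request)


async def fake_handler(request):
    # Stand-in for the next middleware or the RPC method itself.
    return f'{request.method} ok'


async def demo():
    admin = SimpleNamespace(method='delete_user', context={'role': 'admin'})
    guest = SimpleNamespace(method='delete_user', context={'role': 'guest'})
    print(await permission_middleware(admin, fake_handler))
    try:
        await permission_middleware(guest, fake_handler)
    except PermissionDenied as exc:
        print('denied:', exc)


asyncio.run(demo())
```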


Middleware

Middleware processes RPC requests and RPC responses. Its interface is similar to that of aiohttp middleware.

import aiohttp_rpc
import typing

async def simple_middleware(request: aiohttp_rpc.JsonRpcRequest, handler: typing.Callable) -> aiohttp_rpc.JsonRpcResponse:
    # Code to be executed for each RPC request before
    # the method (and later middleware) are called.

    response = await handler(request)

    # Code to be executed for each RPC request / RPC response after
    # the method is called.

    return response

rpc_server = aiohttp_rpc.JsonRpcServer(middlewares=[
    aiohttp_rpc.middlewares.exception_middleware,
    simple_middleware,
])

Alternatively, use aiohttp middlewares to process web.Request / web.Response objects.
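
To make the "before" and "after" phases concrete, the following standalone sketch composes `(request, handler)` middlewares into a single handler. `build_chain` and `tracing` are hypothetical helpers. The ordering (first listed runs outermost) mirrors aiohttp's convention and is an assumption here, so check the library's behavior before relying on it.

```python
import asyncio
from functools import partial


def build_chain(middlewares, final_handler):
    """Wrap final_handler so that the first middleware listed runs outermost."""
    handler = final_handler
    for middleware in reversed(middlewares):
        # Bind the next handler; the result is again callable as handler(request).
        handler = partial(middleware, handler=handler)
    return handler


calls = []


def tracing(name):
    async def middleware(request, handler):
        calls.append(f'{name} before')
        response = await handler(request)
        calls.append(f'{name} after')
        return response
    return middleware


async def method(request):
    # Stand-in for the RPC method at the center of the chain.
    calls.append('method')
    return request.upper()


chain = build_chain([tracing('outer'), tracing('inner')], method)
result = asyncio.run(chain('ping'))
print(result)
print(calls)
```

Each middleware sees the request on the way in and the response on the way out, so an exception-handling middleware only catches errors raised by handlers nested inside it.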


WebSockets

WS Server Example

from aiohttp import web
import aiohttp_rpc


async def ping(rpc_request):
    return 'pong'


if __name__ == '__main__':
    rpc_server = aiohttp_rpc.WsJsonRpcServer(
        middlewares=aiohttp_rpc.middlewares.DEFAULT_MIDDLEWARES,
    )
    rpc_server.add_method(ping)

    app = web.Application()
    app.router.add_routes([
        web.get('/rpc', rpc_server.handle_http_request),
    ])
    app.on_shutdown.append(rpc_server.on_shutdown)
    web.run_app(app, host='0.0.0.0', port=8080)

WS Client Example

import aiohttp_rpc
import asyncio

async def run():
    async with aiohttp_rpc.WsJsonRpcClient('http://0.0.0.0:8080/rpc') as rpc:
        print(await rpc.ping())
        print(await rpc.notify('ping'))
        print(await rpc.batch([
            ['echo', 2], 
            'echo2',
            'ping',
        ]))

# asyncio.run() creates and closes the event loop (Python 3.7+).
asyncio.run(run())


API Reference

server

  • class JsonRpcServer(BaseJsonRpcServer)

    • def __init__(self, *, json_serialize=json_serialize, middlewares=(), methods=None)
    • def add_method(self, method, *, replace=False) -> JsonRpcMethod
    • def add_methods(self, methods, replace=False) -> typing.List[JsonRpcMethod]
    • def get_method(self, name) -> Optional[Mapping]
    • def get_methods(self) -> Mapping[str, Mapping]
    • async def handle_http_request(self, http_request: web.Request) -> web.Response
  • class WsJsonRpcServer(BaseJsonRpcServer)

  • rpc_server: JsonRpcServer

client

  • class JsonRpcClient(BaseJsonRpcClient)

    • async def connect(self)
    • async def disconnect(self)
    • async def call(self, method: str, *args, **kwargs)
    • async def notify(self, method: str, *args, **kwargs)
    • async def batch(self, methods)
    • async def batch_notify(self, methods)
  • class WsJsonRpcClient(BaseJsonRpcClient)

protocol

  • class JsonRpcRequest

    • id: Union[int, str, None]
    • method: str
    • jsonrpc: str
    • extra_args: MutableMapping
    • context: MutableMapping
    • params: Any
    • args: Optional[Sequence]
    • kwargs: Optional[Mapping]
    • is_notification: bool
  • class JsonRpcResponse

    • id: Union[int, str, None]
    • jsonrpc: str
    • result: Any
    • error: Optional[JsonRpcError]
    • context: MutableMapping
  • class JsonRpcMethod(BaseJsonRpcMethod)

    • def __init__(self, func, *, name=None, add_extra_args=True, prepare_result=None)
  • class JsonRpcUnlinkedResults

  • class JsonRpcDuplicatedResults

decorators

  • def rpc_method(*, rpc_server=default_rpc_server, name=None, add_extra_args=True)

errors

  • class JsonRpcError(RuntimeError)
  • class ServerError(JsonRpcError)
  • class ParseError(JsonRpcError)
  • class InvalidRequest(JsonRpcError)
  • class MethodNotFound(JsonRpcError)
  • class InvalidParams(JsonRpcError)
  • class InternalError(JsonRpcError)
  • DEFAULT_KNOWN_ERRORS
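
For reference, the JSON-RPC 2.0 specification reserves a small set of error codes, and the class names above appear to mirror them. The sketch below encodes the specification's table with the standard library only. Apart from -32601, which appears in the client output earlier, the mapping of each library class to a code is an assumption, and `describe_code` and `error_response` are hypothetical helpers.

```python
import json

# Codes reserved by the JSON-RPC 2.0 specification.
SPEC_ERRORS = {
    -32700: 'Parse error',
    -32600: 'Invalid Request',
    -32601: 'Method not found',
    -32602: 'Invalid params',
    -32603: 'Internal error',
}


def describe_code(code):
    if code in SPEC_ERRORS:
        return SPEC_ERRORS[code]
    if -32099 <= code <= -32000:
        # Range reserved for implementation-defined server errors.
        return 'Server error'
    return 'Application-defined error'


def error_response(request_id, code, message=None, data=None):
    """Build a JSON-RPC 2.0 error response object."""
    error = {'code': code, 'message': message or describe_code(code)}
    if data is not None:
        error['data'] = data
    return {'jsonrpc': '2.0', 'id': request_id, 'error': error}


print(json.dumps(error_response(1, -32601)))
```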

middlewares

  • async def extra_args_middleware(request, handler)
  • async def exception_middleware(request, handler)
  • DEFAULT_MIDDLEWARES

utils

  • def json_serialize(*args, **kwargs)

constants

  • NOTHING
  • VERSION_2_0


More examples

The library allows you to add methods in many ways:

import aiohttp_rpc

def ping_1(rpc_request): return 'pong 1'
def ping_2(rpc_request): return 'pong 2'
def ping_3(rpc_request): return 'pong 3'

rpc_server = aiohttp_rpc.JsonRpcServer()
rpc_server.add_method(ping_1)  # 'ping_1'
rpc_server.add_method(aiohttp_rpc.JsonRpcMethod(ping_2))  # 'ping_2'
rpc_server.add_method(aiohttp_rpc.JsonRpcMethod(ping_3, name='third_ping'))  # 'third_ping'
rpc_server.add_methods([ping_3])  # 'ping_3'

# Replace method
rpc_server.add_method(ping_1, replace=True)  # 'ping_1'
rpc_server.add_methods([ping_1, ping_2], replace=True)  # 'ping_1', 'ping_2'

Example with built-in functions:

# Server
import aiohttp_rpc

rpc_server = aiohttp_rpc.JsonRpcServer(middlewares=[aiohttp_rpc.middlewares.extra_args_middleware])
rpc_server.add_method(sum)
rpc_server.add_method(aiohttp_rpc.JsonRpcMethod(zip, prepare_result=list))
...

# Client
async with aiohttp_rpc.JsonRpcClient('/rpc') as rpc:
    assert await rpc.sum([1, 2, 3]) == 6
    assert await rpc.zip(['a', 'b'], [1, 2]) == [['a', 1], ['b', 2]]

Example with the decorator:

import aiohttp_rpc
from aiohttp import web

@aiohttp_rpc.rpc_method()
def echo(*args, **kwargs):
    return {
        'args': args,
        'kwargs': kwargs,
    }

if __name__ == '__main__':
    app = web.Application()
    app.router.add_routes([
        web.post('/rpc', aiohttp_rpc.rpc_server.handle_http_request),
    ])
    web.run_app(app, host='0.0.0.0', port=8080)

Extra parameters can be passed to the underlying aiohttp request via direct_call/direct_batch:

import aiohttp_rpc

jsonrpc_request = aiohttp_rpc.JsonRpcRequest(method_name='test', params={'test_value': 1})
async with aiohttp_rpc.JsonRpcClient('/rpc') as rpc:
    await rpc.direct_call(jsonrpc_request, headers={'My-Customer-Header': 'custom value'}, timeout=10)


License

MIT

aiohttp-rpc's Issues

Problem/Solution: JSONRPC over HTTPS with self-signed certs

The aiohttp-rpc framework is great, but it doesn't seem to provide an easy way to support self-signed certs and sessions (or user-provided SSLContext objects, or user-provided CA bundle files).

To do this, I've needed to subclass off JsonRpcClient, to:

  • accept a constructor arg to provide custom SSL info
  • override .connect() to use a subclass of aiohttp.ClientSession
  • override .call() to capture session key after successful login

Also, subclass aiohttp.ClientSession to:

  • override .post() to inject ssl keyword (for user-provided custom SSL info), plus session key header

With these mods, I can now use self-signed certs on the server and validate them in the client with an SSLContext based on the server's CA public key.

This is working, but it took me a fair effort to get it solid. I'd recommend allowing for a constructor arg for aiohttp_rpc.JsonRpcServer to allow users to provide their own SSL objects, to cover for cases of private connections with self-signed certs.

I'm happy to rework the company-proprietary code I did for this and send in proof of concept if needed.

Cheers
David
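
For readers facing the same situation, the SSL part can be sketched with the standard library alone. `build_client_ssl_context` is a hypothetical helper and the CA bundle path is whatever your deployment uses. aiohttp's client accepts such a context through the `ssl` argument of `TCPConnector` or of individual request calls; how to hand it to `JsonRpcClient` is exactly what this issue asks about, so it is left out here.

```python
import ssl
from typing import Optional


def build_client_ssl_context(ca_file: Optional[str] = None) -> ssl.SSLContext:
    """Return a verifying client context, optionally trusting a private CA bundle."""
    # create_default_context() enables certificate and hostname verification.
    context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
    if ca_file is not None:
        # Additionally trust the CA that signed the server's private certificate.
        context.load_verify_locations(cafile=ca_file)
    return context


context = build_client_ssl_context()
print(context.verify_mode == ssl.CERT_REQUIRED, context.check_hostname)
```

Keeping verification enabled and adding the private CA is generally preferable to disabling certificate checks for self-signed setups.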

Mutating state of started aiohttp.web.Application is deprecated

I migrated one of my codebases to aiohttp-rpc today and had pytest emit the following warning:

/[...]/aiohttp_rpc/server/websocket.py:22: DeprecationWarning: Changing state of started or joined application is deprecated
  http_request.app[self.ws_state_key] = weakref.WeakSet()

I also do not expect a library to interact with the app dictionary at all - a user could conceivably store something in app["rcp_websockets"], which aiohttp-rpc would then overwrite.
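
One possible direction, sketched with the standard library only: keep the weak set on the server object itself, created once in its constructor, so nothing is written into the application mapping at request time. `ConnectionRegistry` and `FakeWebSocket` are hypothetical stand-ins, not the library's code.

```python
import gc
import weakref


class ConnectionRegistry:
    """Tracks live connections without keeping them alive."""

    def __init__(self):
        # Created up front; no app[...] mutation is needed when a request arrives.
        self._connections = weakref.WeakSet()

    def add(self, connection):
        self._connections.add(connection)

    def snapshot(self):
        # Copy to a list so callers can iterate while connections come and go.
        return list(self._connections)

    def __len__(self):
        return len(self._connections)


class FakeWebSocket:
    """Stand-in for a websocket response object."""


registry = ConnectionRegistry()
ws = FakeWebSocket()
registry.add(ws)
print(len(registry))

del ws
gc.collect()  # make collection explicit for the demonstration
print(len(registry))
```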

Provide a way to send notification from the server

To deliver notifications from the server to the client, it's conventional for the server to act as a client by sending an id-less request to the client. The JSON-RPC spec defines a client as the origin of Request objects and the server as the origin of Response objects, and either node can fill one or both of those roles. Would you consider providing a way for the server to send Request objects to the client?
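
On a bidirectional connection, each peer would have to tell incoming requests, notifications, and responses apart. The following standalone sketch shows that distinction as the JSON-RPC 2.0 specification defines it; `classify` is a hypothetical helper, not part of the library.

```python
def classify(message: dict) -> str:
    """Classify a decoded JSON-RPC 2.0 message by the members it carries."""
    if message.get('jsonrpc') != '2.0':
        return 'invalid'
    if 'method' in message:
        # A request without an "id" member is a notification: no reply expected.
        return 'request' if 'id' in message else 'notification'
    if 'result' in message or 'error' in message:
        return 'response'
    return 'invalid'


print(classify({'jsonrpc': '2.0', 'method': 'user_updated', 'params': [42]}))
print(classify({'jsonrpc': '2.0', 'method': 'ping', 'id': 1}))
print(classify({'jsonrpc': '2.0', 'result': 'pong', 'id': 1}))
```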

Custom aiohttp POST params

Hi, what do you think about making it possible to pass params to the aiohttp POST request?
E.g., I want to pass a custom per-request header or timeout.
I suppose we can start with direct_call and send_json.
They could be modified this way:

async def direct_call(self, rpc_request: JsonRpcRequest, **kwargs) -> JsonRpcResponse:

and

async def send_json(self,
                    data: typing.Any, *,
                    without_response: bool = False,
                    **kwargs) -> typing.Tuple[typing.Any, typing.Optional[dict]]:
    # IMO, kwargs should be prioritized over self.request_kwargs.
    merged_kwargs = merge_kwargs(self.request_kwargs, kwargs)
    http_response = await self.session.post(self.url, json=data, **merged_kwargs)

The idea is that if you need per-request customization, you use direct_call; otherwise, you use call.
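
The `merge_kwargs` helper is not defined in the proposal. One plausible reading, as a standalone sketch: per-call values override client-wide defaults, and headers are merged key by key rather than replaced wholesale. The header handling is a design choice of this sketch, not something the proposal specifies.

```python
def merge_kwargs(defaults: dict, overrides: dict) -> dict:
    """Combine client-wide request kwargs with per-call ones; per-call wins."""
    merged = {**defaults, **overrides}
    # Merge headers individually so a per-call header does not drop the defaults.
    headers = {
        **(defaults.get('headers') or {}),
        **(overrides.get('headers') or {}),
    }
    if headers:
        merged['headers'] = headers
    return merged


client_defaults = {'timeout': 5, 'headers': {'Authorization': 'Bearer token'}}
per_call = {'timeout': 10, 'headers': {'My-Custom-Header': 'custom value'}}
print(merge_kwargs(client_defaults, per_call))
```

Note that this treats header names as case-sensitive dictionary keys. A real implementation might want a case-insensitive mapping.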

WsJsonRpcClient callers not receiving ConnectionResetError

Within a WsJsonRpcClient async context, if an underlying socket connection fails while one or more tasks' API calls are in progress (i.e., taking a few seconds, with either a long timeout or no timeout), then all these waiting tasks just hang in limbo and don't see any exceptions.

It seems there's no mechanism for detecting the connection has died, and then calling .notify_all_about_error(exc) to throw the relevant exception up the chain for all of the current pending callers.

Is there any way to ensure access to all relevant exceptions for tasks calling remote API methods on a WsJsonRpcClient websocket connection?
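
The requested behavior can be sketched independently of the library: track one future per in-flight call and fail all of them when the reader notices the connection is gone. `PendingCalls` is hypothetical. Only the method name `notify_all_about_error` is taken from the issue text, and its real signature and behavior in the library are not shown here.

```python
import asyncio


class PendingCalls:
    """Tracks one future per in-flight request id."""

    def __init__(self):
        self._futures = {}

    def register(self, request_id):
        future = asyncio.get_running_loop().create_future()
        self._futures[request_id] = future
        return future

    def resolve(self, request_id, result):
        future = self._futures.pop(request_id, None)
        if future is not None and not future.done():
            future.set_result(result)

    def notify_all_about_error(self, exc):
        # Wake every waiter with the same exception, then forget them.
        for future in self._futures.values():
            if not future.done():
                future.set_exception(exc)
        self._futures.clear()


async def demo():
    pending = PendingCalls()
    waiters = [pending.register(1), pending.register(2)]
    # Simulate the reader task detecting that the socket has closed.
    pending.notify_all_about_error(ConnectionResetError('websocket closed'))
    return await asyncio.gather(*waiters, return_exceptions=True)


results = asyncio.run(demo())
print([type(result).__name__ for result in results])
```

With this pattern, a caller awaiting its future sees `ConnectionResetError` immediately instead of waiting for a timeout that may never come.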

PEP 561 compatibility

Hello.
Thank you for your library. It saves us a lot of time when implementing RPC APIs. Unfortunately, it is not compatible with PEP 561, so we cannot use its type hints for type checking with mypy.

I can make a PR to fix this if you don't mind.
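
For context, PEP 561 asks a package that ships inline type hints to include an empty marker file named `py.typed` in the package directory, and to include that file in the built distribution (with setuptools, typically through `package_data`). The sketch below demonstrates only the marker itself on a throwaway directory; `example_pkg` and both helper functions are hypothetical.

```python
import tempfile
from pathlib import Path


def mark_package_typed(package_dir: Path) -> Path:
    """Create the empty PEP 561 marker file inside a package directory."""
    marker = package_dir / 'py.typed'
    marker.touch(exist_ok=True)
    return marker


def is_typed_package(package_dir: Path) -> bool:
    """Type checkers look for this marker to decide whether to trust inline hints."""
    return (package_dir / 'py.typed').is_file()


with tempfile.TemporaryDirectory() as tmp:
    package = Path(tmp) / 'example_pkg'  # hypothetical package name
    package.mkdir()
    (package / '__init__.py').write_text('')
    before = is_typed_package(package)
    mark_package_typed(package)
    after = is_typed_package(package)

print(before, after)
```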
