Comments (4)

ctjlewis commented on June 12, 2024

Let me get back to this in a few hours. There are some fires I'm trying to put out with coroutine resolution, and after that we'll also have a way to save additional row values with add() and set an index during init with Batch(index=[...]).

Could you give an example of the pattern you would ideally want?

from oaib.

ctjlewis commented on June 12, 2024

Ah, I see what you mean now. I talked with @jvmncs about this yesterday: the idea was to get a response to some input and then chain further requests in add() with a callback that might call another batch or something. I'll get back to you on this; thoughts on the pattern are welcome.
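
For illustration, the callback-chaining idea might look roughly like the sketch below. Everything in it is hypothetical and is not oaib's API: ChainedBatch, request(), the callback parameter on add(), and follow_up are made-up names, and the "API call" just doubles its input.

```python
import asyncio

class ChainedBatch:
    """Hypothetical stand-in for a batch; not oaib's actual API."""

    def __init__(self):
        self._pending = []  # (value, callback) pairs waiting to run

    async def request(self, value):
        # Placeholder for a real API call: yield to the loop, then double.
        await asyncio.sleep(0)
        return value * 2

    def add(self, value, callback=None):
        # Queue an input; the optional async callback receives (batch, result).
        self._pending.append((value, callback))

    async def _run_one(self, value, callback):
        result = await self.request(value)
        if callback is not None:
            # The callback can issue follow-up requests on the same batch.
            result = await callback(self, result)
        return result

    async def run(self):
        jobs = [self._run_one(value, cb) for value, cb in self._pending]
        self._pending.clear()
        return await asyncio.gather(*jobs)

async def follow_up(batch, first_result):
    # Chain a second request built from the first response.
    return await batch.request(first_result + 1)

async def chained_demo():
    batch = ChainedBatch()
    for value in [1, 2, 3]:
        batch.add(value, callback=follow_up)
    return await batch.run()

chained_results = asyncio.run(chained_demo())
print(chained_results)
```

Each input goes through two requests, and asyncio.gather returns the final results in the order the inputs were added.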


aniemerg commented on June 12, 2024

My goal would be to be able to write a function like this:

async def process_input(batch, input_data):
    promise = await batch.add(input_data)
    result = await promise
    new_input = modify_result(result)
    second_promise = await batch.add(new_input)
    return await second_promise

where the dev experience is writing a process_input function that receives an input, calls OpenAI via the "batch" class, processes the result, calls OpenAI again via the same batch, and returns the final output. This function itself could then be run concurrently via asyncio, like:

async def main():
    batch = RateLimitedBatch(rpm=100, tpm=1_000, workers=5)
    batch.start_workers()
    inputs = [1, 2, 3]
    results = await asyncio.gather(*(process_input(batch, input_data) for input_data in inputs))
    print(results)

    await batch.shutdown()  # Shutdown workers after all tasks are processed

asyncio.run(main())

Here's a toy implementation of RateLimitedBatch that I pulled together so this code can run:

import asyncio
from asyncio import Queue

class RateLimitedBatch:
    def __init__(self, rpm, tpm, workers):
        self.queue = Queue()
        self.workers = []
        self.total_workers = workers  # Number of worker tasks; this bounds concurrency
        self.rpm = rpm  # Stored but not enforced in this toy version
        self.tpm = tpm  # Stored but not enforced in this toy version

    async def add(self, input_data):
        # Enqueue the input and return a future that resolves to its result
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((input_data, future))
        return future

    async def worker(self):
        while True:
            input_data, future = await self.queue.get()
            if input_data is None:  # Shutdown signal
                future.set_result(None)  # Optionally handle shutdown result
                self.queue.task_done()
                break
            try:
                # Simulate API call with a delay
                await asyncio.sleep(1)  # Placeholder for real work
                result = input_data * 2  # Example processing logic
                future.set_result(result)
            except Exception as exc:
                # Surface failures to the caller awaiting the future
                future.set_exception(exc)
            finally:
                self.queue.task_done()

    def start_workers(self):
        # Must be called from inside a running event loop
        for _ in range(self.total_workers):
            task = asyncio.create_task(self.worker())
            self.workers.append(task)

    async def shutdown(self):
        for _ in range(len(self.workers)):  # Send shutdown signal for each worker
            await self.queue.put((None, asyncio.get_running_loop().create_future()))
        await asyncio.gather(*self.workers)  # Wait for all workers to complete
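
The toy class stores rpm and tpm but never applies them. One way the rpm limit might be enforced is to space out request starts, as in the sketch below. RequestPacer and wait_turn are hypothetical names, not part of oaib, and token-based limiting (tpm) is left out entirely. A worker would call await pacer.wait_turn() just before making its API call.

```python
import asyncio

class RequestPacer:
    """Hypothetical helper: spaces request starts at least 60/rpm seconds apart."""

    def __init__(self, rpm):
        self.interval = 60.0 / rpm  # Minimum gap between request starts, in seconds
        self._next_start = 0.0      # Earliest loop time the next request may start

    async def wait_turn(self):
        # No await happens between reading and updating _next_start, so this
        # reservation step cannot interleave with other coroutines.
        now = asyncio.get_running_loop().time()
        start_at = max(now, self._next_start)
        self._next_start = start_at + self.interval
        if start_at > now:
            await asyncio.sleep(start_at - now)

async def pacer_demo():
    pacer = RequestPacer(rpm=1200)  # 50 ms between request starts
    loop = asyncio.get_running_loop()
    starts = []

    async def fake_request():
        await pacer.wait_turn()
        starts.append(loop.time())  # Record when the "request" was allowed to start

    await asyncio.gather(*(fake_request() for _ in range(4)))
    return starts

start_times = asyncio.run(pacer_demo())
print([round(t - start_times[0], 2) for t in start_times])
```

Fixed spacing is the simplest policy; it smooths traffic but does not allow bursts, so a token bucket or sliding window may suit a real API limit better.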


ctjlewis commented on June 12, 2024

@aniemerg I'm kind of tied up, but this looks like it might be a higher-level refactor, and you may have a better grasp of how to structure this with asyncio than I did when I originally wrote it. Would you be open to putting in a refactor to support this, reviewing the existing logic and rewiring anything as appropriate?

