Comments (4)
Let me get back to this in a few hours. There's some fires I'm trying to put out with coroutine resolution and then we'll also have a way to save additional row values with add()
and set an index during init with Batch(index=[...])
.
Could you give an example of the pattern you would ideally want?
from oaib.
Ah, I see what you mean now. I talked with @jvmncs about this yesterday, where we could get a response to some input and then chain them in add
with a callback that might call another batch or something. Will get back to you on this, thoughts on the pattern are welcome.
from oaib.
My goal would be able to have a function like this:
async def process_input(batch, input_data):
promise = await batch.add(input_data)
result = await promise
new_input = modify_result(result)
second_promise = await batch.add(new_input)
return await second_promise
where the dev experience is creating a process_input
function that will receive inputs assigned to it, call OpenAI via the "batch" class, process the result, call OpenAI again via batch, and return the final output. This function itself could then be called via asyncio, like:
batch = RateLimitedBatch(rpm=100, tpm=1_000, workers=5)
batch.start_workers()
inputs = [1, 2, 3]
results = await asyncio.gather(*(process_input(batch, input_data) for input_data in inputs))
print(results)
await batch.shutdown() # Shutdown workers after all tasks are processed
Here's an toy implementation of RateLimitedBatch that I pulled together so this code can run:
import asyncio
from asyncio import Queue
class RateLimitedBatch:
def __init__(self, rpm, tpm, workers):
self.queue = Queue()
self.semaphore = asyncio.Semaphore(workers)
self.workers = []
self.total_workers = 3
self.rpm = rpm
self.tpm = tpm
async def add(self, input_data):
future = asyncio.get_running_loop().create_future()
await self.queue.put((input_data, future))
return future
async def worker(self):
while True:
input_data, future = await self.queue.get()
if input_data is None: # Shutdown signal
future.set_result(None) # Optionally handle shutdown result
break
# Simulate API call with a delay
await asyncio.sleep(1) # Placeholder for real work
result = input_data * 2 # Example processing logic
future.set_result(result)
self.queue.task_done()
def start_workers(self):
for _ in range(self.total_workers):
task = asyncio.create_task(self.worker())
self.workers.append(task)
async def shutdown(self):
for _ in range(len(self.workers)): # Send shutdown signal for each worker
await self.queue.put((None, asyncio.get_running_loop().create_future()))
await asyncio.gather(*self.workers) # Wait for all workers to complete
from oaib.
@aniemerg I'm kind of tied up but it seems like this might be a higher level refactor, like you might have a better grasp on how to work this process with asyncio than I did originally writing it. Would you be open to maybe putting in a refactor to support this, and maybe reviewing existing logic and rewiring anything as appropriate?
from oaib.
Related Issues (4)
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from oaib.