# rsa_dspy


<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->

------------------------------------------------------------------------

<a
href="https://github.com/risheekkumarb/llm_rsa/blob/main/llm_rsa/core.py#L18"
target="_blank" style="float:right; font-size:smaller">source</a>

### RSACandidate

``` python

def RSACandidate(
    id:str, loop_id:int, task_prompt:str, signature:type=None, candidates_str:str=None, response:str=None,
    parent_ids:list=None
):

```

*A candidate response in the RSA algorithm*

------------------------------------------------------------------------

<a
href="https://github.com/risheekkumarb/llm_rsa/blob/main/llm_rsa/core.py#L24"
target="_blank" style="float:right; font-size:smaller">source</a>

### RSA

``` python

def RSA(
    task_prompt:str, # The main task/question to solve
    solver:NoneType=None, # task signature
    aggregator:NoneType=None, # aggregator signature
    N:int=4, # Population size (candidates per loop)
    K:int=3, # Number of candidates to aggregate
    loops:int=2, # Number of aggregation loops
    history:list=None, # History of all candidates
):

```

*Recursive Self-Aggregation algorithm using DSPy*

``` python
a = RSA(task_prompt='A bat and ball cost $1.10 total. The bat costs $1 more than the ball. How much does the ball cost?', solver=TaskSolver, aggregator=AggregateResponses)
print(a)
```

``` python
c1 = RSACandidate(id='c1', loop_id=0, task_prompt='test', response='Answer A')
c2 = RSACandidate(id='c2', loop_id=0, task_prompt='test', response='Answer B')

print(a._mk_candidate_str([c1, c2]))
```

------------------------------------------------------------------------

<a
href="https://github.com/risheekkumarb/llm_rsa/blob/main/llm_rsa/core.py#L75"
target="_blank" style="float:right; font-size:smaller">source</a>

### RSA.get_prompts

``` python

def get_prompts(
    loop_id, cands:NoneType=None
):

```

*Generate candidate prompts for a given loop: N initial candidates, or
all C(n,K) combinations for aggregation*

``` python
# Test loop 0
cands = a.get_prompts(loop_id=0)
test_eq(len(cands), a.N)
test_eq(cands[0].signature, a.solver)
```

``` python
# Test loop 1+ (with prior candidates)
prior = L(RSACandidate(id=str(uuid.uuid4()), loop_id=0, task_prompt='test', response=f'Answer {i}') for i in range(8))
cands = a.get_prompts(loop_id=1, cands=prior)
test_eq(len(cands), a.N)
print(cands[0].task_prompt)
```

## Configuration

RSA-DSPy uses [dspy](https://dspy.ai/) for LLM calls. Configure your LM
globally:

``` python
dspy.configure(lm=dspy.LM('openrouter/google/gemini-3-flash-preview', temperature=1.0))
```

See [DSPy’s LM documentation](https://dspy.ai/learn/language_models/)
for supported providers.

``` python
dspy.configure(lm=dspy.LM('openrouter/google/gemini-3-flash-preview', temperature=1.0, cache=False))

cands = a._run_loop(loop_id=0)
test_eq(len(cands), a.N)
assert all(c.response is not None for c in cands)
assert cands[0].response != cands[1].response
```

``` python
cands[0].response
```

------------------------------------------------------------------------

<a
href="https://github.com/risheekkumarb/llm_rsa/blob/main/llm_rsa/core.py#L92"
target="_blank" style="float:right; font-size:smaller">source</a>

### RSA.run

``` python

def run(
    
):

```

*Run the full RSA algorithm for the configured number of loops and
return the final candidate pool*

``` python
a = RSA(task_prompt='A bat and ball cost $1.10 total. The bat costs $1 more than the ball. How much does the ball cost?', solver=TaskSolver, aggregator=AggregateResponses, loops=2)
result = a.run()
print(f"Final pool: {len(result)}, History: {len(a.history)}")
```

------------------------------------------------------------------------

<a
href="https://github.com/risheekkumarb/llm_rsa/blob/main/llm_rsa/rsa_dspy.py#L91"
target="_blank" style="float:right; font-size:smaller">source</a>

### RSA.final_aggregate

``` python

def final_aggregate(
    method:str='llm', signature:NoneType=None
):

```

*Final aggregation: one LLM call to aggregate all final loop candidates*

``` python
# Test 'llm' aggregation
result = a.final_aggregate(method='llm')
assert isinstance(result, dspy.Prediction)
assert len(result.response) > 0
print(result)

# Test 'random' aggregation
result = a.final_aggregate(method='random')
assert isinstance(result, dspy.Prediction)
assert len(result.response) > 0
```
