EchoTrio
 
Loading...
Searching...
No Matches
Director.cs
Go to the documentation of this file.
1using Newtonsoft.Json;
2using Newtonsoft.Json.Linq;
3using OpenAI;
4using OpenAI.Models;
5using OpenAI.Realtime;
6using System;
7using System.Buffers;
8using System.Collections.Generic;
9using System.IO;
10using System.Threading;
11using System.Threading.Tasks;
12using Unity.Collections;
13using UnityEngine;
14using UnityEngine.Events;
15using Utilities.Audio;
16using Utilities.Encoding.Wav;
17
18namespace EchoTrio {
/// The director is the OpenAI Realtime model whose main function is to listen to the user's speech and decide the order in which the actors reply.
20 /// For example, Athena can speak first, or Poseidon can speak first, or only one of them replies.
21 /// The director has a secondary function of triggering a Discussion if the user mentions certain topics.
22 public class Director {
/// Current status of the director.
/// NOTE(review): the member identifiers Listening/TextInput/VoiceInput were lost in the
/// documentation extraction; reconstructed from the Doxygen symbol index for this file.
public enum Status {
    /// Idling and waiting for VoiceChat system.
    Waiting,
    /// Listening for user input.
    Listening,
    /// Replying to text input.
    TextInput,
    /// Replying to voice input.
    VoiceInput,
}
34
/// Helper class to act as a mutex for the status value, as it may be read by multiple threads.
/// The instance itself is the lock target; `value` must only be read or written while
/// holding the lock (see IsStatus / SetStatus / TestAndSetStatus / CancelListen).
public class StatusMutex {
    // Current status; guarded by lock-ing the enclosing StatusMutex instance.
    public Status value = Status.Waiting;
}
39
/// Aggregated result of one director round: the user's transcript plus either a
/// speaker order (from trigger_response) or a discussion topic (from trigger_discussion).
public class Response {
    public string userTranscript = null;  // What the user said or typed; null until received.
    public List<string> speakerOrder = null;  // Reply order chosen by the director, if any.
    public string discussionTopic = null;  // Discussion topic chosen by the director, if any.

    /// The response is complete once the transcript has arrived and one of the
    /// two tool results (topic or speaker order) has been filled in.
    public bool IsDone {
        get { return userTranscript != null && (discussionTopic != null || speakerOrder != null); }
    }
}
47
// Public Properties
public bool EnableDebug { get; set; } = false; // Extra OpenAI client logging. NOTE(review): only read once, in the constructor.
public bool IsMicMuted { get; set; } = false; // When muted, silence is streamed to the model instead of mic audio.
public bool IsConnected { get; private set; } = false; // Set once the realtime session has been created.

// Internal Variables & Properties
private DirectorConfig config = null; // Per-round configuration (instructions etc.).
private OpenAIClient api = null; // OpenAI client, created in the constructor.
private RealtimeSession session = null; // Live realtime session; null when disconnected.
// NOTE(review): this declaration was lost in extraction; reconstructed from the Doxygen index
// ("Director.Response response", Director.cs:57). Holds the response assembled for the current round.
private Director.Response response = null;
private UnityAction<Director.Response> onDirectorResponse = null; // Callback invoked when a response is complete.
private List<OpenAI.Tool> tools = new List<OpenAI.Tool>(); // Function tools offered to the model each round.

// Concurrency/Async Variables
// NOTE(review): this declaration was lost in extraction; reconstructed from the Doxygen index
// ("StatusMutex statusMutex", Director.cs:62). Guards the current Status across threads.
private StatusMutex statusMutex = new StatusMutex();
private string latestItemId = string.Empty; // Most recently committed audio item id; used to drop stale transcripts.
64
// Public Interface

/// Creates the director and its OpenAI client. Connecting to the server happens later, in Initialise.
public Director() {
    // Initialise OpenAI
    // NOTE(review): EnableDebug is read exactly once, here, while it still holds its default
    // (false); setting the property after construction does not propagate to the client — confirm intended.
    api = new OpenAIClient(Authentication.GetOpenAIAuthentication()) { EnableDebug = this.EnableDebug };
}
70
/// Initialise the director and connect to OpenAI's server.
/// Starts a fire-and-forget background task that creates the realtime session, begins
/// recording microphone audio, and then pumps server events until cancellation.
/// <param name="onDirectorResponse">The callback to invoke when the director has a response ready.</param>
/// <param name="cancellationToken">Cancellation token used to cancel any async actions when the program shuts down.</param>
public void Initialise(UnityAction<Director.Response> onDirectorResponse, CancellationToken cancellationToken) {
    // Remember the callback so the VoiceChat system is informed whenever a director response is ready.
    this.onDirectorResponse = onDirectorResponse;

    // Local function that owns the whole session lifetime; it is deliberately not awaited.
    async Task RunSessionAsync(CancellationToken ct) {
        try {
            // Create session.
            session = await api.RealtimeEndpoint.CreateSessionAsync(GetSessionConfiguration(), ct);
            // Start recording user audio.
            RecordInputAudio(session, ct);
            // Let the VoiceChat system know we are ready to receive input.
            IsConnected = true;
            // Receive server responses in a loop until cancelled.
            await session.ReceiveUpdatesAsync<IServerEvent>(OnServerEvent, ct);
        } catch (TaskCanceledException) {
            // Expected during shutdown — ignored.
        } catch (OperationCanceledException) {
            // Expected during shutdown — ignored.
        } catch (Exception e) {
            Debug.LogException(e);
        } finally {
            session?.Dispose();
            session = null;
            Debug.Log("Director's session disposed.");
        }
    }

    _ = RunSessionAsync(cancellationToken);
}
103
104 public bool IsStatus(Status value) { lock (statusMutex) { return statusMutex.value == value; } }
105
/// Listens for the next user input. This needs to be invoked at the start of every round, in order to let the director prepare for user input.
/// Before invoking, you need to check that the status is Waiting.
/// <param name="config">The director configuration.</param>
/// <param name="speakers">The list of actors that are possibly speaking. The speaker order will be determined by choosing actors from this list.</param>
/// <param name="topics">Possible discussion topics to be triggered by the user input.</param>
/// <param name="cancellationToken">Cancellation token used to cancel any async actions when the program shuts down.</param>
/// <remarks>
/// NOTE(review): this is async void, so the exception thrown below (and any fault after the await)
/// is surfaced through the synchronization context rather than to the caller — hence the
/// requirement that callers check IsStatus(Status.Waiting) first.
/// NOTE(review): the IsStatus check here and the SetStatus at the end are separate lock
/// acquisitions, so the transition is not atomic; presumably calls are serialized by the
/// VoiceChat system — confirm.
/// </remarks>
public async void ListenForNextUserInput(DirectorConfig config, List<string> speakers, List<string> topics, CancellationToken cancellationToken) {
    if (!IsStatus(Status.Waiting)) {
        throw new System.Exception("Director.ListenForNextInput can only be invoked if the status is Waiting!");
    }

    // Update director session configuration: fresh config, fresh tools built from this
    // round's speakers/topics, and a fresh (empty) response to fill in.
    this.config = config;
    this.tools = new List<Tool>() { BuildTriggerResponseTool(speakers), BuildTriggerDiscussionTool(topics) };
    this.response = new Director.Response();

    try {
        await session.SendAsync(new UpdateSessionRequest(GetSessionConfiguration()), cancellationToken);
    } catch (Exception e) {
        // NOTE(review): a failed session update is only logged, yet we still transition to
        // Listening below — confirm that is intended.
        Debug.LogException(e);
    }

    // Starting listening to the human user.
    SetStatus(Status.Listening);
}
131
/// Cancels listening for user input, returning the director to Waiting.
/// <returns>True if the director was Listening and was cancelled; false otherwise.</returns>
public bool CancelListen() {
    lock (statusMutex) {
        // Only a Listening director can be cancelled; any other state is left untouched.
        if (statusMutex.value != Status.Listening) {
            return false;
        }
        statusMutex.value = Status.Waiting;
        return true;
    }
}
141
/// Submit the user text input. Used as an alternative to speaking into the microphone, usually for development & debugging purposes.
/// <param name="message">The user text input.</param>
/// <param name="cancellationToken">Cancellation token used to cancel any async actions when the program shuts down.</param>
/// <returns>True if the input was accepted (status was Listening); false otherwise.</returns>
public async Awaitable<bool> SubmitUserTextInput(string message, CancellationToken cancellationToken) {
    // Check for status and update it. The atomic Listening -> TextInput transition ensures a
    // concurrent voice commit (see OnServerEvent) cannot also claim this round.
    if (!TestAndSetStatus(Status.Listening, Status.TextInput)) { return false; }

    // Tell the director to clear everything it has heard.
    await session.SendAsync(new InputAudioBufferClearRequest(), cancellationToken);

    // Now tell it to reply to our text input. The transcript is recorded directly from the message.
    response.userTranscript = message;
    await session.SendAsync(new OpenAI.Realtime.ConversationItemCreateRequest(message), cancellationToken);
    await session.SendAsync(new OpenAI.Realtime.CreateResponseRequest(), cancellationToken);

    // NOTE(review): if any SendAsync above throws, the status stays TextInput and the exception
    // propagates to the awaiter — confirm the caller recovers the status in that case.
    return true;
}
159
// Internal Functions

/// Builds the realtime session configuration sent both at connect time and on every round update.
private OpenAI.Realtime.SessionConfiguration GetSessionConfiguration() {
    // Settings used to transcribe what the human says. Without this, the human's speech never
    // gets transcribed. (The language hint reportedly has no effect.)
    var transcriptionSettings = new OpenAI.Realtime.InputAudioTranscriptionSettings(Model.Transcribe_GPT_4o, language: "en");

    // Server VAD so the model detects when speech starts/ends, but with createResponse: false —
    // we must not auto-trigger a response, because a text input may already have been sent.
    var turnDetection = new OpenAI.Realtime.ServerVAD(silenceDuration: 2000, createResponse: false);

    return new OpenAI.Realtime.SessionConfiguration(
        Model.GPT4oRealtime,
        modalities: Modality.Text, // Text only since Director is not speaking to user directly.
        instructions: this.config ? this.config.instructions : null, // Unity implicit-bool check on the config asset.
        inputAudioTranscriptionSettings: transcriptionSettings,
        turnDetectionSettings: turnDetection,
        tools: this.tools,
        toolChoice: "required"); // Set to auto or required to allow the AI to use tools.
}
171
/// Continuously records microphone audio and streams it to the realtime session.
/// <param name="session">The realtime session to stream audio into.</param>
/// <param name="cancellationToken">Stops the recording loop on shutdown.</param>
/// <remarks>
/// Intentionally async void: this is a fire-and-forget background pump started from Initialise.
/// The recording callback appends bytes into a MemoryStream; this loop drains the stream and
/// forwards the bytes (or silence when muted / not Listening) to OpenAI.
/// </remarks>
private async void RecordInputAudio(OpenAI.Realtime.RealtimeSession session, CancellationToken cancellationToken) {
    var memoryStream = new MemoryStream();
    var semaphore = new SemaphoreSlim(1, 1); // Guards memoryStream between the recording callback and the drain loop.

    try {
        byte[] emptyBuffer = new byte[1024 * 16]; // 16 KB of zeroes, sent as silence. (Fix: comment previously said "1 KB".)
        async Task BufferCallback(NativeArray<byte> buffer) {
            try {
                await semaphore.WaitAsync(CancellationToken.None).ConfigureAwait(false);
                for (int i = 0; i < buffer.Length; ++i) { memoryStream.WriteByte(buffer[i]); }
            } finally {
                semaphore.Release();
            }
        }

        // RecordingManager is from the com.utilities.audio package.
        // We don't await this so that we can implement buffer copy and send response to realtime API.
        RecordingManager.StartRecordingStream<WavEncoder>(BufferCallback, 24000, cancellationToken); // Sample rate has to be 24000 according to the InputAudioBufferAppendRequest API docs.

        do {
            byte[] voiceBuffer = ArrayPool<byte>.Shared.Rent(1024 * 16); // 16 KB requested; the pool may hand back a larger array.
            try {
                int bytesRead = 0;
                try {
                    await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
                    memoryStream.Position = 0;
                    bytesRead = await memoryStream.ReadAsync(voiceBuffer, 0, (int)Math.Min(voiceBuffer.Length, memoryStream.Length), cancellationToken).ConfigureAwait(false);
                    memoryStream.SetLength(0); // Drop everything just consumed.
                } finally {
                    semaphore.Release();
                }

                if (bytesRead > 0) {
                    // If we are recording, send what the microphone picks up.
                    if (!IsMicMuted && IsStatus(Status.Listening)) {
                        await session.SendAsync(new InputAudioBufferAppendRequest(voiceBuffer.AsMemory(0, bytesRead)), cancellationToken).ConfigureAwait(false);
                    }
                    // Otherwise, send silence. We want to continue sending data so that the model can trigger a response if it received silence.
                    else {
                        // Bug fix: ArrayPool may rent a voiceBuffer larger than 16 KB, so bytesRead could
                        // exceed emptyBuffer.Length and AsMemory(0, bytesRead) would throw. Grow the
                        // silence buffer to match before slicing.
                        if (bytesRead > emptyBuffer.Length) { emptyBuffer = new byte[bytesRead]; }
                        await session.SendAsync(new InputAudioBufferAppendRequest(emptyBuffer.AsMemory(0, bytesRead)), cancellationToken).ConfigureAwait(false);
                    }
                } else {
                    await Task.Yield(); // Nothing buffered yet; let the recording callback run.
                }
            } catch (TaskCanceledException) {
                // Expected during shutdown — ignored.
            } catch (OperationCanceledException) {
                // Expected during shutdown — ignored.
            } catch (Exception e) {
                Debug.LogError(e);
            } finally {
                ArrayPool<byte>.Shared.Return(voiceBuffer);
            }
        } while (!cancellationToken.IsCancellationRequested);

        RecordingManager.EndRecording();
    } catch (TaskCanceledException) {
        // Expected during shutdown — ignored.
    } catch (OperationCanceledException) {
        // Expected during shutdown — ignored.
    } catch (Exception e) {
        Debug.LogError(e);
    } finally {
        await memoryStream.DisposeAsync();
    }
}
240
/// If the director's response is ready, invoke the response callback and reset for the next round.
/// <remarks>
/// Note that the user transcript can come before or after the Response event!
/// OpenAI API Link: https://platform.openai.com/docs/api-reference/realtime-server-events/conversation/item/input_audio_transcription
/// NOTE(review): the method signature and the callback invocation were lost in extraction;
/// reconstructed from the Doxygen index ("void InvokeOnDirectorResponse()", Director.cs:246).
/// </remarks>
private void InvokeOnDirectorResponse() {
    if (response != null && response.IsDone) {
        // Hand the completed response to the VoiceChat system, then reset to Waiting.
        onDirectorResponse?.Invoke(response);
        response = null;
        SetStatus(Status.Waiting);
    }
}
253
/// Callback function to receive server events from OpenAI.
/// <param name="event">The event received from OpenAI.</param>
/// <remarks>
/// Runs on the session's receive loop (see Initialise); a rethrown RealtimeEventError is caught
/// and logged by that loop's try/catch.
/// </remarks>
private void OnServerEvent(IServerEvent @event) {
    switch (@event) {
        // Rethrow server errors so the receive loop in Initialise logs them and disposes the session.
        case RealtimeEventError error: throw error;
        case SessionResponse sessionResponse: break;
        case RealtimeConversationResponse conversationResponse: break;
        case ConversationItemCreatedResponse conversationItemCreated: break;
        case ConversationItemInputAudioTranscriptionResponse conversationItemTranscription:
            if (!conversationItemTranscription.IsCompleted) { return; }
            Debug.Log($"[{conversationItemTranscription.ItemId}] Director's User Transcription: " + conversationItemTranscription.Transcript.Trim());

            // We only want to update the user transcript if this is the latest transcript.
            // This is to handle the insane case where:
            // 1. User sends text and voice input at the same time.
            // 2. Voice input is ignored, but a InputAudioBufferCommittedResponse was received.
            // 3. ConversationItemInputAudioTranscriptionResponse was not received.
            // 4. User sends voice input.
            // 5. Second InputAudioBufferCommittedResponse is received.
            // 6. First outdated ConversationItemInputAudioTranscriptionResponse is received. (This needs to be ignored.)
            // 7. Second correct ConversationItemInputAudioTranscriptionResponse is received.
            // NOTE(review): locking on latestItemId — a string field that is reassigned inside the
            // other lock below — is not a valid mutex: after reassignment the two lock sites can
            // lock different objects. Use a dedicated `private readonly object` gate instead.
            // NOTE(review): this branch only logs "ignored" but does not return, so the outdated
            // transcript still falls through to the status check below — a `return;` appears to
            // be intended (or was lost in extraction); confirm.
            lock (latestItemId) {
                if (latestItemId != conversationItemTranscription.ItemId) {
                    Debug.Log("Director.OnServerEvent ConversationItemInputAudioTranscriptionResponse ignored as it is outdated.");
                }
            }

            if (IsStatus(Status.VoiceInput)) {
                response.userTranscript = conversationItemTranscription.Transcript.Trim();
                // NOTE(review): a statement (likely InvokeOnDirectorResponse();) appears to have
                // been lost here in extraction — original source line 283 is missing; restore it.
            } else {
                // NOTE(review): the message says "ReplyToVoice" but the enum member is VoiceInput.
                Debug.Log("Director.OnServerEvent ConversationItemInputAudioTranscriptionResponse ignored as the status is not ReplyToVoice.");
            }
            break;
        case ConversationItemTruncatedResponse conversationItemTruncated: break;
        case ConversationItemDeletedResponse conversationItemDeleted: break;
        case InputAudioBufferCommittedResponse committedResponse:
            Debug.Log($"[{committedResponse.ItemId}] Director.OnServerEvent InputAudioBufferCommittedResponse");

            // InputAudioBufferCommittedResponse should always be received before ConversationItemInputAudioTranscriptionResponse.
            // NOTE(review): same lock-on-reassigned-string issue as above.
            lock (latestItemId) { latestItemId = committedResponse.ItemId; }

            // Claim the round for voice input unless a text input already claimed it (see SubmitUserTextInput).
            if (TestAndSetStatus(Status.Listening, Status.VoiceInput)) {
                session.Send(new OpenAI.Realtime.CreateResponseRequest()); // Now tell it to reply to our audio input.
            } else {
                Debug.Log("Director.OnServerEvent InputAudioBufferCommittedResponse ignored.");
            }
            break;
        case InputAudioBufferClearedResponse clearedResponse:
            Debug.Log($"Director.OnServerEvent InputAudioBufferClearedResponse");
            break;
        case InputAudioBufferStartedResponse startedResponse:
            Debug.Log($"Director.OnServerEvent InputAudioBufferStartedResponse");
            break;
        case InputAudioBufferStoppedResponse stoppedResponse: break;
        case RealtimeResponse realtimeResponse:
            // Log each lifecycle state of the model's response.
            switch (realtimeResponse.Response.Status) {
                case RealtimeResponseStatus.InProgress:
                    Debug.Log("Director Realtime Response InProgress.");
                    break;
                case RealtimeResponseStatus.Completed:
                    Debug.Log("Director Realtime Response Completed.");
                    // NOTE(review): a statement (likely InvokeOnDirectorResponse();) appears to have
                    // been lost here in extraction — original source line 316 is missing; restore it.
                    break;
                case RealtimeResponseStatus.Cancelled:
                    Debug.Log("Director Realtime Response Cancelled.");
                    break;
                case RealtimeResponseStatus.Failed:
                    Debug.Log("Director Realtime Response Failed.");
                    break;
                case RealtimeResponseStatus.Incomplete:
                    Debug.Log("Director Realtime Response Incomplete.");
                    break;
            }
            break;
        case ResponseOutputItemResponse outputItemResponse: break;
        case ResponseContentPartResponse contentPartResponse: break;
        case ResponseTextResponse textResponse: break; // Used if modality is Modality.Text only.
        case ResponseAudioResponse audioResponse: break;
        case ResponseAudioTranscriptResponse transcriptResponse: break; // Used if modality has Modality.Audio.
        case ResponseFunctionCallArgumentsResponse functionCallArgumentsResponse:
            if (!functionCallArgumentsResponse.IsDone) { return; }
            Debug.Log($"Director's Function Call: " + functionCallArgumentsResponse.Name + ", Arguments: " + functionCallArgumentsResponse.Arguments.ToString());

            // Handle function calls. Both tools are declared as producing no output,
            // so `output` stays empty (see the tool descriptions in BuildTrigger*Tool).
            string output = string.Empty;
            if (functionCallArgumentsResponse.Name == "trigger_discussion") {
                response.discussionTopic = ParseDiscussionTopic(functionCallArgumentsResponse.Arguments.ToString());
            } else if (functionCallArgumentsResponse.Name == "trigger_response") {
                response.speakerOrder = ParseSpeakerOrder(functionCallArgumentsResponse.Arguments.ToString());
            }

            // Return the function call output to the model.
            ConversationItem functionCallOutput = new ConversationItem((ToolCall)functionCallArgumentsResponse, output);
            session.Send(new OpenAI.Realtime.ConversationItemCreateRequest(functionCallOutput));
            break;
        case RateLimitsResponse rateLimitsResponse: break;
        default: break;
    }
}
354
355 private void SetStatus(Status value) { lock (statusMutex) { statusMutex.value = value; } }
356
/// Atomically transitions the status from `condition` to `value`.
/// <returns>True if the transition happened; false if the status was not `condition`.</returns>
private bool TestAndSetStatus(Status condition, Status value) {
    lock (statusMutex) {
        if (statusMutex.value != condition) {
            return false;
        }
        statusMutex.value = value;
        return true;
    }
}
366
// Director Tools

/// Create a function following OpenAI's JSON Schema for the director to decide upon the speaker order.
/// OpenAI API on function calling: https://platform.openai.com/docs/guides/function-calling
/// <param name="speakers">The names of the speakers.</param>
/// <returns>The function's JSON Object.</returns>
private OpenAI.Function BuildTriggerResponseTool(List<string> speakers) {
    // JSON Schema: one required array property whose elements are constrained to the known speakers.
    var schema = new {
        type = "object",
        properties = new {
            speaker_order = new {
                type = "array",
                description = "The order which the AI models will respond to the user.",
                items = new {
                    type = "string",
                    // The enum restricts values to the given set. (The AI can still occasionally hallucinate invalid values.)
                    @enum = speakers.ToArray()
                },
                minItems = 1,
                maxItems = speakers.Count,
                uniqueItems = true
            }
        },
        required = new[] { "speaker_order" }
    };

    // Serialize the anonymous schema object and hand it to the Function as a parsed JToken.
    string schemaJson = JsonConvert.SerializeObject(schema, Formatting.Indented);
    return new OpenAI.Function("trigger_response", "Triggers the AI models to respond to the user. No output is given.", JToken.Parse(schemaJson));
}
393
/// Create a function following OpenAI's JSON Schema for the director to trigger a discussion based on a topic.
/// OpenAI API on function calling: https://platform.openai.com/docs/guides/function-calling
/// <param name="topics">The possible topics to trigger a discussion for.</param>
/// <returns>The function's JSON Object.</returns>
/// <remarks>If there is no topic given, the AI tends to hallucinate a topic. To prevent that, add a couple of placeholder discussions with the most obscure, niche topic that the user is unlikely to ever trigger.</remarks>
private OpenAI.Function BuildTriggerDiscussionTool(List<string> topics) {
    // JSON Schema: a single required string property constrained to the known topics.
    var schema = new {
        type = "object",
        properties = new {
            topic = new {
                type = "string",
                description = "The topic of the discussion to trigger.",
                // The enum restricts values to the given set. (The AI can still occasionally hallucinate invalid values.)
                @enum = topics.ToArray()
            }
        },
        required = new[] { "topic" }
    };

    // Serialize the anonymous schema object and hand it to the Function as a parsed JToken.
    string schemaJson = JsonConvert.SerializeObject(schema, Formatting.Indented);
    return new OpenAI.Function("trigger_discussion", "Triggers a discussion based on a topic. No output is given. If there are no input choices given, do not invoke this function.", JToken.Parse(schemaJson));
}
414
/// Extracts the "speaker_order" array from the trigger_response tool-call arguments.
/// Returns an empty list when the property is missing.
private List<string> ParseSpeakerOrder(string args) {
    JToken speakerOrder = JToken.Parse(args)?["speaker_order"];
    return speakerOrder == null
        ? new List<string>()
        : new List<string>(speakerOrder.ToObject<string[]>());
}
422
/// Extracts the "topic" string from the trigger_discussion tool-call arguments.
/// Returns an empty string when the property is missing.
private string ParseDiscussionTopic(string args) {
    JToken topic = JToken.Parse(args)?["topic"];
    return topic == null ? string.Empty : topic.ToString();
}
430 }
431}
Helper class to load the authentication file and retrieve API keys.
static OpenAIAuthentication GetOpenAIAuthentication()
List< string > speakerOrder
Definition: Director.cs:42
Helper class to act as a mutex for the status value, as it may be read by multiple threads.
Definition: Director.cs:36
OpenAIClient api
Definition: Director.cs:55
void SetStatus(Status value)
Definition: Director.cs:355
StatusMutex statusMutex
Definition: Director.cs:62
List< string > ParseSpeakerOrder(string args)
Definition: Director.cs:415
async void RecordInputAudio(OpenAI.Realtime.RealtimeSession session, CancellationToken cancellationToken)
Definition: Director.cs:172
string latestItemId
Definition: Director.cs:63
void Initialise(UnityAction< Director.Response > onDirectorResponse, CancellationToken cancellationToken)
Definition: Director.cs:74
bool IsStatus(Status value)
Definition: Director.cs:104
void InvokeOnDirectorResponse()
Definition: Director.cs:246
List< OpenAI.Tool > tools
Definition: Director.cs:59
OpenAI.Function BuildTriggerResponseTool(List< string > speakers)
Definition: Director.cs:372
bool TestAndSetStatus(Status condition, Status value)
Definition: Director.cs:357
Director.Response response
Definition: Director.cs:57
string ParseDiscussionTopic(string args)
Definition: Director.cs:423
bool CancelListen()
Definition: Director.cs:132
OpenAI.Realtime.SessionConfiguration GetSessionConfiguration()
Definition: Director.cs:161
async Awaitable< bool > SubmitUserTextInput(string message, CancellationToken cancellationToken)
Definition: Director.cs:145
void OnServerEvent(IServerEvent @event)
Definition: Director.cs:256
RealtimeSession session
Definition: Director.cs:56
OpenAI.Function BuildTriggerDiscussionTool(List< string > topics)
Definition: Director.cs:399
async void ListenForNextUserInput(DirectorConfig config, List< string > speakers, List< string > topics, CancellationToken cancellationToken)
Definition: Director.cs:112
UnityAction< Director.Response > onDirectorResponse
Definition: Director.cs:58
DirectorConfig config
Definition: Director.cs:54
Status
Current status of the director.
Definition: Director.cs:24
@ Waiting
Idling and waiting for VoiceChat system.
@ VoiceInput
Replying to voice input.
@ Listening
Listening for user input.
@ TextInput
Replying to text input.