|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- package gogpt
-
import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)
-
// ChatCompletionStreamChoiceDelta is the incremental message fragment carried
// by a single streamed choice.
type ChatCompletionStreamChoiceDelta struct {
	// Content is the piece of message text delivered by this event.
	Content string `json:"content"`
	// Role is the author of the message being streamed. It is decoded when
	// the event carries a "role" key and omitted from encoded output when
	// empty, so existing producers and consumers are unaffected.
	Role string `json:"role,omitempty"`
}

// ChatCompletionStreamChoice is one entry of the "choices" array of a
// streamed chat completion event.
type ChatCompletionStreamChoice struct {
	// Index is the position of this choice among the requested completions.
	Index int `json:"index"`
	// Delta holds the fragment added to the choice by this event.
	Delta ChatCompletionStreamChoiceDelta `json:"delta"`
	// FinishReason is left empty when the event has no "finish_reason" or
	// reports it as JSON null.
	FinishReason string `json:"finish_reason"`
}

// ChatCompletionStreamResponse is the decoded JSON payload of a single
// "data:" event of a chat completion stream.
type ChatCompletionStreamResponse struct {
	ID      string                       `json:"id"`
	Object  string                       `json:"object"`
	Created int64                        `json:"created"`
	Model   string                       `json:"model"`
	Choices []ChatCompletionStreamChoice `json:"choices"`
}
-
// ChatCompletionStream is a handle on an in-flight streamed chat completion.
// It pairs the HTTP response with a buffered reader over its body; events are
// pulled with Recv and the body is released with Close.
//
// Note: Perhaps it is more elegant to abstract Stream using generics.
type ChatCompletionStream struct {
	// emptyMessagesLimit bounds how many lines that are not data events a
	// single Recv call skips before giving up.
	emptyMessagesLimit uint
	// isFinished records that the terminating "[DONE]" event has been read.
	isFinished bool

	// reader buffers the response body so it can be consumed line by line.
	reader *bufio.Reader
	// response is kept so that Close can close its body.
	response *http.Response
}
-
- func (stream *ChatCompletionStream) Recv() (response ChatCompletionStreamResponse, err error) {
- if stream.isFinished {
- err = io.EOF
- return
- }
-
- var emptyMessagesCount uint
-
- waitForData:
- line, err := stream.reader.ReadBytes('\n')
- if err != nil {
- return
- }
-
- var headerData = []byte("data: ")
- line = bytes.TrimSpace(line)
- if !bytes.HasPrefix(line, headerData) {
- emptyMessagesCount++
- if emptyMessagesCount > stream.emptyMessagesLimit {
- err = ErrTooManyEmptyStreamMessages
- return
- }
-
- goto waitForData
- }
-
- line = bytes.TrimPrefix(line, headerData)
- if string(line) == "[DONE]" {
- stream.isFinished = true
- err = io.EOF
- return
- }
-
- err = json.Unmarshal(line, &response)
- return
- }
-
- func (stream *ChatCompletionStream) Close() {
- stream.response.Body.Close()
- }
-
- // CreateChatCompletionStream — API call to create a chat completion w/ streaming
- // support. It sets whether to stream back partial progress. If set, tokens will be
- // sent as data-only server-sent events as they become available, with the
- // stream terminated by a data: [DONE] message.
- func (c *Client) CreateChatCompletionStream(
- ctx context.Context,
- request ChatCompletionRequest,
- ) (stream *ChatCompletionStream, err error) {
- request.Stream = true
- req, err := c.newStreamRequest(ctx, "POST", "/chat/completions", request)
- if err != nil {
- return
- }
-
- resp, err := c.config.HTTPClient.Do(req) //nolint:bodyclose // body is closed in stream.Close()
- if err != nil {
- return
- }
-
- stream = &ChatCompletionStream{
- emptyMessagesLimit: c.config.EmptyMessagesLimit,
- reader: bufio.NewReader(resp.Body),
- response: resp,
- }
- return
- }
|