Browse Source

dataqueue: Make thread safe.

Each data queue gets its own mutex and each function obtains it.

Fixes #7390.
Ryan C. Gordon 2 years ago
parent
commit
8b9a938413
2 changed files with 41 additions and 6 deletions
  1. 40 6
      src/SDL_dataqueue.c
  2. 1 0
      src/SDL_dataqueue.h

+ 40 - 6
src/SDL_dataqueue.c

@@ -32,6 +32,7 @@ typedef struct SDL_DataQueuePacket
 
 struct SDL_DataQueue
 {
+    SDL_mutex *lock;
     SDL_DataQueuePacket *head; /* device fed from here. */
     SDL_DataQueuePacket *tail; /* queue fills to here. */
     SDL_DataQueuePacket *pool; /* these are unused packets. */
@@ -48,24 +49,26 @@ static void SDL_FreeDataQueueList(SDL_DataQueuePacket *packet)
     }
 }
 
-/* this all expects that you managed thread safety elsewhere. */
-
 SDL_DataQueue *
 SDL_CreateDataQueue(const size_t _packetlen, const size_t initialslack)
 {
-    SDL_DataQueue *queue = (SDL_DataQueue *)SDL_malloc(sizeof(SDL_DataQueue));
+    SDL_DataQueue *queue = (SDL_DataQueue *)SDL_calloc(1, sizeof(SDL_DataQueue));
 
     if (queue == NULL) {
         SDL_OutOfMemory();
-        return NULL;
     } else {
         const size_t packetlen = _packetlen ? _packetlen : 1024;
         const size_t wantpackets = (initialslack + (packetlen - 1)) / packetlen;
         size_t i;
 
-        SDL_zerop(queue);
         queue->packet_size = packetlen;
 
+        queue->lock = SDL_CreateMutex();
+        if (!queue->lock) {
+            SDL_free(queue);
+            return NULL;
+        }
+
         for (i = 0; i < wantpackets; i++) {
             SDL_DataQueuePacket *packet = (SDL_DataQueuePacket *)SDL_malloc(sizeof(SDL_DataQueuePacket) + packetlen);
             if (packet) { /* don't care if this fails, we'll deal later. */
@@ -85,6 +88,7 @@ void SDL_DestroyDataQueue(SDL_DataQueue *queue)
     if (queue) {
         SDL_FreeDataQueueList(queue->head);
         SDL_FreeDataQueueList(queue->pool);
+        SDL_DestroyMutex(queue->lock);
         SDL_free(queue);
     }
 }
@@ -101,6 +105,8 @@ void SDL_ClearDataQueue(SDL_DataQueue *queue, const size_t slack)
         return;
     }
 
+    SDL_LockMutex(queue->lock);
+
     packet = queue->head;
 
     /* merge the available pool and the current queue into one list. */
@@ -128,9 +134,12 @@ void SDL_ClearDataQueue(SDL_DataQueue *queue, const size_t slack)
         queue->pool = NULL;
     }
 
+    SDL_UnlockMutex(queue->lock);
+
     SDL_FreeDataQueueList(packet); /* free extra packets */
 }
 
+/* You must hold queue->lock before calling this! */
 static SDL_DataQueuePacket *AllocateDataQueuePacket(SDL_DataQueue *queue)
 {
     SDL_DataQueuePacket *packet;
@@ -177,6 +186,8 @@ int SDL_WriteToDataQueue(SDL_DataQueue *queue, const void *_data, const size_t _
         return SDL_InvalidParamError("queue");
     }
 
+    SDL_LockMutex(queue->lock);
+
     orighead = queue->head;
     origtail = queue->tail;
     origlen = origtail ? origtail->datalen : 0;
@@ -200,6 +211,7 @@ int SDL_WriteToDataQueue(SDL_DataQueue *queue, const void *_data, const size_t _
                 queue->tail = origtail;
                 queue->pool = NULL;
 
+                SDL_UnlockMutex(queue->lock);
                 SDL_FreeDataQueueList(packet); /* give back what we can. */
                 return SDL_OutOfMemory();
             }
@@ -213,6 +225,8 @@ int SDL_WriteToDataQueue(SDL_DataQueue *queue, const void *_data, const size_t _
         queue->queued_bytes += datalen;
     }
 
+    SDL_UnlockMutex(queue->lock);
+
     return 0;
 }
 
@@ -228,6 +242,8 @@ SDL_PeekIntoDataQueue(SDL_DataQueue *queue, void *_buf, const size_t _len)
         return 0;
     }
 
+    SDL_LockMutex(queue->lock);
+
     for (packet = queue->head; len && packet; packet = packet->next) {
         const size_t avail = packet->datalen - packet->startpos;
         const size_t cpy = SDL_min(len, avail);
@@ -238,6 +254,8 @@ SDL_PeekIntoDataQueue(SDL_DataQueue *queue, void *_buf, const size_t _len)
         len -= cpy;
     }
 
+    SDL_UnlockMutex(queue->lock);
+
     return (size_t)(ptr - buf);
 }
 
@@ -253,6 +271,8 @@ SDL_ReadFromDataQueue(SDL_DataQueue *queue, void *_buf, const size_t _len)
         return 0;
     }
 
+    SDL_LockMutex(queue->lock);
+
     while ((len > 0) && ((packet = queue->head) != NULL)) {
         const size_t avail = packet->datalen - packet->startpos;
         const size_t cpy = SDL_min(len, avail);
@@ -278,12 +298,26 @@ SDL_ReadFromDataQueue(SDL_DataQueue *queue, void *_buf, const size_t _len)
         queue->tail = NULL; /* in case we drained the queue entirely. */
     }
 
+    SDL_UnlockMutex(queue->lock);
+
     return (size_t)(ptr - buf);
 }
 
 size_t
 SDL_GetDataQueueSize(SDL_DataQueue *queue)
 {
-    return queue ? queue->queued_bytes : 0;
+    size_t retval = 0;
+    if (queue) {
+        SDL_LockMutex(queue->lock);
+        retval = queue->queued_bytes;
+        SDL_UnlockMutex(queue->lock);
+    }
+    return retval;
+}
+
+SDL_mutex *
+SDL_GetDataQueueMutex(SDL_DataQueue *queue)
+{
+    return queue ? queue->lock : NULL;
 }
 

+ 1 - 0
src/SDL_dataqueue.h

@@ -33,5 +33,6 @@ int SDL_WriteToDataQueue(SDL_DataQueue *queue, const void *data, const size_t le
 size_t SDL_ReadFromDataQueue(SDL_DataQueue *queue, void *buf, const size_t len);
 size_t SDL_PeekIntoDataQueue(SDL_DataQueue *queue, void *buf, const size_t len);
 size_t SDL_GetDataQueueSize(SDL_DataQueue *queue);
+SDL_mutex *SDL_GetDataQueueMutex(SDL_DataQueue *queue);  /* don't destroy this, obviously. */
 
 #endif /* SDL_dataqueue_h_ */