summaryrefslogtreecommitdiffstats
path: root/mpeglib/lib/input/simpleRingBuffer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'mpeglib/lib/input/simpleRingBuffer.cpp')
-rw-r--r--mpeglib/lib/input/simpleRingBuffer.cpp420
1 files changed, 420 insertions, 0 deletions
diff --git a/mpeglib/lib/input/simpleRingBuffer.cpp b/mpeglib/lib/input/simpleRingBuffer.cpp
new file mode 100644
index 00000000..71c92329
--- /dev/null
+++ b/mpeglib/lib/input/simpleRingBuffer.cpp
@@ -0,0 +1,420 @@
+/*
+ a thread safe ring buffer without dependencies
+ Copyright (C) 1999 Martin Vogt
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Library General Public License as published by
+ the Free Software Foundation.
+
+ For more information look at the file COPYRIGHT in this package
+
+ */
+
+
+
+
+#include "simpleRingBuffer.h"
+#include <string.h>
+
+#include <iostream>
+
+using namespace std;
+
+static int instanceCnt=0;
+
+SimpleRingBuffer::SimpleRingBuffer(int size,int minLinSize) {
+ abs_thread_mutex_init(&mut);
+ abs_thread_cond_init(&dataCond);
+ abs_thread_cond_init(&spaceCond);
+
+ instance=instanceCnt;
+ instanceCnt++;
+ this->size=size;
+ startPos=(char*)malloc(size);
+ readPos=startPos;
+ writePos=startPos;
+ lockPos=startPos;
+
+ lockgrade=0;
+ fillgrade=0;
+ linAvail=size;
+ lastPos=(startPos+size-1);
+ eofPos=lastPos+1;
+
+ canWrite=size;
+ canRead=0;
+
+ minLinBuf=new char[minLinSize];
+ this->minLinBufSize=minLinSize;
+ waitMinData=0;
+ waitMinSpace=0;
+ lWaitForData=false;
+ lWaitForSpace=false;
+
+ readBytes=0;
+ writeBytes=0;
+
+ lCanWaitForData=true;
+ lCanWaitForSpace=true;
+}
+
+
+SimpleRingBuffer::~SimpleRingBuffer() {
+ // The user of this class must take care that the threads
+ // have exited!
+ free(startPos);
+ delete [] minLinBuf;
+ abs_thread_mutex_destroy(&mut);
+ abs_thread_cond_destroy(&dataCond);
+ abs_thread_cond_destroy(&spaceCond);
+}
+
+
+int SimpleRingBuffer::getSize() {
+ return size;
+}
+
+
+int SimpleRingBuffer::getWriteArea(char* &ptr,int &size) {
+
+ // When we are in this area the following
+ // can happen by the other thread
+ // fillgrade is decreased (because reader fowards)
+ // readPos is changed.
+
+ ptr=writePos;
+ size=canWrite;
+
+ return size;
+}
+
+
+void SimpleRingBuffer::updateCanWrite() {
+ if (lockPos < writePos) {
+ canWrite=eofPos-writePos;
+ //printf("1 c:%d l:%p w:%p",canWrite,lockPos,writePos);
+ } else if (lockPos > writePos) {
+ canWrite=lockPos-writePos;
+ //printf("2 c:%d l:%p w:%p",canWrite,lockPos,writePos);
+ } else {
+ if (fillgrade > 0) {
+ canWrite=0;
+ } else {
+ canWrite=eofPos-writePos;
+ }
+ //printf("2 c:%d ",canWrite);
+
+ }
+ if (canWrite < 0) {
+ printf("error canWrite:%d fill:%d lock:%p start:%p eof:%p write:%p\n",
+ canWrite,fillgrade,lockPos,startPos,eofPos,writePos);
+ }
+
+}
+
+
+void SimpleRingBuffer::updateCanRead() {
+ canRead=fillgrade-lockgrade;
+ int currentSpace=size-fillgrade;
+ if (currentSpace >= waitMinSpace) {
+ abs_thread_cond_signal(&spaceCond);
+ }
+ if (canRead < 0) {
+ printf("error canRead:%d fillgrade:%d lockgrade:%d \n",
+ canRead,fillgrade,lockgrade);
+ }
+
+}
+
+
+void SimpleRingBuffer::forwardLockPtr(int nBytes) {
+ abs_thread_mutex_lock(&mut);
+
+ if (fillgrade < lockgrade) {
+ printf("1:fillgrade:%d < lockgrade:%d\n",fillgrade,lockgrade);
+ }
+ fillgrade-=nBytes;
+ lockgrade-=nBytes;
+ if (fillgrade < lockgrade) {
+ printf("2:fillgrade:%d < lockgrade:%d nBytes:%d\n",
+ fillgrade,lockgrade,nBytes);
+ }
+ lockPos=lockPos+nBytes;
+ if (lockPos > lastPos) { // we expects that we had a linAvail part
+ // if user forwards more than buffer boundary
+ nBytes=lockPos-lastPos;
+ lockPos=startPos+nBytes-1;
+ }
+ updateCanWrite();
+ updateCanRead();
+
+ abs_thread_mutex_unlock(&mut);
+ return;
+}
+
+
+void SimpleRingBuffer::forwardWritePtr(int nBytes) {
+ abs_thread_mutex_lock(&mut);
+
+ fillgrade=fillgrade+nBytes;
+ if (fillgrade < lockgrade) {
+ printf("3:fillgrade:%d < lockgrade:%d nBytes:%d\n",
+ fillgrade,lockgrade,nBytes);
+ }
+ writeBytes+=nBytes;
+ writePos=writePos+nBytes;
+ if(writePos >= eofPos) {
+ if (writePos == eofPos) {
+ writePos=startPos;
+ } else {
+ cout << "writePos > eofPos ! forward error:"<<(eofPos-writePos)
+ <<" bytes"<<endl;
+ }
+ }
+
+ updateCanWrite();
+ updateCanRead();
+ if (fillgrade >= waitMinData) {
+ abs_thread_cond_signal(&dataCond);
+ }
+ abs_thread_mutex_unlock(&mut);
+}
+
+
+int SimpleRingBuffer::waitForSpace(int bytes){
+ abs_thread_mutex_lock(&mut);
+ int back=0;
+ waitMinSpace=bytes;
+ if (waitMinSpace > size) {
+ waitMinSpace=size;
+ }
+ if (waitMinSpace < 0) {
+ cout << "negative waitForSpace"<<endl;
+ waitMinSpace=0;
+ }
+ int currentSpace=size-fillgrade;
+ if (lCanWaitForSpace) {
+ if (currentSpace < waitMinSpace) {
+ lWaitForSpace=true;
+ // it is not possible to wait for data/space simultanously
+ if (lWaitForData == true) {
+ abs_thread_cond_signal(&dataCond);
+ }
+ abs_thread_cond_wait(&spaceCond,&mut);
+ lWaitForSpace=false;
+ }
+ }
+ if (size-fillgrade >= waitMinSpace) {
+ back=1;
+ }
+ abs_thread_mutex_unlock(&mut);
+ return back;
+}
+
+
+void SimpleRingBuffer::exitWaitForSpace(){
+ abs_thread_mutex_lock(&mut);
+ abs_thread_cond_signal(&spaceCond);
+ abs_thread_mutex_unlock(&mut);
+}
+
+void SimpleRingBuffer::setCanWaitForSpace(int lCanWaitForSpace) {
+ abs_thread_mutex_lock(&mut);
+ this->lCanWaitForSpace=lCanWaitForSpace;
+ abs_thread_cond_signal(&spaceCond);
+ abs_thread_mutex_unlock(&mut);
+
+}
+
+
+
+void SimpleRingBuffer::forwardReadPtr(int nBytes) {
+ abs_thread_mutex_lock(&mut);
+ readBytes+=nBytes;
+ readPos+=nBytes;
+ linAvail=linAvail-nBytes;
+ lockgrade+=nBytes;
+ if (readPos > lastPos) { // we expects that we had a linAvail part
+ // if user forwards more than buffer boundary
+ nBytes=readPos-lastPos;
+ readPos=startPos+nBytes-1;
+ linAvail=lastPos+1-readPos;
+ }
+ if (fillgrade < lockgrade) {
+ printf("5:fillgrade:%d < lockgrade:%d nBytes:%d\n",
+ fillgrade,lockgrade,nBytes);
+ }
+ updateCanRead();
+ abs_thread_mutex_unlock(&mut);
+}
+
+
+
+int SimpleRingBuffer::getReadArea(char* &ptr,int &readSize) {
+ int pSize=readSize;
+ ptr=readPos;
+
+ if (canRead == 0) {
+ readSize=0;
+ return 0;
+ }
+ if (pSize < 0) {
+ cout << "Generic Memory Info invalid"<<endl;
+ pSize=size/2;
+ }
+ //
+ // Now the part the we deliver a minimum buffer if it is
+ // possible
+ //
+
+ if ( (pSize > linAvail) &&
+ (minLinBufSize >linAvail) &&
+ (canRead > linAvail) ) {
+ int copySize;
+ copySize=canRead; // we cannot copy more than this
+ if (copySize > pSize) { // if it is too much reduche it
+ copySize=pSize;
+ }
+ if (copySize > minLinBufSize) { // if it does not fit in buffer->reduce
+ copySize=minLinBufSize;
+ }
+ memcpy(minLinBuf,readPos,linAvail);
+ memcpy(minLinBuf+linAvail,startPos,copySize-linAvail);
+ readSize=copySize;
+ ptr=minLinBuf;
+ return copySize;
+ }
+
+ // linAvail part end
+
+ int copyBytes=linAvail;
+ if (canRead < copyBytes) {
+ copyBytes=canRead;
+ }
+ if (copyBytes >= pSize) {
+ readSize=pSize;
+ } else {
+ readSize=copyBytes;
+ }
+ return readSize;
+}
+
+
+
+void SimpleRingBuffer::exitWaitForData(){
+ abs_thread_mutex_lock(&mut);
+ abs_thread_cond_signal(&dataCond);
+ abs_thread_mutex_unlock(&mut);
+}
+
+
+
+int SimpleRingBuffer::waitForData(int bytes){
+ abs_thread_mutex_lock(&mut);
+ int back=0;
+ waitMinData=bytes;
+ if (waitMinData > size) {
+ waitMinData=size;
+ }
+ if (waitMinData < 0) {
+ cout << "negative waitForData"<<endl;
+ waitMinData=0;
+ }
+ if (lCanWaitForData) {
+ if (fillgrade < waitMinData) {
+ lWaitForData=true;
+ // it is not possible to wait for data space simultanously
+ if (lWaitForSpace == true) {
+ abs_thread_cond_signal(&spaceCond);
+ }
+ abs_thread_cond_wait(&dataCond,&mut);
+ lWaitForData=false;
+ }
+ }
+ if (fillgrade >= waitMinData) {
+ back=1;
+ }
+ abs_thread_mutex_unlock(&mut);
+ return back;
+}
+
+int SimpleRingBuffer::getCanWaitForData() {
+ return lCanWaitForData;
+}
+
+
+void SimpleRingBuffer::setCanWaitForData(int lCanWaitForData) {
+ abs_thread_mutex_lock(&mut);
+ this->lCanWaitForData=lCanWaitForData;
+ abs_thread_cond_signal(&dataCond);
+ abs_thread_mutex_unlock(&mut);
+
+}
+
+void SimpleRingBuffer::emptyBuffer() {
+ abs_thread_mutex_lock(&mut);
+ writePos=readPos;
+ if (fillgrade < lockgrade) {
+ printf("4:fillgrade:%d < lockgrade:%d\n",fillgrade,lockgrade);
+ }
+ linAvail=lastPos+1-writePos;
+ fillgrade=lockgrade;
+ updateCanRead();
+ updateCanWrite();
+ readBytes=0;
+ writeBytes=0;
+ if (size-fillgrade >= waitMinSpace) {
+ abs_thread_cond_signal(&spaceCond);
+ }
+ if (fillgrade >= waitMinData) {
+ abs_thread_cond_signal(&dataCond);
+ }
+ abs_thread_mutex_unlock(&mut);
+}
+
+
+int SimpleRingBuffer::getFillgrade() {
+ return fillgrade;
+}
+
+
+int SimpleRingBuffer::getReadBytes() {
+ return readBytes;
+}
+
+
+int SimpleRingBuffer::getWriteBytes() {
+ return writeBytes;
+}
+
+
+int SimpleRingBuffer::getFreeRead() {
+ return fillgrade;
+}
+
+int SimpleRingBuffer::getFreeWrite() {
+ return size-fillgrade;
+}
+
+void SimpleRingBuffer::resizeBuffer(int changeSize) {
+ abs_thread_mutex_lock(&mut);
+ int lPos=lockPos-startPos;
+ int wPos=writePos-startPos;
+ int rPos=readPos-startPos;
+ startPos=(char *)realloc(startPos,changeSize);
+ size=changeSize;
+ readPos=startPos+lPos;
+ writePos=startPos+wPos;
+ lockPos=startPos+rPos;
+
+ lastPos=(startPos+size-1);
+ eofPos=lastPos+1;
+
+ linAvail=lastPos+1-readPos;
+
+ updateCanWrite();
+ updateCanRead();
+ abs_thread_mutex_unlock(&mut);
+
+
+}