386 lines
13 KiB
C++
386 lines
13 KiB
C++
//========= Copyright Valve Corporation, All rights reserved. ============//
//
// Purpose: A thread pool implementation.  You give it CWorkItems,
// it processes them asynchronously, and hands them back to you when they've
// been completed.
//
// To declare a queue, provide the implementation of a CWorkItem subtype,
// the thread name prefix for threads in the pool, and the number of work
// threads you want.
//
// CNet uses this class to offload encryption to a separate thread,
// so that's a good place to start looking for usage examples.
//
//=============================================================================
|
|
|
|
#ifndef WORKTHREADPOOL_H
|
|
#define WORKTHREADPOOL_H
|
|
#ifdef _WIN32
|
|
#pragma once
|
|
#endif
|
|
|
|
#include <refcount.h>
|
|
#include <reliabletimer.h>
|
|
#include "jobtime.h"
|
|
|
|
// Forward declaration for CTSQueue. We can't statically allocate a CTSQueue as
// a member because of alignment issues on Win64, so this header only ever
// refers to it through pointers and a declaration is sufficient.
template <class T, bool bTestOptimizer>
class CTSQueue;
|
|
|
|
namespace GCSDK {
|
|
|
|
// forward declarations (only pointers to these types are used before their definitions)
class CWorkThread;
class CJobMgr;
|
|
|
|
|
|
// Implements the two name accessors that CWorkItem declares as pure virtual.
// Both return pointers to fixed strings in the code section
// ("<classname>::DispatchCompleted" and "<classname>::ThreadProcess");
// we need this for VPROF nodes.
#define DECLARE_WORK_ITEM( classname ) \
	virtual const char* GetDispatchCompletedName() const { return #classname "::DispatchCompleted"; } \
	virtual const char* GetThreadProcessName() const { return #classname "::ThreadProcess"; }
|
|
|
|
|
|
//-----------------------------------------------------------------------------
|
|
// Purpose: Work item base class. Derive from this for specific work item types.
|
|
// The derived type ideally should be self-contained with all data it
|
|
// needs to perform the work.
|
|
//-----------------------------------------------------------------------------
|
|
class CWorkItem : public CRefCount
|
|
{
|
|
public:
|
|
CWorkItem()
|
|
: m_JobID( k_GIDNil ),
|
|
m_bRunning( false ),
|
|
m_bResubmit( false ),
|
|
m_bCanceled( false ),
|
|
m_ulSequenceNumber( 0 )
|
|
{
|
|
m_jobtimeTimeout.SetLTime( 0 );
|
|
m_jobtimeQueued.SetToJobTime();
|
|
}
|
|
|
|
CWorkItem( JobID_t jobID )
|
|
: m_JobID( jobID ),
|
|
m_bRunning( false ),
|
|
m_bResubmit( false ),
|
|
m_bCanceled( false ),
|
|
m_ulSequenceNumber( 0 )
|
|
{
|
|
m_jobtimeTimeout.SetLTime( 0 );
|
|
m_jobtimeQueued.SetToJobTime();
|
|
}
|
|
|
|
CWorkItem( JobID_t jobID, int64 cTimeoutMicroseconds )
|
|
: m_JobID( jobID ),
|
|
m_bRunning( false ),
|
|
m_bResubmit( false ),
|
|
m_bCanceled( false ),
|
|
m_ulSequenceNumber( 0 )
|
|
{
|
|
SetPreExecuteTimeout( cTimeoutMicroseconds );
|
|
m_jobtimeQueued.SetToJobTime();
|
|
}
|
|
|
|
void SetJobID( JobID_t jobID )
|
|
{
|
|
Assert(jobID != k_GIDNil) ;
|
|
m_JobID = jobID;
|
|
}
|
|
JobID_t GetJobID() const { return m_JobID; }
|
|
|
|
bool HasTimedOut() const { return m_jobtimeTimeout.LTime() != 0 && m_jobtimeTimeout.CServerMicroSecsPassed() > 0; }
|
|
int64 WaitingTime() const { return m_jobtimeQueued.CServerMicroSecsPassed(); }
|
|
void SetPreExecuteTimeout( int64 cMicroSeconds ) { m_jobtimeTimeout.SetFromJobTime( cMicroSeconds ); }
|
|
bool BPreExecuteTimeoutSet( ) const { return m_jobtimeTimeout.LTime() != 0; }
|
|
void ForceTimeOut() { m_jobtimeTimeout.SetFromJobTime( -1 );}
|
|
bool BIsRunning() const { return m_bRunning; } // true if running right now
|
|
bool WasCancelled() const { return m_bCanceled; }
|
|
void SetCycleCount( CCycleCount& cycleCount ) { m_CycleCount = cycleCount ; }
|
|
CCycleCount GetCycleCount() { return m_CycleCount; }
|
|
uint64 GetSequenceNumber() { return m_ulSequenceNumber; }
|
|
|
|
// Work threads can call this to force a work item to be reprocessed (added to the end of the process queue)
|
|
void SetResubmit( bool bResubmit ) { m_bResubmit = bResubmit; }
|
|
|
|
// these functions return pointers to fixed string in the code section.
|
|
// We need this for VPROF nodes, you must use the DECLARE_WORK_ITEM macro
|
|
virtual const char* GetDispatchCompletedName() const = 0;
|
|
virtual const char* GetThreadProcessName() const = 0;
|
|
|
|
// Return false if your operation failed in some way that you would want to know about
|
|
// The CWorkThreadPool will count the failures.
|
|
virtual bool ThreadProcess( CWorkThread *pThread ) = 0; // called by the worker thread
|
|
virtual bool DispatchCompletedWorkItem( CJobMgr *jobMgr ); // called by main loop after item completed
|
|
|
|
#ifdef DBGFLAG_VALIDATE
|
|
virtual void Validate( CValidator &validator, const char *pchName ) {} // Validate our internal structures
|
|
#endif
|
|
|
|
protected:
|
|
// note: destructor is private. This is a ref-counted object, private destructor ensures callers can't accidentally delete
|
|
// directly, or declare on stack
|
|
virtual ~CWorkItem() { }
|
|
|
|
friend class CWorkThread;
|
|
friend class CWorkThreadPool;
|
|
uint64 m_ulSequenceNumber; // Sequence number for the work item, used when enforcing output ordering as matching input order
|
|
CCycleCount m_CycleCount; // A record of how long it took to execute this particular work item !
|
|
|
|
private:
|
|
bool m_bResubmit; // true if the item should be resubmitted after last run
|
|
volatile bool m_bRunning; // true if the work item is running right now
|
|
bool m_bCanceled; // true if the work was canceled due to timeout
|
|
CJobTime m_jobtimeTimeout; // time at which this result is no longer valid, so it shouldn't start to be processed
|
|
CJobTime m_jobtimeQueued;
|
|
JobID_t m_JobID;
|
|
};
|
|
|
|
// forward decl (CWorkThread below holds a pointer to its parent pool)
class CWorkThreadPool;
|
|
|
|
//-----------------------------------------------------------------------------
|
|
// Purpose: Generic work thread implementation, to be specialized if necessary
|
|
//-----------------------------------------------------------------------------
|
|
class CWorkThread : public CThread
|
|
{
|
|
public:
|
|
CWorkThread( CWorkThreadPool *pThreadPool );
|
|
CWorkThread( CWorkThreadPool *pThreadPool, const char *pszName );
|
|
|
|
virtual ~CWorkThread()
|
|
{
|
|
}
|
|
|
|
virtual int Run();
|
|
|
|
virtual void Cancel()
|
|
{
|
|
}
|
|
|
|
protected:
|
|
CWorkThreadPool *m_pThreadPool; // parent pool
|
|
volatile bool m_bExitThread; // set by CWorkThreadPool::StopWorkerThreads and possibly by subclasses of CWorkThread
|
|
volatile bool m_bFinished; // set by CWorkThread::Run [note: must still check IsThreadRunning, and/or call Join]
|
|
virtual void OnStart() { }
|
|
virtual void OnExit() { }
|
|
|
|
#ifdef DBGFLAG_VALIDATE
|
|
public:
|
|
virtual void Validate( CValidator &validator, const char *pchName )
|
|
{
|
|
VALIDATE_SCOPE();
|
|
};
|
|
#endif // DBGFLAG_VALIDATE
|
|
|
|
friend class CWorkThreadPool;
|
|
};
|
|
|
|
|
|
//-----------------------------------------------------------------------------
|
|
// callback class to create work threads
|
|
//-----------------------------------------------------------------------------
|
|
class IWorkThreadFactory
|
|
{
|
|
public:
|
|
virtual CWorkThread *CreateWorkerThread( class CWorkThreadPool *pWorkThreadPool ) = 0;
|
|
};
|
|
|
|
|
|
//-----------------------------------------------------------------------------
|
|
// reusable trivial implementation of IWorkThreadFactory
|
|
//-----------------------------------------------------------------------------
|
|
template<class T>
|
|
class CWorkThreadFactory : public IWorkThreadFactory
|
|
{
|
|
public:
|
|
virtual CWorkThread *CreateWorkerThread( class CWorkThreadPool *pWorkThreadPool )
|
|
{
|
|
return new T( pWorkThreadPool );
|
|
}
|
|
};
|
|
|
|
|
|
//-----------------------------------------------------------------------------
// Purpose: interface class for object that the WorkThreadPool can signal when
//			there are completed work items to process
//-----------------------------------------------------------------------------
class IWorkThreadPoolSignal
{
public:
	virtual void Signal() = 0;

	// Virtual so an implementation can be destroyed safely through this interface.
	// Declared after Signal so that function's vtable slot is unchanged.
	virtual ~IWorkThreadPoolSignal() { }
};
|
|
|
|
|
|
//-----------------------------------------------------------------------------
|
|
// Purpose: pool of work threads.
|
|
//-----------------------------------------------------------------------------
|
|
class CWorkThreadPool
|
|
{
|
|
friend class CWorkThread;
|
|
public:
|
|
|
|
static void SetWorkItemCompletedSignal( IWorkThreadPoolSignal *pObject )
|
|
{
|
|
sm_pWorkItemsCompletedSignal = pObject;
|
|
}
|
|
|
|
|
|
CWorkThreadPool( const char *pszThreadNamePfx );
|
|
|
|
// eventually it might be nice to be able to resize these pools via console command
|
|
// in that case, we'd want a constructor like this, and a PoolSize accessor/mutator pair
|
|
// it makes this class much more complicated, however (growing the pool is easy, shrinking it
|
|
// is less easy) so we'll punt for now.
|
|
/* CWorkThreadPool( const char *pszName = "unnamed thread" ) : CWorkThreadPool( pszName, -1 ); */
|
|
|
|
virtual ~CWorkThreadPool();
|
|
|
|
// Setting this will ensure that items of the same priority complete and get dispatched in the same order
|
|
// they are added to the threadpool. This has a small additional locking overhead and can increase latency
|
|
// as items that are actually completed out-of-order have to queue waiting on earlier items.
|
|
void SetEnsureOutputOrdering( bool bEnsureOutputOrdering ) { m_bEnsureOutputOrdering = bEnsureOutputOrdering; }
|
|
|
|
void AllowTimeouts( bool bMayHaveJobTimeouts ) { m_bMayHaveJobTimeouts = bMayHaveJobTimeouts; }
|
|
|
|
int AddWorkThread( CWorkThread *pThread );
|
|
void StartWorkThreads(); // gentlemen, start your engines
|
|
void StopWorkThreads(); // stop work threads
|
|
bool HasWorkItemsToProcess() const;
|
|
|
|
// sets it to use dynamic worker thread construction
|
|
// if pWorkThreadControl is NULL, just creates a standard CWorkThread object
|
|
void SetWorkThreadAutoConstruct( int cMaxThreads, IWorkThreadFactory *pWorkThreadConstructor );
|
|
|
|
bool AddWorkItem( CWorkItem *pWorkItem ); // add a work item to the queue to process
|
|
CWorkItem *GetNextCompletedWorkItem( ); // get next completed work item and it's priority if needed
|
|
const char *GetThreadNamePrefix() const { return m_szThreadNamePfx; }
|
|
|
|
void SetNeverSetEventOnAdd( bool bNeverSet );
|
|
bool BNeverSetEventOnAdd() { return m_bNeverSetOnAdd; }
|
|
|
|
// get count of completed work items
|
|
// can't be inline because of m_TSQueueCompleted type
|
|
int GetCompletedWorkItemCount() const;
|
|
|
|
// get count of work items to process
|
|
// can't be inline because of m_TSQueueToProcess type
|
|
int GetWorkItemToProcessCount() const;
|
|
|
|
uint64 GetLastUsedSequenceNumber( ) const
|
|
{
|
|
return m_ulLastUsedSequenceNumber;
|
|
}
|
|
|
|
uint64 GetLastCompletedSequenceNumber( ) const
|
|
{
|
|
return m_ulLastCompletedSequenceNumber;
|
|
}
|
|
|
|
uint64 GetLastDispatchedSequenceNumber( ) const
|
|
{
|
|
return m_ulLastDispatchedSequenceNumber;
|
|
}
|
|
|
|
#if 0
|
|
uint64 GetAveExecutionTime() const
|
|
{
|
|
return m_StatExecutionTime.GetUlAvg();
|
|
}
|
|
uint64 GetAveWaitTime() const
|
|
{
|
|
return m_StatWaitTime.GetUlAvg();
|
|
}
|
|
uint64 GetCurrentBacklogTime() const;
|
|
#endif
|
|
|
|
int CountCompletedSuccess() const { return m_cSuccesses; }
|
|
int CountRetries() const { return m_cRetries; }
|
|
int CountCompletedFailed() const { return m_cFailures; }
|
|
|
|
bool BDispatchCompletedWorkItems( const CLimitTimer &limitTimer, CJobMgr *pJobMgr );
|
|
bool BExiting() const { return m_bExiting; }
|
|
|
|
int GetWorkerCount() const { return m_WorkThreads.Count(); }
|
|
|
|
uint GetActiveThreadCount() const { return m_cActiveThreads; }
|
|
|
|
// make sure you lock before using this
|
|
const CWorkThread *GetWorkThread( int iIndex ) const
|
|
{
|
|
Assert( iIndex >= 0 && iIndex < m_WorkThreads.Count() );
|
|
return m_WorkThreads[iIndex];
|
|
}
|
|
|
|
protected:
|
|
|
|
// STATICS
|
|
static IWorkThreadPoolSignal *sm_pWorkItemsCompletedSignal;
|
|
|
|
// MEMBERS
|
|
CWorkItem *GetNextWorkItemToProcess( );
|
|
void StartWorkThread( CWorkThread *pWorkThread, int iName );
|
|
|
|
// meaningful thread name prefix
|
|
char m_szThreadNamePfx[32];
|
|
// have we actually initialized the threadpool?
|
|
bool m_bThreadsInitialized;
|
|
|
|
// Incoming queue: queue of all work items to process
|
|
// must be dynamically allocated for alignment requirements on Win64
|
|
CTSQueue< CWorkItem *, false > *m_pTSQueueToProcess;
|
|
|
|
// Outgoing queues: queue of all completed work items
|
|
// must be dynamically allocated for alignment requirements on Win64
|
|
CTSQueue< CWorkItem *, false > *m_pTSQueueCompleted;
|
|
|
|
// Vectors of completed, but out of order and waiting work items, only used when bEnsureOutputOrdering == true
|
|
CThreadMutex m_MutexOnItemCompletedOrdered;
|
|
CUtlVector< CWorkItem * > m_vecCompletedAndWaiting;
|
|
|
|
// Should we emit work items in the same order they are received (on a per priority basis)
|
|
bool m_bEnsureOutputOrdering;
|
|
|
|
// Sequence numbers
|
|
uint64 m_ulLastUsedSequenceNumber;
|
|
uint64 m_ulLastCompletedSequenceNumber;
|
|
uint64 m_ulLastDispatchedSequenceNumber;
|
|
|
|
bool m_bMayHaveJobTimeouts;
|
|
CUtlVector< CWorkThread * > m_WorkThreads;
|
|
CThreadMutex m_WorkThreadMutex;
|
|
CInterlockedUInt m_cThreadsRunning; // how many threads are running
|
|
volatile bool m_bExiting; // are we exiting
|
|
CThreadEvent m_EventNewWorkItem; // event set when a new work item is available to process
|
|
CInterlockedInt m_cActiveThreads;
|
|
volatile bool m_bNeverSetOnAdd;
|
|
|
|
bool m_bAutoCreateThreads;
|
|
int m_cMaxThreads;
|
|
IWorkThreadFactory *m_pWorkThreadConstructor;
|
|
|
|
// override this method if you want to do any special handling of completed work items. Default implementation puts
|
|
// work items in our completed item queue.
|
|
virtual void OnWorkItemCompleted( CWorkItem *pWorkItem );
|
|
|
|
bool BTryDeleteExitedWorkerThreads();
|
|
|
|
int m_cSuccesses;
|
|
int m_cFailures;
|
|
int m_cRetries;
|
|
#if 0
|
|
CStat m_StatExecutionTime;
|
|
CStat m_StatWaitTime;
|
|
#endif
|
|
CLimitTimer m_LimitTimerCreateNewThreads;
|
|
|
|
#ifdef DBGFLAG_VALIDATE
|
|
public:
|
|
void Validate( CValidator &validator, const char *pchName );
|
|
#endif
|
|
};
|
|
|
|
} // namespace GCSDK
|
|
|
|
|
|
#endif // WORKTHREADPOOL_H
|