//========= Copyright Valve Corporation, All rights reserved. ============//
//
// Purpose: 
//
// $NoKeywords: $
//=============================================================================//

// Nasty headers!
#include "MySqlDatabase.h"
#include "tier1/strtools.h"
#include "vmpi.h"
#include "vmpi_dispatch.h"
#include "mpi_stats.h"
#include "cmdlib.h"
#include "imysqlwrapper.h"
#include "threadhelpers.h"
#include "vmpi_tools_shared.h"
#include "tier0/icommandline.h"

/*

-- MySQL code to create the databases, create the users, and set access privileges.
-- You only need to ever run this once.

create database vrad;

use mysql;

create user vrad_worker;
create user vmpi_browser;

-- This updates the "user" table, which is checked when someone tries to connect to the database.
grant select,insert,update on vrad.* to vrad_worker;
grant select on vrad.* to vmpi_browser;
flush privileges;

/*

-- SQL code to (re)create the tables.

-- Master generates a unique job ID (in job_master_start) and sends it to workers.
-- Each worker (and the master) make a job_worker_start, link it to the primary job ID, 
--     get their own unique ID, which represents that process in that job.
-- All JobWorkerID fields link to the JobWorkerID field in job_worker_start. 

-- NOTE: do a "use vrad" or "use vvis" first, depending on the DB you want to create.


use vrad;


drop table job_master_start;
create table job_master_start ( 
	JobID					INTEGER UNSIGNED NOT NULL AUTO_INCREMENT,	index id( JobID, MachineName(5) ),
	BSPFilename				TINYTEXT NOT NULL,
	StartTime				TIMESTAMP NOT NULL,
	MachineName				TEXT NOT NULL,
	RunningTimeMS			INTEGER UNSIGNED NOT NULL,
	NumWorkers				INTEGER UNSIGNED NOT NULL default 0
	);

drop table job_master_end;
create table job_master_end (
	JobID					INTEGER UNSIGNED NOT NULL,					PRIMARY KEY ( JobID ),
	NumWorkersConnected		SMALLINT UNSIGNED NOT NULL,
	NumWorkersDisconnected	SMALLINT UNSIGNED NOT NULL,
	ErrorText				TEXT NOT NULL
	);

drop table job_worker_start;
create table job_worker_start (
	JobWorkerID				INTEGER UNSIGNED NOT NULL AUTO_INCREMENT,	
	index index_jobid( JobID ),
	index index_jobworkerid( JobWorkerID ),
	
	JobID					INTEGER UNSIGNED NOT NULL,				-- links to job_master_start::JobID
	IsMaster				BOOL NOT NULL,							-- Set to 1 if this "worker" is the master process.
	RunningTimeMS			INTEGER UNSIGNED NOT NULL default 0,
	MachineName				TEXT NOT NULL,
	WorkerState				SMALLINT UNSIGNED NOT NULL default 0,	-- 0 = disconnected, 1 = connected
	NumWorkUnits			INTEGER UNSIGNED NOT NULL default 0,	-- how many work units this worker has completed
	CurrentStage			TINYTEXT NOT NULL,				-- which compile stage is it on
	Thread0WU				INTEGER NOT NULL default 0,		-- which WU thread 0 is on
	Thread1WU				INTEGER NOT NULL default 0,		-- which WU thread 1 is on
	Thread2WU				INTEGER NOT NULL default 0,		-- which WU thread 2 is on
	Thread3WU				INTEGER NOT NULL default 0		-- which WU thread 3 is on
	);

drop table text_messages;
create table text_messages (
	JobWorkerID				INTEGER UNSIGNED NOT NULL,					index id( JobWorkerID, MessageIndex ),
	MessageIndex			INTEGER UNSIGNED NOT NULL,
	Text					TEXT NOT NULL
	);

drop table graph_entry;
create table graph_entry (
	JobWorkerID				INTEGER UNSIGNED NOT NULL,					index id( JobWorkerID ),
	MSSinceJobStart			INTEGER UNSIGNED NOT NULL,
	BytesSent				INTEGER UNSIGNED NOT NULL,
	BytesReceived			INTEGER UNSIGNED NOT NULL
	);

drop table events;
create table events (
	JobWorkerID				INTEGER UNSIGNED NOT NULL,					index id( JobWorkerID ),
	Text					TEXT NOT NULL
	);
*/



// Stats set by the app.
int		g_nWorkersConnected = 0;
int		g_nWorkersDisconnected = 0;


DWORD	g_StatsStartTime;

CMySqlDatabase	*g_pDB = NULL;

IMySQL			*g_pSQL = NULL;
CSysModule		*g_hMySQLDLL = NULL;

char	g_BSPFilename[256];

bool			g_bMaster = false;
unsigned long	g_JobPrimaryID = 0;	// This represents this job, but doesn't link to a particular machine.
unsigned long	g_JobWorkerID = 0;	// A unique key in the DB that represents this machine in this job.
char			g_MachineName[MAX_COMPUTERNAME_LENGTH+1] = {0};

unsigned long	g_CurrentMessageIndex = 0;


HANDLE	g_hPerfThread = NULL;
DWORD	g_PerfThreadID = 0xFEFEFEFE;
HANDLE	g_hPerfThreadExitEvent = NULL;

// These are set by the app and they go into the database.
extern uint64 g_ThreadWUs[4];

extern uint64 VMPI_GetNumWorkUnitsCompleted( int iProc );


// ---------------------------------------------------------------------------------------------------- //
// This is a helper class to build queries like the stream IO.
// ---------------------------------------------------------------------------------------------------- //

class CMySQLQuery
{
friend class CMySQL;

public:
	// This is like a sprintf, but it will grow the string as necessary.
	void Format( const char *pFormat, ... );

	int Execute( IMySQL *pDB );

private:
	CUtlVector<char>	m_QueryText;
};


void CMySQLQuery::Format( const char *pFormat, ... )
{
	#define QUERYTEXT_GROWSIZE	1024

	// This keeps growing the buffer and calling _vsnprintf until the buffer is 
	// large enough to hold all the data.
	m_QueryText.SetSize( QUERYTEXT_GROWSIZE );
	while ( 1 )
	{
		va_list marker;
		va_start( marker, pFormat );
		int ret = _vsnprintf( m_QueryText.Base(), m_QueryText.Count(), pFormat, marker );
		va_end( marker );

		if ( ret < 0 )
		{
			m_QueryText.SetSize( m_QueryText.Count() + QUERYTEXT_GROWSIZE );
		}
		else
		{
			m_QueryText[ m_QueryText.Count() - 1 ] = 0;
			break;
		}
	}
}


int CMySQLQuery::Execute( IMySQL *pDB )
{
	int ret = pDB->Execute( m_QueryText.Base() );
	m_QueryText.Purge();
	return ret;
}



// ---------------------------------------------------------------------------------------------------- //
// This inserts the necessary backslashes in front of backslashes or quote characters.
// ---------------------------------------------------------------------------------------------------- //

char* FormatStringForSQL( const char *pText )
{
	// First, count the quotes in the string. We need to put a backslash in front of each one.
	int nChars = 0;
	const char *pCur = pText;
	while ( *pCur != 0 )
	{
		if ( *pCur == '\"' || *pCur == '\\' )
			++nChars;
		
		++pCur;
		++nChars;
	}

	pCur = pText;
	char *pRetVal = new char[nChars+1];
	for ( int i=0; i < nChars; )
	{
		if ( *pCur == '\"' || *pCur == '\\' )
			pRetVal[i++] = '\\';
		
		pRetVal[i++] = *pCur;
		++pCur;
	}
	pRetVal[nChars] = 0;

	return pRetVal;
}



// -------------------------------------------------------------------------------- //
// Commands to add data to the database.
// -------------------------------------------------------------------------------- //
class CSQLDBCommandBase : public ISQLDBCommand
{
public:
	virtual ~CSQLDBCommandBase()
	{
	}

	virtual void deleteThis()
	{
		delete this;
	}
};

class CSQLDBCommand_WorkerStats : public CSQLDBCommandBase
{
public:
	virtual int RunCommand()
	{
		int nCurConnections = VMPI_GetCurrentNumberOfConnections();


		// Update the NumWorkers entry.
		char query[2048];
		Q_snprintf( query, sizeof( query ), "update job_master_start set NumWorkers=%d where JobID=%lu",
			nCurConnections,
			g_JobPrimaryID );
		g_pSQL->Execute( query );

		
		// Update the job_master_worker_stats stuff.
		for ( int i=1; i < nCurConnections; i++ )
		{
			unsigned long jobWorkerID = VMPI_GetJobWorkerID( i );

			if ( jobWorkerID != 0xFFFFFFFF )
			{
				Q_snprintf( query, sizeof( query ), "update "
					"job_worker_start set WorkerState=%d, NumWorkUnits=%d where JobWorkerID=%lu",
					VMPI_IsProcConnected( i ), 
					(int) VMPI_GetNumWorkUnitsCompleted( i ),
					VMPI_GetJobWorkerID( i )
					); 
				g_pSQL->Execute( query );
			}
		}
		return 1;
	}
};

class CSQLDBCommand_JobMasterEnd : public CSQLDBCommandBase
{
public:
	
	virtual int RunCommand()
	{
		CMySQLQuery query;
		query.Format( "insert into job_master_end values ( %lu, %d, %d, \"no errors\" )", g_JobPrimaryID, g_nWorkersConnected, g_nWorkersDisconnected ); 
		query.Execute( g_pSQL  );

		// Now set RunningTimeMS.
		unsigned long runningTimeMS = GetTickCount() - g_StatsStartTime;
		query.Format( "update job_master_start set RunningTimeMS=%lu where JobID=%lu", runningTimeMS, g_JobPrimaryID );
		query.Execute( g_pSQL );
		return 1;
	}
};


void UpdateJobWorkerRunningTime()
{
	unsigned long runningTimeMS = GetTickCount() - g_StatsStartTime;
	
	char curStage[256];
	VMPI_GetCurrentStage( curStage, sizeof( curStage ) );
	
	CMySQLQuery query;
	query.Format( "update job_worker_start set RunningTimeMS=%lu, CurrentStage=\"%s\", "
		"Thread0WU=%d, Thread1WU=%d, Thread2WU=%d, Thread3WU=%d where JobWorkerID=%lu", 
		runningTimeMS, 
		curStage,
		(int) g_ThreadWUs[0],
		(int) g_ThreadWUs[1],
		(int) g_ThreadWUs[2],
		(int) g_ThreadWUs[3],
		g_JobWorkerID );
	query.Execute( g_pSQL );
}


class CSQLDBCommand_GraphEntry : public CSQLDBCommandBase
{
public:
	
				CSQLDBCommand_GraphEntry( DWORD msTime, DWORD nBytesSent, DWORD nBytesReceived )
				{
					m_msTime = msTime;
					m_nBytesSent = nBytesSent;
					m_nBytesReceived = nBytesReceived;
				}

	virtual int RunCommand()
	{
		CMySQLQuery query;
		query.Format(	"insert into graph_entry (JobWorkerID, MSSinceJobStart, BytesSent, BytesReceived) "
						"values ( %lu, %lu, %lu, %lu )", 
			g_JobWorkerID, 
			m_msTime, 
			m_nBytesSent, 
			m_nBytesReceived );
		
		query.Execute( g_pSQL );

		UpdateJobWorkerRunningTime();

		++g_CurrentMessageIndex;
		return 1;
	}

	DWORD m_nBytesSent;
	DWORD m_nBytesReceived;
	DWORD m_msTime;
};



class CSQLDBCommand_TextMessage : public CSQLDBCommandBase
{
public:
	
				CSQLDBCommand_TextMessage( const char *pText )
				{
					m_pText = FormatStringForSQL( pText );
				}

	virtual		~CSQLDBCommand_TextMessage()
	{
		delete [] m_pText;
	}

	virtual int RunCommand()
	{
		CMySQLQuery query;
		query.Format( "insert into text_messages (JobWorkerID, MessageIndex, Text) values ( %lu, %lu, \"%s\" )", g_JobWorkerID, g_CurrentMessageIndex, m_pText );
		query.Execute( g_pSQL );

		++g_CurrentMessageIndex;
		return 1;
	}

	char		*m_pText;
};


// -------------------------------------------------------------------------------- //
// Internal helpers.
// -------------------------------------------------------------------------------- //

// This is the spew output before it has connected to the MySQL database.
CCriticalSection g_SpewTextCS;
CUtlVector<char> g_SpewText( 1024 );


void VMPI_Stats_SpewHook( const char *pMsg )
{
	CCriticalSectionLock csLock( &g_SpewTextCS );
	csLock.Lock();

		// Queue the text up so we can send it to the DB right away when we connect.
		g_SpewText.AddMultipleToTail( strlen( pMsg ), pMsg );
}


void PerfThread_SendSpewText()
{
	// Send the spew text to the database.
	CCriticalSectionLock csLock( &g_SpewTextCS );
	csLock.Lock();
		
		if ( g_SpewText.Count() > 0 )
		{
			g_SpewText.AddToTail( 0 );
			
			if ( g_bMPI_StatsTextOutput )
			{
				g_pDB->AddCommandToQueue( new CSQLDBCommand_TextMessage( g_SpewText.Base() ), NULL );
			}
			else
			{
				// Just show one message in the vmpi_job_watch window to let them know that they need
				// to use a command line option to get the output.
				static bool bFirst = true;
				if ( bFirst )
				{
					char msg[512];
					V_snprintf( msg, sizeof( msg ), "%s not enabled", VMPI_GetParamString( mpi_Stats_TextOutput ) );
					bFirst = false;
					g_pDB->AddCommandToQueue( new CSQLDBCommand_TextMessage( msg ), NULL );
				}
			}
			
			g_SpewText.RemoveAll();
		}

	csLock.Unlock();
}


void PerfThread_AddGraphEntry( DWORD startTicks, DWORD &lastSent, DWORD &lastReceived )
{
	// Send the graph entry with data transmission info.
	DWORD curSent = g_nBytesSent + g_nMulticastBytesSent;
	DWORD curReceived = g_nBytesReceived + g_nMulticastBytesReceived;

	g_pDB->AddCommandToQueue( 
		new CSQLDBCommand_GraphEntry( 
			GetTickCount() - startTicks,
			curSent - lastSent, 
			curReceived - lastReceived ), 
		NULL );

	lastSent = curSent;
	lastReceived = curReceived;
}


// This function adds a graph_entry into the database periodically.
DWORD WINAPI PerfThreadFn( LPVOID pParameter )
{
	DWORD lastSent = 0;
	DWORD lastReceived = 0;
	DWORD startTicks = GetTickCount();

	while ( WaitForSingleObject( g_hPerfThreadExitEvent, 1000 ) != WAIT_OBJECT_0 )
	{
		PerfThread_AddGraphEntry( startTicks, lastSent, lastReceived );

		// Send updates for text output.
		PerfThread_SendSpewText();

		// If we're the master, update all the worker stats.
		if ( g_bMaster )
		{
			g_pDB->AddCommandToQueue( 
				new CSQLDBCommand_WorkerStats, 
				NULL );
		}
	}

	// Add the remaining text and one last graph entry (which will include the current stage info).
	PerfThread_SendSpewText();
	PerfThread_AddGraphEntry( startTicks, lastSent, lastReceived );

	SetEvent( g_hPerfThreadExitEvent );
	return 0;
}


// -------------------------------------------------------------------------------- //
// VMPI_Stats interface.
// -------------------------------------------------------------------------------- //

void VMPI_Stats_InstallSpewHook()
{
	InstallExtraSpewHook( VMPI_Stats_SpewHook );
}


void UnloadMySQLWrapper()
{
	if ( g_hMySQLDLL )
	{
		if ( g_pSQL )
		{
			g_pSQL->Release();
			g_pSQL = NULL;
		}
	
		Sys_UnloadModule( g_hMySQLDLL );
		g_hMySQLDLL = NULL;
	}
}


bool LoadMySQLWrapper(
	const char *pHostName, 
	const char *pDBName, 
	const char *pUserName
	)
{
	UnloadMySQLWrapper();

	// Load the DLL and the interface.
	if ( !Sys_LoadInterface( "mysql_wrapper", MYSQL_WRAPPER_VERSION_NAME, &g_hMySQLDLL, (void**)&g_pSQL ) )
		return false;

	// Try to init the database.
	if ( !g_pSQL->InitMySQL( pDBName, pHostName, pUserName ) )
	{
		UnloadMySQLWrapper();
		return false;
	}

	return true;
}


bool VMPI_Stats_Init_Master( 
	const char *pHostName, 
	const char *pDBName, 
	const char *pUserName,
	const char *pBSPFilename, 
	unsigned long *pDBJobID )
{
	Assert( !g_pDB );

	g_bMaster = true;
	
	// Connect the database.
	g_pDB = new CMySqlDatabase;
	if ( !g_pDB || !g_pDB->Initialize() || !LoadMySQLWrapper( pHostName, pDBName, pUserName ) )
	{
		delete g_pDB;
		g_pDB = NULL;
		return false;
	}

	DWORD size = sizeof( g_MachineName );
	GetComputerName( g_MachineName, &size );

	// Create the job_master_start row.
	Q_FileBase( pBSPFilename, g_BSPFilename, sizeof( g_BSPFilename ) );

	g_JobPrimaryID = 0;
	CMySQLQuery query;
	query.Format( "insert into job_master_start ( BSPFilename, StartTime, MachineName, RunningTimeMS ) values ( \"%s\", null, \"%s\", %lu )", g_BSPFilename, g_MachineName, RUNNINGTIME_MS_SENTINEL ); 
	query.Execute( g_pSQL );

	g_JobPrimaryID = g_pSQL->InsertID();
	if ( g_JobPrimaryID == 0 )
	{
		delete g_pDB;
		g_pDB = NULL;
		return false;
	}


	// Now init the worker portion.
	*pDBJobID = g_JobPrimaryID;
	return VMPI_Stats_Init_Worker( NULL, NULL, NULL, g_JobPrimaryID );
}



bool VMPI_Stats_Init_Worker( const char *pHostName, const char *pDBName, const char *pUserName, unsigned long DBJobID )
{
	g_StatsStartTime = GetTickCount();
	
	// If pDBServerName is null, then we're the master and we just want to make the job_worker_start entry.
	if ( pHostName )
	{
		Assert( !g_pDB );
		
		// Connect the database.
		g_pDB = new CMySqlDatabase;
		if ( !g_pDB || !g_pDB->Initialize() || !LoadMySQLWrapper( pHostName, pDBName, pUserName ) )
		{
			delete g_pDB;
			g_pDB = NULL;
			return false;
		}
		
		// Get our machine name to store in the database.
		DWORD size = sizeof( g_MachineName );
		GetComputerName( g_MachineName, &size );
	}


	g_JobPrimaryID = DBJobID;
	g_JobWorkerID = 0;

	CMySQLQuery query;
	query.Format( "insert into job_worker_start ( JobID, CurrentStage, IsMaster, MachineName ) values ( %lu, \"none\", %d, \"%s\" )",
		g_JobPrimaryID, g_bMaster, g_MachineName );
	query.Execute( g_pSQL );
			
	g_JobWorkerID = g_pSQL->InsertID();
	if ( g_JobWorkerID == 0 )
	{
		delete g_pDB;
		g_pDB = NULL;
		return false;
	}

	// Now create a thread that samples perf data and stores it in the database.
	g_hPerfThreadExitEvent = CreateEvent( NULL, FALSE, FALSE, NULL );
	g_hPerfThread = CreateThread(
		NULL,
		0,
		PerfThreadFn,
		NULL,
		0,
		&g_PerfThreadID );

	return true;	
}


void VMPI_Stats_Term()
{
	if ( !g_pDB )
		return;

	// Stop the thread.
	SetEvent( g_hPerfThreadExitEvent );
	WaitForSingleObject( g_hPerfThread, INFINITE );
	
	CloseHandle( g_hPerfThreadExitEvent );
	g_hPerfThreadExitEvent = NULL;

	CloseHandle( g_hPerfThread );
	g_hPerfThread = NULL;

	if ( g_bMaster )
	{
		// (Write a job_master_end entry here).
		g_pDB->AddCommandToQueue( new CSQLDBCommand_JobMasterEnd, NULL );
	}

	// Wait for up to a second for the DB to finish writing its data.
	DWORD startTime = GetTickCount();
	while ( GetTickCount() - startTime < 1000 )
	{
		if ( g_pDB->QueriesInOutQueue() == 0 )
			break;
	}

	delete g_pDB;
	g_pDB = NULL;

	UnloadMySQLWrapper();
}


static bool ReadStringFromFile( FILE *fp, char *pStr, int strSize )
{
	int i=0;
	for ( i; i < strSize-2; i++ )
	{
		if ( fread( &pStr[i], 1, 1, fp ) != 1 ||
			pStr[i] == '\n' )
		{
			break;
		}
	}

	pStr[i] = 0;
	return i != 0;
}


// This looks for pDBInfoFilename in the same path as pBaseExeFilename.
// The file has 3 lines: machine name (with database), database name, username
void GetDBInfo( const char *pDBInfoFilename, CDBInfo *pInfo )
{
	char baseExeFilename[512];
	if ( !GetModuleFileName( GetModuleHandle( NULL ), baseExeFilename, sizeof( baseExeFilename ) ) )
		Error( "GetModuleFileName failed." );
	
	// Look for the info file in the same directory as the exe.
	char dbInfoFilename[512];
	Q_strncpy( dbInfoFilename, baseExeFilename, sizeof( dbInfoFilename ) );
	Q_StripFilename( dbInfoFilename );

	if ( dbInfoFilename[0] == 0 )
		Q_strncpy( dbInfoFilename, ".", sizeof( dbInfoFilename ) );

	Q_strncat( dbInfoFilename, "/", sizeof( dbInfoFilename ), COPY_ALL_CHARACTERS );
	Q_strncat( dbInfoFilename, pDBInfoFilename, sizeof( dbInfoFilename ), COPY_ALL_CHARACTERS );

	FILE *fp = fopen( dbInfoFilename, "rt" );
	if ( !fp )
	{
		Error( "Can't open %s for database info.\n", dbInfoFilename );
	}

	if ( !ReadStringFromFile( fp, pInfo->m_HostName, sizeof( pInfo->m_HostName ) ) ||
		 !ReadStringFromFile( fp, pInfo->m_DBName, sizeof( pInfo->m_DBName ) ) || 
		 !ReadStringFromFile( fp, pInfo->m_UserName, sizeof( pInfo->m_UserName ) ) 
		 )
	{
		Error( "%s is not a valid database info file.\n", dbInfoFilename );
	}

	fclose( fp );
}


void RunJobWatchApp( char *pCmdLine )
{
	STARTUPINFO si;
	memset( &si, 0, sizeof( si ) );
	si.cb = sizeof( si );

	PROCESS_INFORMATION pi;
	memset( &pi, 0, sizeof( pi ) );

	// Working directory should be the same as our exe's directory.
	char dirName[512];
	if ( GetModuleFileName( NULL, dirName, sizeof( dirName ) ) != 0 )
	{
		char *s1 = V_strrchr( dirName, '\\' );
		char *s2 = V_strrchr( dirName, '/' );
		if ( s1 || s2 )
		{
			// Get rid of the last slash.
			s1 = max( s1, s2 );
			s1[0] = 0;
		
			if ( !CreateProcess( 
				NULL, 
				pCmdLine, 
				NULL,							// security
				NULL,
				TRUE,
				0,			// flags
				NULL,							// environment
				dirName,							// current directory
				&si,
				&pi ) )
			{
				Warning( "%s - error launching '%s'\n", VMPI_GetParamString( mpi_Job_Watch ), pCmdLine );
			}
		}
	}
}


void StatsDB_InitStatsDatabase( 
	int argc, 
	char **argv, 
	const char *pDBInfoFilename )
{
	// Did they disable the stats database?
	if ( !g_bMPI_Stats && !VMPI_IsParamUsed( mpi_Job_Watch ) )
		return;

	unsigned long jobPrimaryID;

	// Now open the DB.
	if ( g_bMPIMaster )
	{
		CDBInfo dbInfo;
		GetDBInfo( pDBInfoFilename, &dbInfo );

		if ( !VMPI_Stats_Init_Master( dbInfo.m_HostName, dbInfo.m_DBName, dbInfo.m_UserName, argv[argc-1], &jobPrimaryID ) )
		{
			Warning( "VMPI_Stats_Init_Master( %s, %s, %s ) failed.\n", dbInfo.m_HostName, dbInfo.m_DBName, dbInfo.m_UserName );

			// Tell the workers not to use stats.
			dbInfo.m_HostName[0] = 0; 
		}

		char cmdLine[2048];
		Q_snprintf( cmdLine, sizeof( cmdLine ), "vmpi_job_watch -JobID %d", jobPrimaryID );
		
		Msg( "\nTo watch this job, run this command line:\n%s\n\n", cmdLine );
		
		if ( VMPI_IsParamUsed( mpi_Job_Watch ) )
		{
			// Convenience thing to automatically launch the job watch for this job.
			RunJobWatchApp( cmdLine );
		}

		// Send the database info to all the workers.
		SendDBInfo( &dbInfo, jobPrimaryID );
	}
	else
	{
		// Wait to get DB info so we can connect to the MySQL database.
		CDBInfo dbInfo;
		unsigned long jobPrimaryID;
		RecvDBInfo( &dbInfo, &jobPrimaryID );
		
		if ( dbInfo.m_HostName[0] != 0 )
		{
			if ( !VMPI_Stats_Init_Worker( dbInfo.m_HostName, dbInfo.m_DBName, dbInfo.m_UserName, jobPrimaryID ) )
				Error( "VMPI_Stats_Init_Worker( %s, %s, %s, %d ) failed.\n", dbInfo.m_HostName, dbInfo.m_DBName, dbInfo.m_UserName, jobPrimaryID );
		}
	}
}


unsigned long StatsDB_GetUniqueJobID()
{
	return g_JobPrimaryID;
}


unsigned long VMPI_Stats_GetJobWorkerID()
{
	return g_JobWorkerID;
}