//========= Copyright Valve Corporation, All rights reserved. ============//
//
//=======================================================================================//

#include "sv_sessionblockpublisher.h"
#include "sv_replaycontext.h"
#include "demofile/demoformat.h"
#include "sv_recordingsession.h"
#include "sv_recordingsessionblock.h"
#include "sv_sessioninfopublisher.h"

// memdbgon must be the last include file in a .cpp file!!!
#include "tier0/memdbgon.h"

//----------------------------------------------------------------------------------------

// Binds the publisher to the session whose blocks it will publish and to the session info
// publisher that is poked whenever a block's status changes.
CSessionBlockPublisher::CSessionBlockPublisher( CServerRecordingSession *pSession,
											    CSessionInfoPublisher *pSessionInfoPublisher )
:	m_pSession( pSession ),
	m_pSessionInfoPublisher( pSessionInfoPublisher )
{
	// Snapshot the dump interval once, clamped to the server minimum.  Changing it in the
	// middle of a round would require an update on all clients, so later changes to the
	// convar are deliberately ignored by this publisher.
	extern ConVar replay_block_dump_interval;
	const int nRequestedInterval = replay_block_dump_interval.GetInt();
	m_nDumpInterval = MAX( MIN_SERVER_DUMP_INTERVAL, nRequestedInterval );

	// Start the clock now - the first block gets written one dump interval from here
	m_flLastBlockWriteTime = g_pEngine->GetHostTime();
}

// Nothing is released here: the destructor body is intentionally empty.
CSessionBlockPublisher::~CSessionBlockPublisher()
{
}

// Blocks the caller, pumping Think() until no blocks remain in the publishing list.
void CSessionBlockPublisher::PublishAllSynchronous()
{
	for ( ;; )
	{
		if ( IsDone() )
			break;

		Think();
	}
}

void CSessionBlockPublisher::AbortPublish()
{
	FOR_EACH_LL( m_lstPublishingBlocks, it )
	{
		CServerRecordingSessionBlock *pCurBlock = m_lstPublishingBlocks[ it ];
		IFilePublisher *&pPublisher = pCurBlock->m_pPublisher;	// Shorthand

		if ( !pPublisher )
			continue;

		// Already done?
		if ( pPublisher->IsDone() )
			continue;

		pPublisher->AbortAndCleanup();
	}

	// Remove all blocks
	m_lstPublishingBlocks.RemoveAll();
}

// Recording-start notification.  There is currently nothing to do here.
void CSessionBlockPublisher::OnStartRecording()
{
}

// Recording-stop notification.  Unless the recording is being aborted, whatever has been
// recorded since the last dump is flushed out as one last session block.
void CSessionBlockPublisher::OnStopRecord( bool bAborting )
{
	// Nothing gets written when aborting
	if ( bAborting )
		return;

	// Flush the final session block
	WriteAndPublishSessionBlock();
}

// Returns the handle of the recording session this publisher is bound to.
ReplayHandle_t CSessionBlockPublisher::GetSessionHandle() const
{
	const ReplayHandle_t hOwningSession = m_pSession->GetHandle();
	return hOwningSession;
}

void CSessionBlockPublisher::WriteAndPublishSessionBlock()
{
	// Make sure there is data to write
	uint8 *pSessionBuffer;
	int nSessionBufferSize;
	g_pEngine->GetSessionRecordBuffer( &pSessionBuffer, &nSessionBufferSize );	// This will get called the last client disconnects from the server - but in waiting for players state we won't have a demo buffer
	if ( !pSessionBuffer || nSessionBufferSize == 0 )
		return;

	// Create a new block
	CServerRecordingSessionBlock *pNewBlock = SV_CastBlock( SV_GetRecordingSessionBlockManager()->CreateAndGenerateHandle() );
	if ( !pNewBlock )
	{
		Warning( "Failed to create replay \"%s\"\n", pNewBlock->m_szFullFilename );
		delete pNewBlock;
		return;
	}
	
	if ( m_pSession->m_nServerStartRecordTick < 0 )
	{
		Warning( "Error: Current recording start tick was not properly setup.  Aborting block write.\n" );
		delete pNewBlock;
		return;
	}

	// Figure out what the current block is
	const int iCurrentSessionBlock = m_pSession->GetNumBlocks();

	// Add an entry to the server index with the "writing" status set
	const char *pFullFilename = Replay_va(
		"%s%s_part_%u.%s", SV_GetTmpDir(),
		SV_GetRecordingSessionManager()->GetCurrentSessionName(), iCurrentSessionBlock, BLOCK_FILE_EXTENSION
	);
	V_strcpy( pNewBlock->m_szFullFilename, pFullFilename );
	pNewBlock->m_nWriteStatus = CServerRecordingSessionBlock::WRITESTATUS_INVALID;	// Must be set here to trigger write
	pNewBlock->m_nRemoteStatus = CBaseRecordingSessionBlock::STATUS_WRITING;
	pNewBlock->m_iReconstruction = iCurrentSessionBlock;
	pNewBlock->m_hSession = m_pSession->GetHandle();

	// Match the session's lock - the block will be unlocked once recording has stopped and all publishing is complete.
	pNewBlock->SetLocked( true );

	// Commit the replay to the history manager's list
	SV_GetRecordingSessionBlockManager()->Add( pNewBlock );

	// Also store a pointer to the block in the session - NOTE: session will not attempt to free this pointer
	m_pSession->AddBlock( pNewBlock, false );

	// Cache the block temporarily while the binary block itself writes to disk - NOTE: will not attempt to free
	m_lstPublishingBlocks.AddToTail( pNewBlock );

	// Write the block now
	PublishBlock( pNewBlock );	// pNewBlock->m_nWriteStatus modified here

	IF_REPLAY_DBG( Warning( "%f: (%i) Publishing new block, %s\n", g_pEngine->GetHostTime(), iCurrentSessionBlock, pNewBlock->GetFilename() ) );
}

// Builds a heap copy of the session buffer that is safe to hand to a publisher.
//
//   pSessionBuffer/nSessionBufferSize - the recorded data to copy
//   pBlock                            - the block being published; a reconstruction index
//                                       of 0 means space for a demo header is reserved in
//                                       front of the data
//   ppSafeBlockData                   - [out] receives the new[]-allocated copy
//   pBlockSize                        - [out] receives the copy's total size in bytes
void CSessionBlockPublisher::GatherBlockData( uint8 *pSessionBuffer, int nSessionBufferSize, CServerRecordingSessionBlock *pBlock, unsigned char **ppSafeBlockData, int *pBlockSize )
{
	// Only the first block of a session carries the demo header; every other block is raw data.
	const bool bIsFirstBlock = ( pBlock->m_iReconstruction == 0 );
	demoheader_t *pDemoHeader = bIsFirstBlock ? g_pEngine->GetReplayDemoHeader() : NULL;

	// Space reserved in front of the session data, and the resulting output size
	const int nDataOffset = bIsFirstBlock ? (int)sizeof( demoheader_t ) : 0;
	const int nOutputSize = nSessionBufferSize + nDataOffset;

	// Allocate room for the (optional) header plus the session data
	unsigned char *pOutput = new unsigned char[ nOutputSize ];

	// NOTE(review): if this is the first block but the engine returned no header, the
	// reserved header bytes are left as allocated - confirm that can't happen.
	if ( pDemoHeader )
	{
		// Work on a copy so the engine's header is left untouched
		demoheader_t headerToWrite = *pDemoHeader;
		headerToWrite.playback_time = FLT_MAX;
		headerToWrite.playback_ticks = INT_MAX;
		headerToWrite.playback_frames = INT_MAX;

		// Byteswap, then place the header at the very start of the output
		ByteSwap_demoheader_t( headerToWrite );
		V_memcpy( pOutput, &headerToWrite, sizeof( headerToWrite ) );
	}

	// The output was sized as session data PLUS the reserved offset, so this can't overflow.
	V_memcpy( pOutput + nDataOffset, pSessionBuffer, nSessionBufferSize );

	// Hand the results back to the caller
	*pBlockSize = nOutputSize;
	*ppSafeBlockData = pOutput;
}

// Copies the engine's current session record buffer, resets that buffer, and starts an
// asynchronous, BZ2-compressed and hashed publish of the copy to pBlock's full filename.
// This object is registered as the publisher's callback handler, with pBlock as user data.
//
// If the session record buffer can't be fetched, the block is marked as failed so that
// PublishThink() can retire it from the publishing list.
void CSessionBlockPublisher::PublishBlock( CServerRecordingSessionBlock *pBlock )
{
	uint8 *pSessionBuffer;
	int nSessionBufferSize;
	if ( !g_pEngine->GetSessionRecordBuffer( &pSessionBuffer, &nSessionBufferSize ) )
	{
		Warning( "Block publish failed!\n" );

		// WriteAndPublishSessionBlock() queues the block on m_lstPublishingBlocks before
		// calling us.  Leaving the status as WRITESTATUS_INVALID would keep the block in that
		// list forever (PublishThink() never removes invalid blocks), which in turn means
		// IsDone() never returns true and PublishAllSynchronous() never terminates.  Flag the
		// write as failed so the normal failure path cleans it up.
		pBlock->m_nWriteStatus = CServerRecordingSessionBlock::WRITESTATUS_FAILED;
		return;
	}

	// Take a private copy of the data (with a header in front, for the first block)
	unsigned char *pSafeBlockData;
	int nBlockSize;
	GatherBlockData( pSessionBuffer, nSessionBufferSize, pBlock, &pSafeBlockData, &nBlockSize );

	// We've got what we need and can reset the put ptr
	g_pEngine->ResetReplayRecordBuffer();

	AssertMsg( !pBlock->m_pPublisher, "No publisher should exist for this block yet!" );

	// Set status to working
	pBlock->m_nWriteStatus = CServerRecordingSessionBlock::WRITESTATUS_WORKING;

	// Record the uncompressed size for now - OnPublishComplete() replaces this with the
	// compressed size if compression succeeded
	pBlock->m_uFileSize = nBlockSize;

	// Make sure the main thread doesn't unload the block while it's being published
	pBlock->SetLocked( true );

	// Asynchronously publish to fileserver
	PublishFileParams_t params;
	params.m_pOutFilename = pBlock->m_szFullFilename;
	params.m_pSrcData = pSafeBlockData;
	params.m_nSrcSize = nBlockSize;
	params.m_pCallbackHandler = this;
	params.m_nCompressorType = COMPRESSORTYPE_BZ2;
	params.m_bHash = true;
	params.m_bFreeSrcData = true;
	params.m_bDeleteFile = false;
	params.m_pUserData = pBlock;
	pBlock->m_pPublisher = SV_PublishFile( params );
}

void CSessionBlockPublisher::OnPublishComplete( const IFilePublisher *pPublisher, void *pUserData )
{
	CServerRecordingSessionBlock *pBlock = (CServerRecordingSessionBlock *)pUserData;
	Assert( pBlock );

	// Set block status
	if ( pPublisher->GetStatus() == IFilePublisher::PUBLISHSTATUS_OK )
	{
		pBlock->m_nWriteStatus = CServerRecordingSessionBlock::WRITESTATUS_SUCCESS;
	}
	else
	{
		pBlock->m_nWriteStatus = CServerRecordingSessionBlock::WRITESTATUS_FAILED;

		// Publish failed - handle as needed
		g_pServerReplayContext->OnPublishFailed();
	}

	// Did the block compress OK?
	if ( pPublisher->Compressed() )
	{
		// Cache compressor type
		pBlock->m_nCompressorType = pPublisher->GetCompressorType();

		const int nCompressedSize = pPublisher->GetCompressedSize();
		const float flRatio = (float)pBlock->m_uFileSize / nCompressedSize;
		IF_REPLAY_DBG( Warning( "Block compression ratio: %.3f:1\n", flRatio ) );

		// Update size
		pBlock->m_uUncompressedSize = pBlock->m_uFileSize;
		pBlock->m_uFileSize = nCompressedSize;
	}

	// Get the MD5
	if ( pPublisher->Hashed() )
	{
		pPublisher->GetHash( pBlock->m_aHash );
	}

	// Now that m_nWriteStatus has been set in the block, the session info will be updated
	// accordingly the next time PublishThink() is run.

	// Mark the block as dirty since it was modified
	Assert( pBlock->m_nWriteStatus != CServerRecordingSessionBlock::WRITESTATUS_INVALID );
	SV_GetRecordingSessionBlockManager()->FlagForFlush( pBlock, false );

	IF_REPLAY_DBG( Warning( "Publish complete for block %s\n", pBlock->GetDebugName() ) );
}

void CSessionBlockPublisher::OnPublishAborted( const IFilePublisher *pPublisher )
{
	CServerRecordingSessionBlock *pBlock = FindBlockFromPublisher( pPublisher );

	// Update the block's status
	if ( pBlock )
	{
		pBlock->m_nWriteStatus = CServerRecordingSessionBlock::WRITESTATUS_FAILED;
	}

	g_pServerReplayContext->OnPublishFailed();
}

// Searches the publishing list for the block whose publisher is pPublisher.
// Returns the block, or asserts and returns NULL if no block matches.
CServerRecordingSessionBlock *CSessionBlockPublisher::FindBlockFromPublisher( const IFilePublisher *pPublisher )
{
	for ( int iBlock = m_lstPublishingBlocks.Head();
		  iBlock != m_lstPublishingBlocks.InvalidIndex();
		  iBlock = m_lstPublishingBlocks.Next( iBlock ) )
	{
		CServerRecordingSessionBlock *pCandidate = m_lstPublishingBlocks[ iBlock ];
		if ( pCandidate->m_pPublisher != pPublisher )
			continue;

		return pCandidate;
	}

	AssertMsg( 0, "Could not find block with the given publisher!" );
	return NULL;
}

// Per-frame entry point: profiles and forwards to PublishThink().
void CSessionBlockPublisher::Think()
{
	// NOTE: It is intentional that this runs even when replay is disabled.

	VPROF_BUDGET( "CSessionBlockPublisher::Think", VPROF_BUDGETGROUP_REPLAY );

	PublishThink();
}

// Drives all block publishing for the session.  Each call does three things, in order:
//   1. Services every block's publisher - frees the ones that have finished and lets the
//      rest think.
//   2. Writes and publishes a new session block if the session is recording and at least
//      m_nDumpInterval seconds of host time have passed since the last block write.
//   3. Walks the publishing list and retires every block whose write has succeeded or
//      failed, updating its remote status and flagging it for flush.  If any block was
//      retired, the session info publisher is asked to publish afterwards.
void CSessionBlockPublisher::PublishThink()
{
	AssertMsg( m_pSession->IsLocked(), "The session isn't locked, which means blocks can be being deleted and will probably cause a crash." );

	// Go through all currently publishing blocks and free/think
	FOR_EACH_LL( m_lstPublishingBlocks, it )
	{
		CServerRecordingSessionBlock *pCurBlock = m_lstPublishingBlocks[ it ];
		IFilePublisher *&pPublisher = pCurBlock->m_pPublisher;	// Shorthand - a reference, so clearing it below clears the block's member

		if ( !pPublisher )
			continue;

		// If the publisher's done, free it
		if ( pPublisher->IsDone() )
		{
			delete pPublisher;
			pPublisher = NULL;
		}
		else
		{
			// Let the publisher think
			pPublisher->Think();
		}
	}

	// Write a new session block out right now?  Only while recording, and only once the
	// cached dump interval has elapsed since the last write.
	float flHostTime = g_pEngine->GetHostTime();
	if ( m_flLastBlockWriteTime != 0.0f &&
		 flHostTime - m_flLastBlockWriteTime >= m_nDumpInterval &&
		 m_pSession->m_bRecording )
	{
		Assert( m_nDumpInterval > 0 );

		// Write it
		WriteAndPublishSessionBlock();

		// Update the time
		m_flLastBlockWriteTime = flHostTime;
	}

	// Check status of any replays that are being written.  The iterator is advanced by hand
	// inside the loop body because entries may be unlinked while iterating.
	bool bUpdateSessionInfo  = false;
	for( int it = m_lstPublishingBlocks.Head(); it != m_lstPublishingBlocks.InvalidIndex(); )
	{
		CServerRecordingSessionBlock *pCurBlock = m_lstPublishingBlocks[ it ];

		// Updated when write status is set to success or failure
		int nPendingRequestStatus = CBaseRecordingSessionBlock::STATUS_INVALID;

		// If set to anything besides InvalidIndex(), it will be removed from the list
		int itRemove = m_lstPublishingBlocks.InvalidIndex();
		bool bWriteBlockInfoToDisk = false;

		switch ( pCurBlock->m_nWriteStatus )
		{
		case CServerRecordingSessionBlock::WRITESTATUS_INVALID:
			// A block in this state is neither updated nor removed from the list
			AssertMsg( 0, "Why is m_nWriteStatus WRITESTATUS_INVALID here?" );
			break;

		case CServerRecordingSessionBlock::WRITESTATUS_WORKING:	// Do nothing if still writing
			break;

		case CServerRecordingSessionBlock::WRITESTATUS_SUCCESS:
			// Written OK - mark the block as ready for download and retire it from the list
			IF_REPLAY_DBG2( Warning( "  Block %i marked as succeeded.\n", pCurBlock->m_iReconstruction ) );
			pCurBlock->m_nRemoteStatus = CBaseRecordingSessionBlock::STATUS_READYFORDOWNLOAD;
			nPendingRequestStatus = pCurBlock->m_nRemoteStatus;
			bWriteBlockInfoToDisk = true;
			itRemove = it;
			break;

		case CServerRecordingSessionBlock::WRITESTATUS_FAILED:
		default:	// Error?  Any unrecognized status is treated as a failed write.
			IF_REPLAY_DBG2( Warning( "  Block %i marked as failed.\n", pCurBlock->m_iReconstruction ) );
			pCurBlock->m_nRemoteStatus = CBaseRecordingSessionBlock::STATUS_ERROR;
			pCurBlock->m_nHttpError = CBaseRecordingSessionBlock::ERROR_WRITEFAILED;
			nPendingRequestStatus = pCurBlock->m_nRemoteStatus;
			bWriteBlockInfoToDisk = true;
			itRemove = it;

			// TODO: Retry
		}

		if ( bWriteBlockInfoToDisk )
		{
			// The block's status changed - flag it for flush
			Assert( pCurBlock->m_nWriteStatus != CServerRecordingSessionBlock::WRITESTATUS_INVALID );
			SV_GetRecordingSessionBlockManager()->FlagForFlush( pCurBlock, false );
		}

		// Every block in the publishing list should belong to this publisher's session
		Assert( pCurBlock->m_hSession == m_pSession->GetHandle() );

		// Refresh session info file
		if ( nPendingRequestStatus != CBaseRecordingSessionBlock::STATUS_INVALID )
		{
			// Update it after this loop
			bUpdateSessionInfo = true;
		}

		// Update iterator - this must happen before the current entry is unlinked below
		it = m_lstPublishingBlocks.Next( it );

		// Remove?
		if ( itRemove != m_lstPublishingBlocks.InvalidIndex() )
		{
			IF_REPLAY_DBG( Warning( "Removing block %i from publisher\n", pCurBlock->m_iReconstruction ) );
			// Free/clear publisher (it may already have been freed and cleared by the first loop)
			delete pCurBlock->m_pPublisher;
			pCurBlock->m_pPublisher = NULL;

			// Removes from the list but doesn't free, since any pointer here points to a block somewhere
			m_lstPublishingBlocks.Unlink( itRemove );
		}
	}

	// Ask the session info publisher to publish, now that at least one block changed status
	if ( bUpdateSessionInfo )
	{
		m_pSessionInfoPublisher->Publish();
	}
}

// Returns true once the publishing list is empty, i.e. no block is waiting on a publish.
bool CSessionBlockPublisher::IsDone() const
{
	const bool bNothingPending = ( m_lstPublishingBlocks.Count() == 0 );
	return bNothingPending;
}

#ifdef _DEBUG
void CSessionBlockPublisher::Validate()
{
	FOR_EACH_LL( m_lstPublishingBlocks, i )
	{
		CServerRecordingSessionBlock *pCurBlock = m_lstPublishingBlocks[ i ];
		Assert( pCurBlock->m_nRemoteStatus == CBaseRecordingSessionBlock::STATUS_READYFORDOWNLOAD );
	}
}
#endif

//----------------------------------------------------------------------------------------