2020-09-30 16:53:55 +02:00

747 lines
21 KiB
C++

//+-------------------------------------------------------------------------
//
// Microsoft Windows
//
// Copyright (C) Microsoft Corporation, 1997 - 1999
//
// File: dsthread.cpp
//
//--------------------------------------------------------------------------
// FILE: dsThread.cpp
#include "stdafx.h"
#include "dssnap.h" // Note: this has to be before dsthread.h
#include "dsthread.h"
#ifdef _DEBUG
#define new DEBUG_NEW
#undef THIS_FILE
static char THIS_FILE[] = __FILE__;
#endif
void WaitForThreadShutdown(HANDLE* phThreadArray, DWORD dwCount)
{
TRACE(L"entering WaitForThreadShutdown()\n");
while (TRUE)
{
//
// NOTE: this will block the console. This the intended behavior
// to keep MMC from breaking on none re-entrant code.
//
DWORD dwWaitResult = WaitForMultipleObjectsEx(
dwCount,
phThreadArray, // handle array
TRUE, // wait for all
INFINITE, // time-out
FALSE);// signal completion routine
//TRACE(L"after MsgWaitForMultipleObjects()\n");
//TRACE(L"dwWaitResult = 0x%x\n", dwWaitResult);
if (dwWaitResult == WAIT_OBJECT_0 ||
(WAIT_FAILED == dwWaitResult))
{
// woke up because the thread handle got signalled,
// or because the handle is no longer valid (thread already terminated)
// can proceed
TRACE(L"WaitForMultipleObjectsEx() succeeded\n");
break;
}
else
{
TRACE(L"WaitForMultipleObjectsEx() return 0x%x\n", dwWaitResult);
}
/* Whistler bug #176012 MMC: Assert m_pScopeTree == 0
This message pump causes the UI not to be blocked which means
MMC can reach code that is not re-entrant.
else
{
// woke up because there is a message in the queue
// need to pump
MSG msg;
while (PeekMessage(&msg, NULL, 0, 0, PM_REMOVE))
{
TRACE(L"inside while(PeekMessage())\n");
ASSERT(msg.message != WM_QUIT);
DispatchMessage(&msg);
}
}*/
} // while
TRACE(L"exiting WaitForThreadShutdown()\n");
}
////////////////////////////////////////////////////////////////////
// CHiddenWnd
const UINT CHiddenWnd::s_ThreadStartNotificationMessage = WM_USER + 1;
const UINT CHiddenWnd::s_ThreadTooMuchDataNotificationMessage = WM_USER + 2;
const UINT CHiddenWnd::s_ThreadHaveDataNotificationMessage = WM_USER + 3;
const UINT CHiddenWnd::s_ThreadDoneNotificationMessage = WM_USER + 4;
const UINT CHiddenWnd::s_SheetCloseNotificationMessage = WM_DSA_SHEET_CLOSE_NOTIFY; // propcfg.h
const UINT CHiddenWnd::s_SheetCreateNotificationMessage = WM_DSA_SHEET_CREATE_NOTIFY; // propcfg.h
const UINT CHiddenWnd::s_RefreshAllNotificationMessage = WM_USER + 7;
const UINT CHiddenWnd::s_ThreadShutDownNotificationMessage = WM_USER + 8;
BOOL CHiddenWnd::Create()
{
RECT rcPos;
ZeroMemory(&rcPos, sizeof(RECT));
HWND hWnd = CWindowImpl<CHiddenWnd>::Create( NULL, //HWND hWndParent,
rcPos, //RECT& rcPos,
NULL, //LPCTSTR szWindowName = NULL,
WS_POPUP, //DWORD dwStyle = WS_CHILD | WS_VISIBLE,
0x0, //DWORD dwExStyle = 0,
0 //UINT nID = 0
);
return hWnd != NULL;
}
LRESULT CHiddenWnd::OnThreadStartNotification(UINT, WPARAM, LPARAM, BOOL&)
{
TRACE(_T("CHiddenWnd::OnThreadStartNotification()\n"));
ASSERT(m_pCD != NULL);
ASSERT(m_pCD->m_pBackgroundThreadInfo->m_state == notStarted);
ASSERT(m_pCD->m_pBackgroundThreadInfo->m_nThreadID != 0);
ASSERT(m_pCD->m_pBackgroundThreadInfo->m_hThreadHandle != NULL);
m_pCD->m_pBackgroundThreadInfo->m_state = running;
return 1;
}
LRESULT CHiddenWnd::OnThreadShutDownNotification(UINT, WPARAM, LPARAM, BOOL&)
{
TRACE(_T("CHiddenWnd::OnThreadShutDownNotification()\n"));
ASSERT(m_pCD != NULL);
ASSERT(m_pCD->m_pBackgroundThreadInfo->m_state == shuttingDown);
m_pCD->m_pBackgroundThreadInfo->m_state = terminated;
return 1;
}
LRESULT CHiddenWnd::OnThreadTooMuchDataNotification(UINT, WPARAM wParam, LPARAM, BOOL&)
{
TRACE(_T("CHiddenWnd::OnThreadTooMuchDataNotification()\n"));
ASSERT(m_pCD != NULL);
// ingnore if we are shutting down (i.e. not running state)
if (m_pCD->m_pBackgroundThreadInfo->m_state == running)
{
CUINode* pUINode = reinterpret_cast<CUINode*>(wParam);
m_pCD->_OnTooMuchData(pUINode);
}
return 1;
}
LRESULT CHiddenWnd::OnThreadHaveDataNotification(UINT, WPARAM wParam, LPARAM lParam, BOOL&)
{
TRACE(_T("CHiddenWnd::OnThreadHaveDataNotification()\n"));
ASSERT(m_pCD != NULL);
CUINode* pUINode = reinterpret_cast<CUINode*>(wParam);
CThreadQueryResult* pResult = reinterpret_cast<CThreadQueryResult*>(lParam);
// ingnore if we are shutting down (i.e. not running state)
if (m_pCD->m_pBackgroundThreadInfo->m_state == running)
{
m_pCD->_OnHaveData(pUINode, pResult);
}
else
{
// going down, eat up data
if (pResult != NULL)
delete pResult;
}
return 1;
}
LRESULT CHiddenWnd::OnThreadDoneNotification(UINT, WPARAM wParam, LPARAM lParam, BOOL&)
{
HRESULT ReturnedHr = (HRESULT)lParam;
ASSERT(m_pCD != NULL);
// ingnore if we are shutting down (i.e. not running state)
if (m_pCD->m_pBackgroundThreadInfo->m_state == running)
{
CUINode* pUINode = reinterpret_cast<CUINode*>(wParam);
m_pCD->_OnDone(pUINode, ReturnedHr);
}
return 1;
}
LRESULT CHiddenWnd::OnSheetCloseNotification(UINT, WPARAM wParam, LPARAM, BOOL&)
{
ASSERT(m_pCD != NULL);
CUINode* pUINode = reinterpret_cast<CUINode*>(wParam);
m_pCD->_OnSheetClose(pUINode);
return 1;
}
LRESULT CHiddenWnd::OnSheetCreateNotification(UINT, WPARAM wParam, LPARAM, BOOL&)
{
ASSERT(m_pCD != NULL);
PDSA_SEC_PAGE_INFO pDsaSecondaryPageInfo = reinterpret_cast<PDSA_SEC_PAGE_INFO>(wParam);
ASSERT(pDsaSecondaryPageInfo != NULL);
// ingnore if we are shutting down (i.e. not running state)
if (m_pCD->m_pBackgroundThreadInfo->m_state == running)
{
m_pCD->_OnSheetCreate(pDsaSecondaryPageInfo);
}
::LocalFree(pDsaSecondaryPageInfo);
return 1;
}
LRESULT CHiddenWnd::OnRefreshAllNotification(UINT, WPARAM, LPARAM, BOOL&)
{
ASSERT(m_pCD != NULL);
// ingnore if we are shutting down (i.e. not running state)
if (m_pCD->m_pBackgroundThreadInfo->m_state == running)
{
m_pCD->ForceRefreshAll();
}
return 1;
}
////////////////////////////////////////////////////////////////////
// CBackgroundThreadBase
CBackgroundThreadBase::CBackgroundThreadBase()
{
m_bAutoDelete = FALSE;
m_hWnd = NULL;
m_pCD = NULL;
}
CBackgroundThreadBase::~CBackgroundThreadBase()
{
}
BOOL CBackgroundThreadBase::Start(HWND hWnd, CDSComponentData* pCD)
{
// this function executes in the context of the parent thread
AFX_MANAGE_STATE(AfxGetStaticModuleState());
ASSERT(::IsWindow(hWnd));
m_hWnd = hWnd;
m_pCD = pCD;
return CreateThread();
}
BOOL CBackgroundThreadBase::InitInstance()
{
// this function executes in the context of the child thread
HRESULT hr = ::CoInitialize(NULL);
if (FAILED(hr))
return FALSE;
return SUCCEEDED(hr);
}
int CBackgroundThreadBase::ExitInstance()
{
::CoUninitialize();
PostExitNotification();
// Sleep(1000);
return CWinThread::ExitInstance();
}
BOOL CBackgroundThreadBase::PostMessageToWnd(UINT msg, WPARAM wParam, LPARAM lParam)
{
ASSERT(::IsWindow(m_hWnd));
return ::PostMessage(m_hWnd, msg, wParam, lParam);
}
////////////////////////////////////////////////////////////////////
// CBackgroundThreadInfo
CBackgroundThreadInfo::CBackgroundThreadInfo()
{
m_nThreadID = 0;
m_hThreadHandle = 0;
m_state = notStarted;
m_pThreadObj = 0;
}
////////////////////////////////////////////////////////////////////
// CDispatcherThread
#define WORKER_THREAD_COUNT 2
CDispatcherThread::CDispatcherThread()
{
m_nArrCount = WORKER_THREAD_COUNT;
m_pThreadInfoArr = (CBackgroundThreadInfo*)malloc(m_nArrCount*sizeof(CBackgroundThreadInfo));
if (m_pThreadInfoArr != NULL)
{
ZeroMemory(m_pThreadInfoArr, m_nArrCount*sizeof(CBackgroundThreadInfo));
}
}
CDispatcherThread::~CDispatcherThread()
{
if (m_pThreadInfoArr)
{
for (UINT idx = 0; idx < m_nArrCount; ++idx)
{
if (m_pThreadInfoArr[idx].m_pThreadObj)
{
delete m_pThreadInfoArr[idx].m_pThreadObj;
m_pThreadInfoArr[idx].m_pThreadObj = 0;
}
}
free(m_pThreadInfoArr);
}
}
int CDispatcherThread::Run()
{
TRACE(_T("CDispatcherThread::Run() starting\n"));
BOOL bShuttingDown = FALSE;
MSG msg;
// initialize the message pump
::PeekMessage(&msg, NULL, 0, 0, PM_NOREMOVE);
// get let the main thread know we are entering the loop
PostMessageToWnd(CHiddenWnd::s_ThreadStartNotificationMessage,0,0);
BOOL bQuit = FALSE;
while(!bQuit && ::GetMessage(&msg, NULL, 0, 0))
{
switch(msg.message)
{
case DISPATCH_THREAD_RUN_MSG:
{
if (bShuttingDown)
{
// going down, eat up the message
CDSThreadQueryInfo* pQueryInfo = reinterpret_cast<CDSThreadQueryInfo*>(msg.lParam);
// reclaim memory in the queue
delete pQueryInfo;
}
else
{
// get a thread from the thread pool
UINT nEntry = GetThreadEntryFromPool();
ASSERT(m_pThreadInfoArr[nEntry].m_nThreadID != 0);
ASSERT(m_pThreadInfoArr[nEntry].m_state == running);
// forward the processing request to the thread from the pool
::PostThreadMessage(m_pThreadInfoArr[nEntry].m_nThreadID,
DISPATCH_THREAD_RUN_MSG, msg.wParam, msg.lParam);
// move the thread to a busy state
m_pThreadInfoArr[nEntry].m_state = busy;
}
}
break;
case DISPATCH_THREAD_DONE_MSG:
{
UINT nThreadID = (UINT)(msg.wParam);
ReturnThreadToPool(nThreadID);
}
break;
case THREAD_SHUTDOWN_MSG:
{
TRACE(L"CDispatcherThread got THREAD_SHUTDOWN_MSG\n");
ASSERT(!bShuttingDown);
// asked to shut down
bShuttingDown = TRUE;
// if no threads running, we go down immediately
// otherwise we have to wait for them to terminate
bQuit = BroadcastShutDownAllThreads();
TRACE(L"BroadcastShutDownAllThreads() returned bQuit = %d\n", bQuit);
}
break;
case THREAD_SHUTDOWN_ACK_MSG:
{
TRACE(L"CDispatcherThread got THREAD_SHUTDOWN_ACK_MSG\n");
ASSERT(bShuttingDown);
// worker thread has gone down
UINT nThreadID = (UINT)(msg.wParam);
bQuit = MarkThreadAsTerminated(nThreadID);
TRACE(L"MarkThreadAsTerminated() returned bQuit = %d\n", bQuit);
}
break;
default:
{
// unknown message, just let it through
::DispatchMessage(&msg);
} // default
} //switch
} // while
ASSERT(bShuttingDown);
// wait now for all the thread handles to become signalled
WaitForAllWorkerThreadsToExit();
TRACE(_T("CDispatcherThread::Run() is terminating\n"));
return ExitInstance();
}
void CDispatcherThread::PostExitNotification()
{
// we are finally done shutting down, let the main thread know
// that we are going down
PostMessageToWnd(CHiddenWnd::s_ThreadShutDownNotificationMessage, 0, 0);
TRACE(_T("CDispatcherThread::PostExitNotification() posted thread shutdown notification\n"));
}
UINT CDispatcherThread::_GetEntryFromArray()
{
UINT nFreeSlot = m_nArrCount; // set as "not found"
for (UINT k=0; k<m_nArrCount; k++)
{
if ( (m_pThreadInfoArr[k].m_nThreadID == 0) && (nFreeSlot == m_nArrCount))
nFreeSlot = k; // remember the first free slot
if ((m_pThreadInfoArr[k].m_nThreadID != 0) && (m_pThreadInfoArr[k].m_state == running))
return k; // found an idle running thread
}
// not found any idle thread, return an empty slot
if (nFreeSlot == m_nArrCount)
{
// no free space anymore, need to reallocate array
int nAlloc = m_nArrCount*2;
CBackgroundThreadInfo* temp = (CBackgroundThreadInfo*)realloc(m_pThreadInfoArr, sizeof(CBackgroundThreadInfo)*nAlloc);
if (temp)
{
m_pThreadInfoArr = temp;
::ZeroMemory(&m_pThreadInfoArr[m_nArrCount], sizeof(CBackgroundThreadInfo)*m_nArrCount);
nFreeSlot = m_nArrCount; // first free in new block
m_nArrCount = nAlloc;
}
}
return nFreeSlot;
}
UINT CDispatcherThread::GetThreadEntryFromPool()
{
UINT nEntry = _GetEntryFromArray();
// if the entry is empty, need to
// spawn a thread and wait it is running
if (m_pThreadInfoArr[nEntry].m_nThreadID == 0)
{
// If there is a thread object for this entry
// delete it and create a new one
if (m_pThreadInfoArr[nEntry].m_pThreadObj)
{
delete m_pThreadInfoArr[nEntry].m_pThreadObj;
m_pThreadInfoArr[nEntry].m_pThreadObj = 0;
}
// create the thread
CWorkerThread* pThreadObj = new CWorkerThread(m_nThreadID);
ASSERT(pThreadObj != NULL);
if (pThreadObj == NULL)
return 0;
// start the the thread
ASSERT(m_pThreadInfoArr[nEntry].m_hThreadHandle == NULL);
ASSERT(m_pThreadInfoArr[nEntry].m_state == notStarted);
if (!pThreadObj->Start(GetHiddenWnd(),GetCD()))
return 0;
ASSERT(pThreadObj->m_nThreadID != 0);
ASSERT(pThreadObj->m_hThread != NULL);
// copy the thread info we need from the thread object
m_pThreadInfoArr[nEntry].m_hThreadHandle = pThreadObj->m_hThread;
m_pThreadInfoArr[nEntry].m_nThreadID = pThreadObj->m_nThreadID;
m_pThreadInfoArr[nEntry].m_pThreadObj = pThreadObj;
// wait for the thread to start
MSG msg;
while(TRUE)
{
if (::PeekMessage(&msg,(HWND)-1,WORKER_THREAD_START_MSG, WORKER_THREAD_START_MSG,
PM_REMOVE))
{
TRACE(_T("CDispatcherThread::GetThreadFromPool() got WORKER_THREAD_START_MSG\n"));
m_pThreadInfoArr[nEntry].m_state = running;
break;
}
} // while
} // if
ASSERT(m_pThreadInfoArr[nEntry].m_state == running);
ASSERT(m_pThreadInfoArr[nEntry].m_nThreadID != 0);
return nEntry;
}
void CDispatcherThread::ReturnThreadToPool(UINT nThreadID)
{
ASSERT(nThreadID != 0);
for (UINT k=0; k<m_nArrCount; k++)
{
if (m_pThreadInfoArr[k].m_nThreadID == nThreadID)
{
// return the thread to a busy state
m_pThreadInfoArr[k].m_state = running;
return;
}
}
ASSERT(FALSE); // should never get here
}
BOOL CDispatcherThread::BroadcastShutDownAllThreads()
{
BOOL bQuit = TRUE;
for (UINT k=0; k<m_nArrCount; k++)
{
if (m_pThreadInfoArr[k].m_nThreadID != 0)
{
::PostThreadMessage(m_pThreadInfoArr[k].m_nThreadID, THREAD_SHUTDOWN_MSG,0,0);
bQuit = FALSE;
}
}
TRACE(L"CDispatcherThread::BroadcastShutDownAllThreads() returning %d\n", bQuit);
return bQuit;
}
BOOL CDispatcherThread::MarkThreadAsTerminated(UINT nThreadID)
{
TRACE(L"CDispatcherThread::MarkThreadAsTerminated()\n");
ASSERT(nThreadID != 0);
for (UINT k=0; k<m_nArrCount; k++)
{
if (m_pThreadInfoArr[k].m_nThreadID == nThreadID)
{
// mark the thread as done
TRACE(L"marking thread k = %d as terminated\n", k);
ASSERT(m_pThreadInfoArr[k].m_state == running);
m_pThreadInfoArr[k].m_state = terminated;
break;
}
}
// check if all the threads are terminated
for (k=0; k<m_nArrCount; k++)
{
if ((m_pThreadInfoArr[k].m_nThreadID != 0) &&
(m_pThreadInfoArr[k].m_state != terminated))
{
// at least one thread is still running
return FALSE;
}
}
// all the threads are gone (terminated state)
return TRUE;
}
void CDispatcherThread::WaitForAllWorkerThreadsToExit()
{
TRACE(L"CDispatcherThread::WaitForAllWorkerThreadsToExit()\n");
// wait for the dispatcher thread handle to become signalled
DWORD nCount = 0;
HANDLE* pHandles = new HANDLE[m_nArrCount];
if (!pHandles)
{
TRACE(L"Failed to allocate space for the handles\n");
return;
}
::ZeroMemory(pHandles, sizeof(HANDLE)*m_nArrCount);
for (UINT k=0; k<m_nArrCount; k++)
{
if (m_pThreadInfoArr[k].m_nThreadID != 0)
{
TRACE(L"m_pThreadInfoArr[%d].m_state = %d\n", k, m_pThreadInfoArr[k].m_state);
ASSERT(m_pThreadInfoArr[k].m_state == terminated);
ASSERT(m_pThreadInfoArr[k].m_hThreadHandle != NULL);
pHandles[nCount++] = m_pThreadInfoArr[k].m_hThreadHandle;
}
}
if (nCount == 0)
{
TRACE(L"WARNING: no worker threads to wait for!!!\n");
return;
}
TRACE(L"before WaitForThreadShutdown() loop on %d worker threads\n", nCount);
WaitForThreadShutdown(pHandles, nCount);
TRACE(L"after WaitForThreadShutdown() loop on worker threads\n");
#if (FALSE)
TRACE(L"before WaitForMultipleObjects() on worker threads\n");
WaitForMultipleObjects(nCount, pHandles, TRUE /*fWaitAll*/, INFINITE);
TRACE(L"after WaitForMultipleObjects() on worker threads\n");
#endif
delete[] pHandles;
pHandles = 0;
}
////////////////////////////////////////////////////////////////////
// CWorkerThread
CWorkerThread::CWorkerThread(UINT nParentThreadID)
: m_nMaxQueueLength(99),
m_currWParamCookie(0),
m_nParentThreadID(nParentThreadID),
m_bQuit(FALSE),
m_pCurrentQueryResult(0)
{
ASSERT(nParentThreadID != 0);
}
CWorkerThread::~CWorkerThread()
{
ASSERT(m_pCurrentQueryResult == NULL);
}
int CWorkerThread::Run()
{
HRESULT hr = S_OK;
TRACE(_T("CWorkerThread::Run() starting\n"));
MSG msg;
// initialize the message pump
::PeekMessage(&msg, NULL, 0, 0, PM_NOREMOVE);
// get let the main thread know we are entering the loop
::PostThreadMessage(m_nParentThreadID, WORKER_THREAD_START_MSG, 0,0);
ASSERT(m_bQuit == FALSE);
while(!m_bQuit && ::GetMessage(&msg, NULL, 0, 0))
{
if(msg.message == DISPATCH_THREAD_RUN_MSG)
{
m_currWParamCookie = msg.wParam;
//::MessageBox(NULL, _T("Wait"), _T("Thread"), MB_OK);
CThreadQueryInfo* pQueryInfo = reinterpret_cast<CThreadQueryInfo*>(msg.lParam);
hr = GetCD()->QueryFromWorkerThread(pQueryInfo, this);
// make sure we flush the result set
SendCurrentQueryResult();
// if we had to many items, let the hidden window know
if (pQueryInfo->m_bTooMuchData)
{
PostMessageToWnd(CHiddenWnd::s_ThreadTooMuchDataNotificationMessage,
m_currWParamCookie, (LPARAM)0);
}
delete pQueryInfo; // not needed anymore
// tell the hidden window we are done
PostMessageToWnd(CHiddenWnd::s_ThreadDoneNotificationMessage,
m_currWParamCookie, (LPARAM)hr);
// tell the dispatcher thread we are done processing
::PostThreadMessage(m_nParentThreadID, DISPATCH_THREAD_DONE_MSG, m_nThreadID,0);
m_currWParamCookie = 0; // reset
}
else if (msg.message == THREAD_SHUTDOWN_MSG)
{
TRACE(_T("CWorkerThread::Run() got THREAD_SHUTDOWN_MSG\n"));
m_bQuit = TRUE;
}
else
{
// unknown message, just let it through
::DispatchMessage(&msg);
}
} // while
TRACE(_T("CWorkerThread::Run() is terminating\n"));
return ExitInstance();
}
void CWorkerThread::PostExitNotification()
{
// we are finally done shutting down, let the main thread know
// that we are going down
::PostThreadMessage(m_nParentThreadID, THREAD_SHUTDOWN_ACK_MSG, m_nThreadID,0);
TRACE(_T("CWorkerThread::PostExitNotification() posted THREAD_SHUTDOWN_ACK_MSG, m_nThreadID = 0x%x\n"),
m_nThreadID);
}
void CWorkerThread::AddToQueryResult(CUINode* pUINode)
{
ASSERT(!m_bQuit);
if (m_pCurrentQueryResult == NULL)
{
m_pCurrentQueryResult = new CThreadQueryResult;
}
ASSERT(m_pCurrentQueryResult != NULL);
m_pCurrentQueryResult->m_nodeList.AddTail(pUINode);
if (m_pCurrentQueryResult->m_nodeList.GetCount() > m_nMaxQueueLength)
SendCurrentQueryResult();
// check to see if we are forced to abort
MSG msg;
if (::PeekMessage(&msg,(HWND)-1,THREAD_SHUTDOWN_MSG, THREAD_SHUTDOWN_MSG,
PM_REMOVE))
{
TRACE(_T("CWorkerThread::AddToQueryResult() got THREAD_SHUTDOWN_MSG\n"));
m_bQuit = TRUE;
}
}
void CWorkerThread::SendCurrentQueryResult()
{
if(m_pCurrentQueryResult != NULL)
{
// wParam has the cookie, that we just ship back
PostMessageToWnd(CHiddenWnd::s_ThreadHaveDataNotificationMessage,
m_currWParamCookie, reinterpret_cast<LPARAM>(m_pCurrentQueryResult));
m_pCurrentQueryResult = NULL;
}
}