/* * Copyright 2009, Ingo Weinhold, ingo_weinhold@gmx.de. * Distributed under the terms of the MIT License. */ #ifndef WORKER_H #define WORKER_H #include #include #include #include class Job; class Worker; enum job_state { JOB_STATE_UNSCHEDULED, JOB_STATE_WAITING, JOB_STATE_ACTIVE, JOB_STATE_ABORTED, JOB_STATE_FAILED, JOB_STATE_SUCCEEDED }; enum job_wait_status { JOB_DEPENDENCY_NOT_FOUND, JOB_DEPENDENCY_SUCCEEDED, JOB_DEPENDENCY_FAILED, JOB_DEPENDENCY_ABORTED, JOB_DEPENDENCY_ACTIVE // internal only }; struct JobKey { void* object; uint32 type; JobKey(void* object, uint32 type) : object(object), type(type) { } JobKey(const JobKey& other) : object(other.object), type(other.type) { } JobKey& operator=(const JobKey& other) { object = other.object; type = other.type; return *this; } bool operator==(const JobKey& other) const { return object == other.object && type == other.type; } size_t HashValue() const { return (size_t)(addr_t)object ^ (size_t)type; } }; class JobListener { public: virtual ~JobListener(); virtual void JobDone(Job* job); virtual void JobFailed(Job* job); virtual void JobAborted(Job* job); }; typedef DoublyLinkedList JobList; class Job : public DoublyLinkedListLinkImpl, public HashTableLink { public: Job(); virtual ~Job(); virtual JobKey Key() const = 0; virtual status_t Do() = 0; Worker* GetWorker() const { return fWorker; } job_state State() const { return fState; } protected: job_wait_status WaitFor(const JobKey& key); private: friend class Worker; private: void SetWorker(Worker* worker); void SetState(job_state state); Job* Dependency() const { return fDependency; } void SetDependency(Job* job); JobList& DependentJobs() { return fDependentJobs; } job_wait_status WaitStatus() const { return fWaitStatus; } void SetWaitStatus(job_wait_status status); status_t AddListener(JobListener* listener); void RemoveListener(JobListener* listener); void NotifyListeners(); private: typedef BObjectList ListenerList; private: Worker* fWorker; job_state fState; Job* fDependency; JobList fDependentJobs; job_wait_status fWaitStatus; ListenerList fListeners; }; class Worker { public: Worker(); ~Worker(); status_t Init(); void ShutDown(); bool Lock() { return fLock.Lock(); } void Unlock() { fLock.Unlock(); } status_t ScheduleJob(Job* job, JobListener* listener = NULL); // always takes over ownership void AbortJob(const JobKey& key); status_t AddListener(const JobKey& key, JobListener* listener); void RemoveListener(const JobKey& key, JobListener* listener); private: friend class Job; struct JobHashDefinition { typedef JobKey KeyType; typedef Job ValueType; size_t HashKey(const JobKey& key) const { return key.HashValue(); } size_t Hash(Job* value) const { return HashKey(value->Key()); } bool Compare(const JobKey& key, Job *value) const { return value->Key() == key; } HashTableLink* GetLink(Job* value) const { return value; } }; typedef OpenHashTable JobTable; private: job_wait_status WaitForJob(Job* waitingJob, const JobKey& key); static status_t _WorkerLoopEntry(void* data); status_t _WorkerLoop(); void _ProcessJobs(Job* finalJob); void _AbortJob(Job* job, bool removeFromTable); void _FinishJob(Job* job); private: BLocker fLock; JobTable fJobs; JobList fUnscheduledJobs; JobList fAbortedJobs; sem_id fWorkToDoSem; thread_id fWorkerThread; volatile bool fTerminating; }; #endif // WORKER_H