threadpool.c 3.96 KB
Newer Older
1
2
#include "kvm/threadpool.h"
#include "kvm/mutex.h"
3
#include "kvm/kvm.h"
4
5
6
7
8
9

#include <linux/kernel.h>
#include <linux/list.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>

10
11
12
/*
 * job_mutex is held around every access to the pending-job list in this
 * file and is the mutex paired with job_cond in pthread_cond_wait().
 */
static DEFINE_MUTEX(job_mutex);
/* thread_mutex is held by thread_pool__addthread() while it grows the worker array. */
static DEFINE_MUTEX(thread_mutex);
/* Signalled when a job is submitted and when the pool is shutting down. */
static pthread_cond_t job_cond = PTHREAD_COND_INITIALIZER;
13
14
15
16
17

/* Queue of pending jobs, linked through thread_pool__job::queue. */
static LIST_HEAD(head);

/* Heap array of worker thread handles, grown one slot at a time with realloc(). */
static pthread_t	*threads;
/* Number of entries in threads[] that hold a successfully created thread. */
static long		threadcount;
18
/*
 * Pool run flag: set by thread_pool__init(), cleared by thread_pool__exit().
 * NOTE(review): this is a plain bool that workers also read outside
 * job_mutex; consider an atomic type if that ever becomes a problem.
 */
static bool		running;
19

Lai Jiangshan's avatar
Lai Jiangshan committed
20
static struct thread_pool__job *thread_pool__job_pop_locked(void)
21
{
22
	struct thread_pool__job *job;
23
24
25
26

	if (list_empty(&head))
		return NULL;

27
	job = list_first_entry(&head, struct thread_pool__job, queue);
28
	list_del_init(&job->queue);
29
30
31
32

	return job;
}

Lai Jiangshan's avatar
Lai Jiangshan committed
33
static void thread_pool__job_push_locked(struct thread_pool__job *job)
34
35
36
37
{
	list_add_tail(&job->queue, &head);
}

Lai Jiangshan's avatar
Lai Jiangshan committed
38
static struct thread_pool__job *thread_pool__job_pop(void)
39
{
40
	struct thread_pool__job *job;
41
42

	mutex_lock(&job_mutex);
Lai Jiangshan's avatar
Lai Jiangshan committed
43
	job = thread_pool__job_pop_locked();
44
45
46
47
	mutex_unlock(&job_mutex);
	return job;
}

Lai Jiangshan's avatar
Lai Jiangshan committed
48
static void thread_pool__job_push(struct thread_pool__job *job)
49
50
{
	mutex_lock(&job_mutex);
Lai Jiangshan's avatar
Lai Jiangshan committed
51
	thread_pool__job_push_locked(job);
52
53
54
	mutex_unlock(&job_mutex);
}

55
static void thread_pool__handle_job(struct thread_pool__job *job)
56
57
58
59
60
61
62
63
{
	while (job) {
		job->callback(job->kvm, job->data);

		mutex_lock(&job->mutex);

		if (--job->signalcount > 0)
			/* If the job was signaled again while we were working */
Lai Jiangshan's avatar
Lai Jiangshan committed
64
			thread_pool__job_push(job);
65
66
67

		mutex_unlock(&job->mutex);

Lai Jiangshan's avatar
Lai Jiangshan committed
68
		job = thread_pool__job_pop();
69
70
71
72
73
74
75
76
77
78
79
80
	}
}

/*
 * Cleanup handler registered by the worker thread function: releases
 * job_mutex. @param is not used.
 */
static void thread_pool__threadfunc_cleanup(void *param)
{
	(void)param;

	mutex_unlock(&job_mutex);
}

static void *thread_pool__threadfunc(void *param)
{
	pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL);

81
82
	kvm__set_thread_name("threadpool-worker");

83
	while (running) {
84
		struct thread_pool__job *curjob = NULL;
85
86

		mutex_lock(&job_mutex);
87
		while (running && (curjob = thread_pool__job_pop_locked()) == NULL)
88
			pthread_cond_wait(&job_cond, &job_mutex.mutex);
89
90
		mutex_unlock(&job_mutex);

91
92
		if (running)
			thread_pool__handle_job(curjob);
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
	}

	pthread_cleanup_pop(0);

	return NULL;
}

static int thread_pool__addthread(void)
{
	int res;
	void *newthreads;

	mutex_lock(&thread_mutex);
	newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t));
	if (newthreads == NULL) {
		mutex_unlock(&thread_mutex);
		return -1;
	}

	threads = newthreads;

	res = pthread_create(threads + threadcount, NULL,
			     thread_pool__threadfunc, NULL);

	if (res == 0)
		threadcount++;
	mutex_unlock(&thread_mutex);

	return res;
}

124
int thread_pool__init(struct kvm *kvm)
125
126
{
	unsigned long i;
127
128
129
	unsigned int thread_count = sysconf(_SC_NPROCESSORS_ONLN);

	running = true;
130
131
132
133
134
135
136

	for (i = 0; i < thread_count; i++)
		if (thread_pool__addthread() < 0)
			return i;

	return i;
}
137
late_init(thread_pool__init);
138

139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
int thread_pool__exit(struct kvm *kvm)
{
	int i;
	void *NUL = NULL;

	running = false;

	for (i = 0; i < threadcount; i++) {
		mutex_lock(&job_mutex);
		pthread_cond_signal(&job_cond);
		mutex_unlock(&job_mutex);
	}

	for (i = 0; i < threadcount; i++) {
		pthread_join(threads[i], NUL);
	}

	return 0;
}
158
late_exit(thread_pool__exit);
159

160
void thread_pool__do_job(struct thread_pool__job *job)
161
{
162
	struct thread_pool__job *jobinfo = job;
163

164
	if (jobinfo == NULL || jobinfo->callback == NULL)
165
166
167
168
		return;

	mutex_lock(&jobinfo->mutex);
	if (jobinfo->signalcount++ == 0)
Lai Jiangshan's avatar
Lai Jiangshan committed
169
		thread_pool__job_push(job);
170
171
	mutex_unlock(&jobinfo->mutex);

172
	mutex_lock(&job_mutex);
173
	pthread_cond_signal(&job_cond);
174
	mutex_unlock(&job_mutex);
175
}
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198

/*
 * Cancel @job, returning only once it is neither queued nor running.
 *
 * A job that is still on the pending queue is unlinked and its signalcount
 * is reset to 0. A job that is not queued is treated as running while its
 * signalcount is positive, and this function keeps re-checking (taking and
 * dropping job_mutex each time) until that is no longer the case.
 *
 * We assume nobody is queueing this job - thread_pool__do_job() is not
 * called for it - while this function is running.
 */
void thread_pool__cancel_job(struct thread_pool__job *job)
{
	bool in_flight;

	for (;;) {
		mutex_lock(&job_mutex);
		if (!list_empty(&job->queue)) {
			/* Still pending: pull it off the queue before a worker takes it. */
			list_del_init(&job->queue);
			job->signalcount = 0;
			in_flight = false;
		} else {
			in_flight = job->signalcount > 0;
		}
		mutex_unlock(&job_mutex);

		if (!in_flight)
			break;
	}
}