purebasic.info

PureBasic forum
Текущее время: Чт июл 19, 2018 6:44 pm

Часовой пояс: UTC + 3 часа




Начать новую тему Ответить на тему  [ Сообщений: 3 ] 
Автор Сообщение
 Заголовок сообщения: Конвейера и потоки (модуль)
СообщениеДобавлено: Ср янв 28, 2015 8:18 am 
Не в сети
профессор

Зарегистрирован: Чт сен 22, 2011 6:21 pm
Сообщений: 231
Благодарил (а): 31 раз.
Поблагодарили: 22 раз.
Пункты репутации: 0
Много поточное программирование, оно не то что-бы сложно и непонятно, просто очень легко запутаться, и многабукав на синхронизацию.
И вот для упрощения, облегчения и во избежание, написал такой модуль, оказалось удобно, решил поделиться.
1) Реализована концепция очередей сообщений "конвейеров" которые нужны для буферизации асинхронного обмена данными между потоками (сообщение может быть как простой переменной, так и структурной).
2) Реализован пул потоков (поскольку запуск потока операция достаточно время-затратная, принято заранее запускать несколько потоков ждущих задания из входной очереди, и складывающих результат работы в выходную очередь)
3) Одиночные глобальные переменные для синхронного доступа из разных потоков (макросы для сокращения количества строк типа LockMutex(...) и UnLockMutex(...))
workerModule.pbi
Код:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
;*********************************
; Module:   WORKER
; Author:   void
; Compiler: PB524-PB531
; Ver:      003
;
; конвейера и потоки
;
; ChangeLog:
; 002 + более осмысленные имена параметров
;     + синхронизированные глобальные переменные
; 003 + входной и выходной конвейера рабочего потока, могут быть разных структур
;*********************************
DeclareModule WORKER
  EnableExplicit
  Structure BaseWorkerStruct
    MutexIn.i
    MutexOu.i
    MutexCou.i
    ActiveCount.i
    Semaphore.i
    Array Thr.i(0)
  EndStructure
 
  ;количество доступных ядер
  Macro MAX_CPU
    CountCPUs(#PB_System_ProcessCPUs)
  EndMacro
 
  ;*************************************************************************************
  ;создание 2-х конвейеров(очередей), входного и выходного (на основе List)
  ;запуск произвольного количества рабочих потоков ждущих задания из входного конвейера
  ;результат помещается в выходной конвейер (конвейера синхронизированы)
  ;*************************************************************************************
 
  ;создает глобальную переменную управляющей структуры
  ;располагать в программе на уровне главного потока, после структуры очереди, перед процедурой потока)
  Macro DEF(WorkerName, WorkerDataTypeIn, WorkerDataTypeOu)
    UseModule WORKER
    Structure WorkerName Extends BaseWorkerStruct
      List WrkListIn.WorkerDataTypeIn()
      List WrkListOu.WorkerDataTypeOu()
    EndStructure
    UnuseModule WORKER
    Global WorkerName.WorkerName
  EndMacro
 
  ;инициализирует поля управляющей переменной, конвейера, запускает рабочие потоки
  Macro NEW(WorkerName, WrkProc, ThrCount=1, Priority=0)
    WorkerName\MutexIn   = CreateMutex()
    WorkerName\MutexOu   = CreateMutex()
    WorkerName\MutexCou  = CreateMutex()
    WorkerName\Semaphore = CreateSemaphore()
    Dim WorkerName\Thr(ThrCount-1)
    NewList WorkerName\WrkListIn()
    NewList WorkerName\WrkListOu()
    Define WorkerName#Temp
    For WorkerName#Temp = 0 To ThrCount-1
      Repeat
        WorkerName\Thr(WorkerName#Temp) = CreateThread(@WrkProc(), 0)
        CompilerIf Priority
          If WorkerName\Thr(WorkerName#Temp)
            ThreadPriority(WorkerName\Thr(WorkerName#Temp), Priority)
          EndIf
        CompilerEndIf
        Delay(2)  ; создание потока достаточно тяжелая (для системы) операция, нужно дать ей время
      Until WorkerName\Thr(WorkerName#Temp)
    Next
  EndMacro
 
  ;для процедуры рабочего потока.
  ;ждет задание во входном конвейере и помещает его в переменную WrkElementIn
  ;по окончании переменная WrkElementOu помещается в выходной конвейер (если NoOut=#False)
  ;если сигнал выхода (от DESTROY) - генерирует Break (выход из бесконечного цикла потока)
  Macro WAIT_WORK(WorkerName, WrkElementIn, WrkElementOu, NoOut=#False)
    Define WorkerName#NoFirstPass
    If WorkerName#NoFirstPass
      CompilerIf NoOut=#False
        LockMutex(WorkerName\MutexOu)
        LastElement(WorkerName\WrkListOu())
        AddElement(WorkerName\WrkListOu())
        WorkerName\WrkListOu()=WrkElementOu
        UnlockMutex(WorkerName\MutexOu)
      CompilerEndIf
      LockMutex(WorkerName\MutexCou)
      WorkerName\ActiveCount-1
      UnlockMutex(WorkerName\MutexCou)
    Else
      WorkerName#NoFirstPass=#True
    EndIf
    WaitSemaphore(WorkerName\Semaphore)
    LockMutex(WorkerName\MutexIn)
    If ListSize(WorkerName\WrkListIn())=0
      UnlockMutex(WorkerName\MutexIn)
      Break
    EndIf
    FirstElement(WorkerName\WrkListIn())
    WrkElementIn=WorkerName\WrkListIn()
    DeleteElement(WorkerName\WrkListIn())
    UnlockMutex(WorkerName\MutexIn)
  EndMacro
 
  ;ждет завершения выполнения всех заданий очереди (из главного потока)
  Macro WAIT_END(WorkerName)
    Repeat
      LockMutex(WorkerName\MutexCou)
      If WorkerName\ActiveCount=0
        UnlockMutex(WorkerName\MutexCou)
        Break
      EndIf
      UnlockMutex(WorkerName\MutexCou)
      Delay(10)
    ForEver
  EndMacro
 
  ;добавляет новое задание в начало очереди (срочное)
  Macro ADD_FIRST(WorkerName, WrkElementIn)
    LockMutex(WorkerName\MutexCou)
    WorkerName\ActiveCount+1
    UnlockMutex(WorkerName\MutexCou)    
    LockMutex(WorkerName\MutexIn)
    ResetList(WorkerName\WrkListIn())
    AddElement(WorkerName\WrkListIn())
    WorkerName\WrkListIn()=WrkElementIn
    UnlockMutex(WorkerName\MutexIn)
    SignalSemaphore(WorkerName\Semaphore)
  EndMacro
 
  ;добавляет новое задание в конец очереди
  Macro ADD_LAST(WorkerName, WrkElementIn)
    LockMutex(WorkerName\MutexCou)
    WorkerName\ActiveCount+1
    UnlockMutex(WorkerName\MutexCou)    
    LockMutex(WorkerName\MutexIn)
    LastElement(WorkerName\WrkListIn())
    AddElement(WorkerName\WrkListIn())
    WorkerName\WrkListIn()=WrkElementIn
    UnlockMutex(WorkerName\MutexIn)
    SignalSemaphore(WorkerName\Semaphore)
  EndMacro
 
  ;читает выходную очередь (удаляя прочитанное из очереди), если есть данные - FoundFlag=#True
  ;(для главного потока)
  Macro GET_OUT(WorkerName, WrkElementOu, FoundFlag)
    LockMutex(WorkerName\MutexOu)
    If ListSize(WorkerName\WrkListOu())
      FirstElement(WorkerName\WrkListOu())
      WrkElementOu=WorkerName\WrkListOu()
      DeleteElement(WorkerName\WrkListOu())
      UnlockMutex(WorkerName\MutexOu)
      FoundFlag=#True
    Else
      UnlockMutex(WorkerName\MutexOu)
      FoundFlag=#False
    EndIf
  EndMacro
 
  ;как предыдущий, но при отсутствии генерит Break (для циклов)
  Macro GET_OUT_DO(WorkerName, WrkElementOu)
    LockMutex(WorkerName\MutexOu)
    If ListSize(WorkerName\WrkListOu())
      FirstElement(WorkerName\WrkListOu())
      WrkElementOu=WorkerName\WrkListOu()
      DeleteElement(WorkerName\WrkListOu())
      UnlockMutex(WorkerName\MutexOu)
    Else
      UnlockMutex(WorkerName\MutexOu)
      Break
    EndIf
  EndMacro
 
  ;сигнал на остановку рабочих потоков (завершаются при окончании очереди заданий)
  ;ожидание завершения потоков, очистка управляющих структур.и очередей
  ;Clear=#True - принудительная очистка входной очереди заданий, перед сигналом окончания работы.
  Macro DESTROY(WorkerName, Clear=#False)
    Define WorkerName#Temp
    If Clear
      LockMutex(WorkerName\MutexIn)
      ClearList(WorkerName\WrkListIn())
      UnlockMutex(WorkerName\MutexIn)
    EndIf
    For WorkerName#Temp=0 To ArraySize(WorkerName\Thr())
      SignalSemaphore(WorkerName\Semaphore)
    Next
    For WorkerName#Temp=0 To ArraySize(WorkerName\Thr())
      WaitThread(WorkerName\Thr(WorkerName#Temp))
    Next
    WorkerName\ActiveCount=0
    FreeList(WorkerName\WrkListIn())
    FreeList(WorkerName\WrkListOu())
    FreeArray(WorkerName\Thr())
    FreeSemaphore(WorkerName\Semaphore)
    FreeMutex(WorkerName\MutexIn)
    FreeMutex(WorkerName\MutexOu)
    FreeMutex(WorkerName\MutexCou)
  EndMacro
 
 
  ;*************************************************************************************
  ;просто синхронизируемые конвейеры на основе списка (для общения с перманентными потоками)
  ;*************************************************************************************
 
  ;создает новый конвейер, структуры BachDataType
  Macro BATCH_NEW(BatchName, BachDataType)
    Global BatchName#Mutex=CreateMutex()
    Global NewList BatchName#List.BachDataType()
  EndMacro
 
  ;добавляет элемент в начало (срочный)
  Macro BATCH_ADD_FIRST(BatchName, BatchElement)
    LockMutex(BatchName#Mutex)
    ResetList(BatchName#List())
    AddElement(BatchName#List())
    BatchName#List()=BatchElement
    UnlockMutex(BatchName#Mutex)
  EndMacro
 
  ;добавляет элемент в конец
  Macro BATCH_ADD_LAST(BatchName, BatchElement)
    LockMutex(BatchName#Mutex)
    LastElement(BatchName#List())
    AddElement(BatchName#List())
    BatchName#List()=BatchElement
    UnlockMutex(BatchName#Mutex)
  EndMacro
 
  ;вытаскивает первый элемент конвейера в переменную
  Macro BATCH_GET(BatchName, BatchElement, FoundFlag)
    LockMutex(BatchName#Mutex)
    If ListSize(BatchName#List())
      FirstElement(BatchName#List())
      BatchElement=BatchName#List()
      DeleteElement(BatchName#List())
      UnlockMutex(BatchName#Mutex)
      FoundFlag = #True
    Else
      UnlockMutex(BatchName#Mutex)
      FoundFlag = #False
    EndIf
  EndMacro  
 
  ;как предыдущий, но при отсутствии генерит Break (для циклов)
  Macro BATCH_GET_DO(BatchName, BatchElement)
    LockMutex(BatchName#Mutex)
    If ListSize(BatchName#List())
      FirstElement(BatchName#List())
      BatchElement=BatchName#List()
      DeleteElement(BatchName#List())
      UnlockMutex(BatchName#Mutex)
    Else
      UnlockMutex(BatchName#Mutex)
      Break
    EndIf
  EndMacro  
 
  ;удаляет конвейер
  Macro BATCH_DESTROY(BatchName)
    FreeList(BatchName#List())
    FreeMutex(BatchName#Mutex)
  EndMacro
 
  ;*************************************************************************************
  ; синхронизованные глобальные переменные (изменяются - только целиком)
  ;*************************************************************************************
  Macro SYNC_NEW(SyncVarName, VarType)
    Global SyncVarName#Mutex=CreateMutex()
    Global SyncVarName.VarType
  EndMacro
 
  Macro SYNC_GET(SyncVarName, VarOut)
    LockMutex(SyncVarName#Mutex)
    VarOut = SyncVarName
    UnlockMutex(SyncVarName#Mutex)
  EndMacro
 
  Macro SYNC_SET(SyncVarName, VarIn)
    LockMutex(SyncVarName#Mutex)
    SyncVarName = VarIn
    UnlockMutex(SyncVarName#Mutex)    
  EndMacro  
 
EndDeclareModule
Module WORKER
EndModule
 
;Test Module
CompilerIf #PB_Compiler_IsMainFile
 
  ;структура переменных наших конвейеров
  Structure WorkStruc
    msg$
  EndStructure
 
  ;вначале создаем управляющую переменую
  ;(в нашем примере структуры входная и выходная одинаковы, но могут отличаться)
  WORKER::DEF(Worker1, WorkStruc, WorkStruc)
 
  ;создаем дополнительный конвейер для теста
  WORKER::BATCH_NEW(TestBatch, WorkStruc)
 
  ;процедура запускаемая в потоках
  Procedure WorkerProc(*r)
    ;локальные переменные для приема из входного конвеера и отправки в выходной
    ;в нашем случае тип одинаковый, но может и отличаться.
    Protected In.WorkStruc, Ou.WorkStruc
    ;тут можно что либо однократно проинициализировать
    ;....
    ;Главный цикл работника
    ;Worker1 - имя управляющей переменной, In-переменная в которую загружать задание из
    ;входного конвейера, Ou-будет выгружена в выходной конвейер по окончании цикла,
    Repeat
      WORKER::WAIT_WORK(Worker1,In,Ou)
     
      Debug "WorkIn: " + In\msg$  ;что-то делаем
      Ou\msg$ =  In\msg$ + " Worked" ;выгружаем результат работы
     
      Delay(100)                      ;дабы изобразить тяжелую работу
     
      ;дублируем в дополнительный конвейер
      WORKER::BATCH_ADD_LAST(TestBatch, Ou)
     
    ForEver
    ;перед закрытием потока можно что-то сделать
    ;...
  EndProcedure
 
  ;создаем конвейера, запускаем 10 потоков
  WORKER::NEW(Worker1, WorkerProc, 10)
 
  ;переменная структуры заданий
  Define Var.WorkStruc, i
 
  Debug "Начало подачи заданий"
  ;добавляем 100 заданий во входной конвейер
  For i=1 To 100
    Var\msg$=Str(i)
    ;добавляем задание в конец очереди
    WORKER::ADD_LAST(Worker1,Var)
  Next
  Debug "Все задания отправлены"
 
  Debug "Ждем завершения заданий"
  WORKER::WAIT_END(Worker1)  
  Debug "Все задания завершены"
 
  Debug "Читаем выходные данные из выходного конвеера"
  Define tmp.WorkStruc
  Repeat
    WORKER::GET_OUT_DO(Worker1, tmp)
    Debug tmp\msg$
  ForEver
 
  Debug "останавливаем потоки, и удаляем конвейера"
  WORKER::DESTROY(Worker1)
  Debug "Рабочие потоки удалены"
 
  Debug "выбираем сообщения из доп конвейера"
  Repeat
    WORKER::BATCH_GET_DO(TestBatch, tmp)
    Debug tmp\msg$
  ForEver
  Debug "Дополнительный конвейер пуст"
 
  Debug "удаляем дополнительный конвейер"
  WORKER::BATCH_DESTROY(TestBatch)
 
  Debug ""
  Debug "Новая синхронная переменная `perem`"
  WORKER::SYNC_NEW(perem, i)
  Debug "perem + 1"
  WORKER::SYNC_SET(perem, perem+1)
  Debug "perem + 1"
  WORKER::SYNC_SET(perem, perem+1)
  WORKER::SYNC_GET(perem, temp)
  Debug "perem = " + Str(temp)
 
CompilerEndIf
 



Последний раз редактировалось Kuzmat Пн авг 10, 2015 4:11 am, всего редактировалось 2 раз(а).

Вернуться наверх
 Профиль  
 
 Заголовок сообщения: Re: Конвейера и потоки (модуль)
СообщениеДобавлено: Сб апр 04, 2015 12:46 pm 
Не в сети
профессор

Зарегистрирован: Чт сен 22, 2011 6:21 pm
Сообщений: 231
Благодарил (а): 31 раз.
Поблагодарили: 22 раз.
Пункты репутации: 0
Ver: 002
! более осмысленные имена параметров
+ синхронизованные глобальные переменные

ps. добавил небольшое описание. (первый пост обновлен)


Вернуться наверх
 Профиль  
 
 Заголовок сообщения: Re: Конвейера и потоки (модуль)
СообщениеДобавлено: Пн авг 10, 2015 4:10 am 
Не в сети
профессор

Зарегистрирован: Чт сен 22, 2011 6:21 pm
Сообщений: 231
Благодарил (а): 31 раз.
Поблагодарили: 22 раз.
Пункты репутации: 0
Ver: 003
+ входной и выходной конвейера рабочего потока, могут быть разных структур
(первый пост обновлен)


Вернуться наверх
 Профиль  
 
Показать сообщения за:  Сортировать по:  
Начать новую тему Ответить на тему  [ Сообщений: 3 ] 

Часовой пояс: UTC + 3 часа


Кто сейчас на форуме

Сейчас этот форум просматривают: нет зарегистрированных пользователей и гости: 1


Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете добавлять вложения

Найти:
Перейти:  
Создано на основе phpBB® Forum Software © phpBB Group (блог о phpBB)
Сборка создана CMSart Studio
Русская поддержка phpBB