# cypclass_acthon.pyx
# mode: run
# tag: cpp, cpp11, pthread
# cython: experimental_cpp_class_def=True, language_level=2
from libcpp.deque cimport deque

# Storage type for pending messages: a C++ deque of ActhonMessageInterface.
ctypedef deque[ActhonMessageInterface] message_queue_t

# Declarations for the POSIX unnamed-semaphore API from <semaphore.h>.
# All of these are declared nogil so they can be called without the GIL.
cdef extern from "<semaphore.h>" nogil:
  # Opaque semaphore type; its fields are never accessed from Cython.
  ctypedef struct sem_t:
    pass
  int sem_init(sem_t *sem, int pshared, unsigned int value)
  int sem_wait(sem_t *sem)
  int sem_post(sem_t *sem)
  int sem_destroy(sem_t* sem)


# FIFO message queue: messages are appended at the back by push() and
# processed from the front by activate().
cdef cypclass BasicQueue(ActhonQueueInterface):
  # Heap-allocated deque of pending messages, created in __init__ and
  # released in __dealloc__.
  message_queue_t* _queue

  __init__(self):
    self._queue = new message_queue_t()

  __dealloc__(self):
    del self._queue

  # Return True when no message is pending in the queue.
  bint is_empty(const self):
    return self._queue.empty()

  # Append `message` at the back of the queue and, when the message carries
  # a sync method, register the message with it via insertActivity().
  void push(self, ActhonMessageInterface message):
    self._queue.push_back(message)
    if message._sync_method is not NULL:
      message._sync_method.insertActivity(message)

  # Try to process the message at the front of the queue.
  # Returns False when the queue is empty; otherwise returns what the front
  # message's activate() returned. A message that was not processed is
  # re-queued at the back; a processed one is unregistered from its sync
  # method (if any) under a write lock.
  bint activate(self):
    cdef bint one_message_processed
    if self._queue.empty():
      return False
    # Note here that according to Cython refcount conventions,
    # the front() method should have returned a new ref.
    # This is obviously not the case, so if we do nothing
    # we will, at the end of this function, lose a ref on the pointed object
    # (as we will decref the thing pointed by next_message).
    next_message = self._queue.front()
    self._queue.pop_front()
    one_message_processed = next_message.activate()
    if one_message_processed:
      if next_message._sync_method is not NULL:
        # Bind the sync method to a local name so it can be write-locked.
        next_sync_method = next_message._sync_method
        with wlocked next_sync_method:
          next_sync_method.removeActivity(next_message)
    else:
      self._queue.push_back(next_message)
      # Don't forget to incref to avoid premature deallocation
      # NOTE(review): no explicit incref follows this comment in the current
      # code; presumably the container now manages the reference itself --
      # confirm, then drop or update these refcount comments.
    return one_message_processed

# Result object that ignores every pushed value: the push methods do nothing
# and the getters return fixed defaults (NULL and 0).
cdef cypclass NoneResult(ActhonResultInterface):
  # Discard a pointer result.
  void pushVoidStarResult(self, void* result):
    pass
  # Discard an integer result.
  void pushIntResult(self, int result):
    pass
  # Always NULL: nothing is ever stored.
  void* getVoidStarResult(const self):
    return NULL
  # Always 0: nothing is ever stored.
  int getIntResult(const self):
    return 0

# Result object whose getters block on a semaphore until a value has been
# pushed. The value is stored in a union, so it is either an int or a
# pointer depending on which push method was called.
cdef cypclass WaitResult(ActhonResultInterface):
  union result_t:
    int int_val
    void* ptr
  result_t result
  # Posted once per pushed result; starts at 0 so that readers wait.
  sem_t semaphore

  __init__(self):
    self.result.ptr = NULL
    # pshared=0, initial value=0. The return code is not checked here.
    sem_init(&self.semaphore, 0, 0)

  __dealloc__(self):
    sem_destroy(&self.semaphore)

  # Factory returning a new WaitResult typed as ActhonResultInterface.
  @staticmethod
  ActhonResultInterface construct():
    return WaitResult()

  # Store a pointer result, then post the semaphore to release readers.
  void pushVoidStarResult(self, void* result):
    self.result.ptr = result
    sem_post(&self.semaphore)

  # Store an integer result, then post the semaphore to release readers.
  void pushIntResult(self, int result):
    self.result.int_val = result
    sem_post(&self.semaphore)

  # Wait until a result is available and return the raw union.
  result_t _getRawResult(const self):
    # We must ensure a result exists, but we can let others access it immediately
    # The cast here is a way of const-casting (we're modifying the semaphore in a const method)
    sem_wait(<sem_t*> &self.semaphore)
    # Re-post right away so the semaphore stays signalled for later readers.
    sem_post(<sem_t*> &self.semaphore)
    return self.result

  # Wait for the result and read it as a pointer.
  void* getVoidStarResult(const self):
    res = self._getRawResult()
    return res.ptr

  # Wait for the result and read it as an int.
  int getIntResult(const self):
    res = self._getRawResult()
    return res.int_val

# Sync object that counts registered activities and can be chained after a
# previous sync: it is activable only once the previous one is completed.
cdef cypclass ActivityCounterSync(ActhonSyncInterface):
  # Number of activities inserted and not yet removed.
  int count
  # Optional predecessor in the chain; NULL when there is none.
  ActivityCounterSync previous_sync

  __init__(self, ActivityCounterSync prev = <ActivityCounterSync> NULL):
    self.count = 0
    self.previous_sync = prev

  # Register one activity; `msg` itself is not inspected.
  void insertActivity(self, ActhonMessageInterface msg):
    self.count += 1

  # Unregister one activity; `msg` itself is not inspected.
  void removeActivity(self, ActhonMessageInterface msg):
    self.count -= 1

  # True when every inserted activity has been removed (count is 0).
  bint isCompleted(const self):
    return self.count == 0

  # True when there is no previous sync, or when the previous sync reports
  # isCompleted(). The previous sync is read under a read lock.
  bint isActivable(const self):
    cdef bint res = True
    if self.previous_sync is not NULL:
      # Bind to a local name so the object can be read-locked.
      prev_sync = self.previous_sync
      with rlocked prev_sync:
        res = prev_sync.isCompleted()
    return res

# Minimal activable class used by the test: holds one int with a getter and
# a setter, and wires itself to WaitResult and BasicQueue.
cdef cypclass A activable:
    int a
    __init__(self):
        self.a = 0
        # Results are built through the WaitResult.construct factory.
        self._active_result_class = WaitResult.construct
        # The object gets its own freshly created BasicQueue.
        self._active_queue_class = consume BasicQueue()
    int getter(const self):
        return self.a
    void setter(self, int a):
        self.a = a

# Doctest: push a setter(n) call and then a getter call on an activated A,
# each with its own ActivityCounterSync (the getter's sync is chained after
# the setter's), drain the queue by hand, and print the getter's result.
def test_acthon_chain(n):
    """
    >>> test_acthon_chain(42)
    42
    """
    cdef ActhonResultInterface res
    cdef locked ActhonQueueInterface queue
    sync1 = ActivityCounterSync()
    # after_sync1 has sync1 as its previous sync.
    after_sync1 = ActivityCounterSync(sync1)

    obj = A()
    # Keep a handle on the object's queue before `obj` is consumed below.
    queue = obj._active_queue_class
    obj_actor = activate(consume obj)

    # Pushing things in the queue
    obj_actor.setter(sync1, n)
    res = obj_actor.getter(after_sync1)

    # Processing the queue
    while not queue.is_empty():
        queue.activate()
    # NOTE(review): the <int> cast presumably reads the integer result from
    # `res` -- confirm against ActhonResultInterface.
    print <int> res