Median is the middle point in sorted order.

So, we keep track of the middle point by keeping left and right halves of the values in two heaps: (1) max heap for the left half (2) min heap for the right half. So, the tops of these two heaps determine the median at all times.

Between the heaps we maintain the two invariants:
.
- Insert
numin the correct heap comparing it againsttop
- Insert
.
- Fix length of the two heaps
| Operation | Time | Space |
__init__ | ||
addNum | ||
findMedian |
from heapq import heappush as push, heappop as pop
class MedianFinder:
def __init__(self):
self.left_maxq, self.right_minq = [], []
def addNum(self, num: int) -> None:
if not self.left_maxq or num <= -self.left_maxq[0]:
push(self.left_maxq, -num)
else:
push(self.right_minq, num)
if len(self.left_maxq) < len(self.right_minq):
push(self.left_maxq, -pop(self.right_minq))
elif len(self.left_maxq) - len(self.right_minq) > 1:
push(self.right_minq, -pop(self.left_maxq))
def findMedian(self) -> float:
if len(self.left_maxq) > len(self.right_minq):
return -self.left_maxq[0]
else:
return 0.5 * (-self.left_maxq[0] + self.right_minq[0])
# Your MedianFinder object will be instantiated and called as such:
# obj = MedianFinder()
# obj.addNum(num)
# param_2 = obj.findMedian()
Follow up:
If all integer numbers from the stream are in the range [0, 100], how would you optimize your solution?
Since num can have only about hundred distinct values, we can keep these distinct values in the heaps — making push and pop constant time operations. However, len(heap) would not give the actual count of numbers in a half. We need to maintain the actual total sizes of the heaps alongside the frequency of each distinct value.
| Operation | Time | Space |
__init__ | ||
addNum | ||
findMedian |
from heapq import heappush as push, heappop as pop
class Heap:
def __init__(self, is_max_heap):
self.size = 0
self.map = {}
self.heap = []
self.is_max_heap = is_max_heap
def push(self, num):
self.size += 1
if num not in self.map:
self.map[num] = 1
else:
self.map[num] += 1
if self.map[num] == 1:
push(self.heap, -num if self.is_max_heap else num)
def pop(self):
top = -self.heap[0] if self.is_max_heap else self.heap[0]
self.size -= 1
self.map[top] -= 1
if self.map[top] == 0:
del self.map[top]
pop(self.heap)
return top
def top(self):
return -self.heap[0] if self.is_max_heap else self.heap[0]
def __bool__(self):
return self.size > 0
def __len__(self):
return self.size
class MedianFinder:
def __init__(self):
self.left_heap = Heap(is_max_heap=True)
self.right_heap = Heap(is_max_heap=False)
def addNum(self, num: int) -> None:
if not self.left_heap or num <= self.left_heap.top():
self.left_heap.push(num)
else:
self.right_heap.push(num)
if len(self.left_heap) < len(self.right_heap):
self.left_heap.push( self.right_heap.pop() )
elif len(self.left_heap) - len(self.right_heap) > 1:
self.right_heap.push( self.left_heap.pop() )
def findMedian(self) -> float:
if len(self.left_heap) > len(self.right_heap):
return self.left_heap.top()
else:
return 0.5 * (self.left_heap.top() + self.right_heap.top())
# Your MedianFinder object will be instantiated and called as such:
# obj = MedianFinder()
# obj.addNum(num)
# param_2 = obj.findMedian()
If 99% of all integer numbers from the stream are in the range [0, 100], how would you optimize your solution?
The distribution looks like below:

We can keep three heaps per half:
left_tail_heap: Regular heap containingnum< 0heap: Distinct value heap containing 0 <=num<= 100right_tail_heap: Regular heap containingnum> 100
For left half, the pop() and top() should poll in the sequence: right_tail_heap, then heap, then left_tail_heap. For right half, the sequence is reversed.
In that way, majority of the time the heap push/pop will be constant time operations.
from heapq import heappush as push, heappop as pop
class Heap:
def __init__(self, is_max_heap):
self.size = 0
self.map = {}
self.left_tail_heap, self.heap, self.right_tail_heap = [], [], []
self.is_max_heap = is_max_heap
def push(self, num):
self.size += 1
if num < 0:
push(self.left_tail_heap, -num if self.is_max_heap else num)
elif 0 <= num <= 100:
if num not in self.map:
self.map[num] = 1
else:
self.map[num] += 1
if self.map[num] == 1:
push(self.heap, -num if self.is_max_heap else num)
else:
push(self.right_tail_heap, -num if self.is_max_heap else num)
def pop(self):
self.size -= 1
if self.is_max_heap:
if self.right_tail_heap:
return -pop(self.right_tail_heap)
elif self.heap:
top = -self.heap[0]
self.map[top] -= 1
if self.map[top] == 0:
del self.map[top]
pop(self.heap)
return top
else:
return -pop(self.left_tail_heap)
else:
if self.left_tail_heap:
return pop(self.left_tail_heap)
elif self.heap:
top = self.heap[0]
self.map[top] -= 1
if self.map[top] == 0:
del self.map[top]
pop(self.heap)
return top
else:
return pop(self.right_tail_heap)
def top(self):
if self.is_max_heap:
if self.right_tail_heap:
return -self.right_tail_heap[0]
elif self.heap:
return -self.heap[0]
else:
return -self.left_tail_heap[0]
else:
if self.left_tail_heap:
return self.left_tail_heap[0]
elif self.heap:
return self.heap[0]
else:
return self.right_tail_heap[0]
def __bool__(self):
return self.size > 0
def __len__(self):
return self.size
class MedianFinder:
def __init__(self):
self.left_heap = Heap(is_max_heap=True)
self.right_heap = Heap(is_max_heap=False)
def addNum(self, num: int) -> None:
if not self.left_heap or num <= self.left_heap.top():
self.left_heap.push(num)
else:
self.right_heap.push(num)
if len(self.left_heap) < len(self.right_heap):
self.left_heap.push(self.right_heap.pop())
elif len(self.left_heap) - len(self.right_heap) > 1:
self.right_heap.push(self.left_heap.pop())
def findMedian(self) -> float:
if len(self.left_heap) > len(self.right_heap):
return self.left_heap.top()
else:
return 0.5 * (self.left_heap.top() + self.right_heap.top())
# Your MedianFinder object will be instantiated and called as such:
# obj = MedianFinder()
# obj.addNum(num)
# param_2 = obj.findMedian()
Leave a comment