First of all we need a couple of test images:
# import numpy from StringIO import StringIO I0 = numpy.loadtxt(StringIO(""" 0 0 0 0 0 0 0 0.5000 0 0 0 0 1.0000 0 0 0 0 0.5000 0 0 0 0 0 0 0""") ) I1 = numpy.loadtxt(StringIO(""" 0 0 0 0 0 0 0.5 0 0 0 0 1.0 0 0 0 0 0.5 0 0 0 0 0 0 0 0 """) )Define initial horiozontal and vertical components of optical flow
u = numpy.zeros_like(I0); v = numpy.zeros_like(I0);Lets write class for making warps. As OF usually deals only with small displacements, we need iterative estimation: estimate, shift image by found vectors, find again.
class Warper: def `__init__`(self, shape, u0, v0, I0, I1, display = False): """ shape - shape of input function, u0, v0 - starting values of flow I0, I1 - images to compute flow between """ # saving dimensions self.M, self.N = shape[0], shape[1] # save initial estimation self.u, self.v = u0.copy(), v0.copy() # grid of coordinates, required further self.idx, self.idy = np.meshgrid(np.arange(self.N), np.arange(self.M)) # filter for partial derivatives computation self.mask = np.array([1, -8, 0, 8, -1], ndmin=2)/12.0; # flag of debug information output self.display = display self.counter = 0 # copy of images self.I0, self.I1 = I0.copy(), I1.copy() # here we create an instance of training function. On each step we will approach to minimum by gradient descent self.train = TrainFunctionSimple(u0, v0, rate=0.1) # main function def warp(self): if self.display: print 'Warp %d' % (self.counter,) # initial value u0, v0 = self.u.copy(), self.v.copy() # Ends of motion vectors. From these points we will "compensate" motion idxx = self.idx + u0 idyy = self.idy + v0 # get linearly interpolated values from (idxx, idyy) pixels of I1 I1warped = interp2linear(self.I1, idxx, idyy) # just debug output if self.display: print "I1warped", I1warped print "I0", I0 print "u0", u0 print "v0", v0 pass It = (I1warped - self.I0) print "It", It #My first wrong version (it gives no converging, something like continuing initial vectors to infinity during warps) (***) #Ix = ndimage.correlate(self.I1, self.mask, mode='nearest') #Iy = ndimage.correlate(self.I1, self.mask.T, mode='nearest') #Much better is: Ix = ndimage.correlate(I1warped, self.mask, mode='nearest') Iy = ndimage.correlate(I1warped, self.mask.T, mode='nearest') # boundary handling m = (idxx > self.N - 1) | (idxx < 0) | (idyy > self.M - 1) | (idyy < 0) Ix[m] = 0.0 Iy[m] = 0.0 It[m] = 0.0 self.Ix = Ix self.Iy = Iy self.It = It self.train.init(np.zeros_like(self.I0), np.zeros_like(self.I0)) for i_sgd in range(120): print self.train.step(Ix, Iy, It), self.u += self.train.tu.get_value() self.v += self.train.tv.get_value() self.counter += 1Now describing TrainFunction. Well design is not good yet
# class TrainFunction(object): def `__init__`(self, u0, v0, rate): self.rate = rate self.tu = theano.shared(u0,name='tu') self.tv = theano.shared(v0,name='tv') self.tIx = T.matrix("tIx") self.tIy = T.matrix("tIy") self.tIt = T.matrix("tIt") self.gu, self.gv = None, None self.E = None self.train_function = self.get_function() # this function must be overloaded in the derived classes def get_energy(self): raise Exception("Non implemented") # construct Theano-function for gradient descent def get_function(self): if self.E is None: self.E = self.get_energy() if self.gu is None or self.gv is None: self.gu, self.gv = T.grad(self.E, [self.tu, self.tv]) train_function = theano.function( inputs=[self.tIx, self.tIy, self.tIt], outputs=[self.E], updates=((self.tu, self.tu - self.rate * self.gu), (self.tv, self.tv - self.rate * self.gv)), allow_input_downcast=True) return train_function # initialization of flow values def init(self, u0, v0): self.tu.set_value(u0) self.tv.set_value(v0) # launching step of gradiennt descent def step(self, *args): return self.train_function(*args)
#finally launchingclass TrainFunctionSimple(TrainFunction): def
__init__
(self, *args, **kwargs): self.alpha = kwargs.get(‘alpha’, 1.1)super(self.`__class__`, self).`__init__`(*args, **kwargs) # constructs Theano-function, that calculate Energy def get_energy(self,): # data term Edata = T.sum( ( self.tIx * self.tu + self.tIy * self.tv + self.tIt ) ** 2 ) # regularization term Ereg = T.sum( (self.tu)**2 + (self.tv)**2 ) return Edata+self.alpha*Ereg
wrpr = Warper( I0.shape, numpy.zeros_like(I0), numpy.zeros_like(I0), I0, I1, display=True) warps = 5 for i in range(warps): wrpr.warp()Printing the results:
numpy.set_printoptions(precision=3,) print wrpr.u print wrpr.v
[[ 0. 0. 0. 0. 0. ]Well, not precise enough. However the direction is right.[ 0. 0. -0.523 0. 0. ] [ 0. 0. -0.941 0. 0. ] [ 0. 0. -0.523 0. 0. ] [ 0. 0. 0. 0. 0. ]] [[ 0. 0. 0. 0. 0. ] [ 0. -0.692 0. 0. 0. ] [ 0. 0. 0. 0. 0. ] [ 0. 0.692 0. 0. 0. ] [ 0. 0. 0. 0. 0. ]]
UPDATE: due to my error in (***), see code, results and images below may have no sense
Some errors
In this implementation we estimate how to move I1 to match I0. So vectors points from locations of I0 to points of I1. So spatial derivatives must be calculated from I1, not I0:Ix = ndimage.correlate(self.I1, self.mask, mode='nearest') Iy = ndimage.correlate(self.I1, self.mask.T, mode='nearest')If you write instead:
Ix = ndimage.correlate(self.I0, self.mask, mode='nearest') Iy = ndimage.correlate(self.I0, self.mask.T, mode='nearest')you will get pressy strange results. Input images I0 and I1: