import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from .rpn_msr.proposal_layer import ProposalLayer
from .rpn_msr.anchor_target_layer import AnchorTargerLayer
from rpn_msr.proposal_target_layer import ProposalTargetLayer
from .network import vgg16, Conv2d, np_to_tensor, FC, smooth_l1_loss
from roi_pooling.modules.roi_pool import RoIPool
from .fastrcnn.bbox_transform import bbox_transform_inv, clip_boxes
from .fastrcnn.nms_wrapper import nms
from PIL import Image
from torchvision import transforms
import logging
# Module-wide logger; this project routes everything through a logger named
# "root" (see logging.getLogger above) and forces DEBUG verbosity on import.
logger = logging.getLogger("root")
logger.setLevel(logging.DEBUG)
def nms_detections(pred_boxes, scores, nms_thresh, inds=None):
    """Apply non-maximum suppression to scored boxes.

    Parameters
    ----------
    pred_boxes : numpy.ndarray
        Box coordinates, shape (N, 4).
    scores : numpy.ndarray
        Confidence score per box, shape (N,).
    nms_thresh : float
        IoU threshold passed to ``nms``.
    inds : numpy.ndarray, optional
        Auxiliary per-box indices filtered alongside the boxes.

    Returns
    -------
    tuple
        ``(boxes, scores)`` of the surviving detections, plus ``inds`` when
        it was supplied.
    """
    # nms expects each row as [x1, y1, x2, y2, score] in float32.
    dets = np.concatenate(
        (pred_boxes, scores.reshape(-1, 1)), axis=1).astype(np.float32)
    keep = nms(dets, nms_thresh)
    if inds is not None:
        return pred_boxes[keep], scores[keep], inds[keep]
    return pred_boxes[keep], scores[keep]
class RPN(nn.Module):
    """Generate region proposals; shares computation with the detection network.

    Attributes
    ----------
    anchor_scales : list
        Scales of the anchors generated at each feature-map location.
    anchor_target_layer : AnchorTargerLayer
        Computes the RPN training targets from anchors and ground-truth boxes.
    bbox_conv : torch.nn.Module
        1x1 conv predicting per-anchor box refinement deltas.
    conv1 : torch.nn.Module
        3x3 conv feeding the two sibling 1x1 prediction heads.
    features : torch.nn.Module
        VGG16 backbone shared with the object detection head.
    proposal_layer : ProposalLayer
        Turns anchor scores + deltas into proposal boxes.
    score_conv : torch.nn.Module
        1x1 conv predicting per-anchor foreground/background scores.
    """

    # Stride of the backbone's output feature map w.r.t. the input image.
    _feat_stride = [16, ]
    anchor_scales = [4, 8, 16, 32]

    def __init__(self):
        super(RPN, self).__init__()
        self.features = vgg16()
        self.features = nn.DataParallel(self.features)
        self.conv1 = nn.DataParallel(Conv2d(512, 512, 3, same_padding=True))
        # 3 aspect ratios * len(anchor_scales) anchors; 2 scores (fg/bg) each.
        self.score_conv = nn.DataParallel(Conv2d(
            512, len(self.anchor_scales) * 3 * 2, 1, relu=False))
        # 4 box-regression deltas per anchor.
        self.bbox_conv = nn.DataParallel(Conv2d(
            512, len(self.anchor_scales) * 3 * 4, 1, relu=False))
        self.anchor_target_layer = AnchorTargerLayer(
            self._feat_stride, self.anchor_scales)
        self.proposal_layer = ProposalLayer(
            self._feat_stride, self.anchor_scales)

    def _computer_forward(self, im_data):
        """Run the backbone and the two RPN heads.

        NOTE(review): "computer" is a historical typo for "compute"; the name
        is kept so external callers/subclasses are not broken.

        Parameters
        ----------
        im_data : torch.Tensor
            Image batch, shape (N, 3, H, W).

        Returns
        -------
        tuple of torch.Tensor
            ``(features, rpn_bbox_pred, rpn_cls_score)`` where features is
            (N, 512, h, w), rpn_bbox_pred is (N, A*4, h, w) and
            rpn_cls_score is (N, A*2, h, w).
        """
        features = self.features(im_data)            # (N, 512, h, w)
        rpn_conv1 = self.conv1(features)             # (N, 512, h, w)
        rpn_cls_score = self.score_conv(rpn_conv1)   # (N, A*2, h, w)
        rpn_bbox_pred = self.bbox_conv(rpn_conv1)    # (N, A*4, h, w)
        return features, rpn_bbox_pred, rpn_cls_score

    def forward(self, im_data, im_info, gt_boxes=None, gt_boxes_index=None):
        """Compute proposals (and, in training, the RPN targets).

        Parameters
        ----------
        im_data : torch.Tensor
            Image batch; moved to CUDA here.
        im_info : array-like
            Per-image (height, width, scale) information.
        gt_boxes : optional
            Ground-truth boxes; required when ``self.training`` is True.
        gt_boxes_index : list, optional
            Maps ground-truth boxes to batch elements. Defaults to an empty
            list (a None sentinel avoids the shared-mutable-default pitfall
            of the previous ``gt_boxes_index=[]`` signature).

        Returns
        -------
        tuple
            ``(features, rois, rpn_cls_prob_reshape, rpn_bbox_pred, target)``;
            ``target`` is None outside training.
        """
        if gt_boxes_index is None:
            gt_boxes_index = []
        im_data = im_data.to(torch.device('cuda'))
        features, rpn_bbox_pred, rpn_cls_score = self._computer_forward(
            im_data)
        batch_size = features.shape[0]
        # (N, A*2, h, w) -> (N, 2, A*h, w): puts the fg/bg pair on dim=1 so a
        # single softmax yields per-anchor class probabilities.
        rpn_cls_score_reshape = rpn_cls_score.view(
            batch_size, 2, -1, rpn_cls_score.shape[-1])
        rpn_cls_prob = F.softmax(rpn_cls_score_reshape, dim=1)
        # Back to the original score layout: (N, A*2, h, w).
        rpn_cls_prob_reshape = rpn_cls_prob.view_as(rpn_cls_score)
        cfg_key = 'TRAIN' if self.training else 'TEST'
        rois = self.proposal_layer(rpn_cls_prob_reshape, rpn_bbox_pred,
                                   im_info, cfg_key)
        if self.training:
            assert gt_boxes is not None
            target = self.anchor_target_layer(
                rpn_cls_score, gt_boxes, gt_boxes_index, im_info)
        else:
            target = None
        return features, rois, rpn_cls_prob_reshape, rpn_bbox_pred, target

    @staticmethod
    def build_loss(rpn_cls_score_reshape, rpn_bbox_pred, target):
        """Compute the RPN classification and box-regression losses.

        Parameters
        ----------
        rpn_cls_score_reshape : torch.Tensor
            Scores in (N, 2, A*h, w) layout.
        rpn_bbox_pred : torch.Tensor
            Predicted deltas, (N, A*4, h, w).
        target : sequence
            (labels, bbox_targets, bbox_inside_weights, bbox_outside_weights)
            from the anchor target layer.

        Returns
        -------
        tuple of torch.Tensor
            ``(rpn_cross_entropy, rpn_loss_box)``.
        """
        # (N, 2, A*h, w) -> (N*h*w*A, 2)
        rpn_cls_score = rpn_cls_score_reshape.permute(
            0, 2, 3, 1).contiguous().view(-1, 2)
        rpn_label = target[0].permute(0, 2, 3, 1).contiguous().view(-1)
        # Keep only labelled anchors; -1 marks "don't care".  The index tensor
        # is built directly: wrapping an existing tensor in torch.tensor()
        # copies it and raises a deprecation warning.
        rpn_keep = rpn_label.data.ne(-1).nonzero().squeeze().cuda()
        rpn_cls_score = torch.index_select(rpn_cls_score, 0, rpn_keep)
        rpn_label = torch.index_select(rpn_label, 0, rpn_keep)
        rpn_cross_entropy = F.cross_entropy(rpn_cls_score, rpn_label)
        # Box loss: Smooth-L1 over the positive anchors (weights handle the
        # fg/bg masking).
        rpn_bbox_targets, rpn_bbox_inside_weights, bbox_outside_weights = \
            target[1:]
        rpn_loss_box = smooth_l1_loss(
            rpn_bbox_pred, rpn_bbox_targets, rpn_bbox_inside_weights,
            bbox_outside_weights, sigma=3.0, dim=[1, 2, 3])
        return rpn_cross_entropy, rpn_loss_box

    def predict_rois(self, im_data, im_info):
        """Inference helper: return only the proposal boxes.

        Bug fix: ``forward`` returns five values, so the previous
        ``_, rois = self(...)`` unpacking always raised ValueError.
        """
        self.eval()
        _, rois, _, _, _ = self(im_data, im_info)
        return rois
class FastRCNN(nn.Module):
    """Faster R-CNN detection head built on top of the RPN.

    Attributes
    ----------
    classes : sequence
        Class names; index 0 is background.
    n_classes : int
        Number of classes (including background).
    rpn : RPN
        Region proposal sub-network (owns the shared backbone).
    proposal_target_layer : ProposalTargetLayer
        Assigns training targets to proposals.
    roi_pool : RoIPool
        7x7 RoI pooling at 1/16 spatial scale.
    fc6, fc7 : torch.nn.Module
        Fully-connected layers of the detection head.
    score_fc : torch.nn.Module
        Per-class classification head.
    bbox_fc : torch.nn.Module
        Per-class box regression head (4 values per class).
    SCALES : tuple
        Target shorter-side size(s) for input images.
    MAX_SIZE : int
        Maximum longer-side size for input images.
    debug : bool
        Extra diagnostics flag.
    """

    SCALES = (600, )
    MAX_SIZE = 1000

    def __init__(self, classes, debug=False):
        super(FastRCNN, self).__init__()
        assert classes is not None
        self.classes = classes
        self.n_classes = len(classes)
        self.rpn = RPN()
        self.proposal_target_layer = ProposalTargetLayer(self.n_classes)
        self.roi_pool = RoIPool(7, 7, 1.0 / 16)
        self.fc6 = nn.DataParallel(FC(512 * 7 * 7, 4096))
        self.fc7 = nn.DataParallel(FC(4096, 4096))
        self.score_fc = nn.DataParallel(FC(4096, self.n_classes, relu=False))
        self.bbox_fc = nn.DataParallel(
            FC(4096, self.n_classes * 4, relu=False))
        self.debug = debug

    def forward(self, im_data, im_info, gt_boxes=None, gt_boxes_index=None):
        """Run RPN + detection head.

        Parameters
        ----------
        im_data : torch.Tensor
            Image batch.
        im_info : array-like
            Per-image (height, width, scale) information.
        gt_boxes : optional
            Ground-truth boxes, required in training.
        gt_boxes_index : list, optional
            Maps ground-truth boxes to batch elements.  None sentinel replaces
            the previous mutable default ``[]``.

        Returns
        -------
        tuple
            (cls_prob, bbox_pred, rois, cls_score, target,
            rpn_cls_prob_reshape, rpn_bbox_pred, rpn_target); the two target
            entries are None outside training.
        """
        if gt_boxes_index is None:
            gt_boxes_index = []
        features, rois, rpn_cls_prob_reshape, rpn_bbox_pred, rpn_target = \
            self.rpn(im_data, im_info, gt_boxes, gt_boxes_index)
        if self.training:
            # Re-sample the proposals against ground truth for training.
            target = self.proposal_target_layer(
                rois, gt_boxes, gt_boxes_index)
            rois = target[0]
        else:
            target = None
        # RoIPool expects rows of [batch_idx, x1, y1, x2, y2] as float32 CUDA.
        rois = rois.reshape(-1, 5).type(
            torch.FloatTensor).to(torch.device("cuda"))
        pooled_features = self.roi_pool(features, rois)
        x = pooled_features.view(pooled_features.size()[0], -1)
        x = self.fc6(x)
        x = F.dropout(x, training=self.training)
        x = self.fc7(x)
        x = F.dropout(x, training=self.training)
        cls_score = self.score_fc(x)
        cls_prob = F.softmax(cls_score, dim=1)
        bbox_pred = self.bbox_fc(x)
        return (cls_prob, bbox_pred, rois, cls_score, target,
                rpn_cls_prob_reshape, rpn_bbox_pred, rpn_target)

    @staticmethod
    def build_loss(cls_score, bbox_pred, target):
        """Compute classification and box-regression losses for the head.

        Returns
        -------
        tuple of torch.Tensor
            ``(cross_entropy, loss_box)``.
        """
        label = target[1].squeeze()
        fg_cnt = torch.sum(label.data.ne(0))
        bg_cnt = label.data.numel() - fg_cnt
        # Down-weight the (majority) background class by the fg/bg ratio.
        ce_weights = torch.ones(cls_score.size()[1])
        # Guard: a batch with zero background ROIs previously raised
        # ZeroDivisionError.
        ce_weights[0] = float(fg_cnt.item()) / max(bg_cnt.item(), 1)
        ce_weights = ce_weights.cuda()
        cross_entropy = F.cross_entropy(cls_score, label, weight=ce_weights)
        bbox_targets, bbox_inside_weights, bbox_outside_weights = target[2:]
        loss_box = smooth_l1_loss(
            bbox_pred, bbox_targets, bbox_inside_weights,
            bbox_outside_weights, dim=[1])
        return cross_entropy, loss_box

    def interpret(self, cls_prob, bbox_pred, rois, im_info, im_shape,
                  nms=True, clip=True, min_score=0.0):
        """Turn raw head outputs into final detections.

        Returns
        -------
        tuple
            ``(pred_boxes, scores, class_names, rois)``; all empty arrays
            when nothing survives filtering.
        """
        # Pick the best class per ROI.
        scores, inds = cls_prob.data.max(1)
        scores, inds = scores.cpu().numpy(), inds.cpu().numpy()
        # Drop background (class 0) and low-confidence detections.
        keep = np.where((inds > 0) & (scores >= min_score))
        scores, inds = scores[keep], inds[keep]
        keep = keep[0]
        # Select each ROI's deltas for its predicted class.
        # np.float was removed in NumPy >= 1.24; use np.float64 explicitly.
        box_deltas = bbox_pred.data.cpu().numpy()[keep]
        box_deltas = np.asarray([
            box_deltas[i, (inds[i] * 4): (inds[i] * 4 + 4)]
            for i in range(len(inds))
        ], dtype=np.float64)
        boxes = rois.data.cpu().numpy()[keep, 1:5]
        if len(boxes) == 0:
            return np.array([]), np.array([]), np.array([]), np.array([])
        pred_boxes = bbox_transform_inv(
            boxes[np.newaxis, :], box_deltas[np.newaxis, :])
        if clip:
            pred_boxes = clip_boxes(pred_boxes, im_shape)
        pred_boxes = pred_boxes[0]
        if nms and pred_boxes.shape[0] > 0:
            pred_boxes, scores, inds = nms_detections(
                pred_boxes, scores, 0.2, inds=inds)
        # Index class names without mutating self.classes (the previous code
        # reassigned self.classes to an ndarray on every call).
        class_names = np.asarray(self.classes)
        return pred_boxes, scores, class_names[inds], boxes

    def detect(self, image, thr=0.5):
        """Detect objects in the image at the given path.

        Returns ``(pred_boxes, scores, classes, rois, im_data)``.
        """
        self.eval()
        im_data, im_info = self.get_image_blob(image)
        cls_prob, bbox_pred, rois, _, _, _, _, _ = self(
            im_data, im_info[:, :2])
        cls_prob = cls_prob.squeeze()
        bbox_pred = bbox_pred.squeeze()
        pred_boxes, scores, classes, rois = self.interpret(
            cls_prob, bbox_pred, rois, im_info, im_info[0][:2],
            min_score=thr, nms=True)
        return pred_boxes, scores, classes, rois, im_data

    def detect_blob(self, im_data, im_info, thr=0.5):
        """Detect objects in an already-preprocessed blob.

        Returns ``(pred_boxes, scores, classes, rois, im_data)``.
        """
        self.eval()
        cls_prob, bbox_pred, rois, _, _, _, _, _ = self(
            im_data, im_info[:, :2])
        cls_prob = cls_prob.squeeze()
        bbox_pred = bbox_pred.squeeze()
        pred_boxes, scores, classes, rois = self.interpret(
            cls_prob, bbox_pred, rois, im_info, im_info[0][:2],
            min_score=thr, nms=True)
        return pred_boxes, scores, classes, rois, im_data

    def get_image_blob(self, im):
        """Load an image file and convert it into a network input.

        Parameters
        ----------
        im : str or path-like
            Path to the image file; opened with PIL and converted to RGB
            (the previous docstring incorrectly described a BGR ndarray).

        Returns
        -------
        img : torch.Tensor
            Normalised image tensor of shape (1, 3, H, W) with the shorter
            side resized to 600.
        im_info : numpy.ndarray
            Single-row array [[height, width, scale]] where scale is
            600 / min(original width, height).
        """
        transform = transforms.Compose([
            transforms.Resize(600),
            transforms.ToTensor(),
            # ImageNet channel statistics, matching the VGG16 backbone.
            transforms.Normalize([0.485, 0.456, 0.406],
                                 [0.229, 0.224, 0.225])])
        img = Image.open(im).convert('RGB')
        origin_size = img.size
        img = transform(img)
        img = img.unsqueeze(0)
        target_size = tuple(img.size())
        im_info = np.array(
            [[float(target_size[2]), float(target_size[3]),
              600. / min(origin_size)]])
        return img, im_info