Skip to main content

Working with the Data

Whether you are getting the esports data from the LoL Esports Data Portal, or from the BEDEX, you will most probably want to process it in some way. Here are some tips on how to get started with the game data processing.

The esports game data, that you are receiving from Bayes, follows the unified Data Format. It's best that you understand its structure before reading this article.

Java Implementation Tips

You can refer to the following deserializer and title-specific handlers as examples. These do not include handling of the wrappers at the moment, but they should give you an idea of how to implement it in your setup.:

LiveDataMessageHandlerSimple
@Slf4j
@Builder
public class LiveDataMessageHandlerSimple<T extends HasPayload> implements LiveDataMessageHandler<T> {

private static final int DEFAULT_MATCH_TTL_HOURS = 6;

@Builder.Default
private final ObjectMapper objectMapper = createDefaultObjectMapper();
private final Function<String, LOLEventHandler> lolHandler;
private final Function<String, CSGOEventHandler> csgoHandler;
private final Function<String, DOTAEventHandler> dotaHandler;
@Builder.Default
private final Cache<String, LiveDataCodec> handlerInstances = CacheBuilder
.newBuilder()
.expireAfterAccess(DEFAULT_MATCH_TTL_HOURS, TimeUnit.HOURS)
.build();

private final Class<T> envelopeClass;

@Override
public void handleMessage(String strValue) throws IOException {
consumeMessage(objectMapper.readValue(strValue, envelopeClass));
}

@Override
public void consumeMessage(T msg) throws IOException {
final LiveDataMessage payload = msg.getPayload();
final Duration latency = Duration.between(payload.getCreatedAt(), Instant.now());
log.debug("[{}] received message: {}/{}/{}/{}, latency: {}ms",
payload.getLiveDataMatchUrn(),
payload.getTitle(), payload.getSubject(), payload.getType(), payload.getAction(),
latency.toMillis());

try {
final LiveDataCodec handler = handlerInstances.get(payload.getLiveDataMatchUrn(),
() -> {
final LiveDataCodec codec = createCodec(msg);
// we can do things with msg here
return codec;
});
handler.accept(payload);
} catch(ExecutionException e) {
log.error("failed to create/get codec for {}/{}", payload.getTitle(), payload.getLiveDataMatchUrn());
throw new IOException(e);
}
}

private LiveDataCodec createCodec(T msg) {
final LiveDataMessage payload = msg.getPayload();
final Title title = payload.getTitle();
final String urn = payload.getLiveDataMatchUrn();
switch (title) {
case DOTA:
return new DOTAMessageCodec(objectMapper, dotaHandler.apply(urn));
case CSGO:
return new CSGOMessageCodec(objectMapper, csgoHandler.apply(urn));
case LOL:
return new LOLMessageCodec(objectMapper, lolHandler.apply(urn));
case OW:
return new NotYetImplementedCodec();
default:
return new NotYetImplementedCodec();
}
}

private static ObjectMapper createDefaultObjectMapper() {
final ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new Jdk8Module());
objectMapper.registerModule(new JavaTimeModule());
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return objectMapper;
}
}
EsportsEventHandler
public interface EsportsEventHandler {

void onMessage(LiveDataMessage message);

void onControlAnnounce(ControlAnnounceEvent event);

void onControlError(ControlErrorEvent event);
}
CSGOEventHandler
public interface CSGOEventHandler extends EsportsEventHandler {
void onGameState(GameState gameState);

void onStartMap(MapEvent event);

void onEndMap(MapEndEvent event);

void onStartPause(RoundPauseEvent event);

void onEndPause(RoundResumeEvent event);

void onRoundStart(RoundStartEvent event);

void onRoundEnd(RoundEndEvent event);

void onKill(KillEvent event);

void onDeath(DeathEvent event);

void onDamageDealt(DamageDealtEvent event);

void onItemPurchased(ItemPurchaseEvent event);

void onItemThrown(ItemThrowEvent event);

void onBombPlanted(BombPlantedEvent event);

void onBombDefuseStart(BombDefuseStartedEvent event);

void onBombDefuse(BombDefusedEvent event);

void onBombExploded(BombExplodedEvent event);

void onMatchInfo(MatchInfoEvent event);

void onMatchAnnounce(MatchInfoEvent event);

void onRoundRollback(RoundRollbackEvent event);

void onPlayerScoreboard(PlayerScoreboardEvent event);

void onPlayerPositions(PlayerPositionsEvent event);

void onFreezeTimeStarted(FreezeTimeStartedEvent event);

void onFreezeTimeEnded(FreezeTimeEndedEvent event);

void onItemPickup(ItemPickupEvent event);

void onItemDrop(ItemDropEvent event);
}
DOTAEventHandler
public interface DOTAEventHandler extends EsportsEventHandler {
void onDraft(DraftSnapshot draftSnapshot);

void onGameState(GameState gameState);

void onStartMap(MapEvent event);

void onEndMap(MapEndEvent event);

void onStartPause(PauseStartEvent event);

void onEndPause(PauseEndEvent event);

void onKill(HeroDeathEvent event);

void onBuildingsUpdate(BuildingsUpdateEvent event);

void onMatchInfo(MatchInfoEvent event);

void onMatchAnnounce(MatchInfoEvent event);

void onAbilityUse(AbilityUsedEvent event);

void onAncientKill(AncientKillEvent event);

void onAncientSpawn(AncientSpawnEvent event);

void onHeroSpawn(HeroSpawnEvent event);

void onItemDrop(ItemDropEvent event);

void onItemPickUp(ItemPickupEvent event);

void onItemSpawn(ItemSpawnEvent event);

void onItemUse(ItemUseEvent event);

void onMinionDeath(MinionDeathEvent event);

void onPlayerUncloak(PlayerUncloakedEvent event);

void onPlayerCloak(PlayersCloakedEvent event);

void onSpecialKill(SpecialKillEvent event);

void onObjectiveTaken(ObjectiveTakenEvent event);

void onWardDestroy(WardDestroyedEvent event);

void onWardPlace(WardInfo event);

void onWardExpire(WardInfo event);

void onDeniedItem(ItemDenyEvent event);

void onRemovedItem(ItemRemoveEvent event);

void onLevelUp(LevelUpEvent event);

void onHeroBuyback(HeroBuybackEvent event);

void onPositionsUpdate(PlayerPositionsEvent event);

}
LiveDataCodec
public interface LiveDataCodec extends Consumer<LiveDataMessage> {}
MessageCodecAbstract
public abstract class MessageCodecAbstract<T> implements LiveDataCodec {

private final LiveDataCodec defaultCodec = new NotYetImplementedCodec();
private final Map<T, LiveDataCodec> handlers = new HashMap<>();
private final Function<LiveDataMessage, T> keyMapper;

public MessageCodecAbstract(Function<LiveDataMessage, T> keyMapper) {
this.keyMapper = keyMapper;
}

@Override
public void accept(LiveDataMessage liveDataMessage) {
handlers.getOrDefault(keyMapper.apply(liveDataMessage), defaultCodec).accept(liveDataMessage);
}

protected void addHandler(T key, LiveDataCodec codec) {
handlers.put(key, codec);
}
}

CSGOMessageCodec
@Slf4j
public class CSGOMessageCodec extends MessageCodecAbstract<MessageType> {

private final ObjectMapper mapper;
private final CSGOEventHandler handler;

public CSGOMessageCodec(ObjectMapper objectMapper, CSGOEventHandler eventHandler) {
super(LiveDataMessage::getType);
this.mapper = objectMapper;
this.handler = eventHandler;
addHandler(MessageType.INFO, new CSGOInfoCodec());
addHandler(MessageType.SNAPSHOT, new CSGOSnapshotCodec());
addHandler(MessageType.GAME_EVENT, new CSGOGameEventCodec());
}

private <T> T deserialize(JsonNode payload, Class<T> clazz) throws JsonProcessingException {
return mapper.treeToValue(payload, clazz);
}

protected <T> void handle(LiveDataMessage message, Class<T> clazz, Consumer<T> consumer) {
try {
handler.onMessage(message);
final T payload = deserialize(message.getPayload(), clazz);
try {
consumer.accept(payload);
} catch (Exception e) {
log.error("consumer failed: {}", e.getMessage(), e);
}
} catch (JsonProcessingException e) {
log.error("deserialization failed: {}", e.getMessage(), e);
}
}

private class CSGOInfoCodec extends MessageCodecAbstract<MessageSubject> {
public CSGOInfoCodec() {
super(LiveDataMessage::getSubject);
addHandler(MessageSubject.MATCH, new CSGOInfoMatchCodec());
addHandler(MessageSubject.CONTROL, new CSGOInfoControlCodec());
}

private class CSGOInfoMatchCodec extends MessageCodecAbstract<GameEventAction> {
public CSGOInfoMatchCodec() {
super(LiveDataMessage::getAction);
addHandler(GameEventAction.UPDATE, msg -> handle(msg, MatchInfoEvent.class, handler::onMatchInfo));
addHandler(GameEventAction.ANNOUNCE,
msg -> handle(msg, MatchInfoEvent.class, handler::onMatchAnnounce));
addHandler(GameEventAction.ROLLBACK,
msg -> handle(msg, RoundRollbackEvent.class, handler::onRoundRollback));
}
}

private class CSGOInfoControlCodec extends MessageCodecAbstract<GameEventAction> {
public CSGOInfoControlCodec() {
super(LiveDataMessage::getAction);
addHandler(GameEventAction.ANNOUNCE,
msg -> handle(msg, ControlAnnounceEvent.class, handler::onControlAnnounce));
addHandler(GameEventAction.ERROR,
msg -> handle(msg, ControlErrorEvent.class, handler::onControlError));
}
}
}

public class CSGOSnapshotCodec extends MessageCodecAbstract<MessageSubject> {
public CSGOSnapshotCodec() {
super(LiveDataMessage::getSubject);
addHandler(MessageSubject.MATCH, new CSGOSnapshotMatchCodec());
addHandler(MessageSubject.PLAYER, new CSGOSnapshotPlayerCodec());
// TODO maybe change player score update to player snapshot
}

private class CSGOSnapshotMatchCodec extends MessageCodecAbstract<GameEventAction> {
public CSGOSnapshotMatchCodec() {
super(LiveDataMessage::getAction);
addHandler(GameEventAction.UPDATE, msg -> handle(msg, GameState.class, handler::onGameState));
}
}

private class CSGOSnapshotPlayerCodec extends MessageCodecAbstract<GameEventAction> {
public CSGOSnapshotPlayerCodec() {
super(LiveDataMessage::getAction);
addHandler(GameEventAction.UPDATE_POSITIONS, msg -> handle(msg, PlayerPositionsEvent.class, handler::onPlayerPositions));
}
}
}

public class CSGOGameEventCodec extends MessageCodecAbstract<MessageSubject> {
public CSGOGameEventCodec() {
super(LiveDataMessage::getSubject);
addHandler(MessageSubject.MATCH, new CSGOGameEventMatchCodec());
addHandler(MessageSubject.TEAM, new CSGOGameEventTeamCodec());
addHandler(MessageSubject.PLAYER, new CSGOGameEventPlayerCodec());
}

private class CSGOGameEventMatchCodec extends MessageCodecAbstract<GameEventAction> {
public CSGOGameEventMatchCodec() {
super(LiveDataMessage::getAction);
addHandler(GameEventAction.START_MAP, msg -> handle(msg, MapEvent.class, handler::onStartMap));
addHandler(GameEventAction.END_MAP, msg -> handle(msg, MapEndEvent.class, handler::onEndMap));
addHandler(GameEventAction.START_PAUSE,
msg -> handle(msg, RoundPauseEvent.class, handler::onStartPause));
addHandler(GameEventAction.END_PAUSE, msg -> handle(msg, RoundResumeEvent.class, handler::onEndPause));
addHandler(GameEventAction.START_ROUND,
msg -> handle(msg, RoundStartEvent.class, handler::onRoundStart));
addHandler(GameEventAction.END_ROUND, msg -> handle(msg, RoundEndEvent.class, handler::onRoundEnd));
addHandler(GameEventAction.FREEZE_TIME_STARTED, msg -> handle(msg, FreezeTimeStartedEvent.class, handler::onFreezeTimeStarted));
addHandler(GameEventAction.FREEZE_TIME_ENDED, msg -> handle(msg, FreezeTimeEndedEvent.class, handler::onFreezeTimeEnded));
}
}

private class CSGOGameEventTeamCodec extends MessageCodecAbstract<GameEventAction> {
public CSGOGameEventTeamCodec() {
super(LiveDataMessage::getAction);
addHandler(GameEventAction.EXPLODED_BOMB,
msg -> handle(msg, BombExplodedEvent.class, handler::onBombExploded));
}
}

private class CSGOGameEventPlayerCodec extends MessageCodecAbstract<GameEventAction> {
public CSGOGameEventPlayerCodec() {
super(LiveDataMessage::getAction);
addHandler(GameEventAction.KILL, msg -> handle(msg, KillEvent.class, handler::onKill));
addHandler(GameEventAction.DIED, msg -> handle(msg, DeathEvent.class, handler::onDeath));
addHandler(GameEventAction.DEALT_DAMAGE,
msg -> handle(msg, DamageDealtEvent.class, handler::onDamageDealt));
addHandler(GameEventAction.PURCHASED_ITEM,
msg -> handle(msg, ItemPurchaseEvent.class, handler::onItemPurchased));
addHandler(GameEventAction.THREW_ITEM,
msg -> handle(msg, ItemThrowEvent.class, handler::onItemThrown));
addHandler(GameEventAction.PICKED_UP_ITEM,
msg -> handle(msg, ItemPickupEvent.class, handler::onItemPickup));
addHandler(GameEventAction.DROPPED_ITEM,
msg -> handle(msg, ItemDropEvent.class, handler::onItemDrop));
addHandler(GameEventAction.PLANTED_BOMB,
msg -> handle(msg, BombPlantedEvent.class, handler::onBombPlanted));
addHandler(GameEventAction.STARTED_DEFUSING_BOMB,
msg -> handle(msg, BombDefuseStartedEvent.class, handler::onBombDefuseStart));
addHandler(GameEventAction.DEFUSED_BOMB,
msg -> handle(msg, BombDefusedEvent.class, handler::onBombDefuse));
addHandler(GameEventAction.UPDATE_SCORE,
msg -> handle(msg, PlayerScoreboardEvent.class, handler::onPlayerScoreboard));
}
}
}
}
DOTAMessageCodec
@Slf4j
public class DOTAMessageCodec extends MessageCodecAbstract<MessageType> {

private final ObjectMapper mapper;
private final DOTAEventHandler handler;

public DOTAMessageCodec(ObjectMapper objectMapper, DOTAEventHandler eventHandler) {
super(LiveDataMessage::getType);
this.mapper = objectMapper;
this.handler = eventHandler;
addHandler(MessageType.INFO, new DOTAInfoCodec());
addHandler(MessageType.SNAPSHOT, new DOTASnapshotCodec());
addHandler(MessageType.GAME_EVENT, new DOTAGameEventCodec());
}

private <T> T deserialize(JsonNode payload, Class<T> clazz) throws JsonProcessingException {
return mapper.treeToValue(payload, clazz);
}

protected <T> void handle(LiveDataMessage message, Class<T> clazz, Consumer<T> consumer) {
try {
handler.onMessage(message);
final T payload = deserialize(message.getPayload(), clazz);
try {
consumer.accept(payload);
} catch (Exception e) {
log.error("consumer failed: {}", e.getMessage(), e);
}
} catch (JsonProcessingException e) {
log.error("deserialization failed: {}", e.getMessage(), e);
}
}

private class DOTAInfoCodec extends MessageCodecAbstract<MessageSubject> {
public DOTAInfoCodec() {
super(LiveDataMessage::getSubject);
addHandler(MessageSubject.MATCH, new DOTAInfoMatchCodec());
addHandler(MessageSubject.CONTROL, new DOTAInfoControlCodec());
}

private class DOTAInfoMatchCodec extends MessageCodecAbstract<GameEventAction> {
public DOTAInfoMatchCodec() {
super(LiveDataMessage::getAction);
addHandler(GameEventAction.UPDATE, msg -> handle(msg, MatchInfoEvent.class, handler::onMatchInfo));
addHandler(GameEventAction.ANNOUNCE,
msg -> handle(msg, MatchInfoEvent.class, handler::onMatchAnnounce));
}
}

private class DOTAInfoControlCodec extends MessageCodecAbstract<GameEventAction> {
public DOTAInfoControlCodec() {
super(LiveDataMessage::getAction);
addHandler(GameEventAction.ANNOUNCE,
msg -> handle(msg, ControlAnnounceEvent.class, handler::onControlAnnounce));
addHandler(GameEventAction.ERROR,
msg -> handle(msg, ControlErrorEvent.class, handler::onControlError));
}
}
}

public class DOTASnapshotCodec extends MessageCodecAbstract<MessageSubject> {
public DOTASnapshotCodec() {
super(LiveDataMessage::getSubject);
addHandler(MessageSubject.MATCH, new DOTASnapshotMatchCodec());
addHandler(MessageSubject.TEAM, new DOTASnapshotTeamCodec());
addHandler(MessageSubject.PLAYER, new DOTASnapshotPlayerCodec());
}

private class DOTASnapshotMatchCodec extends MessageCodecAbstract<GameEventAction> {
public DOTASnapshotMatchCodec() {
super(LiveDataMessage::getAction);
addHandler(GameEventAction.UPDATE, msg -> handle(msg, GameState.class, handler::onGameState));
}
}

private class DOTASnapshotTeamCodec extends MessageCodecAbstract<GameEventAction> {
public DOTASnapshotTeamCodec() {
super(LiveDataMessage::getAction);
addHandler(GameEventAction.UPDATE, msg -> handle(msg, DraftSnapshot.class, handler::onDraft));
}
}

private class DOTASnapshotPlayerCodec extends MessageCodecAbstract<GameEventAction> {
public DOTASnapshotPlayerCodec() {
super(LiveDataMessage::getAction);
addHandler(GameEventAction.UPDATE_POSITIONS, msg -> handle(msg, PlayerPositionsEvent.class, handler::onPositionsUpdate));
}
}
}

public class DOTAGameEventCodec extends MessageCodecAbstract<MessageSubject> {
public DOTAGameEventCodec() {
super(LiveDataMessage::getSubject);
addHandler(MessageSubject.MATCH, new DOTAGameEventMatchCodec());
addHandler(MessageSubject.PLAYER, new DOTAGameEventPlayerCodec());
addHandler(MessageSubject.TEAM, new DOTAGameEventTeamCodec());
}

private class DOTAGameEventMatchCodec extends MessageCodecAbstract<GameEventAction> {
public DOTAGameEventMatchCodec() {
super(LiveDataMessage::getAction);
addHandler(GameEventAction.START_MAP, msg -> handle(msg, MapStartEvent.class, handler::onStartMap));
addHandler(GameEventAction.END_MAP, msg -> handle(msg, MapEndEvent.class, handler::onEndMap));
addHandler(GameEventAction.START_PAUSE, msg -> handle(msg, PauseStartEvent.class, handler::onStartPause));
addHandler(GameEventAction.END_PAUSE, msg -> handle(msg, PauseEndEvent.class, handler::onEndPause));
addHandler(GameEventAction.UPDATE_BUILDINGS, msg ->
handle(msg, BuildingsUpdateEvent.class, handler::onBuildingsUpdate));
addHandler(GameEventAction.SPAWNED_ANCIENT, msg -> handle(msg, AncientSpawnEvent.class, handler::onAncientSpawn));
addHandler(GameEventAction.DROPPED_ITEM, msg -> handle(msg, ItemDropEvent.class, handler::onItemDrop));
addHandler(GameEventAction.SPAWNED_ITEM, msg -> handle(msg, ItemSpawnEvent.class, handler::onItemSpawn));
}
}

private class DOTAGameEventPlayerCodec extends MessageCodecAbstract<GameEventAction> {
public DOTAGameEventPlayerCodec() {
super(LiveDataMessage::getAction);
addHandler(GameEventAction.KILL, msg -> handle(msg, HeroDeathEvent.class, handler::onKill));
addHandler(GameEventAction.USED_ABILITY, msg -> handle(msg, AbilityUsedEvent.class, handler::onAbilityUse));
addHandler(GameEventAction.KILLED_ANCIENT, msg -> handle(msg, AncientKillEvent.class, handler::onAncientKill));
addHandler(GameEventAction.USED_CLOAK, msg -> handle(msg, PlayersCloakedEvent.class, handler::onPlayerCloak));
addHandler(GameEventAction.BROKE_CLOAK, msg -> handle(msg, PlayerUncloakedEvent.class, handler::onPlayerUncloak));
addHandler(GameEventAction.SPAWNED, msg -> handle(msg, HeroSpawnEvent.class, handler::onHeroSpawn));
addHandler(GameEventAction.PICKED_UP_ITEM, msg -> handle(msg, ItemPickupEvent.class, handler::onItemPickUp));
addHandler(GameEventAction.CONSUMED_ITEM, msg -> handle(msg, ItemUseEvent.class, handler::onItemUse));
addHandler(GameEventAction.KILLED_MINION, msg -> handle(msg, MinionDeathEvent.class, handler::onMinionDeath));
addHandler(GameEventAction.SPECIAL_KILL, msg -> handle(msg, SpecialKillEvent.class, handler::onSpecialKill));
addHandler(GameEventAction.KILLED_WARD, msg -> handle(msg, WardDestroyedEvent.class, handler::onWardDestroy));
addHandler(GameEventAction.PLACED_WARD, msg -> handle(msg, WardInfo.class, handler::onWardPlace));
addHandler(GameEventAction.EXPIRED_WARD, msg -> handle(msg, WardInfo.class, handler::onWardExpire));
addHandler(GameEventAction.DENIED_ITEM, msg -> handle(msg, ItemDenyEvent.class, handler::onDeniedItem));
addHandler(GameEventAction.REMOVED_ITEM, msg -> handle(msg, ItemRemoveEvent.class, handler::onRemovedItem));
addHandler(GameEventAction.LEVEL_UP, msg -> handle(msg, LevelUpEvent.class, handler::onLevelUp));
addHandler(GameEventAction.BUYBACK, msg -> handle(msg, HeroBuybackEvent.class, handler::onHeroBuyback));
}
}

private class DOTAGameEventTeamCodec extends MessageCodecAbstract<GameEventAction> {
public DOTAGameEventTeamCodec() {
super(LiveDataMessage::getAction);
addHandler(GameEventAction.TOOK_OBJECTIVE, msg -> handle(msg, ObjectiveTakenEvent.class, handler::onObjectiveTaken));
}
}
}
}
final String messagePayloadString = recv(); // receive the message from somewhere

// create the handler
final LiveDataMessageHandlerSimple handler = LiveDataMessageHandlerSimple
.<BayesMessageEnvelope>builder()
.objectMapper(objectMapper)
.lolHandler(matchId -> new MyLolHandler(matchId))
.csgoHandler(matchId -> new MyCsgoHandler(matchId))
.envelopeClass(BayesMessageEnvelope.class)
.build();

// then use it to process events contained in received messages:
handler.handleMessage(messagePayloadString);

Python Implementation Tips

As in the Java example, we are leaving the actual implementation to you, but the high level workflow may look like this.

At the beginning of the processing, we need to parse the outmost data structure of the message, the DataProviderMessage. It contains some useful information about the message, which may help us determine early on if we should start processing this message, skip it, or leave it for later. If all checks pass, we can unpack the payload from the DataProviderMessage and pass it to the next processing function.

# ...
message = get_message() # receive the message from somewhere

def process_data_provider_message(message: dict) -> None:
version = float(message['version'])
check_message_version(version)

seq_idx = message['seqIdx']
check_seq_idx(seq_idx)
payload = message['payload']
return process_bayes_message_envelope(payload)

The next processing function, process_bayes_message_envelope deals with BayesMessageEnvelope structure. The envelope contains game-specific properties. It allows us to do additional checks or update our processing classes with fresh game info before passing the data to the next processing function.

def process_bayes_message_envelope(message: dict) -> None:
game_urn = message['urn']
check_game_urn(game_urn)

add_props = message['additionalProperties']
Processor.update_additional_properties(add_props)

payload = message['payload']
process_live_data_message(payload)

While processing the main data structure, the LiveDataMessage, we have access to the type-subject-action triad. In the example above, we are using the imaginary get_proc_func function to determine the function that should process this Live Data Message payload. This function will do the actual work with the game data. For the GAME_EVENT-PLAYER-KILL message above from the example above, it may increase the kill counter of one player and the death counter of the other. Or it may do anything else with the data, you paid for it, it's yours now!

def process_live_data_message(message: dict) -> None:
type_ = message['type']
subject = message['subject']
action = message['action']
# determine how we should process this message based on the triad:
proc_func = get_proc_func(type_, subject, action)

payload = message['payload']
proc_func(payload)