Custom module to count how much traffic users have generated over an interval of X seconds

I am currently trying to build a module to improve accounting of the streams made by Wowza.

Software versions used to build this module:

Wowza version: 4

JDK: 1.8

Rationale:

I want to build a module that:

  • Counts how much traffic users have generated over a fixed interval of seconds, e.g. 10 (traffic up to a disconnect/stop should be counted as well)

  • Available for RTMP, RTP(RTSP) and HTTP (HLS/HDS/DASH/Smooth)

    Problems:

    I would like to know the proper way to count traffic for all of those protocols in the same module.

    What I have so far:

    package com.mycompany.wms.module;
    import java.util.Timer;
    import java.util.TimerTask;
    import com.wowza.util.IOPerformanceCounter;
    import com.wowza.wms.application.*;
    import com.wowza.wms.amf.*;
    import com.wowza.wms.client.*;
    import com.wowza.wms.httpstreamer.model.IHTTPStreamerSession;
    import com.wowza.wms.media.model.MediaCodecInfoAudio;
    import com.wowza.wms.media.model.MediaCodecInfoVideo;
    import com.wowza.wms.module.*;
    import com.wowza.wms.request.*;
    import com.wowza.wms.stream.IMediaStream;
    import com.wowza.wms.stream.IMediaStreamActionNotify3;
    /**
     * Wowza module that attaches a periodic traffic reporter ("watchdog") to
     * every played stream and every HTTP streaming session.
     *
     * Watchdogs for media streams are stored in the stream's own properties, so
     * two viewers of the same stream name no longer overwrite each other's
     * watchdog. Watchdogs for HTTP sessions are stored in the application
     * instance properties under streamName + sessionId, as before.
     */
    public class AccountingModule extends ModuleBase {
    	/** Stream property that holds the listener registered in onStreamCreate. */
    	private static final String PROP_STREAM_LISTENER = "streamActionNotifier";
    	/** Stream property that holds the running StreamStats of a played stream. */
    	private static final String PROP_STREAM_STATS = "streamStatsWatchdog";
    	/** Delay before the first report and period between reports, in ms. */
    	private static final long REPORT_INTERVAL_MS = 10000L;

    	IApplicationInstance appInstance;

    	/** Remembers the application instance; HTTP session watchdogs are stored on it. */
    	public void onAppStart(IApplicationInstance appInstance) {
    		this.appInstance = appInstance;
    		String fullname = appInstance.getApplication().getName() + "/"
    				+ appInstance.getName();
    		getLogger().info("onAppStart: " + fullname);
    	}

    	public void onAppStop(IApplicationInstance appInstance) {
    		String fullname = appInstance.getApplication().getName() + "/"
    				+ appInstance.getName();
    		getLogger().info("onAppStop: " + fullname);
    	}

    	public void onConnect(IClient client, RequestFunction function,
    			AMFDataList params) {
    		getLogger().info("onConnect: " + client.getClientId());
    	}

    	public void onConnectAccept(IClient client) {
    		getLogger().info("onConnectAccept: " + client.getClientId());
    	}

    	public void onConnectReject(IClient client) {
    		getLogger().info("onConnectReject: " + client.getClientId());
    	}

    	public void onDisconnect(IClient client) {
    		getLogger().info("onDisconnect: " + client.getClientId());
    	}

    	/** Starts a watchdog when a stream is played and stops it when the stream stops. */
    	class StreamListener implements IMediaStreamActionNotify3 {
    		@Override
    		public void onMetaData(IMediaStream stream, AMFPacket metaDataPacket) {
    			// not needed for accounting
    		}

    		@Override
    		public void onPauseRaw(IMediaStream stream, boolean isPause,
    				double location) {
    			// not needed for accounting
    		}

    		@Override
    		public void onPause(IMediaStream stream, boolean isPause,
    				double location) {
    			// not needed for accounting
    		}

    		@Override
    		public void onPlay(IMediaStream stream, String streamName,
    				double playStart, double playLen, int playReset) {
    			getLogger().info(
    					"onPlay " + stream.getContextStr() +
    							" playStart: " + playStart +
    							" playLen: " + playLen +
    							" playReset: "+ playReset);
    			// The watchdog lives on the stream itself, so no client / RTP
    			// stream lookup (and no exception-driven fallback) is needed.
    			startStreamWatchdog(stream);
    		}

    		@Override
    		public void onPublish(IMediaStream stream, String streamName,
    				boolean isRecord, boolean isAppend) {
    			// not needed for accounting
    		}

    		@Override
    		public void onSeek(IMediaStream stream, double location) {
    			// not needed for accounting
    		}

    		@Override
    		public void onStop(IMediaStream stream) {
    			getLogger().info("onStop By: " + stream.getClientId());
    			stopStreamWatchdog(stream);
    		}

    		@Override
    		public void onUnPublish(IMediaStream stream, String streamName,
    				boolean isRecord, boolean isAppend) {
    			// not needed for accounting
    		}

    		@Override
    		public void onCodecInfoAudio(IMediaStream stream,
    				MediaCodecInfoAudio codecInfoAudio) {
    			// not needed for accounting
    		}

    		@Override
    		public void onCodecInfoVideo(IMediaStream stream,
    				MediaCodecInfoVideo codecInfoVideo) {
    			// not needed for accounting
    		}
    	}

    	/** Registers a StreamListener on the new stream and remembers it for removal. */
    	@SuppressWarnings("unchecked")
    	public void onStreamCreate(IMediaStream stream) {
    		getLogger().info("onStreamCreate: " + stream);
    		IMediaStreamActionNotify3 actionNotify = new StreamListener();
    		WMSProperties props = stream.getProperties();
    		synchronized (props) {
    			props.put(PROP_STREAM_LISTENER, actionNotify);
    		}
    		stream.addClientListener(actionNotify);
    	}

    	/** Unregisters the listener and stops a watchdog that is still running. */
    	public void onStreamDestroy(IMediaStream stream) {
    		getLogger().info("onStreamDestroy: " + stream);
    		IMediaStreamActionNotify3 actionNotify = null;
    		WMSProperties props = stream.getProperties();
    		synchronized (props) {
    			actionNotify = (IMediaStreamActionNotify3) props
    					.get(PROP_STREAM_LISTENER);
    		}
    		if (actionNotify != null) {
    			stream.removeClientListener(actionNotify);
    			getLogger().info("remoteClientListener: " + stream.getSrc());
    		}
    		// A stream can be destroyed without onStop having been delivered;
    		// without this the timer thread would keep running.
    		stopStreamWatchdog(stream);
    	}

    	/** Starts a watchdog for the new HTTP session and stores it for onHTTPSessionDestroy. */
    	public void onHTTPSessionCreate(IHTTPStreamerSession httpSession) {
    		String streamName = httpSession.getStreamName();
    		getLogger().info("Stream Name: " + streamName);
    		StreamStats watchdog = new StreamStats();
    		watchdog.session = httpSession;
    		try {
    			watchdog.start();
    			this.appInstance.getProperties().setProperty(
    					streamName + httpSession.getSessionId(), watchdog);
    		} catch (RuntimeException ex) {
    			// Never leave a running timer that nobody can find and stop.
    			watchdog.stop();
    			getLogger().error("onHTTPSessionCreate: could not start StreamStats", ex);
    		}
    	}

    	/** Stops and forgets the watchdog started in onHTTPSessionCreate. */
    	public void onHTTPSessionDestroy(IHTTPStreamerSession httpSession) {
    		getLogger().info("onHTTPSessionDestroy");
    		if (this.appInstance == null) {
    			return;
    		}
    		String key = httpSession.getStreamName() + httpSession.getSessionId();
    		WMSProperties props = this.appInstance.getProperties();
    		StreamStats watchdog;
    		synchronized (props) {
    			watchdog = (StreamStats) props.remove(key);
    		}
    		if (watchdog != null) {
    			watchdog.stop();
    		}
    	}

    	/** Replaces any watchdog already attached to the stream with a fresh, started one. */
    	@SuppressWarnings("unchecked")
    	private void startStreamWatchdog(IMediaStream stream) {
    		StreamStats watchdog = new StreamStats();
    		StreamStats previous;
    		WMSProperties props = stream.getProperties();
    		synchronized (props) {
    			previous = (StreamStats) props.put(PROP_STREAM_STATS, watchdog);
    		}
    		if (previous != null) {
    			// onPlay can fire again on the same stream; do not leak the old timer.
    			previous.stop();
    		}
    		try {
    			watchdog.start();
    		} catch (RuntimeException ex) {
    			getLogger().error("onPlay: could not start StreamStats for "
    					+ stream.getName(), ex);
    		}
    	}

    	/** Stops and detaches the watchdog of the stream, if one is attached. */
    	private void stopStreamWatchdog(IMediaStream stream) {
    		StreamStats watchdog;
    		WMSProperties props = stream.getProperties();
    		synchronized (props) {
    			watchdog = (StreamStats) props.remove(PROP_STREAM_STATS);
    		}
    		if (watchdog != null) {
    			watchdog.stop();
    		}
    	}

    	/**
    	 * Periodic reporter. Logs the bytes sent so far on the attached HTTP
    	 * session every REPORT_INTERVAL_MS. A watchdog without a session (the
    	 * ones started from onPlay) only logs that it ticked.
    	 *
    	 * NOTE(review): each instance owns one timer thread, i.e. one thread per
    	 * viewer; consider a shared scheduler if viewer counts are high.
    	 */
    	private class StreamStats {
    		public Timer mTimer;
    		public TimerTask mTask;
    		public IHTTPStreamerSession session;

    		public StreamStats() {
    		}

    		/** Logs one report; never throws, because an exception escaping a TimerTask ends its Timer. */
    		private void report() {
    			try {
    				IHTTPStreamerSession current = session;
    				if (current == null) {
    					getLogger().info("Run StreamStats: no HTTP session attached");
    					return;
    				}
    				getLogger().info("Run StreamStats: " + current.getStreamName());
    				IOPerformanceCounter perf = current.getIOPerformanceCounter();
    				if (perf != null) {
    					getLogger().info(
    							"Bytes loaded till now: " + perf.getMessagesOutBytes());
    				}
    			} catch (RuntimeException ex) {
    				getLogger().error("StreamStats report failed", ex);
    			}
    		}

    		/** Starts periodic reporting; calling it while already running is a no-op. */
    		public synchronized void start() {
    			if (mTimer != null) {
    				return;
    			}
    			// A TimerTask cannot be scheduled twice, so build a new one per start.
    			mTask = new TimerTask() {
    				@Override
    				public void run() {
    					report();
    				}
    			};
    			// Daemon thread: a forgotten watchdog must not keep the JVM alive.
    			mTimer = new Timer("AccountingModule-StreamStats", true);
    			mTimer.schedule(mTask, REPORT_INTERVAL_MS, REPORT_INTERVAL_MS);
    			getLogger().info("Start StreamStats");
    		}

    		/** Stops periodic reporting; safe to call when not running. */
    		public synchronized void stop() {
    			if (mTimer != null) {
    				mTimer.cancel();
    				mTimer = null;
    				mTask = null;
    				getLogger().info("Stop StreamStats");
    			}
    		}
    	}
    }
    

Hello

One example would be to check out our connection counts http provider.

Thanks

Matt

Would I be able to count traffic (bytes) with that example if using JWPlayer? (rtmp://[wowza-ipaddress]/…/live.sdp)

Hey matt_y, my module now looks like this:

package com.mycompany.wms.module;
 
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import com.wowza.util.IOPerformanceCounter;
import com.wowza.wms.amf.AMFPacket;
import com.wowza.wms.application.IApplicationInstance;
import com.wowza.wms.application.WMSProperties;
import com.wowza.wms.httpstreamer.model.IHTTPStreamerSession;
import com.wowza.wms.media.model.MediaCodecInfoAudio;
import com.wowza.wms.media.model.MediaCodecInfoVideo;
import com.wowza.wms.module.ModuleBase;
import com.wowza.wms.stream.IMediaStream;
import com.wowza.wms.stream.IMediaStreamActionNotify3;
 
 
/**
 * Wowza module that reports, at a fixed interval, how many bytes were sent to
 * each played media stream and each HTTP streaming session.
 *
 * Watchdogs for media streams are stored in the stream's own properties, so
 * two viewers of the same stream name no longer overwrite each other's
 * watchdog. Watchdogs for HTTP sessions are stored in the application
 * instance properties under streamName + sessionId, as before.
 */
public class AccountingModule extends ModuleBase {

	/** Stream property that holds the listener registered in onStreamCreate. */
	private static final String PROP_STREAM_LISTENER = "streamActionNotifier";
	/** Stream property that holds the running StreamStats of a played stream. */
	private static final String PROP_STREAM_STATS = "streamStatsWatchdog";
	/** Delay before the first report and period between reports, in ms. */
	private static final long REPORT_INTERVAL_MS = 10000L;

	IApplicationInstance appInstance;

	/** Remembers the application instance; HTTP session watchdogs are stored on it. */
	public void onAppStart(IApplicationInstance appInstance) {
		this.appInstance = appInstance;
	}

	/** Unregisters the listener and stops a watchdog that is still running. */
	public void onStreamDestroy(IMediaStream stream) {
		getLogger().info("onStreamDestroy");

		IMediaStreamActionNotify3 actionNotify = null;
		WMSProperties props = stream.getProperties();
		synchronized (props) {
			actionNotify = (IMediaStreamActionNotify3) props.get(PROP_STREAM_LISTENER);
		}
		if (actionNotify != null) {
			stream.removeClientListener(actionNotify);
			getLogger().info("removeClientListener " + stream.getSrc());
		}
		// A stream can be destroyed without onStop having been delivered;
		// without this the timer thread would keep running.
		stopStreamWatchdog(stream);
	}

	/** Registers a StreamListener on the new stream and remembers it for removal. */
	@SuppressWarnings("unchecked")
	public void onStreamCreate(IMediaStream stream) {
		getLogger().info("onStreamCreate");

		IMediaStreamActionNotify3 actionNotify = new StreamListener();
		WMSProperties props = stream.getProperties();
		synchronized (props) {
			props.put(PROP_STREAM_LISTENER, actionNotify);
		}
		stream.addClientListener(actionNotify);
	}

	/** Starts a watchdog when a stream is played and stops it when the stream stops. */
	class StreamListener implements IMediaStreamActionNotify3 {
		@Override
		public void onMetaData(IMediaStream stream, AMFPacket metaDataPacket) {
			// not needed for accounting
		}

		@Override
		public void onPauseRaw(IMediaStream stream, boolean isPause,
				double location) {
			// not needed for accounting
		}

		@Override
		public void onPause(IMediaStream arg0, boolean arg1, double arg2) {
			// not needed for accounting
		}

		@Override
		public void onPlay(IMediaStream stream, String streamName, double playStart, double playLen, int playReset) {
			getLogger().info("onPlay " + streamName);
			// The watchdog lives on the stream itself, so no client / RTP
			// stream lookup (and no exception-driven fallback) is needed.
			startStreamWatchdog(stream);
		}

		@Override
		public void onUnPublish(IMediaStream arg0, String arg1, boolean arg2,
				boolean arg3) {
			// not needed for accounting
		}

		@Override
		public void onPublish(IMediaStream stream, String streamName, boolean isRecord, boolean isAppend) {
			// not needed for accounting
		}

		@Override
		public void onSeek(IMediaStream arg0, double arg1) {
			// not needed for accounting
		}

		@Override
		public void onStop(IMediaStream stream) {
			getLogger().info("onStop");
			stopStreamWatchdog(stream);
		}

		@Override
		public void onCodecInfoAudio(IMediaStream stream,
				MediaCodecInfoAudio codecInfoAudio) {
			// not needed for accounting
		}

		@Override
		public void onCodecInfoVideo(IMediaStream stream,
				MediaCodecInfoVideo codecInfoVideo) {
			// not needed for accounting
		}
	}

	/** Stops and forgets the watchdog started in onHTTPSessionCreate. */
	public void onHTTPSessionDestroy(IHTTPStreamerSession httpSession) {
		getLogger().info("onHTTPSessionDestroy");

		if (this.appInstance == null) {
			return;
		}
		String key = httpSession.getStreamName() + httpSession.getSessionId();
		WMSProperties props = this.appInstance.getProperties();
		StreamStats watchdog;
		synchronized (props) {
			// Remove, not just read: otherwise every finished session stays referenced.
			watchdog = (StreamStats) props.remove(key);
		}
		if (watchdog != null) {
			getLogger().info("Stop Watchdog");
			watchdog.stop();
		}
	}

	/** Starts a watchdog for the new HTTP session and stores it for onHTTPSessionDestroy. */
	public void onHTTPSessionCreate(IHTTPStreamerSession httpSession) {
		getLogger().info("onHTTPSessionCreate");

		String streamName = httpSession.getStreamName();
		getLogger().info("Stream Name: " + streamName);
		StreamStats watchdog = new StreamStats(httpSession);

		try {
			watchdog.start();
			this.appInstance.getProperties().setProperty(streamName + httpSession.getSessionId(), watchdog);
		} catch (RuntimeException ex) {
			// Never leave a running timer that nobody can find and stop.
			watchdog.stop();
			getLogger().error("onHTTPSessionCreate: could not start StreamStats", ex);
		}
	}

	/** Replaces any watchdog already attached to the stream with a fresh, started one. */
	@SuppressWarnings("unchecked")
	private void startStreamWatchdog(IMediaStream stream) {
		StreamStats watchdog = new StreamStats(stream);
		StreamStats previous;
		WMSProperties props = stream.getProperties();
		synchronized (props) {
			previous = (StreamStats) props.put(PROP_STREAM_STATS, watchdog);
		}
		if (previous != null) {
			// onPlay can fire again on the same stream; do not leak the old timer.
			previous.stop();
		}
		try {
			watchdog.start();
		} catch (RuntimeException ex) {
			getLogger().error("onPlay: could not start StreamStats for " + stream.getName(), ex);
		}
	}

	/** Stops and detaches the watchdog of the stream, if one is attached. */
	private void stopStreamWatchdog(IMediaStream stream) {
		StreamStats watchdog;
		WMSProperties props = stream.getProperties();
		synchronized (props) {
			watchdog = (StreamStats) props.remove(PROP_STREAM_STATS);
		}
		if (watchdog != null) {
			getLogger().info("Stop Watchdog");
			watchdog.stop();
		}
	}

	/**
	 * Periodic reporter for exactly one connection: either a media stream or
	 * an HTTP session. Every REPORT_INTERVAL_MS it logs the total bytes sent
	 * and the bytes sent since the previous report. One last report is made
	 * when the watchdog is stopped so the tail of the traffic is not lost.
	 *
	 * NOTE(review): each instance owns one timer thread, i.e. one thread per
	 * viewer; consider a shared scheduler if viewer counts are high.
	 */
	private class StreamStats {
		private final IMediaStream stream;
		private final IHTTPStreamerSession session;
		private Timer timer;
		/** Total reported at the previous tick; 0 before the first report. */
		private long lastBytesOut;

		StreamStats(IMediaStream stream) {
			this.stream = stream;
			this.session = null;
		}

		StreamStats(IHTTPStreamerSession session) {
			this.stream = null;
			this.session = session;
		}

		/** Starts periodic reporting; calling it while already running is a no-op. */
		public synchronized void start() {
			if (timer != null) {
				return;
			}
			// Daemon thread: a forgotten watchdog must not keep the JVM alive.
			timer = new Timer("AccountingModule-StreamStats", true);
			timer.schedule(new TimerTask() {
				@Override
				public void run() {
					report();
				}
			}, REPORT_INTERVAL_MS, REPORT_INTERVAL_MS);
			getLogger().info("Start StreamStats");
		}

		/** Stops periodic reporting and emits a final report; safe to call when not running. */
		public synchronized void stop() {
			if (timer == null) {
				return;
			}
			timer.cancel();
			timer = null;
			// Account for what was sent between the last tick and the stop.
			report();
			getLogger().info("Stop StreamStats");
		}

		/**
		 * Logs the total and the delta since the previous report. Synchronized
		 * because stop() may call it while the timer thread is mid-report.
		 * Never throws, because an exception escaping a TimerTask ends its Timer.
		 */
		private synchronized void report() {
			try {
				getLogger().info("Run StreamStats");

				String connectionId;
				IOPerformanceCounter perf;
				if (session != null) {
					perf = session.getIOPerformanceCounter();
					connectionId = session.getSessionId();
				} else if (stream != null) {
					perf = stream.getMediaIOPerformance();
					connectionId = String.valueOf(stream.getClientId());
				} else {
					return;
				}
				if (perf == null) {
					return;
				}

				long bytesOut = perf.getMessagesOutBytes();
				long bytesNow = bytesOut - lastBytesOut;
				lastBytesOut = bytesOut;

				getLogger().info("bytesOut " + bytesOut);
				getLogger().info("connectionId " + connectionId);
				getLogger().info("bytesNow " + bytesNow);
			} catch (RuntimeException ex) {
				getLogger().error("StreamStats report failed", ex);
			}
		}
	}
}

Do you see any problem with this?

I am not sure if this is the best way to handle RTMP, RTP and HTTP uniformly in one module.

Now I get what you meant talking about HTTPProvider.

If I implement something like that, how can I avoid the HTTP overhead? Is there any way to use that module (the example provided in the link) without the HTTP overhead, for example by calling it from another class? There is also the timer (interval in seconds) problem. What are your thoughts?