3.1 接口IEvent.java
接口IEvent.java的代码如下:
1. package com.amigo.rtsp;
2. import java.io.IOException;
3. import java.nio.channels.SelectionKey;
/**
 * Network event handler: the selector loop calls these methods when the
 * registered channel becomes ready for the corresponding operation.
 *
 * <p>Created 2007-3-22 15:35:51.
 *
 * @author sycheng
 * @version 1.0
 */
public interface IEvent {

    /**
     * Called when the channel receives a connect event.
     *
     * @param key the selection key whose channel became connectable
     * @throws IOException if handling the event fails
     */
    void connect(SelectionKey key) throws IOException;

    /**
     * Called when the channel is readable.
     *
     * @param key the selection key whose channel became readable
     * @throws IOException if handling the event fails
     */
    void read(SelectionKey key) throws IOException;

    /**
     * Called when the channel is writable.
     *
     * @throws IOException if handling the event fails
     */
    void write() throws IOException;

    /**
     * Called when an error occurs on the channel.
     *
     * @param e the exception describing the failure
     */
    void error(Exception e);
}
3.2 RTSP的测试类:RTSPClient.java
RTSP的测试类RTSPClient.java类的代码如下所示:
1. package com.amigo.rtsp;
2. import java.io.IOException;
3. import java.net.InetSocketAddress;
4. import java.nio.ByteBuffer;
5. import java.nio.channels.SelectionKey;
6. import java.nio.channels.Selector;
7. import java.nio.channels.SocketChannel;
8. import java.util.Iterator;
9. import java.util.concurrent.atomic.AtomicBoolean;
10. public class RTSPClient extends Thread implements IEvent {
11. private static final String VERSION = " RTSP/1.0/r/n";
12. private static final String RTSP_OK = "RTSP/1.0 200 OK";
13. /** 远程地址 */
14. private final InetSocketAddress remoteAddress;
15. /** * 本地地址 */
16. private final InetSocketAddress localAddress;
17. /** * 连接通道 */
18. private SocketChannel socketChannel;
19. /** 发送缓冲区 */
20. private final ByteBuffer sendBuf;
21. /** 接收缓冲区 */
22. private final ByteBuffer receiveBuf;
23. private static final int BUFFER_SIZE = 8192;
24. /** 端口选择器 */
25. private Selector selector;
26. private String address;
27. private Status sysStatus;
28. private String sessionid;
29. /** 线程是否结束的标志 */
30. private AtomicBoolean shutdown;
31. private int seq=1;
32. private boolean isSended;
33. private String trackInfo;
34. private enum Status {
35. init, options, describe, setup, play, pause, teardown
36. }
37. public RTSPClient(InetSocketAddress remoteAddress,
38. InetSocketAddress localAddress, String address) {
39. this.remoteAddress = remoteAddress;
40. this.localAddress = localAddress;
41. this.address = address;
42. // 初始化缓冲区
43. sendBuf = ByteBuffer.allocateDirect(BUFFER_SIZE);
44. receiveBuf = ByteBuffer.allocateDirect(BUFFER_SIZE);
45. if (selector == null) {
46. // 创建新的Selector
47. try {
48. selector = Selector.open();
49. } catch (final IOException e) {
50. e.printStackTrace();
51. }
52. }
53. startup();
54. sysStatus = Status.init;
55. shutdown=new AtomicBoolean(false);
56. isSended=false;
57. }
58. public void startup() {
59. try {
60. // 打开通道
61. socketChannel = SocketChannel.open();
62. // 绑定到本地端口
63. socketChannel.socket().setSoTimeout(30000);
64. socketChannel.configureBlocking(false);
65. socketChannel.socket().bind(localAddress);
66. if (socketChannel.connect(remoteAddress)) {
67. System.out.println("开始建立连接:" + remoteAddress);
68. }
69. socketChannel.register(selector, SelectionKey.OP_CONNECT
70. | SelectionKey.OP_READ | SelectionKey.OP_WRITE, this);
71. System.out.println("端口打开成功");
72. } catch (final IOException e1) {
73. e1.printStackTrace();
74. }
75. }
76. public void send(byte[] out) {
77. if (out == null || out.length < 1) {
78. return;
79. }
80. synchronized (sendBuf) {
81. sendBuf.clear();
82. sendBuf.put(out);
83. sendBuf.flip();
84. }
85. // 发送出去
86. try {
87. write();
88. isSended=true;
89. } catch (final IOException e) {
90. e.printStackTrace();
91. }
92. }
93. public void write() throws IOException {
94. if (isConnected()) {
95. try {
96. socketChannel.write(sendBuf);
97. } catch (final IOException e) {
98. }
99. } else {
100. System.out.println("通道为空或者没有连接上");
101. }
102. }
103. public byte[] recieve() {
104. if (isConnected()) {
105. try {
106. int len = 0;
107. int readBytes = 0;
108. synchronized (receiveBuf) {
109. receiveBuf.clear();
110. try {
111. while ((len = socketChannel.read(receiveBuf)) > 0) {
112. readBytes += len;
113. }
114. } finally {
115. receiveBuf.flip();
116. }
117. if (readBytes > 0) {
118. final byte[] tmp = new byte[readBytes];
119. receiveBuf.get(tmp);
120. return tmp;
121. } else {
122. System.out.println("接收到数据为空,重新启动连接");
123. return null;
124. }
125. }
126. } catch (final IOException e) {
127. System.out.println("接收消息错误:");
128. }
129. } else {
130. System.out.println("端口没有连接");
131. }
132. return null;
133. }
134. public boolean isConnected() {
135. return socketChannel != null && socketChannel.isConnected();
136. }
137. private void select() {
138. int n = 0;
139. try {
140. if (selector == null) {
141. return;
142. }
143. n = selector.select(1000);
144. } catch (final Exception e) {
145. e.printStackTrace();
146. }
147. // 如果select返回大于0,处理事件
148. if (n > 0) {
149. for (final Iterator<SelectionKey> i = selector.selectedKeys()
150. .iterator(); i.hasNext();) {
151. // 得到下一个Key
152. final SelectionKey sk = i.next();
153. i.remove();
154. // 检查其是否还有效
155. if (!sk.isValid()) {
156. continue;
157. }
158. // 处理事件
159. final IEvent handler = (IEvent) sk.attachment();
160. try {
161. if (sk.isConnectable()) {
162. handler.connect(sk);
163. } else if (sk.isReadable()) {
164. handler.read(sk);
165. } else {
166. // System.err.println("Ooops");
167. }
168. } catch (final Exception e) {
169. handler.error(e);
170. sk.cancel();
171. }
172. }
173. }
174. }
175. public void shutdown() {
176. if (isConnected()) {
177. try {
178. socketChannel.close();
179. System.out.println("端口关闭成功");
180. } catch (final IOException e) {
181. System.out.println("端口关闭错误:");
182. } finally {
183. socketChannel = null;
184. }
185. } else {
186. System.out.println("通道为空或者没有连接");
187. }
188. }
189. @Override
190. public void run() {
191. // 启动主循环流程
192. while (!shutdown.get()) {
193. try {
194. if (isConnected()&&(!isSended)) {
195. switch (sysStatus) {
196. case init:
197. doOption();
198. break;
199. case options:
200. doDescribe();
201. break;
202. case describe:
203. doSetup();
204. break;
205. case setup:
206. if(sessionid==null&&sessionid.length()>0){
207. System.out.println("setup还没有正常返回");
208. }else{
209. doPlay();
210. }
211. break;
212. case play:
213. doPause();
214. break;
215. case pause:
216. doTeardown();
217. break;
218. default:
219. break;
220. }
221. }
222. // do select
223. select();
224. try {
225. Thread.sleep(1000);
226. } catch (final Exception e) {
227. }
228. } catch (final Exception e) {
229. e.printStackTrace();
230. }
231. }
232. shutdown();
233. }
234. public void connect(SelectionKey key) throws IOException {
235. if (isConnected()) {
236. return;
237. }
238. // 完成SocketChannel的连接
239. socketChannel.finishConnect();
240. while (!socketChannel.isConnected()) {
241. try {
242. Thread.sleep(300);
243. } catch (final InterruptedException e) {
244. e.printStackTrace();
245. }
246. socketChannel.finishConnect();
247. }
248. }
249. public void error(Exception e) {
250. e.printStackTrace();
251. }
252. public void read(SelectionKey key) throws IOException {
253. // 接收消息
254. final byte[] msg = recieve();
255. if (msg != null) {
256. handle(msg);
257. } else {
258. key.cancel();
259. }
260. }
261. private void handle(byte[] msg) {
262. String tmp = new String(msg);
263. System.out.println("返回内容:");
264. System.out.println(tmp);
265. if (tmp.startsWith(RTSP_OK)) {
266. switch (sysStatus) {
267. case init:
268. sysStatus = Status.options;
269. break;
270. case options:
271. sysStatus = Status.describe;
272. trackInfo=tmp.substring(tmp.indexOf("trackID"));
273. break;
274. case describe:
275. sessionid = tmp.substring(tmp.indexOf("Session: ") + 9, tmp
276. .indexOf("Date:"));
277. if(sessionid!=null&&sessionid.length()>0){
278. sysStatus = Status.setup;
279. }
280. break;
281. case setup:
282. sysStatus = Status.play;
283. break;
284. case play:
285. sysStatus = Status.pause;
286. break;
287. case pause:
288. sysStatus = Status.teardown;
289. shutdown.set(true);
290. break;
291. case teardown:
292. sysStatus = Status.init;
293. break;
294. default:
295. break;
296. }
297. isSended=false;
298. } else {
299. System.out.println("返回错误:" + tmp);
300. }
301. }
302. private void doTeardown() {
303. StringBuilder sb = new StringBuilder();
304. sb.append("TEARDOWN ");
305. sb.append(this.address);
306. sb.append("/");
307. sb.append(VERSION);
308. sb.append("Cseq: ");
309. sb.append(seq++);
310. sb.append("/r/n");
311. sb.append("User-Agent: RealMedia Player HelixDNAClient/10.0.0.11279 (win32)/r/n");
312. sb.append("Session: ");
313. sb.append(sessionid);
314. sb.append("/r/n");
315. send(sb.toString().getBytes());
316. System.out.println(sb.toString());
317. }
318. private void doPlay() {
319. StringBuilder sb = new StringBuilder();
320. sb.append("PLAY ");
321. sb.append(this.address);
322. sb.append(VERSION);
323. sb.append("Session: ");
324. sb.append(sessionid);
325. sb.append("Cseq: ");
326. sb.append(seq++);
327. sb.append("/r/n");
328. sb.append("/r/n");
329. System.out.println(sb.toString());
330. send(sb.toString().getBytes());
331. }
332. private void doSetup() {
333. StringBuilder sb = new StringBuilder();
334. sb.append("SETUP ");
335. sb.append(this.address);
336. sb.append("/");
337. sb.append(trackInfo);
338. sb.append(VERSION);
339. sb.append("Cseq: ");
340. sb.append(seq++);
341. sb.append("/r/n");
342. sb.append("Transport: RTP/AVP;UNICAST;client_port=16264-16265;mode=play/r/n");
343. sb.append("/r/n");
344. System.out.println(sb.toString());
345. send(sb.toString().getBytes());
346. }
347. private void doOption() {
348. StringBuilder sb = new StringBuilder();
349. sb.append("OPTIONS ");
350. sb.append(this.address.substring(0, address.lastIndexOf("/")));
351. sb.append(VERSION);
352. sb.append("Cseq: ");
353. sb.append(seq++);
354. sb.append("/r/n");
355. sb.append("/r/n");
356. System.out.println(sb.toString());
357. send(sb.toString().getBytes());
358. }
359. private void doDescribe() {
360. StringBuilder sb = new StringBuilder();
361. sb.append("DESCRIBE ");
362. sb.append(this.address);
363. sb.append(VERSION);
364. sb.append("Cseq: ");
365. sb.append(seq++);
366. sb.append("/r/n");
367. sb.append("/r/n");
368. System.out.println(sb.toString());
369. send(sb.toString().getBytes());
370. }
371. private void doPause() {
372. StringBuilder sb = new StringBuilder();
373. sb.append("PAUSE ");
374. sb.append(this.address);
375. sb.append("/");
376. sb.append(VERSION);
377. sb.append("Cseq: ");
378. sb.append(seq++);
379. sb.append("/r/n");
380. sb.append("Session: ");
381. sb.append(sessionid);
382. sb.append("/r/n");
383. send(sb.toString().getBytes());
384. System.out.println(sb.toString());
385. }
386. public static void main(String[] args) {
387. try {
388. // RTSPClient(InetSocketAddress remoteAddress,
389. // InetSocketAddress localAddress, String address)
390. RTSPClient client = new RTSPClient(
391. new InetSocketAddress("218.207.101.236", 554),
392. new InetSocketAddress("192.168.2.28", 0),
393. "rtsp://218.207.101.236:554/mobile/3/67A451E937422331/8jH5QPU5GWS07Ugn.sdp");
394. client.start();
395. } catch (Exception e) {
396. e.printStackTrace();
397. }
398. }
399. }
其中,rtsp://218.207.101.236:554/mobile/3/67A451E937422331/8jH5QPU5GWS07Ugn.sdp 是我在网上找到的一个RTSP的sdp地址,读者可自行更换。RTSP的默认端口为554。
3.3 运行结果
运行RTSPClient.java,运行结果如下所示:
1. 端口打开成功
2. OPTIONS rtsp://218.207.101.236:554/mobile/3/67A451E937422331 RTSP/1.0
3. Cseq: 1
4. 返回内容:
5. RTSP/1.0 200 OK
6. Server: PVSS/1.4.8 (Build/20090111; Platform/Win32; Release/StarValley; )
7. Cseq: 1
8. Public: DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, OPTIONS, ANNOUNCE, RECORD
9. DESCRIBE rtsp://218.207.101.236:554/mobile/3/67A451E937422331/8jH5QPU5GWS07Ugn.sdp RTSP/1.0
10. Cseq: 2
11. 返回内容:
12. RTSP/1.0 200 OK
13. Server: PVSS/1.4.8 (Build/20090111; Platform/Win32; Release/StarValley; )
14. Cseq: 2
15. Content-length: 421
16. Date: Mon, 03 Aug 2009 08:50:36 GMT
17. Expires: Mon, 03 Aug 2009 08:50:36 GMT
18. Content-Type: application/sdp
19. x-Accept-Retransmit: our-retransmit
20. x-Accept-Dynamic-Rate: 1
21. Content-Base: rtsp://218.207.101.236:554/mobile/3/67A451E937422331/8jH5QPU5GWS07Ugn.sdp/
22. v=0
23. o=MediaBox 127992 137813 IN IP4 0.0.0.0
24. s=RTSP Session
25. i=Starv Box Live Cast
26. c=IN IP4 218.207.101.236
27. t=0 0
28. a=range:npt=now-
29. a=control:*
30. m=video 0 RTP/AVP 96
31. b=AS:20
32. a=rtpmap:96 MP4V-ES/1000
33. a=fmtp:96 profile-level-id=8; config=000001b008000001b5090000010000000120008440fa282c2090a31f; decode_buf=12586
34. a=range:npt=now-
35. a=framerate:5
36. a=framesize:96 176-144
37. a=cliprect:0,0,144,176
38. a=control:trackID=1
39. SETUP rtsp://218.207.101.236:554/mobile/3/67A451E937422331/8jH5QPU5GWS07Ugn.sdp/trackID=1
40. RTSP/1.0
41. Cseq: 3
42. Transport: RTP/AVP;UNICAST;client_port=16264-16265;mode=play
43. 返回内容:
44. RTSP/1.0 200 OK
45. Server: PVSS/1.4.8 (Build/20090111; Platform/Win32; Release/StarValley; )
46. Cseq: 3
47. Session: 15470472221769
48. Date: Mon, 03 Aug 2009 08:50:36 GMT
49. Expires: Mon, 03 Aug 2009 08:50:36 GMT
50. Transport: RTP/AVP;UNICAST;mode=play;client_port=16264-16265;server_port=20080-20081
51. PLAY rtsp://218.207.101.236:554/mobile/3/67A451E937422331/8jH5QPU5GWS07Ugn.sdp RTSP/1.0
52. Session: 15470472221769
53. Cseq: 4
54. 返回内容:
55. RTSP/1.0 200 OK
56. Server: PVSS/1.4.8 (Build/20090111; Platform/Win32; Release/StarValley; )
57. Cseq: 4
58. Session: 15470472221769
59. RTP-Info: url=rtsp://218.207.101.236:554/mobile/3/67A451E937422331/8jH5QPU5GWS07Ugn.sdp/trackID=1;seq=0;rtptime=0
60. PAUSE rtsp://218.207.101.236:554/mobile/3/67A451E937422331/8jH5QPU5GWS07Ugn.sdp/ RTSP/1.0
61. Cseq: 5
62. Session: 15470472221769
63. 返回内容:
64. RTSP/1.0 200 OK
65. Server: PVSS/1.4.8 (Build/20090111; Platform/Win32; Release/StarValley; )
66. Cseq: 5
67. Session: 15470472221769
68. TEARDOWN rtsp://218.207.101.236:554/mobile/3/67A451E937422331/8jH5QPU5GWS07Ugn.sdp/ RTSP/1.0
69. Cseq: 6
70. User-Agent: RealMedia Player HelixDNAClient/10.0.0.11279 (win32)
71. Session: 15470472221769
72. 返回内容:
73. RTSP/1.0 200 OK
74. Server: PVSS/1.4.8 (Build/20090111; Platform/Win32; Release/StarValley; )
75. Cseq: 6
76. Session: 15470472221769
77. Connection: Close
78. 端口关闭成功
对照上面的运行结果,读者可以熟悉OPTIONS、DESCRIBE、SETUP、PLAY、PAUSE、TEARDOWN等RTSP常用命令的请求与响应格式。