Package cherrypy :: Package test :: Module test_http
[hide private]
[frames | no frames]

Source Code for Module cherrypy.test.test_http

  1  """Tests for managing HTTP issues (malformed requests, etc).""" 
  2   
  3  import errno 
  4  import mimetypes 
  5  import socket 
  6  import sys 
  7   
  8  import cherrypy 
  9  from cherrypy._cpcompat import HTTPConnection, HTTPSConnection, ntob, py3k 
def encode_multipart_formdata(files):
    """Encode file uploads as a multipart/form-data request payload.

    files: a sequence of (name, filename, value) tuples, one per upload.

    Returns a (content_type, body) pair: the Content-Type header value,
    which carries the boundary, and the body text with CRLF line endings.
    """
    BOUNDARY = '________ThIs_Is_tHe_bouNdaRY_$'
    delimiter = '--' + BOUNDARY

    parts = []
    for key, filename, value in files:
        # Fall back to a generic binary type when the filename gives no hint.
        guessed = mimetypes.guess_type(filename)[0]
        parts.extend([
            delimiter,
            'Content-Disposition: form-data; name="%s"; filename="%s"' %
            (key, filename),
            'Content-Type: %s' % (guessed or 'application/octet-stream'),
            '',
            value,
        ])

    # Closing delimiter, followed by an empty item so the body ends in CRLF.
    parts.extend([delimiter + '--', ''])

    return ('multipart/form-data; boundary=%s' % BOUNDARY,
            '\r\n'.join(parts))
32 33 34 from cherrypy.test import helper
class HTTPTests(helper.CPWebCase):
    """Exercise the server's handling of unusual or malformed HTTP traffic.

    Most tests drive the server through a raw client connection (see
    make_connection) so that they can send requests which a well-behaved
    client would never produce.

    Test methods that had no docstring are described with comments
    instead, because unittest uses the first docstring line as the
    test's description in its reports.
    """

    def make_connection(self):
        """Return a client connection to the server under test.

        An HTTPSConnection is used when self.scheme is "https", and a
        plain HTTPConnection otherwise.
        """
        if self.scheme == "https":
            conn_class = HTTPSConnection
        else:
            conn_class = HTTPConnection
        return conn_class('%s:%s' % (self.interface(), self.PORT))

    def _record_response(self, conn):
        """Read the pending response on ``conn`` into self.body/self.status.

        The whole body is read, and the status is stored as a string so
        that assertStatus and assertBody can be used afterwards.
        """
        response = conn.getresponse()
        self.body = response.fp.read()
        self.status = str(response.status)

    def _post_files(self, path, files):
        """POST ``files`` to ``path`` as multipart/form-data.

        files: a sequence of (name, filename, value) tuples, as accepted
        by encode_multipart_formdata.

        The response is recorded on self.status and self.body, and the
        connection is closed whether or not the exchange succeeds.
        """
        content_type, body = encode_multipart_formdata(files)
        body = body.encode('Latin-1')

        c = self.make_connection()
        try:
            c.putrequest('POST', path)
            c.putheader('Content-Type', content_type)
            c.putheader('Content-Length', str(len(body)))
            c.endheaders()
            c.send(body)
            self._record_response(c)
        finally:
            c.close()

    @staticmethod
    def setup_server():
        """Mount the handlers used by these tests and configure the server."""
        class Root:

            @cherrypy.expose
            def index(self, *args, **kwargs):
                return "Hello world!"

            @cherrypy.expose
            def no_body(self, *args, **kwargs):
                return "Hello world!"
            # Request body processing is switched off for this handler;
            # test_no_content_length relies on that.
            no_body._cp_config = {'request.process_request_body': False}

            @cherrypy.expose
            def post_multipart(self, file):
                """Return a run-length summary of the uploaded file.

                Each run of identical characters becomes "<char> * <count>"
                and the runs are joined with ", ", for example
                "a * 65536, b * 65536".
                """
                contents = file.file.read()
                summary = []

                def record(char, count):
                    # On Python 3 the items are passed through chr() first
                    # (presumably the upload is read as bytes, whose items
                    # are ints - confirm against the request body API).
                    if py3k:
                        char = chr(char)
                    summary.append("%s * %d" % (char, count))

                curchar = None
                count = 0
                for c in contents:
                    if c == curchar:
                        count += 1
                        continue
                    # A new run starts here; flush the previous one, if any.
                    if count:
                        record(curchar, count)
                    curchar = c
                    count = 1
                if count:
                    record(curchar, count)
                return ", ".join(summary)

            @cherrypy.expose
            def post_filename(self, myfile):
                '''Return the name of the file which was uploaded.'''
                return myfile.filename

        cherrypy.tree.mount(Root())
        # Presumably sized so that the large upload made by
        # test_post_multipart (26 * 65536 characters) is accepted.
        cherrypy.config.update({'server.max_request_body_size': 30000000})

    def test_no_content_length(self):
        # "The presence of a message-body in a request is signaled by the
        # inclusion of a Content-Length or Transfer-Encoding header field in
        # the request's message-headers."
        #
        # Send a message with neither header and no body. Even though
        # the request is of method POST, this should be OK because we set
        # request.process_request_body to False for our handler.
        c = self.make_connection()
        try:
            c.request("POST", "/no_body")
            self._record_response(c)
        finally:
            c.close()
        self.assertStatus(200)
        self.assertBody(ntob('Hello world!'))

        # Now repeat the POST against "/", whose handler does not disable
        # request body processing. No Content-Length is supplied here, so
        # CP is expected to respond with 411 Length Required.
        #
        # NOTE(review): newer http.client versions are documented to add
        # "Content-Length: 0" to a body-less POST, which would defeat this
        # check - confirm against the Python versions that must be supported.
        c = self.make_connection()
        try:
            c.request("POST", "/")
            self._record_response(c)
        finally:
            c.close()
        self.assertStatus(411)

    def test_post_multipart(self):
        # Upload a large file and check the run-length summary that the
        # post_multipart handler sends back.
        alphabet = "abcdefghijklmnopqrstuvwxyz"
        # generate file contents for a large post
        contents = "".join([c * 65536 for c in alphabet])

        # encode as multipart form data and post the file
        self._post_files('/post_multipart', [('file', 'file.txt', contents)])
        self.assertStatus(200)
        self.assertBody(", ".join(["%s * 65536" % c for c in alphabet]))

    def test_post_filename_with_commas(self):
        '''Testing that we can handle filenames with commas. This was
        reported as a bug in:
           https://bitbucket.org/cherrypy/cherrypy/issue/1146/'''
        # We'll upload a bunch of files with differing names.
        fnames = ['boop.csv', 'foo, bar.csv', 'bar, xxxx.csv',
                  'file"name.csv']
        for fname in fnames:
            self._post_files('/post_filename',
                             [('myfile', fname, 'yunyeenyunyue')])
            self.assertStatus(200)
            self.assertBody(fname)

    def test_malformed_request_line(self):
        if getattr(cherrypy.server, "using_apache", False):
            return self.skip("skipped due to known Apache differences...")

        # Test missing version in Request-Line
        c = self.make_connection()
        try:
            # Bypass putrequest() so the malformed line is sent verbatim.
            c._output(ntob('GET /'))
            c._send_output()
            if hasattr(c, 'strict'):
                response = c.response_class(c.sock, strict=c.strict,
                                            method='GET')
            else:
                # Python 3.2 removed the 'strict' feature, saying:
                # "http.client now always assumes HTTP/1.x compliant servers."
                response = c.response_class(c.sock, method='GET')
            response.begin()
            self.assertEqual(response.status, 400)
            self.assertEqual(response.fp.read(22),
                             ntob("Malformed Request-Line"))
        finally:
            c.close()

    def test_request_line_split_issue_1220(self):
        # The URI is chosen so that the full Request-Line is exactly 256
        # bytes long; the first assertion guards that precondition.
        Request_URI = "/index?intervenant-entreprise-evenement_classaction=evenement-mailremerciements&_path=intervenant-entreprise-evenement&intervenant-entreprise-evenement_action-id=19404&intervenant-entreprise-evenement_id=19404&intervenant-entreprise_id=28092"
        self.assertEqual(len("GET %s HTTP/1.1\r\n" % Request_URI), 256)
        self.getPage(Request_URI)
        self.assertBody("Hello world!")

    def test_malformed_header(self):
        # A header line that cannot be parsed must be rejected with a 400.
        c = self.make_connection()
        try:
            c.putrequest('GET', '/')
            c.putheader('Content-Type', 'text/plain')
            # See https://bitbucket.org/cherrypy/cherrypy/issue/941
            c._output(ntob('Re, 1.2.3.4#015#012'))
            c.endheaders()

            response = c.getresponse()
            self.status = str(response.status)
            self.assertStatus(400)
            self.body = response.fp.read(20)
            self.assertBody("Illegal header line.")
        finally:
            c.close()

    def test_http_over_https(self):
        if self.scheme != 'https':
            return self.skip("skipped (not running HTTPS)... ")

        # Try connecting without SSL.
        conn = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
        try:
            conn.putrequest("GET", "/", skip_host=True)
            conn.putheader("Host", self.HOST)
            conn.endheaders()
            response = conn.response_class(conn.sock, method="GET")
            try:
                response.begin()
                self.assertEqual(response.status, 400)
                self.body = response.read()
                self.assertBody(
                    "The client sent a plain HTTP request, but this "
                    "server only speaks HTTPS on this port.")
            except socket.error:
                e = sys.exc_info()[1]
                # "Connection reset by peer" is also acceptable.
                if e.errno != errno.ECONNRESET:
                    raise
        finally:
            conn.close()

    def test_garbage_in(self):
        # Connect without SSL regardless of server.scheme
        c = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
        try:
            c._output(ntob('gjkgjklsgjklsgjkljklsg'))
            c._send_output()
            response = c.response_class(c.sock, method="GET")
            try:
                response.begin()
                self.assertEqual(response.status, 400)
                self.assertEqual(response.fp.read(22),
                                 ntob("Malformed Request-Line"))
            except socket.error:
                e = sys.exc_info()[1]
                # "Connection reset by peer" is also acceptable.
                if e.errno != errno.ECONNRESET:
                    raise
        finally:
            # Close on every path, not just after the assertions pass.
            c.close()
246